This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit bd9a2c0cb0ea0f7a71d176e1aec789de486adc56 Author: Chesnay Schepler <[email protected]> AuthorDate: Tue Jun 18 10:21:18 2019 +0200 [FLINK-12641][coordination] Release partitions on job shutdown --- .../apache/flink/runtime/jobmaster/JobMaster.java | 6 +++ .../flink/runtime/jobmaster/JobMasterTest.java | 55 ++++++++++++++++++++++ 2 files changed, 61 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 2732727..4bc6e32 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -892,6 +892,12 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast validateRunsInMainThread(); if (newJobStatus.isGloballyTerminalState()) { + // other terminal job states are handled by the executions + if (newJobStatus == JobStatus.FINISHED) { + runAsync(() -> registeredTaskManagers.keySet() + .forEach(partitionTracker::stopTrackingAndReleasePartitionsFor)); + } + final ArchivedExecutionGraph archivedExecutionGraph = schedulerNG.requestJob(); scheduledExecutorService.execute(() -> jobCompletionActions.jobReachedGloballyTerminalState(archivedExecutionGraph)); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index 1291d99..e6029cd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -26,6 +26,7 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.time.Deadline; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.ConfigConstants; @@ -1835,6 +1836,60 @@ public class JobMasterTest extends TestLogger { } } + @Test + public void testPartitionReleaseOnJobTermination() throws Exception { + final JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build(); + final JobGraph jobGraph = createSingleVertexJobGraph(); + + final CompletableFuture<ResourceID> partitionCleanupTaskExecutorId = new CompletableFuture<>(); + final TestingPartitionTracker partitionTracker = new TestingPartitionTracker(); + partitionTracker.setStopTrackingAndReleaseAllPartitionsConsumer(partitionCleanupTaskExecutorId::complete); + + final JobMaster jobMaster = new JobMasterBuilder() + .withConfiguration(configuration) + .withJobGraph(jobGraph) + .withHighAvailabilityServices(haServices) + .withJobManagerSharedServices(jobManagerSharedServices) + .withHeartbeatServices(heartbeatServices) + .withOnCompletionActions(new TestingOnCompletionActions()) + .withPartitionTrackerFactory(ignord -> partitionTracker) + .createJobMaster(); + + final CompletableFuture<TaskDeploymentDescriptor> taskDeploymentDescriptorFuture = new CompletableFuture<>(); + final CompletableFuture<Tuple2<JobID, Collection<ResultPartitionID>>> releasePartitionsFuture = new CompletableFuture<>(); + final CompletableFuture<JobID> disconnectTaskExecutorFuture = new CompletableFuture<>(); + final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setReleasePartitionsConsumer((jobId, partitions) -> releasePartitionsFuture.complete(Tuple2.of(jobId, partitions))) + .setDisconnectJobManagerConsumer((jobID, throwable) -> disconnectTaskExecutorFuture.complete(jobID)) + .setSubmitTaskConsumer((tdd, ignored) -> { + taskDeploymentDescriptorFuture.complete(tdd); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .createTestingTaskExecutorGateway(); + + try { + jobMaster.start(jobMasterId).get(); + + final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); + + final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); + registerSlotsAtJobMaster(1, jobMasterGateway, testingTaskExecutorGateway, taskManagerLocation); + + // update the execution state of the only execution to FINISHED + // this should trigger the job to finish + final TaskDeploymentDescriptor taskDeploymentDescriptor = taskDeploymentDescriptorFuture.get(); + jobMasterGateway.updateTaskExecutionState( + new TaskExecutionState( + taskDeploymentDescriptor.getJobId(), + taskDeploymentDescriptor.getExecutionAttemptId(), + ExecutionState.FINISHED)); + + assertThat(partitionCleanupTaskExecutorId.get(), equalTo(taskManagerLocation.getResourceID())); + } finally { + RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); + } + } + /** * Tests that the job execution is failed if the TaskExecutor disconnects from the * JobMaster.
