This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bd9a2c0cb0ea0f7a71d176e1aec789de486adc56
Author: Chesnay Schepler <[email protected]>
AuthorDate: Tue Jun 18 10:21:18 2019 +0200

    [FLINK-12641][coordination] Release partitions on job shutdown
---
 .../apache/flink/runtime/jobmaster/JobMaster.java  |  6 +++
 .../flink/runtime/jobmaster/JobMasterTest.java     | 55 ++++++++++++++++++++++
 2 files changed, 61 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 2732727..4bc6e32 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -892,6 +892,12 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
                validateRunsInMainThread();
 
                if (newJobStatus.isGloballyTerminalState()) {
+                       // other terminal job states are handled by the 
executions
+                       if (newJobStatus == JobStatus.FINISHED) {
+                               runAsync(() -> registeredTaskManagers.keySet()
+                                       
.forEach(partitionTracker::stopTrackingAndReleasePartitionsFor));
+                       }
+
                        final ArchivedExecutionGraph archivedExecutionGraph = 
schedulerNG.requestJob();
                        scheduledExecutorService.execute(() -> 
jobCompletionActions.jobReachedGloballyTerminalState(archivedExecutionGraph));
                }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 1291d99..e6029cd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.ConfigConstants;
@@ -1835,6 +1836,60 @@ public class JobMasterTest extends TestLogger {
                }
        }
 
+       @Test
+       public void testPartitionReleaseOnJobTermination() throws Exception {
+               final JobManagerSharedServices jobManagerSharedServices = new 
TestingJobManagerSharedServicesBuilder().build();
+               final JobGraph jobGraph = createSingleVertexJobGraph();
+
+               final CompletableFuture<ResourceID> 
partitionCleanupTaskExecutorId = new CompletableFuture<>();
+               final TestingPartitionTracker partitionTracker = new 
TestingPartitionTracker();
+               
partitionTracker.setStopTrackingAndReleaseAllPartitionsConsumer(partitionCleanupTaskExecutorId::complete);
+
+               final JobMaster jobMaster = new JobMasterBuilder()
+                       .withConfiguration(configuration)
+                       .withJobGraph(jobGraph)
+                       .withHighAvailabilityServices(haServices)
+                       .withJobManagerSharedServices(jobManagerSharedServices)
+                       .withHeartbeatServices(heartbeatServices)
+                       .withOnCompletionActions(new 
TestingOnCompletionActions())
+                       .withPartitionTrackerFactory(ignord -> partitionTracker)
+                       .createJobMaster();
+
+               final CompletableFuture<TaskDeploymentDescriptor> 
taskDeploymentDescriptorFuture = new CompletableFuture<>();
+               final CompletableFuture<Tuple2<JobID, 
Collection<ResultPartitionID>>> releasePartitionsFuture = new 
CompletableFuture<>();
+               final CompletableFuture<JobID> disconnectTaskExecutorFuture = 
new CompletableFuture<>();
+               final TestingTaskExecutorGateway testingTaskExecutorGateway = 
new TestingTaskExecutorGatewayBuilder()
+                       .setReleasePartitionsConsumer((jobId, partitions) -> 
releasePartitionsFuture.complete(Tuple2.of(jobId, partitions)))
+                       .setDisconnectJobManagerConsumer((jobID, throwable) -> 
disconnectTaskExecutorFuture.complete(jobID))
+                       .setSubmitTaskConsumer((tdd, ignored) -> {
+                               taskDeploymentDescriptorFuture.complete(tdd);
+                               return 
CompletableFuture.completedFuture(Acknowledge.get());
+                       })
+                       .createTestingTaskExecutorGateway();
+
+               try {
+                       jobMaster.start(jobMasterId).get();
+
+                       final JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
+
+                       final LocalTaskManagerLocation taskManagerLocation = 
new LocalTaskManagerLocation();
+                       registerSlotsAtJobMaster(1, jobMasterGateway, 
testingTaskExecutorGateway, taskManagerLocation);
+
+                       // update the execution state of the only execution to 
FINISHED
+                       // this should trigger the job to finish
+                       final TaskDeploymentDescriptor taskDeploymentDescriptor 
= taskDeploymentDescriptorFuture.get();
+                       jobMasterGateway.updateTaskExecutionState(
+                               new TaskExecutionState(
+                                       taskDeploymentDescriptor.getJobId(),
+                                       
taskDeploymentDescriptor.getExecutionAttemptId(),
+                                       ExecutionState.FINISHED));
+
+                       assertThat(partitionCleanupTaskExecutorId.get(), 
equalTo(taskManagerLocation.getResourceID()));
+               } finally {
+                       RpcUtils.terminateRpcEndpoint(jobMaster, 
testingTimeout);
+               }
+       }
+
        /**
         * Tests that the job execution is failed if the TaskExecutor 
disconnects from the
         * JobMaster.

Reply via email to