This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 27712bd41e243cf26120fbb9111c04ed4f22c50e
Author: Chesnay Schepler <[email protected]>
AuthorDate: Fri Jun 14 12:46:39 2019 +0200

    [FLINK-12612][coordination] Maintain JM connection until all partitions are released
---
 .../apache/flink/runtime/jobmaster/JobMaster.java  |  6 ++-
 .../flink/runtime/jobmaster/JobMasterTest.java     | 57 ++++++++++++++++++++++
 2 files changed, 62 insertions(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 4bc6e32..19168a5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -536,7 +536,11 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
 
        private void internalFailAllocation(AllocationID allocationId, 
Exception cause) {
                final Optional<ResourceID> resourceIdOptional = 
slotPool.failAllocation(allocationId, cause);
-               resourceIdOptional.ifPresent(this::releaseEmptyTaskManager);
+               resourceIdOptional.ifPresent(taskManagerId -> { // keep the TaskExecutor connected while the partition tracker still tracks partitions for it
+                       if 
(!partitionTracker.isTrackingPartitionsFor(taskManagerId)) {
+                               releaseEmptyTaskManager(taskManagerId);
+                       }
+               });
        }
 
        private void releaseEmptyTaskManager(ResourceID resourceId) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index e6029cd..cba49d2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -1711,6 +1711,63 @@ public class JobMasterTest extends TestLogger {
                }
        }
 
+       @Test
+       public void 
testTaskExecutorNotReleasedOnFailedAllocationIfPartitionIsAllocated() throws 
Exception {
+               final JobManagerSharedServices jobManagerSharedServices = new 
TestingJobManagerSharedServicesBuilder().build();
+
+               final JobGraph jobGraph = createSingleVertexJobGraph();
+
+               final LocalTaskManagerLocation taskManagerLocation = new 
LocalTaskManagerLocation();
+
+               final AtomicBoolean isTrackingPartitions = new 
AtomicBoolean(true);
+               final TestingPartitionTracker partitionTracker = new 
TestingPartitionTracker();
+               partitionTracker.setIsTrackingPartitionsForFunction(ignored -> 
isTrackingPartitions.get()); // report tracked partitions for every TaskExecutor
+
+               final JobMaster jobMaster = new JobMasterBuilder()
+                       .withConfiguration(configuration)
+                       .withJobGraph(jobGraph)
+                       .withHighAvailabilityServices(haServices)
+                       .withJobManagerSharedServices(jobManagerSharedServices)
+                       .withHeartbeatServices(heartbeatServices)
+                       .withOnCompletionActions(new 
TestingOnCompletionActions())
+                       .withPartitionTrackerFactory(ignored -> 
partitionTracker)
+                       .createJobMaster();
+
+               final CompletableFuture<JobID> disconnectTaskExecutorFuture = 
new CompletableFuture<>();
+               final CompletableFuture<AllocationID> freedSlotFuture = new 
CompletableFuture<>();
+               final TestingTaskExecutorGateway testingTaskExecutorGateway = 
new TestingTaskExecutorGatewayBuilder()
+                       .setFreeSlotFunction(
+                               (allocationID, throwable) -> {
+                                       freedSlotFuture.complete(allocationID);
+                                       return 
CompletableFuture.completedFuture(Acknowledge.get());
+                               })
+                       .setDisconnectJobManagerConsumer((jobID, throwable) -> 
disconnectTaskExecutorFuture.complete(jobID))
+                       .createTestingTaskExecutorGateway();
+
+               try {
+                       jobMaster.start(jobMasterId).get();
+
+                       final JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
+
+                       final Collection<SlotOffer> slotOffers = 
registerSlotsAtJobMaster(1, jobMasterGateway, testingTaskExecutorGateway, 
taskManagerLocation);
+
+                       // check that we accepted the offered slot
+                       assertThat(slotOffers, hasSize(1));
+                       final AllocationID allocationId = 
slotOffers.iterator().next().getAllocationId();
+
+                       jobMasterGateway.notifyAllocationFailure(allocationId, 
new FlinkException("Fail allocation test exception"));
+
+                       // the slot must be freed, but we must not disconnect from the 
TaskExecutor because partitions are still tracked for it
+                       assertThat(freedSlotFuture.get(), 
equalTo(allocationId));
+
+                       // trigger some request to ensure the 
slotAllocationFailure processing is complete
+                       
jobMasterGateway.requestJobStatus(Time.seconds(5)).get();
+                       assertThat(disconnectTaskExecutorFuture.isDone(), 
is(false));
+               } finally {
+                       RpcUtils.terminateRpcEndpoint(jobMaster, 
testingTimeout);
+               }
+       }
+
        /**
         * Tests the updateGlobalAggregate functionality
         */

Reply via email to