This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 27712bd41e243cf26120fbb9111c04ed4f22c50e Author: Chesnay Schepler <[email protected]> AuthorDate: Fri Jun 14 12:46:39 2019 +0200 [FLINK-12612][coordination] Maintain JM connection until all partitions are released --- .../apache/flink/runtime/jobmaster/JobMaster.java | 6 ++- .../flink/runtime/jobmaster/JobMasterTest.java | 57 ++++++++++++++++++++++ 2 files changed, 62 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 4bc6e32..19168a5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -536,7 +536,11 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast private void internalFailAllocation(AllocationID allocationId, Exception cause) { final Optional<ResourceID> resourceIdOptional = slotPool.failAllocation(allocationId, cause); - resourceIdOptional.ifPresent(this::releaseEmptyTaskManager); + resourceIdOptional.ifPresent(taskManagerId -> { + if (!partitionTracker.isTrackingPartitionsFor(taskManagerId)) { + releaseEmptyTaskManager(taskManagerId); + } + }); } private void releaseEmptyTaskManager(ResourceID resourceId) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index e6029cd..cba49d2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -1711,6 +1711,63 @@ public class JobMasterTest extends TestLogger { } } + @Test + public void testTaskExecutorNotReleasedOnFailedAllocationIfPartitionIsAllocated() throws Exception { + final JobManagerSharedServices jobManagerSharedServices = new 
TestingJobManagerSharedServicesBuilder().build(); + + final JobGraph jobGraph = createSingleVertexJobGraph(); + + final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); + + final AtomicBoolean isTrackingPartitions = new AtomicBoolean(true); + final TestingPartitionTracker partitionTracker = new TestingPartitionTracker(); + partitionTracker.setIsTrackingPartitionsForFunction(ignored -> isTrackingPartitions.get()); + + final JobMaster jobMaster = new JobMasterBuilder() + .withConfiguration(configuration) + .withJobGraph(jobGraph) + .withHighAvailabilityServices(haServices) + .withJobManagerSharedServices(jobManagerSharedServices) + .withHeartbeatServices(heartbeatServices) + .withOnCompletionActions(new TestingOnCompletionActions()) + .withPartitionTrackerFactory(ignored -> partitionTracker) + .createJobMaster(); + + final CompletableFuture<JobID> disconnectTaskExecutorFuture = new CompletableFuture<>(); + final CompletableFuture<AllocationID> freedSlotFuture = new CompletableFuture<>(); + final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setFreeSlotFunction( + (allocationID, throwable) -> { + freedSlotFuture.complete(allocationID); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .setDisconnectJobManagerConsumer((jobID, throwable) -> disconnectTaskExecutorFuture.complete(jobID)) + .createTestingTaskExecutorGateway(); + + try { + jobMaster.start(jobMasterId).get(); + + final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); + + final Collection<SlotOffer> slotOffers = registerSlotsAtJobMaster(1, jobMasterGateway, testingTaskExecutorGateway, taskManagerLocation); + + // check that we accepted the offered slot + assertThat(slotOffers, hasSize(1)); + final AllocationID allocationId = slotOffers.iterator().next().getAllocationId(); + + jobMasterGateway.notifyAllocationFailure(allocationId, new FlinkException("Fail allocation test 
exception")); + + // we should free the slot, but not disconnect from the TaskExecutor as we still have an allocated partition + assertThat(freedSlotFuture.get(), equalTo(allocationId)); + + // trigger some request to ensure the slotAllocationFailure processing is complete + jobMasterGateway.requestJobStatus(Time.seconds(5)).get(); + assertThat(disconnectTaskExecutorFuture.isDone(), is(false)); + } finally { + RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); + } + } + /** * Tests the updateGlobalAggregate functionality */
