This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch release-1.19 in repository https://gitbox.apache.org/repos/asf/flink.git
commit eef910f70988a12bb85022674763bbc35448ddf7 Author: Matthias Pohl <[email protected]> AuthorDate: Thu Feb 6 11:35:51 2025 +0100 [FLINK-34227][runtime] Fixes concurrency issue in JobMaster shutdown (#24489) * Makes stopping the job execution in JobMaster run in the main thread * Makes JobMaster#closeAsync more robust to disconnect calls during shutdown * Adds test to cover FLINK-34227 scenario --- .../apache/flink/runtime/jobmaster/JobMaster.java | 31 ++++++--- .../flink/runtime/jobmaster/JobMasterTest.java | 81 ++++++++++++++++++++++ .../slotpool/TestingSlotPoolServiceBuilder.java | 5 ++ 3 files changed, 109 insertions(+), 8 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 6edaa00ef07..2205e324677 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -839,14 +839,29 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> public void disconnectResourceManager( final ResourceManagerId resourceManagerId, final Exception cause) { - if (isConnectingToResourceManager(resourceManagerId)) { + if (resourceManagerAddress == null) { + log.debug( + "Disconnecting ResourceManager {} was triggered with no ResourceManager address " + + "being set (anymore). That either indicates that the ResourceManager " + + "lost leadership in the mean time or the message was received while " + + "shutting down the JobMaster. No reconnect will be initiated.", + resourceManagerId); + } else if (!resourceManagerAddress.getResourceManagerId().equals(resourceManagerId)) { + log.debug( + "Disconnecting ResourceManager {} was received while this instance is currently " + + "connected to another ResourceManager {} indicating that a ResourceManager " + + "leader change happened. No reconnect will be initiated.", + resourceManagerId, + resourceManagerAddress.getResourceManagerId()); + } else { reconnectToResourceManager(cause); } } - private boolean isConnectingToResourceManager(ResourceManagerId resourceManagerId) { - return resourceManagerAddress != null - && resourceManagerAddress.getResourceManagerId().equals(resourceManagerId); + private void shutdownResourceManagerConnection(Exception cause) { + // unsetting the resourceManagerAddress will prevent reconnection + resourceManagerAddress = null; + closeResourceManagerConnection(cause); } @Override @@ -1045,13 +1060,14 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> final CompletableFuture<Void> terminationFuture = stopScheduling(); - return FutureUtils.runAfterwards( + return FutureUtils.runAfterwardsAsync( terminationFuture, () -> { shuffleMaster.unregisterJob(jobGraph.getJobID()); disconnectTaskManagerResourceManagerConnections(cause); stopJobMasterServices(); - }); + }, + getMainThreadExecutor()); } private void disconnectTaskManagerResourceManagerConnections(Exception cause) { @@ -1063,8 +1079,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> disconnectTaskManager(taskManagerResourceId, cause); } - // disconnect from resource manager: - closeResourceManagerConnection(cause); + shutdownResourceManagerConnection(cause); } private void stopHeartbeatServices() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index e8e63652283..2b5cc013f3b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -160,6 +160,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.Supplier; @@ -2099,6 +2100,86 @@ class JobMasterTest { } } + @Test + public void testJobMasterDoesNotReconnectToResourceManagerEvenIfCleanupStalls() + throws Exception { + + // the ResourceManager should count the connect attempts + final TestingResourceManagerGateway resourceManagerGateway = + createAndRegisterTestingResourceManagerGateway(); + final AtomicInteger connectCount = new AtomicInteger(); + final OneShotLatch firstRegistrationLatch = new OneShotLatch(); + resourceManagerGateway.setRegisterJobManagerFunction( + (jobMasterId, resourceID, s, jobID) -> { + connectCount.incrementAndGet(); + firstRegistrationLatch.trigger(); + + return CompletableFuture.completedFuture( + resourceManagerGateway.getJobMasterRegistrationSuccess()); + }); + + final OneShotLatch schedulerCloseLatch = new OneShotLatch(); + final CompletableFuture<Void> schedulerCloseFuture = new CompletableFuture<>(); + final TestingSchedulerNG scheduler = + TestingSchedulerNG.newBuilder() + .setCloseAsyncSupplier( + () -> { + schedulerCloseLatch.trigger(); + return schedulerCloseFuture; + }) + .build(); + + final OneShotLatch closeSlotPoolLatch = new OneShotLatch(); + final JobMaster jobMaster = + new JobMasterBuilder(jobGraph, rpcService) + .withHighAvailabilityServices(haServices) + .withSlotPoolServiceSchedulerFactory( + DefaultSlotPoolServiceSchedulerFactory.create( + TestingSlotPoolServiceBuilder.newBuilder() + .setCloseRunnable(closeSlotPoolLatch::awaitQuietly), + new TestingSchedulerNGFactory(scheduler))) + .createJobMaster(); + jobMaster.start(); + + notifyResourceManagerLeaderListeners(resourceManagerGateway); + firstRegistrationLatch.await(); + + final CompletableFuture<Void> jobMasterCloseFuture = jobMaster.closeAsync(); + + // force the scheduler closing to happen outside the main thread + schedulerCloseLatch.await(); + CompletableFuture.runAsync( + () -> { + try { + // we don't want the future to be completed right away because that could + // lead to the subsequent calls being executed by the main thread again + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + schedulerCloseFuture.complete(null); + }); + + // wait for the ResourceManager connection to be closed (happens before closing the slot + // pool) + CommonTestUtils.waitUntilCondition(() -> closeSlotPoolLatch.getWaitersCount() > 0); + + final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); + jobMasterGateway.disconnectResourceManager( + resourceManagerGateway.getFencingToken(), + new FlinkException( + "The JobMaster triggered a connection shutdown which is confirmed by the ResourceManager through this disconnectResourceManager call.")); + + // unblocks JobMaster's close procedure + closeSlotPoolLatch.trigger(); + assertThatFuture(jobMasterCloseFuture).eventuallySucceeds(); + + assertThat(connectCount.get()) + .as( + "The disconnect shouldn't trigger another reconnect because the ResourceManager connection was already closed.") + .isEqualTo(1); + } + @Test void testRetrievingCheckpointStats() throws Exception { // create savepoint data diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolServiceBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolServiceBuilder.java index aed1790d597..c99c8e91f12 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolServiceBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolServiceBuilder.java @@ -91,6 +91,11 @@ public class TestingSlotPoolServiceBuilder implements SlotPoolServiceFactory { return this; } + public TestingSlotPoolServiceBuilder setCloseRunnable(Runnable closeRunnable) { + this.closeRunnable = closeRunnable; + return this; + } + public static TestingSlotPoolServiceBuilder newBuilder() { return new TestingSlotPoolServiceBuilder(); }
