This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git

commit eef910f70988a12bb85022674763bbc35448ddf7
Author: Matthias Pohl <[email protected]>
AuthorDate: Thu Feb 6 11:35:51 2025 +0100

    [FLINK-34227][runtime] Fixes concurrency issue in JobMaster shutdown (#24489)
    
    * Makes stopping the job execution in JobMaster run in the main thread
    * Makes JobMaster#closeAsync more robust to disconnect calls during shutdown
    * Adds test to cover FLINK-34227 scenario
---
 .../apache/flink/runtime/jobmaster/JobMaster.java  | 31 ++++++---
 .../flink/runtime/jobmaster/JobMasterTest.java     | 81 ++++++++++++++++++++++
 .../slotpool/TestingSlotPoolServiceBuilder.java    |  5 ++
 3 files changed, 109 insertions(+), 8 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 6edaa00ef07..2205e324677 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -839,14 +839,29 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId>
     public void disconnectResourceManager(
             final ResourceManagerId resourceManagerId, final Exception cause) {
 
-        if (isConnectingToResourceManager(resourceManagerId)) {
+        if (resourceManagerAddress == null) {
+            log.debug(
+                    "Disconnecting ResourceManager {} was triggered with no 
ResourceManager address "
+                            + "being set (anymore). That either indicates that 
the ResourceManager "
+                            + "lost leadership in the mean time or the message 
was received while "
+                            + "shutting down the JobMaster. No reconnect will 
be initiated.",
+                    resourceManagerId);
+        } else if 
(!resourceManagerAddress.getResourceManagerId().equals(resourceManagerId)) {
+            log.debug(
+                    "Disconnecting ResourceManager {} was received while this 
instance is currently "
+                            + "connected to another ResourceManager {} 
indicating that a ResourceManager "
+                            + "leader change happened. No reconnect will be 
initiated.",
+                    resourceManagerId,
+                    resourceManagerAddress.getResourceManagerId());
+        } else {
             reconnectToResourceManager(cause);
         }
     }
 
-    private boolean isConnectingToResourceManager(ResourceManagerId 
resourceManagerId) {
-        return resourceManagerAddress != null
-                && 
resourceManagerAddress.getResourceManagerId().equals(resourceManagerId);
+    private void shutdownResourceManagerConnection(Exception cause) {
+        // unsetting the resourceManagerAddress will prevent reconnection
+        resourceManagerAddress = null;
+        closeResourceManagerConnection(cause);
     }
 
     @Override
@@ -1045,13 +1060,14 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId>
 
         final CompletableFuture<Void> terminationFuture = stopScheduling();
 
-        return FutureUtils.runAfterwards(
+        return FutureUtils.runAfterwardsAsync(
                 terminationFuture,
                 () -> {
                     shuffleMaster.unregisterJob(jobGraph.getJobID());
                     disconnectTaskManagerResourceManagerConnections(cause);
                     stopJobMasterServices();
-                });
+                },
+                getMainThreadExecutor());
     }
 
     private void disconnectTaskManagerResourceManagerConnections(Exception 
cause) {
@@ -1063,8 +1079,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId>
             disconnectTaskManager(taskManagerResourceId, cause);
         }
 
-        // disconnect from resource manager:
-        closeResourceManagerConnection(cause);
+        shutdownResourceManagerConnection(cause);
     }
 
     private void stopHeartbeatServices() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index e8e63652283..2b5cc013f3b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -160,6 +160,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -2099,6 +2100,86 @@ class JobMasterTest {
         }
     }
 
+    @Test
+    public void 
testJobMasterDoesNotReconnectToResourceManagerEvenIfCleanupStalls()
+            throws Exception {
+
+        // the ResourceManager should count the connect attempts
+        final TestingResourceManagerGateway resourceManagerGateway =
+                createAndRegisterTestingResourceManagerGateway();
+        final AtomicInteger connectCount = new AtomicInteger();
+        final OneShotLatch firstRegistrationLatch = new OneShotLatch();
+        resourceManagerGateway.setRegisterJobManagerFunction(
+                (jobMasterId, resourceID, s, jobID) -> {
+                    connectCount.incrementAndGet();
+                    firstRegistrationLatch.trigger();
+
+                    return CompletableFuture.completedFuture(
+                            
resourceManagerGateway.getJobMasterRegistrationSuccess());
+                });
+
+        final OneShotLatch schedulerCloseLatch = new OneShotLatch();
+        final CompletableFuture<Void> schedulerCloseFuture = new 
CompletableFuture<>();
+        final TestingSchedulerNG scheduler =
+                TestingSchedulerNG.newBuilder()
+                        .setCloseAsyncSupplier(
+                                () -> {
+                                    schedulerCloseLatch.trigger();
+                                    return schedulerCloseFuture;
+                                })
+                        .build();
+
+        final OneShotLatch closeSlotPoolLatch = new OneShotLatch();
+        final JobMaster jobMaster =
+                new JobMasterBuilder(jobGraph, rpcService)
+                        .withHighAvailabilityServices(haServices)
+                        .withSlotPoolServiceSchedulerFactory(
+                                DefaultSlotPoolServiceSchedulerFactory.create(
+                                        
TestingSlotPoolServiceBuilder.newBuilder()
+                                                
.setCloseRunnable(closeSlotPoolLatch::awaitQuietly),
+                                        new 
TestingSchedulerNGFactory(scheduler)))
+                        .createJobMaster();
+        jobMaster.start();
+
+        notifyResourceManagerLeaderListeners(resourceManagerGateway);
+        firstRegistrationLatch.await();
+
+        final CompletableFuture<Void> jobMasterCloseFuture = 
jobMaster.closeAsync();
+
+        // force the scheduler closing to happen outside the main thread
+        schedulerCloseLatch.await();
+        CompletableFuture.runAsync(
+                () -> {
+                    try {
+                        // we don't want the future to be completed right away 
because that could
+                        // lead to the subsequent calls being executed by the 
main thread again
+                        Thread.sleep(100);
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                    schedulerCloseFuture.complete(null);
+                });
+
+        // wait for the ResourceManager connection to be closed (happens 
before closing the slot
+        // pool)
+        CommonTestUtils.waitUntilCondition(() -> 
closeSlotPoolLatch.getWaitersCount() > 0);
+
+        final JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
+        jobMasterGateway.disconnectResourceManager(
+                resourceManagerGateway.getFencingToken(),
+                new FlinkException(
+                        "The JobMaster triggered a connection shutdown which 
is confirmed by the ResourceManager through this disconnectResourceManager 
call."));
+
+        // unblocks JobMaster's close procedure
+        closeSlotPoolLatch.trigger();
+        assertThatFuture(jobMasterCloseFuture).eventuallySucceeds();
+
+        assertThat(connectCount.get())
+                .as(
+                        "The disconnect shouldn't trigger another reconnect 
because the ResourceManager connection was already closed.")
+                .isEqualTo(1);
+    }
+
     @Test
     void testRetrievingCheckpointStats() throws Exception {
         // create savepoint data
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolServiceBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolServiceBuilder.java
index aed1790d597..c99c8e91f12 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolServiceBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolServiceBuilder.java
@@ -91,6 +91,11 @@ public class TestingSlotPoolServiceBuilder implements SlotPoolServiceFactory {
         return this;
     }
 
+    public TestingSlotPoolServiceBuilder setCloseRunnable(Runnable 
closeRunnable) {
+        this.closeRunnable = closeRunnable;
+        return this;
+    }
+
     public static TestingSlotPoolServiceBuilder newBuilder() {
         return new TestingSlotPoolServiceBuilder();
     }

Reply via email to