XComp commented on code in PR #24489:
URL: https://github.com/apache/flink/pull/24489#discussion_r1936944636


##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java:
##########
@@ -2330,6 +2331,86 @@ public void 
testResourceRequirementsAreRequestedFromTheScheduler() throws Except
         }
     }
 
+    @Test
+    public void 
testJobMasterDoesNotReconnectToResourceManagerEvenIfCleanupStalls()
+            throws Exception {
+
+        // the ResourceManager should count the connect attempts
+        final TestingResourceManagerGateway resourceManagerGateway =
+                createAndRegisterTestingResourceManagerGateway();
+        final AtomicInteger connectCount = new AtomicInteger();
+        final OneShotLatch firstRegistrationLatch = new OneShotLatch();
+        resourceManagerGateway.setRegisterJobManagerFunction(
+                (jobMasterId, resourceID, s, jobID) -> {
+                    connectCount.incrementAndGet();
+                    firstRegistrationLatch.trigger();
+
+                    return CompletableFuture.completedFuture(
+                            
resourceManagerGateway.getJobMasterRegistrationSuccess());
+                });
+
+        final OneShotLatch schedulerCloseLatch = new OneShotLatch();
+        final CompletableFuture<Void> schedulerCloseFuture = new 
CompletableFuture<>();
+        final TestingSchedulerNG scheduler =
+                TestingSchedulerNG.newBuilder()
+                        .setCloseAsyncSupplier(
+                                () -> {
+                                    schedulerCloseLatch.trigger();
+                                    return schedulerCloseFuture;
+                                })
+                        .build();
+
+        final OneShotLatch closeSlotPoolLatch = new OneShotLatch();
+        final JobMaster jobMaster =
+                new JobMasterBuilder(jobGraph, rpcService)
+                        .withHighAvailabilityServices(haServices)
+                        .withSlotPoolServiceSchedulerFactory(
+                                DefaultSlotPoolServiceSchedulerFactory.create(
+                                        
TestingSlotPoolServiceBuilder.newBuilder()
+                                                
.setCloseRunnable(closeSlotPoolLatch::awaitQuietly),
+                                        new 
TestingSchedulerNGFactory(scheduler)))
+                        .createJobMaster();
+        jobMaster.start();
+
+        notifyResourceManagerLeaderListeners(resourceManagerGateway);
+        firstRegistrationLatch.await();
+
+        final CompletableFuture<Void> jobMasterCloseFuture = 
jobMaster.closeAsync();
+
+        // force the scheduler closing to happen outside the main thread
+        schedulerCloseLatch.await();
+        CompletableFuture.runAsync(
+                () -> {
+                    try {
+                        // we don't want the future to be completed right away 
because that could
+                        // lead to the subsequent calls being executed by the 
main thread again
+                        Thread.sleep(100);
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                    schedulerCloseFuture.complete(null);
+                });
+
+        // wait for the ResourceManager connection to be closed (happens 
before closing the slot
+        // pool)
+        CommonTestUtils.waitUntilCondition(() -> 
closeSlotPoolLatch.getWaitersCount() > 0);
+
+        final JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
+        jobMasterGateway.disconnectResourceManager(
+                resourceManagerGateway.getFencingToken(),
+                new FlinkException(
+                        "The JobMaster triggered a connection shutdown which 
is confirmed by the ResourceManager through this disconnectResourceManager 
call."));

Review Comment:
   ```suggestion
                           "The JobMaster triggered a connection shutdown which 
is confirmed by the ResouceManager through this disconnectResourceManager 
call."));
   ```
   
   I went over the change once more to check whether the issue actually is 
reasonable which it is. The newly added test fails without any other change. 
Each of the other two commits fix the test individually. But I left both 
commits in the PR to harden the implementation.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java:
##########
@@ -2097,6 +2098,87 @@ public void 
testResourceRequirementsAreRequestedFromTheScheduler() throws Except
         }
     }
 
+    @Test
+    public void 
testJobMasterDoesNotReconnectToResourceManagerEvenIfCleanupStalls()
+            throws Exception {
+
+        // the ResourceManager should count the connect attempts
+        final TestingResourceManagerGateway resourceManagerGateway =
+                createAndRegisterTestingResourceManagerGateway();
+        final AtomicInteger connectCount = new AtomicInteger();
+        final OneShotLatch firstRegistrationLatch = new OneShotLatch();
+        resourceManagerGateway.setRegisterJobManagerFunction(
+                (jobMasterId, resourceID, s, jobID) -> {
+                    connectCount.incrementAndGet();
+                    firstRegistrationLatch.trigger();
+
+                    return CompletableFuture.completedFuture(
+                            
resourceManagerGateway.getJobMasterRegistrationSuccess());
+                });
+
+        // the scheduler close logic should be handled outside the main thread
+        final OneShotLatch schedulerCloseLatch = new OneShotLatch();
+        final CompletableFuture<Void> schedulerCloseFuture = new 
CompletableFuture<>();
+        final TestingSchedulerNG scheduler =
+                TestingSchedulerNG.newBuilder()
+                        .setCloseAsyncSupplier(
+                                () -> {
+                                    schedulerCloseLatch.trigger();
+                                    return schedulerCloseFuture;
+                                })
+                        .build();
+
+        final OneShotLatch closeSlotPoolLatch = new OneShotLatch();
+        final JobMaster jobMaster =
+                new JobMasterBuilder(jobGraph, rpcService)
+                        .withHighAvailabilityServices(haServices)
+                        .withSlotPoolServiceSchedulerFactory(
+                                DefaultSlotPoolServiceSchedulerFactory.create(
+                                        
TestingSlotPoolServiceBuilder.newBuilder()
+                                                
.setCloseRunnable(closeSlotPoolLatch::awaitQuietly),
+                                        new 
TestingSchedulerNGFactory(scheduler)))
+                        .createJobMaster();
+        jobMaster.start();
+
+        notifyResourceManagerLeaderListeners(resourceManagerGateway);
+        firstRegistrationLatch.await();
+
+        final CompletableFuture<Void> jobMasterCloseFuture = 
jobMaster.closeAsync();
+
+        // force the scheduler closing to happen outside the main thread

Review Comment:
   I guess that's reasonable: I created 
https://github.com/apache/flink/pull/26095/files to fix the issue in the 
`*Scheduler` implementations.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to