XComp commented on code in PR #24489:
URL: https://github.com/apache/flink/pull/24489#discussion_r1941688686


##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java:
##########
@@ -2097,6 +2098,87 @@ public void testResourceRequirementsAreRequestedFromTheScheduler() throws Except
         }
     }
 
+    @Test
+    public void testJobMasterDoesNotReconnectToResourceManagerEvenIfCleanupStalls()
+            throws Exception {
+
+        // the ResourceManager should count the connect attempts
+        final TestingResourceManagerGateway resourceManagerGateway =
+                createAndRegisterTestingResourceManagerGateway();
+        final AtomicInteger connectCount = new AtomicInteger();
+        final OneShotLatch firstRegistrationLatch = new OneShotLatch();
+        resourceManagerGateway.setRegisterJobManagerFunction(
+                (jobMasterId, resourceID, s, jobID) -> {
+                    connectCount.incrementAndGet();
+                    firstRegistrationLatch.trigger();
+
+                    return CompletableFuture.completedFuture(
+                            resourceManagerGateway.getJobMasterRegistrationSuccess());
+                });
+
+        // the scheduler close logic should be handled outside the main thread
+        final OneShotLatch schedulerCloseLatch = new OneShotLatch();
+        final CompletableFuture<Void> schedulerCloseFuture = new CompletableFuture<>();
+        final TestingSchedulerNG scheduler =
+                TestingSchedulerNG.newBuilder()
+                        .setCloseAsyncSupplier(
+                                () -> {
+                                    schedulerCloseLatch.trigger();
+                                    return schedulerCloseFuture;
+                                })
+                        .build();
+
+        final OneShotLatch closeSlotPoolLatch = new OneShotLatch();
+        final JobMaster jobMaster =
+                new JobMasterBuilder(jobGraph, rpcService)
+                        .withHighAvailabilityServices(haServices)
+                        .withSlotPoolServiceSchedulerFactory(
+                                DefaultSlotPoolServiceSchedulerFactory.create(
+                                        TestingSlotPoolServiceBuilder.newBuilder()
+                                                .setCloseRunnable(closeSlotPoolLatch::awaitQuietly),
+                                        new TestingSchedulerNGFactory(scheduler)))
+                        .createJobMaster();
+        jobMaster.start();
+
+        notifyResourceManagerLeaderListeners(resourceManagerGateway);
+        firstRegistrationLatch.await();
+
+        final CompletableFuture<Void> jobMasterCloseFuture = jobMaster.closeAsync();
+
+        // force the scheduler closing to happen outside the main thread

Review Comment:
   I will keep the two PRs separate and merge them separately. It makes sense
to keep the two changes in separate commits anyway.


