XComp commented on code in PR #25027:
URL: https://github.com/apache/flink/pull/25027#discussion_r1668120064


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java:
##########
@@ -85,7 +85,16 @@ public Collection<JobManagerRunner> getJobManagerRunners() {
     @Override
     public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor unusedExecutor) {
         if (isRegistered(jobId)) {
-            return unregister(jobId).closeAsync();
+            CompletableFuture<Void> resultFuture = this.jobManagerRunners.get(jobId).closeAsync();
+
+            resultFuture.whenComplete(

Review Comment:
   What about using `thenAccept` here, since we're only interested in the 
success scenario?
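
   A minimal sketch of the difference, using only `java.util.concurrent`. The 
class `ThenAcceptSketch` and the method `closeThenUnregister` are hypothetical 
stand-ins for illustration and not the PR's code. The callback passed to 
`thenAccept` is skipped when the upstream future fails, so no `throwable == 
null` check is needed, and the returned future still carries the failure:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

final class ThenAcceptSketch {

    // Hypothetical helper: runs `unregister` only if `closeFuture` completes normally.
    // The returned future completes after the callback, or fails if `closeFuture` failed.
    static CompletableFuture<Void> closeThenUnregister(
            CompletableFuture<Void> closeFuture, Runnable unregister) {
        return closeFuture.thenAccept(ignored -> unregister.run());
    }

    public static void main(String[] args) {
        AtomicInteger unregisterCalls = new AtomicInteger();

        // Success path: the callback runs.
        CompletableFuture<Void> succeeded =
                closeThenUnregister(
                        CompletableFuture.completedFuture(null),
                        unregisterCalls::incrementAndGet);

        // Failure path: the callback is skipped and the error is propagated.
        CompletableFuture<Void> failingClose = new CompletableFuture<>();
        failingClose.completeExceptionally(new IllegalStateException("close failed"));
        CompletableFuture<Void> failed =
                closeThenUnregister(failingClose, unregisterCalls::incrementAndGet);

        System.out.println("success path failed: " + succeeded.isCompletedExceptionally());
        System.out.println("failure path failed: " + failed.isCompletedExceptionally());
        System.out.println("unregister calls: " + unregisterCalls.get());
    }
}
```

   Returning the derived future (rather than the original close future) also 
means a caller waiting on it sees completion only after the callback has run.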



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java:
##########
@@ -85,7 +85,16 @@ public Collection<JobManagerRunner> getJobManagerRunners() {
     @Override
     public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor unusedExecutor) {
         if (isRegistered(jobId)) {
-            return unregister(jobId).closeAsync();
+            CompletableFuture<Void> resultFuture = this.jobManagerRunners.get(jobId).closeAsync();
+
+            resultFuture.whenComplete(
+                    (result, throwable) -> {
+                        if (throwable == null) {
+                            unregister(jobId);

Review Comment:
   Here, we have to be careful: `closeAsync` is not necessarily called in the 
`Dispatcher`'s main thread (let me know if you're unfamiliar with the main 
thread concept in Flink). 
   
   Essentially, the idea is for certain components to run all their logic in a 
single thread, i.e. the main thread, to avoid worrying about concurrency. 
That's the case for the `Dispatcher` (as it extends `RpcEndpoint`) and all its 
sub-components. See 
[OnMainThreadJobManagerRunnerRegistry](https://github.com/apache/flink/blob/3efd4c2cf1be670f499e6637445e283e48deee60/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/OnMainThreadJobManagerRunnerRegistry.java)
 and its usages, which ensure that we follow this paradigm for the 
`JobManagerRunnerRegistry` component.
   
   I guess we would have to utilize `thenAcceptAsync` on the `closeAsync` 
future.
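
   A rough sketch of that idea, using only `java.util.concurrent`. The names 
`MainThreadCallbackSketch` and `unregisterOnMainThread` are hypothetical, and 
the single-thread executor merely stands in for a main-thread executor; which 
executor the registry should actually receive is left open here. The point is 
that `thenAcceptAsync(action, executor)` runs the callback on the given 
executor regardless of which thread completes the close future:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class MainThreadCallbackSketch {

    // Hypothetical helper: once `closeFuture` succeeds, run `unregister` on
    // `mainThreadExecutor` instead of on the thread that completed the future.
    static CompletableFuture<Void> unregisterOnMainThread(
            CompletableFuture<Void> closeFuture,
            Executor mainThreadExecutor,
            Runnable unregister) {
        return closeFuture.thenAcceptAsync(ignored -> unregister.run(), mainThreadExecutor);
    }

    public static void main(String[] args) {
        // Assumption: a single-thread executor stands in for the component's main thread.
        ExecutorService mainThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread-stand-in"));
        try {
            CompletableFuture<Void> closeFuture = new CompletableFuture<>();
            CompletableFuture<Void> result =
                    unregisterOnMainThread(
                            closeFuture,
                            mainThread,
                            () ->
                                    System.out.println(
                                            "unregister ran on "
                                                    + Thread.currentThread().getName()));

            // Complete the close future from an unrelated thread.
            new Thread(() -> closeFuture.complete(null), "some-other-thread").start();

            // Wait until the callback has finished on the stand-in main thread.
            result.join();
        } finally {
            mainThread.shutdown();
        }
    }
}
```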



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java:
##########
@@ -164,7 +164,8 @@ void testFailingLocalCleanup() {
                 .hasExactlyElementsOfTypes(ExecutionException.class, expectedException.getClass())
                 .last()
                 .isEqualTo(expectedException);
-        assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse();
+        // Since the cleanup failed, the JobManagerRunner is expected to not have been unregistered.

Review Comment:
   nit: you could use AssertJ's `.as()` method to add an assertion message 
rather than a comment. It would serve the same function as a comment but would 
also expose the description programmatically.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
