XComp commented on code in PR #25027:
URL: https://github.com/apache/flink/pull/25027#discussion_r1668120064
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java:
##########
@@ -85,7 +85,16 @@ public Collection<JobManagerRunner> getJobManagerRunners() {
@Override
public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor unusedExecutor) {
if (isRegistered(jobId)) {
- return unregister(jobId).closeAsync();
+ CompletableFuture<Void> resultFuture = this.jobManagerRunners.get(jobId).closeAsync();
+
+ resultFuture.whenComplete(
Review Comment:
What about using `thenAccept` here, since we're only interested in the
success scenario?
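
To illustrate the difference, here is a minimal, self-contained sketch using plain `CompletableFuture` (no Flink classes). The class name `ThenAcceptVsWhenComplete`, the helper `firedCallbacks`, and the `closeFuture` variable are hypothetical stand-ins, not code from this PR. It records which callbacks run when the future succeeds and when it fails:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ThenAcceptVsWhenComplete {

    /** Records which callbacks fire when the future completes with the given outcome. */
    static List<String> firedCallbacks(boolean succeed) {
        List<String> fired = new ArrayList<>();
        // Hypothetical stand-in for the future returned by closeAsync().
        CompletableFuture<Void> closeFuture = new CompletableFuture<>();

        // whenComplete runs on success and on failure; the callback has to check the throwable.
        closeFuture.whenComplete((ignored, throwable) -> fired.add("whenComplete"));
        // thenAccept runs only if the future completes normally.
        closeFuture.thenAccept(ignored -> fired.add("thenAccept"));

        if (succeed) {
            closeFuture.complete(null);
        } else {
            closeFuture.completeExceptionally(new RuntimeException("close failed"));
        }
        return fired;
    }

    public static void main(String[] args) {
        System.out.println("success: " + firedCallbacks(true));
        System.out.println("failure: " + firedCallbacks(false));
    }
}
```

With `thenAccept`, the `throwable == null` check becomes unnecessary because the callback is skipped entirely when the future fails.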
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java:
##########
@@ -85,7 +85,16 @@ public Collection<JobManagerRunner> getJobManagerRunners() {
@Override
public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor unusedExecutor) {
if (isRegistered(jobId)) {
- return unregister(jobId).closeAsync();
+ CompletableFuture<Void> resultFuture = this.jobManagerRunners.get(jobId).closeAsync();
+
+ resultFuture.whenComplete(
+ (result, throwable) -> {
+ if (throwable == null) {
+ unregister(jobId);
Review Comment:
Here, we have to be careful: the callback on the `closeAsync` future is not
necessarily executed in the `Dispatcher`'s main thread (let me know if you're
unfamiliar with the main thread concept in Flink).
Essentially, the idea is for certain components to run all their logic in a
single thread, i.e. the main thread, so that they don't have to worry about
concurrency. That's the case for the `Dispatcher` (as it extends `RpcEndpoint`)
and all its sub-components. See
[OnMainThreadJobManagerRunnerRegistry](https://github.com/apache/flink/blob/3efd4c2cf1be670f499e6637445e283e48deee60/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/OnMainThreadJobManagerRunnerRegistry.java)
and its usages, which ensure that we follow this paradigm for the
`JobManagerRunnerRegistry` component.
I guess we would have to use `thenAcceptAsync` on the `closeAsync` future,
with an executor that runs the callback in the main thread.
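
As a rough sketch of the idea (plain JDK only, not Flink code): the single-threaded executor below is a hypothetical stand-in for the `Dispatcher`'s main thread executor, and `closeFuture` is a stand-in for the `closeAsync` result. The class and method names are made up for illustration. The point is that `thenAcceptAsync(callback, executor)` runs the callback on the given executor, regardless of which thread completes the future:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MainThreadCallbackSketch {

    /** Returns the name of the thread that executed the success callback. */
    static String callbackThreadName() {
        // Hypothetical stand-in for the component's main thread executor.
        ExecutorService mainThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread-stand-in"));
        try {
            // Stand-in for closeAsync(): completed by some other thread (here: the common pool).
            CompletableFuture<Void> closeFuture = CompletableFuture.runAsync(() -> {});

            CompletableFuture<String> threadName = new CompletableFuture<>();
            // The callback (e.g. the unregister step) is handed to the executor instead of
            // running on whichever thread happens to complete closeFuture.
            CompletableFuture<Void> result =
                    closeFuture.thenAcceptAsync(
                            ignored -> threadName.complete(Thread.currentThread().getName()),
                            mainThread);

            result.join();
            return threadName.join();
        } finally {
            mainThread.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("callback ran on: " + callbackThreadName());
    }
}
```

Which executor to pass in the actual registry code is a separate question; this sketch only shows the threading behavior of `thenAcceptAsync`.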
##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java:
##########
@@ -164,7 +164,8 @@ void testFailingLocalCleanup() {
.hasExactlyElementsOfTypes(ExecutionException.class, expectedException.getClass())
.last()
.isEqualTo(expectedException);
- assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse();
+ // Since the cleanup failed, the JobManagerRunner is expected to not have been unregistered.
Review Comment:
nit: you could use AssertJ's `.as()` method to add an assertion message
rather than a comment. It would serve the same purpose as a comment but would
also expose the description programmatically.