This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4e99d64b4914ca5fbb1fe338455a8f653c164172 Author: Matthias Pohl <[email protected]> AuthorDate: Mon Sep 12 11:35:44 2022 +0200 [FLINK-29253][runtime] Removes synchronous close call from DefaultJobManagerRunnerRegistry#localCleanupAsync localCleanupAsync is meant to be called from the Dispatcher's main thread. Any blocking calls should be avoided here. Instead, we could use closeAsync which is implemented by JobManagerRunner. Two additional tests needed to be touched because the previously used AutoCloseableAsync#close method added another FlinKException to the Stacktrace. --- .../DefaultJobManagerRunnerRegistry.java | 6 +--- .../DefaultJobManagerRunnerRegistryTest.java | 32 ++++++++++++++++------ 2 files changed, 24 insertions(+), 14 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java index 16ab2cf985c..5c6cdfc3abb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java @@ -85,11 +85,7 @@ public class DefaultJobManagerRunnerRegistry implements JobManagerRunnerRegistry @Override public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor unusedExecutor) { if (isRegistered(jobId)) { - try { - unregister(jobId).close(); - } catch (Exception e) { - return FutureUtils.completedExceptionally(e); - } + return unregister(jobId).closeAsync(); } return FutureUtils.completedVoidFuture(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java index 1d4cec95640..cce8ba21357 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java @@ -20,9 +20,9 @@ package org.apache.flink.runtime.dispatcher; import org.apache.flink.api.common.JobID; import org.apache.flink.core.testutils.FlinkAssertions; +import org.apache.flink.runtime.concurrent.UnsupportedOperationExecutor; import org.apache.flink.runtime.jobmaster.JobManagerRunner; import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner; -import org.apache.flink.util.FlinkException; import org.apache.flink.util.concurrent.Executors; import org.junit.jupiter.api.BeforeEach; @@ -161,10 +161,7 @@ public class DefaultJobManagerRunnerRegistryTest { .failsWithin(Duration.ZERO) .withThrowableOfType(ExecutionException.class) .extracting(FlinkAssertions::chainOfCauses, FlinkAssertions.STREAM_THROWABLE) - .hasExactlyElementsOfTypes( - ExecutionException.class, - FlinkException.class, - expectedException.getClass()) + .hasExactlyElementsOfTypes(ExecutionException.class, expectedException.getClass()) .last() .isEqualTo(expectedException); assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse(); @@ -200,14 +197,31 @@ public class DefaultJobManagerRunnerRegistryTest { .failsWithin(Duration.ZERO) .withThrowableOfType(ExecutionException.class) .extracting(FlinkAssertions::chainOfCauses, FlinkAssertions.STREAM_THROWABLE) - .hasExactlyElementsOfTypes( - ExecutionException.class, - FlinkException.class, - expectedException.getClass()) + .hasExactlyElementsOfTypes(ExecutionException.class, expectedException.getClass()) .last() .isEqualTo(expectedException); } + @Test + public void testLocalCleanupAsyncNonBlocking() { + final TestingJobManagerRunner jobManagerRunner = + TestingJobManagerRunner.newBuilder().setBlockingTermination(true).build(); + testInstance.register(jobManagerRunner); + + // this call shouldn't block + final CompletableFuture<Void> cleanupFuture = + testInstance.localCleanupAsync( + jobManagerRunner.getJobID(), UnsupportedOperationExecutor.INSTANCE); + + assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse(); + assertThat(jobManagerRunner.getTerminationFuture()).isNotCompleted(); + assertThat(cleanupFuture).isNotCompleted(); + + jobManagerRunner.getTerminationFuture().complete(null); + + assertThat(cleanupFuture).isCompleted(); + } + private TestingJobManagerRunner registerTestingJobManagerRunner() { final TestingJobManagerRunner jobManagerRunner = TestingJobManagerRunner.newBuilder().build();
