This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new 776ccfa27d2 [FLINK-29253][runtime] Removes synchronous close call from
DefaultJobManagerRunnerRegistry#localCleanupAsync
776ccfa27d2 is described below
commit 776ccfa27d2d29b1ccd674878eeccee1649ee938
Author: Matthias Pohl <[email protected]>
AuthorDate: Mon Sep 12 11:35:44 2022 +0200
[FLINK-29253][runtime] Removes synchronous close call from
DefaultJobManagerRunnerRegistry#localCleanupAsync
localCleanupAsync is meant to be called from the Dispatcher's main thread.
Any blocking
calls should be avoided here. Instead, we could use closeAsync which is
implemented by
JobManagerRunner.
Two additional tests needed to be touched because the previously used
AutoCloseableAsync#close method added another FlinKException to the
Stacktrace.
---
.../DefaultJobManagerRunnerRegistry.java | 6 +---
.../DefaultJobManagerRunnerRegistryTest.java | 32 ++++++++++++++++------
2 files changed, 24 insertions(+), 14 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java
index 16ab2cf985c..5c6cdfc3abb 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java
@@ -85,11 +85,7 @@ public class DefaultJobManagerRunnerRegistry implements
JobManagerRunnerRegistry
@Override
public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor
unusedExecutor) {
if (isRegistered(jobId)) {
- try {
- unregister(jobId).close();
- } catch (Exception e) {
- return FutureUtils.completedExceptionally(e);
- }
+ return unregister(jobId).closeAsync();
}
return FutureUtils.completedVoidFuture();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java
index 1d4cec95640..cce8ba21357 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java
@@ -20,9 +20,9 @@ package org.apache.flink.runtime.dispatcher;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.testutils.FlinkAssertions;
+import org.apache.flink.runtime.concurrent.UnsupportedOperationExecutor;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
-import org.apache.flink.util.FlinkException;
import org.apache.flink.util.concurrent.Executors;
import org.junit.jupiter.api.BeforeEach;
@@ -161,10 +161,7 @@ public class DefaultJobManagerRunnerRegistryTest {
.failsWithin(Duration.ZERO)
.withThrowableOfType(ExecutionException.class)
.extracting(FlinkAssertions::chainOfCauses,
FlinkAssertions.STREAM_THROWABLE)
- .hasExactlyElementsOfTypes(
- ExecutionException.class,
- FlinkException.class,
- expectedException.getClass())
+ .hasExactlyElementsOfTypes(ExecutionException.class,
expectedException.getClass())
.last()
.isEqualTo(expectedException);
assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse();
@@ -200,14 +197,31 @@ public class DefaultJobManagerRunnerRegistryTest {
.failsWithin(Duration.ZERO)
.withThrowableOfType(ExecutionException.class)
.extracting(FlinkAssertions::chainOfCauses,
FlinkAssertions.STREAM_THROWABLE)
- .hasExactlyElementsOfTypes(
- ExecutionException.class,
- FlinkException.class,
- expectedException.getClass())
+ .hasExactlyElementsOfTypes(ExecutionException.class,
expectedException.getClass())
.last()
.isEqualTo(expectedException);
}
+ @Test
+ public void testLocalCleanupAsyncNonBlocking() {
+ final TestingJobManagerRunner jobManagerRunner =
+
TestingJobManagerRunner.newBuilder().setBlockingTermination(true).build();
+ testInstance.register(jobManagerRunner);
+
+ // this call shouldn't block
+ final CompletableFuture<Void> cleanupFuture =
+ testInstance.localCleanupAsync(
+ jobManagerRunner.getJobID(),
UnsupportedOperationExecutor.INSTANCE);
+
+
assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse();
+ assertThat(jobManagerRunner.getTerminationFuture()).isNotCompleted();
+ assertThat(cleanupFuture).isNotCompleted();
+
+ jobManagerRunner.getTerminationFuture().complete(null);
+
+ assertThat(cleanupFuture).isCompleted();
+ }
+
private TestingJobManagerRunner registerTestingJobManagerRunner() {
final TestingJobManagerRunner jobManagerRunner =
TestingJobManagerRunner.newBuilder().build();