This is an automated email from the ASF dual-hosted git repository.
junrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b0607a15e62 [FLINK-37697][runtime] Fix application mode cluster shut down before all job termination futures complete
b0607a15e62 is described below
commit b0607a15e62b664d15efbda0b0e991f72e45a467
Author: Yi Zhang <[email protected]>
AuthorDate: Tue Apr 15 18:51:46 2025 +0800
[FLINK-37697][runtime] Fix application mode cluster shut down before all job termination futures complete
---
.../flink/runtime/dispatcher/Dispatcher.java | 19 +++++++++++++++---
.../flink/runtime/dispatcher/DispatcherTest.java | 23 ++++++++++++++++++++++
2 files changed, 39 insertions(+), 3 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 5c9930be711..13a10850ee4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -1093,13 +1093,25 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
@Override
public CompletableFuture<Acknowledge> shutDownCluster() {
- return shutDownCluster(ApplicationStatus.SUCCEEDED);
+ return internalShutDownCluster(ApplicationStatus.SUCCEEDED, false);
}
@Override
public CompletableFuture<Acknowledge> shutDownCluster(
final ApplicationStatus applicationStatus) {
- shutDownFuture.complete(applicationStatus);
+ return internalShutDownCluster(applicationStatus, true);
+ }
+
+ private CompletableFuture<Acknowledge> internalShutDownCluster(
+ final ApplicationStatus applicationStatus,
+ final boolean waitForAllJobTerminationFutures) {
+ final CompletableFuture<Void> allJobsTerminationFuture =
+ waitForAllJobTerminationFutures
+ ?
FutureUtils.completeAll(jobManagerRunnerTerminationFutures.values())
+ : CompletableFuture.completedFuture(null);
+
+ FutureUtils.runAfterwards(
+ allJobsTerminationFuture, () ->
shutDownFuture.complete(applicationStatus));
return CompletableFuture.completedFuture(Acknowledge.get());
}
@@ -1256,7 +1268,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
}
}
- private void registerJobManagerRunnerTerminationFuture(
+ @VisibleForTesting
+ void registerJobManagerRunnerTerminationFuture(
JobID jobId, CompletableFuture<Void>
jobManagerRunnerTerminationFuture) {
Preconditions.checkState(!jobManagerRunnerTerminationFutures.containsKey(jobId));
jobManagerRunnerTerminationFutures.put(jobId,
jobManagerRunnerTerminationFuture);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index fd1931b6c84..a6325b252ff 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -1041,6 +1041,29 @@ public class DispatcherTest extends AbstractDispatcherTest {
dispatcher.getShutDownFuture().get();
}
+ @Test
+ public void testShutDownFutureCompletesAfterJobTerminationFutures() throws
Exception {
+ dispatcher =
+ createAndStartDispatcher(
+ heartbeatServices,
+ haServices,
+ JobMasterServiceLeadershipRunnerFactory.INSTANCE);
+ final DispatcherGateway dispatcherGateway =
+ dispatcher.getSelfGateway(DispatcherGateway.class);
+
+ CompletableFuture<Void> jobTerminationFuture = new
CompletableFuture<>();
+ dispatcher.registerJobManagerRunnerTerminationFuture(new JobID(),
jobTerminationFuture);
+
+ dispatcherGateway.shutDownCluster(ApplicationStatus.SUCCEEDED).get();
+
+ assertThatThrownBy(() -> dispatcher.getShutDownFuture().get(10L,
TimeUnit.MILLISECONDS))
+ .isInstanceOf(TimeoutException.class);
+
+ jobTerminationFuture.complete(null);
+
+ dispatcher.getShutDownFuture().get();
+ }
+
@Test
public void testOnRemovedJobGraphDoesNotCleanUpHAFiles() throws Exception {
final CompletableFuture<JobID> removeJobGraphFuture = new
CompletableFuture<>();