This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c0f22734ae2 [FLINK-39133][runtime] Fix DispatcherTest to properly handle async operations for applications
c0f22734ae2 is described below
commit c0f22734ae2f77827f01f714c2ca12732d03057c
Author: Yi Zhang <[email protected]>
AuthorDate: Tue Feb 24 11:34:18 2026 +0800
[FLINK-39133][runtime] Fix DispatcherTest to properly handle async operations for applications
---
.../flink/runtime/dispatcher/Dispatcher.java | 55 +++++++++++-----------
.../flink/runtime/dispatcher/DispatcherTest.java | 4 ++
2 files changed, 32 insertions(+), 27 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index e4cee523047..b5f8cc26e19 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -709,29 +709,31 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
})
.collect(Collectors.toList());
- // wait for all jobs to be stored
- FutureUtils.combineAll(jobFutures)
- .thenAcceptAsync(
- combinedJobs -> {
- Map<JobID, ExecutionGraphInfo> jobs = new
HashMap<>();
- for (ExecutionGraphInfo executionGraphInfo :
combinedJobs) {
- jobs.put(executionGraphInfo.getJobId(),
executionGraphInfo);
- partialExecutionGraphInfoStore.remove(
- executionGraphInfo.getJobId());
- }
-
- ArchivedApplication archivedApplication =
- new ArchivedApplication(
- application.getApplicationId(),
- application.getName(),
-
application.getApplicationStatus(),
- stateTimestamps,
- jobs);
-
- applications.remove(applicationId);
-
writeToArchivedApplicationStore(archivedApplication);
- CompletableFuture<?>
applicationArchivingFuture =
- historyServerArchivist
+ // wait for all jobs to be stored, then archive the application
+ CompletableFuture<?> applicationArchivingFuture =
+ FutureUtils.combineAll(jobFutures)
+ .thenComposeAsync(
+ combinedJobs -> {
+ Map<JobID, ExecutionGraphInfo> jobs =
new HashMap<>();
+ for (ExecutionGraphInfo
executionGraphInfo : combinedJobs) {
+ jobs.put(
+
executionGraphInfo.getJobId(),
+ executionGraphInfo);
+
partialExecutionGraphInfoStore.remove(
+
executionGraphInfo.getJobId());
+ }
+
+ ArchivedApplication
archivedApplication =
+ new ArchivedApplication(
+
application.getApplicationId(),
+ application.getName(),
+
application.getApplicationStatus(),
+ stateTimestamps,
+ jobs);
+
+ applications.remove(applicationId);
+
writeToArchivedApplicationStore(archivedApplication);
+ return historyServerArchivist
.archiveApplication(archivedApplication)
.exceptionally(
throwable -> {
@@ -741,10 +743,9 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
throwable);
return null;
});
- applicationArchivingFutures.put(
- applicationId,
applicationArchivingFuture);
- },
- getMainThreadExecutor());
+ },
+ getMainThreadExecutor());
+ applicationArchivingFutures.put(applicationId, applicationArchivingFuture);
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index d6a16166623..ea40b2e6e42 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -638,6 +638,10 @@ public class DispatcherTest extends AbstractDispatcherTest {
dispatcher = createTestingDispatcherBuilder().build(rpcService);
dispatcher.start();
final ApplicationID applicationId =
mockApplicationStatusChange(ApplicationState.FINISHED);
+ // wait for archive to complete
+ dispatcher
+ .getApplicationArchivingFuture(applicationId)
+ .get(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
assertThrows(
IllegalStateException.class,
() ->