This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c0f22734ae2 [FLINK-39133][runtime] Fix DispatcherTest to properly handle async operations for applications
c0f22734ae2 is described below

commit c0f22734ae2f77827f01f714c2ca12732d03057c
Author: Yi Zhang <[email protected]>
AuthorDate: Tue Feb 24 11:34:18 2026 +0800

    [FLINK-39133][runtime] Fix DispatcherTest to properly handle async operations for applications
---
 .../flink/runtime/dispatcher/Dispatcher.java       | 55 +++++++++++-----------
 .../flink/runtime/dispatcher/DispatcherTest.java   |  4 ++
 2 files changed, 32 insertions(+), 27 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index e4cee523047..b5f8cc26e19 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -709,29 +709,31 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
                                     })
                             .collect(Collectors.toList());
 
-            // wait for all jobs to be stored
-            FutureUtils.combineAll(jobFutures)
-                    .thenAcceptAsync(
-                            combinedJobs -> {
-                                Map<JobID, ExecutionGraphInfo> jobs = new 
HashMap<>();
-                                for (ExecutionGraphInfo executionGraphInfo : 
combinedJobs) {
-                                    jobs.put(executionGraphInfo.getJobId(), 
executionGraphInfo);
-                                    partialExecutionGraphInfoStore.remove(
-                                            executionGraphInfo.getJobId());
-                                }
-
-                                ArchivedApplication archivedApplication =
-                                        new ArchivedApplication(
-                                                application.getApplicationId(),
-                                                application.getName(),
-                                                
application.getApplicationStatus(),
-                                                stateTimestamps,
-                                                jobs);
-
-                                applications.remove(applicationId);
-                                
writeToArchivedApplicationStore(archivedApplication);
-                                CompletableFuture<?> 
applicationArchivingFuture =
-                                        historyServerArchivist
+            // wait for all jobs to be stored, then archive the application
+            CompletableFuture<?> applicationArchivingFuture =
+                    FutureUtils.combineAll(jobFutures)
+                            .thenComposeAsync(
+                                    combinedJobs -> {
+                                        Map<JobID, ExecutionGraphInfo> jobs = 
new HashMap<>();
+                                        for (ExecutionGraphInfo 
executionGraphInfo : combinedJobs) {
+                                            jobs.put(
+                                                    
executionGraphInfo.getJobId(),
+                                                    executionGraphInfo);
+                                            
partialExecutionGraphInfoStore.remove(
+                                                    
executionGraphInfo.getJobId());
+                                        }
+
+                                        ArchivedApplication 
archivedApplication =
+                                                new ArchivedApplication(
+                                                        
application.getApplicationId(),
+                                                        application.getName(),
+                                                        
application.getApplicationStatus(),
+                                                        stateTimestamps,
+                                                        jobs);
+
+                                        applications.remove(applicationId);
+                                        
writeToArchivedApplicationStore(archivedApplication);
+                                        return historyServerArchivist
                                                 
.archiveApplication(archivedApplication)
                                                 .exceptionally(
                                                         throwable -> {
@@ -741,10 +743,9 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
                                                                     throwable);
                                                             return null;
                                                         });
-                                applicationArchivingFutures.put(
-                                        applicationId, 
applicationArchivingFuture);
-                            },
-                            getMainThreadExecutor());
+                                    },
+                                    getMainThreadExecutor());
+            applicationArchivingFutures.put(applicationId, 
applicationArchivingFuture);
         }
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index d6a16166623..ea40b2e6e42 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -638,6 +638,10 @@ public class DispatcherTest extends AbstractDispatcherTest {
         dispatcher = createTestingDispatcherBuilder().build(rpcService);
         dispatcher.start();
         final ApplicationID applicationId = 
mockApplicationStatusChange(ApplicationState.FINISHED);
+        // wait for archive to complete
+        dispatcher
+                .getApplicationArchivingFuture(applicationId)
+                .get(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
         assertThrows(
                 IllegalStateException.class,
                 () ->

Reply via email to