This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 069d629  [FLINK-24232][coordination] Skip history server archiving for 
suspended jobs
069d629 is described below

commit 069d629c53d4310bc1d45c278c2703452fab96d5
Author: Chesnay Schepler <[email protected]>
AuthorDate: Thu Dec 9 12:27:20 2021 +0100

    [FLINK-24232][coordination] Skip history server archiving for suspended jobs
    
    Do not create an archive for suspended jobs, as this would eventually lead 
to multiple archive attempts which we currently do not support.
---
 .../flink/runtime/dispatcher/Dispatcher.java       | 30 +++++++++-------
 .../runtime/dispatcher/AbstractDispatcherTest.java | 10 +++++-
 .../flink/runtime/dispatcher/DispatcherTest.java   | 40 ++++++++++++++++++++++
 3 files changed, 66 insertions(+), 14 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 9560612..a3049dc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -887,19 +887,23 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
                     e);
         }
 
-        final CompletableFuture<Acknowledge> executionGraphFuture =
-                
historyServerArchivist.archiveExecutionGraph(executionGraphInfo);
-
-        executionGraphFuture.whenComplete(
-                (Acknowledge ignored, Throwable throwable) -> {
-                    if (throwable != null) {
-                        log.info(
-                                "Could not archive completed job {}({}) to the 
history server.",
-                                
executionGraphInfo.getArchivedExecutionGraph().getJobName(),
-                                
executionGraphInfo.getArchivedExecutionGraph().getJobID(),
-                                throwable);
-                    }
-                });
+        // do not create an archive for suspended jobs, as this would 
eventually lead to multiple
+        // archive attempts which we currently do not support
+        if 
(executionGraphInfo.getArchivedExecutionGraph().getState().isGloballyTerminalState())
 {
+            final CompletableFuture<Acknowledge> executionGraphFuture =
+                    
historyServerArchivist.archiveExecutionGraph(executionGraphInfo);
+
+            executionGraphFuture.whenComplete(
+                    (Acknowledge ignored, Throwable throwable) -> {
+                        if (throwable != null) {
+                            log.info(
+                                    "Could not archive completed job {}({}) to 
the history server.",
+                                    
executionGraphInfo.getArchivedExecutionGraph().getJobName(),
+                                    
executionGraphInfo.getArchivedExecutionGraph().getJobID(),
+                                    throwable);
+                        }
+                    });
+        }
     }
 
     private void jobMasterFailed(JobID jobId, Throwable cause) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherTest.java
index f052588..c65d13d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherTest.java
@@ -145,6 +145,8 @@ public class AbstractDispatcherTest extends TestLogger {
         private FatalErrorHandler fatalErrorHandler =
                 testingFatalErrorHandlerResource.getFatalErrorHandler();
 
+        private HistoryServerArchivist historyServerArchivist = 
VoidHistoryServerArchivist.INSTANCE;
+
         TestingDispatcherBuilder setHeartbeatServices(HeartbeatServices 
heartbeatServices) {
             this.heartbeatServices = heartbeatServices;
             return this;
@@ -176,6 +178,12 @@ public class AbstractDispatcherTest extends TestLogger {
             return this;
         }
 
+        public TestingDispatcherBuilder setHistoryServerArchivist(
+                HistoryServerArchivist historyServerArchivist) {
+            this.historyServerArchivist = historyServerArchivist;
+            return this;
+        }
+
         TestingDispatcher build() throws Exception {
             TestingResourceManagerGateway resourceManagerGateway =
                     new TestingResourceManagerGateway();
@@ -196,7 +204,7 @@ public class AbstractDispatcherTest extends TestLogger {
                             heartbeatServices,
                             executionGraphInfoStore,
                             fatalErrorHandler,
-                            VoidHistoryServerArchivist.INSTANCE,
+                            historyServerArchivist,
                             null,
                             
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
                             jobGraphWriter,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index fb36a9e..130be52 100755
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -376,6 +376,46 @@ public class DispatcherTest extends AbstractDispatcherTest 
{
     }
 
     @Test
+    public void testNoHistoryServerArchiveCreatedForSuspendedJob() throws 
Exception {
+        final CompletableFuture<Void> archiveAttemptFuture = new 
CompletableFuture<>();
+        final CompletableFuture<JobManagerRunnerResult> jobTerminationFuture =
+                new CompletableFuture<>();
+        dispatcher =
+                new TestingDispatcherBuilder()
+                        .setJobManagerRunnerFactory(
+                                new FinishingJobManagerRunnerFactory(
+                                        jobTerminationFuture, () -> {}))
+                        .setHistoryServerArchivist(
+                                executionGraphInfo -> {
+                                    archiveAttemptFuture.complete(null);
+                                    return 
CompletableFuture.completedFuture(null);
+                                })
+                        .build();
+        dispatcher.start();
+        jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+        DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+        JobID jobId = jobGraph.getJobID();
+
+        dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+        jobTerminationFuture.complete(
+                JobManagerRunnerResult.forSuccess(
+                        new ExecutionGraphInfo(
+                                new ArchivedExecutionGraphBuilder()
+                                        .setJobID(jobId)
+                                        .setState(JobStatus.SUSPENDED)
+                                        .build())));
+
+        // wait for job to finish
+        dispatcherGateway.requestJobResult(jobId, TIMEOUT).get();
+        // sanity check
+        assertThat(
+                dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get(), 
is(JobStatus.SUSPENDED));
+
+        assertThat(archiveAttemptFuture.isDone(), is(false));
+    }
+
+    @Test
     public void testJobManagerRunnerInitializationFailureFailsJob() throws 
Exception {
         final TestingJobManagerRunnerFactory testingJobManagerRunnerFactory =
                 new TestingJobManagerRunnerFactory();

Reply via email to