This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new ca3798c  [FLINK-24232][coordination] Skip history server archiving for suspended jobs
ca3798c is described below

commit ca3798c6228fb8806a5954acfde098f060a2081b
Author: Chesnay Schepler <[email protected]>
AuthorDate: Thu Dec 9 12:27:20 2021 +0100

    [FLINK-24232][coordination] Skip history server archiving for suspended jobs
    
    Do not create an archive for suspended jobs, as this would eventually lead to multiple archive attempts which we currently do not support.
---
 .../flink/runtime/dispatcher/Dispatcher.java       | 30 ++++----
 .../flink/runtime/dispatcher/DispatcherTest.java   | 83 +++++++++++++++++++++-
 2 files changed, 99 insertions(+), 14 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 4b5039b..39b74bd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -866,19 +866,23 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
                     e);
         }
 
-        final CompletableFuture<Acknowledge> executionGraphFuture =
-                
historyServerArchivist.archiveExecutionGraph(executionGraphInfo);
-
-        executionGraphFuture.whenComplete(
-                (Acknowledge ignored, Throwable throwable) -> {
-                    if (throwable != null) {
-                        log.info(
-                                "Could not archive completed job {}({}) to the 
history server.",
-                                
executionGraphInfo.getArchivedExecutionGraph().getJobName(),
-                                
executionGraphInfo.getArchivedExecutionGraph().getJobID(),
-                                throwable);
-                    }
-                });
+        // do not create an archive for suspended jobs, as this would 
eventually lead to multiple
+        // archive attempts which we currently do not support
+        if 
(executionGraphInfo.getArchivedExecutionGraph().getState().isGloballyTerminalState())
 {
+            final CompletableFuture<Acknowledge> executionGraphFuture =
+                    
historyServerArchivist.archiveExecutionGraph(executionGraphInfo);
+
+            executionGraphFuture.whenComplete(
+                    (Acknowledge ignored, Throwable throwable) -> {
+                        if (throwable != null) {
+                            log.info(
+                                    "Could not archive completed job {}({}) to 
the history server.",
+                                    
executionGraphInfo.getArchivedExecutionGraph().getJobName(),
+                                    
executionGraphInfo.getArchivedExecutionGraph().getJobID(),
+                                    throwable);
+                        }
+                    });
+        }
     }
 
     private void jobMasterFailed(JobID jobId, Throwable cause) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index ba218d6..553706e 100755
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -398,6 +398,46 @@ public class DispatcherTest extends TestLogger {
     }
 
     @Test
+    public void testNoHistoryServerArchiveCreatedForSuspendedJob() throws 
Exception {
+        final CompletableFuture<Void> archiveAttemptFuture = new 
CompletableFuture<>();
+        final CompletableFuture<JobManagerRunnerResult> jobTerminationFuture =
+                new CompletableFuture<>();
+        dispatcher =
+                new TestingDispatcherBuilder()
+                        .setJobManagerRunnerFactory(
+                                new FinishingJobManagerRunnerFactory(
+                                        jobTerminationFuture, () -> {}))
+                        .setHistoryServerArchivist(
+                                executionGraphInfo -> {
+                                    archiveAttemptFuture.complete(null);
+                                    return 
CompletableFuture.completedFuture(null);
+                                })
+                        .build();
+        dispatcher.start();
+        jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+        DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+        JobID jobId = jobGraph.getJobID();
+
+        dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+        jobTerminationFuture.complete(
+                JobManagerRunnerResult.forSuccess(
+                        new ExecutionGraphInfo(
+                                new ArchivedExecutionGraphBuilder()
+                                        .setJobID(jobId)
+                                        .setState(JobStatus.SUSPENDED)
+                                        .build())));
+
+        // wait for job to finish
+        dispatcherGateway.requestJobResult(jobId, TIMEOUT).get();
+        // sanity check
+        assertThat(
+                dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get(), 
is(JobStatus.SUSPENDED));
+
+        assertThat(archiveAttemptFuture.isDone(), is(false));
+    }
+
+    @Test
     public void testJobManagerRunnerInitializationFailureFailsJob() throws 
Exception {
         final TestingJobManagerRunnerFactory testingJobManagerRunnerFactory =
                 new TestingJobManagerRunnerFactory();
@@ -1166,6 +1206,8 @@ public class DispatcherTest extends TestLogger {
 
         private JobGraphWriter jobGraphWriter = NoOpJobGraphWriter.INSTANCE;
 
+        private HistoryServerArchivist historyServerArchivist = 
VoidHistoryServerArchivist.INSTANCE;
+
         TestingDispatcherBuilder setHeartbeatServices(HeartbeatServices 
heartbeatServices) {
             this.heartbeatServices = heartbeatServices;
             return this;
@@ -1192,6 +1234,12 @@ public class DispatcherTest extends TestLogger {
             return this;
         }
 
+        public TestingDispatcherBuilder setHistoryServerArchivist(
+                HistoryServerArchivist historyServerArchivist) {
+            this.historyServerArchivist = historyServerArchivist;
+            return this;
+        }
+
         TestingDispatcher build() throws Exception {
             TestingResourceManagerGateway resourceManagerGateway =
                     new TestingResourceManagerGateway();
@@ -1212,7 +1260,7 @@ public class DispatcherTest extends TestLogger {
                             heartbeatServices,
                             executionGraphInfoStore,
                             
testingFatalErrorHandlerResource.getFatalErrorHandler(),
-                            VoidHistoryServerArchivist.INSTANCE,
+                            historyServerArchivist,
                             null,
                             
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
                             jobGraphWriter,
@@ -1301,6 +1349,39 @@ public class DispatcherTest extends TestLogger {
         }
     }
 
+    private static class FinishingJobManagerRunnerFactory implements 
JobManagerRunnerFactory {
+
+        private final CompletableFuture<JobManagerRunnerResult> resultFuture;
+        private final Runnable onClose;
+
+        private FinishingJobManagerRunnerFactory(
+                CompletableFuture<JobManagerRunnerResult> resultFuture, 
Runnable onClose) {
+            this.resultFuture = resultFuture;
+            this.onClose = onClose;
+        }
+
+        @Override
+        public JobManagerRunner createJobManagerRunner(
+                JobGraph jobGraph,
+                Configuration configuration,
+                RpcService rpcService,
+                HighAvailabilityServices highAvailabilityServices,
+                HeartbeatServices heartbeatServices,
+                JobManagerSharedServices jobManagerServices,
+                JobManagerJobMetricGroupFactory 
jobManagerJobMetricGroupFactory,
+                FatalErrorHandler fatalErrorHandler,
+                long initializationTimestamp)
+                throws Exception {
+            final TestingJobManagerRunner runner =
+                    new TestingJobManagerRunner.Builder()
+                            .setJobId(jobGraph.getJobID())
+                            .setResultFuture(resultFuture)
+                            .build();
+            runner.getTerminationFuture().thenRun(onClose::run);
+            return runner;
+        }
+    }
+
     private static class BlockingJobVertex extends JobVertex {
         private final OneShotLatch oneShotLatch = new OneShotLatch();
 

Reply via email to