This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new 069d629 [FLINK-24232][coordination] Skip history server archiving for suspended jobs
069d629 is described below
commit 069d629c53d4310bc1d45c278c2703452fab96d5
Author: Chesnay Schepler <[email protected]>
AuthorDate: Thu Dec 9 12:27:20 2021 +0100
[FLINK-24232][coordination] Skip history server archiving for suspended jobs
Do not create an archive for suspended jobs, as this would eventually lead
to multiple archive attempts which we currently do not support.
---
.../flink/runtime/dispatcher/Dispatcher.java | 30 +++++++++-------
.../runtime/dispatcher/AbstractDispatcherTest.java | 10 +++++-
.../flink/runtime/dispatcher/DispatcherTest.java | 40 ++++++++++++++++++++++
3 files changed, 66 insertions(+), 14 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 9560612..a3049dc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -887,19 +887,23 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
e);
}
- final CompletableFuture<Acknowledge> executionGraphFuture =
- historyServerArchivist.archiveExecutionGraph(executionGraphInfo);
-
- executionGraphFuture.whenComplete(
- (Acknowledge ignored, Throwable throwable) -> {
- if (throwable != null) {
- log.info(
- "Could not archive completed job {}({}) to the history server.",
- executionGraphInfo.getArchivedExecutionGraph().getJobName(),
- executionGraphInfo.getArchivedExecutionGraph().getJobID(),
- throwable);
- }
- });
+ // do not create an archive for suspended jobs, as this would eventually lead to multiple
+ // archive attempts which we currently do not support
+ if (executionGraphInfo.getArchivedExecutionGraph().getState().isGloballyTerminalState()) {
+ final CompletableFuture<Acknowledge> executionGraphFuture =
+ historyServerArchivist.archiveExecutionGraph(executionGraphInfo);
+
+ executionGraphFuture.whenComplete(
+ (Acknowledge ignored, Throwable throwable) -> {
+ if (throwable != null) {
+ log.info(
+ "Could not archive completed job {}({}) to the history server.",
+ executionGraphInfo.getArchivedExecutionGraph().getJobName(),
+ executionGraphInfo.getArchivedExecutionGraph().getJobID(),
+ throwable);
+ }
+ });
+ }
}
private void jobMasterFailed(JobID jobId, Throwable cause) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherTest.java
index f052588..c65d13d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherTest.java
@@ -145,6 +145,8 @@ public class AbstractDispatcherTest extends TestLogger {
private FatalErrorHandler fatalErrorHandler =
testingFatalErrorHandlerResource.getFatalErrorHandler();
+ private HistoryServerArchivist historyServerArchivist = VoidHistoryServerArchivist.INSTANCE;
+
TestingDispatcherBuilder setHeartbeatServices(HeartbeatServices heartbeatServices) {
this.heartbeatServices = heartbeatServices;
return this;
@@ -176,6 +178,12 @@ public class AbstractDispatcherTest extends TestLogger {
return this;
}
+ public TestingDispatcherBuilder setHistoryServerArchivist(
+ HistoryServerArchivist historyServerArchivist) {
+ this.historyServerArchivist = historyServerArchivist;
+ return this;
+ }
+
TestingDispatcher build() throws Exception {
TestingResourceManagerGateway resourceManagerGateway =
new TestingResourceManagerGateway();
@@ -196,7 +204,7 @@ public class AbstractDispatcherTest extends TestLogger {
heartbeatServices,
executionGraphInfoStore,
fatalErrorHandler,
- VoidHistoryServerArchivist.INSTANCE,
+ historyServerArchivist,
null,
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
jobGraphWriter,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index fb36a9e..130be52 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -376,6 +376,46 @@ public class DispatcherTest extends AbstractDispatcherTest {
}
@Test
+ public void testNoHistoryServerArchiveCreatedForSuspendedJob() throws Exception {
+ final CompletableFuture<Void> archiveAttemptFuture = new CompletableFuture<>();
+ final CompletableFuture<JobManagerRunnerResult> jobTerminationFuture =
+ new CompletableFuture<>();
+ dispatcher =
+ new TestingDispatcherBuilder()
+ .setJobManagerRunnerFactory(
+ new FinishingJobManagerRunnerFactory(
+ jobTerminationFuture, () -> {}))
+ .setHistoryServerArchivist(
+ executionGraphInfo -> {
+ archiveAttemptFuture.complete(null);
+ return CompletableFuture.completedFuture(null);
+ })
+ .build();
+ dispatcher.start();
+ jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+ DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+
+ JobID jobId = jobGraph.getJobID();
+
+ dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+ jobTerminationFuture.complete(
+ JobManagerRunnerResult.forSuccess(
+ new ExecutionGraphInfo(
+ new ArchivedExecutionGraphBuilder()
+ .setJobID(jobId)
+ .setState(JobStatus.SUSPENDED)
+ .build())));
+
+ // wait for job to finish
+ dispatcherGateway.requestJobResult(jobId, TIMEOUT).get();
+ // sanity check
+ assertThat(
+ dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get(), is(JobStatus.SUSPENDED));
+
+ assertThat(archiveAttemptFuture.isDone(), is(false));
+ }
+
+ @Test
public void testJobManagerRunnerInitializationFailureFailsJob() throws Exception {
final TestingJobManagerRunnerFactory testingJobManagerRunnerFactory =
new TestingJobManagerRunnerFactory();