This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 2fe318b9141 [FLINK-38761][runtime] Support archiving applications
2fe318b9141 is described below
commit 2fe318b91414acb792e4a0e2fd3ac593e1046dcc
Author: Yi Zhang <[email protected]>
AuthorDate: Tue Oct 21 11:55:12 2025 +0800
[FLINK-38761][runtime] Support archiving applications
---
.../generated/cluster_configuration.html | 6 +
.../generated/expert_cluster_section.html | 6 +
.../application/ApplicationJobUtils.java | 45 +++-
.../application/ApplicationJobUtilsTest.java | 263 +++++++++++++++++++--
.../apache/flink/configuration/ClusterOptions.java | 19 ++
.../runtime/webmonitor/history/HistoryServer.java | 4 +-
.../history/HistoryServerArchiveFetcher.java | 4 +-
.../webmonitor/history/HistoryServerTest.java | 8 +-
.../flink/runtime/dispatcher/Dispatcher.java | 35 ++-
.../runtime/dispatcher/HistoryServerArchivist.java | 27 ++-
.../JsonResponseHistoryServerArchivist.java | 57 ++++-
.../dispatcher/VoidHistoryServerArchivist.java | 13 +-
...tDispatcherResourceManagerComponentFactory.java | 2 +-
.../executiongraph/AccessExecutionGraph.java | 8 +
.../executiongraph/ArchivedExecutionGraph.java | 18 +-
.../executiongraph/DefaultExecutionGraph.java | 6 +
.../DefaultExecutionGraphBuilder.java | 3 +-
.../runtime/executiongraph/JobInformation.java | 33 +++
.../flink/runtime/history/ArchivePathUtils.java | 64 +++++
.../{FsJobArchivist.java => FsJsonArchivist.java} | 62 ++---
.../application/ApplicationDetailsHandler.java | 23 +-
.../application/ApplicationsOverviewHandler.java | 22 +-
.../runtime/scheduler/ExecutionGraphInfo.java | 6 +
.../runtime/webmonitor/WebMonitorEndpoint.java | 36 ++-
.../history/ApplicationJsonArchivist.java | 46 ++++
.../runtime/webmonitor/history/ArchivedJson.java | 4 +-
.../dispatcher/DispatcherResourceCleanupTest.java | 19 +-
.../flink/runtime/dispatcher/DispatcherTest.java | 123 +++++++++-
.../dispatcher/TestingHistoryServerArchivist.java | 95 ++++++++
.../runtime/history/ArchivePathUtilsTest.java | 86 +++++++
.../flink/runtime/history/FsJsonArchivistTest.java | 18 +-
.../utils/ArchivedExecutionGraphBuilder.java | 10 +-
.../adaptive/StateTrackingMockExecutionGraph.java | 6 +
33 files changed, 1043 insertions(+), 134 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/cluster_configuration.html
b/docs/layouts/shortcodes/generated/cluster_configuration.html
index a54c597c5c9..1d1dbd87d9c 100644
--- a/docs/layouts/shortcodes/generated/cluster_configuration.html
+++ b/docs/layouts/shortcodes/generated/cluster_configuration.html
@@ -8,6 +8,12 @@
</tr>
</thead>
<tbody>
+ <tr>
+ <td><h5>cluster.id</h5></td>
+ <td style="word-wrap:
break-word;">"00000000000000000000000000000000"</td>
+ <td>String</td>
+ <td>The ID of the Flink cluster, used to separate multiple Flink
clusters from each other. Currently, this option is primarily used to determine
the archive path, and may also be used as the fixed application/job id (if not
specified by the user) in high availability mode. The expected format is
[0-9a-fA-F]{32}, e.g. fd72014d4c864993a2e5a9287b4a9c5d.<br />If this option is
not configured but <code
class="highlighter-rouge">high-availability.cluster-id</code> is configured,
the [...]
+ </tr>
<tr>
<td><h5>cluster.intercept-user-system-exit</h5></td>
<td style="word-wrap: break-word;">DISABLED</td>
diff --git a/docs/layouts/shortcodes/generated/expert_cluster_section.html
b/docs/layouts/shortcodes/generated/expert_cluster_section.html
index 63df9172b07..f32e45d44d0 100644
--- a/docs/layouts/shortcodes/generated/expert_cluster_section.html
+++ b/docs/layouts/shortcodes/generated/expert_cluster_section.html
@@ -8,6 +8,12 @@
</tr>
</thead>
<tbody>
+ <tr>
+ <td><h5>cluster.id</h5></td>
+ <td style="word-wrap:
break-word;">"00000000000000000000000000000000"</td>
+ <td>String</td>
+ <td>The ID of the Flink cluster, used to separate multiple Flink
clusters from each other. Currently, this option is primarily used to determine
the archive path, and may also be used as the fixed application/job id (if not
specified by the user) in high availability mode. The expected format is
[0-9a-fA-F]{32}, e.g. fd72014d4c864993a2e5a9287b4a9c5d.<br />If this option is
not configured but <code
class="highlighter-rouge">high-availability.cluster-id</code> is configured,
the [...]
+ </tr>
<tr>
<td><h5>cluster.intercept-user-system-exit</h5></td>
<td style="word-wrap: break-word;">DISABLED</td>
diff --git
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationJobUtils.java
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationJobUtils.java
index ac2b565c917..bf8f0415390 100644
---
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationJobUtils.java
+++
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationJobUtils.java
@@ -21,10 +21,12 @@ package org.apache.flink.client.deployment.application;
import org.apache.flink.api.common.ApplicationID;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ApplicationOptionsInternal;
+import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.util.AbstractID;
import org.apache.flink.util.Preconditions;
import java.util.Optional;
@@ -51,7 +53,24 @@ public class ApplicationJobUtils {
* @param configuration The configuration that may be updated with fixed IDs
*/
public static void maybeFixIds(Configuration configuration) {
+ if (configuration.getOptional(ClusterOptions.CLUSTER_ID).isEmpty()
+ &&
configuration.getOptional(HighAvailabilityOptions.HA_CLUSTER_ID).isPresent()) {
+ // The CLUSTER_ID will fall back to the HA_CLUSTER_ID. If the user
has already
+ // configured HA_CLUSTER_ID, it can avoid conflicts caused by the
missing CLUSTER_ID.
+ configuration.set(
+ ClusterOptions.CLUSTER_ID,
+ new AbstractID(
+ configuration
+
.get(HighAvailabilityOptions.HA_CLUSTER_ID)
+ .hashCode(),
+ 0)
+ .toHexString());
+ }
+ checkClusterId(configuration.get(ClusterOptions.CLUSTER_ID));
+
if
(HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
+ final Optional<String> configuredClusterId =
+ configuration.getOptional(ClusterOptions.CLUSTER_ID);
final Optional<String> configuredApplicationId =
configuration.getOptional(ApplicationOptionsInternal.FIXED_APPLICATION_ID);
if (configuredApplicationId.isEmpty()) {
@@ -59,24 +78,18 @@ public class ApplicationJobUtils {
// failovers. The application id is derived from the cluster
id.
configuration.set(
ApplicationOptionsInternal.FIXED_APPLICATION_ID,
- new ApplicationID(
- Preconditions.checkNotNull(
- configuration.get(
-
HighAvailabilityOptions
-
.HA_CLUSTER_ID))
- .hashCode(),
- 0)
- .toHexString());
+ configuration.get(ClusterOptions.CLUSTER_ID));
}
final Optional<String> configuredJobId =
configuration.getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID);
if (configuredJobId.isEmpty()) {
// In HA mode, a fixed job id is required to ensure
consistency across failovers.
// The job id is derived as follows:
- // 1. If application id is configured, use the application id
as the job id.
+ // 1. If either cluster id or application id is configured,
use the application id
+ // as the job id.
// 2. Otherwise, generate the job id based on the HA cluster
id.
// Note that the second case is kept for backward
compatibility and may be removed.
- if (configuredApplicationId.isPresent()) {
+ if (configuredClusterId.isPresent() ||
configuredApplicationId.isPresent()) {
ApplicationID applicationId =
ApplicationID.fromHexString(
configuration.get(
@@ -100,6 +113,18 @@ public class ApplicationJobUtils {
}
}
+ private static String checkClusterId(String str) {
+ try {
+ ApplicationID.fromHexString(str);
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException(
+ "Invalid cluster id \""
+ + str
+ + "\". The expected format is [0-9a-fA-F]{32},
e.g. fd72014d4c864993a2e5a9287b4a9c5d.");
+ }
+ return str;
+ }
+
public static boolean allowExecuteMultipleJobs(Configuration config) {
final Optional<String> configuredJobId =
config.getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID);
diff --git
a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationJobUtilsTest.java
b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationJobUtilsTest.java
index 958850c9d41..1eed3295981 100644
---
a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationJobUtilsTest.java
+++
b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationJobUtilsTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.client.deployment.application;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ApplicationOptionsInternal;
+import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
@@ -39,12 +40,14 @@ import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
/** Tests for {@link ApplicationJobUtils}. */
public class ApplicationJobUtilsTest {
private static final String TEST_HA_CLUSTER_ID = "cluster";
+ private static final String TEST_CLUSTER_ID =
"3dc42a26ed5afedb8c6e1132809dcf73";
private static final String TEST_APPLICATION_ID =
"ca0eb040022fbccd4cf05d1e274ae25e";
private static final String TEST_JOB_ID =
"e79b6d171acd4baa6f421e3631168810";
@@ -60,8 +63,10 @@ public class ApplicationJobUtilsTest {
void testMaybeFixIds(
boolean isHAEnabled,
boolean isHaClusterIdSet,
+ boolean isClusterIdSet,
boolean isApplicationIdSet,
boolean isJobIdSet,
+ @Nullable String expectedClusterId,
@Nullable String expectedApplicationId,
@Nullable String expectedJobId) {
if (isHAEnabled) {
@@ -71,6 +76,9 @@ public class ApplicationJobUtilsTest {
if (isHaClusterIdSet) {
configuration.set(HighAvailabilityOptions.HA_CLUSTER_ID,
TEST_HA_CLUSTER_ID);
}
+ if (isClusterIdSet) {
+ configuration.set(ClusterOptions.CLUSTER_ID, TEST_CLUSTER_ID);
+ }
if (isApplicationIdSet) {
configuration.set(ApplicationOptionsInternal.FIXED_APPLICATION_ID,
TEST_APPLICATION_ID);
}
@@ -80,67 +88,284 @@ public class ApplicationJobUtilsTest {
ApplicationJobUtils.maybeFixIds(configuration);
+ assertEquals(expectedClusterId,
configuration.get(ClusterOptions.CLUSTER_ID));
assertEquals(
expectedApplicationId,
configuration.get(ApplicationOptionsInternal.FIXED_APPLICATION_ID));
-
assertEquals(
expectedJobId,
configuration.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID));
}
private static Stream<Arguments> provideParametersForMaybeFixIds() {
- // all combinations for the four input: (isHAEnabled,
isHaClusterIdSet, isApplicationIdSet,
- // isJobIdSet)
+        // all combinations for the five inputs: (isHAEnabled,
isHaClusterIdSet, isClusterIdSet,
+ // isApplicationIdSet, isJobIdSet)
return Stream.of(
- Arguments.of(false, false, false, false, null, null),
- Arguments.of(false, false, false, true, null, TEST_JOB_ID),
- Arguments.of(false, false, true, false, TEST_APPLICATION_ID,
null),
- Arguments.of(false, false, true, true, TEST_APPLICATION_ID,
TEST_JOB_ID),
- Arguments.of(false, true, false, false, null, null),
- Arguments.of(false, true, false, true, null, TEST_JOB_ID),
- Arguments.of(false, true, true, false, TEST_APPLICATION_ID,
null),
- Arguments.of(false, true, true, true, TEST_APPLICATION_ID,
TEST_JOB_ID),
Arguments.of(
+ false,
+ false,
+ false,
+ false,
+ false,
+ ClusterOptions.CLUSTER_ID.defaultValue(),
+ null,
+ null),
+ Arguments.of(
+ false,
+ false,
+ false,
+ false,
+ true,
+ ClusterOptions.CLUSTER_ID.defaultValue(),
+ null,
+ TEST_JOB_ID),
+ Arguments.of(
+ false,
+ false,
+ false,
true,
false,
+ ClusterOptions.CLUSTER_ID.defaultValue(),
+ TEST_APPLICATION_ID,
+ null),
+ Arguments.of(
false,
false,
- getAbstractIdFromString(
-
HighAvailabilityOptions.HA_CLUSTER_ID.defaultValue()),
+ false,
+ true,
+ true,
+ ClusterOptions.CLUSTER_ID.defaultValue(),
+ TEST_APPLICATION_ID,
+ TEST_JOB_ID),
+ Arguments.of(false, false, true, false, false,
TEST_CLUSTER_ID, null, null),
+ Arguments.of(false, false, true, false, true, TEST_CLUSTER_ID,
null, TEST_JOB_ID),
+ Arguments.of(
+ false,
+ false,
+ true,
+ true,
+ false,
+ TEST_CLUSTER_ID,
+ TEST_APPLICATION_ID,
+ null),
+ Arguments.of(
+ false,
+ false,
+ true,
+ true,
+ true,
+ TEST_CLUSTER_ID,
+ TEST_APPLICATION_ID,
+ TEST_JOB_ID),
+ Arguments.of(
+ false,
+ true,
+ false,
+ false,
+ false,
+ getAbstractIdFromString(TEST_HA_CLUSTER_ID),
+ null,
+ null),
+ Arguments.of(
+ false,
+ true,
+ false,
+ false,
+ true,
+ getAbstractIdFromString(TEST_HA_CLUSTER_ID),
+ null,
+ TEST_JOB_ID),
+ Arguments.of(
+ false,
+ true,
+ false,
+ true,
+ false,
+ getAbstractIdFromString(TEST_HA_CLUSTER_ID),
+ TEST_APPLICATION_ID,
+ null),
+ Arguments.of(
+ false,
+ true,
+ false,
+ true,
+ true,
+ getAbstractIdFromString(TEST_HA_CLUSTER_ID),
+ TEST_APPLICATION_ID,
+ TEST_JOB_ID),
+ Arguments.of(false, true, true, false, false, TEST_CLUSTER_ID,
null, null),
+ Arguments.of(false, true, true, false, true, TEST_CLUSTER_ID,
null, TEST_JOB_ID),
+ Arguments.of(
+ false, true, true, true, false, TEST_CLUSTER_ID,
TEST_APPLICATION_ID, null),
+ Arguments.of(
+ false,
+ true,
+ true,
+ true,
+ true,
+ TEST_CLUSTER_ID,
+ TEST_APPLICATION_ID,
+ TEST_JOB_ID),
+ Arguments.of(
+ true,
+ false,
+ false,
+ false,
+ false,
+ ClusterOptions.CLUSTER_ID.defaultValue(),
+ ClusterOptions.CLUSTER_ID.defaultValue(),
getAbstractIdFromString(
HighAvailabilityOptions.HA_CLUSTER_ID.defaultValue())),
Arguments.of(
true,
false,
false,
+ false,
true,
- getAbstractIdFromString(
-
HighAvailabilityOptions.HA_CLUSTER_ID.defaultValue()),
+ ClusterOptions.CLUSTER_ID.defaultValue(),
+ ClusterOptions.CLUSTER_ID.defaultValue(),
TEST_JOB_ID),
- Arguments.of(true, false, true, false, TEST_APPLICATION_ID,
TEST_APPLICATION_ID),
- Arguments.of(true, false, true, true, TEST_APPLICATION_ID,
TEST_JOB_ID),
Arguments.of(
true,
+ false,
+ false,
+ true,
+ false,
+ ClusterOptions.CLUSTER_ID.defaultValue(),
+ TEST_APPLICATION_ID,
+ TEST_APPLICATION_ID),
+ Arguments.of(
+ true,
+ false,
+ false,
+ true,
+ true,
+ ClusterOptions.CLUSTER_ID.defaultValue(),
+ TEST_APPLICATION_ID,
+ TEST_JOB_ID),
+ Arguments.of(
+ true,
+ false,
true,
false,
false,
+ TEST_CLUSTER_ID,
+ TEST_CLUSTER_ID,
+ TEST_CLUSTER_ID),
+ Arguments.of(
+ true,
+ false,
+ true,
+ false,
+ true,
+ TEST_CLUSTER_ID,
+ TEST_CLUSTER_ID,
+ TEST_JOB_ID),
+ Arguments.of(
+ true,
+ false,
+ true,
+ true,
+ false,
+ TEST_CLUSTER_ID,
+ TEST_APPLICATION_ID,
+ TEST_APPLICATION_ID),
+ Arguments.of(
+ true,
+ false,
+ true,
+ true,
+ true,
+ TEST_CLUSTER_ID,
+ TEST_APPLICATION_ID,
+ TEST_JOB_ID),
+ Arguments.of(
+ true,
+ true,
+ false,
+ false,
+ false,
+ getAbstractIdFromString(TEST_HA_CLUSTER_ID),
getAbstractIdFromString(TEST_HA_CLUSTER_ID),
getAbstractIdFromString(TEST_HA_CLUSTER_ID)),
Arguments.of(
true,
true,
false,
+ false,
true,
getAbstractIdFromString(TEST_HA_CLUSTER_ID),
+ getAbstractIdFromString(TEST_HA_CLUSTER_ID),
TEST_JOB_ID),
- Arguments.of(true, true, true, false, TEST_APPLICATION_ID,
TEST_APPLICATION_ID),
- Arguments.of(true, true, true, true, TEST_APPLICATION_ID,
TEST_JOB_ID));
+ Arguments.of(
+ true,
+ true,
+ false,
+ true,
+ false,
+ getAbstractIdFromString(TEST_HA_CLUSTER_ID),
+ TEST_APPLICATION_ID,
+ TEST_APPLICATION_ID),
+ Arguments.of(
+ true,
+ true,
+ false,
+ true,
+ true,
+ getAbstractIdFromString(TEST_HA_CLUSTER_ID),
+ TEST_APPLICATION_ID,
+ TEST_JOB_ID),
+ Arguments.of(
+ true,
+ true,
+ true,
+ false,
+ false,
+ TEST_CLUSTER_ID,
+ TEST_CLUSTER_ID,
+ TEST_CLUSTER_ID),
+ Arguments.of(
+ true,
+ true,
+ true,
+ false,
+ true,
+ TEST_CLUSTER_ID,
+ TEST_CLUSTER_ID,
+ TEST_JOB_ID),
+ Arguments.of(
+ true,
+ true,
+ true,
+ true,
+ false,
+ TEST_CLUSTER_ID,
+ TEST_APPLICATION_ID,
+ TEST_APPLICATION_ID),
+ Arguments.of(
+ true,
+ true,
+ true,
+ true,
+ true,
+ TEST_CLUSTER_ID,
+ TEST_APPLICATION_ID,
+ TEST_JOB_ID));
}
private static String getAbstractIdFromString(String str) {
return (new AbstractID(str.hashCode(), 0)).toHexString();
}
+ @Test
+ void testMaybeFixIds_ClusterIdMalformed() {
+ final String clusterId = "cluster";
+ configuration.set(ClusterOptions.CLUSTER_ID, clusterId);
+
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> ApplicationJobUtils.maybeFixIds(configuration));
+ }
+
@Test
void testAllowExecuteMultipleJobs_HADisabled_NoFixedJobId() {
assertEquals(
diff --git
a/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
b/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
index 1cc4869d990..4957f486a08 100644
---
a/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
+++
b/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
@@ -36,6 +36,25 @@ import static
org.apache.flink.configuration.description.TextElement.text;
@PublicEvolving
public class ClusterOptions {
+ @Documentation.Section(Documentation.Sections.EXPERT_CLUSTER)
+ public static final ConfigOption<String> CLUSTER_ID =
+ key("cluster.id")
+ .stringType()
+ .defaultValue("00000000000000000000000000000000")
+ .withDescription(
+ Description.builder()
+ .text(
+ "The ID of the Flink cluster, used
to separate multiple Flink clusters from each other. "
+ + "Currently, this option
is primarily used to determine the archive path, "
+ + "and may also be used as
the fixed application/job id (if not specified by the user) in high
availability mode. "
+ + "The expected format is
[0-9a-fA-F]{32}, e.g. fd72014d4c864993a2e5a9287b4a9c5d.")
+ .linebreak()
+ .text(
+ "If this option is not configured
but %s is configured, the value will be derived from the value of %s.",
+
code(HighAvailabilityOptions.HA_CLUSTER_ID.key()),
+
code(HighAvailabilityOptions.HA_CLUSTER_ID.key()))
+ .build());
+
@Documentation.Section(Documentation.Sections.EXPERT_FAULT_TOLERANCE)
public static final ConfigOption<Duration> INITIAL_REGISTRATION_TIMEOUT =
ConfigOptions.key("cluster.registration.initial-timeout")
diff --git
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
index 51c5a067598..69d14e7fecb 100644
---
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
+++
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
@@ -25,7 +25,7 @@ import org.apache.flink.configuration.HistoryServerOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.plugin.PluginUtils;
-import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.history.FsJsonArchivist;
import org.apache.flink.runtime.io.network.netty.SSLHandlerFactory;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.runtime.rest.handler.job.GeneratedLogUrlHandler;
@@ -81,7 +81,7 @@ import java.util.function.Consumer;
* jobs for which the JobManager may have already shut down.
*
* <p>The HistoryServer regularly checks a set of directories for job archives
created by the {@link
- * FsJobArchivist} and caches these in a local directory. See {@link
HistoryServerArchiveFetcher}.
+ * FsJsonArchivist} and caches these in a local directory. See {@link
HistoryServerArchiveFetcher}.
*
 * <p>All configuration options are defined in {@link HistoryServerOptions}.
*
diff --git
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index 78452c22179..4fe8bd58d5b 100644
---
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -25,7 +25,7 @@ import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.history.FsJsonArchivist;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
@@ -263,7 +263,7 @@ class HistoryServerArchiveFetcher {
}
private void processArchive(String jobID, Path jobArchive) throws
IOException {
- for (ArchivedJson archive :
FsJobArchivist.getArchivedJsons(jobArchive)) {
+ for (ArchivedJson archive :
FsJsonArchivist.readArchivedJsons(jobArchive)) {
String path = archive.getPath();
String json = archive.getJson();
diff --git
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index 9ab2bb95f8f..537d58fae8e 100644
---
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HistoryServerOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.history.FsJsonArchivist;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.rest.messages.DashboardConfiguration;
@@ -499,9 +499,9 @@ class HistoryServerTest {
}
String json = sw.toString();
ArchivedJson archivedJson = new ArchivedJson("/joboverview", json);
- FsJobArchivist.archiveJob(
- new org.apache.flink.core.fs.Path(directory.toUri()),
- jobId,
+ FsJsonArchivist.writeArchivedJsons(
+ new org.apache.flink.core.fs.Path(
+ directory.toAbsolutePath().toString(),
jobId.toString()),
Collections.singleton(archivedJson));
return jobId.toString();
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 05e74a1f20e..e4cee523047 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -235,6 +235,9 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
private final Map<JobID, CompletableFuture<ExecutionGraphInfo>>
partialExecutionGraphInfoStore =
new HashMap<>();
+ private final Map<ApplicationID, CompletableFuture<?>>
applicationArchivingFutures =
+ new HashMap<>();
+
/** Enum to distinguish between initial job submission and re-submission
for recovery. */
protected enum ExecutionType {
SUBMISSION,
@@ -681,7 +684,12 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
applications.containsKey(applicationId),
"Application %s does not exist.",
applicationId);
+ checkState(
+ !applicationArchivingFutures.containsKey(applicationId),
+ "The application (" + applicationId + ") has already been
archived.");
+ log.info(
+ "Archiving application ({}) with terminal state {}.",
applicationId, newStatus);
AbstractApplication application = applications.get(applicationId);
long[] stateTimestamps = new
long[ApplicationState.values().length];
for (ApplicationState applicationState :
ApplicationState.values()) {
@@ -722,6 +730,19 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
applications.remove(applicationId);
writeToArchivedApplicationStore(archivedApplication);
+ CompletableFuture<?>
applicationArchivingFuture =
+ historyServerArchivist
+
.archiveApplication(archivedApplication)
+ .exceptionally(
+ throwable -> {
+ log.info(
+ "Could not
archive completed application ({}) to the history server.",
+
applicationId,
+ throwable);
+ return null;
+ });
+ applicationArchivingFutures.put(
+ applicationId,
applicationArchivingFuture);
},
getMainThreadExecutor());
}
@@ -744,6 +765,11 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
return applications;
}
+ @VisibleForTesting
+ CompletableFuture<?> getApplicationArchivingFuture(ApplicationID
applicationId) {
+ return applicationArchivingFutures.get(applicationId);
+ }
+
/**
* Checks whether the given job has already been executed.
*
@@ -1492,7 +1518,11 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
: CompletableFuture.completedFuture(null);
FutureUtils.runAfterwards(
- allJobsTerminationFuture, () ->
shutDownFuture.complete(applicationStatus));
+ allJobsTerminationFuture,
+ () ->
+ FutureUtils.runAfterwards(
+
FutureUtils.completeAll(applicationArchivingFutures.values()),
+ () ->
shutDownFuture.complete(applicationStatus)));
return CompletableFuture.completedFuture(Acknowledge.get());
}
@@ -1887,7 +1917,8 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
ExecutionGraphInfo executionGraphInfo) {
return historyServerArchivist
- .archiveExecutionGraph(executionGraphInfo)
+ .archiveExecutionGraph(
+ executionGraphInfo,
executionGraphInfo.getApplicationId().orElse(null))
.handleAsync(
(Acknowledge ignored, Throwable throwable) -> {
if (throwable != null) {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/HistoryServerArchivist.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/HistoryServerArchivist.java
index 468dac76592..c1ca448d6c2 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/HistoryServerArchivist.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/HistoryServerArchivist.java
@@ -18,13 +18,17 @@
package org.apache.flink.runtime.dispatcher;
+import org.apache.flink.api.common.ApplicationID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.application.ArchivedApplication;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.runtime.webmonitor.history.ApplicationJsonArchivist;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import javax.annotation.Nullable;
+
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
@@ -37,16 +41,27 @@ public interface HistoryServerArchivist {
* @param executionGraphInfo to store on the history server
* @return Future which is completed once the archiving has been completed.
*/
- CompletableFuture<Acknowledge> archiveExecutionGraph(ExecutionGraphInfo
executionGraphInfo);
+ CompletableFuture<Acknowledge> archiveExecutionGraph(
+ ExecutionGraphInfo executionGraphInfo, @Nullable ApplicationID
applicationId);
+
+ /**
+ * Archives the given application on the history server.
+ *
+ * @param archivedApplication application information used to archive
+ * @return Future which is completed once the archiving has been completed.
+ */
+ CompletableFuture<Acknowledge> archiveApplication(ArchivedApplication
archivedApplication);
static HistoryServerArchivist createHistoryServerArchivist(
- Configuration configuration, JsonArchivist jsonArchivist, Executor
ioExecutor) {
+ Configuration configuration,
+ JsonArchivist jsonArchivist,
+ ApplicationJsonArchivist applicationJsonArchivist,
+ Executor ioExecutor) {
final String configuredArchivePath =
configuration.get(JobManagerOptions.ARCHIVE_DIR);
if (configuredArchivePath != null) {
- final Path archivePath = new Path(configuredArchivePath);
-
- return new JsonResponseHistoryServerArchivist(jsonArchivist,
archivePath, ioExecutor);
+ return new JsonResponseHistoryServerArchivist(
+ configuration, jsonArchivist, applicationJsonArchivist,
ioExecutor);
} else {
return VoidHistoryServerArchivist.INSTANCE;
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JsonResponseHistoryServerArchivist.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JsonResponseHistoryServerArchivist.java
index e68f3b5c90c..678442dcfa9 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JsonResponseHistoryServerArchivist.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JsonResponseHistoryServerArchivist.java
@@ -18,49 +18,84 @@
package org.apache.flink.runtime.dispatcher;
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.application.ArchivedApplication;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.history.ArchivePathUtils;
+import org.apache.flink.runtime.history.FsJsonArchivist;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.runtime.webmonitor.history.ApplicationJsonArchivist;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingRunnable;
+import javax.annotation.Nullable;
+
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* Implementation which archives an {@link AccessExecutionGraph} such that it
stores the JSON
* requests for all possible history server requests.
*/
class JsonResponseHistoryServerArchivist implements HistoryServerArchivist {
+ private final Configuration configuration;
+
private final JsonArchivist jsonArchivist;
- private final Path archivePath;
+ private final ApplicationJsonArchivist applicationJsonArchivist;
private final Executor ioExecutor;
JsonResponseHistoryServerArchivist(
- JsonArchivist jsonArchivist, Path archivePath, Executor
ioExecutor) {
- this.jsonArchivist = Preconditions.checkNotNull(jsonArchivist);
- this.archivePath = Preconditions.checkNotNull(archivePath);
- this.ioExecutor = Preconditions.checkNotNull(ioExecutor);
+ Configuration configuration,
+ JsonArchivist jsonArchivist,
+ ApplicationJsonArchivist applicationJsonArchivist,
+ Executor ioExecutor) {
+ this.configuration = checkNotNull(configuration);
+ this.jsonArchivist = checkNotNull(jsonArchivist);
+ this.applicationJsonArchivist = checkNotNull(applicationJsonArchivist);
+ this.ioExecutor = checkNotNull(ioExecutor);
}
@Override
public CompletableFuture<Acknowledge> archiveExecutionGraph(
- ExecutionGraphInfo executionGraphInfo) {
+ ExecutionGraphInfo executionGraphInfo, @Nullable ApplicationID
applicationId) {
+ Path jobArchivePath =
+ ArchivePathUtils.getJobArchivePath(
+ configuration, executionGraphInfo.getJobId(),
applicationId);
+
return CompletableFuture.runAsync(
ThrowingRunnable.unchecked(
() ->
- FsJobArchivist.archiveJob(
- archivePath,
- executionGraphInfo.getJobId(),
+ FsJsonArchivist.writeArchivedJsons(
+ jobArchivePath,
jsonArchivist.archiveJsonWithPath(
executionGraphInfo))),
ioExecutor)
.thenApply(ignored -> Acknowledge.get());
}
+
+ @Override
+ public CompletableFuture<Acknowledge> archiveApplication(
+ ArchivedApplication archivedApplication) {
+ Path applicationArchivePath =
+ ArchivePathUtils.getApplicationArchivePath(
+ configuration, archivedApplication.getApplicationId());
+
+ return CompletableFuture.runAsync(
+ ThrowingRunnable.unchecked(
+ () ->
+ FsJsonArchivist.writeArchivedJsons(
+ applicationArchivePath,
+
applicationJsonArchivist.archiveApplicationWithPath(
+ archivedApplication))),
+ ioExecutor)
+ .thenApply(ignored -> Acknowledge.get());
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/VoidHistoryServerArchivist.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/VoidHistoryServerArchivist.java
index 28b42174251..6ab6136e814 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/VoidHistoryServerArchivist.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/VoidHistoryServerArchivist.java
@@ -18,9 +18,13 @@
package org.apache.flink.runtime.dispatcher;
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.runtime.application.ArchivedApplication;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import javax.annotation.Nullable;
+
import java.util.concurrent.CompletableFuture;
/** No-op implementation of the {@link HistoryServerArchivist}. */
@@ -28,7 +32,14 @@ public enum VoidHistoryServerArchivist implements
HistoryServerArchivist {
INSTANCE;
@Override
- public CompletableFuture<Acknowledge>
archiveExecutionGraph(ExecutionGraphInfo executionGraph) {
+ public CompletableFuture<Acknowledge> archiveExecutionGraph(
+ ExecutionGraphInfo executionGraph, @Nullable ApplicationID
applicationId) {
+ return CompletableFuture.completedFuture(Acknowledge.get());
+ }
+
+ @Override
+ public CompletableFuture<Acknowledge> archiveApplication(
+ ArchivedApplication archivedApplication) {
return CompletableFuture.completedFuture(Acknowledge.get());
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java
index cf5b39a0322..0408c4af5c3 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java
@@ -197,7 +197,7 @@ public class
DefaultDispatcherResourceManagerComponentFactory
final HistoryServerArchivist historyServerArchivist =
HistoryServerArchivist.createHistoryServerArchivist(
- configuration, webMonitorEndpoint, ioExecutor);
+ configuration, webMonitorEndpoint,
webMonitorEndpoint, ioExecutor);
final DispatcherOperationCaches dispatcherOperationCaches =
new DispatcherOperationCaches(
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
index 534b3ab5de0..1f63eaf2cc2 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.executiongraph;
+import org.apache.flink.api.common.ApplicationID;
import org.apache.flink.api.common.ArchivedExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
@@ -218,4 +219,11 @@ public interface AccessExecutionGraph extends
JobStatusProvider {
* @return the number of pending operators.
*/
int getPendingOperatorCount();
+
+ /**
+ * Returns the {@link ApplicationID} of the application this job belongs
to.
+ *
+ * @return ID of the application this job belongs to.
+ */
+ Optional<ApplicationID> getApplicationId();
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
index 71935d9eedb..ec50feaaca8 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.executiongraph;
+import org.apache.flink.api.common.ApplicationID;
import org.apache.flink.api.common.ArchivedExecutionConfig;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
@@ -118,6 +119,8 @@ public class ArchivedExecutionGraph implements
AccessExecutionGraph, Serializabl
private final int pendingOperatorCount;
+ @Nullable private ApplicationID applicationId;
+
public ArchivedExecutionGraph(
JobID jobID,
String jobName,
@@ -139,7 +142,8 @@ public class ArchivedExecutionGraph implements
AccessExecutionGraph, Serializabl
@Nullable TernaryBoolean stateChangelogEnabled,
@Nullable String changelogStorageName,
@Nullable String streamGraphJson,
- int pendingOperatorCount) {
+ int pendingOperatorCount,
+ @Nullable ApplicationID applicationId) {
this.jobID = Preconditions.checkNotNull(jobID);
this.jobName = Preconditions.checkNotNull(jobName);
@@ -162,6 +166,7 @@ public class ArchivedExecutionGraph implements
AccessExecutionGraph, Serializabl
this.changelogStorageName = changelogStorageName;
this.streamGraphJson = streamGraphJson;
this.pendingOperatorCount = pendingOperatorCount;
+ this.applicationId = applicationId;
}
//
--------------------------------------------------------------------------------------------
@@ -317,6 +322,11 @@ public class ArchivedExecutionGraph implements
AccessExecutionGraph, Serializabl
return pendingOperatorCount;
}
+ @Override
+ public Optional<ApplicationID> getApplicationId() {
+ return Optional.ofNullable(applicationId);
+ }
+
/**
* Create a {@link ArchivedExecutionGraph} from the given {@link
ExecutionGraph}.
*
@@ -387,7 +397,8 @@ public class ArchivedExecutionGraph implements
AccessExecutionGraph, Serializabl
executionGraph.isChangelogStateBackendEnabled(),
executionGraph.getChangelogStorageName().orElse(null),
executionGraph.getStreamGraphJson(),
- executionGraph.getPendingOperatorCount());
+ executionGraph.getPendingOperatorCount(),
+ executionGraph.getApplicationId().orElse(null));
}
/**
@@ -510,6 +521,7 @@ public class ArchivedExecutionGraph implements
AccessExecutionGraph, Serializabl
:
checkpointingSettings.isChangelogStateBackendEnabled(),
checkpointingSettings == null ? null : "Unknown",
null,
- 0);
+ 0,
+ null);
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
index 345b8977d19..1f6fa2420f8 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.executiongraph;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ApplicationID;
import org.apache.flink.api.common.ArchivedExecutionConfig;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
@@ -844,6 +845,11 @@ public class DefaultExecutionGraph implements
ExecutionGraph, InternalExecutionG
this.internalTaskFailuresListener = internalTaskFailuresListener;
}
+ @Override
+ public Optional<ApplicationID> getApplicationId() {
+ return jobInformation.getApplicationId();
+ }
+
//
--------------------------------------------------------------------------------------------
// Actions
//
--------------------------------------------------------------------------------------------
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
index 5f3ede18ac3..5c9085e64ec 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
@@ -116,7 +116,8 @@ public class DefaultExecutionGraphBuilder {
jobGraph.getSerializedExecutionConfig(),
jobGraph.getJobConfiguration(),
jobGraph.getUserJarBlobKeys(),
- jobGraph.getClasspaths());
+ jobGraph.getClasspaths(),
+ jobGraph.getApplicationId().orElse(null));
final int executionHistorySizeLimit =
jobManagerConfig.get(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE);
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java
index 48883e92bba..018016e43eb 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.executiongraph;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ApplicationID;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
@@ -30,10 +32,13 @@ import org.apache.flink.util.SerializedValue;
import
org.apache.flink.shaded.guava33.com.google.common.collect.ImmutableCollection;
import org.apache.flink.shaded.guava33.com.google.common.collect.ImmutableList;
+import javax.annotation.Nullable;
+
import java.io.Serializable;
import java.net.URL;
import java.util.Collection;
import java.util.Objects;
+import java.util.Optional;
/** Container class for job information which is stored in the {@link
ExecutionGraph}. */
public class JobInformation implements Serializable {
@@ -61,6 +66,9 @@ public class JobInformation implements Serializable {
/** URLs specifying the classpath to add to the class loader. */
private final ImmutableCollection<URL> requiredClasspathURLs;
+ @Nullable private final ApplicationID applicationId;
+
+ @VisibleForTesting
public JobInformation(
JobID jobId,
JobType jobType,
@@ -69,6 +77,26 @@ public class JobInformation implements Serializable {
Configuration jobConfiguration,
Collection<PermanentBlobKey> requiredJarFileBlobKeys,
Collection<URL> requiredClasspathURLs) {
+ this(
+ jobId,
+ jobType,
+ jobName,
+ serializedExecutionConfig,
+ jobConfiguration,
+ requiredJarFileBlobKeys,
+ requiredClasspathURLs,
+ null);
+ }
+
+ public JobInformation(
+ JobID jobId,
+ JobType jobType,
+ String jobName,
+ SerializedValue<ExecutionConfig> serializedExecutionConfig,
+ Configuration jobConfiguration,
+ Collection<PermanentBlobKey> requiredJarFileBlobKeys,
+ Collection<URL> requiredClasspathURLs,
+ @Nullable ApplicationID applicationId) {
this.jobId = Preconditions.checkNotNull(jobId);
this.jobType = Preconditions.checkNotNull(jobType);
this.jobName = Preconditions.checkNotNull(jobName);
@@ -79,6 +107,7 @@ public class JobInformation implements Serializable {
ImmutableList.copyOf(Preconditions.checkNotNull(requiredJarFileBlobKeys));
this.requiredClasspathURLs =
ImmutableList.copyOf(Preconditions.checkNotNull(requiredClasspathURLs));
+ this.applicationId = applicationId;
}
public JobID getJobId() {
@@ -109,6 +138,10 @@ public class JobInformation implements Serializable {
return requiredClasspathURLs;
}
+ public Optional<ApplicationID> getApplicationId() {
+ return Optional.ofNullable(applicationId);
+ }
+
// All fields are immutable, so return this directly.
public JobInformation deepCopy() {
return this;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/history/ArchivePathUtils.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/history/ArchivePathUtils.java
new file mode 100644
index 00000000000..ebb26f9fb97
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/history/ArchivePathUtils.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.history;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nullable;
+
+/** Utils for archive path. */
+public class ArchivePathUtils {
+
+ public static final String JOBS_DIR = "jobs";
+
+ public static final String APPLICATIONS_DIR = "applications";
+
+ public static final String APPLICATION_ARCHIVE_NAME =
"application-summary";
+
+ public static Path getJobArchivePath(
+ Configuration configuration, JobID jobId, @Nullable ApplicationID
applicationId) {
+ String archiveDir = configuration.get(JobManagerOptions.ARCHIVE_DIR);
+ String clusterId = configuration.get(ClusterOptions.CLUSTER_ID);
+ if (applicationId == null) {
+ return new Path(archiveDir, jobId.toHexString());
+ } else {
+ Path applicationDir = getApplicationDir(archiveDir, clusterId,
applicationId);
+ return new Path(new Path(applicationDir, JOBS_DIR),
jobId.toHexString());
+ }
+ }
+
+ public static Path getApplicationArchivePath(
+ Configuration configuration, ApplicationID applicationId) {
+ String archiveDir = configuration.get(JobManagerOptions.ARCHIVE_DIR);
+ String clusterId = configuration.get(ClusterOptions.CLUSTER_ID);
+ Path applicationDir = getApplicationDir(archiveDir, clusterId,
applicationId);
+ return new Path(applicationDir, APPLICATION_ARCHIVE_NAME);
+ }
+
+ private static Path getApplicationDir(
+ String archiveDir, String clusterId, ApplicationID applicationId) {
+ Path clusterDir = new Path(archiveDir, clusterId);
+ return new Path(new Path(clusterDir, APPLICATIONS_DIR),
applicationId.toHexString());
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJsonArchivist.java
similarity index 66%
rename from
flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java
rename to
flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJsonArchivist.java
index 65c76fdefaa..ec1f5011e00 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJsonArchivist.java
@@ -18,12 +18,9 @@
package org.apache.flink.runtime.history;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.jackson.JacksonMapperFactory;
@@ -44,9 +41,9 @@ import java.util.ArrayList;
import java.util.Collection;
/** Utility class for writing an archive file to a {@link FileSystem} and
reading it back. */
-public class FsJobArchivist {
+public class FsJsonArchivist {
- private static final Logger LOG =
LoggerFactory.getLogger(FsJobArchivist.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(FsJsonArchivist.class);
private static final JsonFactory jacksonFactory = new JsonFactory();
private static final ObjectMapper mapper =
JacksonMapperFactory.createObjectMapper();
@@ -54,46 +51,35 @@ public class FsJobArchivist {
private static final String PATH = "path";
private static final String JSON = "json";
- private FsJobArchivist() {}
+ private FsJsonArchivist() {}
/**
- * Writes the given {@link AccessExecutionGraph} to the {@link FileSystem}
pointed to by {@link
- * JobManagerOptions#ARCHIVE_DIR}.
+ * Writes the given {@link ArchivedJson} to the {@link FileSystem}.
*
- * @param rootPath directory to which the archive should be written to
- * @param jobId job id
+ * @param filePath file path to which the archive should be written
* @param jsonToArchive collection of json-path pairs that should be archived
- * @return path to where the archive was written, or null if no archive
was created
- * @throws IOException
*/
- public static Path archiveJob(
- Path rootPath, JobID jobId, Collection<ArchivedJson>
jsonToArchive) throws IOException {
- try {
- FileSystem fs = rootPath.getFileSystem();
- Path path = new Path(rootPath, jobId.toString());
- OutputStream out = fs.create(path,
FileSystem.WriteMode.NO_OVERWRITE);
-
- try (JsonGenerator gen = jacksonFactory.createGenerator(out,
JsonEncoding.UTF8)) {
+ public static void writeArchivedJsons(Path filePath,
Collection<ArchivedJson> jsonToArchive)
+ throws IOException {
+ FileSystem fs = filePath.getFileSystem();
+ OutputStream out = fs.create(filePath,
FileSystem.WriteMode.NO_OVERWRITE);
+ try (JsonGenerator gen = jacksonFactory.createGenerator(out,
JsonEncoding.UTF8)) {
+ gen.writeStartObject();
+ gen.writeArrayFieldStart(ARCHIVE);
+ for (ArchivedJson archive : jsonToArchive) {
gen.writeStartObject();
- gen.writeArrayFieldStart(ARCHIVE);
- for (ArchivedJson archive : jsonToArchive) {
- gen.writeStartObject();
- gen.writeStringField(PATH, archive.getPath());
- gen.writeStringField(JSON, archive.getJson());
- gen.writeEndObject();
- }
- gen.writeEndArray();
+ gen.writeStringField(PATH, archive.getPath());
+ gen.writeStringField(JSON, archive.getJson());
gen.writeEndObject();
- } catch (Exception e) {
- fs.delete(path, false);
- throw e;
}
- LOG.info("Job {} has been archived at {}.", jobId, path);
- return path;
- } catch (IOException e) {
- LOG.error("Failed to archive job.", e);
+ gen.writeEndArray();
+ gen.writeEndObject();
+ } catch (Exception e) {
+ fs.delete(filePath, false);
+ LOG.error("Failed to archive {}", filePath, e);
throw e;
}
+ LOG.info("Successfully archived {}.", filePath);
}
/**
@@ -104,7 +90,7 @@ public class FsJobArchivist {
* @return collection of archived jsons
* @throws IOException if the file can't be opened, read or doesn't
contain valid json
*/
- public static Collection<ArchivedJson> getArchivedJsons(Path file) throws
IOException {
+ public static Collection<ArchivedJson> readArchivedJsons(Path file) throws
IOException {
try (FSDataInputStream input = file.getFileSystem().open(file);
ByteArrayOutputStream output = new ByteArrayOutputStream()) {
IOUtils.copyBytes(input, output);
@@ -122,7 +108,9 @@ public class FsJobArchivist {
} catch (NullPointerException npe) {
// occurs if the archive is empty or any of the expected
fields are not present
throw new IOException(
- "Job archive (" + file.getPath() + ") did not conform
to expected format.");
+ "Json archive ("
+ + file.getPath()
+ + ") did not conform to expected format.");
}
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/application/ApplicationDetailsHandler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/application/ApplicationDetailsHandler.java
index a195efb97d6..99e7dc836a3 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/application/ApplicationDetailsHandler.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/application/ApplicationDetailsHandler.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.rest.handler.application;
import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.runtime.application.ArchivedApplication;
import org.apache.flink.runtime.messages.webmonitor.ApplicationDetailsInfo;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
@@ -27,11 +28,16 @@ import
org.apache.flink.runtime.rest.messages.ApplicationMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ApplicationJsonArchivist;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import javax.annotation.Nonnull;
+import java.io.IOException;
import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -41,7 +47,8 @@ public class ApplicationDetailsHandler
RestfulGateway,
EmptyRequestBody,
ApplicationDetailsInfo,
- ApplicationMessageParameters> {
+ ApplicationMessageParameters>
+ implements ApplicationJsonArchivist {
public ApplicationDetailsHandler(
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
@@ -59,4 +66,18 @@ public class ApplicationDetailsHandler
return gateway.requestApplication(applicationId, timeout)
.thenApply(ApplicationDetailsInfo::fromArchivedApplication);
}
+
+ @Override
+ public Collection<ArchivedJson> archiveApplicationWithPath(
+ ArchivedApplication archivedApplication) throws IOException {
+ String path =
+ getMessageHeaders()
+ .getTargetRestEndpointURL()
+ .replace(
+ ':' + ApplicationIDPathParameter.KEY,
+
archivedApplication.getApplicationId().toHexString());
+ return Collections.singleton(
+ new ArchivedJson(
+ path,
ApplicationDetailsInfo.fromArchivedApplication(archivedApplication)));
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/application/ApplicationsOverviewHandler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/application/ApplicationsOverviewHandler.java
index 47f8ea10ccb..c43dd1ebadd 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/application/ApplicationsOverviewHandler.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/application/ApplicationsOverviewHandler.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.rest.handler.application;
+import org.apache.flink.runtime.application.ArchivedApplication;
+import org.apache.flink.runtime.messages.webmonitor.ApplicationDetails;
import
org.apache.flink.runtime.messages.webmonitor.MultipleApplicationsDetails;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
@@ -25,12 +27,18 @@ import
org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ApplicationJsonArchivist;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import javax.annotation.Nonnull;
+import java.io.IOException;
import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -40,7 +48,8 @@ public class ApplicationsOverviewHandler
RestfulGateway,
EmptyRequestBody,
MultipleApplicationsDetails,
- EmptyMessageParameters> {
+ EmptyMessageParameters>
+ implements ApplicationJsonArchivist {
public ApplicationsOverviewHandler(
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
@@ -57,4 +66,15 @@ public class ApplicationsOverviewHandler
throws RestHandlerException {
return gateway.requestMultipleApplicationDetails(timeout);
}
+
+ @Override
+ public Collection<ArchivedJson> archiveApplicationWithPath(
+ ArchivedApplication archivedApplication) throws IOException {
+ ResponseBody json =
+ new MultipleApplicationsDetails(
+ Collections.singleton(
+
ApplicationDetails.fromArchivedApplication(archivedApplication)));
+ String path = getMessageHeaders().getTargetRestEndpointURL();
+ return Collections.singletonList(new ArchivedJson(path, json));
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java
index 99517322305..b25996ddc54 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.scheduler;
+import org.apache.flink.api.common.ApplicationID;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
@@ -25,6 +26,7 @@ import
org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryE
import java.io.Serializable;
import java.util.Collections;
+import java.util.Optional;
/**
* {@code ExecutionGraphInfo} serves as a composite class that provides
different {@link
@@ -65,4 +67,8 @@ public class ExecutionGraphInfo implements Serializable {
public Iterable<RootExceptionHistoryEntry> getExceptionHistory() {
return exceptionHistory;
}
+
+ public Optional<ApplicationID> getApplicationId() {
+ return executionGraph.getApplicationId();
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index d9f09f131d1..6e3926d6a2c 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.RpcOptions;
+import org.apache.flink.runtime.application.ArchivedApplication;
import org.apache.flink.runtime.blob.TransientBlobService;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.leaderelection.LeaderContender;
@@ -169,6 +170,7 @@ import
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerThreadDumpH
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.runtime.webmonitor.history.ApplicationJsonArchivist;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
@@ -210,7 +212,7 @@ import java.util.concurrent.TimeUnit;
* @param <T> type of the leader gateway
*/
public class WebMonitorEndpoint<T extends RestfulGateway> extends
RestServerEndpoint
- implements LeaderContender, JsonArchivist {
+ implements LeaderContender, JsonArchivist, ApplicationJsonArchivist {
protected final GatewayRetriever<? extends T> leaderRetriever;
protected final Configuration clusterConfiguration;
@@ -232,7 +234,10 @@ public class WebMonitorEndpoint<T extends RestfulGateway>
extends RestServerEndp
private boolean hasWebUI = false;
- private final Collection<JsonArchivist> archivingHandlers = new
ArrayList<>(16);
+ private final Collection<JsonArchivist> jobArchivingHandlers = new
ArrayList<>(16);
+
+ private final Collection<ApplicationJsonArchivist>
applicationArchivingHandlers =
+ new ArrayList<>(16);
@Nullable private ScheduledFuture<?> executionGraphCleanupTask;
@@ -1170,7 +1175,15 @@ public class WebMonitorEndpoint<T extends
RestfulGateway> extends RestServerEndp
handlers.stream()
.map(tuple -> tuple.f1)
.filter(handler -> handler instanceof JsonArchivist)
- .forEachOrdered(handler ->
archivingHandlers.add((JsonArchivist) handler));
+ .forEachOrdered(handler ->
jobArchivingHandlers.add((JsonArchivist) handler));
+
+ handlers.stream()
+ .map(tuple -> tuple.f1)
+ .filter(handler -> handler instanceof ApplicationJsonArchivist)
+ .forEachOrdered(
+ handler ->
+ applicationArchivingHandlers.add(
+ (ApplicationJsonArchivist) handler));
return handlers;
}
@@ -1267,14 +1280,27 @@ public class WebMonitorEndpoint<T extends
RestfulGateway> extends RestServerEndp
@Override
public Collection<ArchivedJson> archiveJsonWithPath(ExecutionGraphInfo
executionGraphInfo)
throws IOException {
- Collection<ArchivedJson> archivedJson = new
ArrayList<>(archivingHandlers.size());
- for (JsonArchivist archivist : archivingHandlers) {
+ Collection<ArchivedJson> archivedJson = new
ArrayList<>(jobArchivingHandlers.size());
+ for (JsonArchivist archivist : jobArchivingHandlers) {
Collection<ArchivedJson> subArchive =
archivist.archiveJsonWithPath(executionGraphInfo);
archivedJson.addAll(subArchive);
}
return archivedJson;
}
+ @Override
+ public Collection<ArchivedJson> archiveApplicationWithPath(
+ ArchivedApplication archivedApplication) throws IOException {
+ Collection<ArchivedJson> archivedJson =
+ new ArrayList<>(applicationArchivingHandlers.size());
+ for (ApplicationJsonArchivist archivist :
applicationArchivingHandlers) {
+ Collection<ArchivedJson> subArchive =
+ archivist.archiveApplicationWithPath(archivedApplication);
+ archivedJson.addAll(subArchive);
+ }
+ return archivedJson;
+ }
+
public static ScheduledExecutorService createExecutorService(
int numThreads, int threadPriority, String componentName) {
if (threadPriority < Thread.MIN_PRIORITY || threadPriority >
Thread.MAX_PRIORITY) {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ApplicationJsonArchivist.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ApplicationJsonArchivist.java
new file mode 100644
index 00000000000..fcc11fd0252
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ApplicationJsonArchivist.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.history;
+
+import org.apache.flink.runtime.application.ArchivedApplication;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Interface for all classes that want to participate in the archiving of
application-related JSON
+ * responses.
+ */
+public interface ApplicationJsonArchivist {
+
+ /**
+ * Returns a {@link Collection} of {@link ArchivedJson}s containing JSON
responses and their
+ * respective REST URLs for a given application.
+ *
+ * <p>The collection should contain one entry for every response that
could be generated for the
+ * given application. The REST URLs should be unique and must not contain
placeholders.
+ *
+ * @param archivedApplication application-related information
+ * @return Collection containing an ArchivedJson for every response that
could be generated for
+ * the given application
+ * @throws IOException thrown if the JSON generation fails
+ */
+ Collection<ArchivedJson> archiveApplicationWithPath(ArchivedApplication
archivedApplication)
+ throws IOException;
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
index cc31f2cdf3d..4bc3535c0fd 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.webmonitor.history;
-import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.history.FsJsonArchivist;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.util.RestMapperUtils;
import org.apache.flink.util.Preconditions;
@@ -33,7 +33,7 @@ import java.util.Objects;
* A simple container for a handler's JSON response and the REST URLs for
which the response
* would've been returned.
*
- * <p>These are created by {@link JsonArchivist}s, and used by the {@link
FsJobArchivist} to create
+ * <p>These are created by {@link JsonArchivist}s, and used by the {@link
FsJsonArchivist} to create
* a directory structure resembling the REST API.
*/
public class ArchivedJson {
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index a8b2805c73f..682c9f84ea5 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -666,7 +666,12 @@ public class DispatcherResourceCleanupTest extends
TestLogger {
final TestingDispatcher.Builder testingDispatcherBuilder =
createTestingDispatcherBuilder()
- .setHistoryServerArchivist(executionGraphInfo ->
archiveFuture);
+ .setHistoryServerArchivist(
+ TestingHistoryServerArchivist.builder()
+ .setArchiveExecutionGraphFunction(
+ (executionGraphInfo,
applicationId) ->
+ archiveFuture)
+ .build());
final TestingJobManagerRunnerFactory jobManagerRunnerFactory =
startDispatcherAndSubmitJob(testingDispatcherBuilder, 0);
@@ -695,10 +700,14 @@ public class DispatcherResourceCleanupTest extends
TestLogger {
final TestingDispatcher.Builder testingDispatcherBuilder =
createTestingDispatcherBuilder()
.setHistoryServerArchivist(
- executionGraphInfo -> {
- isArchived.set(true);
- return
CompletableFuture.completedFuture(Acknowledge.get());
- });
+ TestingHistoryServerArchivist.builder()
+ .setArchiveExecutionGraphFunction(
+ (executionGraphInfo,
applicationId) -> {
+ isArchived.set(true);
+ return
CompletableFuture.completedFuture(
+ Acknowledge.get());
+ })
+ .build());
final TestingJobManagerRunnerFactory jobManagerRunnerFactory =
startDispatcherAndSubmitJob(testingDispatcherBuilder, 0);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index ce84f897f08..d6a16166623 100755
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -157,6 +157,10 @@ import java.util.stream.Stream;
import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
/** Test for the {@link Dispatcher} component. */
public class DispatcherTest extends AbstractDispatcherTest {
@@ -551,10 +555,13 @@ public class DispatcherTest extends
AbstractDispatcherTest {
new FinishingJobManagerRunnerFactory(
jobTerminationFuture, () -> {}))
.setHistoryServerArchivist(
- executionGraphInfo -> {
- archiveAttemptFuture.complete(null);
- return
CompletableFuture.completedFuture(null);
- })
+ TestingHistoryServerArchivist.builder()
+ .setArchiveExecutionGraphFunction(
+ (executionGraphInfo,
applicationId) -> {
+
archiveAttemptFuture.complete(null);
+ return
CompletableFuture.completedFuture(null);
+ })
+ .build())
.build(rpcService);
dispatcher.start();
jobMasterLeaderElection.isLeader(UUID.randomUUID());
@@ -580,6 +587,75 @@ public class DispatcherTest extends AbstractDispatcherTest
{
assertThat(archiveAttemptFuture).isNotDone();
}
+ @Test
+ public void
testApplicationStatusChange_ArchiveNotCalledForNonTerminalStatus()
+ throws Exception {
+ final CompletableFuture<Void> archiveApplicationFuture = new
CompletableFuture<>();
+ dispatcher =
+ createTestingDispatcherBuilder()
+ .setHistoryServerArchivist(
+ TestingHistoryServerArchivist.builder()
+ .setArchiveApplicationFunction(
+ archivedApplication -> {
+
archiveApplicationFuture.complete(null);
+ return
CompletableFuture.completedFuture(null);
+ })
+ .build())
+ .build(rpcService);
+ dispatcher.start();
+ ApplicationID applicationId =
mockApplicationStatusChange(ApplicationState.RUNNING);
+ // verify that archive application is not called
+ assertFalse(archiveApplicationFuture.isDone());
+ assertNull(dispatcher.getApplicationArchivingFuture(applicationId));
+ }
+
+ @Test
+ public void testApplicationStatusChange_ArchiveCalledForTerminalStatus()
throws Exception {
+ final CompletableFuture<ApplicationID> archiveApplicationFuture = new
CompletableFuture<>();
+ dispatcher =
+ createTestingDispatcherBuilder()
+ .setHistoryServerArchivist(
+ TestingHistoryServerArchivist.builder()
+ .setArchiveApplicationFunction(
+ archivedApplication -> {
+
archiveApplicationFuture.complete(
+
archivedApplication.getApplicationId());
+ return
CompletableFuture.completedFuture(null);
+ })
+ .build())
+ .build(rpcService);
+ dispatcher.start();
+ final ApplicationID applicationId =
mockApplicationStatusChange(ApplicationState.FINISHED);
+ // verify that archive application is called with the application id
+ assertEquals(applicationId, archiveApplicationFuture.get());
+ dispatcher
+ .getApplicationArchivingFuture(applicationId)
+ .get(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
+ }
+
+ @Test
+ public void testApplicationStatusChange_ThrowsIfDuplicateTerminalStatus()
throws Exception {
+ dispatcher = createTestingDispatcherBuilder().build(rpcService);
+ dispatcher.start();
+ final ApplicationID applicationId =
mockApplicationStatusChange(ApplicationState.FINISHED);
+ assertThrows(
+ IllegalStateException.class,
+ () ->
+ dispatcher.notifyApplicationStatusChange(
+ applicationId, ApplicationState.FAILED));
+ }
+
+ private ApplicationID mockApplicationStatusChange(ApplicationState
targetState) {
+ final ApplicationID applicationId = new ApplicationID();
+ dispatcher
+ .getApplications()
+ .put(
+ applicationId,
+
TestingApplication.builder().setApplicationId(applicationId).build());
+ dispatcher.notifyApplicationStatusChange(applicationId, targetState);
+ return applicationId;
+ }
+
@Test
public void testJobManagerRunnerInitializationFailureFailsJob() throws
Exception {
final TestingJobMasterServiceLeadershipRunnerFactory
testingJobManagerRunnerFactory =
@@ -1315,6 +1391,33 @@ public class DispatcherTest extends
AbstractDispatcherTest {
dispatcher.getShutDownFuture().get();
}
+ @Test
+ public void testShutDownFutureCompletesAfterApplicationArchivingFutures()
throws Exception {
+ final CompletableFuture<Acknowledge> archiveApplicationFuture = new
CompletableFuture<>();
+ dispatcher =
+ createTestingDispatcherBuilder()
+ .setHistoryServerArchivist(
+ TestingHistoryServerArchivist.builder()
+ .setArchiveApplicationFunction(
+ archivedApplication ->
archiveApplicationFuture)
+ .build())
+ .build(rpcService);
+ dispatcher.start();
+
+ mockApplicationStatusChange(ApplicationState.FINISHED);
+
+ final DispatcherGateway dispatcherGateway =
+ dispatcher.getSelfGateway(DispatcherGateway.class);
+
+ dispatcherGateway.shutDownCluster(ApplicationStatus.SUCCEEDED).get();
+ assertThatThrownBy(() -> dispatcher.getShutDownFuture().get(100L,
TimeUnit.MILLISECONDS))
+ .isInstanceOf(TimeoutException.class);
+
+ archiveApplicationFuture.complete(null);
+
+ dispatcher.getShutDownFuture().get();
+ }
+
@Test
public void testOnRemovedJobGraphDoesNotCleanUpHAFiles() throws Exception {
final CompletableFuture<JobID> removeJobGraphFuture = new
CompletableFuture<>();
@@ -1533,15 +1636,15 @@ public class DispatcherTest extends
AbstractDispatcherTest {
new ExpectedJobIdJobManagerRunnerFactory(jobId));
DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
-
Assert.assertEquals(jobGraph.findVertexByID(v1.getID()).getParallelism(), 1);
-
Assert.assertEquals(jobGraph.findVertexByID(v2.getID()).getParallelism(), 2);
-
Assert.assertEquals(jobGraph.findVertexByID(v3.getID()).getParallelism(), 3);
+ assertEquals(jobGraph.findVertexByID(v1.getID()).getParallelism(), 1);
+ assertEquals(jobGraph.findVertexByID(v2.getID()).getParallelism(), 2);
+ assertEquals(jobGraph.findVertexByID(v3.getID()).getParallelism(), 3);
dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
-
Assert.assertEquals(jobGraph.findVertexByID(v1.getID()).getParallelism(), 10);
-
Assert.assertEquals(jobGraph.findVertexByID(v2.getID()).getParallelism(), 2);
-
Assert.assertEquals(jobGraph.findVertexByID(v3.getID()).getParallelism(), 42);
+ assertEquals(jobGraph.findVertexByID(v1.getID()).getParallelism(), 10);
+ assertEquals(jobGraph.findVertexByID(v2.getID()).getParallelism(), 2);
+ assertEquals(jobGraph.findVertexByID(v3.getID()).getParallelism(), 42);
}
private JobManagerRunner runningJobManagerRunnerWithJobStatus(
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingHistoryServerArchivist.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingHistoryServerArchivist.java
new file mode 100644
index 00000000000..8ed6d818e85
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingHistoryServerArchivist.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.runtime.application.ArchivedApplication;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+/**
+ * {@code TestingHistoryServerArchivist} implements {@link
HistoryServerArchivist} to be used in
+ * test contexts.
+ */
+public class TestingHistoryServerArchivist implements HistoryServerArchivist {
+
+ private final BiFunction<ExecutionGraphInfo, ApplicationID,
CompletableFuture<Acknowledge>>
+ archiveExecutionGraphFunction;
+ private final Function<ArchivedApplication, CompletableFuture<Acknowledge>>
+ archiveApplicationFunction;
+
+ public TestingHistoryServerArchivist(
+ BiFunction<ExecutionGraphInfo, ApplicationID,
CompletableFuture<Acknowledge>>
+ archiveExecutionGraphFunction,
+ Function<ArchivedApplication, CompletableFuture<Acknowledge>>
+ archiveApplicationFunction) {
+ this.archiveExecutionGraphFunction = archiveExecutionGraphFunction;
+ this.archiveApplicationFunction = archiveApplicationFunction;
+ }
+
+ @Override
+ public CompletableFuture<Acknowledge> archiveExecutionGraph(
+ ExecutionGraphInfo executionGraphInfo, @Nullable ApplicationID
applicationId) {
+ return archiveExecutionGraphFunction.apply(executionGraphInfo,
applicationId);
+ }
+
+ @Override
+ public CompletableFuture<Acknowledge> archiveApplication(
+ ArchivedApplication archivedApplication) {
+ return archiveApplicationFunction.apply(archivedApplication);
+ }
+
+ public static TestingHistoryServerArchivist.Builder builder() {
+ return new TestingHistoryServerArchivist.Builder();
+ }
+
+ public static class Builder {
+ private BiFunction<ExecutionGraphInfo, ApplicationID,
CompletableFuture<Acknowledge>>
+ archiveExecutionGraphFunction =
+ (executionGraphInfo, applicationId) ->
+
CompletableFuture.completedFuture(Acknowledge.get());
+ private Function<ArchivedApplication, CompletableFuture<Acknowledge>>
+ archiveApplicationFunction =
+ ignored ->
CompletableFuture.completedFuture(Acknowledge.get());
+
+ public Builder setArchiveExecutionGraphFunction(
+ BiFunction<ExecutionGraphInfo, ApplicationID,
CompletableFuture<Acknowledge>>
+ function) {
+ this.archiveExecutionGraphFunction = function;
+ return this;
+ }
+
+ public Builder setArchiveApplicationFunction(
+ Function<ArchivedApplication, CompletableFuture<Acknowledge>>
function) {
+ this.archiveApplicationFunction = function;
+ return this;
+ }
+
+ public TestingHistoryServerArchivist build() {
+ return new TestingHistoryServerArchivist(
+ archiveExecutionGraphFunction, archiveApplicationFunction);
+ }
+ }
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/history/ArchivePathUtilsTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/history/ArchivePathUtilsTest.java
new file mode 100644
index 00000000000..2784a86b7dc
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/history/ArchivePathUtilsTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.history;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Tests for the {@link ArchivePathUtils}. */
+public class ArchivePathUtilsTest {
+
+ private Configuration configuration;
+
+ @BeforeEach
+ void setUp() {
+ configuration = new Configuration();
+ configuration.set(JobManagerOptions.ARCHIVE_DIR, "/archive");
+ configuration.set(ClusterOptions.CLUSTER_ID, "cluster123");
+ }
+
+ @Test
+ void testGetJobArchivePath_withoutApplicationId() {
+ JobID jobId = new JobID();
+
+ Path result = ArchivePathUtils.getJobArchivePath(configuration, jobId,
null);
+
+ Path expected = new Path("/archive", jobId.toHexString());
+ assertEquals(expected, result);
+ }
+
+ @Test
+ void testGetJobArchivePath_withApplicationId() {
+ JobID jobId = new JobID();
+ ApplicationID applicationId = new ApplicationID();
+
+ Path result = ArchivePathUtils.getJobArchivePath(configuration, jobId,
applicationId);
+
+ Path expected =
+ new Path(
+ "/archive/cluster123/applications/"
+ + applicationId.toHexString()
+ + "/jobs/"
+ + jobId.toHexString());
+
+ assertEquals(expected, result);
+ }
+
+ @Test
+ void testGetApplicationArchivePath() {
+ ApplicationID applicationId = new ApplicationID();
+
+ Path result =
ArchivePathUtils.getApplicationArchivePath(configuration, applicationId);
+
+ Path expected =
+ new Path(
+ "/archive/cluster123/applications/"
+ + applicationId.toHexString()
+ + "/application-summary");
+
+ assertEquals(expected, result);
+ }
+}
diff --git
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/FsJobArchivistTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/history/FsJsonArchivistTest.java
similarity index 73%
rename from
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/FsJobArchivistTest.java
rename to
flink-runtime/src/test/java/org/apache/flink/runtime/history/FsJsonArchivistTest.java
index eea55489566..6a4081d32fc 100644
---
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/FsJobArchivistTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/history/FsJsonArchivistTest.java
@@ -16,11 +16,10 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.webmonitor.history;
+package org.apache.flink.runtime.history;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -31,20 +30,19 @@ import java.util.Collection;
import static org.assertj.core.api.Assertions.assertThat;
-/** Tests for the {@link FsJobArchivist}. */
-class FsJobArchivistTest {
+/** Tests for the {@link FsJsonArchivist}. */
+class FsJsonArchivistTest {
@Test
- void testArchiveJob(@TempDir File tmpFolder) throws Exception {
- final Path tmpPath = new Path(tmpFolder.getAbsolutePath());
- final JobID jobId = new JobID();
+ void testArchiveJsons(@TempDir File tmpFolder) throws Exception {
+ final Path tmpPath = new Path(tmpFolder.getAbsolutePath(),
"test-file");
final Collection<ArchivedJson> toArchive = new ArrayList<>(2);
toArchive.add(new ArchivedJson("dir1", "hello"));
toArchive.add(new ArchivedJson("dir1/dir11", "world"));
- final Path archive = FsJobArchivist.archiveJob(tmpPath, jobId,
toArchive);
- final Collection<ArchivedJson> restored =
FsJobArchivist.getArchivedJsons(archive);
+ FsJsonArchivist.writeArchivedJsons(tmpPath, toArchive);
+ final Collection<ArchivedJson> restored =
FsJsonArchivist.readArchivedJsons(tmpPath);
assertThat(restored).containsExactlyElementsOf(toArchive);
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java
index 6c7a0732b4e..c3e6e36859e 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.rest.handler.legacy.utils;
+import org.apache.flink.api.common.ApplicationID;
import org.apache.flink.api.common.ArchivedExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
@@ -60,6 +61,7 @@ public class ArchivedExecutionGraphBuilder {
private CheckpointStatsSnapshot checkpointStatsSnapshot;
private String streamGraphJson;
private int pendingOperatorCounts = 0;
+ private ApplicationID applicationId;
public ArchivedExecutionGraphBuilder setJobID(JobID jobID) {
this.jobID = jobID;
@@ -143,6 +145,11 @@ public class ArchivedExecutionGraphBuilder {
return this;
}
+ public ArchivedExecutionGraphBuilder setApplicationId(ApplicationID
applicationId) {
+ this.applicationId = applicationId;
+ return this;
+ }
+
public ArchivedExecutionGraph build() {
JobID jobID = this.jobID != null ? this.jobID : new JobID();
String jobName = this.jobName != null ? this.jobName : "job_" +
RANDOM.nextInt();
@@ -182,6 +189,7 @@ public class ArchivedExecutionGraphBuilder {
TernaryBoolean.UNDEFINED,
"changelogStorageName",
streamGraphJson,
- pendingOperatorCounts);
+ pendingOperatorCounts,
+ applicationId);
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java
index 1c70ad7554f..87cabd8d271 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.scheduler.adaptive;
+import org.apache.flink.api.common.ApplicationID;
import org.apache.flink.api.common.ArchivedExecutionConfig;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
@@ -439,4 +440,9 @@ class StateTrackingMockExecutionGraph implements
ExecutionGraph {
public Optional<AccessExecution> findExecution(ExecutionAttemptID
attemptId) {
return Optional.ofNullable(executions.get(attemptId));
}
+
+ @Override
+ public Optional<ApplicationID> getApplicationId() {
+ return Optional.empty();
+ }
}