[FLINK-9194][history] Add archiving routine to Dispatcher

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fd374b83
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fd374b83
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fd374b83

Branch: refs/heads/master
Commit: fd374b832f830adfa59b7f834b11c38080486f1c
Parents: 6b6603f
Author: zentol <[email protected]>
Authored: Wed Apr 18 14:33:04 2018 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Mon May 14 23:40:48 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/dispatcher/Dispatcher.java    | 43 +++++++++++++++++++-
 .../runtime/dispatcher/MiniDispatcher.java      |  5 ++-
 .../dispatcher/StandaloneDispatcher.java        |  7 +++-
 .../runtime/entrypoint/ClusterEntrypoint.java   |  7 +++-
 .../entrypoint/JobClusterEntrypoint.java        |  5 ++-
 .../entrypoint/SessionClusterEntrypoint.java    |  7 +++-
 .../flink/runtime/history/FsJobArchivist.java   | 42 +++++++++++++++++++
 .../flink/runtime/minicluster/MiniCluster.java  |  3 +-
 .../runtime/webmonitor/WebMonitorEndpoint.java  | 22 +++++++++-
 .../runtime/dispatcher/DispatcherTest.java      |  1 +
 .../runtime/dispatcher/MiniDispatcherTest.java  |  1 +
 11 files changed, 132 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 58ffda3..82b9291 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.checkpoint.Checkpoints;
 import org.apache.flink.runtime.client.JobSubmissionException;
@@ -32,6 +34,7 @@ import 
org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.flink.runtime.history.FsJobArchivist;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -61,6 +64,8 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
@@ -124,6 +129,12 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
        @Nullable
        protected final String restAddress;
 
+       @Nullable
+       private final JsonArchivist jsonArchivist;
+
+       @Nullable
+       private final Path archivePath;
+
        private CompletableFuture<Void> 
orphanedJobManagerRunnersTerminationFuture = 
CompletableFuture.completedFuture(null);
 
        public Dispatcher(
@@ -140,7 +151,8 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                        ArchivedExecutionGraphStore archivedExecutionGraphStore,
                        JobManagerRunnerFactory jobManagerRunnerFactory,
                        FatalErrorHandler fatalErrorHandler,
-                       @Nullable String restAddress) throws Exception {
+                       @Nullable String restAddress,
+                       @Nullable JsonArchivist jsonArchivist) throws Exception 
{
                super(rpcService, endpointId);
 
                this.configuration = Preconditions.checkNotNull(configuration);
@@ -165,6 +177,22 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
 
                this.restAddress = restAddress;
 
+               this.jsonArchivist = jsonArchivist;
+
+               String configuredArchivePath = 
configuration.getString(JobManagerOptions.ARCHIVE_DIR);
+               if (configuredArchivePath != null) {
+                       Path tmpArchivePath = null;
+                       try {
+                               tmpArchivePath = 
WebMonitorUtils.validateAndNormalizeUri(new 
Path(configuredArchivePath).toUri());
+                       } catch (Exception e) {
+                               log.warn("Failed to validate specified archive 
directory in '{}'. " +
+                                       "Jobs will not be archived for the 
HistoryServer.", configuredArchivePath, e);
+                       }
+                       archivePath = tmpArchivePath;
+               } else {
+                       archivePath = null;
+               }
+
                this.archivedExecutionGraphStore = 
Preconditions.checkNotNull(archivedExecutionGraphStore);
 
                this.jobManagerRunnerFactory = 
Preconditions.checkNotNull(jobManagerRunnerFactory);
@@ -621,6 +649,19 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                                e);
                }
 
+               try {
+                       if (jsonArchivist != null && archivePath != null) {
+                               FsJobArchivist.archiveJob(archivePath, 
archivedExecutionGraph.getJobID(), 
jsonArchivist.archiveJsonWithPath(archivedExecutionGraph));
+                               log.info("Archived job {} to {}", 
archivedExecutionGraph.getJobID(), archivePath);
+                       }
+               } catch (IOException e) {
+                       log.info(
+                               "Could not archive completed job {}({}).",
+                               archivedExecutionGraph.getJobName(),
+                               archivedExecutionGraph.getJobID(),
+                               e);
+               }
+
                final JobID jobId = archivedExecutionGraph.getJobID();
 
                removeJob(jobId, true);

http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
index 4361e08..38e74fb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
@@ -35,6 +35,7 @@ import 
org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.util.FlinkException;
 
 import javax.annotation.Nullable;
@@ -71,6 +72,7 @@ public class MiniDispatcher extends Dispatcher {
                        JobManagerRunnerFactory jobManagerRunnerFactory,
                        FatalErrorHandler fatalErrorHandler,
                        @Nullable String restAddress,
+                       @Nullable JsonArchivist jsonArchivist,
                        JobGraph jobGraph,
                        JobClusterEntrypoint.ExecutionMode executionMode) 
throws Exception {
                super(
@@ -87,7 +89,8 @@ public class MiniDispatcher extends Dispatcher {
                        archivedExecutionGraphStore,
                        jobManagerRunnerFactory,
                        fatalErrorHandler,
-                       restAddress);
+                       restAddress,
+                       jsonArchivist);
 
                this.executionMode = checkNotNull(executionMode);
                this.jobTerminationFuture = new CompletableFuture<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
index 52ac7a0..5c6a7ab 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 
 import javax.annotation.Nullable;
 
@@ -50,7 +51,8 @@ public class StandaloneDispatcher extends Dispatcher {
                        ArchivedExecutionGraphStore archivedExecutionGraphStore,
                        JobManagerRunnerFactory jobManagerRunnerFactory,
                        FatalErrorHandler fatalErrorHandler,
-                       @Nullable String restAddress) throws Exception {
+                       @Nullable String restAddress,
+                       @Nullable JsonArchivist jsonArchivist) throws Exception 
{
                super(
                        rpcService,
                        endpointId,
@@ -65,6 +67,7 @@ public class StandaloneDispatcher extends Dispatcher {
                        archivedExecutionGraphStore,
                        jobManagerRunnerFactory,
                        fatalErrorHandler,
-                       restAddress);
+                       restAddress,
+                       jsonArchivist);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index f823ea7..933add8 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -64,6 +64,7 @@ import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
 import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import 
org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
@@ -357,7 +358,8 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
                                metricRegistry.getMetricQueryServicePath(),
                                archivedExecutionGraphStore,
                                this,
-                               webMonitorEndpoint.getRestBaseUrl());
+                               webMonitorEndpoint.getRestBaseUrl(),
+                               webMonitorEndpoint);
 
                        LOG.debug("Starting ResourceManager.");
                        resourceManager.start();
@@ -657,7 +659,8 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
                @Nullable String metricQueryServicePath,
                ArchivedExecutionGraphStore archivedExecutionGraphStore,
                FatalErrorHandler fatalErrorHandler,
-               @Nullable String restAddress) throws Exception;
+               @Nullable String restAddress,
+               @Nullable JsonArchivist jsonArchivist) throws Exception;
 
        protected abstract ResourceManager<?> createResourceManager(
                Configuration configuration,

http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index ea7cbe2..2a5b8ea 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -39,6 +39,7 @@ import 
org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
 import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.FlinkException;
@@ -100,7 +101,8 @@ public abstract class JobClusterEntrypoint extends 
ClusterEntrypoint {
                        @Nullable String metricQueryServicePath,
                        ArchivedExecutionGraphStore archivedExecutionGraphStore,
                        FatalErrorHandler fatalErrorHandler,
-                       @Nullable String restAddress) throws Exception {
+                       @Nullable String restAddress,
+                       @Nullable JsonArchivist jsonArchivist) throws Exception 
{
 
                final JobGraph jobGraph = retrieveJobGraph(configuration);
 
@@ -122,6 +124,7 @@ public abstract class JobClusterEntrypoint extends 
ClusterEntrypoint {
                        Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
                        fatalErrorHandler,
                        restAddress,
+                       jsonArchivist,
                        jobGraph,
                        executionMode);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index fcab796..85446eb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -40,6 +40,7 @@ import 
org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
 import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 
@@ -114,7 +115,8 @@ public abstract class SessionClusterEntrypoint extends 
ClusterEntrypoint {
                        @Nullable String metricQueryServicePath,
                        ArchivedExecutionGraphStore archivedExecutionGraphStore,
                        FatalErrorHandler fatalErrorHandler,
-                       @Nullable String restAddress) throws Exception {
+                       @Nullable String restAddress,
+                       @Nullable JsonArchivist jsonArchivist) throws Exception 
{
 
                // create the default dispatcher
                return new StandaloneDispatcher(
@@ -130,6 +132,7 @@ public abstract class SessionClusterEntrypoint extends 
ClusterEntrypoint {
                        archivedExecutionGraphStore,
                        Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
                        fatalErrorHandler,
-                       restAddress);
+                       restAddress,
+                       jsonArchivist);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java
index 3a6ea4f..1cfbf96 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.history;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FileSystem;
@@ -67,7 +68,9 @@ public class FsJobArchivist {
         * @param graph  graph to archive
         * @return path to where the archive was written, or null if no archive 
was created
         * @throws IOException
+        * @deprecated only kept for legacy reasons
         */
+       @Deprecated
        public static Path archiveJob(Path rootPath, AccessExecutionGraph 
graph) throws IOException {
                try {
                        FileSystem fs = rootPath.getFileSystem();
@@ -100,6 +103,45 @@ public class FsJobArchivist {
        }
 
        /**
+        * Writes the given {@link AccessExecutionGraph} to the {@link 
FileSystem} pointed to by
+        * {@link JobManagerOptions#ARCHIVE_DIR}.
+        *
+        * @param rootPath directory to which the archive should be written to
+        * @param jobId  job id
+        * @param jsonToArchive collection of json-path pairs to that should be 
archived
+        * @return path to where the archive was written, or null if no archive 
was created
+        * @throws IOException
+        */
+       public static Path archiveJob(Path rootPath, JobID jobId, 
Collection<ArchivedJson> jsonToArchive) throws IOException {
+               try {
+                       FileSystem fs = rootPath.getFileSystem();
+                       Path path = new Path(rootPath, jobId.toString());
+                       OutputStream out = fs.create(path, 
FileSystem.WriteMode.NO_OVERWRITE);
+
+                       try (JsonGenerator gen = 
jacksonFactory.createGenerator(out, JsonEncoding.UTF8)) {
+                               gen.writeStartObject();
+                               gen.writeArrayFieldStart(ARCHIVE);
+                               for (ArchivedJson archive : jsonToArchive) {
+                                       gen.writeStartObject();
+                                       gen.writeStringField(PATH, 
archive.getPath());
+                                       gen.writeStringField(JSON, 
archive.getJson());
+                                       gen.writeEndObject();
+                               }
+                               gen.writeEndArray();
+                               gen.writeEndObject();
+                       } catch (Exception e) {
+                               fs.delete(path, false);
+                               throw e;
+                       }
+                       LOG.info("Job {} has been archived at {}.", jobId, 
path);
+                       return path;
+               } catch (IOException e) {
+                       LOG.error("Failed to archive job.", e);
+                       throw e;
+               }
+       }
+
+       /**
         * Reads the given archive file and returns a {@link Collection} of 
contained {@link ArchivedJson}.
         *
         * @param file archive to extract

http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index a86aa2e..09f8bf4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -370,7 +370,8 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
                                        new MemoryArchivedExecutionGraphStore(),
                                        
Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
                                        new ShutDownFatalErrorHandler(),
-                                       
dispatcherRestEndpoint.getRestBaseUrl());
+                                       dispatcherRestEndpoint.getRestBaseUrl(),
+                                       dispatcherRestEndpoint);
 
                                dispatcher.start();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 5cb57d3..f9b2fa8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.TransientBlobService;
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
@@ -113,6 +114,8 @@ import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerLogFileHead
 import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerStdoutFileHeaders;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.ExceptionUtils;
@@ -126,6 +129,7 @@ import javax.annotation.Nonnull;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
 import java.util.UUID;
@@ -137,7 +141,7 @@ import java.util.concurrent.Executor;
  *
  * @param <T> type of the leader gateway
  */
-public class WebMonitorEndpoint<T extends RestfulGateway> extends 
RestServerEndpoint implements LeaderContender {
+public class WebMonitorEndpoint<T extends RestfulGateway> extends 
RestServerEndpoint implements LeaderContender, JsonArchivist {
 
        protected final GatewayRetriever<? extends T> leaderRetriever;
        protected final Configuration clusterConfiguration;
@@ -157,6 +161,8 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
 
        private boolean hasWebUI = false;
 
+       private final Collection<JsonArchivist> archivingHandlers = new 
ArrayList<>(16);
+
        public WebMonitorEndpoint(
                        RestServerEndpointConfiguration endpointConfiguration,
                        GatewayRetriever<? extends T> leaderRetriever,
@@ -667,6 +673,11 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
                handlers.add(Tuple2.of(TaskManagerLogFileHeaders.getInstance(), 
taskManagerLogFileHandler));
                
handlers.add(Tuple2.of(TaskManagerStdoutFileHeaders.getInstance(), 
taskManagerStdoutFileHandler));
 
+               handlers.stream()
+                       .map(tuple -> tuple.f1)
+                       .filter(handler -> handler instanceof JsonArchivist)
+                       .forEachOrdered(handler -> 
archivingHandlers.add((JsonArchivist) handler));
+
                return handlers;
        }
 
@@ -756,4 +767,13 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
                fatalErrorHandler.onFatalError(exception);
        }
 
+       @Override
+       public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+               Collection<ArchivedJson> archivedJson = new 
ArrayList<>(archivingHandlers.size());
+               for (JsonArchivist archivist : archivingHandlers) {
+                       Collection<ArchivedJson> subArchive = 
archivist.archiveJsonWithPath(graph);
+                       archivedJson.addAll(subArchive);
+               }
+               return archivedJson;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 18a8ec1..d9482e7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -567,6 +567,7 @@ public class DispatcherTest extends TestLogger {
                                archivedExecutionGraphStore,
                                jobManagerRunnerFactory,
                                fatalErrorHandler,
+                               null,
                                null);
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
index 6dfb243..157fca7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
@@ -258,6 +258,7 @@ public class MiniDispatcherTest extends TestLogger {
                        testingJobManagerRunnerFactory,
                        testingFatalErrorHandler,
                        null,
+                       null,
                        jobGraph,
                        executionMode);
        }

Reply via email to