[FLINK-9194] Introduce HistoryServerArchivist interface

The HistoryServerArchivist interface encapsulates the archiving logic of an
AccessExecutionGraph to the history server. Currently this means to generate
the JSON responses for all possible HTTP requests and writing them to a
target directory.

This closes #5902.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d734032e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d734032e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d734032e

Branch: refs/heads/master
Commit: d734032ee3f0de1c786f61aa52e7a054e7fca034
Parents: cd37feb
Author: Till Rohrmann <[email protected]>
Authored: Mon May 14 19:58:20 2018 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Tue May 15 00:22:46 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/dispatcher/Dispatcher.java    | 65 +++++++-------------
 .../dispatcher/HistoryServerArchivist.java      | 55 +++++++++++++++++
 .../JsonResponseHistoryServerArchivist.java     | 56 +++++++++++++++++
 .../runtime/dispatcher/MiniDispatcher.java      |  5 +-
 .../dispatcher/StandaloneDispatcher.java        |  5 +-
 .../dispatcher/VoidHistoryServerArchivist.java  | 36 +++++++++++
 .../runtime/entrypoint/ClusterEntrypoint.java   |  8 ++-
 .../entrypoint/JobClusterEntrypoint.java        |  6 +-
 .../entrypoint/SessionClusterEntrypoint.java    |  6 +-
 .../flink/runtime/minicluster/MiniCluster.java  |  5 +-
 .../runtime/dispatcher/DispatcherTest.java      |  2 +-
 .../runtime/dispatcher/MiniDispatcherTest.java  |  2 +-
 12 files changed, 191 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d734032e/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 82b9291..4d30870 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -23,8 +23,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.checkpoint.Checkpoints;
 import org.apache.flink.runtime.client.JobSubmissionException;
@@ -34,7 +32,6 @@ import 
org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
-import org.apache.flink.runtime.history.FsJobArchivist;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -64,8 +61,6 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
-import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
@@ -123,18 +118,14 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
 
        private final JobManagerMetricGroup jobManagerMetricGroup;
 
+       private final HistoryServerArchivist historyServerArchivist;
+
        @Nullable
        private final String metricQueryServicePath;
 
        @Nullable
        protected final String restAddress;
 
-       @Nullable
-       private final JsonArchivist jsonArchivist;
-
-       @Nullable
-       private final Path archivePath;
-
        private CompletableFuture<Void> 
orphanedJobManagerRunnersTerminationFuture = 
CompletableFuture.completedFuture(null);
 
        public Dispatcher(
@@ -152,7 +143,7 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                        JobManagerRunnerFactory jobManagerRunnerFactory,
                        FatalErrorHandler fatalErrorHandler,
                        @Nullable String restAddress,
-                       @Nullable JsonArchivist jsonArchivist) throws Exception 
{
+                       HistoryServerArchivist historyServerArchivist) throws 
Exception {
                super(rpcService, endpointId);
 
                this.configuration = Preconditions.checkNotNull(configuration);
@@ -177,21 +168,7 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
 
                this.restAddress = restAddress;
 
-               this.jsonArchivist = jsonArchivist;
-
-               String configuredArchivePath = 
configuration.getString(JobManagerOptions.ARCHIVE_DIR);
-               if (configuredArchivePath != null) {
-                       Path tmpArchivePath = null;
-                       try {
-                               tmpArchivePath = 
WebMonitorUtils.validateAndNormalizeUri(new 
Path(configuredArchivePath).toUri());
-                       } catch (Exception e) {
-                               log.warn("Failed to validate specified archive 
directory in '{}'. " +
-                                       "Jobs will not be archived for the 
HistoryServer.", configuredArchivePath, e);
-                       }
-                       archivePath = tmpArchivePath;
-               } else {
-                       archivePath = null;
-               }
+               this.historyServerArchivist = 
Preconditions.checkNotNull(historyServerArchivist);
 
                this.archivedExecutionGraphStore = 
Preconditions.checkNotNull(archivedExecutionGraphStore);
 
@@ -639,6 +616,14 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
 
                log.info("Job {} reached globally terminal state {}.", 
archivedExecutionGraph.getJobID(), archivedExecutionGraph.getState());
 
+               archiveExecutionGraph(archivedExecutionGraph);
+
+               final JobID jobId = archivedExecutionGraph.getJobID();
+
+               removeJob(jobId, true);
+       }
+
+       private void archiveExecutionGraph(ArchivedExecutionGraph 
archivedExecutionGraph) {
                try {
                        archivedExecutionGraphStore.put(archivedExecutionGraph);
                } catch (IOException e) {
@@ -649,22 +634,18 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                                e);
                }
 
-               try {
-                       if (jsonArchivist != null && archivePath != null) {
-                               FsJobArchivist.archiveJob(archivePath, 
archivedExecutionGraph.getJobID(), 
jsonArchivist.archiveJsonWithPath(archivedExecutionGraph));
-                               log.info("Archived job {} to {}", 
archivedExecutionGraph.getJobID(), archivePath);
-                       }
-               } catch (IOException e) {
-                       log.info(
-                               "Could not archive completed job {}({}).",
-                               archivedExecutionGraph.getJobName(),
-                               archivedExecutionGraph.getJobID(),
-                               e);
-               }
-
-               final JobID jobId = archivedExecutionGraph.getJobID();
+               final CompletableFuture<Acknowledge> executionGraphFuture = 
historyServerArchivist.archiveExecutionGraph(archivedExecutionGraph);
 
-               removeJob(jobId, true);
+               executionGraphFuture.whenComplete(
+                       (Acknowledge ignored, Throwable throwable) -> {
+                               if (throwable != null) {
+                                       log.info(
+                                               "Could not archive completed 
job {}({}) to the history server.",
+                                               
archivedExecutionGraph.getJobName(),
+                                               
archivedExecutionGraph.getJobID(),
+                                               throwable);
+                               }
+                       });
        }
 
        protected void jobNotFinished(JobID jobId) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d734032e/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/HistoryServerArchivist.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/HistoryServerArchivist.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/HistoryServerArchivist.java
new file mode 100644
index 0000000..0030159
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/HistoryServerArchivist.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Writer for an {@link AccessExecutionGraph}.
+ */
+public interface HistoryServerArchivist {
+
+       /**
+        * Archives the given {@link AccessExecutionGraph} on the history 
server.
+        *
+        * @param executionGraph to store on the history server
+        * @return Future which is completed once the archiving has been 
completed.
+        */
+       CompletableFuture<Acknowledge> 
archiveExecutionGraph(AccessExecutionGraph executionGraph);
+
+       static HistoryServerArchivist 
createHistoryServerArchivist(Configuration configuration, JsonArchivist 
jsonArchivist) {
+               final String configuredArchivePath = 
configuration.getString(JobManagerOptions.ARCHIVE_DIR);
+
+               if (configuredArchivePath != null) {
+                       final Path archivePath = 
WebMonitorUtils.validateAndNormalizeUri(new 
Path(configuredArchivePath).toUri());
+
+                       return new 
JsonResponseHistoryServerArchivist(jsonArchivist, archivePath);
+               } else {
+                       return VoidHistoryServerArchivist.INSTANCE;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d734032e/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JsonResponseHistoryServerArchivist.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JsonResponseHistoryServerArchivist.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JsonResponseHistoryServerArchivist.java
new file mode 100644
index 0000000..be58399
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JsonResponseHistoryServerArchivist.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Implementation which archives an {@link AccessExecutionGraph} such that it 
stores
+ * the JSON requests for all possible history server requests.
+ */
+class JsonResponseHistoryServerArchivist implements HistoryServerArchivist {
+
+       private final JsonArchivist jsonArchivist;
+
+       private final Path archivePath;
+
+       JsonResponseHistoryServerArchivist(JsonArchivist jsonArchivist, Path 
archivePath) {
+               this.jsonArchivist = Preconditions.checkNotNull(jsonArchivist);
+               this.archivePath = Preconditions.checkNotNull(archivePath);
+       }
+
+       @Override
+       public CompletableFuture<Acknowledge> 
archiveExecutionGraph(AccessExecutionGraph executionGraph) {
+               try {
+                       FsJobArchivist.archiveJob(archivePath, 
executionGraph.getJobID(), jsonArchivist.archiveJsonWithPath(executionGraph));
+                       return 
CompletableFuture.completedFuture(Acknowledge.get());
+               } catch (IOException e) {
+                       return FutureUtils.completedExceptionally(e);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d734032e/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
index 38e74fb..8b497b7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
@@ -35,7 +35,6 @@ import 
org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.util.FlinkException;
 
 import javax.annotation.Nullable;
@@ -72,7 +71,7 @@ public class MiniDispatcher extends Dispatcher {
                        JobManagerRunnerFactory jobManagerRunnerFactory,
                        FatalErrorHandler fatalErrorHandler,
                        @Nullable String restAddress,
-                       @Nullable JsonArchivist jsonArchivist,
+                       HistoryServerArchivist historyServerArchivist,
                        JobGraph jobGraph,
                        JobClusterEntrypoint.ExecutionMode executionMode) 
throws Exception {
                super(
@@ -90,7 +89,7 @@ public class MiniDispatcher extends Dispatcher {
                        jobManagerRunnerFactory,
                        fatalErrorHandler,
                        restAddress,
-                       jsonArchivist);
+                       historyServerArchivist);
 
                this.executionMode = checkNotNull(executionMode);
                this.jobTerminationFuture = new CompletableFuture<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/d734032e/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
index 5c6a7ab..c39615c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
@@ -28,7 +28,6 @@ import 
org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 
 import javax.annotation.Nullable;
 
@@ -52,7 +51,7 @@ public class StandaloneDispatcher extends Dispatcher {
                        JobManagerRunnerFactory jobManagerRunnerFactory,
                        FatalErrorHandler fatalErrorHandler,
                        @Nullable String restAddress,
-                       @Nullable JsonArchivist jsonArchivist) throws Exception 
{
+                       HistoryServerArchivist historyServerArchivist) throws 
Exception {
                super(
                        rpcService,
                        endpointId,
@@ -68,6 +67,6 @@ public class StandaloneDispatcher extends Dispatcher {
                        jobManagerRunnerFactory,
                        fatalErrorHandler,
                        restAddress,
-                       jsonArchivist);
+                       historyServerArchivist);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d734032e/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/VoidHistoryServerArchivist.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/VoidHistoryServerArchivist.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/VoidHistoryServerArchivist.java
new file mode 100644
index 0000000..2d34d83
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/VoidHistoryServerArchivist.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.messages.Acknowledge;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * No-op implementation of the {@link HistoryServerArchivist}.
+ */
+public enum VoidHistoryServerArchivist implements HistoryServerArchivist {
+       INSTANCE {
+               @Override
+               public CompletableFuture<Acknowledge> 
archiveExecutionGraph(AccessExecutionGraph executionGraph) {
+                       return 
CompletableFuture.completedFuture(Acknowledge.get());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d734032e/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 933add8..a267abb 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -42,6 +42,7 @@ import 
org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.dispatcher.DispatcherId;
+import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
 import org.apache.flink.runtime.dispatcher.MiniDispatcher;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -64,7 +65,6 @@ import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
 import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import 
org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
@@ -347,6 +347,8 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
 
                        jobManagerMetricGroup = 
MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, 
rpcService.getAddress());
 
+                       final HistoryServerArchivist historyServerArchivist = 
HistoryServerArchivist.createHistoryServerArchivist(configuration, 
webMonitorEndpoint);
+
                        dispatcher = createDispatcher(
                                configuration,
                                rpcService,
@@ -359,7 +361,7 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
                                archivedExecutionGraphStore,
                                this,
                                webMonitorEndpoint.getRestBaseUrl(),
-                               webMonitorEndpoint);
+                               historyServerArchivist);
 
                        LOG.debug("Starting ResourceManager.");
                        resourceManager.start();
@@ -660,7 +662,7 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
                ArchivedExecutionGraphStore archivedExecutionGraphStore,
                FatalErrorHandler fatalErrorHandler,
                @Nullable String restAddress,
-               @Nullable JsonArchivist jsonArchivist) throws Exception;
+               HistoryServerArchivist historyServerArchivist) throws Exception;
 
        protected abstract ResourceManager<?> createResourceManager(
                Configuration configuration,

http://git-wip-us.apache.org/repos/asf/flink/blob/d734032e/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index 2a5b8ea..80a9da2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
 import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
 import org.apache.flink.runtime.dispatcher.MiniDispatcher;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -39,7 +40,6 @@ import 
org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
 import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.FlinkException;
@@ -102,7 +102,7 @@ public abstract class JobClusterEntrypoint extends 
ClusterEntrypoint {
                        ArchivedExecutionGraphStore archivedExecutionGraphStore,
                        FatalErrorHandler fatalErrorHandler,
                        @Nullable String restAddress,
-                       @Nullable JsonArchivist jsonArchivist) throws Exception 
{
+                       HistoryServerArchivist historyServerArchivist) throws 
Exception {
 
                final JobGraph jobGraph = retrieveJobGraph(configuration);
 
@@ -124,7 +124,7 @@ public abstract class JobClusterEntrypoint extends 
ClusterEntrypoint {
                        Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
                        fatalErrorHandler,
                        restAddress,
-                       jsonArchivist,
+                       historyServerArchivist,
                        jobGraph,
                        executionMode);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d734032e/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index 85446eb..40eb8b7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.dispatcher.Dispatcher;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
 import org.apache.flink.runtime.dispatcher.FileArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
 import org.apache.flink.runtime.dispatcher.StandaloneDispatcher;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -40,7 +41,6 @@ import 
org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
 import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 
@@ -116,7 +116,7 @@ public abstract class SessionClusterEntrypoint extends 
ClusterEntrypoint {
                        ArchivedExecutionGraphStore archivedExecutionGraphStore,
                        FatalErrorHandler fatalErrorHandler,
                        @Nullable String restAddress,
-                       @Nullable JsonArchivist jsonArchivist) throws Exception 
{
+                       HistoryServerArchivist historyServerArchivist) throws 
Exception {
 
                // create the default dispatcher
                return new StandaloneDispatcher(
@@ -133,6 +133,6 @@ public abstract class SessionClusterEntrypoint extends 
ClusterEntrypoint {
                        Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
                        fatalErrorHandler,
                        restAddress,
-                       jsonArchivist);
+                       historyServerArchivist);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d734032e/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 09f8bf4..aca4fdb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.dispatcher.Dispatcher;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.dispatcher.DispatcherId;
 import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
+import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
 import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
 import org.apache.flink.runtime.dispatcher.StandaloneDispatcher;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
@@ -357,6 +358,8 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
 
                                this.jobManagerMetricGroup = 
MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, "localhost");
 
+                               final HistoryServerArchivist 
historyServerArchivist = 
HistoryServerArchivist.createHistoryServerArchivist(configuration, 
dispatcherRestEndpoint);
+
                                dispatcher = new StandaloneDispatcher(
                                        jobManagerRpcService,
                                        Dispatcher.DISPATCHER_NAME + 
UUID.randomUUID(),
@@ -371,7 +374,7 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
                                        
Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
                                        new ShutDownFatalErrorHandler(),
                                        dispatcherRestEndpoint.getRestBaseUrl(),
-                                       dispatcherRestEndpoint);
+                                       historyServerArchivist);
 
                                dispatcher.start();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d734032e/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index d9482e7..eff63e7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -568,7 +568,7 @@ public class DispatcherTest extends TestLogger {
                                jobManagerRunnerFactory,
                                fatalErrorHandler,
                                null,
-                               null);
+                               VoidHistoryServerArchivist.INSTANCE);
                }
 
                @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/flink/blob/d734032e/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
index 157fca7..914531d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
@@ -258,7 +258,7 @@ public class MiniDispatcherTest extends TestLogger {
                        testingJobManagerRunnerFactory,
                        testingFatalErrorHandler,
                        null,
-                       null,
+                       VoidHistoryServerArchivist.INSTANCE,
                        jobGraph,
                        executionMode);
        }

Reply via email to