[FLINK-9194] Introduce HistoryServerArchivist interface The HistoryServerArchivist interface encapsulates the archiving logic of an AccessExecutionGraph to the history server. Currently this means generating the JSON responses for all possible HTTP requests and writing them to a target directory.
This closes #5902. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d734032e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d734032e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d734032e Branch: refs/heads/master Commit: d734032ee3f0de1c786f61aa52e7a054e7fca034 Parents: cd37feb Author: Till Rohrmann <[email protected]> Authored: Mon May 14 19:58:20 2018 +0200 Committer: Till Rohrmann <[email protected]> Committed: Tue May 15 00:22:46 2018 +0200 ---------------------------------------------------------------------- .../flink/runtime/dispatcher/Dispatcher.java | 65 +++++++------------- .../dispatcher/HistoryServerArchivist.java | 55 +++++++++++++++++ .../JsonResponseHistoryServerArchivist.java | 56 +++++++++++++++++ .../runtime/dispatcher/MiniDispatcher.java | 5 +- .../dispatcher/StandaloneDispatcher.java | 5 +- .../dispatcher/VoidHistoryServerArchivist.java | 36 +++++++++++ .../runtime/entrypoint/ClusterEntrypoint.java | 8 ++- .../entrypoint/JobClusterEntrypoint.java | 6 +- .../entrypoint/SessionClusterEntrypoint.java | 6 +- .../flink/runtime/minicluster/MiniCluster.java | 5 +- .../runtime/dispatcher/DispatcherTest.java | 2 +- .../runtime/dispatcher/MiniDispatcherTest.java | 2 +- 12 files changed, 191 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d734032e/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index 82b9291..4d30870 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
@@ -23,8 +23,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.JobManagerOptions; -import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.checkpoint.Checkpoints; import org.apache.flink.runtime.client.JobSubmissionException; @@ -34,7 +32,6 @@ import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.RunningJobsRegistry; -import org.apache.flink.runtime.history.FsJobArchivist; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -64,8 +61,6 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.FencedRpcEndpoint; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcUtils; -import org.apache.flink.runtime.webmonitor.WebMonitorUtils; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; @@ -123,18 +118,14 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme private final JobManagerMetricGroup jobManagerMetricGroup; + private final HistoryServerArchivist historyServerArchivist; + @Nullable private final String metricQueryServicePath; @Nullable protected final String restAddress; - @Nullable - private final JsonArchivist jsonArchivist; - - @Nullable - private final Path archivePath; - private CompletableFuture<Void> orphanedJobManagerRunnersTerminationFuture = 
CompletableFuture.completedFuture(null); public Dispatcher( @@ -152,7 +143,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme JobManagerRunnerFactory jobManagerRunnerFactory, FatalErrorHandler fatalErrorHandler, @Nullable String restAddress, - @Nullable JsonArchivist jsonArchivist) throws Exception { + HistoryServerArchivist historyServerArchivist) throws Exception { super(rpcService, endpointId); this.configuration = Preconditions.checkNotNull(configuration); @@ -177,21 +168,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme this.restAddress = restAddress; - this.jsonArchivist = jsonArchivist; - - String configuredArchivePath = configuration.getString(JobManagerOptions.ARCHIVE_DIR); - if (configuredArchivePath != null) { - Path tmpArchivePath = null; - try { - tmpArchivePath = WebMonitorUtils.validateAndNormalizeUri(new Path(configuredArchivePath).toUri()); - } catch (Exception e) { - log.warn("Failed to validate specified archive directory in '{}'. 
" + - "Jobs will not be archived for the HistoryServer.", configuredArchivePath, e); - } - archivePath = tmpArchivePath; - } else { - archivePath = null; - } + this.historyServerArchivist = Preconditions.checkNotNull(historyServerArchivist); this.archivedExecutionGraphStore = Preconditions.checkNotNull(archivedExecutionGraphStore); @@ -639,6 +616,14 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme log.info("Job {} reached globally terminal state {}.", archivedExecutionGraph.getJobID(), archivedExecutionGraph.getState()); + archiveExecutionGraph(archivedExecutionGraph); + + final JobID jobId = archivedExecutionGraph.getJobID(); + + removeJob(jobId, true); + } + + private void archiveExecutionGraph(ArchivedExecutionGraph archivedExecutionGraph) { try { archivedExecutionGraphStore.put(archivedExecutionGraph); } catch (IOException e) { @@ -649,22 +634,18 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme e); } - try { - if (jsonArchivist != null && archivePath != null) { - FsJobArchivist.archiveJob(archivePath, archivedExecutionGraph.getJobID(), jsonArchivist.archiveJsonWithPath(archivedExecutionGraph)); - log.info("Archived job {} to {}", archivedExecutionGraph.getJobID(), archivePath); - } - } catch (IOException e) { - log.info( - "Could not archive completed job {}({}).", - archivedExecutionGraph.getJobName(), - archivedExecutionGraph.getJobID(), - e); - } - - final JobID jobId = archivedExecutionGraph.getJobID(); + final CompletableFuture<Acknowledge> executionGraphFuture = historyServerArchivist.archiveExecutionGraph(archivedExecutionGraph); - removeJob(jobId, true); + executionGraphFuture.whenComplete( + (Acknowledge ignored, Throwable throwable) -> { + if (throwable != null) { + log.info( + "Could not archive completed job {}({}) to the history server.", + archivedExecutionGraph.getJobName(), + archivedExecutionGraph.getJobID(), + throwable); + } + }); } protected void jobNotFinished(JobID 
jobId) { http://git-wip-us.apache.org/repos/asf/flink/blob/d734032e/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/HistoryServerArchivist.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/HistoryServerArchivist.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/HistoryServerArchivist.java new file mode 100644 index 0000000..0030159 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/HistoryServerArchivist.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; + +import java.util.concurrent.CompletableFuture; + +/** + * Writer for an {@link AccessExecutionGraph}. 
+ */ +public interface HistoryServerArchivist { + + /** + * Archives the given {@link AccessExecutionGraph} on the history server. + * + * @param executionGraph to store on the history server + * @return Future which is completed once the archiving has been completed. + */ + CompletableFuture<Acknowledge> archiveExecutionGraph(AccessExecutionGraph executionGraph); + + static HistoryServerArchivist createHistoryServerArchivist(Configuration configuration, JsonArchivist jsonArchivist) { + final String configuredArchivePath = configuration.getString(JobManagerOptions.ARCHIVE_DIR); + + if (configuredArchivePath != null) { + final Path archivePath = WebMonitorUtils.validateAndNormalizeUri(new Path(configuredArchivePath).toUri()); + + return new JsonResponseHistoryServerArchivist(jsonArchivist, archivePath); + } else { + return VoidHistoryServerArchivist.INSTANCE; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d734032e/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JsonResponseHistoryServerArchivist.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JsonResponseHistoryServerArchivist.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JsonResponseHistoryServerArchivist.java new file mode 100644 index 0000000..be58399 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JsonResponseHistoryServerArchivist.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.history.FsJobArchivist; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +/** + * Implementation which archives an {@link AccessExecutionGraph} such that it stores + * the JSON requests for all possible history server requests. 
+ */ +class JsonResponseHistoryServerArchivist implements HistoryServerArchivist { + + private final JsonArchivist jsonArchivist; + + private final Path archivePath; + + JsonResponseHistoryServerArchivist(JsonArchivist jsonArchivist, Path archivePath) { + this.jsonArchivist = Preconditions.checkNotNull(jsonArchivist); + this.archivePath = Preconditions.checkNotNull(archivePath); + } + + @Override + public CompletableFuture<Acknowledge> archiveExecutionGraph(AccessExecutionGraph executionGraph) { + try { + FsJobArchivist.archiveJob(archivePath, executionGraph.getJobID(), jsonArchivist.archiveJsonWithPath(executionGraph)); + return CompletableFuture.completedFuture(Acknowledge.get()); + } catch (IOException e) { + return FutureUtils.completedExceptionally(e); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d734032e/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java index 38e74fb..8b497b7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java @@ -35,7 +35,6 @@ import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.util.FlinkException; import javax.annotation.Nullable; @@ -72,7 +71,7 @@ public class MiniDispatcher extends Dispatcher { JobManagerRunnerFactory jobManagerRunnerFactory, FatalErrorHandler fatalErrorHandler, @Nullable String restAddress, - @Nullable 
JsonArchivist jsonArchivist, + HistoryServerArchivist historyServerArchivist, JobGraph jobGraph, JobClusterEntrypoint.ExecutionMode executionMode) throws Exception { super( @@ -90,7 +89,7 @@ public class MiniDispatcher extends Dispatcher { jobManagerRunnerFactory, fatalErrorHandler, restAddress, - jsonArchivist); + historyServerArchivist); this.executionMode = checkNotNull(executionMode); this.jobTerminationFuture = new CompletableFuture<>(); http://git-wip-us.apache.org/repos/asf/flink/blob/d734032e/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java index 5c6a7ab..c39615c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java @@ -28,7 +28,6 @@ import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import javax.annotation.Nullable; @@ -52,7 +51,7 @@ public class StandaloneDispatcher extends Dispatcher { JobManagerRunnerFactory jobManagerRunnerFactory, FatalErrorHandler fatalErrorHandler, @Nullable String restAddress, - @Nullable JsonArchivist jsonArchivist) throws Exception { + HistoryServerArchivist historyServerArchivist) throws Exception { super( rpcService, endpointId, @@ -68,6 +67,6 @@ public class StandaloneDispatcher extends Dispatcher { jobManagerRunnerFactory, fatalErrorHandler, restAddress, - jsonArchivist); + historyServerArchivist); } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/d734032e/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/VoidHistoryServerArchivist.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/VoidHistoryServerArchivist.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/VoidHistoryServerArchivist.java new file mode 100644 index 0000000..2d34d83 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/VoidHistoryServerArchivist.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.messages.Acknowledge; + +import java.util.concurrent.CompletableFuture; + +/** + * No-op implementation of the {@link HistoryServerArchivist}. 
+ */ +public enum VoidHistoryServerArchivist implements HistoryServerArchivist { + INSTANCE { + @Override + public CompletableFuture<Acknowledge> archiveExecutionGraph(AccessExecutionGraph executionGraph) { + return CompletableFuture.completedFuture(Acknowledge.get()); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d734032e/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java index 933add8..a267abb 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java @@ -42,6 +42,7 @@ import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore; import org.apache.flink.runtime.dispatcher.Dispatcher; import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.dispatcher.DispatcherId; +import org.apache.flink.runtime.dispatcher.HistoryServerArchivist; import org.apache.flink.runtime.dispatcher.MiniDispatcher; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; @@ -64,7 +65,6 @@ import org.apache.flink.runtime.security.SecurityContext; import org.apache.flink.runtime.security.SecurityUtils; import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever; import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever; @@ -347,6 
+347,8 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler { jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, rpcService.getAddress()); + final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint); + dispatcher = createDispatcher( configuration, rpcService, @@ -359,7 +361,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler { archivedExecutionGraphStore, this, webMonitorEndpoint.getRestBaseUrl(), - webMonitorEndpoint); + historyServerArchivist); LOG.debug("Starting ResourceManager."); resourceManager.start(); @@ -660,7 +662,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler { ArchivedExecutionGraphStore archivedExecutionGraphStore, FatalErrorHandler fatalErrorHandler, @Nullable String restAddress, - @Nullable JsonArchivist jsonArchivist) throws Exception; + HistoryServerArchivist historyServerArchivist) throws Exception; protected abstract ResourceManager<?> createResourceManager( Configuration configuration, http://git-wip-us.apache.org/repos/asf/flink/blob/d734032e/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java index 2a5b8ea..80a9da2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore; import org.apache.flink.runtime.dispatcher.Dispatcher; import 
org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.dispatcher.HistoryServerArchivist; import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore; import org.apache.flink.runtime.dispatcher.MiniDispatcher; import org.apache.flink.runtime.heartbeat.HeartbeatServices; @@ -39,7 +40,6 @@ import org.apache.flink.runtime.rest.RestServerEndpointConfiguration; import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever; import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; import org.apache.flink.util.FlinkException; @@ -102,7 +102,7 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint { ArchivedExecutionGraphStore archivedExecutionGraphStore, FatalErrorHandler fatalErrorHandler, @Nullable String restAddress, - @Nullable JsonArchivist jsonArchivist) throws Exception { + HistoryServerArchivist historyServerArchivist) throws Exception { final JobGraph jobGraph = retrieveJobGraph(configuration); @@ -124,7 +124,7 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint { Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE, fatalErrorHandler, restAddress, - jsonArchivist, + historyServerArchivist, jobGraph, executionMode); http://git-wip-us.apache.org/repos/asf/flink/blob/d734032e/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java index 85446eb..40eb8b7 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.dispatcher.Dispatcher; import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint; import org.apache.flink.runtime.dispatcher.FileArchivedExecutionGraphStore; +import org.apache.flink.runtime.dispatcher.HistoryServerArchivist; import org.apache.flink.runtime.dispatcher.StandaloneDispatcher; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; @@ -40,7 +41,6 @@ import org.apache.flink.runtime.rest.RestServerEndpointConfiguration; import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever; import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; @@ -116,7 +116,7 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint { ArchivedExecutionGraphStore archivedExecutionGraphStore, FatalErrorHandler fatalErrorHandler, @Nullable String restAddress, - @Nullable JsonArchivist jsonArchivist) throws Exception { + HistoryServerArchivist historyServerArchivist) throws Exception { // create the default dispatcher return new StandaloneDispatcher( @@ -133,6 +133,6 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint { Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE, fatalErrorHandler, restAddress, - jsonArchivist); + historyServerArchivist); } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/d734032e/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 09f8bf4..aca4fdb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -40,6 +40,7 @@ import org.apache.flink.runtime.dispatcher.Dispatcher; import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.dispatcher.DispatcherId; import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint; +import org.apache.flink.runtime.dispatcher.HistoryServerArchivist; import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore; import org.apache.flink.runtime.dispatcher.StandaloneDispatcher; import org.apache.flink.runtime.entrypoint.ClusterInformation; @@ -357,6 +358,8 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { this.jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, "localhost"); + final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, dispatcherRestEndpoint); + dispatcher = new StandaloneDispatcher( jobManagerRpcService, Dispatcher.DISPATCHER_NAME + UUID.randomUUID(), @@ -371,7 +374,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE, new ShutDownFatalErrorHandler(), dispatcherRestEndpoint.getRestBaseUrl(), - dispatcherRestEndpoint); + historyServerArchivist); dispatcher.start(); 
http://git-wip-us.apache.org/repos/asf/flink/blob/d734032e/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index d9482e7..eff63e7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -568,7 +568,7 @@ public class DispatcherTest extends TestLogger { jobManagerRunnerFactory, fatalErrorHandler, null, - null); + VoidHistoryServerArchivist.INSTANCE); } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/flink/blob/d734032e/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java index 157fca7..914531d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java @@ -258,7 +258,7 @@ public class MiniDispatcherTest extends TestLogger { testingJobManagerRunnerFactory, testingFatalErrorHandler, null, - null, + VoidHistoryServerArchivist.INSTANCE, jobGraph, executionMode); }
