[FLINK-9194][history] Add archiving routine to Dispatcher
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fd374b83 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fd374b83 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fd374b83 Branch: refs/heads/master Commit: fd374b832f830adfa59b7f834b11c38080486f1c Parents: 6b6603f Author: zentol <[email protected]> Authored: Wed Apr 18 14:33:04 2018 +0200 Committer: Till Rohrmann <[email protected]> Committed: Mon May 14 23:40:48 2018 +0200 ---------------------------------------------------------------------- .../flink/runtime/dispatcher/Dispatcher.java | 43 +++++++++++++++++++- .../runtime/dispatcher/MiniDispatcher.java | 5 ++- .../dispatcher/StandaloneDispatcher.java | 7 +++- .../runtime/entrypoint/ClusterEntrypoint.java | 7 +++- .../entrypoint/JobClusterEntrypoint.java | 5 ++- .../entrypoint/SessionClusterEntrypoint.java | 7 +++- .../flink/runtime/history/FsJobArchivist.java | 42 +++++++++++++++++++ .../flink/runtime/minicluster/MiniCluster.java | 3 +- .../runtime/webmonitor/WebMonitorEndpoint.java | 22 +++++++++- .../runtime/dispatcher/DispatcherTest.java | 1 + .../runtime/dispatcher/MiniDispatcherTest.java | 1 + 11 files changed, 132 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index 58ffda3..82b9291 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -23,6 +23,8 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.checkpoint.Checkpoints; import org.apache.flink.runtime.client.JobSubmissionException; @@ -32,6 +34,7 @@ import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.RunningJobsRegistry; +import org.apache.flink.runtime.history.FsJobArchivist; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -61,6 +64,8 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.FencedRpcEndpoint; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; @@ -124,6 +129,12 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme @Nullable protected final String restAddress; + @Nullable + private final JsonArchivist jsonArchivist; + + @Nullable + private final Path archivePath; + private CompletableFuture<Void> orphanedJobManagerRunnersTerminationFuture = CompletableFuture.completedFuture(null); public Dispatcher( @@ -140,7 +151,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme ArchivedExecutionGraphStore archivedExecutionGraphStore, JobManagerRunnerFactory jobManagerRunnerFactory, FatalErrorHandler fatalErrorHandler, - @Nullable String restAddress) throws Exception { + @Nullable String restAddress, + @Nullable JsonArchivist jsonArchivist) throws Exception { super(rpcService, endpointId); this.configuration = Preconditions.checkNotNull(configuration); @@ -165,6 +177,22 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme this.restAddress = restAddress; + this.jsonArchivist = jsonArchivist; + + String configuredArchivePath = configuration.getString(JobManagerOptions.ARCHIVE_DIR); + if (configuredArchivePath != null) { + Path tmpArchivePath = null; + try { + tmpArchivePath = WebMonitorUtils.validateAndNormalizeUri(new Path(configuredArchivePath).toUri()); + } catch (Exception e) { + log.warn("Failed to validate specified archive directory in '{}'. " + + "Jobs will not be archived for the HistoryServer.", configuredArchivePath, e); + } + archivePath = tmpArchivePath; + } else { + archivePath = null; + } + this.archivedExecutionGraphStore = Preconditions.checkNotNull(archivedExecutionGraphStore); this.jobManagerRunnerFactory = Preconditions.checkNotNull(jobManagerRunnerFactory); @@ -621,6 +649,19 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme e); } + try { + if (jsonArchivist != null && archivePath != null) { + FsJobArchivist.archiveJob(archivePath, archivedExecutionGraph.getJobID(), jsonArchivist.archiveJsonWithPath(archivedExecutionGraph)); + log.info("Archived job {} to {}", archivedExecutionGraph.getJobID(), archivePath); + } + } catch (IOException e) { + log.info( + "Could not archive completed job {}({}).", + archivedExecutionGraph.getJobName(), + archivedExecutionGraph.getJobID(), + e); + } + final JobID jobId = archivedExecutionGraph.getJobID(); removeJob(jobId, true); http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java index 4361e08..38e74fb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.util.FlinkException; import javax.annotation.Nullable; @@ -71,6 +72,7 @@ public class MiniDispatcher extends Dispatcher { JobManagerRunnerFactory jobManagerRunnerFactory, FatalErrorHandler fatalErrorHandler, @Nullable String restAddress, + @Nullable JsonArchivist jsonArchivist, JobGraph jobGraph, JobClusterEntrypoint.ExecutionMode executionMode) throws Exception { super( @@ -87,7 +89,8 @@ public class MiniDispatcher extends Dispatcher { archivedExecutionGraphStore, jobManagerRunnerFactory, fatalErrorHandler, - restAddress); + restAddress, + jsonArchivist); this.executionMode = checkNotNull(executionMode); this.jobTerminationFuture = new CompletableFuture<>(); http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java index 52ac7a0..5c6a7ab 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import javax.annotation.Nullable; @@ -50,7 +51,8 @@ public class StandaloneDispatcher extends Dispatcher { ArchivedExecutionGraphStore archivedExecutionGraphStore, JobManagerRunnerFactory jobManagerRunnerFactory, FatalErrorHandler fatalErrorHandler, - @Nullable String restAddress) throws Exception { + @Nullable String restAddress, + @Nullable JsonArchivist jsonArchivist) throws Exception { super( rpcService, endpointId, @@ -65,6 +67,7 @@ public class StandaloneDispatcher extends Dispatcher { archivedExecutionGraphStore, jobManagerRunnerFactory, fatalErrorHandler, - restAddress); + restAddress, + jsonArchivist); } } http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java index f823ea7..933add8 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java @@ -64,6 +64,7 @@ import org.apache.flink.runtime.security.SecurityContext; import org.apache.flink.runtime.security.SecurityUtils; import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever; import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever; @@ -357,7 +358,8 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler { metricRegistry.getMetricQueryServicePath(), archivedExecutionGraphStore, this, - webMonitorEndpoint.getRestBaseUrl()); + webMonitorEndpoint.getRestBaseUrl(), + webMonitorEndpoint); LOG.debug("Starting ResourceManager."); resourceManager.start(); @@ -657,7 +659,8 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler { @Nullable String metricQueryServicePath, ArchivedExecutionGraphStore archivedExecutionGraphStore, FatalErrorHandler fatalErrorHandler, - @Nullable String restAddress) throws Exception; + @Nullable String restAddress, + @Nullable JsonArchivist jsonArchivist) throws Exception; protected abstract ResourceManager<?> createResourceManager( Configuration configuration, http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java index ea7cbe2..2a5b8ea 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java @@ -39,6 +39,7 @@ import org.apache.flink.runtime.rest.RestServerEndpointConfiguration; import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever; import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; import org.apache.flink.util.FlinkException; @@ -100,7 +101,8 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint { @Nullable String metricQueryServicePath, ArchivedExecutionGraphStore archivedExecutionGraphStore, FatalErrorHandler fatalErrorHandler, - @Nullable String restAddress) throws Exception { + @Nullable String restAddress, + @Nullable JsonArchivist jsonArchivist) throws Exception { final JobGraph jobGraph = retrieveJobGraph(configuration); @@ -122,6 +124,7 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint { Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE, fatalErrorHandler, restAddress, + jsonArchivist, jobGraph, executionMode); http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java index fcab796..85446eb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java @@ -40,6 +40,7 @@ import org.apache.flink.runtime.rest.RestServerEndpointConfiguration; import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever; import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; @@ -114,7 +115,8 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint { @Nullable String metricQueryServicePath, ArchivedExecutionGraphStore archivedExecutionGraphStore, FatalErrorHandler fatalErrorHandler, - @Nullable String restAddress) throws Exception { + @Nullable String restAddress, + @Nullable JsonArchivist jsonArchivist) throws Exception { // create the default dispatcher return new StandaloneDispatcher( @@ -130,6 +132,7 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint { archivedExecutionGraphStore, Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE, fatalErrorHandler, - restAddress); + restAddress, + jsonArchivist); } } http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java b/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java index 3a6ea4f..1cfbf96 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.history; +import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FileSystem; @@ -67,7 +68,9 @@ public class FsJobArchivist { * @param graph graph to archive * @return path to where the archive was written, or null if no archive was created * @throws IOException + * @deprecated only kept for legacy reasons */ + @Deprecated public static Path archiveJob(Path rootPath, AccessExecutionGraph graph) throws IOException { try { FileSystem fs = rootPath.getFileSystem(); @@ -100,6 +103,45 @@ public class FsJobArchivist { } /** + * Writes the given {@link AccessExecutionGraph} to the {@link FileSystem} pointed to by + * {@link JobManagerOptions#ARCHIVE_DIR}. + * + * @param rootPath directory to which the archive should be written to + * @param jobId job id + * @param jsonToArchive collection of json-path pairs to that should be archived + * @return path to where the archive was written, or null if no archive was created + * @throws IOException + */ + public static Path archiveJob(Path rootPath, JobID jobId, Collection<ArchivedJson> jsonToArchive) throws IOException { + try { + FileSystem fs = rootPath.getFileSystem(); + Path path = new Path(rootPath, jobId.toString()); + OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE); + + try (JsonGenerator gen = jacksonFactory.createGenerator(out, JsonEncoding.UTF8)) { + gen.writeStartObject(); + gen.writeArrayFieldStart(ARCHIVE); + for (ArchivedJson archive : jsonToArchive) { + gen.writeStartObject(); + gen.writeStringField(PATH, archive.getPath()); + gen.writeStringField(JSON, archive.getJson()); + gen.writeEndObject(); + } + gen.writeEndArray(); + gen.writeEndObject(); + } catch (Exception e) { + fs.delete(path, false); + throw e; + } + LOG.info("Job {} has been archived at {}.", jobId, path); + return path; + } catch (IOException e) { + LOG.error("Failed to archive job.", e); + throw e; + } + } + + /** * Reads the given archive file and returns a {@link Collection} of contained {@link ArchivedJson}. * * @param file archive to extract http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index a86aa2e..09f8bf4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -370,7 +370,8 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { new MemoryArchivedExecutionGraphStore(), Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE, new ShutDownFatalErrorHandler(), - dispatcherRestEndpoint.getRestBaseUrl()); + dispatcherRestEndpoint.getRestBaseUrl(), + dispatcherRestEndpoint); dispatcher.start(); http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index 5cb57d3..f9b2fa8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.TransientBlobService; import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.leaderelection.LeaderContender; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; @@ -113,6 +114,8 @@ import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerLogFileHead import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerStdoutFileHeaders; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders; import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; import org.apache.flink.util.ExceptionUtils; @@ -126,6 +129,7 @@ import javax.annotation.Nonnull; import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Optional; import java.util.UUID; @@ -137,7 +141,7 @@ import java.util.concurrent.Executor; * * @param <T> type of the leader gateway */ -public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndpoint implements LeaderContender { +public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndpoint implements LeaderContender, JsonArchivist { protected final GatewayRetriever<? extends T> leaderRetriever; protected final Configuration clusterConfiguration; @@ -157,6 +161,8 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp private boolean hasWebUI = false; + private final Collection<JsonArchivist> archivingHandlers = new ArrayList<>(16); + public WebMonitorEndpoint( RestServerEndpointConfiguration endpointConfiguration, GatewayRetriever<? extends T> leaderRetriever, @@ -667,6 +673,11 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp handlers.add(Tuple2.of(TaskManagerLogFileHeaders.getInstance(), taskManagerLogFileHandler)); handlers.add(Tuple2.of(TaskManagerStdoutFileHeaders.getInstance(), taskManagerStdoutFileHandler)); + handlers.stream() + .map(tuple -> tuple.f1) + .filter(handler -> handler instanceof JsonArchivist) + .forEachOrdered(handler -> archivingHandlers.add((JsonArchivist) handler)); + return handlers; } @@ -756,4 +767,13 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp fatalErrorHandler.onFatalError(exception); } + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + Collection<ArchivedJson> archivedJson = new ArrayList<>(archivingHandlers.size()); + for (JsonArchivist archivist : archivingHandlers) { + Collection<ArchivedJson> subArchive = archivist.archiveJsonWithPath(graph); + archivedJson.addAll(subArchive); + } + return archivedJson; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index 18a8ec1..d9482e7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -567,6 +567,7 @@ public class DispatcherTest extends TestLogger { archivedExecutionGraphStore, jobManagerRunnerFactory, fatalErrorHandler, + null, null); } http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java index 6dfb243..157fca7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java @@ -258,6 +258,7 @@ public class MiniDispatcherTest extends TestLogger { testingJobManagerRunnerFactory, testingFatalErrorHandler, null, + null, jobGraph, executionMode); }
