http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-runtime-web/web-dashboard/gulpfile.js ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/gulpfile.js b/flink-runtime-web/web-dashboard/gulpfile.js index 9844116..0c868cd 100644 --- a/flink-runtime-web/web-dashboard/gulpfile.js +++ b/flink-runtime-web/web-dashboard/gulpfile.js @@ -111,7 +111,7 @@ gulp.task('vendor-scripts', function() { }); gulp.task('scripts', function() { - stream = gulp.src([ paths.src + 'scripts/config.js', paths.src + 'scripts/**/*.coffee'] ) + stream = gulp.src([ paths.src + 'scripts/config.js', paths.src + 'scripts/**/*.coffee', '!' + paths.src + 'scripts/index_hs.coffee'] ) .pipe(plumber()) .pipe(sourcemaps.init()) .pipe(coffee({ bare: true })) @@ -126,6 +126,22 @@ gulp.task('scripts', function() { stream.pipe(gulp.dest(paths.dest + 'js/')) }); +gulp.task('scripts_hs', function() { + stream = gulp.src([ paths.src + 'scripts/config.js', paths.src + 'scripts/**/*.coffee', '!' + paths.src + 'scripts/index.coffee'] ) + .pipe(plumber()) + .pipe(sourcemaps.init()) + .pipe(coffee({ bare: true })) + .pipe(ngAnnotate()) + .pipe(concat('index.js')) + .pipe(sourcemaps.write()); + + if (environment == 'production') { + stream.pipe(uglify()) + } + + stream.pipe(gulp.dest(paths.dest + 'js/hs')) +}); + gulp.task('html', function() { gulp.src(paths.src + 'index.jade') .pipe(plumber()) @@ -135,6 +151,15 @@ gulp.task('html', function() { .pipe(gulp.dest(paths.dest)) }); +gulp.task('html_hs', function() { + gulp.src(paths.src + 'index_hs.jade') + .pipe(plumber()) + .pipe(jade({ + pretty: true + })) + .pipe(gulp.dest(paths.dest)) +}); + gulp.task('partials', function() { gulp.src(paths.src + 'partials/**/*.jade') .pipe(plumber()) @@ -179,7 +204,7 @@ gulp.task('serve', serve({ })); gulp.task('vendor', ['vendor-styles', 'vendor-scripts']); -gulp.task('compile', ['html', 'partials','styles', 'scripts']); +gulp.task('compile', ['html', 'html_hs', 'partials','styles', 'scripts', 'scripts_hs']); gulp.task('default', ['fonts', 'assets', 'vendor', 'compile']); gulp.task('dev', ['set-development', 'default']);
http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java b/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java new file mode 100644 index 0000000..02ca3c4 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.history; + +import com.fasterxml.jackson.core.JsonEncoding; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.util.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collection; + +/** + * Utility class for writing an archive file to a {@link FileSystem} and reading it back. + */ +public class FsJobArchivist { + + private static final Logger LOG = LoggerFactory.getLogger(FsJobArchivist.class); + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private static final String ARCHIVE = "archive"; + private static final String PATH = "path"; + private static final String JSON = "json"; + + private FsJobArchivist() { + } + + /** + * Writes the given {@link AccessExecutionGraph} to the {@link FileSystem} pointed to by + * {@link JobManagerOptions#ARCHIVE_DIR}. + * + * @param rootPath directory to which the archive should be written to + * @param graph graph to archive + * @return path to where the archive was written, or null if no archive was created + * @throws IOException + */ + public static Path archiveJob(Path rootPath, AccessExecutionGraph graph) throws IOException { + try { + FileSystem fs = rootPath.getFileSystem(); + Path path = new Path(rootPath, graph.getJobID().toString()); + OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE); + + try (JsonGenerator gen = jacksonFactory.createGenerator(out, JsonEncoding.UTF8)) { + gen.writeStartObject(); + gen.writeArrayFieldStart(ARCHIVE); + for (JsonArchivist archiver : WebMonitorUtils.getJsonArchivists()) { + for (ArchivedJson archive : archiver.archiveJsonWithPath(graph)) { + gen.writeStartObject(); + gen.writeStringField(PATH, archive.getPath()); + gen.writeStringField(JSON, archive.getJson()); + gen.writeEndObject(); + } + } + gen.writeEndArray(); + gen.writeEndObject(); + } catch (Exception e) { + fs.delete(path, false); + throw e; + } + LOG.info("Job {} has been archived at {}.", graph.getJobID(), path); + return path; + } catch (IOException e) { + LOG.error("Failed to archive job.", e); + throw e; + } + } + + /** + * Reads the given archive file and returns a {@link Collection} of contained {@link ArchivedJson}. + * + * @param file archive to extract + * @return collection of archived jsons + * @throws IOException if the file can't be opened, read or doesn't contain valid json + */ + public static Collection<ArchivedJson> getArchivedJsons(Path file) throws IOException { + try (FSDataInputStream input = file.getFileSystem().open(file); + ByteArrayOutputStream output = new ByteArrayOutputStream()) { + IOUtils.copyBytes(input, output); + + JsonNode archive = mapper.readTree(output.toByteArray()); + + Collection<ArchivedJson> archives = new ArrayList<>(); + for (JsonNode archivePart : archive.get(ARCHIVE)) { + String path = archivePart.get(PATH).asText(); + String json = archivePart.get(JSON).asText(); + archives.add(new ArchivedJson(path, json)); + } + return archives; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java index 87df0d1..2baadb5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java @@ -24,8 +24,10 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; +import java.net.URI; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; @@ -33,6 +35,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,6 +43,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -143,6 +147,24 @@ public final class WebMonitorUtils { } } + public static JsonArchivist[] getJsonArchivists() { + try { + String classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor"; + Class<? extends WebMonitor> clazz = Class.forName(classname).asSubclass(WebMonitor.class); + Method method = clazz.getMethod("getJsonArchivists"); + JsonArchivist[] result = (JsonArchivist[]) method.invoke(null); + return result; + } catch (ClassNotFoundException e) { + LOG.error("Could not load web runtime monitor. " + + "Probably reason: flink-runtime-web is not in the classpath"); + LOG.debug("Caught exception", e); + return new JsonArchivist[0]; + } catch (Throwable t) { + LOG.error("Failed to retrieve archivers from web runtime monitor.", t); + return new JsonArchivist[0]; + } + } + public static Map<String, String> fromKeyValueJsonArray(String jsonString) { try { Map<String, String> map = new HashMap<>(); @@ -193,6 +215,32 @@ public final class WebMonitorUtils { } /** + * Checks and normalizes the given URI. This method first checks the validity of the + * URI (scheme and path are not null) and then normalizes the URI to a path. + * + * @param archiveDirUri The URI to check and normalize. + * @return A normalized URI as a Path. + * + * @throws IllegalArgumentException Thrown, if the URI misses scheme or path. + */ + public static Path validateAndNormalizeUri(URI archiveDirUri) { + final String scheme = archiveDirUri.getScheme(); + final String path = archiveDirUri.getPath(); + + // some validity checks + if (scheme == null) { + throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " + + "Please specify the file system scheme explicitly in the URI."); + } + if (path == null) { + throw new IllegalArgumentException("The path to store the job archive data in is null. " + + "Please specify a directory path for the archiving the job data."); + } + + return new Path(archiveDirUri); + } + + /** * Private constructor to prevent instantiation. */ private WebMonitorUtils() { http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java index 22e011c..2e72f1a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java @@ -27,6 +27,7 @@ import org.apache.flink.util.Preconditions; * resembling the REST API. */ public class ArchivedJson { + private final String path; private final String json; @@ -42,4 +43,19 @@ public class ArchivedJson { public String getJson() { return json; } + + @Override + public boolean equals(Object obj) { + if (obj instanceof ArchivedJson) { + ArchivedJson other = (ArchivedJson) obj; + return this.path.equals(other.path) && this.json.equals(other.json); + } else { + return false; + } + } + + @Override + public String toString() { + return path +":" + json; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 73d093b..1e6d8d3 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -30,7 +30,7 @@ import grizzled.slf4j.Logger import org.apache.flink.api.common.JobID import org.apache.flink.api.common.time.Time import org.apache.flink.configuration._ -import org.apache.flink.core.fs.FileSystem +import org.apache.flink.core.fs.{FileSystem, Path} import org.apache.flink.core.io.InputSplitAssigner import org.apache.flink.metrics.groups.UnregisteredMetricsGroup import org.apache.flink.metrics.{Gauge, MetricGroup} @@ -2480,6 +2480,7 @@ object JobManager { RestartStrategyFactory, FiniteDuration, // timeout Int, // number of archived jobs + Option[Path], // archive path LeaderElectionService, SubmittedJobGraphStore, CheckpointRecoveryFactory, @@ -2498,6 +2499,22 @@ object JobManager { val archiveCount = configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_ARCHIVE_COUNT, ConfigConstants.DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT) + val archiveDir = configuration.getString(JobManagerOptions.ARCHIVE_DIR) + + val archivePath = if (archiveDir != null) { + try { + Option.apply( + WebMonitorUtils.validateAndNormalizeUri(new Path(archiveDir).toUri)) + } catch { + case e: Exception => + LOG.warn(s"Failed to validate specified archive directory in '$archiveDir'. " + + "Jobs will not be archived for the HistoryServer.", e) + Option.empty + } + } else { + LOG.debug("No archive directory was configured. Jobs will not be archived.") + Option.empty + } var blobServer: BlobServer = null var instanceManager: InstanceManager = null @@ -2584,6 +2601,7 @@ object JobManager { restartStrategy, timeout, archiveCount, + archivePath, leaderElectionService, submittedJobGraphs, checkpointRecoveryFactory, @@ -2656,6 +2674,7 @@ object JobManager { restartStrategy, timeout, archiveCount, + archivePath, leaderElectionService, submittedJobGraphs, checkpointRecoveryFactory, @@ -2666,7 +2685,7 @@ object JobManager { ioExecutor, None) - val archiveProps = getArchiveProps(archiveClass, archiveCount) + val archiveProps = getArchiveProps(archiveClass, archiveCount, archivePath) // start the archiver with the given name, or without (avoid name conflicts) val archive: ActorRef = archiveActorName match { @@ -2705,8 +2724,11 @@ object JobManager { (jobManager, archive) } - def getArchiveProps(archiveClass: Class[_ <: MemoryArchivist], archiveCount: Int): Props = { - Props(archiveClass, archiveCount) + def getArchiveProps( + archiveClass: Class[_ <: MemoryArchivist], + archiveCount: Int, + archivePath: Option[Path]): Props = { + Props(archiveClass, archiveCount, archivePath) } def getJobManagerProps( http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala index 7f8fcd3..d83f2cd 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala @@ -23,16 +23,21 @@ import java.util import akka.actor.ActorRef import grizzled.slf4j.Logger import org.apache.flink.api.common.JobID +import org.apache.flink.configuration.Configuration +import org.apache.flink.core.fs.Path import org.apache.flink.runtime.jobgraph.JobStatus import org.apache.flink.runtime.messages.accumulators._ import org.apache.flink.runtime.webmonitor.WebMonitorUtils import org.apache.flink.runtime.{FlinkActor, LogMessages} import org.apache.flink.runtime.messages.webmonitor._ import org.apache.flink.runtime.executiongraph.{ArchivedExecutionGraph, ExecutionGraph} +import org.apache.flink.runtime.history.FsJobArchivist import org.apache.flink.runtime.messages.ArchiveMessages._ import org.apache.flink.runtime.messages.JobManagerMessages._ +import org.apache.flink.runtime.state.filesystem.FsStateBackend import scala.collection.mutable +import scala.concurrent.future /** * Actor which stores terminated Flink jobs. The number of stored Flink jobs is set by max_entries. @@ -54,9 +59,12 @@ import scala.collection.mutable * * - [[RequestJobCounts]] returns the number of finished, canceled, and failed jobs as a Tuple3 * - * @param max_entries Maximum number of stored Flink jobs + * @param maxEntries Maximum number of stored Flink jobs + * @param archivePath Optional path of the job archive directory */ -class MemoryArchivist(private val max_entries: Int) +class MemoryArchivist( + private val maxEntries: Int, + private val archivePath: Option[Path]) extends FlinkActor with LogMessages { @@ -96,6 +104,8 @@ class MemoryArchivist(private val max_entries: Int) case _ => } + archiveJsonFiles(graph) + trimHistory() case msg : InfoMessage => handleWebServerInfoMessage(msg, sender()) @@ -183,6 +193,27 @@ class MemoryArchivist(private val max_entries: Int) } } + private def archiveJsonFiles(graph: ArchivedExecutionGraph) { + // a suspended job is expected to continue on another job manager, + // so we aren't archiving it yet. + if (archivePath.isDefined && graph.getState.isGloballyTerminalState) { + try { + val p = FsStateBackend.validateAndNormalizeUri(archivePath.get.toUri) + future { + try { + FsJobArchivist.archiveJob(p, graph) + } catch { + case e: Exception => + log.error("Failed to archive job.", e) + } + }(context.dispatcher) + } catch { + case e: Exception => + log.warn(s"Failed to create Path for $archivePath. Job will not be archived.", e) + } + } + } + // -------------------------------------------------------------------------- // Request Responses // -------------------------------------------------------------------------- @@ -218,7 +249,7 @@ class MemoryArchivist(private val max_entries: Int) * * if more than max_entries are in the queue. */ private def trimHistory(): Unit = { - while (graphs.size > max_entries) { + while (graphs.size > maxEntries) { // get first graph inserted val (jobID, value) = graphs.head graphs.remove(jobID) http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala index adace0b..21e0d28 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala @@ -25,6 +25,7 @@ import akka.actor.{ActorRef, ActorSystem, Props} import org.apache.flink.api.common.JobID import org.apache.flink.api.common.io.FileOutputFormat import org.apache.flink.configuration.{ConfigConstants, Configuration, QueryableStateOptions} +import org.apache.flink.core.fs.Path import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory import org.apache.flink.runtime.clusterframework.FlinkResourceManager import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager @@ -43,7 +44,7 @@ import org.apache.flink.runtime.memory.MemoryManager import org.apache.flink.runtime.messages.JobManagerMessages import org.apache.flink.runtime.messages.JobManagerMessages.{RunningJobsStatus, StoppingFailure, StoppingResponse} import org.apache.flink.runtime.metrics.MetricRegistry -import org.apache.flink.runtime.taskexecutor.{TaskManagerServices, TaskManagerServicesConfiguration, TaskManagerConfiguration} +import org.apache.flink.runtime.taskexecutor.{TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration} import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation} import org.apache.flink.runtime.util.EnvironmentInformation @@ -122,6 +123,7 @@ class LocalFlinkMiniCluster( restartStrategyFactory, timeout, archiveCount, + archivePath, leaderElectionService, submittedJobGraphStore, checkpointRecoveryFactory, @@ -139,7 +141,8 @@ class LocalFlinkMiniCluster( val archive = system.actorOf( getArchiveProps( memoryArchivistClass, - archiveCount), + archiveCount, + archivePath), archiveName) system.actorOf( @@ -247,8 +250,11 @@ class LocalFlinkMiniCluster( // Props for the distributed components //------------------------------------------------------------------------------------------------ - def getArchiveProps(archiveClass: Class[_ <: MemoryArchivist], archiveCount: Int): Props = { - JobManager.getArchiveProps(archiveClass, archiveCount) + def getArchiveProps( + archiveClass: Class[_ <: MemoryArchivist], + archiveCount: Int, + arhivePath: Option[Path]): Props = { + JobManager.getArchiveProps(archiveClass, archiveCount, Option.empty) } def getJobManagerProps( http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java index 2ebc36e..dcf4722 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java @@ -33,6 +33,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.ListeningBehaviour; import org.apache.flink.runtime.blob.BlobServer; @@ -174,7 +175,7 @@ public class JobManagerHARecoveryTest { InstanceManager instanceManager = new InstanceManager(); instanceManager.addInstanceListener(scheduler); - archive = system.actorOf(Props.create(MemoryArchivist.class, 10)); + archive = system.actorOf(JobManager.getArchiveProps(MemoryArchivist.class, 10, Option.<Path>empty())); Props jobManagerProps = Props.create( TestingJobManager.class, http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/history/ArchivedJsonTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/history/ArchivedJsonTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/history/ArchivedJsonTest.java new file mode 100644 index 0000000..c3883bc --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/history/ArchivedJsonTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import org.junit.Assert; +import org.junit.Test; + +public class ArchivedJsonTest { + + @Test + public void testEquals() { + ArchivedJson original = new ArchivedJson("path", "json"); + ArchivedJson twin = new ArchivedJson("path", "json"); + ArchivedJson identicalPath = new ArchivedJson("path", "hello"); + ArchivedJson identicalJson = new ArchivedJson("hello", "json"); + + Assert.assertEquals(original, original); + Assert.assertEquals(original, twin); + Assert.assertNotEquals(original, identicalPath); + Assert.assertNotEquals(original, identicalJson); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala index 48a1ddd..3de5786 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala @@ -18,6 +18,7 @@ package org.apache.flink.runtime.testingUtils +import org.apache.flink.core.fs.Path import org.apache.flink.runtime.jobmanager.MemoryArchivist import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{ExecutionGraphFound, ExecutionGraphNotFound, RequestExecutionGraph} @@ -25,7 +26,8 @@ import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{Executio * * @param maxEntries number of maximum number of archived jobs */ -class TestingMemoryArchivist(maxEntries: Int) extends MemoryArchivist(maxEntries) { +class TestingMemoryArchivist(maxEntries: Int, archivePath: Option[Path]) + extends MemoryArchivist(maxEntries, archivePath) { override def handleMessage: Receive = { handleTestingMessage orElse super.handleMessage
