http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-runtime-web/web-dashboard/gulpfile.js
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/gulpfile.js 
b/flink-runtime-web/web-dashboard/gulpfile.js
index 9844116..0c868cd 100644
--- a/flink-runtime-web/web-dashboard/gulpfile.js
+++ b/flink-runtime-web/web-dashboard/gulpfile.js
@@ -111,7 +111,7 @@ gulp.task('vendor-scripts', function() {
  });
 
 gulp.task('scripts', function() {
-  stream = gulp.src([ paths.src + 'scripts/config.js', paths.src + 
'scripts/**/*.coffee'] )
+  stream = gulp.src([ paths.src + 'scripts/config.js', paths.src + 
'scripts/**/*.coffee', '!' + paths.src + 'scripts/index_hs.coffee'] )
     .pipe(plumber())
     .pipe(sourcemaps.init())
     .pipe(coffee({ bare: true }))
@@ -126,6 +126,22 @@ gulp.task('scripts', function() {
   stream.pipe(gulp.dest(paths.dest + 'js/'))
 });
 
+gulp.task('scripts_hs', function() {
+  stream = gulp.src([ paths.src + 'scripts/config.js', paths.src + 
'scripts/**/*.coffee', '!' + paths.src + 'scripts/index.coffee'] )
+    .pipe(plumber())
+    .pipe(sourcemaps.init())
+    .pipe(coffee({ bare: true }))
+    .pipe(ngAnnotate())
+    .pipe(concat('index.js'))
+    .pipe(sourcemaps.write());
+
+  if (environment == 'production') {
+    stream.pipe(uglify())
+  }
+
+  stream.pipe(gulp.dest(paths.dest + 'js/hs'))
+});
+
 gulp.task('html', function() {
   gulp.src(paths.src + 'index.jade')
     .pipe(plumber())
@@ -135,6 +151,15 @@ gulp.task('html', function() {
     .pipe(gulp.dest(paths.dest))
 });
 
+gulp.task('html_hs', function() {
+  gulp.src(paths.src + 'index_hs.jade')
+    .pipe(plumber())
+    .pipe(jade({
+      pretty: true
+    }))
+    .pipe(gulp.dest(paths.dest))
+});
+
 gulp.task('partials', function() {
   gulp.src(paths.src + 'partials/**/*.jade')
     .pipe(plumber())
@@ -179,7 +204,7 @@ gulp.task('serve', serve({
 }));
 
 gulp.task('vendor', ['vendor-styles', 'vendor-scripts']);
-gulp.task('compile', ['html', 'partials','styles', 'scripts']);
+gulp.task('compile', ['html', 'html_hs', 'partials','styles', 'scripts', 
'scripts_hs']);
 
 gulp.task('default', ['fonts', 'assets', 'vendor', 'compile']);
 gulp.task('dev', ['set-development', 'default']);

http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java
new file mode 100644
index 0000000..02ca3c4
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.history;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * Utility class for writing an archive file to a {@link FileSystem} and 
reading it back.
+ */
+public class FsJobArchivist {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(FsJobArchivist.class);
+       private static final JsonFactory jacksonFactory = new JsonFactory();
+       private static final ObjectMapper mapper = new ObjectMapper();
+
+       private static final String ARCHIVE = "archive";
+       private static final String PATH = "path";
+       private static final String JSON = "json";
+
+       private FsJobArchivist() {
+       }
+
+       /**
+        * Writes the given {@link AccessExecutionGraph} to the {@link 
FileSystem} pointed to by
+        * {@link JobManagerOptions#ARCHIVE_DIR}.
+        *
+        * @param rootPath directory to which the archive should be written to
+        * @param graph  graph to archive
+        * @return path to where the archive was written, or null if no archive 
was created
+        * @throws IOException
+        */
+       public static Path archiveJob(Path rootPath, AccessExecutionGraph 
graph) throws IOException {
+               try {
+                       FileSystem fs = rootPath.getFileSystem();
+                       Path path = new Path(rootPath, 
graph.getJobID().toString());
+			// Open the stream and the generator in the same try-with-resources so the
+			// stream is closed even if generator creation fails (previously it leaked).
+			try (OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);
+					JsonGenerator gen = jacksonFactory.createGenerator(out, JsonEncoding.UTF8)) {
+                               gen.writeStartObject();
+                               gen.writeArrayFieldStart(ARCHIVE);
+                               for (JsonArchivist archiver : 
WebMonitorUtils.getJsonArchivists()) {
+                                       for (ArchivedJson archive : 
archiver.archiveJsonWithPath(graph)) {
+                                               gen.writeStartObject();
+                                               gen.writeStringField(PATH, 
archive.getPath());
+                                               gen.writeStringField(JSON, 
archive.getJson());
+                                               gen.writeEndObject();
+                                       }
+                               }
+                               gen.writeEndArray();
+                               gen.writeEndObject();
+                       } catch (Exception e) {
+                               fs.delete(path, false);
+                               throw e;
+                       }
+                       LOG.info("Job {} has been archived at {}.", 
graph.getJobID(), path);
+                       return path;
+               } catch (IOException e) {
+                       LOG.error("Failed to archive job.", e);
+                       throw e;
+               }
+       }
+
+       /**
+        * Reads the given archive file and returns a {@link Collection} of 
contained {@link ArchivedJson}.
+        *
+        * @param file archive to extract
+        * @return collection of archived jsons
+        * @throws IOException if the file can't be opened, read or doesn't 
contain valid json
+        */
+       public static Collection<ArchivedJson> getArchivedJsons(Path file) 
throws IOException {
+               try (FSDataInputStream input = file.getFileSystem().open(file);
+                       ByteArrayOutputStream output = new 
ByteArrayOutputStream()) {
+                       IOUtils.copyBytes(input, output);
+
+                       JsonNode archive = 
mapper.readTree(output.toByteArray());
+
+                       Collection<ArchivedJson> archives = new ArrayList<>();
+                       for (JsonNode archivePart : archive.get(ARCHIVE)) {
+                               String path = archivePart.get(PATH).asText();
+                               String json = archivePart.get(JSON).asText();
+                               archives.add(new ArchivedJson(path, json));
+                       }
+                       return archives;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
index 87df0d1..2baadb5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
@@ -24,8 +24,10 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 
+import java.net.URI;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -33,6 +35,7 @@ import 
org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,6 +43,7 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -143,6 +147,24 @@ public final class WebMonitorUtils {
                }
        }
 
+       public static JsonArchivist[] getJsonArchivists() {
+               try {
+                       String classname = 
"org.apache.flink.runtime.webmonitor.WebRuntimeMonitor";
+                       Class<? extends WebMonitor> clazz = 
Class.forName(classname).asSubclass(WebMonitor.class);
+                       Method method = clazz.getMethod("getJsonArchivists");
+                       JsonArchivist[] result = (JsonArchivist[]) 
method.invoke(null);
+                       return result;
+               } catch (ClassNotFoundException e) {
+                       LOG.error("Could not load web runtime monitor. " +
+                               "Probable reason: flink-runtime-web is not in the classpath.");
+                       LOG.debug("Caught exception", e);
+                       return new JsonArchivist[0];
+               } catch (Throwable t) {
+                       LOG.error("Failed to retrieve archivers from web 
runtime monitor.", t);
+                       return new JsonArchivist[0];
+               }
+       }
+
        public static Map<String, String> fromKeyValueJsonArray(String 
jsonString) {
                try {
                        Map<String, String> map = new HashMap<>();
@@ -193,6 +215,32 @@ public final class WebMonitorUtils {
        }
 
        /**
+        * Checks and normalizes the given URI. This method first checks the 
validity of the
+        * URI (scheme and path are not null) and then normalizes the URI to a 
path.
+        *
+        * @param archiveDirUri The URI to check and normalize.
+        * @return A normalized URI as a Path.
+        *
+        * @throws IllegalArgumentException Thrown, if the URI misses scheme or 
path. 
+        */
+       public static Path validateAndNormalizeUri(URI archiveDirUri) {
+               final String scheme = archiveDirUri.getScheme();
+               final String path = archiveDirUri.getPath();
+
+               // some validity checks
+               if (scheme == null) {
+                       throw new IllegalArgumentException("The scheme 
(hdfs://, file://, etc) is null. " +
+                               "Please specify the file system scheme 
explicitly in the URI.");
+               }
+               if (path == null) {
+                       throw new IllegalArgumentException("The path to store 
the job archive data in is null. " +
+                               "Please specify a directory path for the 
archiving the job data.");
+               }
+
+               return new Path(archiveDirUri);
+       }
+
+       /**
         * Private constructor to prevent instantiation.
         */
        private WebMonitorUtils() {

http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
index 22e011c..2e72f1a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
@@ -27,6 +27,7 @@ import org.apache.flink.util.Preconditions;
  * resembling the REST API.
  */
 public class ArchivedJson {
+
        private final String path;
        private final String json;
        
@@ -42,4 +43,24 @@
 	public String getJson() {
 		return json;
 	}
+
+	@Override
+	public int hashCode() {
+		return 31 * path.hashCode() + json.hashCode();
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof ArchivedJson) {
+			ArchivedJson other = (ArchivedJson) obj;
+			return this.path.equals(other.path) && this.json.equals(other.json);
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public String toString() {
+		return path + ":" + json;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 73d093b..1e6d8d3 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -30,7 +30,7 @@ import grizzled.slf4j.Logger
 import org.apache.flink.api.common.JobID
 import org.apache.flink.api.common.time.Time
 import org.apache.flink.configuration._
-import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.core.fs.{FileSystem, Path}
 import org.apache.flink.core.io.InputSplitAssigner
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup
 import org.apache.flink.metrics.{Gauge, MetricGroup}
@@ -2480,6 +2480,7 @@ object JobManager {
     RestartStrategyFactory,
     FiniteDuration, // timeout
     Int, // number of archived jobs
+    Option[Path], // archive path
     LeaderElectionService,
     SubmittedJobGraphStore,
     CheckpointRecoveryFactory,
@@ -2498,6 +2499,22 @@ object JobManager {
     val archiveCount = 
configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_ARCHIVE_COUNT,
       ConfigConstants.DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT)
 
+    val archiveDir = configuration.getString(JobManagerOptions.ARCHIVE_DIR)
+
+    val archivePath = if (archiveDir != null) {
+      try {
+        Option.apply(
+          WebMonitorUtils.validateAndNormalizeUri(new Path(archiveDir).toUri))
+      } catch {
+        case e: Exception =>
+          LOG.warn(s"Failed to validate specified archive directory '$archiveDir'. " +
+            "Jobs will not be archived for the HistoryServer.", e)
+          Option.empty
+      }
+    } else {
+      LOG.debug("No archive directory was configured. Jobs will not be 
archived.")
+      Option.empty
+    }
 
     var blobServer: BlobServer = null
     var instanceManager: InstanceManager = null
@@ -2584,6 +2601,7 @@ object JobManager {
       restartStrategy,
       timeout,
       archiveCount,
+      archivePath,
       leaderElectionService,
       submittedJobGraphs,
       checkpointRecoveryFactory,
@@ -2656,6 +2674,7 @@ object JobManager {
     restartStrategy,
     timeout,
     archiveCount,
+    archivePath,
     leaderElectionService,
     submittedJobGraphs,
     checkpointRecoveryFactory,
@@ -2666,7 +2685,7 @@ object JobManager {
       ioExecutor,
       None)
 
-    val archiveProps = getArchiveProps(archiveClass, archiveCount)
+    val archiveProps = getArchiveProps(archiveClass, archiveCount, archivePath)
 
     // start the archiver with the given name, or without (avoid name 
conflicts)
     val archive: ActorRef = archiveActorName match {
@@ -2705,8 +2724,11 @@ object JobManager {
     (jobManager, archive)
   }
 
-  def getArchiveProps(archiveClass: Class[_ <: MemoryArchivist], archiveCount: 
Int): Props = {
-    Props(archiveClass, archiveCount)
+  def getArchiveProps(
+      archiveClass: Class[_ <: MemoryArchivist],
+      archiveCount: Int,
+      archivePath: Option[Path]): Props = {
+    Props(archiveClass, archiveCount, archivePath)
   }
 
   def getJobManagerProps(

http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index 7f8fcd3..d83f2cd 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -23,16 +23,21 @@ import java.util
 import akka.actor.ActorRef
 import grizzled.slf4j.Logger
 import org.apache.flink.api.common.JobID
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.core.fs.Path
 import org.apache.flink.runtime.jobgraph.JobStatus
 import org.apache.flink.runtime.messages.accumulators._
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils
 import org.apache.flink.runtime.{FlinkActor, LogMessages}
 import org.apache.flink.runtime.messages.webmonitor._
 import org.apache.flink.runtime.executiongraph.{ArchivedExecutionGraph, 
ExecutionGraph}
+import org.apache.flink.runtime.history.FsJobArchivist
 import org.apache.flink.runtime.messages.ArchiveMessages._
 import org.apache.flink.runtime.messages.JobManagerMessages._
+import org.apache.flink.runtime.state.filesystem.FsStateBackend
 
 import scala.collection.mutable
+import scala.concurrent.future
 
 /**
  * Actor which stores terminated Flink jobs. The number of stored Flink jobs 
is set by max_entries.
@@ -54,9 +59,12 @@ import scala.collection.mutable
  *
  *  - [[RequestJobCounts]] returns the number of finished, canceled, and 
failed jobs as a Tuple3
  *
- * @param max_entries Maximum number of stored Flink jobs
+ * @param maxEntries Maximum number of stored Flink jobs
+ * @param archivePath Optional path of the job archive directory
  */
-class MemoryArchivist(private val max_entries: Int)
+class MemoryArchivist(
+    private val maxEntries: Int,
+    private val archivePath: Option[Path])
   extends FlinkActor
   with LogMessages {
 
@@ -96,6 +104,8 @@ class MemoryArchivist(private val max_entries: Int)
         case _ =>
       }
 
+      archiveJsonFiles(graph)
+
       trimHistory()
 
     case msg : InfoMessage => handleWebServerInfoMessage(msg, sender())
@@ -183,6 +193,27 @@ class MemoryArchivist(private val max_entries: Int)
     }
   }
 
+  private def archiveJsonFiles(graph: ArchivedExecutionGraph) {
+    // a suspended job is expected to continue on another job manager,
+    // so we aren't archiving it yet.
+    if (archivePath.isDefined && graph.getState.isGloballyTerminalState) {
+      try {
+        val p = WebMonitorUtils.validateAndNormalizeUri(archivePath.get.toUri)
+        future {
+          try {
+            FsJobArchivist.archiveJob(p, graph)
+          } catch {
+            case e: Exception =>
+              log.error("Failed to archive job.", e)
+          }
+        }(context.dispatcher)
+      } catch {
+        case e: Exception =>
+          log.warn(s"Failed to create Path for $archivePath. Job will not be 
archived.", e)
+      }
+    }
+  }
+
   // --------------------------------------------------------------------------
   //  Request Responses
   // --------------------------------------------------------------------------
@@ -218,7 +249,7 @@ class MemoryArchivist(private val max_entries: Int)
    * * if more than max_entries are in the queue.
    */
   private def trimHistory(): Unit = {
-    while (graphs.size > max_entries) {
+    while (graphs.size > maxEntries) {
       // get first graph inserted
       val (jobID, value) = graphs.head
       graphs.remove(jobID)

http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index adace0b..21e0d28 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -25,6 +25,7 @@ import akka.actor.{ActorRef, ActorSystem, Props}
 import org.apache.flink.api.common.JobID
 import org.apache.flink.api.common.io.FileOutputFormat
 import org.apache.flink.configuration.{ConfigConstants, Configuration, 
QueryableStateOptions}
+import org.apache.flink.core.fs.Path
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import 
org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
@@ -43,7 +44,7 @@ import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.messages.JobManagerMessages
 import 
org.apache.flink.runtime.messages.JobManagerMessages.{RunningJobsStatus, 
StoppingFailure, StoppingResponse}
 import org.apache.flink.runtime.metrics.MetricRegistry
-import org.apache.flink.runtime.taskexecutor.{TaskManagerServices, 
TaskManagerServicesConfiguration, TaskManagerConfiguration}
+import org.apache.flink.runtime.taskexecutor.{TaskManagerConfiguration, 
TaskManagerServices, TaskManagerServicesConfiguration}
 import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
 import org.apache.flink.runtime.util.EnvironmentInformation
 
@@ -122,6 +123,7 @@ class LocalFlinkMiniCluster(
     restartStrategyFactory,
     timeout,
     archiveCount,
+    archivePath,
     leaderElectionService,
     submittedJobGraphStore,
     checkpointRecoveryFactory,
@@ -139,7 +141,8 @@ class LocalFlinkMiniCluster(
     val archive = system.actorOf(
       getArchiveProps(
         memoryArchivistClass,
-        archiveCount),
+        archiveCount,
+        archivePath),
       archiveName)
 
     system.actorOf(
@@ -247,8 +250,11 @@ class LocalFlinkMiniCluster(
   // Props for the distributed components
   
//------------------------------------------------------------------------------------------------
 
-  def getArchiveProps(archiveClass: Class[_ <: MemoryArchivist], archiveCount: 
Int): Props = {
-    JobManager.getArchiveProps(archiveClass, archiveCount)
+  def getArchiveProps(
+      archiveClass: Class[_ <: MemoryArchivist],
+      archiveCount: Int,
+      archivePath: Option[Path]): Props = {
+    JobManager.getArchiveProps(archiveClass, archiveCount, archivePath)
   }
 
   def getJobManagerProps(

http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 2ebc36e..dcf4722 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.blob.BlobServer;
@@ -174,7 +175,7 @@ public class JobManagerHARecoveryTest {
                        InstanceManager instanceManager = new InstanceManager();
                        instanceManager.addInstanceListener(scheduler);
 
-                       archive = 
system.actorOf(Props.create(MemoryArchivist.class, 10));
+                       archive = 
system.actorOf(JobManager.getArchiveProps(MemoryArchivist.class, 10, 
Option.<Path>empty()));
 
                        Props jobManagerProps = Props.create(
                                TestingJobManager.class,

http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/history/ArchivedJsonTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/history/ArchivedJsonTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/history/ArchivedJsonTest.java
new file mode 100644
index 0000000..c3883bc
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/history/ArchivedJsonTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.history;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ArchivedJsonTest {
+
+       @Test
+       public void testEquals() {
+               ArchivedJson original = new ArchivedJson("path", "json");
+               ArchivedJson twin = new ArchivedJson("path", "json");
+               ArchivedJson identicalPath = new ArchivedJson("path", "hello");
+               ArchivedJson identicalJson = new ArchivedJson("hello", "json");
+
+               Assert.assertEquals(original, original);
+               Assert.assertEquals(original, twin);
+               Assert.assertNotEquals(original, identicalPath);
+               Assert.assertNotEquals(original, identicalJson);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
index 48a1ddd..3de5786 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.testingUtils
 
+import org.apache.flink.core.fs.Path
 import org.apache.flink.runtime.jobmanager.MemoryArchivist
 import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{ExecutionGraphFound,
 ExecutionGraphNotFound, RequestExecutionGraph}
 
@@ -25,7 +26,8 @@ import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{Executio
   *
   * @param maxEntries number of maximum number of archived jobs
   */
-class TestingMemoryArchivist(maxEntries: Int) extends 
MemoryArchivist(maxEntries) {
+class TestingMemoryArchivist(maxEntries: Int, archivePath: Option[Path])
+  extends MemoryArchivist(maxEntries, archivePath) {
 
   override def handleMessage: Receive = {
     handleTestingMessage orElse super.handleMessage

Reply via email to