[FLINK-1579] [webui] Implement History Server

Archives jobs to a file system directory and adds a history server
that can display the archived jobs.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2cab0cb2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2cab0cb2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2cab0cb2

Branch: refs/heads/master
Commit: 2cab0cb2645c78d95c9586a2b84a2d8768a391c2
Parents: 8e85f01
Author: zentol <[email protected]>
Authored: Mon Mar 20 18:41:54 2017 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Wed Mar 22 15:45:00 2017 +0100

----------------------------------------------------------------------
 docs/monitoring/back_pressure.md                |   2 +-
 docs/monitoring/checkpoint_monitoring.md        |   2 +-
 docs/monitoring/debugging_classloading.md       |   2 +-
 docs/monitoring/debugging_event_time.md         |   2 +-
 docs/monitoring/historyserver.md                |  98 ++++++
 docs/monitoring/large_state_tuning.md           |   2 +-
 docs/setup/config.md                            |  17 ++
 .../configuration/HistoryServerOptions.java     |  82 +++++
 .../flink/configuration/JobManagerOptions.java  |  69 +++--
 .../src/main/flink-bin/bin/flink-daemon.sh      |   6 +-
 .../src/main/flink-bin/bin/historyserver.sh     |  34 +++
 flink-dist/src/main/resources/flink-conf.yaml   |  21 ++
 .../runtime/webmonitor/WebRuntimeMonitor.java   |   6 +-
 .../files/StaticFileServerHandler.java          |  12 +-
 .../webmonitor/history/HistoryServer.java       | 302 +++++++++++++++++++
 .../history/HistoryServerArchiveFetcher.java    | 252 ++++++++++++++++
 .../HistoryServerStaticFileServerHandler.java   | 249 +++++++++++++++
 .../runtime/webmonitor/WebMonitorUtilsTest.java |  36 +++
 .../webmonitor/history/FsJobArchivistTest.java  |  84 ++++++
 ...istoryServerStaticFileServerHandlerTest.java |  77 +++++
 .../webmonitor/history/HistoryServerTest.java   | 122 ++++++++
 .../web-dashboard/app/index_hs.jade             |  60 ++++
 .../web-dashboard/app/scripts/index_hs.coffee   | 208 +++++++++++++
 flink-runtime-web/web-dashboard/gulpfile.js     |  29 +-
 .../flink/runtime/history/FsJobArchivist.java   | 122 ++++++++
 .../runtime/webmonitor/WebMonitorUtils.java     |  48 +++
 .../webmonitor/history/ArchivedJson.java        |  16 +
 .../flink/runtime/jobmanager/JobManager.scala   |  30 +-
 .../runtime/jobmanager/MemoryArchivist.scala    |  37 ++-
 .../minicluster/LocalFlinkMiniCluster.scala     |  14 +-
 .../jobmanager/JobManagerHARecoveryTest.java    |   3 +-
 .../webmonitor/history/ArchivedJsonTest.java    |  37 +++
 .../testingUtils/TestingMemoryArchivist.scala   |   4 +-
 33 files changed, 2024 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/docs/monitoring/back_pressure.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/back_pressure.md b/docs/monitoring/back_pressure.md
index 4a4bd22..f047066 100644
--- a/docs/monitoring/back_pressure.md
+++ b/docs/monitoring/back_pressure.md
@@ -1,7 +1,7 @@
 ---
 title: "Monitoring Back Pressure"
 nav-parent_id: monitoring
-nav-pos: 4
+nav-pos: 5
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one

http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/docs/monitoring/checkpoint_monitoring.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/checkpoint_monitoring.md 
b/docs/monitoring/checkpoint_monitoring.md
index 5a4dba7..7cf06d1 100644
--- a/docs/monitoring/checkpoint_monitoring.md
+++ b/docs/monitoring/checkpoint_monitoring.md
@@ -1,7 +1,7 @@
 ---
 title: "Monitoring Checkpointing"
 nav-parent_id: monitoring
-nav-pos: 3
+nav-pos: 4
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one

http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/docs/monitoring/debugging_classloading.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/debugging_classloading.md 
b/docs/monitoring/debugging_classloading.md
index 85ee9bb..8c91e0f 100644
--- a/docs/monitoring/debugging_classloading.md
+++ b/docs/monitoring/debugging_classloading.md
@@ -1,7 +1,7 @@
 ---
 title: "Debugging Classloading"
 nav-parent_id: monitoring
-nav-pos: 13
+nav-pos: 14
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one

http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/docs/monitoring/debugging_event_time.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/debugging_event_time.md 
b/docs/monitoring/debugging_event_time.md
index eaf0d5c..edc7dd0 100644
--- a/docs/monitoring/debugging_event_time.md
+++ b/docs/monitoring/debugging_event_time.md
@@ -1,7 +1,7 @@
 ---
 title: "Debugging Windows & Event Time"
 nav-parent_id: monitoring
-nav-pos: 12
+nav-pos: 13
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one

http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/docs/monitoring/historyserver.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/historyserver.md b/docs/monitoring/historyserver.md
new file mode 100644
index 0000000..fd957f2
--- /dev/null
+++ b/docs/monitoring/historyserver.md
@@ -0,0 +1,98 @@
+---
+title: "History Server"
+nav-parent_id: monitoring
+nav-pos: 3
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Flink has a history server that can be used to query the statistics of 
completed jobs after the corresponding Flink cluster has been shut down.
+
+Furthermore, it exposes a REST API that accepts HTTP requests and responds 
with JSON data.
+
+* This will be replaced by the TOC
+{:toc}
+
+## Overview
+
+The HistoryServer allows you to query the status and statistics of completed 
jobs that have been archived by a JobManager.
+
+You start and stop the HistoryServer via its corresponding startup script:
+
+```sh
+# Start or stop the HistoryServer
+bin/historyserver.sh (start|stop)
+```
+
+By default, this server binds to `localhost` and listens at port `8082`.
+
+Currently, you can only run it as a standalone process.
+
+## Configuration
+
+The configuration keys `jobmanager.archive.fs.dir` and 
`historyserver.archive.fs.refresh-interval` need to be adjusted for archiving 
and displaying archived jobs.
+
+**JobManager**
+
+The archiving of completed jobs happens on the JobManager, which uploads the 
archived job information to a file system directory. You can configure the 
directory to archive completed jobs in `flink-conf.yaml` by setting a directory 
via `jobmanager.archive.fs.dir`.
+
+```sh
+# Directory to upload completed job information
+jobmanager.archive.fs.dir: hdfs:///completed-jobs
+```
+
+**HistoryServer**
+
+The HistoryServer can be configured to monitor a comma-separated list of 
directories via `historyserver.archive.fs.dir`. The configured directories 
are regularly polled for new archives; the polling interval can be configured 
via `historyserver.archive.fs.refresh-interval`.
+
+```sh
+# Monitor the following directories for completed jobs
+historyserver.archive.fs.dir: hdfs:///completed-jobs
+
+# Refresh every 10 seconds
+historyserver.archive.fs.refresh-interval: 10000
+```
+
+The contained archives are downloaded and cached in the local filesystem. The 
local directory for this is configured via `historyserver.web.tmpdir`.
+
+Check out the configuration page for a [complete list of configuration 
options]({{ site.baseurl }}/setup/config.html#history-server).
+
+## Available Requests
+
+Below is a list of available requests, with a sample JSON response. All 
requests are of the sample form `http://hostname:8082/jobs`, below we list only 
the *path* part of the URLs.
+
+Values in angle brackets are variables, for example 
`http://hostname:port/jobs/<jobid>/exceptions` will have to be requested for 
example as 
`http://hostname:port/jobs/7684be6004e4e955c2a558a9bc463f65/exceptions`.
+
+  - `/config`
+  - `/jobs`
+  - `/joboverview`
+  - `/jobs/<jobid>`
+  - `/jobs/<jobid>/vertices`
+  - `/jobs/<jobid>/config`
+  - `/jobs/<jobid>/exceptions`
+  - `/jobs/<jobid>/accumulators`
+  - `/jobs/<jobid>/vertices/<vertexid>`
+  - `/jobs/<jobid>/vertices/<vertexid>/subtasktimes`
+  - `/jobs/<jobid>/vertices/<vertexid>/taskmanagers`
+  - `/jobs/<jobid>/vertices/<vertexid>/accumulators`
+  - `/jobs/<jobid>/vertices/<vertexid>/subtasks/accumulators`
+  - `/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>`
+  - 
`/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>/attempts/<attempt>`
+  - 
`/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>/attempts/<attempt>/accumulators`
+  - `/jobs/<jobid>/plan`

http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/docs/monitoring/large_state_tuning.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/large_state_tuning.md 
b/docs/monitoring/large_state_tuning.md
index 78a5cb4..a520106 100644
--- a/docs/monitoring/large_state_tuning.md
+++ b/docs/monitoring/large_state_tuning.md
@@ -1,7 +1,7 @@
 ---
 title: "Debugging and Tuning Checkpoints and Large State"
 nav-parent_id: monitoring
-nav-pos: 11
+nav-pos: 12
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one

http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index d9d5f15..6a5c1ff 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -582,6 +582,23 @@ Previously this key was named `recovery.mode` and the 
default value was `standal
 
 - `metrics.latency.history-size`: (Default: 128) Defines the number of 
measured latencies to maintain at each operator
 
+### History Server
+
+You have to configure `jobmanager.archive.fs.dir` in order to archive 
terminated jobs and add it to the list of monitored directories via 
`historyserver.archive.fs.dir` if you want to display them via the 
HistoryServer's web frontend.
+
+- `jobmanager.archive.fs.dir`: Directory to upload information about 
terminated jobs to. You have to add this directory to the list of monitored 
directories of the history server via `historyserver.archive.fs.dir`.
+
+- `historyserver.archive.fs.dir`: Comma separated list of directories to fetch 
archived jobs from. The history server will monitor these directories for 
archived jobs. You can configure the JobManager to archive jobs to a directory 
via `jobmanager.archive.fs.dir`.
+
+- `historyserver.archive.fs.refresh-interval`: Interval in milliseconds for 
refreshing the archived job directories (DEFAULT: `10000`).
+
+- `historyserver.web.tmpdir`: This configuration parameter allows defining the 
Flink web directory to be used by the history server web interface. The web 
interface will copy its static files into the directory (DEFAULT: local system 
temporary directory).
+
+- `historyserver.web.address`: Address of the HistoryServer's web interface 
(DEFAULT: `anyLocalAddress()`).
+
+- `historyserver.web.port`: Port of the HistoryServer's web interface 
(DEFAULT: `8082`).
+
+- `historyserver.web.ssl.enabled`: Enable HTTPS access to the HistoryServer 
web frontend. This is applicable only when the global SSL flag 
`security.ssl.enabled` is set to true (DEFAULT: `false`).
 
 ## Background
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
new file mode 100644
index 0000000..ebe4f2b
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * The set of configuration options relating to the HistoryServer.
+ */
+@PublicEvolving
+public class HistoryServerOptions {
+
+       /**
+        * The interval at which the HistoryServer polls {@link 
HistoryServerOptions#HISTORY_SERVER_ARCHIVE_DIRS} for new archives.
+        */
+       public static final ConfigOption<Long> 
HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL =
+               key("historyserver.archive.fs.refresh-interval")
+                       .defaultValue(10000L);
+
+       /**
+        * Comma-separated list of directories which the HistoryServer polls 
for new archives.
+        */
+       public static final ConfigOption<String> HISTORY_SERVER_ARCHIVE_DIRS =
+               key("historyserver.archive.fs.dir")
+                       .noDefaultValue();
+
+       /**
+        * The local directory used by the HistoryServer web-frontend.
+        */
+       public static final ConfigOption<String> HISTORY_SERVER_WEB_DIR =
+               key("historyserver.web.tmpdir")
+                       .noDefaultValue();
+
+       /**
+        * The address under which the HistoryServer web-frontend is accessible.
+        */
+       public static final ConfigOption<String> HISTORY_SERVER_WEB_ADDRESS =
+               key("historyserver.web.address")
+                       .noDefaultValue();
+
+       /**
+        * The port under which the HistoryServer web-frontend is accessible.
+        */
+       public static final ConfigOption<Integer> HISTORY_SERVER_WEB_PORT =
+               key("historyserver.web.port")
+                       .defaultValue(8082);
+
+       /**
+        * The refresh interval for the HistoryServer web-frontend in 
milliseconds.
+        */
+       public static final ConfigOption<Long> 
HISTORY_SERVER_WEB_REFRESH_INTERVAL =
+               key("historyserver.web.refresh-interval")
+                       .defaultValue(10000L);
+
+       /**
+        * Enables/Disables SSL support for the HistoryServer web-frontend. 
Only relevant if
+        * {@link ConfigConstants#SECURITY_SSL_ENABLED} is enabled.
+        */
+       public static final ConfigOption<Boolean> 
HISTORY_SERVER_WEB_SSL_ENABLED =
+               key("historyserver.web.ssl.enabled")
+                       .defaultValue(false);
+
+       private HistoryServerOptions() {
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index 6f5efe6..d9c9d18 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.configuration;
 
+import static org.apache.flink.configuration.ConfigOptions.key;
+
 import org.apache.flink.annotation.PublicEvolving;
 
 /**
@@ -36,8 +38,8 @@ public class JobManagerOptions {
         * leader-election service (like ZooKeeper) is used to elect and 
discover the JobManager
         * leader from potentially multiple standby JobManagers.
         */
-       public static final ConfigOption<String> ADDRESS = ConfigOptions
-               .key("jobmanager.rpc.address")
+       public static final ConfigOption<String> ADDRESS =
+               key("jobmanager.rpc.address")
                .noDefaultValue();
 
        /**
@@ -51,15 +53,15 @@ public class JobManagerOptions {
         * leader-election service (like ZooKeeper) is used to elect and 
discover the JobManager
         * leader from potentially multiple standby JobManagers.
         */
-       public static final ConfigOption<Integer> PORT = ConfigOptions
-               .key("jobmanager.rpc.port")
+       public static final ConfigOption<Integer> PORT =
+               key("jobmanager.rpc.port")
                .defaultValue(6123);
 
        /**
         * The maximum number of prior execution attempts kept in history.
         */
-       public static final ConfigOption<Integer> MAX_ATTEMPTS_HISTORY_SIZE = 
ConfigOptions
-                       .key("jobmanager.execution.attempts-history-size")
+       public static final ConfigOption<Integer> MAX_ATTEMPTS_HISTORY_SIZE =
+               key("jobmanager.execution.attempts-history-size")
                        .defaultValue(16)
                        
.withDeprecatedKeys("job-manager.max-attempts-history-size");
 
@@ -70,88 +72,95 @@ public class JobManagerOptions {
        /**
         * The port for the runtime monitor web-frontend server.
         */
-       public static final ConfigOption<Integer> WEB_PORT = ConfigOptions
-               .key("jobmanager.web.port")
+       public static final ConfigOption<Integer> WEB_PORT =
+               key("jobmanager.web.port")
                .defaultValue(8081);
 
        /**
         * Config parameter to override SSL support for the JobManager Web UI
         */
-       public static final ConfigOption<Boolean> WEB_SSL_ENABLED = 
ConfigOptions
-               .key("jobmanager.web.ssl.enabled")
+       public static final ConfigOption<Boolean> WEB_SSL_ENABLED =
+               key("jobmanager.web.ssl.enabled")
                .defaultValue(true);
 
        /**
         * The config parameter defining the flink web directory to be used by 
the webmonitor.
         */
-       public static final ConfigOption<String> WEB_TMP_DIR = ConfigOptions
-               .key("jobmanager.web.tmpdir")
+       public static final ConfigOption<String> WEB_TMP_DIR =
+               key("jobmanager.web.tmpdir")
                .defaultValue(System.getProperty("java.io.tmpdir"));
 
        /**
         * The config parameter defining the directory for uploading the job 
jars. If not specified a dynamic directory
         * will be used under the directory specified by 
JOB_MANAGER_WEB_TMPDIR_KEY.
         */
-       public static final ConfigOption<String> WEB_UPLOAD_DIR = ConfigOptions
-               .key("jobmanager.web.upload.dir")
+       public static final ConfigOption<String> WEB_UPLOAD_DIR =
+               key("jobmanager.web.upload.dir")
                .noDefaultValue();
 
        /**
         * The config parameter defining the number of archived jobs for the 
jobmanager.
         */
-       public static final ConfigOption<Integer> WEB_ARCHIVE_COUNT = 
ConfigOptions
-               .key("jobmanager.web.history")
+       public static final ConfigOption<Integer> WEB_ARCHIVE_COUNT =
+               key("jobmanager.web.history")
                .defaultValue(5);
 
        /**
         * The log file location (may be in /log for standalone but under log 
directory when using YARN).
         */
-       public static final ConfigOption<String> WEB_LOG_PATH = ConfigOptions
-               .key("jobmanager.web.log.path")
+       public static final ConfigOption<String> WEB_LOG_PATH =
+               key("jobmanager.web.log.path")
                .noDefaultValue();
 
        /**
         * Config parameter indicating whether jobs can be uploaded and run 
from the web-frontend.
         */
-       public static final ConfigOption<Boolean> WEB_SUBMIT_ENABLE = 
ConfigOptions
-               .key("jobmanager.web.submit.enable")
+       public static final ConfigOption<Boolean> WEB_SUBMIT_ENABLE =
+               key("jobmanager.web.submit.enable")
                .defaultValue(true);
 
        /**
         * Config parameter defining the number of checkpoints to remember for 
recent history.
         */
-       public static final ConfigOption<Integer> WEB_CHECKPOINTS_HISTORY_SIZE 
= ConfigOptions
-               .key("jobmanager.web.checkpoints.history")
+       public static final ConfigOption<Integer> WEB_CHECKPOINTS_HISTORY_SIZE =
+               key("jobmanager.web.checkpoints.history")
                .defaultValue(10);
 
        /**
         * Time after which cached stats are cleaned up if not accessed.
         */
-       public static final ConfigOption<Integer> 
WEB_BACKPRESSURE_CLEANUP_INTERVAL = ConfigOptions
-               .key("jobmanager.web.backpressure.cleanup-interval")
+       public static final ConfigOption<Integer> 
WEB_BACKPRESSURE_CLEANUP_INTERVAL =
+               key("jobmanager.web.backpressure.cleanup-interval")
                .defaultValue(10 * 60 * 1000);
 
        /**
         * Time after which available stats are deprecated and need to be 
refreshed (by resampling).
         */
-       public static final ConfigOption<Integer> 
WEB_BACKPRESSURE_REFRESH_INTERVAL = ConfigOptions
-               .key("jobmanager.web.backpressure.refresh-interval")
+       public static final ConfigOption<Integer> 
WEB_BACKPRESSURE_REFRESH_INTERVAL =
+               key("jobmanager.web.backpressure.refresh-interval")
                .defaultValue(60 * 1000);
 
        /**
         * Number of stack trace samples to take to determine back pressure.
         */
-       public static final ConfigOption<Integer> WEB_BACKPRESSURE_NUM_SAMPLES 
= ConfigOptions
-               .key("jobmanager.web.backpressure.num-samples")
+       public static final ConfigOption<Integer> WEB_BACKPRESSURE_NUM_SAMPLES =
+               key("jobmanager.web.backpressure.num-samples")
                .defaultValue(100);
 
        /**
         * Delay between stack trace samples to determine back pressure.
         */
-       public static final ConfigOption<Integer> WEB_BACKPRESSURE_DELAY = 
ConfigOptions
-               .key("jobmanager.web.backpressure.delay-between-samples")
+       public static final ConfigOption<Integer> WEB_BACKPRESSURE_DELAY =
+               key("jobmanager.web.backpressure.delay-between-samples")
                .defaultValue(50);
 
+       /**
+        * The location where the JobManager stores the archives of completed 
jobs.
+        */
+       public static final ConfigOption<String> ARCHIVE_DIR =
+               key("jobmanager.archive.fs.dir")
+                       .noDefaultValue();
+
        // 
---------------------------------------------------------------------------------------------
 
        private JobManagerOptions() {

http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/flink-daemon.sh 
b/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
index 2230e92..b25358d 100644
--- a/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
+++ b/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
@@ -18,7 +18,7 @@
 
################################################################################
 
 # Start/stop a Flink daemon.
-USAGE="Usage: flink-daemon.sh (start|stop|stop-all) 
(jobmanager|taskmanager|zookeeper) [args]"
+USAGE="Usage: flink-daemon.sh (start|stop|stop-all) 
(jobmanager|taskmanager|zookeeper|historyserver) [args]"
 
 STARTSTOP=$1
 DAEMON=$2
@@ -42,6 +42,10 @@ case $DAEMON in
         
CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
     ;;
 
+    (historyserver)
+        CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer
+    ;;
+
     (*)
         echo "Unknown daemon '${DAEMON}'. $USAGE."
         exit 1

http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-dist/src/main/flink-bin/bin/historyserver.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/historyserver.sh 
b/flink-dist/src/main/flink-bin/bin/historyserver.sh
new file mode 100644
index 0000000..adc9660
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/bin/historyserver.sh
@@ -0,0 +1,34 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Start/stop a Flink HistoryServer
+USAGE="Usage: historyserver.sh (start|stop)"
+
+STARTSTOP=$1
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+if [[ $STARTSTOP == "start" ]]; then
+       args=("--configDir" "${FLINK_CONF_DIR}")
+fi
+
+"${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP historyserver "${args[@]}"

http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-dist/src/main/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/resources/flink-conf.yaml 
b/flink-dist/src/main/resources/flink-conf.yaml
index 72acbeb..446d992 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -81,6 +81,27 @@ jobmanager.web.port: 8081
 
 #jobmanager.web.submit.enable: false
 
+#==============================================================================
+# HistoryServer
+#==============================================================================
+
+# The HistoryServer is started and stopped via bin/historyserver.sh 
(start|stop)
+
+# Directory to upload completed jobs to. Add this directory to the list of
+# monitored directories of the HistoryServer as well (see below).
+#jobmanager.archive.fs.dir: hdfs:///completed-jobs/
+
+# The address under which the web-based HistoryServer listens.
+#historyserver.web.address: 0.0.0.0
+
+# The port under which the web-based HistoryServer listens.
+#historyserver.web.port: 8082
+
+# Comma separated list of directories to monitor for completed jobs.
+#historyserver.archive.fs.dir: hdfs:///completed-jobs/
+
+# Interval in milliseconds for refreshing the monitored directories.
+#historyserver.archive.fs.refresh-interval: 10000
 
 #==============================================================================
 # Streaming state checkpointing

http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 39bca71..c1e6229 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -121,8 +121,6 @@ public class WebRuntimeMonitor implements WebMonitor {
        /** LeaderRetrievalListener which stores the currently leading 
JobManager and its archive */
        private final JobManagerRetriever retriever;
 
-       private final Router router;
-
        private final SSLContext serverSSLContext;
 
        private final Promise<String> jobManagerAddressPromise = new 
scala.concurrent.impl.Promise.DefaultPromise<>();
@@ -243,7 +241,7 @@ public class WebRuntimeMonitor implements WebMonitor {
                RuntimeMonitorHandler triggerHandler = 
handler(cancelWithSavepoint.getTriggerHandler());
                RuntimeMonitorHandler inProgressHandler = 
handler(cancelWithSavepoint.getInProgressHandler());
 
-               router = new Router();
+               Router router = new Router();
                // config how to interact with this web server
                GET(router, new 
DashboardConfigHandler(cfg.getRefreshInterval()));
 
@@ -381,7 +379,7 @@ public class WebRuntimeMonitor implements WebMonitor {
         * 
         * @return array of all JsonArchivists relevant for the history server
         */
-       public static JsonArchivist[] getArchivers() {
+       public static JsonArchivist[] getJsonArchivists() {
                JsonArchivist[] archivists = new JsonArchivist[]{
                        new 
CurrentJobsOverviewHandler.CurrentJobsOverviewJsonArchivist(),
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
index 5a8c76f..b7874c9 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
@@ -106,7 +106,7 @@ public class StaticFileServerHandler extends 
SimpleChannelInboundHandler<Routed>
        private static final TimeZone GMT_TIMEZONE = 
TimeZone.getTimeZone("GMT");
 
        /** Date format for HTTP */
-       private static final String HTTP_DATE_FORMAT = "EEE, dd MMM yyyy 
HH:mm:ss zzz";
+       public static final String HTTP_DATE_FORMAT = "EEE, dd MMM yyyy 
HH:mm:ss zzz";
 
        /** Be default, we allow files to be cached for 5 minutes */
        private static final int HTTP_CACHE_SECONDS = 300;
@@ -348,7 +348,7 @@ public class StaticFileServerHandler extends 
SimpleChannelInboundHandler<Routed>
         * @param ctx    The channel context to write the response to.
         * @param status The response status.
         */
-       private static void sendError(ChannelHandlerContext ctx, 
HttpResponseStatus status) {
+       public static void sendError(ChannelHandlerContext ctx, 
HttpResponseStatus status) {
                FullHttpResponse response = new DefaultFullHttpResponse(
                                HTTP_1_1, status, 
Unpooled.copiedBuffer("Failure: " + status + "\r\n", CharsetUtil.UTF_8));
                response.headers().set(CONTENT_TYPE, "text/plain; 
charset=UTF-8");
@@ -363,7 +363,7 @@ public class StaticFileServerHandler extends 
SimpleChannelInboundHandler<Routed>
         *
         * @param ctx The channel context to write the response to.
         */
-       private static void sendNotModified(ChannelHandlerContext ctx) {
+       public static void sendNotModified(ChannelHandlerContext ctx) {
                FullHttpResponse response = new 
DefaultFullHttpResponse(HTTP_1_1, NOT_MODIFIED);
                setDateHeader(response);
 
@@ -376,7 +376,7 @@ public class StaticFileServerHandler extends 
SimpleChannelInboundHandler<Routed>
         *
         * @param response HTTP response
         */
-       private static void setDateHeader(FullHttpResponse response) {
+       public static void setDateHeader(FullHttpResponse response) {
                SimpleDateFormat dateFormatter = new 
SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
                dateFormatter.setTimeZone(GMT_TIMEZONE);
 
@@ -390,7 +390,7 @@ public class StaticFileServerHandler extends 
SimpleChannelInboundHandler<Routed>
         * @param response    The HTTP response object.
         * @param fileToCache File to extract the modification timestamp from.
         */
-       private static void setDateAndCacheHeaders(HttpResponse response, File 
fileToCache) {
+       public static void setDateAndCacheHeaders(HttpResponse response, File 
fileToCache) {
                SimpleDateFormat dateFormatter = new 
SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
                dateFormatter.setTimeZone(GMT_TIMEZONE);
 
@@ -411,7 +411,7 @@ public class StaticFileServerHandler extends 
SimpleChannelInboundHandler<Routed>
         * @param response HTTP response
         * @param file     file to extract content type
         */
-       private static void setContentTypeHeader(HttpResponse response, File 
file) {
+       public static void setContentTypeHeader(HttpResponse response, File 
file) {
                String mimeType = 
MimeTypes.getMimeTypeForFileName(file.getName());
                String mimeFinal = mimeType != null ? mimeType : 
MimeTypes.getDefaultMimeType();
                response.headers().set(CONTENT_TYPE, mimeFinal);

http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
new file mode 100644
index 0000000..f05bd02
--- /dev/null
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.history;
+
+import io.netty.handler.codec.http.router.Router;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.net.SSLUtils;
+import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler;
+import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.FlinkException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLContext;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * The HistoryServer provides a WebInterface and REST API to retrieve 
information about finished jobs for which
+ * the JobManager may have already shut down.
+ * 
+ * The HistoryServer regularly checks a set of directories for job archives 
created by the {@link FsJobArchivist} and
+ * caches these in a local directory. See {@link HistoryServerArchiveFetcher}.
+ * 
+ * All configuration options are defined in {@link HistoryServerOptions}.
+ * 
+ * The WebInterface only displays the "Completed Jobs" page.
+ * 
+ * The REST API is limited to
+ * <ul>
+ *     <li>/config</li>
+ *     <li>/joboverview</li>
+ *     <li>/jobs/:jobid/*</li>
+ * </ul>
+ * and relies on static files that are served by the {@link 
HistoryServerStaticFileServerHandler}.
+ */
+public class HistoryServer {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(HistoryServer.class);
+
+       private final Configuration config;
+
+       private final String webAddress;
+       private final int webPort;
+       private final long webRefreshIntervalMillis;
+       private final File webDir;
+
+       private final HistoryServerArchiveFetcher archiveFetcher;
+
+       private final SSLContext serverSSLContext;
+       private WebFrontendBootstrap netty;
+
+       private final Object startupShutdownLock = new Object();
+       private final AtomicBoolean shutdownRequested = new 
AtomicBoolean(false);
+       private final Thread shutdownHook;
+
+       public static void main(String[] args) throws Exception {
+               ParameterTool pt = ParameterTool.fromArgs(args);
+               String configDir = pt.getRequired("configDir");
+
+               LOG.info("Loading configuration from {}", configDir);
+               final Configuration flinkConfig = 
GlobalConfiguration.loadConfiguration(configDir);
+
+               // run the history server
+               SecurityUtils.install(new 
SecurityUtils.SecurityConfiguration(flinkConfig));
+
+               try {
+                       SecurityUtils.getInstalledContext().runSecured(new 
Callable<Integer>() {
+                               @Override
+                               public Integer call() throws Exception {
+                                       HistoryServer hs = new 
HistoryServer(flinkConfig);
+                                       hs.run();
+                                       return 0;
+                               }
+                       });
+                       System.exit(0);
+               } catch (UndeclaredThrowableException ute) {
+                       Throwable cause = ute.getUndeclaredThrowable();
+                       LOG.error("Failed to run HistoryServer.", cause);
+                       cause.printStackTrace();
+                       System.exit(1);
+               } catch (Exception e) {
+                       LOG.error("Failed to run HistoryServer.", e);
+                       e.printStackTrace();
+                       System.exit(1);
+               }
+       }
+
+       public HistoryServer(Configuration config) throws IOException, 
FlinkException {
+               this.config = config;
+               if 
(config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && 
SSLUtils.getSSLEnabled(config)) {
+                       LOG.info("Enabling SSL for the history server.");
+                       try {
+                               this.serverSSLContext = 
SSLUtils.createSSLServerContext(config);
+                       } catch (Exception e) {
+                               throw new IOException("Failed to initialize 
SSLContext for the history server.", e);
+                       }
+               } else {
+                       this.serverSSLContext = null;
+               }
+
+               webAddress = 
config.getString(HistoryServerOptions.HISTORY_SERVER_WEB_ADDRESS);
+               webPort = 
config.getInteger(HistoryServerOptions.HISTORY_SERVER_WEB_PORT);
+               webRefreshIntervalMillis = 
config.getLong(HistoryServerOptions.HISTORY_SERVER_WEB_REFRESH_INTERVAL);
+
+               String webDirectory = 
config.getString(HistoryServerOptions.HISTORY_SERVER_WEB_DIR);
+               if (webDirectory == null) {
+                       webDirectory = System.getProperty("java.io.tmpdir") + 
"flink-web-history-" + UUID.randomUUID();
+               }
+               webDir = new File(webDirectory);
+
+               String refreshDirectories = 
config.getString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS);
+               if (refreshDirectories == null) {
+                       throw new 
FlinkException(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS + " was not 
configured.");
+               }
+               List<RefreshLocation> refreshDirs = new ArrayList<>();
+               for (String refreshDirectory : refreshDirectories.split(",")) {
+                       try {
+                               Path refreshPath = 
WebMonitorUtils.validateAndNormalizeUri(new Path(refreshDirectory).toUri());
+                               FileSystem refreshFS = 
refreshPath.getFileSystem();
+                               refreshDirs.add(new 
RefreshLocation(refreshPath, refreshFS));
+                       } catch (Exception e) {
+                               // there's most likely something wrong with the 
path itself, so we ignore it from here on
+                               LOG.warn("Failed to create Path or FileSystem 
for directory '{}'. Directory will not be monitored.", refreshDirectory, e);
+                       }
+               }
+
+               if (refreshDirs.isEmpty()) {
+                       throw new FlinkException("Failed to validate any of the 
configured directories to monitor.");
+               }
+
+               long refreshIntervalMillis = 
config.getLong(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL);
+               archiveFetcher = new 
HistoryServerArchiveFetcher(refreshIntervalMillis, refreshDirs, webDir);
+
+               this.shutdownHook = new Thread() {
+                       @Override
+                       public void run() {
+                               HistoryServer.this.stop();
+                       }
+               };
+               // add shutdown hook for deleting the directories and remaining 
temp files on shutdown
+               try {
+                       Runtime.getRuntime().addShutdownHook(shutdownHook);
+               } catch (IllegalStateException e) {
+                       // race, JVM is in shutdown already, we can safely 
ignore this
+                       LOG.debug("Unable to add shutdown hook, shutdown 
already in progress", e);
+               } catch (Throwable t) {
+                       // these errors usually happen when the shutdown is 
already in progress
+                       LOG.warn("Error while adding shutdown hook", t);
+               }
+       }
+
+       public void run() {
+               try {
+                       start();
+                       new CountDownLatch(1).await();
+               } catch (Exception e) {
+                       LOG.error("Failure while running HistoryServer.", e);
+               } finally {
+                       stop();
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       // Life-cycle
+       // 
------------------------------------------------------------------------
+
+       void start() throws IOException, InterruptedException {
+               synchronized (startupShutdownLock) {
+                       LOG.info("Starting history server.");
+
+                       Files.createDirectories(webDir.toPath());
+                       LOG.info("Using directory {} as local cache.", webDir);
+
+                       Router router = new Router();
+                       router.GET("/:*", new 
HistoryServerStaticFileServerHandler(webDir));
+
+                       if (!webDir.exists() && !webDir.mkdirs()) {
+                               throw new IOException("Failed to create local 
directory " + webDir.getAbsoluteFile() + ".");
+                       }
+
+                       createDashboardConfigFile();
+
+                       archiveFetcher.start();
+
+                       netty = new WebFrontendBootstrap(router, LOG, webDir, 
serverSSLContext, webAddress, webPort, config);
+               }
+       }
+
+       void stop() {
+               if (shutdownRequested.compareAndSet(false, true)) {
+                       synchronized (startupShutdownLock) {
+                               LOG.info("Stopping history server.");
+
+                               try {
+                                       netty.shutdown();
+                               } catch (Throwable t) {
+                                       LOG.warn("Error while shutting down 
WebFrontendBootstrap.", t);
+                               }
+
+                               archiveFetcher.stop();
+
+                               try {
+                                       LOG.info("Removing web dashboard root 
cache directory {}", webDir);
+                                       FileUtils.deleteDirectory(webDir);
+                               } catch (Throwable t) {
+                                       LOG.warn("Error while deleting web root 
directory {}", webDir, t);
+                               }
+
+                               LOG.info("Stopped history server.");
+
+                               // Remove shutdown hook to prevent resource 
leaks, unless this is invoked by the shutdown hook itself
+                               if (shutdownHook != null && shutdownHook != 
Thread.currentThread()) {
+                                       try {
+                                               
Runtime.getRuntime().removeShutdownHook(shutdownHook);
+                                       } catch (IllegalStateException ignored) 
{
+                                               // race, JVM is in shutdown 
already, we can safely ignore this
+                                       } catch (Throwable t) {
+                                               LOG.warn("Exception while 
unregistering HistoryServer cleanup shutdown hook.");
+                                       }
+                               }
+                       }
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       // File generation
+       // 
------------------------------------------------------------------------
+
+       static FileWriter createOrGetFile(File folder, String name) throws 
IOException {
+               File file = new File(folder, name + ".json");
+               if (!file.exists()) {
+                       Files.createFile(file.toPath());
+               }
+               FileWriter fr = new FileWriter(file);
+               return fr;
+       }
+
+       private void createDashboardConfigFile() throws IOException {
+               try (FileWriter fw = createOrGetFile(webDir, "config")) {
+                       
fw.write(DashboardConfigHandler.createConfigJson(webRefreshIntervalMillis));
+                       fw.flush();
+               } catch (IOException ioe) {
+                       LOG.error("Failed to write config file.");
+                       throw ioe;
+               }
+       }
+
+       /**
+        * Container for the {@link Path} and {@link FileSystem} of a refresh 
directory.
+        */
+       static class RefreshLocation {
+               private final Path path;
+               private final FileSystem fs;
+
+               private RefreshLocation(Path path, FileSystem fs) {
+                       this.path = path;
+                       this.fs = fs;
+               }
+
+               public Path getPath() {
+                       return path;
+               }
+
+               public FileSystem getFs() {
+                       return fs;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
new file mode 100644
index 0000000..824d6c9
--- /dev/null
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.history;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class is used by the {@link HistoryServer} to fetch the job archives 
that are located at
+ * {@link HistoryServerOptions#HISTORY_SERVER_ARCHIVE_DIRS}. The directories 
are polled in regular intervals, defined
+ * by {@link HistoryServerOptions#HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL}.
+ * 
+ * The archives are downloaded and expanded into a file structure analogous to 
the REST API defined in the WebRuntimeMonitor.
+ */
+class HistoryServerArchiveFetcher {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(HistoryServerArchiveFetcher.class);
+
+       private static final JsonFactory jacksonFactory = new JsonFactory();
+       private static final ObjectMapper mapper = new ObjectMapper();
+
+       private final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor(
+               new 
ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher"));
+       private final JobArchiveFetcherTask fetcherTask;
+       private final long refreshIntervalMillis;
+
+       HistoryServerArchiveFetcher(long refreshIntervalMillis, 
List<HistoryServer.RefreshLocation> refreshDirs, File webDir) {
+               this.refreshIntervalMillis = refreshIntervalMillis;
+               this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, 
webDir);
+               if (LOG.isInfoEnabled()) {
+                       for (HistoryServer.RefreshLocation refreshDir : 
refreshDirs) {
+                               LOG.info("Monitoring directory {} for archived 
jobs.", refreshDir.getPath());
+                       }
+               }
+       }
+
+       void start() {
+               executor.scheduleWithFixedDelay(fetcherTask, 0, 
refreshIntervalMillis, TimeUnit.MILLISECONDS);
+       }
+
+       void stop() {
+               executor.shutdown();
+
+               try {
+                       if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+                               executor.shutdownNow();
+                       }
+               } catch (InterruptedException ignored) {
+                       executor.shutdownNow();
+               }
+       }
+
+       /**
+        * {@link TimerTask} that polls the directories configured as {@link 
HistoryServerOptions#HISTORY_SERVER_ARCHIVE_DIRS} for
+        * new job archives.
+        */
+       static class JobArchiveFetcherTask extends TimerTask {
+
+               private final List<HistoryServer.RefreshLocation> refreshDirs;
+
+               /** Cache of all available jobs identified by their id. */
+               private final Set<String> cachedArchives;
+
+               private final File webDir;
+               private final File webJobDir;
+               private final File webOverviewDir;
+
+               private static final String JSON_FILE_ENDING = ".json";
+
+               JobArchiveFetcherTask(List<HistoryServer.RefreshLocation> 
refreshDirs, File webDir) {
+                       this.refreshDirs = checkNotNull(refreshDirs);
+                       this.cachedArchives = new HashSet<>();
+                       this.webDir = checkNotNull(webDir);
+                       this.webJobDir = new File(webDir, "jobs");
+                       webJobDir.mkdir();
+                       this.webOverviewDir = new File(webDir, "overviews");
+                       webOverviewDir.mkdir();
+               }
+
+               @Override
+               public void run() {
+                       try {
+                               for (HistoryServer.RefreshLocation 
refreshLocation : refreshDirs) {
+                                       Path refreshDir = 
refreshLocation.getPath();
+                                       FileSystem refreshFS = 
refreshLocation.getFs();
+
+                                       // contents of /:refreshDir
+                                       FileStatus[] jobArchives;
+                                       try {
+                                               jobArchives = 
refreshFS.listStatus(refreshDir);
+                                       } catch (IOException e) {
+                                               LOG.error("Failed to access job 
archive location for path {}.", refreshDir, e);
+                                               continue;
+                                       }
+                                       if (jobArchives == null) {
+                                               continue;
+                                       }
+                                       boolean updateOverview = false;
+                                       for (FileStatus jobArchive : 
jobArchives) {
+                                               Path jobArchivePath = 
jobArchive.getPath();
+                                               String jobID = 
jobArchivePath.getName();
+                                               try {
+                                                       
JobID.fromHexString(jobID);
+                                               } catch 
(IllegalArgumentException iae) {
+                                                       LOG.debug("Archive 
directory {} contained file with unexpected name {}. Ignoring file.",
+                                                               refreshDir, 
jobID, iae);
+                                                       continue;
+                                               }
+                                               if (cachedArchives.add(jobID)) {
+                                                       try {
+                                                               for 
(ArchivedJson archive : FsJobArchivist.getArchivedJsons(jobArchive.getPath())) {
+                                                                       String 
path = archive.getPath();
+                                                                       String 
json = archive.getJson();
+
+                                                                       File 
target;
+                                                                       if 
(path.equals("/joboverview")) {
+                                                                               
target = new File(webOverviewDir, jobID + JSON_FILE_ENDING);
+                                                                       } else {
+                                                                               
target = new File(webDir, path + JSON_FILE_ENDING);
+                                                                       }
+
+                                                                       
java.nio.file.Path parent = target.getParentFile().toPath();
+
+                                                                       try {
+                                                                               
Files.createDirectories(parent);
+                                                                       } catch 
(FileAlreadyExistsException ignored) {
+                                                                               
// there may be left-over directories from the previous attempt
+                                                                       }
+
+                                                                       
java.nio.file.Path targetPath = target.toPath();
+                                                                       
+                                                                       // We 
overwrite existing files since this may be another attempt at fetching this 
archive.
+                                                                       // 
Existing files may be incomplete/corrupt.
+                                                                       if 
(Files.exists(targetPath)) {
+                                                                               
Files.delete(targetPath);
+                                                                       }
+
+                                                                       
Files.createFile(target.toPath());
+                                                                       try 
(FileWriter fw = new FileWriter(target)) {
+                                                                               
fw.write(json);
+                                                                               
fw.flush();
+                                                                       }
+                                                               }
+                                                               updateOverview 
= true;
+                                                       } catch (IOException e) 
{
+                                                               
LOG.error("Failure while fetching/processing job archive for job {}.", jobID, 
e);
+                                                               // Make sure we 
attempt to fetch the archive again
+                                                               
cachedArchives.remove(jobID);
+                                                               // Make sure we 
do not include this job in the overview
+                                                               try {
+                                                                       
Files.delete(new File(webOverviewDir, jobID + JSON_FILE_ENDING).toPath());
+                                                               } catch 
(IOException ioe) {
+                                                                       
LOG.debug("Could not delete file from overview directory.", ioe);
+                                                               }
+
+                                                               // Clean up job 
files we may have created
+                                                               File 
jobDirectory = new File(webJobDir, jobID);
+                                                               try {
+                                                                       
FileUtils.deleteDirectory(jobDirectory);
+                                                               } catch 
(IOException ioe) {
+                                                                       
LOG.debug("Could not clean up job directory.", ioe);
+                                                               }
+                                                       }
+                                               }
+                                       }
+                                       if (updateOverview) {
+                                               updateJobOverview(webDir);
+                                       }
+                               }
+                       } catch (Exception e) {
+                               LOG.error("Critical failure while 
fetching/processing job archives.", e);
+                       }
+               }
+       }
+
+       /**
+        * This method replicates the JSON response that would be given by the 
{@link CurrentJobsOverviewHandler} when
+        * listing both running and finished jobs.
+        *
+        * Every job archive contains a joboverview.json file containing the 
same structure. Since jobs are archived on
+        * their own however the list of finished jobs only contains a single 
job.
+        *
+        * For the display in the HistoryServer WebFrontend we have to combine 
these overviews.
+        */
+       private static void updateJobOverview(File webDir) {
+               File webOverviewDir = new File(webDir, "overviews");
+               try (JsonGenerator gen = 
jacksonFactory.createGenerator(HistoryServer.createOrGetFile(webDir, 
"joboverview"))) {
+                       gen.writeStartObject();
+                       gen.writeArrayFieldStart("running");
+                       gen.writeEndArray();
+                       gen.writeArrayFieldStart("finished");
+
+                       File[] overviews = new 
File(webOverviewDir.getPath()).listFiles();
+                       if (overviews != null) {
+                               for (File overview : overviews) {
+                                       JsonNode root = 
mapper.readTree(overview);
+                                       JsonNode finished = 
root.get("finished");
+                                       JsonNode job = finished.get(0);
+                                       mapper.writeTree(gen, job);
+                               }
+                       }
+
+                       gen.writeEndArray();
+                       gen.writeEndObject();
+               } catch (IOException ioe) {
+                       LOG.error("Failed to update job overview.", ioe);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
new file mode 100644
index 0000000..31e9bbc
--- /dev/null
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.history;
+
+/*****************************************************************************
+ * This code is based on the "HttpStaticFileServerHandler" from the
+ * Netty project's HTTP server example.
+ *
+ * See http://netty.io and
+ * 
https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
+ *****************************************************************************/
+
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.DefaultFileRegion;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.HttpChunkedInput;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.handler.codec.http.router.Routed;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.stream.ChunkedFile;
+import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.net.URI;
+import java.net.URL;
+import java.nio.file.Files;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Locale;
+
+import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
+import static io.netty.handler.codec.http.HttpHeaders.Names.IF_MODIFIED_SINCE;
+import static 
io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
+import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Simple file server handler used by the {@link HistoryServer} that serves 
requests to web frontend's static files,
+ * such as HTML, CSS, JS or JSON files.
+ *
+ * This code is based on the "HttpStaticFileServerHandler" from the Netty 
project's HTTP server
+ * example.
+ * 
+ * This class is a copy of the {@link StaticFileServerHandler}. The 
differences are that the request path is
+ * modified to end on ".json" if it does not have a filename extension; when 
"index.html" is requested we load
+ * "index_hs.html" instead to inject the modified HistoryServer WebInterface 
and that the caching of the "/joboverview"
+ * page is prevented.
+ */
@ChannelHandler.Sharable
public class HistoryServerStaticFileServerHandler extends SimpleChannelInboundHandler<Routed> {

	/** Default logger, if none is specified */
	private static final Logger LOG = LoggerFactory.getLogger(HistoryServerStaticFileServerHandler.class);

	// ------------------------------------------------------------------------

	/** The path in which the static documents are. All requests are resolved relative to
	 * this directory; requests resolving outside of it are rejected with 404. */
	private final File rootPath;

	/**
	 * Creates a handler serving files from the given directory.
	 *
	 * @param rootPath directory containing the static files to serve
	 * @throws IOException if the canonical form of the root path cannot be resolved
	 */
	public HistoryServerStaticFileServerHandler(File rootPath) throws IOException {
		// canonicalize once so the containment check in respondWithFile compares canonical paths
		this.rootPath = checkNotNull(rootPath).getCanonicalFile();
	}

	// ------------------------------------------------------------------------
	//  Responses to requests
	// ------------------------------------------------------------------------

	@Override
	public void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
		String requestPath = routed.path();

		respondWithFile(ctx, routed.request(), requestPath);
	}

	/**
	 * Serves the file addressed by {@code requestPath} from {@link #rootPath}, materializing
	 * it from the classpath ("web" resource folder) on first access if it is not yet present
	 * in the local web directory.
	 */
	private void respondWithFile(ChannelHandlerContext ctx, HttpRequest request, String requestPath)
		throws IOException, ParseException {

		// make sure we request the "index.html" in case there is a directory request
		if (requestPath.endsWith("/")) {
			requestPath = requestPath + "index.html";
		}

		if (!requestPath.contains(".")) { // we assume that the path ends in either .html or .js
			// extension-less paths are REST-style endpoints backed by pre-rendered json files
			requestPath = requestPath + ".json";
		}

		// convert to absolute path
		final File file = new File(rootPath, requestPath);

		if (!file.exists()) {
			// file does not exist. Try to load it with the classloader
			ClassLoader cl = HistoryServerStaticFileServerHandler.class.getClassLoader();

			// the HistoryServer uses its own entry page, bundled as index_hs.html
			String pathToLoad = requestPath.replace("index.html", "index_hs.html");

			try (InputStream resourceStream = cl.getResourceAsStream("web" + pathToLoad)) {
				boolean success = false;
				try {
					if (resourceStream != null) {
						URL root = cl.getResource("web");
						URL requested = cl.getResource("web" + pathToLoad);

						if (root != null && requested != null) {
							URI rootURI = new URI(root.getPath()).normalize();
							URI requestedURI = new URI(requested.getPath()).normalize();

							// Check that we don't load anything from outside of the
							// expected scope. If requestedURI lies under rootURI, relativize
							// yields a relative URI different from requestedURI itself.
							if (!rootURI.relativize(requestedURI).equals(requestedURI)) {
								LOG.debug("Loading missing file from classloader: {}", pathToLoad);
								// ensure that directory to file exists.
								file.getParentFile().mkdirs();
								// copy the resource into the web directory so subsequent
								// requests are served from disk directly
								Files.copy(resourceStream, file.toPath());

								success = true;
							}
						}
					}
				} catch (Throwable t) {
					LOG.error("error while responding", t);
				} finally {
					if (!success) {
						LOG.debug("Unable to load requested file {} from classloader", pathToLoad);
						StaticFileServerHandler.sendError(ctx, NOT_FOUND);
						return;
					}
				}
			}
		}

		if (!file.exists() || file.isHidden() || file.isDirectory() || !file.isFile()) {
			StaticFileServerHandler.sendError(ctx, NOT_FOUND);
			return;
		}

		// reject path-traversal attempts ("../") that resolve outside of the web directory
		if (!file.getCanonicalFile().toPath().startsWith(rootPath.toPath())) {
			StaticFileServerHandler.sendError(ctx, NOT_FOUND);
			return;
		}

		// cache validation
		final String ifModifiedSince = request.headers().get(IF_MODIFIED_SINCE);
		if (ifModifiedSince != null && !ifModifiedSince.isEmpty()) {
			SimpleDateFormat dateFormatter = new SimpleDateFormat(StaticFileServerHandler.HTTP_DATE_FORMAT, Locale.US);
			Date ifModifiedSinceDate = dateFormatter.parse(ifModifiedSince);

			// Only compare up to the second because the datetime format we send to the client
			// does not have milliseconds
			long ifModifiedSinceDateSeconds = ifModifiedSinceDate.getTime() / 1000;
			long fileLastModifiedSeconds = file.lastModified() / 1000;
			if (ifModifiedSinceDateSeconds == fileLastModifiedSeconds) {
				if (LOG.isDebugEnabled()) {
					LOG.debug("Responding 'NOT MODIFIED' for file '" + file.getAbsolutePath() + '\'');
				}

				StaticFileServerHandler.sendNotModified(ctx);
				return;
			}
		}

		if (LOG.isDebugEnabled()) {
			LOG.debug("Responding with file '" + file.getAbsolutePath() + '\'');
		}

		// Don't need to close this manually. Netty's DefaultFileRegion will take care of it.
		final RandomAccessFile raf;
		try {
			raf = new RandomAccessFile(file, "r");
		} catch (FileNotFoundException e) {
			StaticFileServerHandler.sendError(ctx, NOT_FOUND);
			return;
		}
		long fileLength = raf.length();

		HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
		StaticFileServerHandler.setContentTypeHeader(response, file);

		// the job overview should be updated as soon as possible, so we withhold the
		// cache headers for it; all other files may be cached by the browser
		if (!requestPath.equals("/joboverview.json")) {
			StaticFileServerHandler.setDateAndCacheHeaders(response, file);
		}
		if (HttpHeaders.isKeepAlive(request)) {
			response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
		}
		HttpHeaders.setContentLength(response, fileLength);

		// write the initial line and the header.
		ctx.write(response);

		// write the content.
		ChannelFuture lastContentFuture;
		if (ctx.pipeline().get(SslHandler.class) == null) {
			// no SSL: zero-copy transfer via FileRegion
			ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise());
			lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
		} else {
			// SSL requires the content to pass through the encrypting handler, so chunk it
			lastContentFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)),
				ctx.newProgressivePromise());
			// HttpChunkedInput will write the end marker (LastHttpContent) for us.
		}

		// close the connection, if no keep-alive is needed
		if (!HttpHeaders.isKeepAlive(request)) {
			lastContentFuture.addListener(ChannelFutureListener.CLOSE);
		}
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
		// only respond if the channel is still usable; otherwise there is nobody to tell
		if (ctx.channel().isActive()) {
			LOG.error("Caught exception", cause);
			StaticFileServerHandler.sendError(ctx, INTERNAL_SERVER_ERROR);
		}
	}
}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebMonitorUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebMonitorUtilsTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebMonitorUtilsTest.java
new file mode 100644
index 0000000..1c51b43
--- /dev/null
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebMonitorUtilsTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor;
+
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class WebMonitorUtilsTest {
+
+       @Test
+       public void testGetArchivers() {
+               JsonArchivist[] direct = WebRuntimeMonitor.getJsonArchivists();
+               JsonArchivist[] reflected = WebMonitorUtils.getJsonArchivists();
+
+               Assert.assertEquals(direct.length, reflected.length);
+               for(int x = 0; x < direct.length; x++) {
+                       Assert.assertSame(direct[x].getClass(), 
reflected[x].getClass());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/FsJobArchivistTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/FsJobArchivistTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/FsJobArchivistTest.java
new file mode 100644
index 0000000..f23c249
--- /dev/null
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/FsJobArchivistTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.history;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.webmonitor.WebRuntimeMonitor;
+import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.regex.Pattern;
+
+/**
+ * This test must reside in flink-runtime-web since the {@link FsJobArchivist} 
relies on
+ * {@link WebRuntimeMonitor#getJsonArchivists()}.
+ */
+public class FsJobArchivistTest {
+
+       @Rule
+       public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+       @Test
+       public void testArchiveJob() throws Exception {
+               Path tmpPath = new Path(tmpFolder.getRoot().getAbsolutePath());
+
+               AccessExecutionGraph graph = 
ArchivedJobGenerationUtils.getTestJob();
+
+               Collection<ArchivedJson> expected = new ArrayList<>();
+               for (JsonArchivist archivist : 
WebRuntimeMonitor.getJsonArchivists()) {
+                       for (ArchivedJson archive : 
archivist.archiveJsonWithPath(graph)) {
+                               expected.add(archive);
+                       }
+               }
+
+               Path archivePath = FsJobArchivist.archiveJob(tmpPath, graph);
+               Collection<ArchivedJson> actual = 
FsJobArchivist.getArchivedJsons(archivePath);
+
+               Assert.assertEquals(expected.size(), actual.size());
+
+               Iterator<ArchivedJson> eI = expected.iterator();
+               Iterator<ArchivedJson> aI = actual.iterator();
+               
+               // several jsons contain a dynamic "now" field that depends on 
the time of creation, so we can't easily compare
+               // the json and only check the path
+               // /jobs/:jobid
+               // /jobs/:jobid/vertices
+               // /jobs/:jobid/vertices/:vertexid
+               // /jobs/:jobid/vertices/:vertexid/subtasktimes
+               // /jobs/:jobid/vertices/:vertexid/taskmanagers
+               Pattern jobDetailsPattern = 
Pattern.compile("/jobs/[a-fA-F0-9]{32}(/vertices(/[a-fA-F0-9]{32}(/(subtasktimes|taskmanagers))?)?)?");
+               while (eI.hasNext() && aI.hasNext()) {
+                       // technically there isn't guarantee that the order is 
identical, but as it stands this is the case
+                       ArchivedJson exp = eI.next();
+                       ArchivedJson act = aI.next();
+                       if (jobDetailsPattern.matcher(exp.getPath()).matches()) 
{
+                               Assert.assertEquals(exp.getPath(), 
act.getPath());
+                       } else {
+                               Assert.assertEquals(exp, act);
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java
new file mode 100644
index 0000000..3eff02a
--- /dev/null
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.history;
+
+import io.netty.handler.codec.http.router.Router;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
public class HistoryServerStaticFileServerHandlerTest {

	@Rule
	public TemporaryFolder tmp = new TemporaryFolder();

	/**
	 * Starts a web frontend with the handler under test and verifies, over real HTTP:
	 * 404 for missing files, classloader fallback with index_hs.html injection,
	 * directory-request handling, 404 for directories, and path-traversal rejection.
	 */
	// NOTE(review): the port 8081 is hard-coded here and below; this can collide with a
	// concurrently running web frontend or test — consider a dynamically assigned port.
	@Test
	public void testRespondWithFile() throws Exception {
		File webDir = tmp.newFolder("webDir");
		Router router = new Router()
			.GET("/:*", new HistoryServerStaticFileServerHandler(webDir));
		WebFrontendBootstrap webUI = new WebFrontendBootstrap(
			router,
			LoggerFactory.getLogger(HistoryServerStaticFileServerHandlerTest.class),
			tmp.newFolder("uploadDir"),
			null,
			"localhost",
			8081,
			new Configuration());
		try {
			// verify that 404 message is returned when requesting a non-existent file
			String notFound404 = HistoryServerTest.getFromHTTP("http://localhost:8081/hello");
			Assert.assertTrue(notFound404.contains("404 Not Found"));

			// verify that a) a file can be loaded using the ClassLoader and b) that the HistoryServer
			// index_hs.html is injected
			String index = HistoryServerTest.getFromHTTP("http://localhost:8081/index.html");
			Assert.assertTrue(index.contains("Completed Jobs"));

			// verify that index.html is appended if the request path ends on '/'
			String index2 = HistoryServerTest.getFromHTTP("http://localhost:8081/");
			Assert.assertEquals(index, index2);

			// verify that a 404 message is returned when requesting a directory
			// (the handler appends ".json" to extension-less paths, hence the directory name)
			File dir = new File(webDir, "dir.json");
			dir.mkdirs();
			String dirNotFound404 = HistoryServerTest.getFromHTTP("http://localhost:8081/dir");
			Assert.assertTrue(dirNotFound404.contains("404 Not Found"));

			// verify that a 404 message is returned when requesting a file outside the webDir
			tmp.newFile("secret");
			String x = HistoryServerTest.getFromHTTP("http://localhost:8081/../secret");
			Assert.assertTrue(x.contains("404 Not Found"));
		} finally {
			webUI.shutdown();
		}
	}
}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
new file mode 100644
index 0000000..78fbe0b
--- /dev/null
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.history;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.io.IOUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.messages.ArchiveMessages;
+import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import scala.Option;
+
+public class HistoryServerTest {
+
+       @Rule
+       public TemporaryFolder tmpDir = new TemporaryFolder();
+
+       @Test
+       public void testFullArchiveLifecycle() throws Exception {
+               ArchivedExecutionGraph graph = (ArchivedExecutionGraph) 
ArchivedJobGenerationUtils.getTestJob();
+
+               File jmDirectory = tmpDir.newFolder("jm");
+               File hsDirectory = tmpDir.newFolder("hs");
+               
+               Configuration config = new Configuration();
+               config.setString(JobManagerOptions.ARCHIVE_DIR, 
jmDirectory.toURI().toString());
+
+               
config.setString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, 
jmDirectory.toURI().toString());
+               config.setString(HistoryServerOptions.HISTORY_SERVER_WEB_DIR, 
hsDirectory.getAbsolutePath());
+
+               ActorSystem actorSystem = 
AkkaUtils.createLocalActorSystem(config);
+               Option<Path> archivePath = Option.apply(new 
Path(jmDirectory.toURI().toString()));
+
+               ActorRef memoryArchivist = 
actorSystem.actorOf(JobManager.getArchiveProps(MemoryArchivist.class, 1, 
archivePath));
+               memoryArchivist.tell(new 
ArchiveMessages.ArchiveExecutionGraph(graph.getJobID(), graph), null);
+
+               File archive = new File(jmDirectory, 
graph.getJobID().toString());
+               for (int x = 0; x < 10 && !archive.exists(); x++) {
+                       Thread.sleep(50);
+               }
+               Assert.assertTrue(archive.exists());
+
+               HistoryServer hs = new HistoryServer(config);
+               try {
+                       hs.start();
+                       ObjectMapper mapper = new ObjectMapper();
+                       JsonNode overview = null;
+                       for (int x = 0; x < 20; x++) {
+                               Thread.sleep(50);
+                               String response = 
getFromHTTP("http://localhost:8082/joboverview";);
+                               if (response.contains("404 Not Found")) {
+                                       // file may not be written yet
+                                       continue;
+                               } else {
+                                       try {
+                                               overview = 
mapper.readTree(response);
+                                               break;
+                                       } catch (IOException ignored) {
+                                               // while the file may exist the 
contents may not have been written yet
+                                               continue;
+                                       }
+                               }
+                       }
+                       Assert.assertNotNull("/joboverview.json did not contain 
valid json", overview);
+                       String jobID = 
overview.get("finished").get(0).get("jid").asText();
+                       JsonNode jobDetails = 
mapper.readTree(getFromHTTP("http://localhost:8082/jobs/"; + jobID));
+                       Assert.assertNotNull(jobDetails.get("jid"));
+               } finally {
+                       hs.stop();
+               }
+       }
+
+       public static String getFromHTTP(String url) throws Exception {
+               URL u = new URL(url);
+               HttpURLConnection connection = (HttpURLConnection) 
u.openConnection();
+               connection.setConnectTimeout(100000);
+               connection.connect();
+               InputStream is;
+               if (connection.getResponseCode() >= 400) {
+                       // error!
+                       is = connection.getErrorStream();
+               } else {
+                       is = connection.getInputStream();
+               }
+
+               return IOUtils.toString(is, connection.getContentEncoding() != 
null ? connection.getContentEncoding() : "UTF-8");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-runtime-web/web-dashboard/app/index_hs.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/index_hs.jade 
b/flink-runtime-web/web-dashboard/app/index_hs.jade
new file mode 100644
index 0000000..9cf7411
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/app/index_hs.jade
@@ -0,0 +1,60 @@
+//
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+ 
+      http://www.apache.org/licenses/LICENSE-2.0
+ 
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+
+doctype html
+html(lang='en')
+  head
+    meta(charset='utf-8')
+    meta(http-equiv='X-UA-Compatible', content='IE=edge')
+    meta(name='viewport', content='width=device-width, initial-scale=1')
+
+    title Apache Flink History Server
+
+    link(rel="apple-touch-icon", sizes="180x180", href="images/apple-touch-icon.png")
+    link(rel="icon", type="image/png", href="images/favicon-32x32.png", sizes="32x32")
+    link(rel="icon", type="image/png", href="images/favicon-16x16.png", sizes="16x16")
+    link(rel="manifest", href="images/manifest.json")
+    link(rel="mask-icon", href="images/safari-pinned-tab.svg", color="#aa1919")
+    link(rel="shortcut icon", href="images/favicon.ico")
+    meta(name="msapplication-config", content="images/browserconfig.xml")
+    meta(name="theme-color", content="#ffffff")
+
+    link(rel='stylesheet', href='css/vendor.css', type='text/css')
+    link(rel='stylesheet', href='css/index.css', type='text/css')
+
+    script(src="js/vendor.js")
+    script(src="js/hs/index.js")
+
+  body(ng-app="flinkApp" ng-strict-di)
+    #sidebar(ng-class="{ 'sidebar-visible': sidebarVisible }")
+      nav.navbar.navbar-inverse.navbar-static-top
+        .navbar-header
+          a.navbar-brand(ui-sref="completed-jobs")
+            img.logo(alt="Apache Flink Dashboard" src="images/flink-logo.png")
+          a.navbar-brand.navbar-brand-text(ui-sref="completed-jobs")
+            | Apache Flink Dashboard
+
+      .navbar.navbar-sidebar
+        ul.nav
+          li
+            a(ui-sref="completed-jobs" ui-sref-active='active')
+              i.fa.fa-check-circle.fa-fw
+              | 
+              | Completed Jobs
+
+    #content(ng-class="{ 'sidebar-visible': sidebarVisible }")
+      #main(ui-view='main')

http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-runtime-web/web-dashboard/app/scripts/index_hs.coffee
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/scripts/index_hs.coffee b/flink-runtime-web/web-dashboard/app/scripts/index_hs.coffee
new file mode 100644
index 0000000..55d6b0c
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/app/scripts/index_hs.coffee
@@ -0,0 +1,208 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+angular.module('flinkApp', ['ui.router', 'angularMoment', 'dndLists'])
+
+# --------------------------------------
+
+.run ($rootScope) ->
+  $rootScope.sidebarVisible = false
+  $rootScope.showSidebar = ->
+    $rootScope.sidebarVisible = !$rootScope.sidebarVisible
+    $rootScope.sidebarClass = 'force-show'
+
+# --------------------------------------
+
+.value 'flinkConfig', {
+  jobServer: ''
+#  jobServer: 'http://localhost:8081/'
+  "refresh-interval": 10000
+}
+
+# --------------------------------------
+
+.value 'watermarksConfig', {
+  # A value of (Java) Long.MIN_VALUE indicates that there is no watermark
+  # available. This is parsed by Javascript as this number. We have it as
+  # a constant here to compare available watermarks against.
+  noWatermark: -9223372036854776000
+}
+
+# --------------------------------------
+
+.run (JobsService, MainService, flinkConfig, $interval) ->
+  MainService.loadConfig().then (config) ->
+    angular.extend flinkConfig, config
+
+    JobsService.listJobs()
+
+    $interval ->
+      JobsService.listJobs()
+    , flinkConfig["refresh-interval"]
+
+
+# --------------------------------------
+
+.config ($uiViewScrollProvider) ->
+  $uiViewScrollProvider.useAnchorScroll()
+
+# --------------------------------------
+
+.run ($rootScope, $state) ->
+  $rootScope.$on '$stateChangeStart', (event, toState, toParams, fromState) ->
+    if toState.redirectTo
+      event.preventDefault()
+      $state.go toState.redirectTo, toParams
+
+# --------------------------------------
+
+.config ($stateProvider, $urlRouterProvider) ->
+  $stateProvider.state "completed-jobs",
+    url: "/completed-jobs"
+    views:
+      main:
+        templateUrl: "partials/jobs/completed-jobs.html"
+        controller: 'CompletedJobsController'
+
+  .state "single-job",
+    url: "/jobs/{jobid}"
+    abstract: true
+    views:
+      main:
+        templateUrl: "partials/jobs/job.html"
+        controller: 'SingleJobController'
+
+  .state "single-job.plan",
+    url: ""
+    redirectTo: "single-job.plan.subtasks"
+    views:
+      details:
+        templateUrl: "partials/jobs/job.plan.html"
+        controller: 'JobPlanController'
+
+  .state "single-job.plan.subtasks",
+    url: ""
+    views:
+      'node-details':
+        templateUrl: "partials/jobs/job.plan.node-list.subtasks.html"
+        controller: 'JobPlanSubtasksController'
+
+  .state "single-job.plan.metrics",
+    url: "/metrics"
+    views:
+      'node-details':
+        templateUrl: "partials/jobs/job.plan.node-list.metrics.html"
+        controller: 'JobPlanMetricsController'
+
+  .state "single-job.plan.watermarks",
+    url: "/watermarks"
+    views:
+      'node-details':
+        templateUrl: "partials/jobs/job.plan.node-list.watermarks.html"
+
+  .state "single-job.plan.taskmanagers",
+    url: "/taskmanagers"
+    views:
+      'node-details':
+        templateUrl: "partials/jobs/job.plan.node-list.taskmanagers.html"
+        controller: 'JobPlanTaskManagersController'
+
+  .state "single-job.plan.accumulators",
+    url: "/accumulators"
+    views:
+      'node-details':
+        templateUrl: "partials/jobs/job.plan.node-list.accumulators.html"
+        controller: 'JobPlanAccumulatorsController'
+
+  .state "single-job.plan.checkpoints",
+      url: "/checkpoints"
+      redirectTo: "single-job.plan.checkpoints.overview"
+      views:
+        'node-details':
+          templateUrl: "partials/jobs/job.plan.node-list.checkpoints.html"
+          controller: 'JobPlanCheckpointsController'
+  
+    .state "single-job.plan.checkpoints.overview",
+      url: "/overview"
+      views:
+        'checkpoints-view':
+          templateUrl: "partials/jobs/job.plan.node.checkpoints.overview.html"
+          controller: 'JobPlanCheckpointsController'
+  
+    .state "single-job.plan.checkpoints.summary",
+      url: "/summary"
+      views:
+        'checkpoints-view':
+          templateUrl: "partials/jobs/job.plan.node.checkpoints.summary.html"
+          controller: 'JobPlanCheckpointsController'
+  
+    .state "single-job.plan.checkpoints.history",
+      url: "/history"
+      views:
+        'checkpoints-view':
+          templateUrl: "partials/jobs/job.plan.node.checkpoints.history.html"
+          controller: 'JobPlanCheckpointsController'
+  
+    .state "single-job.plan.checkpoints.config",
+      url: "/config"
+      views:
+        'checkpoints-view':
+          templateUrl: "partials/jobs/job.plan.node.checkpoints.config.html"
+          controller: 'JobPlanCheckpointsController'
+  
+    .state "single-job.plan.checkpoints.details",
+      url: "/details/{checkpointId}"
+      views:
+        'checkpoints-view':
+          templateUrl: "partials/jobs/job.plan.node.checkpoints.details.html"
+          controller: 'JobPlanCheckpointDetailsController'
+
+  .state "single-job.plan.backpressure",
+    url: "/backpressure"
+    views:
+      'node-details':
+        templateUrl: "partials/jobs/job.plan.node-list.backpressure.html"
+        controller: 'JobPlanBackPressureController'
+
+  .state "single-job.timeline",
+    url: "/timeline"
+    views:
+      details:
+        templateUrl: "partials/jobs/job.timeline.html"
+
+  .state "single-job.timeline.vertex",
+    url: "/{vertexId}"
+    views:
+      vertex:
+        templateUrl: "partials/jobs/job.timeline.vertex.html"
+        controller: 'JobTimelineVertexController'
+
+  .state "single-job.exceptions",
+    url: "/exceptions"
+    views:
+      details:
+        templateUrl: "partials/jobs/job.exceptions.html"
+        controller: 'JobExceptionsController'
+
+  .state "single-job.config",
+    url: "/config"
+    views:
+      details:
+        templateUrl: "partials/jobs/job.config.html"
+
+  $urlRouterProvider.otherwise "/completed-jobs"

Reply via email to