[FLINK-1579] [webui] Implement History Server Archives jobs to a file system directory and adds a history server that can display the archived jobs.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2cab0cb2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2cab0cb2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2cab0cb2 Branch: refs/heads/master Commit: 2cab0cb2645c78d95c9586a2b84a2d8768a391c2 Parents: 8e85f01 Author: zentol <[email protected]> Authored: Mon Mar 20 18:41:54 2017 +0100 Committer: Ufuk Celebi <[email protected]> Committed: Wed Mar 22 15:45:00 2017 +0100 ---------------------------------------------------------------------- docs/monitoring/back_pressure.md | 2 +- docs/monitoring/checkpoint_monitoring.md | 2 +- docs/monitoring/debugging_classloading.md | 2 +- docs/monitoring/debugging_event_time.md | 2 +- docs/monitoring/historyserver.md | 98 ++++++ docs/monitoring/large_state_tuning.md | 2 +- docs/setup/config.md | 17 ++ .../configuration/HistoryServerOptions.java | 82 +++++ .../flink/configuration/JobManagerOptions.java | 69 +++-- .../src/main/flink-bin/bin/flink-daemon.sh | 6 +- .../src/main/flink-bin/bin/historyserver.sh | 34 +++ flink-dist/src/main/resources/flink-conf.yaml | 21 ++ .../runtime/webmonitor/WebRuntimeMonitor.java | 6 +- .../files/StaticFileServerHandler.java | 12 +- .../webmonitor/history/HistoryServer.java | 302 +++++++++++++++++++ .../history/HistoryServerArchiveFetcher.java | 252 ++++++++++++++++ .../HistoryServerStaticFileServerHandler.java | 249 +++++++++++++++ .../runtime/webmonitor/WebMonitorUtilsTest.java | 36 +++ .../webmonitor/history/FsJobArchivistTest.java | 84 ++++++ ...istoryServerStaticFileServerHandlerTest.java | 77 +++++ .../webmonitor/history/HistoryServerTest.java | 122 ++++++++ .../web-dashboard/app/index_hs.jade | 60 ++++ .../web-dashboard/app/scripts/index_hs.coffee | 208 +++++++++++++ flink-runtime-web/web-dashboard/gulpfile.js | 29 +- .../flink/runtime/history/FsJobArchivist.java | 122 ++++++++ .../runtime/webmonitor/WebMonitorUtils.java | 48 +++ 
.../webmonitor/history/ArchivedJson.java | 16 + .../flink/runtime/jobmanager/JobManager.scala | 30 +- .../runtime/jobmanager/MemoryArchivist.scala | 37 ++- .../minicluster/LocalFlinkMiniCluster.scala | 14 +- .../jobmanager/JobManagerHARecoveryTest.java | 3 +- .../webmonitor/history/ArchivedJsonTest.java | 37 +++ .../testingUtils/TestingMemoryArchivist.scala | 4 +- 33 files changed, 2024 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/docs/monitoring/back_pressure.md ---------------------------------------------------------------------- diff --git a/docs/monitoring/back_pressure.md b/docs/monitoring/back_pressure.md index 4a4bd22..f047066 100644 --- a/docs/monitoring/back_pressure.md +++ b/docs/monitoring/back_pressure.md @@ -1,7 +1,7 @@ --- title: "Monitoring Back Pressure" nav-parent_id: monitoring -nav-pos: 4 +nav-pos: 5 --- <!-- Licensed to the Apache Software Foundation (ASF) under one http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/docs/monitoring/checkpoint_monitoring.md ---------------------------------------------------------------------- diff --git a/docs/monitoring/checkpoint_monitoring.md b/docs/monitoring/checkpoint_monitoring.md index 5a4dba7..7cf06d1 100644 --- a/docs/monitoring/checkpoint_monitoring.md +++ b/docs/monitoring/checkpoint_monitoring.md @@ -1,7 +1,7 @@ --- title: "Monitoring Checkpointing" nav-parent_id: monitoring -nav-pos: 3 +nav-pos: 4 --- <!-- Licensed to the Apache Software Foundation (ASF) under one http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/docs/monitoring/debugging_classloading.md ---------------------------------------------------------------------- diff --git a/docs/monitoring/debugging_classloading.md b/docs/monitoring/debugging_classloading.md index 85ee9bb..8c91e0f 100644 --- a/docs/monitoring/debugging_classloading.md +++ b/docs/monitoring/debugging_classloading.md @@ -1,7 +1,7 @@ --- 
title: "Debugging Classloading" nav-parent_id: monitoring -nav-pos: 13 +nav-pos: 14 --- <!-- Licensed to the Apache Software Foundation (ASF) under one http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/docs/monitoring/debugging_event_time.md ---------------------------------------------------------------------- diff --git a/docs/monitoring/debugging_event_time.md b/docs/monitoring/debugging_event_time.md index eaf0d5c..edc7dd0 100644 --- a/docs/monitoring/debugging_event_time.md +++ b/docs/monitoring/debugging_event_time.md @@ -1,7 +1,7 @@ --- title: "Debugging Windows & Event Time" nav-parent_id: monitoring -nav-pos: 12 +nav-pos: 13 --- <!-- Licensed to the Apache Software Foundation (ASF) under one http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/docs/monitoring/historyserver.md ---------------------------------------------------------------------- diff --git a/docs/monitoring/historyserver.md b/docs/monitoring/historyserver.md new file mode 100644 index 0000000..fd957f2 --- /dev/null +++ b/docs/monitoring/historyserver.md @@ -0,0 +1,98 @@ +--- +title: "History Server" +nav-parent_id: monitoring +nav-pos: 3 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> + +Flink has a history server that can be used to query the statistics of completed jobs after the corresponding Flink cluster has been shut down. + +Furthermore, it exposes a REST API that accepts HTTP requests and responds with JSON data. + +* This will be replaced by the TOC +{:toc} + +## Overview + +The HistoryServer allows you to query the status and statistics of completed jobs that have been archived by a JobManager. + +You start and stop the HistoryServer via its corresponding startup script: + +```sh +# Start or stop the HistoryServer +bin/historyserver.sh (start|stop) +``` + +By default, this server binds to `localhost` and listens at port `8082`. + +Currently, you can only run it as a standalone process. + +## Configuration + +The configuration keys `jobmanager.archive.fs.dir` and `historyserver.archive.fs.refresh-interval` need to be adjusted for archiving and displaying archived jobs. + +**JobManager** + +The archiving of completed jobs happens on the JobManager, which uploads the archived job information to a file system directory. You can configure the directory to archive completed jobs in `flink-conf.yaml` by setting a directory via `jobmanager.archive.fs.dir`. + +```sh +# Directory to upload completed job information +jobmanager.archive.fs.dir: hdfs:///completed-jobs +``` + +**HistoryServer** + +The HistoryServer can be configured to monitor a comma-separated list of directories in via `historyserver.archive.fs.dir`. The configured directories are regularly polled for new archives; the polling interval can be configured via `historyserver.archive.fs.refresh-interval`. + +```sh +# Monitor the following directories for completed jobs +historyserver.archive.fs.dir: hdfs:///completed-jobs + +# Refresh every 10 seconds +historyserver.archive.fs.refresh-interval: 10000 +``` + +The contained archives are downloaded and cached in the local filesystem. The local directory for this is configured via `historyserver.web.tmpdir`. 
+ +Check out the configuration page for a [complete list of configuration options]({{ site.baseurl }}/setup/config.html#history-server). + +## Available Requests + +Below is a list of available requests, with a sample JSON response. All requests are of the sample form `http://hostname:8082/jobs`, below we list only the *path* part of the URLs. + +Values in angle brackets are variables, for example `http://hostname:port/jobs/<jobid>/exceptions` will have to requested for example as `http://hostname:port/jobs/7684be6004e4e955c2a558a9bc463f65/exceptions`. + + - `/config` + - `/jobs` + - `/joboverview` + - `/jobs/<jobid>` + - `/jobs/<jobid>/vertices` + - `/jobs/<jobid>/config` + - `/jobs/<jobid>/exceptions` + - `/jobs/<jobid>/accumulators` + - `/jobs/<jobid>/vertices/<vertexid>` + - `/jobs/<jobid>/vertices/<vertexid>/subtasktimes` + - `/jobs/<jobid>/vertices/<vertexid>/taskmanagers` + - `/jobs/<jobid>/vertices/<vertexid>/accumulators` + - `/jobs/<jobid>/vertices/<vertexid>/subtasks/accumulators` + - `/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>` + - `/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>/attempts/<attempt>` + - `/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>/attempts/<attempt>/accumulators` + - `/jobs/<jobid>/plan` http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/docs/monitoring/large_state_tuning.md ---------------------------------------------------------------------- diff --git a/docs/monitoring/large_state_tuning.md b/docs/monitoring/large_state_tuning.md index 78a5cb4..a520106 100644 --- a/docs/monitoring/large_state_tuning.md +++ b/docs/monitoring/large_state_tuning.md @@ -1,7 +1,7 @@ --- title: "Debugging and Tuning Checkpoints and Large State" nav-parent_id: monitoring -nav-pos: 11 +nav-pos: 12 --- <!-- Licensed to the Apache Software Foundation (ASF) under one http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/docs/setup/config.md ---------------------------------------------------------------------- diff 
--git a/docs/setup/config.md b/docs/setup/config.md index d9d5f15..6a5c1ff 100644 --- a/docs/setup/config.md +++ b/docs/setup/config.md @@ -582,6 +582,23 @@ Previously this key was named `recovery.mode` and the default value was `standal - `metrics.latency.history-size`: (Default: 128) Defines the number of measured latencies to maintain at each operator +### History Server + +You have to configure `jobmanager.archive.fs.dir` in order to archive terminated jobs and add it to the list of monitored directories via `historyserver.archive.fs.dir` if you want to display them via the HistoryServer's web frontend. + +- `jobmanager.archive.fs.dir`: Directory to upload information about terminated jobs to. You have to add this directory to the list of monitored directories of the history server via `historyserver.archive.fs.dir`. + +- `historyserver.archive.fs.dir`: Comma separated list of directories to fetch archived jobs from. The history server will monitor these directories for archived jobs. You can configure the JobManager to archive jobs to a directory via `jobmanager.archive.fs.dir`. + +- `historyserver.archive.fs.refresh-interval`: Interval in milliseconds for refreshing the archived job directories (DEFAULT: `10000`). + +- `historyserver.web.tmpdir`: This configuration parameter allows defining the Flink web directory to be used by the history server web interface. The web interface will copy its static files into the directory (DEFAULT: local system temporary directory). + +- `historyserver.web.address`: Address of the HistoryServer's web interface (DEFAULT: `anyLocalAddress()`). + +- `historyserver.web.port`: Port of the HistoryServers's web interface (DEFAULT: `8082`). + +- `historyserver.web.ssl.enabled`: Enable HTTPs access to the HistoryServer web frontend. This is applicable only when the global SSL flag security.ssl.enabled is set to true (DEFAULT: `false`). 
## Background http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java new file mode 100644 index 0000000..ebe4f2b --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.configuration; + +import org.apache.flink.annotation.PublicEvolving; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** + * The set of configuration options relating to the HistoryServer. + */ +@PublicEvolving +public class HistoryServerOptions { + + /** + * The interval at which the HistoryServer polls {@link HistoryServerOptions#HISTORY_SERVER_ARCHIVE_DIRS} for new archives. 
+ */ + public static final ConfigOption<Long> HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL = + key("historyserver.archive.fs.refresh-interval") + .defaultValue(10000L); + + /** + * Comma-separated list of directories which the HistoryServer polls for new archives. + */ + public static final ConfigOption<String> HISTORY_SERVER_ARCHIVE_DIRS = + key("historyserver.archive.fs.dir") + .noDefaultValue(); + + /** + * The local directory used by the HistoryServer web-frontend. + */ + public static final ConfigOption<String> HISTORY_SERVER_WEB_DIR = + key("historyserver.web.tmpdir") + .noDefaultValue(); + + /** + * The address under which the HistoryServer web-frontend is accessible. + */ + public static final ConfigOption<String> HISTORY_SERVER_WEB_ADDRESS = + key("historyserver.web.address") + .noDefaultValue(); + + /** + * The port under which the HistoryServer web-frontend is accessible. + */ + public static final ConfigOption<Integer> HISTORY_SERVER_WEB_PORT = + key("historyserver.web.port") + .defaultValue(8082); + + /** + * The refresh interval for the HistoryServer web-frontend in milliseconds. + */ + public static final ConfigOption<Long> HISTORY_SERVER_WEB_REFRESH_INTERVAL = + key("historyserver.web.refresh-interval") + .defaultValue(10000L); + + /** + * Enables/Disables SSL support for the HistoryServer web-frontend. Only relevant if + * {@link ConfigConstants#SECURITY_SSL_ENABLED} is enabled. 
+ */ + public static final ConfigOption<Boolean> HISTORY_SERVER_WEB_SSL_ENABLED = + key("historyserver.web.ssl.enabled") + .defaultValue(false); + + private HistoryServerOptions() { + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java index 6f5efe6..d9c9d18 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java @@ -18,6 +18,8 @@ package org.apache.flink.configuration; +import static org.apache.flink.configuration.ConfigOptions.key; + import org.apache.flink.annotation.PublicEvolving; /** @@ -36,8 +38,8 @@ public class JobManagerOptions { * leader-election service (like ZooKeeper) is used to elect and discover the JobManager * leader from potentially multiple standby JobManagers. */ - public static final ConfigOption<String> ADDRESS = ConfigOptions - .key("jobmanager.rpc.address") + public static final ConfigOption<String> ADDRESS = + key("jobmanager.rpc.address") .noDefaultValue(); /** @@ -51,15 +53,15 @@ public class JobManagerOptions { * leader-election service (like ZooKeeper) is used to elect and discover the JobManager * leader from potentially multiple standby JobManagers. */ - public static final ConfigOption<Integer> PORT = ConfigOptions - .key("jobmanager.rpc.port") + public static final ConfigOption<Integer> PORT = + key("jobmanager.rpc.port") .defaultValue(6123); /** * The maximum number of prior execution attempts kept in history. 
*/ - public static final ConfigOption<Integer> MAX_ATTEMPTS_HISTORY_SIZE = ConfigOptions - .key("jobmanager.execution.attempts-history-size") + public static final ConfigOption<Integer> MAX_ATTEMPTS_HISTORY_SIZE = + key("jobmanager.execution.attempts-history-size") .defaultValue(16) .withDeprecatedKeys("job-manager.max-attempts-history-size"); @@ -70,88 +72,95 @@ public class JobManagerOptions { /** * The port for the runtime monitor web-frontend server. */ - public static final ConfigOption<Integer> WEB_PORT = ConfigOptions - .key("jobmanager.web.port") + public static final ConfigOption<Integer> WEB_PORT = + key("jobmanager.web.port") .defaultValue(8081); /** * Config parameter to override SSL support for the JobManager Web UI */ - public static final ConfigOption<Boolean> WEB_SSL_ENABLED = ConfigOptions - .key("jobmanager.web.ssl.enabled") + public static final ConfigOption<Boolean> WEB_SSL_ENABLED = + key("jobmanager.web.ssl.enabled") .defaultValue(true); /** * The config parameter defining the flink web directory to be used by the webmonitor. */ - public static final ConfigOption<String> WEB_TMP_DIR = ConfigOptions - .key("jobmanager.web.tmpdir") + public static final ConfigOption<String> WEB_TMP_DIR = + key("jobmanager.web.tmpdir") .defaultValue(System.getProperty("java.io.tmpdir")); /** * The config parameter defining the directory for uploading the job jars. If not specified a dynamic directory * will be used under the directory specified by JOB_MANAGER_WEB_TMPDIR_KEY. */ - public static final ConfigOption<String> WEB_UPLOAD_DIR = ConfigOptions - .key("jobmanager.web.upload.dir") + public static final ConfigOption<String> WEB_UPLOAD_DIR = + key("jobmanager.web.upload.dir") .noDefaultValue(); /** * The config parameter defining the number of archived jobs for the jobmanager. 
*/ - public static final ConfigOption<Integer> WEB_ARCHIVE_COUNT = ConfigOptions - .key("jobmanager.web.history") + public static final ConfigOption<Integer> WEB_ARCHIVE_COUNT = + key("jobmanager.web.history") .defaultValue(5); /** * The log file location (may be in /log for standalone but under log directory when using YARN). */ - public static final ConfigOption<String> WEB_LOG_PATH = ConfigOptions - .key("jobmanager.web.log.path") + public static final ConfigOption<String> WEB_LOG_PATH = + key("jobmanager.web.log.path") .noDefaultValue(); /** * Config parameter indicating whether jobs can be uploaded and run from the web-frontend. */ - public static final ConfigOption<Boolean> WEB_SUBMIT_ENABLE = ConfigOptions - .key("jobmanager.web.submit.enable") + public static final ConfigOption<Boolean> WEB_SUBMIT_ENABLE = + key("jobmanager.web.submit.enable") .defaultValue(true); /** * Config parameter defining the number of checkpoints to remember for recent history. */ - public static final ConfigOption<Integer> WEB_CHECKPOINTS_HISTORY_SIZE = ConfigOptions - .key("jobmanager.web.checkpoints.history") + public static final ConfigOption<Integer> WEB_CHECKPOINTS_HISTORY_SIZE = + key("jobmanager.web.checkpoints.history") .defaultValue(10); /** * Time after which cached stats are cleaned up if not accessed. */ - public static final ConfigOption<Integer> WEB_BACKPRESSURE_CLEANUP_INTERVAL = ConfigOptions - .key("jobmanager.web.backpressure.cleanup-interval") + public static final ConfigOption<Integer> WEB_BACKPRESSURE_CLEANUP_INTERVAL = + key("jobmanager.web.backpressure.cleanup-interval") .defaultValue(10 * 60 * 1000); /** * Time after which available stats are deprecated and need to be refreshed (by resampling). 
*/ - public static final ConfigOption<Integer> WEB_BACKPRESSURE_REFRESH_INTERVAL = ConfigOptions - .key("jobmanager.web.backpressure.refresh-interval") + public static final ConfigOption<Integer> WEB_BACKPRESSURE_REFRESH_INTERVAL = + key("jobmanager.web.backpressure.refresh-interval") .defaultValue(60 * 1000); /** * Number of stack trace samples to take to determine back pressure. */ - public static final ConfigOption<Integer> WEB_BACKPRESSURE_NUM_SAMPLES = ConfigOptions - .key("jobmanager.web.backpressure.num-samples") + public static final ConfigOption<Integer> WEB_BACKPRESSURE_NUM_SAMPLES = + key("jobmanager.web.backpressure.num-samples") .defaultValue(100); /** * Delay between stack trace samples to determine back pressure. */ - public static final ConfigOption<Integer> WEB_BACKPRESSURE_DELAY = ConfigOptions - .key("jobmanager.web.backpressure.delay-between-samples") + public static final ConfigOption<Integer> WEB_BACKPRESSURE_DELAY = + key("jobmanager.web.backpressure.delay-between-samples") .defaultValue(50); + /** + * The location where the JobManager stores the archives of completed jobs. + */ + public static final ConfigOption<String> ARCHIVE_DIR = + key("jobmanager.archive.fs.dir") + .noDefaultValue(); + // --------------------------------------------------------------------------------------------- private JobManagerOptions() { http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-dist/src/main/flink-bin/bin/flink-daemon.sh ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/flink-bin/bin/flink-daemon.sh b/flink-dist/src/main/flink-bin/bin/flink-daemon.sh index 2230e92..b25358d 100644 --- a/flink-dist/src/main/flink-bin/bin/flink-daemon.sh +++ b/flink-dist/src/main/flink-bin/bin/flink-daemon.sh @@ -18,7 +18,7 @@ ################################################################################ # Start/stop a Flink daemon. 
-USAGE="Usage: flink-daemon.sh (start|stop|stop-all) (jobmanager|taskmanager|zookeeper) [args]" +USAGE="Usage: flink-daemon.sh (start|stop|stop-all) (jobmanager|taskmanager|zookeeper|historyserver) [args]" STARTSTOP=$1 DAEMON=$2 @@ -42,6 +42,10 @@ case $DAEMON in CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer ;; + (historyserver) + CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer + ;; + (*) echo "Unknown daemon '${DAEMON}'. $USAGE." exit 1 http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-dist/src/main/flink-bin/bin/historyserver.sh ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/flink-bin/bin/historyserver.sh b/flink-dist/src/main/flink-bin/bin/historyserver.sh new file mode 100644 index 0000000..adc9660 --- /dev/null +++ b/flink-dist/src/main/flink-bin/bin/historyserver.sh @@ -0,0 +1,34 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +# Start/stop a Flink HistoryServer +USAGE="Usage: historyserver.sh (start|stop)" + +STARTSTOP=$1 + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +. "$bin"/config.sh + +if [[ $STARTSTOP == "start" ]]; then + args=("--configDir" "${FLINK_CONF_DIR}") +fi + +"${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP historyserver "${args[@]}" http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-dist/src/main/resources/flink-conf.yaml ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml index 72acbeb..446d992 100644 --- a/flink-dist/src/main/resources/flink-conf.yaml +++ b/flink-dist/src/main/resources/flink-conf.yaml @@ -81,6 +81,27 @@ jobmanager.web.port: 8081 #jobmanager.web.submit.enable: false +#============================================================================== +# HistoryServer +#============================================================================== + +# The HistoryServer is started and stopped via bin/historyserver.sh (start|stop) + +# Directory to upload completed jobs to. Add this directory to the list of +# monitored directories of the HistoryServer as well (see below). +#jobmanager.archive.fs.dir: hdfs:///completed-jobs/ + +# The address under which the web-based HistoryServer listens. +#historyserver.web.address: 0.0.0.0 + +# The port under which the web-based HistoryServer listens. +#historyserver.web.port: 8082 + +# Comma separated list of directories to monitor for completed jobs. +#historyserver.archive.fs.dir: hdfs:///completed-jobs/ + +# Interval in milliseconds for refreshing the monitored directories. 
+#historyserver.archive.fs.refresh-interval: 10000 #============================================================================== # Streaming state checkpointing http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java index 39bca71..c1e6229 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java @@ -121,8 +121,6 @@ public class WebRuntimeMonitor implements WebMonitor { /** LeaderRetrievalListener which stores the currently leading JobManager and its archive */ private final JobManagerRetriever retriever; - private final Router router; - private final SSLContext serverSSLContext; private final Promise<String> jobManagerAddressPromise = new scala.concurrent.impl.Promise.DefaultPromise<>(); @@ -243,7 +241,7 @@ public class WebRuntimeMonitor implements WebMonitor { RuntimeMonitorHandler triggerHandler = handler(cancelWithSavepoint.getTriggerHandler()); RuntimeMonitorHandler inProgressHandler = handler(cancelWithSavepoint.getInProgressHandler()); - router = new Router(); + Router router = new Router(); // config how to interact with this web server GET(router, new DashboardConfigHandler(cfg.getRefreshInterval())); @@ -381,7 +379,7 @@ public class WebRuntimeMonitor implements WebMonitor { * * @return array of all JsonArchivists relevant for the history server */ - public static JsonArchivist[] getArchivers() { + public static JsonArchivist[] getJsonArchivists() { JsonArchivist[] archivists = new JsonArchivist[]{ new 
CurrentJobsOverviewHandler.CurrentJobsOverviewJsonArchivist(), http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java index 5a8c76f..b7874c9 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java @@ -106,7 +106,7 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed> private static final TimeZone GMT_TIMEZONE = TimeZone.getTimeZone("GMT"); /** Date format for HTTP */ - private static final String HTTP_DATE_FORMAT = "EEE, dd MMM yyyy HH:mm:ss zzz"; + public static final String HTTP_DATE_FORMAT = "EEE, dd MMM yyyy HH:mm:ss zzz"; /** Be default, we allow files to be cached for 5 minutes */ private static final int HTTP_CACHE_SECONDS = 300; @@ -348,7 +348,7 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed> * @param ctx The channel context to write the response to. * @param status The response status. */ - private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) { + public static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) { FullHttpResponse response = new DefaultFullHttpResponse( HTTP_1_1, status, Unpooled.copiedBuffer("Failure: " + status + "\r\n", CharsetUtil.UTF_8)); response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8"); @@ -363,7 +363,7 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed> * * @param ctx The channel context to write the response to. 
*/ - private static void sendNotModified(ChannelHandlerContext ctx) { + public static void sendNotModified(ChannelHandlerContext ctx) { FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, NOT_MODIFIED); setDateHeader(response); @@ -376,7 +376,7 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed> * * @param response HTTP response */ - private static void setDateHeader(FullHttpResponse response) { + public static void setDateHeader(FullHttpResponse response) { SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US); dateFormatter.setTimeZone(GMT_TIMEZONE); @@ -390,7 +390,7 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed> * @param response The HTTP response object. * @param fileToCache File to extract the modification timestamp from. */ - private static void setDateAndCacheHeaders(HttpResponse response, File fileToCache) { + public static void setDateAndCacheHeaders(HttpResponse response, File fileToCache) { SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US); dateFormatter.setTimeZone(GMT_TIMEZONE); @@ -411,7 +411,7 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed> * @param response HTTP response * @param file file to extract content type */ - private static void setContentTypeHeader(HttpResponse response, File file) { + public static void setContentTypeHeader(HttpResponse response, File file) { String mimeType = MimeTypes.getMimeTypeForFileName(file.getName()); String mimeFinal = mimeType != null ? 
mimeType : MimeTypes.getDefaultMimeType(); response.headers().set(CONTENT_TYPE, mimeFinal); http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java new file mode 100644 index 0000000..f05bd02 --- /dev/null +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.webmonitor.history; + +import io.netty.handler.codec.http.router.Router; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.history.FsJobArchivist; +import org.apache.flink.runtime.net.SSLUtils; +import org.apache.flink.runtime.security.SecurityUtils; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler; +import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.FlinkException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLContext; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * The HistoryServer provides a WebInterface and REST API to retrieve information about finished jobs for which + * the JobManager may have already shut down. + * + * The HistoryServer regularly checks a set of directories for job archives created by the {@link FsJobArchivist} and + * caches these in a local directory. See {@link HistoryServerArchiveFetcher}. + * + * All configuration options are defined in {@link HistoryServerOptions}. + * + * The WebInterface only displays the "Completed Jobs" page. 
+ * + * The REST API is limited to + * <ul> + * <li>/config</li> + * <li>/joboverview</li> + * <li>/jobs/:jobid/*</li> + * </ul> + * and relies on static files that are served by the {@link HistoryServerStaticFileServerHandler}. + */ +public class HistoryServer { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServer.class); + + private final Configuration config; + + private final String webAddress; + private final int webPort; + private final long webRefreshIntervalMillis; + private final File webDir; + + private final HistoryServerArchiveFetcher archiveFetcher; + + private final SSLContext serverSSLContext; + private WebFrontendBootstrap netty; + + private final Object startupShutdownLock = new Object(); + private final AtomicBoolean shutdownRequested = new AtomicBoolean(false); + private final Thread shutdownHook; + + public static void main(String[] args) throws Exception { + ParameterTool pt = ParameterTool.fromArgs(args); + String configDir = pt.getRequired("configDir"); + + LOG.info("Loading configuration from {}", configDir); + final Configuration flinkConfig = GlobalConfiguration.loadConfiguration(configDir); + + // run the history server + SecurityUtils.install(new SecurityUtils.SecurityConfiguration(flinkConfig)); + + try { + SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() { + @Override + public Integer call() throws Exception { + HistoryServer hs = new HistoryServer(flinkConfig); + hs.run(); + return 0; + } + }); + System.exit(0); + } catch (UndeclaredThrowableException ute) { + Throwable cause = ute. 
getUndeclaredThrowable(); + LOG.error("Failed to run HistoryServer.", cause); + cause.printStackTrace(); + System.exit(1); + } catch (Exception e) { + LOG.error("Failed to run HistoryServer.", e); + e.printStackTrace(); + System.exit(1); + } + } + + public HistoryServer(Configuration config) throws IOException, FlinkException { + this.config = config; + if (config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config)) { + LOG.info("Enabling SSL for the history server."); + try { + this.serverSSLContext = SSLUtils.createSSLServerContext(config); + } catch (Exception e) { + throw new IOException("Failed to initialize SSLContext for the history server.", e); + } + } else { + this.serverSSLContext = null; + } + + webAddress = config.getString(HistoryServerOptions.HISTORY_SERVER_WEB_ADDRESS); + webPort = config.getInteger(HistoryServerOptions.HISTORY_SERVER_WEB_PORT); + webRefreshIntervalMillis = config.getLong(HistoryServerOptions.HISTORY_SERVER_WEB_REFRESH_INTERVAL); + + String webDirectory = config.getString(HistoryServerOptions.HISTORY_SERVER_WEB_DIR); + if (webDirectory == null) { + webDirectory = System.getProperty("java.io.tmpdir") + "flink-web-history-" + UUID.randomUUID(); + } + webDir = new File(webDirectory); + + String refreshDirectories = config.getString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS); + if (refreshDirectories == null) { + throw new FlinkException(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS + " was not configured."); + } + List<RefreshLocation> refreshDirs = new ArrayList<>(); + for (String refreshDirectory : refreshDirectories.split(",")) { + try { + Path refreshPath = WebMonitorUtils.validateAndNormalizeUri(new Path(refreshDirectory).toUri()); + FileSystem refreshFS = refreshPath.getFileSystem(); + refreshDirs.add(new RefreshLocation(refreshPath, refreshFS)); + } catch (Exception e) { + // there's most likely something wrong with the path itself, so we ignore it from here on + 
LOG.warn("Failed to create Path or FileSystem for directory '{}'. Directory will not be monitored.", refreshDirectory, e); + } + } + + if (refreshDirs.isEmpty()) { + throw new FlinkException("Failed to validate any of the configured directories to monitor."); + } + + long refreshIntervalMillis = config.getLong(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL); + archiveFetcher = new HistoryServerArchiveFetcher(refreshIntervalMillis, refreshDirs, webDir); + + this.shutdownHook = new Thread() { + @Override + public void run() { + HistoryServer.this.stop(); + } + }; + // add shutdown hook for deleting the directories and remaining temp files on shutdown + try { + Runtime.getRuntime().addShutdownHook(shutdownHook); + } catch (IllegalStateException e) { + // race, JVM is in shutdown already, we can safely ignore this + LOG.debug("Unable to add shutdown hook, shutdown already in progress", e); + } catch (Throwable t) { + // these errors usually happen when the shutdown is already in progress + LOG.warn("Error while adding shutdown hook", t); + } + } + + public void run() { + try { + start(); + new CountDownLatch(1).await(); + } catch (Exception e) { + LOG.error("Failure while running HistoryServer.", e); + } finally { + stop(); + } + } + + // ------------------------------------------------------------------------ + // Life-cycle + // ------------------------------------------------------------------------ + + void start() throws IOException, InterruptedException { + synchronized (startupShutdownLock) { + LOG.info("Starting history server."); + + Files.createDirectories(webDir.toPath()); + LOG.info("Using directory {} as local cache.", webDir); + + Router router = new Router(); + router.GET("/:*", new HistoryServerStaticFileServerHandler(webDir)); + + if (!webDir.exists() && !webDir.mkdirs()) { + throw new IOException("Failed to create local directory " + webDir.getAbsoluteFile() + "."); + } + + createDashboardConfigFile(); + + archiveFetcher.start(); + + 
netty = new WebFrontendBootstrap(router, LOG, webDir, serverSSLContext, webAddress, webPort, config); + } + } + + void stop() { + if (shutdownRequested.compareAndSet(false, true)) { + synchronized (startupShutdownLock) { + LOG.info("Stopping history server."); + + try { + netty.shutdown(); + } catch (Throwable t) { + LOG.warn("Error while shutting down WebFrontendBootstrap.", t); + } + + archiveFetcher.stop(); + + try { + LOG.info("Removing web dashboard root cache directory {}", webDir); + FileUtils.deleteDirectory(webDir); + } catch (Throwable t) { + LOG.warn("Error while deleting web root directory {}", webDir, t); + } + + LOG.info("Stopped history server."); + + // Remove shutdown hook to prevent resource leaks, unless this is invoked by the shutdown hook itself + if (shutdownHook != null && shutdownHook != Thread.currentThread()) { + try { + Runtime.getRuntime().removeShutdownHook(shutdownHook); + } catch (IllegalStateException ignored) { + // race, JVM is in shutdown already, we can safely ignore this + } catch (Throwable t) { + LOG.warn("Exception while unregistering HistoryServer cleanup shutdown hook."); + } + } + } + } + } + + // ------------------------------------------------------------------------ + // File generation + // ------------------------------------------------------------------------ + + static FileWriter createOrGetFile(File folder, String name) throws IOException { + File file = new File(folder, name + ".json"); + if (!file.exists()) { + Files.createFile(file.toPath()); + } + FileWriter fr = new FileWriter(file); + return fr; + } + + private void createDashboardConfigFile() throws IOException { + try (FileWriter fw = createOrGetFile(webDir, "config")) { + fw.write(DashboardConfigHandler.createConfigJson(webRefreshIntervalMillis)); + fw.flush(); + } catch (IOException ioe) { + LOG.error("Failed to write config file."); + throw ioe; + } + } + + /** + * Container for the {@link Path} and {@link FileSystem} of a refresh directory. 
+ */ + static class RefreshLocation { + private final Path path; + private final FileSystem fs; + + private RefreshLocation(Path path, FileSystem fs) { + this.path = path; + this.fs = fs; + } + + public Path getPath() { + return path; + } + + public FileSystem getFs() { + return fs; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java new file mode 100644 index 0000000..824d6c9 --- /dev/null +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.webmonitor.history; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.history.FsJobArchivist; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.TimerTask; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * This class is used by the {@link HistoryServer} to fetch the job archives that are located at + * {@link HistoryServerOptions#HISTORY_SERVER_ARCHIVE_DIRS}. The directories are polled in regular intervals, defined + * by {@link HistoryServerOptions#HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL}. + * + * The archives are downloaded and expanded into a file structure analogous to the REST API defined in the WebRuntimeMonitor. 
+ */ +class HistoryServerArchiveFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class); + + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher")); + private final JobArchiveFetcherTask fetcherTask; + private final long refreshIntervalMillis; + + HistoryServerArchiveFetcher(long refreshIntervalMillis, List<HistoryServer.RefreshLocation> refreshDirs, File webDir) { + this.refreshIntervalMillis = refreshIntervalMillis; + this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir); + if (LOG.isInfoEnabled()) { + for (HistoryServer.RefreshLocation refreshDir : refreshDirs) { + LOG.info("Monitoring directory {} for archived jobs.", refreshDir.getPath()); + } + } + } + + void start() { + executor.scheduleWithFixedDelay(fetcherTask, 0, refreshIntervalMillis, TimeUnit.MILLISECONDS); + } + + void stop() { + executor.shutdown(); + + try { + if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ignored) { + executor.shutdownNow(); + } + } + + /** + * {@link TimerTask} that polls the directories configured as {@link HistoryServerOptions#HISTORY_SERVER_ARCHIVE_DIRS} for + * new job archives. + */ + static class JobArchiveFetcherTask extends TimerTask { + + private final List<HistoryServer.RefreshLocation> refreshDirs; + + /** Cache of all available jobs identified by their id. 
*/ + private final Set<String> cachedArchives; + + private final File webDir; + private final File webJobDir; + private final File webOverviewDir; + + private static final String JSON_FILE_ENDING = ".json"; + + JobArchiveFetcherTask(List<HistoryServer.RefreshLocation> refreshDirs, File webDir) { + this.refreshDirs = checkNotNull(refreshDirs); + this.cachedArchives = new HashSet<>(); + this.webDir = checkNotNull(webDir); + this.webJobDir = new File(webDir, "jobs"); + webJobDir.mkdir(); + this.webOverviewDir = new File(webDir, "overviews"); + webOverviewDir.mkdir(); + } + + @Override + public void run() { + try { + for (HistoryServer.RefreshLocation refreshLocation : refreshDirs) { + Path refreshDir = refreshLocation.getPath(); + FileSystem refreshFS = refreshLocation.getFs(); + + // contents of /:refreshDir + FileStatus[] jobArchives; + try { + jobArchives = refreshFS.listStatus(refreshDir); + } catch (IOException e) { + LOG.error("Failed to access job archive location for path {}.", refreshDir, e); + continue; + } + if (jobArchives == null) { + continue; + } + boolean updateOverview = false; + for (FileStatus jobArchive : jobArchives) { + Path jobArchivePath = jobArchive.getPath(); + String jobID = jobArchivePath.getName(); + try { + JobID.fromHexString(jobID); + } catch (IllegalArgumentException iae) { + LOG.debug("Archive directory {} contained file with unexpected name {}. 
Ignoring file.", + refreshDir, jobID, iae); + continue; + } + if (cachedArchives.add(jobID)) { + try { + for (ArchivedJson archive : FsJobArchivist.getArchivedJsons(jobArchive.getPath())) { + String path = archive.getPath(); + String json = archive.getJson(); + + File target; + if (path.equals("/joboverview")) { + target = new File(webOverviewDir, jobID + JSON_FILE_ENDING); + } else { + target = new File(webDir, path + JSON_FILE_ENDING); + } + + java.nio.file.Path parent = target.getParentFile().toPath(); + + try { + Files.createDirectories(parent); + } catch (FileAlreadyExistsException ignored) { + // there may be left-over directories from the previous attempt + } + + java.nio.file.Path targetPath = target.toPath(); + + // We overwrite existing files since this may be another attempt at fetching this archive. + // Existing files may be incomplete/corrupt. + if (Files.exists(targetPath)) { + Files.delete(targetPath); + } + + Files.createFile(target.toPath()); + try (FileWriter fw = new FileWriter(target)) { + fw.write(json); + fw.flush(); + } + } + updateOverview = true; + } catch (IOException e) { + LOG.error("Failure while fetching/processing job archive for job {}.", jobID, e); + // Make sure we attempt to fetch the archive again + cachedArchives.remove(jobID); + // Make sure we do not include this job in the overview + try { + Files.delete(new File(webOverviewDir, jobID + JSON_FILE_ENDING).toPath()); + } catch (IOException ioe) { + LOG.debug("Could not delete file from overview directory.", ioe); + } + + // Clean up job files we may have created + File jobDirectory = new File(webJobDir, jobID); + try { + FileUtils.deleteDirectory(jobDirectory); + } catch (IOException ioe) { + LOG.debug("Could not clean up job directory.", ioe); + } + } + } + } + if (updateOverview) { + updateJobOverview(webDir); + } + } + } catch (Exception e) { + LOG.error("Critical failure while fetching/processing job archives.", e); + } + } + } + + /** + * This method replicates the JSON 
response that would be given by the {@link CurrentJobsOverviewHandler} when + * listing both running and finished jobs. + * + * Every job archive contains a joboverview.json file containing the same structure. Since jobs are archived on + * their own however the list of finished jobs only contains a single job. + * + * For the display in the HistoryServer WebFrontend we have to combine these overviews. + */ + private static void updateJobOverview(File webDir) { + File webOverviewDir = new File(webDir, "overviews"); + try (JsonGenerator gen = jacksonFactory.createGenerator(HistoryServer.createOrGetFile(webDir, "joboverview"))) { + gen.writeStartObject(); + gen.writeArrayFieldStart("running"); + gen.writeEndArray(); + gen.writeArrayFieldStart("finished"); + + File[] overviews = new File(webOverviewDir.getPath()).listFiles(); + if (overviews != null) { + for (File overview : overviews) { + JsonNode root = mapper.readTree(overview); + JsonNode finished = root.get("finished"); + JsonNode job = finished.get(0); + mapper.writeTree(gen, job); + } + } + + gen.writeEndArray(); + gen.writeEndObject(); + } catch (IOException ioe) { + LOG.error("Failed to update job overview.", ioe); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java new file mode 100644 index 0000000..31e9bbc --- /dev/null +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.history; + +/***************************************************************************** + * This code is based on the "HttpStaticFileServerHandler" from the + * Netty project's HTTP server example. + * + * See http://netty.io and + * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java + *****************************************************************************/ + +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.DefaultFileRegion; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.HttpChunkedInput; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.codec.http.router.Routed; +import io.netty.handler.ssl.SslHandler; +import io.netty.handler.stream.ChunkedFile; +import 
org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.RandomAccessFile; +import java.net.URI; +import java.net.URL; +import java.nio.file.Files; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Locale; + +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; +import static io.netty.handler.codec.http.HttpHeaders.Names.IF_MODIFIED_SINCE; +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Simple file server handler used by the {@link HistoryServer} that serves requests to web frontend's static files, + * such as HTML, CSS, JS or JSON files. + * + * This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server + * example. + * + * This class is a copy of the {@link StaticFileServerHandler}. The differences are that the request path is + * modified to end on ".json" if it does not have a filename extension; when "index.html" is requested we load + * "index_hs.html" instead to inject the modified HistoryServer WebInterface and that the caching of the "/joboverview" + * page is prevented. 
+ */ [email protected] +public class HistoryServerStaticFileServerHandler extends SimpleChannelInboundHandler<Routed> { + + /** Default logger, if none is specified */ + private static final Logger LOG = LoggerFactory.getLogger(HistoryServerStaticFileServerHandler.class); + + // ------------------------------------------------------------------------ + + /** The path in which the static documents are */ + private final File rootPath; + + public HistoryServerStaticFileServerHandler(File rootPath) throws IOException { + this.rootPath = checkNotNull(rootPath).getCanonicalFile(); + } + + // ------------------------------------------------------------------------ + // Responses to requests + // ------------------------------------------------------------------------ + + @Override + public void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception { + String requestPath = routed.path(); + + respondWithFile(ctx, routed.request(), requestPath); + } + + /** + * Response when running with leading JobManager. + */ + private void respondWithFile(ChannelHandlerContext ctx, HttpRequest request, String requestPath) + throws IOException, ParseException { + + // make sure we request the "index.html" in case there is a directory request + if (requestPath.endsWith("/")) { + requestPath = requestPath + "index.html"; + } + + if (!requestPath.contains(".")) { // we assume that the path ends in either .html or .js + requestPath = requestPath + ".json"; + } + + // convert to absolute path + final File file = new File(rootPath, requestPath); + + if (!file.exists()) { + // file does not exist. 
Try to load it with the classloader + ClassLoader cl = HistoryServerStaticFileServerHandler.class.getClassLoader(); + + String pathToLoad = requestPath.replace("index.html", "index_hs.html"); + + try (InputStream resourceStream = cl.getResourceAsStream("web" + pathToLoad)) { + boolean success = false; + try { + if (resourceStream != null) { + URL root = cl.getResource("web"); + URL requested = cl.getResource("web" + pathToLoad); + + if (root != null && requested != null) { + URI rootURI = new URI(root.getPath()).normalize(); + URI requestedURI = new URI(requested.getPath()).normalize(); + + // Check that we don't load anything from outside of the + // expected scope. + if (!rootURI.relativize(requestedURI).equals(requestedURI)) { + LOG.debug("Loading missing file from classloader: {}", pathToLoad); + // ensure that directory to file exists. + file.getParentFile().mkdirs(); + Files.copy(resourceStream, file.toPath()); + + success = true; + } + } + } + } catch (Throwable t) { + LOG.error("error while responding", t); + } finally { + if (!success) { + LOG.debug("Unable to load requested file {} from classloader", pathToLoad); + StaticFileServerHandler.sendError(ctx, NOT_FOUND); + return; + } + } + } + } + + if (!file.exists() || file.isHidden() || file.isDirectory() || !file.isFile()) { + StaticFileServerHandler.sendError(ctx, NOT_FOUND); + return; + } + + if (!file.getCanonicalFile().toPath().startsWith(rootPath.toPath())) { + StaticFileServerHandler.sendError(ctx, NOT_FOUND); + return; + } + + // cache validation + final String ifModifiedSince = request.headers().get(IF_MODIFIED_SINCE); + if (ifModifiedSince != null && !ifModifiedSince.isEmpty()) { + SimpleDateFormat dateFormatter = new SimpleDateFormat(StaticFileServerHandler.HTTP_DATE_FORMAT, Locale.US); + Date ifModifiedSinceDate = dateFormatter.parse(ifModifiedSince); + + // Only compare up to the second because the datetime format we send to the client + // does not have milliseconds + long 
ifModifiedSinceDateSeconds = ifModifiedSinceDate.getTime() / 1000; + long fileLastModifiedSeconds = file.lastModified() / 1000; + if (ifModifiedSinceDateSeconds == fileLastModifiedSeconds) { + if (LOG.isDebugEnabled()) { + LOG.debug("Responding 'NOT MODIFIED' for file '" + file.getAbsolutePath() + '\''); + } + + StaticFileServerHandler.sendNotModified(ctx); + return; + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Responding with file '" + file.getAbsolutePath() + '\''); + } + + // Don't need to close this manually. Netty's DefaultFileRegion will take care of it. + final RandomAccessFile raf; + try { + raf = new RandomAccessFile(file, "r"); + } catch (FileNotFoundException e) { + StaticFileServerHandler.sendError(ctx, NOT_FOUND); + return; + } + long fileLength = raf.length(); + + HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); + StaticFileServerHandler.setContentTypeHeader(response, file); + + // the job overview should be updated as soon as possible + if (!requestPath.equals("/joboverview.json")) { + StaticFileServerHandler.setDateAndCacheHeaders(response, file); + } + if (HttpHeaders.isKeepAlive(request)) { + response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE); + } + HttpHeaders.setContentLength(response, fileLength); + + // write the initial line and the header. + ctx.write(response); + + // write the content. + ChannelFuture lastContentFuture; + if (ctx.pipeline().get(SslHandler.class) == null) { + ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise()); + lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); + } else { + lastContentFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)), + ctx.newProgressivePromise()); + // HttpChunkedInput will write the end marker (LastHttpContent) for us. 
+ } + + // close the connection, if no keep-alive is needed + if (!HttpHeaders.isKeepAlive(request)) { + lastContentFuture.addListener(ChannelFutureListener.CLOSE); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + if (ctx.channel().isActive()) { + LOG.error("Caught exception", cause); + StaticFileServerHandler.sendError(ctx, INTERNAL_SERVER_ERROR); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebMonitorUtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebMonitorUtilsTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebMonitorUtilsTest.java new file mode 100644 index 0000000..1c51b43 --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebMonitorUtilsTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.webmonitor; + +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.junit.Assert; +import org.junit.Test; + +public class WebMonitorUtilsTest { + + @Test + public void testGetArchivers() { + JsonArchivist[] direct = WebRuntimeMonitor.getJsonArchivists(); + JsonArchivist[] reflected = WebMonitorUtils.getJsonArchivists(); + + Assert.assertEquals(direct.length, reflected.length); + for(int x = 0; x < direct.length; x++) { + Assert.assertSame(direct[x].getClass(), reflected[x].getClass()); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/FsJobArchivistTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/FsJobArchivistTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/FsJobArchivistTest.java new file mode 100644 index 0000000..f23c249 --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/FsJobArchivistTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.history.FsJobArchivist; +import org.apache.flink.runtime.webmonitor.WebRuntimeMonitor; +import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.regex.Pattern; + +/** + * This test must reside in flink-runtime-web since the {@link FsJobArchivist} relies on + * {@link WebRuntimeMonitor#getJsonArchivists()}. + */ +public class FsJobArchivistTest { + + @Rule + public TemporaryFolder tmpFolder = new TemporaryFolder(); + + @Test + public void testArchiveJob() throws Exception { + Path tmpPath = new Path(tmpFolder.getRoot().getAbsolutePath()); + + AccessExecutionGraph graph = ArchivedJobGenerationUtils.getTestJob(); + + Collection<ArchivedJson> expected = new ArrayList<>(); + for (JsonArchivist archivist : WebRuntimeMonitor.getJsonArchivists()) { + for (ArchivedJson archive : archivist.archiveJsonWithPath(graph)) { + expected.add(archive); + } + } + + Path archivePath = FsJobArchivist.archiveJob(tmpPath, graph); + Collection<ArchivedJson> actual = FsJobArchivist.getArchivedJsons(archivePath); + + Assert.assertEquals(expected.size(), actual.size()); + + Iterator<ArchivedJson> eI = expected.iterator(); + Iterator<ArchivedJson> aI = actual.iterator(); + + // several jsons contain a dynamic "now" field that depends on the time of creation, so we can't easily compare + // the json and only check the path + // /jobs/:jobid + // /jobs/:jobid/vertices + // /jobs/:jobid/vertices/:vertexid + // /jobs/:jobid/vertices/:vertexid/subtasktimes 
+ // /jobs/:jobid/vertices/:vertexid/taskmanagers + Pattern jobDetailsPattern = Pattern.compile("/jobs/[a-fA-F0-9]{32}(/vertices(/[a-fA-F0-9]{32}(/(subtasktimes|taskmanagers))?)?)?"); + while (eI.hasNext() && aI.hasNext()) { + // technically there isn't a guarantee that the order is identical, but as it stands this is the case + ArchivedJson exp = eI.next(); + ArchivedJson act = aI.next(); + if (jobDetailsPattern.matcher(exp.getPath()).matches()) { + Assert.assertEquals(exp.getPath(), act.getPath()); + } else { + Assert.assertEquals(exp, act); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java new file mode 100644 index 0000000..3eff02a --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import io.netty.handler.codec.http.router.Router; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.LoggerFactory; + +import java.io.File; + +public class HistoryServerStaticFileServerHandlerTest { + + @Rule + public TemporaryFolder tmp = new TemporaryFolder(); + + @Test + public void testRespondWithFile() throws Exception { + File webDir = tmp.newFolder("webDir"); + Router router = new Router() + .GET("/:*", new HistoryServerStaticFileServerHandler(webDir)); + WebFrontendBootstrap webUI = new WebFrontendBootstrap( + router, + LoggerFactory.getLogger(HistoryServerStaticFileServerHandlerTest.class), + tmp.newFolder("uploadDir"), + null, + "localhost", + 8081, + new Configuration()); + try { + // verify that a 404 message is returned when requesting a non-existent file + String notFound404 = HistoryServerTest.getFromHTTP("http://localhost:8081/hello"); + Assert.assertTrue(notFound404.contains("404 Not Found")); + + // verify that a) a file can be loaded using the ClassLoader and b) that the HistoryServer + // index_hs.html is injected + String index = HistoryServerTest.getFromHTTP("http://localhost:8081/index.html"); + Assert.assertTrue(index.contains("Completed Jobs")); + + // verify that index.html is appended if the request path ends on '/' + String index2 = HistoryServerTest.getFromHTTP("http://localhost:8081/"); + Assert.assertEquals(index, index2); + + // verify that a 404 message is returned when requesting a directory + File dir = new File(webDir, "dir.json"); + dir.mkdirs(); + String dirNotFound404 = HistoryServerTest.getFromHTTP("http://localhost:8081/dir"); + 
Assert.assertTrue(dirNotFound404.contains("404 Not Found")); + + // verify that a 404 message is returned when requesting a file outside the webDir + tmp.newFile("secret"); + String x = HistoryServerTest.getFromHTTP("http://localhost:8081/../secret"); + Assert.assertTrue(x.contains("404 Not Found")); + } finally { + webUI.shutdown(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java new file mode 100644 index 0000000..78fbe0b --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.webmonitor.history; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.io.IOUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.jobmanager.MemoryArchivist; +import org.apache.flink.runtime.messages.ArchiveMessages; +import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.HttpURLConnection; +import java.net.URL; +import scala.Option; + +public class HistoryServerTest { + + @Rule + public TemporaryFolder tmpDir = new TemporaryFolder(); + + @Test + public void testFullArchiveLifecycle() throws Exception { + ArchivedExecutionGraph graph = (ArchivedExecutionGraph) ArchivedJobGenerationUtils.getTestJob(); + + File jmDirectory = tmpDir.newFolder("jm"); + File hsDirectory = tmpDir.newFolder("hs"); + + Configuration config = new Configuration(); + config.setString(JobManagerOptions.ARCHIVE_DIR, jmDirectory.toURI().toString()); + + config.setString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, jmDirectory.toURI().toString()); + config.setString(HistoryServerOptions.HISTORY_SERVER_WEB_DIR, hsDirectory.getAbsolutePath()); + + ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(config); + Option<Path> archivePath = Option.apply(new Path(jmDirectory.toURI().toString())); + + ActorRef 
memoryArchivist = actorSystem.actorOf(JobManager.getArchiveProps(MemoryArchivist.class, 1, archivePath)); + memoryArchivist.tell(new ArchiveMessages.ArchiveExecutionGraph(graph.getJobID(), graph), null); + + File archive = new File(jmDirectory, graph.getJobID().toString()); + for (int x = 0; x < 10 && !archive.exists(); x++) { + Thread.sleep(50); + } + Assert.assertTrue(archive.exists()); + + HistoryServer hs = new HistoryServer(config); + try { + hs.start(); + ObjectMapper mapper = new ObjectMapper(); + JsonNode overview = null; + for (int x = 0; x < 20; x++) { + Thread.sleep(50); + String response = getFromHTTP("http://localhost:8082/joboverview"); + if (response.contains("404 Not Found")) { + // file may not be written yet + continue; + } else { + try { + overview = mapper.readTree(response); + break; + } catch (IOException ignored) { + // while the file may exist the contents may not have been written yet + continue; + } + } + } + Assert.assertNotNull("/joboverview.json did not contain valid json", overview); + String jobID = overview.get("finished").get(0).get("jid").asText(); + JsonNode jobDetails = mapper.readTree(getFromHTTP("http://localhost:8082/jobs/" + jobID)); + Assert.assertNotNull(jobDetails.get("jid")); + } finally { + hs.stop(); + } + } + + public static String getFromHTTP(String url) throws Exception { + URL u = new URL(url); + HttpURLConnection connection = (HttpURLConnection) u.openConnection(); + connection.setConnectTimeout(100000); + connection.connect(); + InputStream is; + if (connection.getResponseCode() >= 400) { + // error! + is = connection.getErrorStream(); + } else { + is = connection.getInputStream(); + } + + return IOUtils.toString(is, connection.getContentEncoding() != null ? 
connection.getContentEncoding() : "UTF-8"); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-runtime-web/web-dashboard/app/index_hs.jade ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/app/index_hs.jade b/flink-runtime-web/web-dashboard/app/index_hs.jade new file mode 100644 index 0000000..9cf7411 --- /dev/null +++ b/flink-runtime-web/web-dashboard/app/index_hs.jade @@ -0,0 +1,60 @@ +// + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ +doctype html +html(lang='en') + head + meta(charset='utf-8') + meta(http-equiv='X-UA-Compatible', content='IE=edge') + meta(name='viewport', content='width=device-width, initial-scale=1') + + title Apache Flink History Server + + link(rel="apple-touch-icon", sizes="180x180", href="images/apple-touch-icon.png") + link(rel="icon", type="image/png", href="images/favicon-32x32.png", sizes="32x32") + link(rel="icon", type="image/png", href="images/favicon-16x16.png", sizes="16x16") + link(rel="manifest", href="images/manifest.json") + link(rel="mask-icon", href="images/safari-pinned-tab.svg", color="#aa1919") + link(rel="shortcut icon", href="images/favicon.ico") + meta(name="msapplication-config", content="images/browserconfig.xml") + meta(name="theme-color", content="#ffffff") + + link(rel='stylesheet', href='css/vendor.css', type='text/css') + link(rel='stylesheet', href='css/index.css', type='text/css') + + script(src="js/vendor.js") + script(src="js/hs/index.js") + + body(ng-app="flinkApp" ng-strict-di) + #sidebar(ng-class="{ 'sidebar-visible': sidebarVisible }") + nav.navbar.navbar-inverse.navbar-static-top + .navbar-header + a.navbar-brand(ui-sref="completed-jobs") + img.logo(alt="Apache Flink Dashboard" src="images/flink-logo.png") + a.navbar-brand.navbar-brand-text(ui-sref="completed-jobs") + | Apache Flink Dashboard + + .navbar.navbar-sidebar + ul.nav + li + a(ui-sref="completed-jobs" ui-sref-active='active') + i.fa.fa-check-circle.fa-fw + | + | Completed Jobs + + #content(ng-class="{ 'sidebar-visible': sidebarVisible }") + #main(ui-view='main') http://git-wip-us.apache.org/repos/asf/flink/blob/2cab0cb2/flink-runtime-web/web-dashboard/app/scripts/index_hs.coffee ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/app/scripts/index_hs.coffee b/flink-runtime-web/web-dashboard/app/scripts/index_hs.coffee new file mode 100644 index 0000000..55d6b0c --- /dev/null +++ 
b/flink-runtime-web/web-dashboard/app/scripts/index_hs.coffee @@ -0,0 +1,208 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +angular.module('flinkApp', ['ui.router', 'angularMoment', 'dndLists']) + +# -------------------------------------- + +.run ($rootScope) -> + $rootScope.sidebarVisible = false + $rootScope.showSidebar = -> + $rootScope.sidebarVisible = !$rootScope.sidebarVisible + $rootScope.sidebarClass = 'force-show' + +# -------------------------------------- + +.value 'flinkConfig', { + jobServer: '' +# jobServer: 'http://localhost:8081/' + "refresh-interval": 10000 +} + +# -------------------------------------- + +.value 'watermarksConfig', { + # A value of (Java) Long.MIN_VALUE indicates that there is no watermark + # available. This is parsed by Javascript as this number. We have it as + # a constant here to compare available watermarks against. 
+ noWatermark: -9223372036854776000 +} + +# -------------------------------------- + +.run (JobsService, MainService, flinkConfig, $interval) -> + MainService.loadConfig().then (config) -> + angular.extend flinkConfig, config + + JobsService.listJobs() + + $interval -> + JobsService.listJobs() + , flinkConfig["refresh-interval"] + + +# -------------------------------------- + +.config ($uiViewScrollProvider) -> + $uiViewScrollProvider.useAnchorScroll() + +# -------------------------------------- + +.run ($rootScope, $state) -> + $rootScope.$on '$stateChangeStart', (event, toState, toParams, fromState) -> + if toState.redirectTo + event.preventDefault() + $state.go toState.redirectTo, toParams + +# -------------------------------------- + +.config ($stateProvider, $urlRouterProvider) -> + $stateProvider.state "completed-jobs", + url: "/completed-jobs" + views: + main: + templateUrl: "partials/jobs/completed-jobs.html" + controller: 'CompletedJobsController' + + .state "single-job", + url: "/jobs/{jobid}" + abstract: true + views: + main: + templateUrl: "partials/jobs/job.html" + controller: 'SingleJobController' + + .state "single-job.plan", + url: "" + redirectTo: "single-job.plan.subtasks" + views: + details: + templateUrl: "partials/jobs/job.plan.html" + controller: 'JobPlanController' + + .state "single-job.plan.subtasks", + url: "" + views: + 'node-details': + templateUrl: "partials/jobs/job.plan.node-list.subtasks.html" + controller: 'JobPlanSubtasksController' + + .state "single-job.plan.metrics", + url: "/metrics" + views: + 'node-details': + templateUrl: "partials/jobs/job.plan.node-list.metrics.html" + controller: 'JobPlanMetricsController' + + .state "single-job.plan.watermarks", + url: "/watermarks" + views: + 'node-details': + templateUrl: "partials/jobs/job.plan.node-list.watermarks.html" + + .state "single-job.plan.taskmanagers", + url: "/taskmanagers" + views: + 'node-details': + templateUrl: "partials/jobs/job.plan.node-list.taskmanagers.html" + 
controller: 'JobPlanTaskManagersController' + + .state "single-job.plan.accumulators", + url: "/accumulators" + views: + 'node-details': + templateUrl: "partials/jobs/job.plan.node-list.accumulators.html" + controller: 'JobPlanAccumulatorsController' + + .state "single-job.plan.checkpoints", + url: "/checkpoints" + redirectTo: "single-job.plan.checkpoints.overview" + views: + 'node-details': + templateUrl: "partials/jobs/job.plan.node-list.checkpoints.html" + controller: 'JobPlanCheckpointsController' + + .state "single-job.plan.checkpoints.overview", + url: "/overview" + views: + 'checkpoints-view': + templateUrl: "partials/jobs/job.plan.node.checkpoints.overview.html" + controller: 'JobPlanCheckpointsController' + + .state "single-job.plan.checkpoints.summary", + url: "/summary" + views: + 'checkpoints-view': + templateUrl: "partials/jobs/job.plan.node.checkpoints.summary.html" + controller: 'JobPlanCheckpointsController' + + .state "single-job.plan.checkpoints.history", + url: "/history" + views: + 'checkpoints-view': + templateUrl: "partials/jobs/job.plan.node.checkpoints.history.html" + controller: 'JobPlanCheckpointsController' + + .state "single-job.plan.checkpoints.config", + url: "/config" + views: + 'checkpoints-view': + templateUrl: "partials/jobs/job.plan.node.checkpoints.config.html" + controller: 'JobPlanCheckpointsController' + + .state "single-job.plan.checkpoints.details", + url: "/details/{checkpointId}" + views: + 'checkpoints-view': + templateUrl: "partials/jobs/job.plan.node.checkpoints.details.html" + controller: 'JobPlanCheckpointDetailsController' + + .state "single-job.plan.backpressure", + url: "/backpressure" + views: + 'node-details': + templateUrl: "partials/jobs/job.plan.node-list.backpressure.html" + controller: 'JobPlanBackPressureController' + + .state "single-job.timeline", + url: "/timeline" + views: + details: + templateUrl: "partials/jobs/job.timeline.html" + + .state "single-job.timeline.vertex", + url: "/{vertexId}" + views: + 
vertex: + templateUrl: "partials/jobs/job.timeline.vertex.html" + controller: 'JobTimelineVertexController' + + .state "single-job.exceptions", + url: "/exceptions" + views: + details: + templateUrl: "partials/jobs/job.exceptions.html" + controller: 'JobExceptionsController' + + .state "single-job.config", + url: "/config" + views: + details: + templateUrl: "partials/jobs/job.config.html" + + $urlRouterProvider.otherwise "/completed-jobs"
