[FLINK-5870] Handlers define REST URLs

This closes #3376.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/999bacef
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/999bacef
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/999bacef

Branch: refs/heads/master
Commit: 999baceff36165d950a61dd9cc4342f252e64837
Parents: 51b7ede
Author: zentol <[email protected]>
Authored: Wed Mar 1 12:23:15 2017 +0100
Committer: zentol <[email protected]>
Committed: Thu Mar 2 11:39:04 2017 +0100

----------------------------------------------------------------------
 .../webmonitor/RuntimeMonitorHandler.java       |   5 +
 .../webmonitor/RuntimeMonitorHandlerBase.java   |   7 +
 .../runtime/webmonitor/WebRuntimeMonitor.java   | 193 +++++++++++--------
 .../handlers/ClusterOverviewHandler.java        |   7 +
 .../handlers/CurrentJobIdsHandler.java          |   7 +
 .../handlers/CurrentJobsOverviewHandler.java    |  16 ++
 .../handlers/DashboardConfigHandler.java        |   7 +
 .../handlers/JarAccessDeniedHandler.java        |  11 ++
 .../webmonitor/handlers/JarDeleteHandler.java   |   7 +
 .../webmonitor/handlers/JarListHandler.java     |   7 +
 .../webmonitor/handlers/JarPlanHandler.java     |   7 +
 .../webmonitor/handlers/JarRunHandler.java      |   7 +
 .../webmonitor/handlers/JarUploadHandler.java   |   7 +
 .../handlers/JobAccumulatorsHandler.java        |   7 +
 .../handlers/JobCancellationHandler.java        |   8 +
 .../JobCancellationWithSavepointHandlers.java   |  17 +-
 .../webmonitor/handlers/JobConfigHandler.java   |   7 +
 .../webmonitor/handlers/JobDetailsHandler.java  |   8 +
 .../handlers/JobExceptionsHandler.java          |   7 +
 .../handlers/JobManagerConfigHandler.java       |   7 +
 .../webmonitor/handlers/JobPlanHandler.java     |   6 +
 .../webmonitor/handlers/JobStoppingHandler.java |   8 +
 .../handlers/JobVertexAccumulatorsHandler.java  |   7 +
 .../handlers/JobVertexBackPressureHandler.java  |   7 +
 .../handlers/JobVertexDetailsHandler.java       |   7 +
 .../handlers/JobVertexTaskManagersHandler.java  |   7 +
 .../webmonitor/handlers/RequestHandler.java     |   7 +
 .../SubtaskCurrentAttemptDetailsHandler.java    |   7 +
 ...taskExecutionAttemptAccumulatorsHandler.java |   7 +
 .../SubtaskExecutionAttemptDetailsHandler.java  |   7 +
 .../SubtasksAllAccumulatorsHandler.java         |   7 +
 .../handlers/SubtasksTimesHandler.java          |   6 +
 .../handlers/TaskManagerLogHandler.java         |  12 ++
 .../handlers/TaskManagersHandler.java           |   8 +
 .../checkpoints/CheckpointConfigHandler.java    |   7 +
 .../CheckpointStatsDetailsHandler.java          |   7 +
 .../CheckpointStatsDetailsSubtasksHandler.java  |   7 +
 .../checkpoints/CheckpointStatsHandler.java     |   7 +
 .../metrics/JobManagerMetricsHandler.java       |   8 +
 .../webmonitor/metrics/JobMetricsHandler.java   |   6 +
 .../metrics/JobVertexMetricsHandler.java        |   6 +
 .../metrics/TaskManagerMetricsHandler.java      |   8 +
 .../handlers/ClusterOverviewHandlerTest.java    |  34 ++++
 .../handlers/CurrentJobIdsHandlerTest.java      |  34 ++++
 .../CurrentJobsOverviewHandlerTest.java         |  44 +++++
 .../handlers/DashboardConfigHandlerTest.java    |  31 +++
 .../handlers/JarAccessDeniedHandlerTest.java    |  39 ++++
 .../handlers/JarDeleteHandlerTest.java          |  31 +++
 .../webmonitor/handlers/JarListHandlerTest.java |  31 +++
 .../webmonitor/handlers/JarPlanHandlerTest.java |  31 +++
 .../webmonitor/handlers/JarRunHandlerTest.java  |  31 +++
 .../handlers/JarUploadHandlerTest.java          |  31 +++
 .../handlers/JobAccumulatorsHandlerTest.java    |  31 +++
 .../handlers/JobCancellationHandlerTest.java    |  36 ++++
 ...obCancellationWithSavepointHandlersTest.java |  20 ++
 .../handlers/JobConfigHandlerTest.java          |  31 +++
 .../handlers/JobDetailsHandlerTest.java         |  36 ++++
 .../handlers/JobExceptionsHandlerTest.java      |  31 +++
 .../handlers/JobManagerConfigHandlerTest.java   |  31 +++
 .../webmonitor/handlers/JobPlanHandlerTest.java |  31 +++
 .../handlers/JobStoppingHandlerTest.java        |  36 ++++
 .../JobVertexAccumulatorsHandlerTest.java       |  31 +++
 .../JobVertexBackPressureHandlerTest.java       |   8 +
 .../handlers/JobVertexDetailsHandlerTest.java   |  31 +++
 .../JobVertexTaskManagersHandlerTest.java       |  31 +++
 ...SubtaskCurrentAttemptDetailsHandlerTest.java |  31 +++
 ...ExecutionAttemptAccumulatorsHandlerTest.java |  31 +++
 ...btaskExecutionAttemptDetailsHandlerTest.java |  31 +++
 .../SubtasksAllAccumulatorsHandlerTest.java     |  31 +++
 .../handlers/SubtasksTimesHandlerTest.java      |  31 +++
 .../handlers/TaskManagerLogHandlerTest.java     |  28 +++
 .../handlers/TaskManagersHandlerTest.java       |  38 ++++
 .../CheckpointConfigHandlerTest.java            |   9 +
 .../CheckpointStatsDetailsHandlerTest.java      |   9 +
 .../checkpoints/CheckpointStatsHandlerTest.java |   9 +
 ...heckpointStatsSubtaskDetailsHandlerTest.java |   9 +
 .../metrics/JobManagerMetricsHandlerTest.java   |  10 +
 .../metrics/JobMetricsHandlerTest.java          |  10 +
 .../metrics/JobVertexMetricsHandlerTest.java    |  10 +
 .../metrics/TaskManagerMetricsHandlerTest.java  |  10 +
 80 files changed, 1439 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
index 8dbd135..8bd58a3 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
@@ -76,6 +76,11 @@ public class RuntimeMonitorHandler extends 
RuntimeMonitorHandlerBase {
        }
 
        @Override
+       public String[] getPaths() {
+               return handler.getPaths();
+       }
+
+       @Override
        protected void respondAsLeader(ChannelHandlerContext ctx, Routed 
routed, ActorGateway jobManager) {
                FullHttpResponse response;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
index 9442867..3c1dcb6 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
@@ -67,6 +67,13 @@ public abstract class RuntimeMonitorHandlerBase extends 
SimpleChannelInboundHand
                this.httpsEnabled = httpsEnabled;
        }
 
+       /**
+        * Returns an array of REST URLs under which this handler can be 
registered.
+        *
+        * @return array containing REST URLs under which this handler can be 
registered.
+        */
+       public abstract String[] getPaths();
+
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Routed routed) 
throws Exception {
                if (localJobManagerAddressFuture.isCompleted()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index a9cb630..dddc69d 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -256,56 +256,50 @@ public class WebRuntimeMonitor implements WebMonitor {
                RuntimeMonitorHandler triggerHandler = 
handler(cancelWithSavepoint.getTriggerHandler());
                RuntimeMonitorHandler inProgressHandler = 
handler(cancelWithSavepoint.getInProgressHandler());
 
-               router = new Router()
-                       // config how to interact with this web server
-                       .GET("/config", handler(new 
DashboardConfigHandler(cfg.getRefreshInterval())))
-
-                       // the overview - how many task managers, slots, free 
slots, ...
-                       .GET("/overview", handler(new 
ClusterOverviewHandler(DEFAULT_REQUEST_TIMEOUT)))
-
-                       // job manager configuration
-                       .GET("/jobmanager/config", handler(new 
JobManagerConfigHandler(config)))
-
-                       // overview over jobs
-                       .GET("/joboverview", handler(new 
CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, true, true)))
-                       .GET("/joboverview/running", handler(new 
CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, true, false)))
-                       .GET("/joboverview/completed", handler(new 
CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, false, true)))
-
-                       .GET("/jobs", handler(new 
CurrentJobIdsHandler(DEFAULT_REQUEST_TIMEOUT)))
-
-                       .GET("/jobs/:jobid", handler(new 
JobDetailsHandler(currentGraphs, metricFetcher)))
-                       .GET("/jobs/:jobid/vertices", handler(new 
JobDetailsHandler(currentGraphs, metricFetcher)))
-
-                       .GET("/jobs/:jobid/vertices/:vertexid", handler(new 
JobVertexDetailsHandler(currentGraphs, metricFetcher)))
-                       .GET("/jobs/:jobid/vertices/:vertexid/subtasktimes", 
handler(new SubtasksTimesHandler(currentGraphs)))
-                       .GET("/jobs/:jobid/vertices/:vertexid/taskmanagers", 
handler(new JobVertexTaskManagersHandler(currentGraphs, metricFetcher)))
-                       .GET("/jobs/:jobid/vertices/:vertexid/accumulators", 
handler(new JobVertexAccumulatorsHandler(currentGraphs)))
-                       .GET("/jobs/:jobid/vertices/:vertexid/backpressure", 
handler(new JobVertexBackPressureHandler(
-                                                       currentGraphs,
-                                                       
backPressureStatsTracker,
-                                                       refreshInterval)))
-                       .GET("/jobs/:jobid/vertices/:vertexid/metrics", 
handler(new JobVertexMetricsHandler(metricFetcher)))
-                       
.GET("/jobs/:jobid/vertices/:vertexid/subtasks/accumulators", handler(new 
SubtasksAllAccumulatorsHandler(currentGraphs)))
-                       
.GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum", handler(new 
SubtaskCurrentAttemptDetailsHandler(currentGraphs, metricFetcher)))
-                       
.GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt", 
handler(new SubtaskExecutionAttemptDetailsHandler(currentGraphs, 
metricFetcher)))
-                       
.GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators",
 handler(new SubtaskExecutionAttemptAccumulatorsHandler(currentGraphs)))
-
-                       .GET("/jobs/:jobid/plan", handler(new 
JobPlanHandler(currentGraphs)))
-                       .GET("/jobs/:jobid/config", handler(new 
JobConfigHandler(currentGraphs)))
-                       .GET("/jobs/:jobid/exceptions", handler(new 
JobExceptionsHandler(currentGraphs)))
-                       .GET("/jobs/:jobid/accumulators", handler(new 
JobAccumulatorsHandler(currentGraphs)))
-                       .GET("/jobs/:jobid/metrics", handler(new 
JobMetricsHandler(metricFetcher)))
-
-                       .GET("/taskmanagers", handler(new 
TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT, metricFetcher)))
-                       .GET("/taskmanagers/:" + 
TaskManagersHandler.TASK_MANAGER_ID_KEY, handler(new 
TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT, metricFetcher)))
-                       .GET("/taskmanagers/:" + 
TaskManagersHandler.TASK_MANAGER_ID_KEY + "/log", 
-                               new TaskManagerLogHandler(retriever, context, 
jobManagerAddressPromise.future(), timeout,
-                                       TaskManagerLogHandler.FileMode.LOG, 
config, enableSSL))
-                       .GET("/taskmanagers/:" + 
TaskManagersHandler.TASK_MANAGER_ID_KEY + "/stdout", 
-                               new TaskManagerLogHandler(retriever, context, 
jobManagerAddressPromise.future(), timeout,
-                                       TaskManagerLogHandler.FileMode.STDOUT, 
config, enableSSL))
-                       .GET("/taskmanagers/:" + 
TaskManagersHandler.TASK_MANAGER_ID_KEY + "/metrics", handler(new 
TaskManagerMetricsHandler(metricFetcher)))
+               router = new Router();
+               // config how to interact with this web server
+               GET(router, new 
DashboardConfigHandler(cfg.getRefreshInterval()));
+
+               // the overview - how many task managers, slots, free slots, ...
+               GET(router, new 
ClusterOverviewHandler(DEFAULT_REQUEST_TIMEOUT));
+
+               // job manager configuration
+               GET(router, new JobManagerConfigHandler(config));
+
+               // overview over jobs
+               GET(router, new 
CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, true, true));
+               GET(router, new 
CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, true, false));
+               GET(router, new 
CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, false, true));
+
+               GET(router, new CurrentJobIdsHandler(DEFAULT_REQUEST_TIMEOUT));
+
+               GET(router, new JobDetailsHandler(currentGraphs, 
metricFetcher));
+
+               GET(router, new JobVertexDetailsHandler(currentGraphs, 
metricFetcher));
+               GET(router, new SubtasksTimesHandler(currentGraphs));
+               GET(router, new JobVertexTaskManagersHandler(currentGraphs, 
metricFetcher));
+               GET(router, new JobVertexAccumulatorsHandler(currentGraphs));
+               GET(router, new JobVertexBackPressureHandler(currentGraphs,     
backPressureStatsTracker, refreshInterval));
+               GET(router, new JobVertexMetricsHandler(metricFetcher));
+               GET(router, new SubtasksAllAccumulatorsHandler(currentGraphs));
+               GET(router, new 
SubtaskCurrentAttemptDetailsHandler(currentGraphs, metricFetcher));
+               GET(router, new 
SubtaskExecutionAttemptDetailsHandler(currentGraphs, metricFetcher));
+               GET(router, new 
SubtaskExecutionAttemptAccumulatorsHandler(currentGraphs));
+
+               GET(router, new JobPlanHandler(currentGraphs));
+               GET(router, new JobConfigHandler(currentGraphs));
+               GET(router, new JobExceptionsHandler(currentGraphs));
+               GET(router, new JobAccumulatorsHandler(currentGraphs));
+               GET(router, new JobMetricsHandler(metricFetcher));
+
+               GET(router, new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT, 
metricFetcher));
+               GET(router, new TaskManagerLogHandler(retriever, context, 
jobManagerAddressPromise.future(), timeout,
+                               TaskManagerLogHandler.FileMode.LOG, config, 
enableSSL));
+               GET(router, new TaskManagerLogHandler(retriever, context, 
jobManagerAddressPromise.future(), timeout,
+                               TaskManagerLogHandler.FileMode.STDOUT, config, 
enableSSL));
+               GET(router, new TaskManagerMetricsHandler(metricFetcher));
 
+               router
                        // log and stdout
                        .GET("/jobmanager/log", logFiles.logFile == null ? new 
ConstantTextHandler("(log file unavailable)") :
                                new StaticFileServerHandler(retriever, 
jobManagerAddressPromise.future(), timeout, logFiles.logFile,
@@ -313,25 +307,22 @@ public class WebRuntimeMonitor implements WebMonitor {
 
                        .GET("/jobmanager/stdout", logFiles.stdOutFile == null 
? new ConstantTextHandler("(stdout file unavailable)") :
                                new StaticFileServerHandler(retriever, 
jobManagerAddressPromise.future(), timeout, logFiles.stdOutFile,
-                                       enableSSL))
-
-                       .GET("/jobmanager/metrics", handler(new 
JobManagerMetricsHandler(metricFetcher)))
+                                       enableSSL));
 
-                       // Cancel a job via GET (for proper integration with 
YARN this has to be performed via GET)
-                       .GET("/jobs/:jobid/yarn-cancel", handler(new 
JobCancellationHandler()))
+               GET(router, new JobManagerMetricsHandler(metricFetcher));
 
-                       // DELETE is the preferred way of canceling a job 
(Rest-conform)
-                       .DELETE("/jobs/:jobid/cancel", handler(new 
JobCancellationHandler()))
+               // Cancel a job via GET (for proper integration with YARN this 
has to be performed via GET)
+               GET(router, new JobCancellationHandler());
+               // DELETE is the preferred way of canceling a job (Rest-conform)
+               DELETE(router, new JobCancellationHandler());
 
-                       .GET("/jobs/:jobid/cancel-with-savepoint", 
triggerHandler)
-                       
.GET("/jobs/:jobid/cancel-with-savepoint/target-directory/:targetDirectory", 
triggerHandler)
-                       
.GET(JobCancellationWithSavepointHandlers.IN_PROGRESS_URL, inProgressHandler)
+               GET(router, triggerHandler);
+               GET(router, inProgressHandler);
 
-                       // stop a job via GET (for proper integration with YARN 
this has to be performed via GET)
-                       .GET("/jobs/:jobid/yarn-stop", handler(new 
JobStoppingHandler()))
-
-                       // DELETE is the preferred way of stopping a job 
(Rest-conform)
-                       .DELETE("/jobs/:jobid/stop", handler(new 
JobStoppingHandler()));
+               // stop a job via GET (for proper integration with YARN this 
has to be performed via GET)
+               GET(router, new JobStoppingHandler());
+               // DELETE is the preferred way of stopping a job (Rest-conform)
+               DELETE(router, new JobStoppingHandler());
 
                int maxCachedEntries = config.getInteger(
                                
ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
@@ -339,34 +330,32 @@ public class WebRuntimeMonitor implements WebMonitor {
                CheckpointStatsCache cache = new 
CheckpointStatsCache(maxCachedEntries);
 
                // Register the checkpoint stats handlers
-               router
-                       .GET("/jobs/:jobid/checkpoints", handler(new 
CheckpointStatsHandler(currentGraphs)))
-                       .GET("/jobs/:jobid/checkpoints/config", handler(new 
CheckpointConfigHandler(currentGraphs)))
-                       .GET("/jobs/:jobid/checkpoints/details/:checkpointid", 
handler(new CheckpointStatsDetailsHandler(currentGraphs, cache)))
-                       
.GET("/jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid", 
handler(new CheckpointStatsDetailsSubtasksHandler(currentGraphs, cache)));
+               GET(router, new CheckpointStatsHandler(currentGraphs));
+               GET(router, new CheckpointConfigHandler(currentGraphs));
+               GET(router, new CheckpointStatsDetailsHandler(currentGraphs, 
cache));
+               GET(router, new 
CheckpointStatsDetailsSubtasksHandler(currentGraphs, cache));
 
                if (webSubmitAllow) {
-                       router
-                               // fetch the list of uploaded jars.
-                               .GET("/jars", handler(new 
JarListHandler(uploadDir)))
+                       // fetch the list of uploaded jars.
+                       GET(router, new JarListHandler(uploadDir));
 
-                               // get plan for an uploaded jar
-                               .GET("/jars/:jarid/plan", handler(new 
JarPlanHandler(uploadDir)))
+                       // get plan for an uploaded jar
+                       GET(router, new JarPlanHandler(uploadDir));
 
-                               // run a jar
-                               .POST("/jars/:jarid/run", handler(new 
JarRunHandler(uploadDir, timeout, config)))
+                       // run a jar
+                       POST(router, new JarRunHandler(uploadDir, timeout, 
config));
 
-                               // upload a jar
-                               .POST("/jars/upload", handler(new 
JarUploadHandler(uploadDir)))
+                       // upload a jar
+                       POST(router, new JarUploadHandler(uploadDir));
 
-                               // delete an uploaded jar from submission 
interface
-                               .DELETE("/jars/:jarid", handler(new 
JarDeleteHandler(uploadDir)));
+                       // delete an uploaded jar from submission interface
+                       DELETE(router, new JarDeleteHandler(uploadDir));
                } else {
-                       router
-                               // send an Access Denied message (sort of)
-                               // Every other GET request will go to the File 
Server, which will not provide
-                               // access to the jar directory anyway, because 
it doesn't exist in webRootDir.
-                               .GET("/jars", handler(new 
JarAccessDeniedHandler()));
+                       // send an Access Denied message
+                       JarAccessDeniedHandler jad = new 
JarAccessDeniedHandler();
+                       GET(router, jad);
+                       POST(router, jad);
+                       DELETE(router, jad);
                }
 
                // this handler serves all the static contents
@@ -526,6 +515,40 @@ public class WebRuntimeMonitor implements WebMonitor {
                }
        }
 
+       /** These methods are used in the route path setup. They register the 
given {@link RequestHandler} or
+        * {@link RuntimeMonitorHandlerBase} with the given {@link Router} for 
the respective REST method.
+        * The REST paths under which they are registered are defined by the 
handlers. **/
+
+       private void GET(Router router, RequestHandler handler) {
+               GET(router, handler(handler));
+       }
+
+       private void GET(Router router, RuntimeMonitorHandlerBase handler) {
+               for (String path : handler.getPaths()) {
+                       router.GET(path, handler);
+               }
+       }
+
+       private void DELETE(Router router, RequestHandler handler) {
+               DELETE(router, handler(handler));
+       }
+
+       private void DELETE(Router router, RuntimeMonitorHandlerBase handler) {
+               for (String path : handler.getPaths()) {
+                       router.DELETE(path, handler);
+               }
+       }
+
+       private void POST(Router router, RequestHandler handler) {
+               POST(router, handler(handler));
+       }
+
+       private void POST(Router router, RuntimeMonitorHandlerBase handler) {
+               for (String path : handler.getPaths()) {
+                       router.POST(path, handler);
+               }
+       }
+
        // 
------------------------------------------------------------------------
        //  Utilities
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
index 99ef3d9..2bd055d 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
@@ -38,6 +38,8 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class ClusterOverviewHandler extends AbstractJsonRequestHandler {
 
+       private static final String CLUSTER_OVERVIEW_REST_PATH = "/overview";
+
        private static final String version = 
EnvironmentInformation.getVersion();
 
        private static final String commitID = 
EnvironmentInformation.getRevisionInformation().commitId;
@@ -49,6 +51,11 @@ public class ClusterOverviewHandler extends 
AbstractJsonRequestHandler {
        }
 
        @Override
+       public String[] getPaths() {
+               return new String[]{CLUSTER_OVERVIEW_REST_PATH};
+       }
+
+       @Override
        public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
                // we need no parameters, get all requests
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
index b690c56..94b1c16 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
@@ -39,11 +39,18 @@ import static java.util.Objects.requireNonNull;
  */
 public class CurrentJobIdsHandler extends AbstractJsonRequestHandler {
 
+       private static final String CURRENT_JOB_IDS_REST_PATH = "/jobs";
+
        private final FiniteDuration timeout;
        
        public CurrentJobIdsHandler(FiniteDuration timeout) {
                this.timeout = requireNonNull(timeout);
        }
+
+       @Override
+       public String[] getPaths() {
+               return new String[]{CURRENT_JOB_IDS_REST_PATH};
+       }
        
        @Override
        public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
index 07064da..8486a9c 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
@@ -38,6 +38,10 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler {
 
+       private static final String ALL_JOBS_REST_PATH = "/joboverview";
+       private static final String RUNNING_JOBS_REST_PATH = 
"/joboverview/running";
+       private static final String COMPLETED_JOBS_REST_PATH = 
"/joboverview/completed";
+
        private final FiniteDuration timeout;
        
        private final boolean includeRunningJobs;
@@ -55,6 +59,18 @@ public class CurrentJobsOverviewHandler extends 
AbstractJsonRequestHandler {
        }
 
        @Override
+       public String[] getPaths() {
+               if (includeRunningJobs && includeFinishedJobs) {
+                       return new String[]{ALL_JOBS_REST_PATH};
+               }
+               if (includeRunningJobs) {
+                       return new String[]{RUNNING_JOBS_REST_PATH};
+               } else {
+                       return new String[]{COMPLETED_JOBS_REST_PATH};
+               }
+       }
+
+       @Override
        public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
                try {
                        if (jobManager != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
index 6fe072b..49f4c26 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
@@ -32,6 +32,8 @@ import java.util.TimeZone;
  * and time zone of the server timestamps.
  */
 public class DashboardConfigHandler extends AbstractJsonRequestHandler {
+
+       private static final String DASHBOARD_CONFIG_REST_PATH = "/config";
        
        private final String configString;
        
@@ -65,6 +67,11 @@ public class DashboardConfigHandler extends 
AbstractJsonRequestHandler {
                        throw new RuntimeException(e.getMessage(), e);
                }
        }
+
+       @Override
+       public String[] getPaths() {
+               return new String[]{DASHBOARD_CONFIG_REST_PATH};
+       }
        
        @Override
        public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
index ba32d0d..f0e3faf 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
@@ -28,6 +28,17 @@ public class JarAccessDeniedHandler extends 
AbstractJsonRequestHandler {
                        "available for this cluster. To enable it, set the 
configuration key ' jobmanager.web.submit.enable.'\"}";
 
        @Override
+       public String[] getPaths() {
+               return new String[]{
+                       JarListHandler.JAR_LIST_REST_PATH,
+                       JarPlanHandler.JAR_PLAN_REST_PATH,
+                       JarRunHandler.JAR_RUN_REST_PATH,
+                       JarUploadHandler.JAR_UPLOAD_REST_PATH,
+                       JarDeleteHandler.JAR_DELETE_REST_PATH
+               };
+       }
+
+       @Override
        public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
                return ERROR_MESSAGE;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
index ae959a5..f3bf231 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
@@ -31,6 +31,8 @@ import java.util.Map;
  */
 public class JarDeleteHandler extends AbstractJsonRequestHandler {
 
+       static final String JAR_DELETE_REST_PATH = "/jars/:jarid";
+
        private final File jarDir;
 
        public JarDeleteHandler(File jarDirectory) {
@@ -38,6 +40,11 @@ public class JarDeleteHandler extends 
AbstractJsonRequestHandler {
        }
 
        @Override
+       public String[] getPaths() {
+               return new String[]{JAR_DELETE_REST_PATH};
+       }
+
+       @Override
        public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
                final String file = pathParams.get("jarid");
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
index f3cdc30..81b5e34 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
@@ -33,6 +33,8 @@ import java.util.jar.Manifest;
 
 public class JarListHandler extends AbstractJsonRequestHandler {
 
+       static final String JAR_LIST_REST_PATH = "/jars";
+
        private final File jarDir;
 
        public  JarListHandler(File jarDirectory) {
@@ -40,6 +42,11 @@ public class JarListHandler extends 
AbstractJsonRequestHandler {
        }
 
        @Override
+       public String[] getPaths() {
+               return new String[]{JAR_LIST_REST_PATH};
+       }
+
+       @Override
        public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
                try {
                        StringWriter writer = new StringWriter();

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
index bd0a6af..d121119 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
@@ -32,11 +32,18 @@ import java.util.Map;
  */
 public class JarPlanHandler extends JarActionHandler {
 
+       static final String JAR_PLAN_REST_PATH = "/jars/:jarid/plan";
+
        public JarPlanHandler(File jarDirectory) {
                super(jarDirectory);
        }
 
        @Override
+       public String[] getPaths() {
+               return new String[]{JAR_PLAN_REST_PATH};
+       }
+
+       @Override
        public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
                try {
                        JarActionHandlerConfig config = 
JarActionHandlerConfig.fromParams(pathParams, queryParams);

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 474be33..5f39e19 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -38,6 +38,8 @@ import java.util.Map;
  */
 public class JarRunHandler extends JarActionHandler {
 
+       static final String JAR_RUN_REST_PATH = "/jars/:jarid/run";
+
        private final FiniteDuration timeout;
        private final Configuration clientConfig;
 
@@ -48,6 +50,11 @@ public class JarRunHandler extends JarActionHandler {
        }
 
        @Override
+       public String[] getPaths() {
+               return new String[]{JAR_RUN_REST_PATH};
+       }
+
+       @Override
        public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
                try {
                        JarActionHandlerConfig config = 
JarActionHandlerConfig.fromParams(pathParams, queryParams);

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
index 9a3b0e1..3d7cb8a 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
@@ -29,6 +29,8 @@ import java.util.UUID;
  */
 public class JarUploadHandler extends AbstractJsonRequestHandler {
 
+       static final String JAR_UPLOAD_REST_PATH = "/jars/upload";
+
        private final File jarDir;
 
        public JarUploadHandler(File jarDir) {
@@ -36,6 +38,11 @@ public class JarUploadHandler extends 
AbstractJsonRequestHandler {
        }
 
        @Override
+       public String[] getPaths() {
+               return new String[]{JAR_UPLOAD_REST_PATH};
+       }
+
+       @Override
        public String handleJsonRequest(
                                Map<String, String> pathParams,
                                Map<String, String> queryParams,

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
index 29613a0..7664153 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
@@ -30,12 +30,19 @@ import java.util.Map;
  * Request handler that returns the aggregated user accumulators of a job.
  */
 public class JobAccumulatorsHandler extends 
AbstractExecutionGraphRequestHandler {
+
+       private static final String JOB_ACCUMULATORS_REST_PATH = 
"/jobs/:jobid/accumulators";
        
        public JobAccumulatorsHandler(ExecutionGraphHolder 
executionGraphHolder) {
                super(executionGraphHolder);
        }
 
        @Override
+       public String[] getPaths() {
+               return new String[]{JOB_ACCUMULATORS_REST_PATH};
+       }
+
+       @Override
        public String handleRequest(AccessExecutionGraph graph, Map<String, 
String> params) throws Exception {
                StringifiedAccumulatorResult[] allAccumulators = 
graph.getAccumulatorResultsStringified();
                

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
index 9f35719..d9de7d7 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
@@ -30,6 +30,14 @@ import java.util.Map;
  */
 public class JobCancellationHandler extends AbstractJsonRequestHandler {
 
+       private static final String JOB_CANCELLATION_REST_PATH = 
"/jobs/:jobid/cancel";
+       private static final String JOB_CANCELLATION_YARN_REST_PATH = 
"/jobs/:jobid/yarn-cancel";
+
+       @Override
+       public String[] getPaths() { // this handler is registered under both cancel paths
+               return new String[]{JOB_CANCELLATION_REST_PATH, 
JOB_CANCELLATION_YARN_REST_PATH};
+       }
+
        @Override
        public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
index 492ce76..b618d85 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
@@ -55,8 +55,11 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class JobCancellationWithSavepointHandlers {
 
+       private static final String CANCEL_WITH_SAVEPOINT_REST_PATH = 
"/jobs/:jobid/cancel-with-savepoint";
+       private static final String CANCEL_WITH_SAVEPOINT_DIRECTORY_REST_PATH = 
"/jobs/:jobid/cancel-with-savepoint/target-directory/:targetDirectory";
+
        /** URL for in-progress cancellations. */
-       public static final String IN_PROGRESS_URL = 
"/jobs/:jobid/cancel-with-savepoint/in-progress/:requestId";
+       private static final String CANCELLATION_IN_PROGRESS_REST_PATH = 
"/jobs/:jobid/cancel-with-savepoint/in-progress/:requestId";
 
        /** Encodings for String. */
        private static final Charset ENCODING = Charset.forName("UTF-8");
@@ -127,6 +130,11 @@ public class JobCancellationWithSavepointHandlers {
                }
 
                @Override
+               public String[] getPaths() {
+                       return new String[]{CANCEL_WITH_SAVEPOINT_REST_PATH, 
CANCEL_WITH_SAVEPOINT_DIRECTORY_REST_PATH};
+               }
+
+               @Override
                @SuppressWarnings("unchecked")
                public FullHttpResponse handleRequest(
                                Map<String, String> pathParams,
@@ -230,7 +238,7 @@ public class JobCancellationWithSavepointHandlers {
                        }
 
                        // In-progress location
-                       String location = IN_PROGRESS_URL
+                       String location = CANCELLATION_IN_PROGRESS_REST_PATH
                                        .replace(":jobid", jobId.toString())
                                        .replace(":requestId", 
Long.toString(requestId));
 
@@ -279,6 +287,11 @@ public class JobCancellationWithSavepointHandlers {
                private final ArrayDeque<Tuple2<Long, Object>> 
recentlyCompleted = new ArrayDeque<>(NUM_GHOST_REQUEST_IDS);
 
                @Override
+               public String[] getPaths() {
+                       return new String[]{CANCELLATION_IN_PROGRESS_REST_PATH};
+               }
+
+               @Override
                @SuppressWarnings("unchecked")
                public FullHttpResponse handleRequest(Map<String, String> 
pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws 
Exception {
                        try {

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
index 21639ef..459ca2a 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
@@ -31,11 +31,18 @@ import 
org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
  */
 public class JobConfigHandler extends AbstractExecutionGraphRequestHandler {
 
+       private static final String JOB_CONFIG_REST_PATH = 
"/jobs/:jobid/config";
+
        public JobConfigHandler(ExecutionGraphHolder executionGraphHolder) {
                super(executionGraphHolder);
        }
 
        @Override
+       public String[] getPaths() {
+               return new String[]{JOB_CONFIG_REST_PATH};
+       }
+
+       @Override
        public String handleRequest(AccessExecutionGraph graph, Map<String, 
String> params) throws Exception {
 
                StringWriter writer = new StringWriter();

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
index 35e6ca7..7780e66 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
@@ -47,6 +47,9 @@ import java.util.Map;
  */
 public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
 
+       private static final String JOB_DETAILS_REST_PATH = "/jobs/:jobid";
+       private static final String JOB_DETAILS_VERTICES_REST_PATH = 
"/jobs/:jobid/vertices";
+
        private final MetricFetcher fetcher;
 
        public JobDetailsHandler(ExecutionGraphHolder executionGraphHolder, 
MetricFetcher fetcher) {
@@ -55,6 +58,11 @@ public class JobDetailsHandler extends 
AbstractExecutionGraphRequestHandler {
        }
 
        @Override
+       public String[] getPaths() {
+               return new String[]{JOB_DETAILS_REST_PATH, 
JOB_DETAILS_VERTICES_REST_PATH};
+       }
+
+       @Override
        public String handleRequest(AccessExecutionGraph graph, Map<String, 
String> params) throws Exception {
                final StringWriter writer = new StringWriter();
                final JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
index 376cca4..3720dac 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
@@ -33,6 +33,8 @@ import java.util.Map;
  */
 public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler 
{
 
+       private static final String JOB_EXCEPTIONS_REST_PATH = 
"/jobs/:jobid/exceptions";
+
        private static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
        
        public JobExceptionsHandler(ExecutionGraphHolder executionGraphHolder) {
@@ -40,6 +42,11 @@ public class JobExceptionsHandler extends 
AbstractExecutionGraphRequestHandler {
        }
 
        @Override
+       public String[] getPaths() {
+               return new String[]{JOB_EXCEPTIONS_REST_PATH};
+       }
+
+       @Override
        public String handleRequest(AccessExecutionGraph graph, Map<String, 
String> params) throws Exception {
                StringWriter writer = new StringWriter();
                JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
index 11ca931..5fcf010 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
@@ -30,6 +30,8 @@ import java.util.Map;
  */
 public class JobManagerConfigHandler extends AbstractJsonRequestHandler {
 
+       private static final String JOBMANAGER_CONFIG_REST_PATH = 
"/jobmanager/config";
+
        private final Configuration config;
 
        public JobManagerConfigHandler(Configuration config) {
@@ -37,6 +39,11 @@ public class JobManagerConfigHandler extends 
AbstractJsonRequestHandler {
        }
 
        @Override
+       public String[] getPaths() {
+               return new String[]{JOBMANAGER_CONFIG_REST_PATH};
+       }
+
+       @Override
        public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
                StringWriter writer = new StringWriter();
                JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
index 64f7000..becc2e1 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
@@ -28,12 +28,18 @@ import java.util.Map;
  */
 public class JobPlanHandler extends AbstractExecutionGraphRequestHandler {
 
+       private static final String JOB_PLAN_REST_PATH = "/jobs/:jobid/plan";
        
        public JobPlanHandler(ExecutionGraphHolder executionGraphHolder) {
                super(executionGraphHolder);
        }
 
        @Override
+       public String[] getPaths() {
+               return new String[]{JOB_PLAN_REST_PATH};
+       }
+
+       @Override
        public String handleRequest(AccessExecutionGraph graph, Map<String, 
String> params) throws Exception {
                return graph.getJsonPlan();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
index 0f8c958..c8ec689 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
@@ -30,6 +30,14 @@ import java.util.Map;
  */
 public class JobStoppingHandler extends AbstractJsonRequestHandler {
 
+       private static final String JOB_STOPPING_REST_PATH = 
"/jobs/:jobid/stop";
+       private static final String JOB_STOPPING_YARN_REST_PATH = 
"/jobs/:jobid/yarn-stop";
+
+       @Override
+       public String[] getPaths() {
+               return new String[]{JOB_STOPPING_REST_PATH, 
JOB_STOPPING_YARN_REST_PATH};
+       }
+
        @Override
        public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
index ad4e207..ccfcbba 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
@@ -29,12 +29,19 @@ import java.util.Map;
 
 
 public class JobVertexAccumulatorsHandler extends 
AbstractJobVertexRequestHandler {
+
+       private static final String JOB_VERTEX_ACCUMULATORS_REST_PATH = 
"/jobs/:jobid/vertices/:vertexid/accumulators";
        
        public JobVertexAccumulatorsHandler(ExecutionGraphHolder 
executionGraphHolder) {
                super(executionGraphHolder);
        }
 
        @Override
+       public String[] getPaths() {
+               return new String[]{JOB_VERTEX_ACCUMULATORS_REST_PATH};
+       }
+
+       @Override
        public String handleRequest(AccessExecutionJobVertex jobVertex, 
Map<String, String> params) throws Exception {
                StringifiedAccumulatorResult[] accs = 
jobVertex.getAggregatedUserAccumulatorsStringified();
                

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
index c5bacf2..52167e1 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
@@ -39,6 +39,8 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class JobVertexBackPressureHandler extends 
AbstractJobVertexRequestHandler {
 
+       private static final String JOB_VERTEX_BACKPRESSURE_REST_PATH = 
"/jobs/:jobid/vertices/:vertexid/backpressure";
+
        /** Back pressure stats tracker. */
        private final BackPressureStatsTracker backPressureStatsTracker;
 
@@ -57,6 +59,11 @@ public class JobVertexBackPressureHandler extends 
AbstractJobVertexRequestHandle
        }
 
        @Override
+       public String[] getPaths() {
+               return new String[]{JOB_VERTEX_BACKPRESSURE_REST_PATH};
+       }
+
+       @Override
        public String handleRequest(
                        AccessExecutionJobVertex accessJobVertex,
                        Map<String, String> params) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
index 32626ba..0a07896 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
@@ -39,6 +39,8 @@ import java.util.Map;
  */
 public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
 
+       private static final String JOB_VERTEX_DETAILS_REST_PATH = 
"/jobs/:jobid/vertices/:vertexid"; // path returned by getPaths()
+
        private final MetricFetcher fetcher;
 
        public JobVertexDetailsHandler(ExecutionGraphHolder 
executionGraphHolder, MetricFetcher fetcher) {
@@ -47,6 +49,11 @@ public class JobVertexDetailsHandler extends 
AbstractJobVertexRequestHandler {
        }
 
        @Override
+       public String[] getPaths() {
+               return new String[]{JOB_VERTEX_DETAILS_REST_PATH};
+       }
+
+       @Override
        public String handleRequest(AccessExecutionJobVertex jobVertex, 
Map<String, String> params) throws Exception {
                final long now = System.currentTimeMillis();
                

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
index f468d35..b3dabea 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
@@ -43,6 +43,8 @@ import java.util.Map.Entry;
  */
 public class JobVertexTaskManagersHandler extends 
AbstractJobVertexRequestHandler {
 
+       private static final String JOB_VERTEX_TASKMANAGERS_REST_PATH = 
"/jobs/:jobid/vertices/:vertexid/taskmanagers";
+
        private final MetricFetcher fetcher;
 
        public JobVertexTaskManagersHandler(ExecutionGraphHolder 
executionGraphHolder, MetricFetcher fetcher) {
@@ -51,6 +53,11 @@ public class JobVertexTaskManagersHandler extends 
AbstractJobVertexRequestHandle
        }
 
        @Override
+       public String[] getPaths() {
+               return new String[]{JOB_VERTEX_TASKMANAGERS_REST_PATH};
+       }
+
+       @Override
        public String handleRequest(AccessExecutionJobVertex jobVertex, 
Map<String, String> params) throws Exception {
                // Build a map that groups tasks by TaskManager
                Map<String, List<AccessExecutionVertex>> taskManagerVertices = 
new HashMap<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
index c56cfc3..b6246e6 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
@@ -49,4 +49,11 @@ public interface RequestHandler {
         *         with the exception stack trace.
         */
        FullHttpResponse handleRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception;
+
+       /**
+        * Returns an array of REST URLs under which this handler can be 
registered.
+        *
+        * @return array containing REST URLs under which this handler can be 
registered.
+        */
+       String[] getPaths();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
index 6d09513..4cf5f0f 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
@@ -28,12 +28,19 @@ import java.util.Map;
  * Request handler providing details about a single task execution attempt.
  */
 public class SubtaskCurrentAttemptDetailsHandler extends 
SubtaskExecutionAttemptDetailsHandler {
+
+       public static final String SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH = 
"/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum";
        
        public SubtaskCurrentAttemptDetailsHandler(ExecutionGraphHolder 
executionGraphHolder, MetricFetcher fetcher) {
                super(executionGraphHolder, fetcher);
        }
 
        @Override
+       public String[] getPaths() {
+               return new String[]{SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH};
+       }
+
+       @Override
        public String handleRequest(AccessExecutionVertex vertex, Map<String, 
String> params) throws Exception {
                return handleRequest(vertex.getCurrentExecutionAttempt(), 
params);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
index e613efb..ba3a5ee 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -31,12 +31,19 @@ import java.util.Map;
  * via the "vertexid" parameter) in a specific job, defined via (defined voa 
the "jobid" parameter).  
  */
 public class SubtaskExecutionAttemptAccumulatorsHandler extends 
AbstractSubtaskAttemptRequestHandler {
+
+       private static final String SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH = 
"/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators";
        
        public SubtaskExecutionAttemptAccumulatorsHandler(ExecutionGraphHolder 
executionGraphHolder) {
                super(executionGraphHolder);
        }
 
        @Override
+       public String[] getPaths() {
+               return new String[]{SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH};
+       }
+
+       @Override
        public String handleRequest(AccessExecution execAttempt, Map<String, 
String> params) throws Exception {
                final StringifiedAccumulatorResult[] accs = 
execAttempt.getUserAccumulatorsStringified();
                

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
index da8db02..b753b6e 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
@@ -37,6 +37,8 @@ import java.util.Map;
  */
 public class SubtaskExecutionAttemptDetailsHandler extends 
AbstractSubtaskAttemptRequestHandler {
 
+       private static final String SUBTASK_ATTEMPT_DETAILS_REST_PATH = 
"/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt";
+
        private final MetricFetcher fetcher;
 
        public SubtaskExecutionAttemptDetailsHandler(ExecutionGraphHolder 
executionGraphHolder, MetricFetcher fetcher) {
@@ -45,6 +47,11 @@ public class SubtaskExecutionAttemptDetailsHandler extends 
AbstractSubtaskAttemp
        }
 
        @Override
+       public String[] getPaths() {
+               return new String[]{SUBTASK_ATTEMPT_DETAILS_REST_PATH};
+       }
+
+       @Override
        public String handleRequest(AccessExecution execAttempt, Map<String, 
String> params) throws Exception {
                final ExecutionState status = execAttempt.getState();
                final long now = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
index 892a606..222d474 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
@@ -33,12 +33,19 @@ import java.util.Map;
  * Request handler that returns the accumulators for all subtasks of job 
vertex.
  */
 public class SubtasksAllAccumulatorsHandler extends 
AbstractJobVertexRequestHandler {
+
+       private static final String SUBTASKS_ALL_ACCUMULATORS_REST_PATH =       
"/jobs/:jobid/vertices/:vertexid/subtasks/accumulators";
        
        public SubtasksAllAccumulatorsHandler(ExecutionGraphHolder 
executionGraphHolder) {
                super(executionGraphHolder);
        }
 
        @Override
+       public String[] getPaths() {
+               return new String[]{SUBTASKS_ALL_ACCUMULATORS_REST_PATH};
+       }
+
+       @Override
        public String handleRequest(AccessExecutionJobVertex jobVertex, 
Map<String, String> params) throws Exception {
                StringWriter writer = new StringWriter();
                JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
index 76349ee..e2e35e3 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
@@ -35,12 +35,18 @@ import java.util.Map;
  */
 public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler {
 
+       private static final String SUBTASK_TIMES_REST_PATH =   
"/jobs/:jobid/vertices/:vertexid/subtasktimes";
        
        public SubtasksTimesHandler(ExecutionGraphHolder executionGraphHolder) {
                super(executionGraphHolder);
        }
 
        @Override
+       public String[] getPaths() {
+               return new String[]{SUBTASK_TIMES_REST_PATH};
+       }
+
+       @Override
        public String handleRequest(AccessExecutionJobVertex jobVertex, 
Map<String, String> params) throws Exception {
                final long now = System.currentTimeMillis();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
index 6583d3b..1002bf3 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
@@ -94,6 +94,9 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
        private static final Logger LOG = 
LoggerFactory.getLogger(TaskManagerLogHandler.class);
 
+       private static final String TASKMANAGER_LOG_REST_PATH = 
"/taskmanagers/:taskmanagerid/log";
+       private static final String TASKMANAGER_OUT_REST_PATH = 
"/taskmanagers/:taskmanagerid/stdout";
+
        /** Keep track of last transmitted log, to clean up old ones */
        private final HashMap<String, BlobKey> lastSubmittedLog = new 
HashMap<>();
        private final HashMap<String, BlobKey> lastSubmittedStdout = new 
HashMap<>();
@@ -141,6 +144,15 @@ public class TaskManagerLogHandler extends 
RuntimeMonitorHandlerBase {
                timeTimeout = Time.milliseconds(timeout.toMillis());
        }
 
+       @Override
+       public String[] getPaths() {
+               if (serveLogFile) {
+                       return new String[]{TASKMANAGER_LOG_REST_PATH};
+               } else {
+                       return new String[]{TASKMANAGER_OUT_REST_PATH};
+               }
+       }
+
        /**
         * Response when running with leading JobManager.
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
index c757f5c..a23e983 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
@@ -41,6 +41,9 @@ import static java.util.Objects.requireNonNull;
 
 public class TaskManagersHandler extends AbstractJsonRequestHandler  {
 
+       private static final String TASKMANAGERS_REST_PATH = "/taskmanagers";
+       private static final String TASKMANAGER_DETAILS_REST_PATH = 
"/taskmanagers/:taskmanagerid";
+
        public static final String TASK_MANAGER_ID_KEY = "taskmanagerid";
        
        private final FiniteDuration timeout;
@@ -53,6 +56,11 @@ public class TaskManagersHandler extends 
AbstractJsonRequestHandler  {
        }
 
        @Override
+       public String[] getPaths() {
+               return new String[]{TASKMANAGERS_REST_PATH, 
TASKMANAGER_DETAILS_REST_PATH};
+       }
+
+       @Override
        public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
                try {
                        if (jobManager != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
index be0d283..de40a4a 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
@@ -34,11 +34,18 @@ import java.util.Map;
  */
 public class CheckpointConfigHandler extends 
AbstractExecutionGraphRequestHandler {
 
+       private static final String CHECKPOINT_CONFIG_REST_PATH = 
"/jobs/:jobid/checkpoints/config";
+
        public CheckpointConfigHandler(ExecutionGraphHolder 
executionGraphHolder) {
                super(executionGraphHolder);
        }
 
        @Override
+       public String[] getPaths() {
+               return new String[]{CHECKPOINT_CONFIG_REST_PATH};
+       }
+
+       @Override
        public String handleRequest(AccessExecutionGraph graph, Map<String, 
String> params) throws Exception {
                StringWriter writer = new StringWriter();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
index d461f03..e651824 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
@@ -38,6 +38,8 @@ import java.util.Map;
  */
 public class CheckpointStatsDetailsHandler extends 
AbstractExecutionGraphRequestHandler {
 
+       private static final String CHECKPOINT_STATS_DETAILS_REST_PATH = 
"/jobs/:jobid/checkpoints/details/:checkpointid";
+
        private final CheckpointStatsCache cache;
 
        public CheckpointStatsDetailsHandler(ExecutionGraphHolder 
executionGraphHolder, CheckpointStatsCache cache) {
@@ -46,6 +48,11 @@ public class CheckpointStatsDetailsHandler extends 
AbstractExecutionGraphRequest
        }
 
        @Override
+       public String[] getPaths() {
+               return new String[]{CHECKPOINT_STATS_DETAILS_REST_PATH};
+       }
+
+       @Override
        public String handleRequest(AccessExecutionGraph graph, Map<String, 
String> params) throws Exception {
                long checkpointId = parseCheckpointId(params);
                if (checkpointId == -1) {

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
index d55467f..15dd911 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
@@ -44,6 +44,8 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class CheckpointStatsDetailsSubtasksHandler extends 
AbstractExecutionGraphRequestHandler {
 
+       private static final String CHECKPOINT_STATS_DETAILS_SUBTASKS_REST_PATH 
= "/jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid";
+
        private final CheckpointStatsCache cache;
 
        public CheckpointStatsDetailsSubtasksHandler(ExecutionGraphHolder 
executionGraphHolder, CheckpointStatsCache cache) {
@@ -52,6 +54,11 @@ public class CheckpointStatsDetailsSubtasksHandler extends 
AbstractExecutionGrap
        }
 
        @Override
+       public String[] getPaths() {
+               return new 
String[]{CHECKPOINT_STATS_DETAILS_SUBTASKS_REST_PATH};
+       }
+
+       @Override
        public String handleJsonRequest(
                Map<String, String> pathParams,
                Map<String, String> queryParams,

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
index 404b2c7..6413806 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
@@ -43,11 +43,18 @@ import java.util.Map;
  */
 public class CheckpointStatsHandler extends 
AbstractExecutionGraphRequestHandler {
 
+       private static final String CHECKPOINT_STATS_REST_PATH = 
"/jobs/:jobid/checkpoints";
+
        public CheckpointStatsHandler(ExecutionGraphHolder 
executionGraphHolder) {
                super(executionGraphHolder);
        }
 
        @Override
+       public String[] getPaths() {
+               return new String[]{CHECKPOINT_STATS_REST_PATH};
+       }
+
+       @Override
        public String handleRequest(AccessExecutionGraph graph, Map<String, 
String> params) throws Exception {
                StringWriter writer = new StringWriter();
                JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
index 7452c71..f667ce5 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
@@ -31,11 +31,19 @@ import java.util.Map;
  * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
  */
 public class JobManagerMetricsHandler extends AbstractMetricsHandler {
+
+       private static final String JOBMANAGER_METRICS_REST_PATH = 
"/jobmanager/metrics";
+
        public JobManagerMetricsHandler(MetricFetcher fetcher) {
                super(fetcher);
        }
 
        @Override
+       public String[] getPaths() {
+               return new String[]{JOBMANAGER_METRICS_REST_PATH};
+       }
+
+       @Override
        protected Map<String, String> getMapFor(Map<String, String> pathParams, 
MetricStore metrics) {
                MetricStore.JobManagerMetricStore jobManager = 
metrics.getJobManagerMetricStore();
                if (jobManager == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
index d66c954..26c9fa9 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
@@ -32,12 +32,18 @@ import java.util.Map;
  */
 public class JobMetricsHandler extends AbstractMetricsHandler {
        public static final String PARAMETER_JOB_ID = "jobid";
+       private static final String JOB_METRICS_REST_PATH = 
"/jobs/:jobid/metrics";
 
        public JobMetricsHandler(MetricFetcher fetcher) {
                super(fetcher);
        }
 
        @Override
+       public String[] getPaths() {
+               return new String[]{JOB_METRICS_REST_PATH};
+       }
+
+       @Override
        protected Map<String, String> getMapFor(Map<String, String> pathParams, 
MetricStore metrics) {
                MetricStore.JobMetricStore job = 
metrics.getJobMetricStore(pathParams.get(PARAMETER_JOB_ID));
                return job != null

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
index 6fca771..3e838d7 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
@@ -32,12 +32,18 @@ import java.util.Map;
  */
 public class JobVertexMetricsHandler extends AbstractMetricsHandler {
        public static final String PARAMETER_VERTEX_ID = "vertexid";
+       private static final String JOB_VERTEX_METRICS_REST_PATH = 
"/jobs/:jobid/vertices/:vertexid/metrics";
 
        public JobVertexMetricsHandler(MetricFetcher fetcher) {
                super(fetcher);
        }
 
        @Override
+       public String[] getPaths() {
+               return new String[]{JOB_VERTEX_METRICS_REST_PATH};
+       }
+
+       @Override
        protected Map<String, String> getMapFor(Map<String, String> pathParams, 
MetricStore metrics) {
                MetricStore.TaskMetricStore task = metrics.getTaskMetricStore(
                        pathParams.get(JobMetricsHandler.PARAMETER_JOB_ID),

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
index f1b2e72..a74f5f2 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
@@ -33,11 +33,19 @@ import java.util.Map;
  * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
  */
 public class TaskManagerMetricsHandler extends AbstractMetricsHandler {
+
+       private static final String TASKMANAGER_METRICS_REST_PATH = 
"/taskmanagers/:taskmanagerid/metrics";
+
        public TaskManagerMetricsHandler(MetricFetcher fetcher) {
                super(fetcher);
        }
 
        @Override
+       public String[] getPaths() {
+               return new String[]{TASKMANAGER_METRICS_REST_PATH};
+       }
+
+       @Override
        protected Map<String, String> getMapFor(Map<String, String> pathParams, 
MetricStore metrics) {
                MetricStore.TaskManagerMetricStore taskManager = 
metrics.getTaskManagerMetricStore(pathParams.get(TaskManagersHandler.TASK_MANAGER_ID_KEY));
                if (taskManager == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java
new file mode 100644
index 0000000..018ffdd
--- /dev/null
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.junit.Assert;
+import org.junit.Test;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+
+public class ClusterOverviewHandlerTest {
+       @Test
+       public void testGetPaths() {
+               ClusterOverviewHandler handler = new ClusterOverviewHandler(new 
FiniteDuration(0, TimeUnit.SECONDS));
+               String[] paths = handler.getPaths();
+               Assert.assertEquals(1, paths.length);
+               Assert.assertEquals("/overview", paths[0]);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java
new file mode 100644
index 0000000..e225648
--- /dev/null
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.junit.Assert;
+import org.junit.Test;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+
+public class CurrentJobIdsHandlerTest {
+       @Test
+       public void testGetPaths() {
+               CurrentJobIdsHandler handler = new CurrentJobIdsHandler(new 
FiniteDuration(0, TimeUnit.SECONDS));
+               String[] paths = handler.getPaths();
+               Assert.assertEquals(1, paths.length);
+               Assert.assertEquals("/jobs", paths[0]);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
new file mode 100644
index 0000000..3207fec
--- /dev/null
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.junit.Assert;
+import org.junit.Test;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+
+public class CurrentJobsOverviewHandlerTest {
+       @Test
+       public void testGetPaths() {
+               CurrentJobsOverviewHandler handlerAll = new 
CurrentJobsOverviewHandler(new FiniteDuration(0, TimeUnit.SECONDS), true, true);
+               String[] pathsAll = handlerAll.getPaths();
+               Assert.assertEquals(1, pathsAll.length);
+               Assert.assertEquals("/joboverview", pathsAll[0]);
+
+               CurrentJobsOverviewHandler handlerRunning = new 
CurrentJobsOverviewHandler(new FiniteDuration(0, TimeUnit.SECONDS), true, 
false);
+               String[] pathsRunning = handlerRunning.getPaths();
+               Assert.assertEquals(1, pathsRunning.length);
+               Assert.assertEquals("/joboverview/running", pathsRunning[0]);
+
+               CurrentJobsOverviewHandler handlerCompleted = new 
CurrentJobsOverviewHandler(new FiniteDuration(0, TimeUnit.SECONDS), false, 
true);
+               String[] pathsCompleted = handlerCompleted.getPaths();
+               Assert.assertEquals(1, pathsCompleted.length);
+               Assert.assertEquals("/joboverview/completed", 
pathsCompleted[0]);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java
new file mode 100644
index 0000000..aa2d552
--- /dev/null
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class DashboardConfigHandlerTest {
+       /** A handler constructed with 10000L must report "/config" as its one and only path. */
+       @Test
+       public void testGetPaths() {
+               String[] reported = new DashboardConfigHandler(10000L).getPaths();
+               Assert.assertEquals(1, reported.length);
+               Assert.assertEquals("/config", reported[0]);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandlerTest.java
new file mode 100644
index 0000000..e84926e
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandlerTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+public class JarAccessDeniedHandlerTest {
+       /** The handler must report exactly five paths, among them every "/jars" route listed below. */
+       @Test
+       public void testGetPaths() {
+               String[] reported = new JarAccessDeniedHandler().getPaths();
+               Assert.assertEquals(5, reported.length);
+
+               List<String> reportedList = Lists.newArrayList(reported);
+               String[] expected = {"/jars", "/jars/upload", "/jars/:jarid", "/jars/:jarid/plan", "/jars/:jarid/run"};
+               for (String path : expected) {
+                       Assert.assertTrue(reportedList.contains(path));
+               }
+       }
+}

Reply via email to