[FLINK-7531] Move Flink legacy rest handler to flink-runtime

Move metrics handlers under o.a.f.runtime.webmonitor.handlers

Move StaticFileServerHandler under o.a.f.runtime.webmonitor.files

This closes #4600.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4fc019a9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4fc019a9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4fc019a9

Branch: refs/heads/master
Commit: 4fc019a96a08446d7ba5f57664904abcd585e31c
Parents: 3277010
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Fri Aug 18 09:52:30 2017 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Wed Sep 20 00:40:24 2017 +0200

----------------------------------------------------------------------
 .../webmonitor/BackPressureStatsTracker.java    | 334 --------------
 .../webmonitor/ExecutionGraphHolder.java        |  86 ----
 .../runtime/webmonitor/NotFoundException.java   |  32 --
 .../webmonitor/OperatorBackPressureStats.java   | 126 ------
 .../webmonitor/RuntimeMonitorHandler.java       |   4 +-
 .../runtime/webmonitor/StackTraceSample.java    | 119 -----
 .../webmonitor/StackTraceSampleCoordinator.java | 392 -----------------
 .../flink/runtime/webmonitor/WebHandler.java    |  32 --
 .../runtime/webmonitor/WebRuntimeMonitor.java   |  82 ++--
 .../files/StaticFileServerHandler.java          | 363 ---------------
 .../AbstractExecutionGraphRequestHandler.java   |  80 ----
 .../AbstractJobVertexRequestHandler.java        |  72 ---
 .../handlers/AbstractJsonRequestHandler.java    |  87 ----
 .../AbstractSubtaskAttemptRequestHandler.java   |  78 ----
 .../handlers/AbstractSubtaskRequestHandler.java |  66 ---
 .../handlers/ClusterOverviewHandler.java        | 105 -----
 .../handlers/ConstantTextHandler.java           |  57 ---
 .../handlers/CurrentJobIdsHandler.java          | 112 -----
 .../handlers/CurrentJobsOverviewHandler.java    | 182 --------
 .../handlers/DashboardConfigHandler.java        |  90 ----
 .../handlers/JarAccessDeniedHandler.java        |   1 +
 .../webmonitor/handlers/JarActionHandler.java   |   2 +
 .../webmonitor/handlers/JarDeleteHandler.java   |   2 +
 .../webmonitor/handlers/JarListHandler.java     |   2 +
 .../webmonitor/handlers/JarPlanHandler.java     |   1 +
 .../webmonitor/handlers/JarRunHandler.java      |   1 +
 .../webmonitor/handlers/JarUploadHandler.java   |   1 +
 .../handlers/JobAccumulatorsHandler.java        | 107 -----
 .../handlers/JobCancellationHandler.java        |  72 ---
 .../JobCancellationWithSavepointHandlers.java   | 428 ------------------
 .../webmonitor/handlers/JobConfigHandler.java   | 119 -----
 .../webmonitor/handlers/JobDetailsHandler.java  | 225 ----------
 .../handlers/JobExceptionsHandler.java          | 137 ------
 .../handlers/JobManagerConfigHandler.java       |  87 ----
 .../webmonitor/handlers/JobPlanHandler.java     |  67 ---
 .../webmonitor/handlers/JobStoppingHandler.java |  72 ---
 .../handlers/JobVertexAccumulatorsHandler.java  | 113 -----
 .../handlers/JobVertexBackPressureHandler.java  | 147 -------
 .../handlers/JobVertexDetailsHandler.java       | 160 -------
 .../handlers/JobVertexTaskManagersHandler.java  | 211 ---------
 .../webmonitor/handlers/JsonFactory.java        |  35 --
 .../webmonitor/handlers/RequestHandler.java     |  56 ---
 .../handlers/RequestHandlerException.java       |  31 --
 .../SubtaskCurrentAttemptDetailsHandler.java    |  49 ---
 ...taskExecutionAttemptAccumulatorsHandler.java | 134 ------
 .../SubtaskExecutionAttemptDetailsHandler.java  | 167 -------
 .../SubtasksAllAccumulatorsHandler.java         | 131 ------
 .../handlers/SubtasksTimesHandler.java          | 141 ------
 .../handlers/TaskManagerLogHandler.java         | 335 --------------
 .../handlers/TaskManagersHandler.java           | 205 ---------
 .../checkpoints/CheckpointConfigHandler.java    | 120 -----
 .../checkpoints/CheckpointStatsCache.java       |  81 ----
 .../CheckpointStatsDetailsHandler.java          | 203 ---------
 .../CheckpointStatsDetailsSubtasksHandler.java  | 234 ----------
 .../checkpoints/CheckpointStatsHandler.java     | 277 ------------
 .../webmonitor/history/HistoryServer.java       |   2 +-
 .../history/HistoryServerArchiveFetcher.java    |   2 +-
 .../HistoryServerStaticFileServerHandler.java   |   2 +-
 .../metrics/AbstractMetricsHandler.java         | 139 ------
 .../metrics/JobManagerMetricsHandler.java       |  57 ---
 .../webmonitor/metrics/JobMetricsHandler.java   |  55 ---
 .../metrics/JobVertexMetricsHandler.java        |  57 ---
 .../webmonitor/metrics/MetricFetcher.java       | 211 ---------
 .../runtime/webmonitor/metrics/MetricStore.java | 305 -------------
 .../metrics/TaskManagerMetricsHandler.java      |  59 ---
 .../webmonitor/utils/MutableIOMetrics.java      | 109 -----
 .../BackPressureStatsTrackerITCase.java         | 332 --------------
 .../BackPressureStatsTrackerTest.java           | 192 --------
 .../StackTraceSampleCoordinatorITCase.java      | 203 ---------
 .../StackTraceSampleCoordinatorTest.java        | 441 -------------------
 .../runtime/webmonitor/files/MimeTypesTest.java |  75 ----
 .../handlers/ClusterOverviewHandlerTest.java    |  38 --
 .../handlers/CurrentJobIdsHandlerTest.java      |  38 --
 .../CurrentJobsOverviewHandlerTest.java         | 121 -----
 .../handlers/DashboardConfigHandlerTest.java    |  59 ---
 .../handlers/HandlerRedirectUtilsTest.java      |  74 ----
 .../handlers/JarActionHandlerTest.java          |  13 +-
 .../handlers/JobAccumulatorsHandlerTest.java    |  83 ----
 .../handlers/JobCancellationHandlerTest.java    |  44 --
 ...obCancellationWithSavepointHandlersTest.java | 334 --------------
 .../handlers/JobConfigHandlerTest.java          |  92 ----
 .../handlers/JobDetailsHandlerTest.java         | 169 -------
 .../handlers/JobExceptionsHandlerTest.java      | 101 -----
 .../handlers/JobManagerConfigHandlerTest.java   |  37 --
 .../webmonitor/handlers/JobPlanHandlerTest.java |  60 ---
 .../handlers/JobStoppingHandlerTest.java        |  45 --
 .../JobVertexAccumulatorsHandlerTest.java       |  85 ----
 .../JobVertexBackPressureHandlerTest.java       | 211 ---------
 .../handlers/JobVertexDetailsHandlerTest.java   | 109 -----
 .../JobVertexTaskManagersHandlerTest.java       | 132 ------
 ...SubtaskCurrentAttemptDetailsHandlerTest.java |  40 --
 ...ExecutionAttemptAccumulatorsHandlerTest.java |  91 ----
 ...btaskExecutionAttemptDetailsHandlerTest.java | 109 -----
 .../SubtasksAllAccumulatorsHandlerTest.java     |  97 ----
 .../handlers/SubtasksTimesHandlerTest.java      | 103 -----
 .../handlers/TaskManagerLogHandlerTest.java     | 149 -------
 .../handlers/TaskManagersHandlerTest.java       |  44 --
 .../CheckpointConfigHandlerTest.java            | 195 --------
 .../checkpoints/CheckpointStatsCacheTest.java   |  71 ---
 .../CheckpointStatsDetailsHandlerTest.java      | 358 ---------------
 .../checkpoints/CheckpointStatsHandlerTest.java | 432 ------------------
 ...heckpointStatsSubtaskDetailsHandlerTest.java | 389 ----------------
 .../webmonitor/history/FsJobArchivistTest.java  |   2 +-
 .../webmonitor/history/HistoryServerTest.java   |   2 +-
 .../metrics/AbstractMetricsHandlerTest.java     | 172 --------
 .../metrics/JobManagerMetricsHandlerTest.java   |  84 ----
 .../metrics/JobMetricsHandlerTest.java          |  86 ----
 .../metrics/JobVertexMetricsHandlerTest.java    |  90 ----
 .../webmonitor/metrics/MetricFetcherTest.java   | 195 --------
 .../webmonitor/metrics/MetricStoreTest.java     |  88 ----
 .../metrics/TaskManagerMetricsHandlerTest.java  |  86 ----
 .../utils/ArchivedExecutionBuilder.java         | 150 -------
 .../utils/ArchivedExecutionConfigBuilder.java   |  71 ---
 .../utils/ArchivedExecutionGraphBuilder.java    | 140 ------
 .../ArchivedExecutionJobVertexBuilder.java      |  84 ----
 .../utils/ArchivedExecutionVertexBuilder.java   |  73 ---
 .../utils/ArchivedJobGenerationUtils.java       | 164 -------
 .../flink/runtime/rest/NotFoundException.java   |  32 ++
 .../flink/runtime/rest/handler/WebHandler.java  |  32 ++
 .../AbstractExecutionGraphRequestHandler.java   |  79 ++++
 .../legacy/AbstractJobVertexRequestHandler.java |  71 +++
 .../legacy/AbstractJsonRequestHandler.java      |  88 ++++
 .../AbstractSubtaskAttemptRequestHandler.java   |  77 ++++
 .../legacy/AbstractSubtaskRequestHandler.java   |  65 +++
 .../handler/legacy/ClusterOverviewHandler.java  | 105 +++++
 .../handler/legacy/ConstantTextHandler.java     |  57 +++
 .../handler/legacy/CurrentJobIdsHandler.java    | 112 +++++
 .../legacy/CurrentJobsOverviewHandler.java      | 182 ++++++++
 .../handler/legacy/DashboardConfigHandler.java  |  90 ++++
 .../handler/legacy/ExecutionGraphHolder.java    |  82 ++++
 .../handler/legacy/JobAccumulatorsHandler.java  | 106 +++++
 .../handler/legacy/JobCancellationHandler.java  |  72 +++
 .../JobCancellationWithSavepointHandlers.java   | 427 ++++++++++++++++++
 .../rest/handler/legacy/JobConfigHandler.java   | 118 +++++
 .../rest/handler/legacy/JobDetailsHandler.java  | 224 ++++++++++
 .../handler/legacy/JobExceptionsHandler.java    | 136 ++++++
 .../handler/legacy/JobManagerConfigHandler.java |  87 ++++
 .../rest/handler/legacy/JobPlanHandler.java     |  66 +++
 .../rest/handler/legacy/JobStoppingHandler.java |  72 +++
 .../legacy/JobVertexAccumulatorsHandler.java    | 112 +++++
 .../legacy/JobVertexBackPressureHandler.java    | 145 ++++++
 .../handler/legacy/JobVertexDetailsHandler.java | 159 +++++++
 .../legacy/JobVertexTaskManagersHandler.java    | 210 +++++++++
 .../rest/handler/legacy/JsonFactory.java        |  35 ++
 .../rest/handler/legacy/RequestHandler.java     |  56 +++
 .../handler/legacy/RequestHandlerException.java |  31 ++
 .../SubtaskCurrentAttemptDetailsHandler.java    |  48 ++
 ...taskExecutionAttemptAccumulatorsHandler.java | 133 ++++++
 .../SubtaskExecutionAttemptDetailsHandler.java  | 166 +++++++
 .../legacy/SubtasksAllAccumulatorsHandler.java  | 130 ++++++
 .../handler/legacy/SubtasksTimesHandler.java    | 140 ++++++
 .../handler/legacy/TaskManagerLogHandler.java   | 335 ++++++++++++++
 .../handler/legacy/TaskManagersHandler.java     | 205 +++++++++
 .../backpressure/BackPressureStatsTracker.java  | 333 ++++++++++++++
 .../backpressure/OperatorBackPressureStats.java | 126 ++++++
 .../legacy/backpressure/StackTraceSample.java   | 119 +++++
 .../StackTraceSampleCoordinator.java            | 392 +++++++++++++++++
 .../checkpoints/CheckpointConfigHandler.java    | 120 +++++
 .../checkpoints/CheckpointStatsCache.java       |  81 ++++
 .../CheckpointStatsDetailsHandler.java          | 203 +++++++++
 .../CheckpointStatsDetailsSubtasksHandler.java  | 233 ++++++++++
 .../checkpoints/CheckpointStatsHandler.java     | 277 ++++++++++++
 .../legacy/files/StaticFileServerHandler.java   | 363 +++++++++++++++
 .../legacy/metrics/AbstractMetricsHandler.java  | 139 ++++++
 .../metrics/JobManagerMetricsHandler.java       |  57 +++
 .../legacy/metrics/JobMetricsHandler.java       |  55 +++
 .../legacy/metrics/JobVertexMetricsHandler.java |  57 +++
 .../handler/legacy/metrics/MetricFetcher.java   | 211 +++++++++
 .../handler/legacy/metrics/MetricStore.java     | 305 +++++++++++++
 .../metrics/TaskManagerMetricsHandler.java      |  59 +++
 .../rest/handler/util/MutableIOMetrics.java     | 109 +++++
 .../legacy/ClusterOverviewHandlerTest.java      |  38 ++
 .../legacy/CurrentJobIdsHandlerTest.java        |  38 ++
 .../legacy/CurrentJobsOverviewHandlerTest.java  | 121 +++++
 .../legacy/DashboardConfigHandlerTest.java      |  59 +++
 .../legacy/HandlerRedirectUtilsTest.java        |  74 ++++
 .../legacy/JobAccumulatorsHandlerTest.java      |  82 ++++
 .../legacy/JobCancellationHandlerTest.java      |  44 ++
 ...obCancellationWithSavepointHandlersTest.java | 333 ++++++++++++++
 .../handler/legacy/JobConfigHandlerTest.java    |  91 ++++
 .../handler/legacy/JobDetailsHandlerTest.java   | 168 +++++++
 .../legacy/JobExceptionsHandlerTest.java        | 100 +++++
 .../legacy/JobManagerConfigHandlerTest.java     |  37 ++
 .../rest/handler/legacy/JobPlanHandlerTest.java |  59 +++
 .../handler/legacy/JobStoppingHandlerTest.java  |  45 ++
 .../JobVertexAccumulatorsHandlerTest.java       |  84 ++++
 .../JobVertexBackPressureHandlerTest.java       | 209 +++++++++
 .../legacy/JobVertexDetailsHandlerTest.java     | 108 +++++
 .../JobVertexTaskManagersHandlerTest.java       | 132 ++++++
 ...SubtaskCurrentAttemptDetailsHandlerTest.java |  40 ++
 ...ExecutionAttemptAccumulatorsHandlerTest.java |  91 ++++
 ...btaskExecutionAttemptDetailsHandlerTest.java | 109 +++++
 .../SubtasksAllAccumulatorsHandlerTest.java     |  97 ++++
 .../legacy/SubtasksTimesHandlerTest.java        | 103 +++++
 .../legacy/TaskManagerLogHandlerTest.java       | 149 +++++++
 .../handler/legacy/TaskManagersHandlerTest.java |  44 ++
 .../BackPressureStatsTrackerITCase.java         | 329 ++++++++++++++
 .../BackPressureStatsTrackerTest.java           | 185 ++++++++
 .../StackTraceSampleCoordinatorITCase.java      | 203 +++++++++
 .../StackTraceSampleCoordinatorTest.java        | 432 ++++++++++++++++++
 .../CheckpointConfigHandlerTest.java            | 195 ++++++++
 .../checkpoints/CheckpointStatsCacheTest.java   |  71 +++
 .../CheckpointStatsDetailsHandlerTest.java      | 358 +++++++++++++++
 .../checkpoints/CheckpointStatsHandlerTest.java | 432 ++++++++++++++++++
 ...heckpointStatsSubtaskDetailsHandlerTest.java | 389 ++++++++++++++++
 .../handler/legacy/files/MimeTypesTest.java     |  75 ++++
 .../metrics/AbstractMetricsHandlerTest.java     | 172 ++++++++
 .../metrics/JobManagerMetricsHandlerTest.java   |  84 ++++
 .../legacy/metrics/JobMetricsHandlerTest.java   |  86 ++++
 .../metrics/JobVertexMetricsHandlerTest.java    |  90 ++++
 .../legacy/metrics/MetricFetcherTest.java       | 195 ++++++++
 .../handler/legacy/metrics/MetricStoreTest.java |  88 ++++
 .../metrics/TaskManagerMetricsHandlerTest.java  |  86 ++++
 .../legacy/utils/ArchivedExecutionBuilder.java  | 150 +++++++
 .../utils/ArchivedExecutionConfigBuilder.java   |  71 +++
 .../utils/ArchivedExecutionGraphBuilder.java    | 140 ++++++
 .../ArchivedExecutionJobVertexBuilder.java      |  84 ++++
 .../utils/ArchivedExecutionVertexBuilder.java   |  73 +++
 .../utils/ArchivedJobGenerationUtils.java       | 164 +++++++
 219 files changed, 14198 insertions(+), 14237 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
deleted file mode 100644
index 5e4e63a..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
+++ /dev/null
@@ -1,334 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-
-import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
-import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
-import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.function.BiFunction;
-
-import scala.Option;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Back pressure statistics tracker.
- *
- * <p>Back pressure is determined by sampling running tasks. If a task is
- * slowed down by back pressure it will be stuck in memory requests to a
- * {@link org.apache.flink.runtime.io.network.buffer.LocalBufferPool}.
- *
- * <p>The back pressured stack traces look like this:
- *
- * <pre>
- * java.lang.Object.wait(Native Method)
- * o.a.f.[...].LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
- * o.a.f.[...].LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133) <--- BLOCKING
- * request
- * [...]
- * </pre>
- */
-public class BackPressureStatsTracker {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(BackPressureStatsTracker.class);
-
-       /** Maximum stack trace depth for samples. */
-       static final int MAX_STACK_TRACE_DEPTH = 3;
-
-       /** Expected class name for back pressure indicating stack trace 
element. */
-       static final String EXPECTED_CLASS_NAME = 
"org.apache.flink.runtime.io.network.buffer.LocalBufferPool";
-
-       /** Expected method name for back pressure indicating stack trace 
element. */
-       static final String EXPECTED_METHOD_NAME = "requestBufferBlocking";
-
-       /** Lock guarding trigger operations. */
-       private final Object lock = new Object();
-
-       /* Stack trace sample coordinator. */
-       private final StackTraceSampleCoordinator coordinator;
-
-       /**
-        * Completed stats. Important: Job vertex IDs need to be scoped by job 
ID,
-        * because they are potentially constant across runs messing up the 
cached
-        * data.
-        */
-       private final Cache<ExecutionJobVertex, OperatorBackPressureStats> 
operatorStatsCache;
-
-       /** Pending in progress stats. Important: Job vertex IDs need to be 
scoped
-        * by job ID, because they are potentially constant across runs messing 
up
-        * the cached data.*/
-       private final Set<ExecutionJobVertex> pendingStats = new HashSet<>();
-
-       /** Cleanup interval for completed stats cache. */
-       private final int cleanUpInterval;
-
-       private final int numSamples;
-
-       private final Time delayBetweenSamples;
-
-       /** Flag indicating whether the stats tracker has been shut down. */
-       private boolean shutDown;
-
-       /**
-        * Creates a back pressure statistics tracker.
-        *
-        * @param cleanUpInterval     Clean up interval for completed stats.
-        * @param numSamples          Number of stack trace samples when 
determining back pressure.
-        * @param delayBetweenSamples Delay between samples when determining 
back pressure.
-        */
-       public BackPressureStatsTracker(
-                       StackTraceSampleCoordinator coordinator,
-                       int cleanUpInterval,
-                       int numSamples,
-                       Time delayBetweenSamples) {
-
-               this.coordinator = checkNotNull(coordinator, "Stack trace 
sample coordinator");
-
-               checkArgument(cleanUpInterval >= 0, "Clean up interval");
-               this.cleanUpInterval = cleanUpInterval;
-
-               checkArgument(numSamples >= 1, "Number of samples");
-               this.numSamples = numSamples;
-
-               this.delayBetweenSamples = checkNotNull(delayBetweenSamples, 
"Delay between samples");
-
-               this.operatorStatsCache = CacheBuilder.newBuilder()
-                               .concurrencyLevel(1)
-                               .expireAfterAccess(cleanUpInterval, 
TimeUnit.MILLISECONDS)
-                               .build();
-       }
-
-       /** Cleanup interval for completed stats cache. */
-       public long getCleanUpInterval() {
-               return cleanUpInterval;
-       }
-
-       /**
-        * Returns back pressure statistics for a operator.
-        *
-        * @param vertex Operator to get the stats for.
-        *
-        * @return Back pressure statistics for an operator
-        */
-       public Option<OperatorBackPressureStats> 
getOperatorBackPressureStats(ExecutionJobVertex vertex) {
-               return Option.apply(operatorStatsCache.getIfPresent(vertex));
-       }
-
-       /**
-        * Triggers a stack trace sample for a operator to gather the back 
pressure
-        * statistics. If there is a sample in progress for the operator, the 
call
-        * is ignored.
-        *
-        * @param vertex Operator to get the stats for.
-        * @return Flag indicating whether a sample with triggered.
-        */
-       @SuppressWarnings("unchecked")
-       public boolean triggerStackTraceSample(ExecutionJobVertex vertex) {
-               synchronized (lock) {
-                       if (shutDown) {
-                               return false;
-                       }
-
-                       if (!pendingStats.contains(vertex) &&
-                                       
!vertex.getGraph().getState().isGloballyTerminalState()) {
-
-                               Executor executor = 
vertex.getGraph().getFutureExecutor();
-
-                               // Only trigger if still active job
-                               if (executor != null) {
-                                       pendingStats.add(vertex);
-
-                                       if (LOG.isDebugEnabled()) {
-                                               LOG.debug("Triggering stack 
trace sample for tasks: " + Arrays.toString(vertex.getTaskVertices()));
-                                       }
-
-                                       CompletableFuture<StackTraceSample> 
sample = coordinator.triggerStackTraceSample(
-                                                       
vertex.getTaskVertices(),
-                                                       numSamples,
-                                                       delayBetweenSamples,
-                                                       MAX_STACK_TRACE_DEPTH);
-
-                                       sample.handleAsync(new 
StackTraceSampleCompletionCallback(vertex), executor);
-
-                                       return true;
-                               }
-                       }
-
-                       return false;
-               }
-       }
-
-       /**
-        * Cleans up the operator stats cache if it contains timed out entries.
-        *
-        * <p>The Guava cache only evicts as maintenance during normal 
operations.
-        * If this handler is inactive, it will never be cleaned.
-        */
-       public void cleanUpOperatorStatsCache() {
-               operatorStatsCache.cleanUp();
-       }
-
-       /**
-        * Shuts down the stats tracker.
-        *
-        * <p>Invalidates the cache and clears all pending stats.
-        */
-       public void shutDown() {
-               synchronized (lock) {
-                       if (!shutDown) {
-                               operatorStatsCache.invalidateAll();
-                               pendingStats.clear();
-
-                               shutDown = true;
-                       }
-               }
-       }
-
-       /**
-        * Invalidates the cache (irrespective of clean up interval).
-        */
-       void invalidateOperatorStatsCache() {
-               operatorStatsCache.invalidateAll();
-       }
-
-       /**
-        * Callback on completed stack trace sample.
-        */
-       class StackTraceSampleCompletionCallback implements 
BiFunction<StackTraceSample, Throwable, Void> {
-
-               private final ExecutionJobVertex vertex;
-
-               public StackTraceSampleCompletionCallback(ExecutionJobVertex 
vertex) {
-                       this.vertex = vertex;
-               }
-
-               @Override
-               public Void apply(StackTraceSample stackTraceSample, Throwable 
throwable) {
-                       synchronized (lock) {
-                               try {
-                                       if (shutDown) {
-                                               return null;
-                                       }
-
-                                       // Job finished, ignore.
-                                       JobStatus jobState = 
vertex.getGraph().getState();
-                                       if (jobState.isGloballyTerminalState()) 
{
-                                               LOG.debug("Ignoring sample, 
because job is in state " + jobState + ".");
-                                       } else if (stackTraceSample != null) {
-                                               OperatorBackPressureStats stats 
= createStatsFromSample(stackTraceSample);
-                                               operatorStatsCache.put(vertex, 
stats);
-                                       } else {
-                                               LOG.debug("Failed to gather 
stack trace sample.", throwable);
-                                       }
-                               } catch (Throwable t) {
-                                       LOG.error("Error during stats 
completion.", t);
-                               } finally {
-                                       pendingStats.remove(vertex);
-                               }
-
-                               return null;
-                       }
-               }
-
-               /**
-                * Creates the back pressure stats from a stack trace sample.
-                *
-                * @param sample Stack trace sample to base stats on.
-                *
-                * @return Back pressure stats
-                */
-               private OperatorBackPressureStats 
createStatsFromSample(StackTraceSample sample) {
-                       Map<ExecutionAttemptID, List<StackTraceElement[]>> 
traces = sample.getStackTraces();
-
-                       // Map task ID to subtask index, because the web 
interface expects
-                       // it like that.
-                       Map<ExecutionAttemptID, Integer> subtaskIndexMap = Maps
-                                       
.newHashMapWithExpectedSize(traces.size());
-
-                       Set<ExecutionAttemptID> sampledTasks = 
sample.getStackTraces().keySet();
-
-                       for (ExecutionVertex task : vertex.getTaskVertices()) {
-                               ExecutionAttemptID taskId = 
task.getCurrentExecutionAttempt().getAttemptId();
-                               if (sampledTasks.contains(taskId)) {
-                                       subtaskIndexMap.put(taskId, 
task.getParallelSubtaskIndex());
-                               } else {
-                                       LOG.debug("Outdated sample. A task, 
which is part of the " +
-                                                       "sample has been 
reset.");
-                               }
-                       }
-
-                       // Ratio of blocked samples to total samples per sub 
task. Array
-                       // position corresponds to sub task index.
-                       double[] backPressureRatio = new double[traces.size()];
-
-                       for (Entry<ExecutionAttemptID, 
List<StackTraceElement[]>> entry : traces.entrySet()) {
-                               int backPressureSamples = 0;
-
-                               List<StackTraceElement[]> taskTraces = 
entry.getValue();
-
-                               for (StackTraceElement[] trace : taskTraces) {
-                                       for (int i = trace.length - 1; i >= 0; 
i--) {
-                                               StackTraceElement elem = 
trace[i];
-
-                                               if 
(elem.getClassName().equals(EXPECTED_CLASS_NAME) &&
-                                                               
elem.getMethodName().equals(EXPECTED_METHOD_NAME)) {
-
-                                                       backPressureSamples++;
-                                                       break; // Continue with 
next stack trace
-                                               }
-                                       }
-                               }
-
-                               int subtaskIndex = 
subtaskIndexMap.get(entry.getKey());
-
-                               int size = taskTraces.size();
-                               double ratio = (size > 0)
-                                               ? ((double) 
backPressureSamples) / size
-                                               : 0;
-
-                               backPressureRatio[subtaskIndex] = ratio;
-                       }
-
-                       return new OperatorBackPressureStats(
-                                       sample.getSampleId(),
-                                       sample.getEndTime(),
-                                       backPressureRatio);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
deleted file mode 100644
index 8a96969..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Optional;
-import java.util.WeakHashMap;
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Gateway to obtaining an {@link ExecutionGraph} from a source, like 
JobManager or Archive.
- *
- * <p>The holder will cache the ExecutionGraph behind a weak reference, which 
will be cleared
- * at some point once no one else is pointing to the ExecutionGraph.
- * Note that while the holder runs in the same JVM as the JobManager or 
Archive, the reference should
- * stay valid.
- */
-public class ExecutionGraphHolder {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(ExecutionGraphHolder.class);
-
-       private final Time timeout;
-
-       private final WeakHashMap<JobID, AccessExecutionGraph> cache = new 
WeakHashMap<>();
-
-       public ExecutionGraphHolder() {
-               this(WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
-       }
-
-       public ExecutionGraphHolder(Time timeout) {
-               this.timeout = checkNotNull(timeout);
-       }
-
-       /**
-        * Retrieves the execution graph with {@link JobID} jid wrapped in 
{@link Optional} or
-        * {@link Optional#empty()} if it cannot be found.
-        *
-        * @param jid jobID of the execution graph to be retrieved
-        * @return Optional ExecutionGraph if it has been retrievable, empty if 
there has been no ExecutionGraph
-        */
-       public CompletableFuture<Optional<AccessExecutionGraph>> 
getExecutionGraph(JobID jid, JobManagerGateway jobManagerGateway) {
-               AccessExecutionGraph cached = cache.get(jid);
-               if (cached != null) {
-                       if (cached.getState() == JobStatus.SUSPENDED) {
-                               cache.remove(jid);
-                       } else {
-                               return 
CompletableFuture.completedFuture(Optional.of(cached));
-                       }
-               }
-
-               CompletableFuture<Optional<AccessExecutionGraph>> 
executionGraphFuture = jobManagerGateway.requestJob(jid, timeout);
-
-               executionGraphFuture.thenAcceptAsync(
-                       optExecutionGraph ->
-                               optExecutionGraph.ifPresent(executionGraph -> 
cache.put(jid, executionGraph)));
-
-               return executionGraphFuture;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/NotFoundException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/NotFoundException.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/NotFoundException.java
deleted file mode 100644
index 71125c9..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/NotFoundException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor;
-
-/**
- * A special exception that indicates that an element was not found and that 
the
- * request should be answered with a {@code 404} return code.
- */
-public class NotFoundException extends Exception {
-
-       private static final long serialVersionUID = -4036006746423754639L;
-
-       public NotFoundException(String message) {
-               super(message);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/OperatorBackPressureStats.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/OperatorBackPressureStats.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/OperatorBackPressureStats.java
deleted file mode 100644
index bfd5be2..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/OperatorBackPressureStats.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor;
-
-import java.util.Arrays;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Back pressure statistics of multiple tasks.
- *
- * <p>Statistics are gathered by sampling stack traces of running tasks. The
- * back pressure ratio denotes the ratio of traces indicating back pressure
- * to the total number of sampled traces.
- */
-public class OperatorBackPressureStats {
-
-       /** ID of the corresponding sample. */
-       private final int sampleId;
-
-       /** End time stamp of the corresponding sample. */
-       private final long endTimestamp;
-
-       /** Back pressure ratio per subtask. */
-       private final double[] subTaskBackPressureRatio;
-
-       /** Maximum back pressure ratio. */
-       private final double maxSubTaskBackPressureRatio;
-
-       public OperatorBackPressureStats(
-                       int sampleId,
-                       long endTimestamp,
-                       double[] subTaskBackPressureRatio) {
-
-               this.sampleId = sampleId;
-               this.endTimestamp = endTimestamp;
-               this.subTaskBackPressureRatio = 
checkNotNull(subTaskBackPressureRatio, "Sub task back pressure ratio");
-               checkArgument(subTaskBackPressureRatio.length >= 1, "No Sub 
task back pressure ratio specified");
-
-               double max = 0;
-               for (double ratio : subTaskBackPressureRatio) {
-                       if (ratio > max) {
-                               max = ratio;
-                       }
-               }
-
-               maxSubTaskBackPressureRatio = max;
-       }
-
-       /**
-        * Returns the ID of the sample.
-        *
-        * @return ID of the sample
-        */
-       public int getSampleId() {
-               return sampleId;
-       }
-
-       /**
-        * Returns the time stamp, when all stack traces were collected at the
-        * JobManager.
-        *
-        * @return Time stamp, when all stack traces were collected at the
-        * JobManager
-        */
-       public long getEndTimestamp() {
-               return endTimestamp;
-       }
-
-       /**
-        * Returns the number of sub tasks.
-        *
-        * @return Number of sub tasks.
-        */
-       public int getNumberOfSubTasks() {
-               return subTaskBackPressureRatio.length;
-       }
-
-       /**
-        * Returns the ratio of stack traces indicating back pressure to total
-        * number of sampled stack traces.
-        *
-        * @param index Subtask index.
-        *
-        * @return Ratio of stack traces indicating back pressure to total 
number
-        * of sampled stack traces.
-        */
-       public double getBackPressureRatio(int index) {
-               return subTaskBackPressureRatio[index];
-       }
-
-       /**
-        * Returns the maximum back pressure ratio of all sub tasks.
-        *
-        * @return Maximum back pressure ratio of all sub tasks.
-        */
-       public double getMaxBackPressureRatio() {
-               return maxSubTaskBackPressureRatio;
-       }
-
-       @Override
-       public String toString() {
-               return "OperatorBackPressureStats{" +
-                               "sampleId=" + sampleId +
-                               ", endTimestamp=" + endTimestamp +
-                               ", subTaskBackPressureRatio=" + 
Arrays.toString(subTaskBackPressureRatio) +
-                               '}';
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
index b393021..993a225 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
@@ -21,9 +21,11 @@ package org.apache.flink.runtime.webmonitor;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.NotFoundException;
 import org.apache.flink.runtime.rest.handler.RedirectHandler;
+import org.apache.flink.runtime.rest.handler.WebHandler;
+import org.apache.flink.runtime.rest.handler.legacy.RequestHandler;
 import org.apache.flink.runtime.rest.handler.util.HandlerRedirectUtils;
-import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.ExceptionUtils;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSample.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSample.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSample.java
deleted file mode 100644
index d60f8a4..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSample.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor;
-
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-
-/**
- * A sample of stack traces for one or more tasks.
- *
- * <p>The sampling is triggered in {@link StackTraceSampleCoordinator}.
- */
-public class StackTraceSample {
-
-       /** ID of this sample (unique per job). */
-       private final int sampleId;
-
-       /** Time stamp, when the sample was triggered. */
-       private final long startTime;
-
-       /** Time stamp, when all stack traces were collected at the JobManager. 
*/
-       private final long endTime;
-
-       /** Map of stack traces by execution ID. */
-       private final Map<ExecutionAttemptID, List<StackTraceElement[]>> 
stackTracesByTask;
-
-       /**
-        * Creates a stack trace sample.
-        *
-        * @param sampleId          ID of the sample.
-        * @param startTime         Time stamp, when the sample was triggered.
-        * @param endTime           Time stamp, when all stack traces were
-        *                          collected at the JobManager.
-        * @param stackTracesByTask Map of stack traces by execution ID.
-        */
-       public StackTraceSample(
-                       int sampleId,
-                       long startTime,
-                       long endTime,
-                       Map<ExecutionAttemptID, List<StackTraceElement[]>> 
stackTracesByTask) {
-
-               checkArgument(sampleId >= 0, "Negative sample ID");
-               checkArgument(startTime >= 0, "Negative start time");
-               checkArgument(endTime >= startTime, "End time before start 
time");
-
-               this.sampleId = sampleId;
-               this.startTime = startTime;
-               this.endTime = endTime;
-               this.stackTracesByTask = 
Collections.unmodifiableMap(stackTracesByTask);
-       }
-
-       /**
-        * Returns the ID of the sample.
-        *
-        * @return ID of the sample
-        */
-       public int getSampleId() {
-               return sampleId;
-       }
-
-       /**
-        * Returns the time stamp, when the sample was triggered.
-        *
-        * @return Time stamp, when the sample was triggered
-        */
-       public long getStartTime() {
-               return startTime;
-       }
-
-       /**
-        * Returns the time stamp, when all stack traces were collected at the
-        * JobManager.
-        *
-        * @return Time stamp, when all stack traces were collected at the
-        * JobManager
-        */
-       public long getEndTime() {
-               return endTime;
-       }
-
-       /**
-        * Returns the a map of stack traces by execution ID.
-        *
-        * @return Map of stack traces by execution ID
-        */
-       public Map<ExecutionAttemptID, List<StackTraceElement[]>> 
getStackTraces() {
-               return stackTracesByTask;
-       }
-
-       @Override
-       public String toString() {
-               return "StackTraceSample{" +
-                               "sampleId=" + sampleId +
-                               ", startTime=" + startTime +
-                               ", endTime=" + endTime +
-                               '}';
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
deleted file mode 100644
index 534d2fa..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
+++ /dev/null
@@ -1,392 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.messages.StackTraceSampleResponse;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayDeque;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A coordinator for triggering and collecting stack traces of running tasks.
- */
-public class StackTraceSampleCoordinator {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(StackTraceSampleCoordinator.class);
-
-       private static final int NUM_GHOST_SAMPLE_IDS = 10;
-
-       private final Object lock = new Object();
-
-       /** Executor used to run the futures. */
-       private final Executor executor;
-
-       /** Time out after the expected sampling duration. */
-       private final long sampleTimeout;
-
-       /** In progress samples (guarded by lock). */
-       private final Map<Integer, PendingStackTraceSample> pendingSamples = 
new HashMap<>();
-
-       /** A list of recent sample IDs to identify late messages vs. invalid 
ones. */
-       private final ArrayDeque<Integer> recentPendingSamples = new 
ArrayDeque<>(NUM_GHOST_SAMPLE_IDS);
-
-       /** Sample ID counter (guarded by lock). */
-       private int sampleIdCounter;
-
-       /**
-        * Flag indicating whether the coordinator is still running (guarded by
-        * lock).
-        */
-       private boolean isShutDown;
-
-       /**
-        * Creates a new coordinator for the job.
-        *
-        * @param executor to use to execute the futures
-        * @param sampleTimeout Time out after the expected sampling duration.
-        *                      This is added to the expected duration of a
-        *                      sample, which is determined by the number of
-        *                      samples and the delay between each sample.
-        */
-       public StackTraceSampleCoordinator(Executor executor, long 
sampleTimeout) {
-               checkArgument(sampleTimeout >= 0L);
-               this.executor = Preconditions.checkNotNull(executor);
-               this.sampleTimeout = sampleTimeout;
-       }
-
-       /**
-        * Triggers a stack trace sample to all tasks.
-        *
-        * @param tasksToSample       Tasks to sample.
-        * @param numSamples          Number of stack trace samples to collect.
-        * @param delayBetweenSamples Delay between consecutive samples.
-        * @param maxStackTraceDepth  Maximum depth of the stack trace. 0 
indicates
-        *                            no maximum and keeps the complete stack 
trace.
-        * @return A future of the completed stack trace sample
-        */
-       @SuppressWarnings("unchecked")
-       public CompletableFuture<StackTraceSample> triggerStackTraceSample(
-                       ExecutionVertex[] tasksToSample,
-                       int numSamples,
-                       Time delayBetweenSamples,
-                       int maxStackTraceDepth) {
-
-               checkNotNull(tasksToSample, "Tasks to sample");
-               checkArgument(tasksToSample.length >= 1, "No tasks to sample");
-               checkArgument(numSamples >= 1, "No number of samples");
-               checkArgument(maxStackTraceDepth >= 0, "Negative maximum stack 
trace depth");
-
-               // Execution IDs of running tasks
-               ExecutionAttemptID[] triggerIds = new 
ExecutionAttemptID[tasksToSample.length];
-               Execution[] executions = new Execution[tasksToSample.length];
-
-               // Check that all tasks are RUNNING before triggering anything. 
The
-               // triggering can still fail.
-               for (int i = 0; i < triggerIds.length; i++) {
-                       Execution execution = 
tasksToSample[i].getCurrentExecutionAttempt();
-                       if (execution != null && execution.getState() == 
ExecutionState.RUNNING) {
-                               executions[i] = execution;
-                               triggerIds[i] = execution.getAttemptId();
-                       } else {
-                               return FutureUtils.completedExceptionally(new 
IllegalStateException("Task " + tasksToSample[i]
-                                       .getTaskNameWithSubtaskIndex() + " is 
not running."));
-                       }
-               }
-
-               synchronized (lock) {
-                       if (isShutDown) {
-                               return FutureUtils.completedExceptionally(new 
IllegalStateException("Shut down"));
-                       }
-
-                       final int sampleId = sampleIdCounter++;
-
-                       LOG.debug("Triggering stack trace sample {}", sampleId);
-
-                       final PendingStackTraceSample pending = new 
PendingStackTraceSample(
-                                       sampleId, triggerIds);
-
-                       // Discard the sample if it takes too long. We don't 
send cancel
-                       // messages to the task managers, but only wait for the 
responses
-                       // and then ignore them.
-                       long expectedDuration = numSamples * 
delayBetweenSamples.toMilliseconds();
-                       Time timeout = Time.milliseconds(expectedDuration + 
sampleTimeout);
-
-                       // Add the pending sample before scheduling the discard 
task to
-                       // prevent races with removing it again.
-                       pendingSamples.put(sampleId, pending);
-
-                       // Trigger all samples
-                       for (Execution execution: executions) {
-                               final 
CompletableFuture<StackTraceSampleResponse> stackTraceSampleFuture = 
execution.requestStackTraceSample(
-                                       sampleId,
-                                       numSamples,
-                                       delayBetweenSamples,
-                                       maxStackTraceDepth,
-                                       timeout);
-
-                               stackTraceSampleFuture.handleAsync(
-                                       (StackTraceSampleResponse 
stackTraceSampleResponse, Throwable throwable) -> {
-                                               if (stackTraceSampleResponse != 
null) {
-                                                       collectStackTraces(
-                                                               
stackTraceSampleResponse.getSampleId(),
-                                                               
stackTraceSampleResponse.getExecutionAttemptID(),
-                                                               
stackTraceSampleResponse.getSamples());
-                                               } else {
-                                                       
cancelStackTraceSample(sampleId, throwable);
-                                               }
-
-                                               return null;
-                                       },
-                                       executor);
-                       }
-
-                       return pending.getStackTraceSampleFuture();
-               }
-       }
-
-       /**
-        * Cancels a pending sample.
-        *
-        * @param sampleId ID of the sample to cancel.
-        * @param cause Cause of the cancelling (can be <code>null</code>).
-        */
-       public void cancelStackTraceSample(int sampleId, Throwable cause) {
-               synchronized (lock) {
-                       if (isShutDown) {
-                               return;
-                       }
-
-                       PendingStackTraceSample sample = 
pendingSamples.remove(sampleId);
-                       if (sample != null) {
-                               if (cause != null) {
-                                       LOG.info("Cancelling sample " + 
sampleId, cause);
-                               } else {
-                                       LOG.info("Cancelling sample {}", 
sampleId);
-                               }
-
-                               sample.discard(cause);
-                               rememberRecentSampleId(sampleId);
-                       }
-               }
-       }
-
-       /**
-        * Shuts down the coordinator.
-        *
-        * <p>After shut down, no further operations are executed.
-        */
-       public void shutDown() {
-               synchronized (lock) {
-                       if (!isShutDown) {
-                               LOG.info("Shutting down stack trace sample 
coordinator.");
-
-                               for (PendingStackTraceSample pending : 
pendingSamples.values()) {
-                                       pending.discard(new 
RuntimeException("Shut down"));
-                               }
-
-                               pendingSamples.clear();
-
-                               isShutDown = true;
-                       }
-               }
-       }
-
-       /**
-        * Collects stack traces of a task.
-        *
-        * @param sampleId    ID of the sample.
-        * @param executionId ID of the sampled task.
-        * @param stackTraces Stack traces of the sampled task.
-        *
-        * @throws IllegalStateException If unknown sample ID and not recently
-        *                               finished or cancelled sample.
-        */
-       public void collectStackTraces(
-                       int sampleId,
-                       ExecutionAttemptID executionId,
-                       List<StackTraceElement[]> stackTraces) {
-
-               synchronized (lock) {
-                       if (isShutDown) {
-                               return;
-                       }
-
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("Collecting stack trace sample {} of 
task {}", sampleId, executionId);
-                       }
-
-                       PendingStackTraceSample pending = 
pendingSamples.get(sampleId);
-
-                       if (pending != null) {
-                               pending.collectStackTraces(executionId, 
stackTraces);
-
-                               // Publish the sample
-                               if (pending.isComplete()) {
-                                       pendingSamples.remove(sampleId);
-                                       rememberRecentSampleId(sampleId);
-
-                                       pending.completePromiseAndDiscard();
-                               }
-                       } else if (recentPendingSamples.contains(sampleId)) {
-                               if (LOG.isDebugEnabled()) {
-                                       LOG.debug("Received late stack trace 
sample {} of task {}",
-                                                       sampleId, executionId);
-                               }
-                       } else {
-                               if (LOG.isDebugEnabled()) {
-                                       LOG.debug("Unknown sample ID " + 
sampleId);
-                               }
-                       }
-               }
-       }
-
-       private void rememberRecentSampleId(int sampleId) {
-               if (recentPendingSamples.size() >= NUM_GHOST_SAMPLE_IDS) {
-                       recentPendingSamples.removeFirst();
-               }
-               recentPendingSamples.addLast(sampleId);
-       }
-
-       int getNumberOfPendingSamples() {
-               synchronized (lock) {
-                       return pendingSamples.size();
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-
-       /**
-        * A pending stack trace sample, which collects stack traces and owns a
-        * {@link StackTraceSample} promise.
-        *
-        * <p>Access pending sample in lock scope.
-        */
-       private static class PendingStackTraceSample {
-
-               private final int sampleId;
-               private final long startTime;
-               private final Set<ExecutionAttemptID> pendingTasks;
-               private final Map<ExecutionAttemptID, 
List<StackTraceElement[]>> stackTracesByTask;
-               private final CompletableFuture<StackTraceSample> 
stackTraceFuture;
-
-               private boolean isDiscarded;
-
-               PendingStackTraceSample(
-                               int sampleId,
-                               ExecutionAttemptID[] tasksToCollect) {
-
-                       this.sampleId = sampleId;
-                       this.startTime = System.currentTimeMillis();
-                       this.pendingTasks = new 
HashSet<>(Arrays.asList(tasksToCollect));
-                       this.stackTracesByTask = 
Maps.newHashMapWithExpectedSize(tasksToCollect.length);
-                       this.stackTraceFuture = new CompletableFuture<>();
-               }
-
-               int getSampleId() {
-                       return sampleId;
-               }
-
-               long getStartTime() {
-                       return startTime;
-               }
-
-               boolean isDiscarded() {
-                       return isDiscarded;
-               }
-
-               boolean isComplete() {
-                       if (isDiscarded) {
-                               throw new IllegalStateException("Discarded");
-                       }
-
-                       return pendingTasks.isEmpty();
-               }
-
-               void discard(Throwable cause) {
-                       if (!isDiscarded) {
-                               pendingTasks.clear();
-                               stackTracesByTask.clear();
-
-                               stackTraceFuture.completeExceptionally(new 
RuntimeException("Discarded", cause));
-
-                               isDiscarded = true;
-                       }
-               }
-
-               void collectStackTraces(ExecutionAttemptID executionId, 
List<StackTraceElement[]> stackTraces) {
-                       if (isDiscarded) {
-                               throw new IllegalStateException("Discarded");
-                       }
-
-                       if (pendingTasks.remove(executionId)) {
-                               stackTracesByTask.put(executionId, 
Collections.unmodifiableList(stackTraces));
-                       } else if (isComplete()) {
-                               throw new IllegalStateException("Completed");
-                       } else {
-                               throw new IllegalArgumentException("Unknown 
task " + executionId);
-                       }
-               }
-
-               void completePromiseAndDiscard() {
-                       if (isComplete()) {
-                               isDiscarded = true;
-
-                               long endTime = System.currentTimeMillis();
-
-                               StackTraceSample stackTraceSample = new 
StackTraceSample(
-                                               sampleId,
-                                               startTime,
-                                               endTime,
-                                               stackTracesByTask);
-
-                               stackTraceFuture.complete(stackTraceSample);
-                       } else {
-                               throw new IllegalStateException("Not completed 
yet");
-                       }
-               }
-
-               @SuppressWarnings("unchecked")
-               CompletableFuture<StackTraceSample> getStackTraceSampleFuture() 
{
-                       return stackTraceFuture;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebHandler.java
deleted file mode 100644
index 9839abd..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebHandler.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor;
-
-/**
- * Marker interface for web handlers which can describe their paths.
- */
-public interface WebHandler {
-
-       /**
-        * Returns an array of REST URLs under which this handler can be 
registered.
-        *
-        * @return array containing REST URLs under which this handler can be 
registered.
-        */
-       String[] getPaths();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 71e1593..cd128de 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -27,50 +27,54 @@ import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.net.SSLUtils;
-import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
-import org.apache.flink.runtime.webmonitor.handlers.ClusterOverviewHandler;
-import org.apache.flink.runtime.webmonitor.handlers.ConstantTextHandler;
-import org.apache.flink.runtime.webmonitor.handlers.CurrentJobIdsHandler;
-import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
-import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler;
+import org.apache.flink.runtime.rest.handler.WebHandler;
+import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler;
+import org.apache.flink.runtime.rest.handler.legacy.ConstantTextHandler;
+import org.apache.flink.runtime.rest.handler.legacy.CurrentJobIdsHandler;
+import org.apache.flink.runtime.rest.handler.legacy.CurrentJobsOverviewHandler;
+import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder;
+import org.apache.flink.runtime.rest.handler.legacy.JobAccumulatorsHandler;
+import org.apache.flink.runtime.rest.handler.legacy.JobCancellationHandler;
+import 
org.apache.flink.runtime.rest.handler.legacy.JobCancellationWithSavepointHandlers;
+import org.apache.flink.runtime.rest.handler.legacy.JobConfigHandler;
+import org.apache.flink.runtime.rest.handler.legacy.JobDetailsHandler;
+import org.apache.flink.runtime.rest.handler.legacy.JobExceptionsHandler;
+import org.apache.flink.runtime.rest.handler.legacy.JobManagerConfigHandler;
+import org.apache.flink.runtime.rest.handler.legacy.JobPlanHandler;
+import org.apache.flink.runtime.rest.handler.legacy.JobStoppingHandler;
+import 
org.apache.flink.runtime.rest.handler.legacy.JobVertexAccumulatorsHandler;
+import 
org.apache.flink.runtime.rest.handler.legacy.JobVertexBackPressureHandler;
+import org.apache.flink.runtime.rest.handler.legacy.JobVertexDetailsHandler;
+import 
org.apache.flink.runtime.rest.handler.legacy.JobVertexTaskManagersHandler;
+import org.apache.flink.runtime.rest.handler.legacy.RequestHandler;
+import 
org.apache.flink.runtime.rest.handler.legacy.SubtaskCurrentAttemptDetailsHandler;
+import 
org.apache.flink.runtime.rest.handler.legacy.SubtaskExecutionAttemptAccumulatorsHandler;
+import 
org.apache.flink.runtime.rest.handler.legacy.SubtaskExecutionAttemptDetailsHandler;
+import 
org.apache.flink.runtime.rest.handler.legacy.SubtasksAllAccumulatorsHandler;
+import org.apache.flink.runtime.rest.handler.legacy.SubtasksTimesHandler;
+import org.apache.flink.runtime.rest.handler.legacy.TaskManagerLogHandler;
+import org.apache.flink.runtime.rest.handler.legacy.TaskManagersHandler;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator;
+import 
org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointConfigHandler;
+import 
org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsCache;
+import 
org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsDetailsHandler;
+import 
org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsDetailsSubtasksHandler;
+import 
org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsHandler;
+import 
org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
+import 
org.apache.flink.runtime.rest.handler.legacy.metrics.JobManagerMetricsHandler;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler;
+import 
org.apache.flink.runtime.rest.handler.legacy.metrics.JobVertexMetricsHandler;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import 
org.apache.flink.runtime.rest.handler.legacy.metrics.TaskManagerMetricsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JarAccessDeniedHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JarListHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JarRunHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JarUploadHandler;
-import org.apache.flink.runtime.webmonitor.handlers.JobAccumulatorsHandler;
-import org.apache.flink.runtime.webmonitor.handlers.JobCancellationHandler;
-import 
org.apache.flink.runtime.webmonitor.handlers.JobCancellationWithSavepointHandlers;
-import org.apache.flink.runtime.webmonitor.handlers.JobConfigHandler;
-import org.apache.flink.runtime.webmonitor.handlers.JobDetailsHandler;
-import org.apache.flink.runtime.webmonitor.handlers.JobExceptionsHandler;
-import org.apache.flink.runtime.webmonitor.handlers.JobManagerConfigHandler;
-import org.apache.flink.runtime.webmonitor.handlers.JobPlanHandler;
-import org.apache.flink.runtime.webmonitor.handlers.JobStoppingHandler;
-import 
org.apache.flink.runtime.webmonitor.handlers.JobVertexAccumulatorsHandler;
-import 
org.apache.flink.runtime.webmonitor.handlers.JobVertexBackPressureHandler;
-import org.apache.flink.runtime.webmonitor.handlers.JobVertexDetailsHandler;
-import 
org.apache.flink.runtime.webmonitor.handlers.JobVertexTaskManagersHandler;
-import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
-import 
org.apache.flink.runtime.webmonitor.handlers.SubtaskCurrentAttemptDetailsHandler;
-import 
org.apache.flink.runtime.webmonitor.handlers.SubtaskExecutionAttemptAccumulatorsHandler;
-import 
org.apache.flink.runtime.webmonitor.handlers.SubtaskExecutionAttemptDetailsHandler;
-import 
org.apache.flink.runtime.webmonitor.handlers.SubtasksAllAccumulatorsHandler;
-import org.apache.flink.runtime.webmonitor.handlers.SubtasksTimesHandler;
-import org.apache.flink.runtime.webmonitor.handlers.TaskManagerLogHandler;
-import org.apache.flink.runtime.webmonitor.handlers.TaskManagersHandler;
-import 
org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointConfigHandler;
-import 
org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsCache;
-import 
org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsDetailsHandler;
-import 
org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsDetailsSubtasksHandler;
-import 
org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsHandler;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-import org.apache.flink.runtime.webmonitor.metrics.JobManagerMetricsHandler;
-import org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler;
-import org.apache.flink.runtime.webmonitor.metrics.JobVertexMetricsHandler;
-import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
-import org.apache.flink.runtime.webmonitor.metrics.TaskManagerMetricsHandler;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
 import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
@@ -189,7 +193,7 @@ public class WebRuntimeMonitor implements WebMonitor {
                        this.uploadDir = null;
                }
 
-               ExecutionGraphHolder currentGraphs = new ExecutionGraphHolder();
+               ExecutionGraphHolder currentGraphs = new 
ExecutionGraphHolder(timeout);
 
                // - Back pressure stats 
----------------------------------------------
 
@@ -255,7 +259,7 @@ public class WebRuntimeMonitor implements WebMonitor {
                get(router, new SubtasksTimesHandler(currentGraphs, executor));
                get(router, new JobVertexTaskManagersHandler(currentGraphs, 
executor, metricFetcher));
                get(router, new JobVertexAccumulatorsHandler(currentGraphs, 
executor));
-               get(router, new JobVertexBackPressureHandler(currentGraphs, 
executor,   backPressureStatsTracker, refreshInterval));
+               get(router, new JobVertexBackPressureHandler(currentGraphs, 
executor, backPressureStatsTracker, refreshInterval));
                get(router, new JobVertexMetricsHandler(executor, 
metricFetcher));
                get(router, new SubtasksAllAccumulatorsHandler(currentGraphs, 
executor));
                get(router, new 
SubtaskCurrentAttemptDetailsHandler(currentGraphs, executor, metricFetcher));

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
deleted file mode 100644
index 2445d3f..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
+++ /dev/null
@@ -1,363 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.files;
-
-/*****************************************************************************
- * This code is based on the "HttpStaticFileServerHandler" from the
- * Netty project's HTTP server example.
- *
- * See http://netty.io and
- * 
https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
- *****************************************************************************/
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.rest.handler.RedirectHandler;
-import org.apache.flink.runtime.rest.handler.util.MimeTypes;
-import org.apache.flink.runtime.webmonitor.RestfulGateway;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import org.apache.flink.shaded.netty4.io.netty.channel.DefaultFileRegion;
-import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
-import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpResponse;
-import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
-import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpChunkedInput;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
-import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
-import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
-import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
-import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedFile;
-import org.apache.flink.shaded.netty4.io.netty.util.CharsetUtil;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.RandomAccessFile;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.nio.file.Files;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.GregorianCalendar;
-import java.util.Locale;
-import java.util.TimeZone;
-import java.util.concurrent.CompletableFuture;
-
-import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL;
-import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
-import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.DATE;
-import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.EXPIRES;
-import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.IF_MODIFIED_SINCE;
-import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.LAST_MODIFIED;
-import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
-import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
-import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.NOT_MODIFIED;
-import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.OK;
-import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Simple file server handler that serves requests to web frontend's static 
files, such as
- * HTML, CSS, or JS files.
- *
- * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty 
project's HTTP server
- * example.</p>
- */
-@ChannelHandler.Sharable
-public class StaticFileServerHandler<T extends RestfulGateway> extends 
RedirectHandler<T> {
-
-       /** Timezone in which this server answers its "if-modified" requests. */
-       private static final TimeZone GMT_TIMEZONE = 
TimeZone.getTimeZone("GMT");
-
-       /** Date format for HTTP. */
-       public static final String HTTP_DATE_FORMAT = "EEE, dd MMM yyyy 
HH:mm:ss zzz";
-
-       /** By default, we allow files to be cached for 5 minutes. */
-       private static final int HTTP_CACHE_SECONDS = 300;
-
-       // 
------------------------------------------------------------------------
-
-       /** The path in which the static documents are. */
-       private final File rootPath;
-
-       public StaticFileServerHandler(
-                       GatewayRetriever<T> retriever,
-                       CompletableFuture<String> localJobManagerAddressFuture,
-                       Time timeout,
-                       File rootPath) throws IOException {
-
-               super(localJobManagerAddressFuture, retriever, timeout);
-
-               this.rootPath = checkNotNull(rootPath).getCanonicalFile();
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Responses to requests
-       // 
------------------------------------------------------------------------
-
-       @Override
-       protected void respondAsLeader(ChannelHandlerContext 
channelHandlerContext, Routed routed, T gateway) throws Exception {
-               final HttpRequest request = routed.request();
-               final String requestPath;
-
-               // make sure we request the "index.html" in case there is a 
directory request
-               if (routed.path().endsWith("/")) {
-                       requestPath = routed.path() + "index.html";
-               }
-               // in case the files being accessed are logs or stdout files, 
find appropriate paths.
-               else if (routed.path().equals("/jobmanager/log") || 
routed.path().equals("/jobmanager/stdout")) {
-                       requestPath = "";
-               } else {
-                       requestPath = routed.path();
-               }
-
-               respondToRequest(channelHandlerContext, request, requestPath);
-       }
-
-       /**
-        * Response when running with leading JobManager.
-        */
-       private void respondToRequest(ChannelHandlerContext ctx, HttpRequest 
request, String requestPath)
-                       throws IOException, ParseException, URISyntaxException {
-
-               // convert to absolute path
-               final File file = new File(rootPath, requestPath);
-
-               if (!file.exists()) {
-                       // file does not exist. Try to load it with the 
classloader
-                       ClassLoader cl = 
StaticFileServerHandler.class.getClassLoader();
-
-                       try (InputStream resourceStream = 
cl.getResourceAsStream("web" + requestPath)) {
-                               boolean success = false;
-                               try {
-                                       if (resourceStream != null) {
-                                               URL root = 
cl.getResource("web");
-                                               URL requested = 
cl.getResource("web" + requestPath);
-
-                                               if (root != null && requested 
!= null) {
-                                                       URI rootURI = new 
URI(root.getPath()).normalize();
-                                                       URI requestedURI = new 
URI(requested.getPath()).normalize();
-
-                                                       // Check that we don't 
load anything from outside of the
-                                                       // expected scope.
-                                                       if 
(!rootURI.relativize(requestedURI).equals(requestedURI)) {
-                                                               
logger.debug("Loading missing file from classloader: {}", requestPath);
-                                                               // ensure that 
the file's parent directory exists.
-                                                               
file.getParentFile().mkdirs();
-                                                               
Files.copy(resourceStream, file.toPath());
-
-                                                               success = true;
-                                                       }
-                                               }
-                                       }
-                               } catch (Throwable t) {
-                                       logger.error("error while responding", 
t);
-                               } finally {
-                                       if (!success) {
-                                               logger.debug("Unable to load 
requested file {} from classloader", requestPath);
-                                               sendError(ctx, NOT_FOUND);
-                                               return;
-                                       }
-                               }
-                       }
-               }
-
-               if (!file.exists() || file.isHidden() || file.isDirectory() || 
!file.isFile()) {
-                       sendError(ctx, NOT_FOUND);
-                       return;
-               }
-
-               if 
(!file.getCanonicalFile().toPath().startsWith(rootPath.toPath())) {
-                       sendError(ctx, NOT_FOUND);
-                       return;
-               }
-
-               // cache validation
-               final String ifModifiedSince = 
request.headers().get(IF_MODIFIED_SINCE);
-               if (ifModifiedSince != null && !ifModifiedSince.isEmpty()) {
-                       SimpleDateFormat dateFormatter = new 
SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
-                       Date ifModifiedSinceDate = 
dateFormatter.parse(ifModifiedSince);
-
-                       // Only compare up to the second because the datetime 
format we send to the client
-                       // does not have milliseconds
-                       long ifModifiedSinceDateSeconds = 
ifModifiedSinceDate.getTime() / 1000;
-                       long fileLastModifiedSeconds = file.lastModified() / 
1000;
-                       if (ifModifiedSinceDateSeconds == 
fileLastModifiedSeconds) {
-                               if (logger.isDebugEnabled()) {
-                                       logger.debug("Responding 'NOT MODIFIED' 
for file '" + file.getAbsolutePath() + '\'');
-                               }
-
-                               sendNotModified(ctx);
-                               return;
-                       }
-               }
-
-               if (logger.isDebugEnabled()) {
-                       logger.debug("Responding with file '" + 
file.getAbsolutePath() + '\'');
-               }
-
-               // Don't need to close this manually. Netty's DefaultFileRegion 
will take care of it.
-               final RandomAccessFile raf;
-               try {
-                       raf = new RandomAccessFile(file, "r");
-               }
-               catch (FileNotFoundException e) {
-                       sendError(ctx, NOT_FOUND);
-                       return;
-               }
-
-               try {
-                       long fileLength = raf.length();
-
-                       HttpResponse response = new 
DefaultHttpResponse(HTTP_1_1, OK);
-                       setContentTypeHeader(response, file);
-
-                       // since the log and out files are rapidly changing, we 
don't want the browser to cache them
-                       if (!(requestPath.contains("log") || 
requestPath.contains("out"))) {
-                               setDateAndCacheHeaders(response, file);
-                       }
-                       if (HttpHeaders.isKeepAlive(request)) {
-                               response.headers().set(CONNECTION, 
HttpHeaders.Values.KEEP_ALIVE);
-                       }
-                       HttpHeaders.setContentLength(response, fileLength);
-
-                       // write the initial line and the header.
-                       ctx.write(response);
-
-                       // write the content.
-                       ChannelFuture lastContentFuture;
-                       if (ctx.pipeline().get(SslHandler.class) == null) {
-                               ctx.write(new 
DefaultFileRegion(raf.getChannel(), 0, fileLength), 
ctx.newProgressivePromise());
-                               lastContentFuture = 
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
-                       } else {
-                               lastContentFuture = ctx.writeAndFlush(new 
HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)),
-                                       ctx.newProgressivePromise());
-                               // HttpChunkedInput will write the end marker 
(LastHttpContent) for us.
-                       }
-
-                       // close the connection, if no keep-alive is needed
-                       if (!HttpHeaders.isKeepAlive(request)) {
-                               
lastContentFuture.addListener(ChannelFutureListener.CLOSE);
-                       }
-               } catch (Exception e) {
-                       raf.close();
-                       logger.error("Failed to serve file.", e);
-                       sendError(ctx, INTERNAL_SERVER_ERROR);
-               }
-       }
-
-       @Override
-       public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
{
-               if (ctx.channel().isActive()) {
-                       logger.error("Caught exception", cause);
-                       sendError(ctx, INTERNAL_SERVER_ERROR);
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Utilities to encode headers and responses
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Writes a simple error response message.
-        *
-        * @param ctx    The channel context to write the response to.
-        * @param status The response status.
-        */
-       public static void sendError(ChannelHandlerContext ctx, 
HttpResponseStatus status) {
-               FullHttpResponse response = new DefaultFullHttpResponse(
-                               HTTP_1_1, status, 
Unpooled.copiedBuffer("Failure: " + status + "\r\n", CharsetUtil.UTF_8));
-               response.headers().set(CONTENT_TYPE, "text/plain; 
charset=UTF-8");
-
-               // close the connection as soon as the error message is sent.
-               
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
-       }
-
-       /**
-        * Send the "304 Not Modified" response. This response can be used when 
the
-        * file timestamp is the same as what the browser is sending up.
-        *
-        * @param ctx The channel context to write the response to.
-        */
-       public static void sendNotModified(ChannelHandlerContext ctx) {
-               FullHttpResponse response = new 
DefaultFullHttpResponse(HTTP_1_1, NOT_MODIFIED);
-               setDateHeader(response);
-
-               // close the connection as soon as the response is sent.
-               
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
-       }
-
-       /**
-        * Sets the "date" header for the HTTP response.
-        *
-        * @param response HTTP response
-        */
-       public static void setDateHeader(FullHttpResponse response) {
-               SimpleDateFormat dateFormatter = new 
SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
-               dateFormatter.setTimeZone(GMT_TIMEZONE);
-
-               Calendar time = new GregorianCalendar();
-               response.headers().set(DATE, 
dateFormatter.format(time.getTime()));
-       }
-
-       /**
-        * Sets the "date" and "cache" headers for the HTTP Response.
-        *
-        * @param response    The HTTP response object.
-        * @param fileToCache File to extract the modification timestamp from.
-        */
-       public static void setDateAndCacheHeaders(HttpResponse response, File 
fileToCache) {
-               SimpleDateFormat dateFormatter = new 
SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
-               dateFormatter.setTimeZone(GMT_TIMEZONE);
-
-               // date header
-               Calendar time = new GregorianCalendar();
-               response.headers().set(DATE, 
dateFormatter.format(time.getTime()));
-
-               // cache headers
-               time.add(Calendar.SECOND, HTTP_CACHE_SECONDS);
-               response.headers().set(EXPIRES, 
dateFormatter.format(time.getTime()));
-               response.headers().set(CACHE_CONTROL, "private, max-age=" + 
HTTP_CACHE_SECONDS);
-               response.headers().set(LAST_MODIFIED, dateFormatter.format(new 
Date(fileToCache.lastModified())));
-       }
-
-       /**
-        * Sets the content type header for the HTTP Response.
-        *
-        * @param response HTTP response
-        * @param file     file to extract content type
-        */
-       public static void setContentTypeHeader(HttpResponse response, File 
file) {
-               String mimeType = 
MimeTypes.getMimeTypeForFileName(file.getName());
-               String mimeFinal = mimeType != null ? mimeType : 
MimeTypes.getDefaultMimeType();
-               response.headers().set(CONTENT_TYPE, mimeFinal);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
deleted file mode 100644
index 053d3f7..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.NotFoundException;
-import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Base class for request handlers whose response depends on an ExecutionGraph
- * that can be retrieved via "jobid" parameter.
- */
-public abstract class AbstractExecutionGraphRequestHandler extends 
AbstractJsonRequestHandler {
-
-       private final ExecutionGraphHolder executionGraphHolder;
-
-       public AbstractExecutionGraphRequestHandler(ExecutionGraphHolder 
executionGraphHolder, Executor executor) {
-               super(executor);
-               this.executionGraphHolder = 
Preconditions.checkNotNull(executionGraphHolder);
-       }
-
-       @Override
-       public CompletableFuture<String> handleJsonRequest(
-                       Map<String, String> pathParams,
-                       Map<String, String> queryParams,
-                       JobManagerGateway jobManagerGateway) {
-               String jidString = pathParams.get("jobid");
-               if (jidString == null) {
-                       throw new RuntimeException("JobId parameter missing");
-               }
-
-               JobID jid;
-               try {
-                       jid = JobID.fromHexString(jidString);
-               }
-               catch (Exception e) {
-                       return FutureUtils.completedExceptionally(new 
FlinkException("Invalid JobID string '" + jidString + "'", e));
-               }
-
-               final CompletableFuture<Optional<AccessExecutionGraph>> 
graphFuture = executionGraphHolder.getExecutionGraph(jid, jobManagerGateway);
-
-               return graphFuture.thenComposeAsync(
-                       (Optional<AccessExecutionGraph> optGraph) -> {
-                               if (optGraph.isPresent()) {
-                                       return handleRequest(optGraph.get(), 
pathParams);
-                               } else {
-                                       throw new FlinkFutureException(new 
NotFoundException("Could not find job with jobId " + jid + '.'));
-                               }
-                       }, executor);
-       }
-
-       public abstract CompletableFuture<String> 
handleRequest(AccessExecutionGraph graph, Map<String, String> params);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
deleted file mode 100644
index df09225..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Base class for request handlers whose response depends on a specific job 
vertex (defined
- * via the "vertexid" parameter) in a specific job, defined via (defined voa 
the "jobid" parameter).
- */
-public abstract class AbstractJobVertexRequestHandler extends 
AbstractExecutionGraphRequestHandler {
-
-       public AbstractJobVertexRequestHandler(ExecutionGraphHolder 
executionGraphHolder, Executor executor) {
-               super(executionGraphHolder, executor);
-       }
-
-       @Override
-       public final CompletableFuture<String> 
handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
-               final JobVertexID vid = parseJobVertexId(params);
-
-               final AccessExecutionJobVertex jobVertex = 
graph.getJobVertex(vid);
-               if (jobVertex == null) {
-                       throw new IllegalArgumentException("No vertex with ID 
'" + vid + "' exists.");
-               }
-
-               return handleRequest(jobVertex, params);
-       }
-
-       /**
-        * Returns the job vertex ID parsed from the provided parameters.
-        *
-        * @param params Path parameters
-        * @return Parsed job vertex ID or <code>null</code> if not available.
-        */
-       public static JobVertexID parseJobVertexId(Map<String, String> params) {
-               String jobVertexIdParam = params.get("vertexid");
-               if (jobVertexIdParam == null) {
-                       return null;
-               }
-
-               try {
-                       return JobVertexID.fromHexString(jobVertexIdParam);
-               } catch (RuntimeException ignored) {
-                       return null;
-               }
-       }
-
-       public abstract CompletableFuture<String> 
handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params);
-}

Reply via email to