[FLINK-5870] Handlers define REST URLs This closes #3376.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/999bacef Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/999bacef Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/999bacef Branch: refs/heads/master Commit: 999baceff36165d950a61dd9cc4342f252e64837 Parents: 51b7ede Author: zentol <[email protected]> Authored: Wed Mar 1 12:23:15 2017 +0100 Committer: zentol <[email protected]> Committed: Thu Mar 2 11:39:04 2017 +0100 ---------------------------------------------------------------------- .../webmonitor/RuntimeMonitorHandler.java | 5 + .../webmonitor/RuntimeMonitorHandlerBase.java | 7 + .../runtime/webmonitor/WebRuntimeMonitor.java | 193 +++++++++++-------- .../handlers/ClusterOverviewHandler.java | 7 + .../handlers/CurrentJobIdsHandler.java | 7 + .../handlers/CurrentJobsOverviewHandler.java | 16 ++ .../handlers/DashboardConfigHandler.java | 7 + .../handlers/JarAccessDeniedHandler.java | 11 ++ .../webmonitor/handlers/JarDeleteHandler.java | 7 + .../webmonitor/handlers/JarListHandler.java | 7 + .../webmonitor/handlers/JarPlanHandler.java | 7 + .../webmonitor/handlers/JarRunHandler.java | 7 + .../webmonitor/handlers/JarUploadHandler.java | 7 + .../handlers/JobAccumulatorsHandler.java | 7 + .../handlers/JobCancellationHandler.java | 8 + .../JobCancellationWithSavepointHandlers.java | 17 +- .../webmonitor/handlers/JobConfigHandler.java | 7 + .../webmonitor/handlers/JobDetailsHandler.java | 8 + .../handlers/JobExceptionsHandler.java | 7 + .../handlers/JobManagerConfigHandler.java | 7 + .../webmonitor/handlers/JobPlanHandler.java | 6 + .../webmonitor/handlers/JobStoppingHandler.java | 8 + .../handlers/JobVertexAccumulatorsHandler.java | 7 + .../handlers/JobVertexBackPressureHandler.java | 7 + .../handlers/JobVertexDetailsHandler.java | 7 + .../handlers/JobVertexTaskManagersHandler.java | 7 + .../webmonitor/handlers/RequestHandler.java | 7 + .../SubtaskCurrentAttemptDetailsHandler.java | 7 + ...taskExecutionAttemptAccumulatorsHandler.java | 7 + .../SubtaskExecutionAttemptDetailsHandler.java | 7 + .../SubtasksAllAccumulatorsHandler.java | 7 + .../handlers/SubtasksTimesHandler.java | 6 + .../handlers/TaskManagerLogHandler.java | 12 ++ .../handlers/TaskManagersHandler.java | 8 + .../checkpoints/CheckpointConfigHandler.java | 7 + .../CheckpointStatsDetailsHandler.java | 7 + .../CheckpointStatsDetailsSubtasksHandler.java | 7 + .../checkpoints/CheckpointStatsHandler.java | 7 + .../metrics/JobManagerMetricsHandler.java | 8 + .../webmonitor/metrics/JobMetricsHandler.java | 6 + .../metrics/JobVertexMetricsHandler.java | 6 + .../metrics/TaskManagerMetricsHandler.java | 8 + .../handlers/ClusterOverviewHandlerTest.java | 34 ++++ .../handlers/CurrentJobIdsHandlerTest.java | 34 ++++ .../CurrentJobsOverviewHandlerTest.java | 44 +++++ .../handlers/DashboardConfigHandlerTest.java | 31 +++ .../handlers/JarAccessDeniedHandlerTest.java | 39 ++++ .../handlers/JarDeleteHandlerTest.java | 31 +++ .../webmonitor/handlers/JarListHandlerTest.java | 31 +++ .../webmonitor/handlers/JarPlanHandlerTest.java | 31 +++ .../webmonitor/handlers/JarRunHandlerTest.java | 31 +++ .../handlers/JarUploadHandlerTest.java | 31 +++ .../handlers/JobAccumulatorsHandlerTest.java | 31 +++ .../handlers/JobCancellationHandlerTest.java | 36 ++++ ...obCancellationWithSavepointHandlersTest.java | 20 ++ .../handlers/JobConfigHandlerTest.java | 31 +++ .../handlers/JobDetailsHandlerTest.java | 36 ++++ .../handlers/JobExceptionsHandlerTest.java | 31 +++ .../handlers/JobManagerConfigHandlerTest.java | 31 +++ .../webmonitor/handlers/JobPlanHandlerTest.java | 31 +++ .../handlers/JobStoppingHandlerTest.java | 36 ++++ .../JobVertexAccumulatorsHandlerTest.java | 31 +++ .../JobVertexBackPressureHandlerTest.java | 8 + .../handlers/JobVertexDetailsHandlerTest.java | 31 +++ .../JobVertexTaskManagersHandlerTest.java | 31 +++ ...SubtaskCurrentAttemptDetailsHandlerTest.java | 31 +++ ...ExecutionAttemptAccumulatorsHandlerTest.java | 31 +++ ...btaskExecutionAttemptDetailsHandlerTest.java | 31 +++ .../SubtasksAllAccumulatorsHandlerTest.java | 31 +++ .../handlers/SubtasksTimesHandlerTest.java | 31 +++ .../handlers/TaskManagerLogHandlerTest.java | 28 +++ .../handlers/TaskManagersHandlerTest.java | 38 ++++ .../CheckpointConfigHandlerTest.java | 9 + .../CheckpointStatsDetailsHandlerTest.java | 9 + .../checkpoints/CheckpointStatsHandlerTest.java | 9 + ...heckpointStatsSubtaskDetailsHandlerTest.java | 9 + .../metrics/JobManagerMetricsHandlerTest.java | 10 + .../metrics/JobMetricsHandlerTest.java | 10 + .../metrics/JobVertexMetricsHandlerTest.java | 10 + .../metrics/TaskManagerMetricsHandlerTest.java | 10 + 80 files changed, 1439 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java index 8dbd135..8bd58a3 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java @@ -76,6 +76,11 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase { } @Override + public String[] getPaths() { + return handler.getPaths(); + } + + @Override protected void respondAsLeader(ChannelHandlerContext ctx, Routed routed, ActorGateway jobManager) { FullHttpResponse response; http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java index 9442867..3c1dcb6 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java @@ -67,6 +67,13 @@ public abstract class RuntimeMonitorHandlerBase extends SimpleChannelInboundHand this.httpsEnabled = httpsEnabled; } + /** + * Returns an array of REST URL's under which this handler can be registered. + * + * @return array containing REST URL's under which this handler can be registered. + */ + public abstract String[] getPaths(); + @Override protected void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception { if (localJobManagerAddressFuture.isCompleted()) { http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java index a9cb630..dddc69d 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java @@ -256,56 +256,50 @@ public class WebRuntimeMonitor implements WebMonitor { RuntimeMonitorHandler triggerHandler = handler(cancelWithSavepoint.getTriggerHandler()); RuntimeMonitorHandler inProgressHandler = handler(cancelWithSavepoint.getInProgressHandler()); - router = new Router() - // config how to interact with this web server - .GET("/config", handler(new DashboardConfigHandler(cfg.getRefreshInterval()))) - - // the overview - how many task managers, slots, free slots, ... - .GET("/overview", handler(new ClusterOverviewHandler(DEFAULT_REQUEST_TIMEOUT))) - - // job manager configuration - .GET("/jobmanager/config", handler(new JobManagerConfigHandler(config))) - - // overview over jobs - .GET("/joboverview", handler(new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, true, true))) - .GET("/joboverview/running", handler(new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, true, false))) - .GET("/joboverview/completed", handler(new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, false, true))) - - .GET("/jobs", handler(new CurrentJobIdsHandler(DEFAULT_REQUEST_TIMEOUT))) - - .GET("/jobs/:jobid", handler(new JobDetailsHandler(currentGraphs, metricFetcher))) - .GET("/jobs/:jobid/vertices", handler(new JobDetailsHandler(currentGraphs, metricFetcher))) - - .GET("/jobs/:jobid/vertices/:vertexid", handler(new JobVertexDetailsHandler(currentGraphs, metricFetcher))) - .GET("/jobs/:jobid/vertices/:vertexid/subtasktimes", handler(new SubtasksTimesHandler(currentGraphs))) - .GET("/jobs/:jobid/vertices/:vertexid/taskmanagers", handler(new JobVertexTaskManagersHandler(currentGraphs, metricFetcher))) - .GET("/jobs/:jobid/vertices/:vertexid/accumulators", handler(new JobVertexAccumulatorsHandler(currentGraphs))) - .GET("/jobs/:jobid/vertices/:vertexid/backpressure", handler(new JobVertexBackPressureHandler( - currentGraphs, - backPressureStatsTracker, - refreshInterval))) - .GET("/jobs/:jobid/vertices/:vertexid/metrics", handler(new JobVertexMetricsHandler(metricFetcher))) - .GET("/jobs/:jobid/vertices/:vertexid/subtasks/accumulators", handler(new SubtasksAllAccumulatorsHandler(currentGraphs))) - .GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum", handler(new SubtaskCurrentAttemptDetailsHandler(currentGraphs, metricFetcher))) - .GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt", handler(new SubtaskExecutionAttemptDetailsHandler(currentGraphs, metricFetcher))) - .GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators", handler(new SubtaskExecutionAttemptAccumulatorsHandler(currentGraphs))) - - .GET("/jobs/:jobid/plan", handler(new JobPlanHandler(currentGraphs))) - .GET("/jobs/:jobid/config", handler(new JobConfigHandler(currentGraphs))) - .GET("/jobs/:jobid/exceptions", handler(new JobExceptionsHandler(currentGraphs))) - .GET("/jobs/:jobid/accumulators", handler(new JobAccumulatorsHandler(currentGraphs))) - .GET("/jobs/:jobid/metrics", handler(new JobMetricsHandler(metricFetcher))) - - .GET("/taskmanagers", handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT, metricFetcher))) - .GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY, handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT, metricFetcher))) - .GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY + "/log", - new TaskManagerLogHandler(retriever, context, jobManagerAddressPromise.future(), timeout, - TaskManagerLogHandler.FileMode.LOG, config, enableSSL)) - .GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY + "/stdout", - new TaskManagerLogHandler(retriever, context, jobManagerAddressPromise.future(), timeout, - TaskManagerLogHandler.FileMode.STDOUT, config, enableSSL)) - .GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY + "/metrics", handler(new TaskManagerMetricsHandler(metricFetcher))) + router = new Router(); + // config how to interact with this web server + GET(router, new DashboardConfigHandler(cfg.getRefreshInterval())); + + // the overview - how many task managers, slots, free slots, ... + GET(router, new ClusterOverviewHandler(DEFAULT_REQUEST_TIMEOUT)); + + // job manager configuration + GET(router, new JobManagerConfigHandler(config)); + + // overview over jobs + GET(router, new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, true, true)); + GET(router, new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, true, false)); + GET(router, new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, false, true)); + + GET(router, new CurrentJobIdsHandler(DEFAULT_REQUEST_TIMEOUT)); + + GET(router, new JobDetailsHandler(currentGraphs, metricFetcher)); + + GET(router, new JobVertexDetailsHandler(currentGraphs, metricFetcher)); + GET(router, new SubtasksTimesHandler(currentGraphs)); + GET(router, new JobVertexTaskManagersHandler(currentGraphs, metricFetcher)); + GET(router, new JobVertexAccumulatorsHandler(currentGraphs)); + GET(router, new JobVertexBackPressureHandler(currentGraphs, backPressureStatsTracker, refreshInterval)); + GET(router, new JobVertexMetricsHandler(metricFetcher)); + GET(router, new SubtasksAllAccumulatorsHandler(currentGraphs)); + GET(router, new SubtaskCurrentAttemptDetailsHandler(currentGraphs, metricFetcher)); + GET(router, new SubtaskExecutionAttemptDetailsHandler(currentGraphs, metricFetcher)); + GET(router, new SubtaskExecutionAttemptAccumulatorsHandler(currentGraphs)); + + GET(router, new JobPlanHandler(currentGraphs)); + GET(router, new JobConfigHandler(currentGraphs)); + GET(router, new JobExceptionsHandler(currentGraphs)); + GET(router, new JobAccumulatorsHandler(currentGraphs)); + GET(router, new JobMetricsHandler(metricFetcher)); + + GET(router, new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT, metricFetcher)); + GET(router, new TaskManagerLogHandler(retriever, context, jobManagerAddressPromise.future(), timeout, + TaskManagerLogHandler.FileMode.LOG, config, enableSSL)); + GET(router, new TaskManagerLogHandler(retriever, context, jobManagerAddressPromise.future(), timeout, + TaskManagerLogHandler.FileMode.STDOUT, config, enableSSL)); + GET(router, new TaskManagerMetricsHandler(metricFetcher)); + router // log and stdout .GET("/jobmanager/log", logFiles.logFile == null ? new ConstantTextHandler("(log file unavailable)") : new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logFiles.logFile, @@ -313,25 +307,22 @@ public class WebRuntimeMonitor implements WebMonitor { .GET("/jobmanager/stdout", logFiles.stdOutFile == null ? new ConstantTextHandler("(stdout file unavailable)") : new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logFiles.stdOutFile, - enableSSL)) - - .GET("/jobmanager/metrics", handler(new JobManagerMetricsHandler(metricFetcher))) + enableSSL)); - // Cancel a job via GET (for proper integration with YARN this has to be performed via GET) - .GET("/jobs/:jobid/yarn-cancel", handler(new JobCancellationHandler())) + GET(router, new JobManagerMetricsHandler(metricFetcher)); - // DELETE is the preferred way of canceling a job (Rest-conform) - .DELETE("/jobs/:jobid/cancel", handler(new JobCancellationHandler())) + // Cancel a job via GET (for proper integration with YARN this has to be performed via GET) + GET(router, new JobCancellationHandler()); + // DELETE is the preferred way of canceling a job (Rest-conform) + DELETE(router, new JobCancellationHandler()); - .GET("/jobs/:jobid/cancel-with-savepoint", triggerHandler) - .GET("/jobs/:jobid/cancel-with-savepoint/target-directory/:targetDirectory", triggerHandler) - .GET(JobCancellationWithSavepointHandlers.IN_PROGRESS_URL, inProgressHandler) + GET(router, triggerHandler); + GET(router, inProgressHandler); - // stop a job via GET (for proper integration with YARN this has to be performed via GET) - .GET("/jobs/:jobid/yarn-stop", handler(new JobStoppingHandler())) - - // DELETE is the preferred way of stopping a job (Rest-conform) - .DELETE("/jobs/:jobid/stop", handler(new JobStoppingHandler())); + // stop a job via GET (for proper integration with YARN this has to be performed via GET) + GET(router, new JobStoppingHandler()); + // DELETE is the preferred way of stopping a job (Rest-conform) + DELETE(router, new JobStoppingHandler()); int maxCachedEntries = config.getInteger( ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE, @@ -339,34 +330,32 @@ public class WebRuntimeMonitor implements WebMonitor { CheckpointStatsCache cache = new CheckpointStatsCache(maxCachedEntries); // Register the checkpoint stats handlers - router - .GET("/jobs/:jobid/checkpoints", handler(new CheckpointStatsHandler(currentGraphs))) - .GET("/jobs/:jobid/checkpoints/config", handler(new CheckpointConfigHandler(currentGraphs))) - .GET("/jobs/:jobid/checkpoints/details/:checkpointid", handler(new CheckpointStatsDetailsHandler(currentGraphs, cache))) - .GET("/jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid", handler(new CheckpointStatsDetailsSubtasksHandler(currentGraphs, cache))); + GET(router, new CheckpointStatsHandler(currentGraphs)); + GET(router, new CheckpointConfigHandler(currentGraphs)); + GET(router, new CheckpointStatsDetailsHandler(currentGraphs, cache)); + GET(router, new CheckpointStatsDetailsSubtasksHandler(currentGraphs, cache)); if (webSubmitAllow) { - router - // fetch the list of uploaded jars. - .GET("/jars", handler(new JarListHandler(uploadDir))) + // fetch the list of uploaded jars. + GET(router, new JarListHandler(uploadDir)); - // get plan for an uploaded jar - .GET("/jars/:jarid/plan", handler(new JarPlanHandler(uploadDir))) + // get plan for an uploaded jar + GET(router, new JarPlanHandler(uploadDir)); - // run a jar - .POST("/jars/:jarid/run", handler(new JarRunHandler(uploadDir, timeout, config))) + // run a jar + POST(router, new JarRunHandler(uploadDir, timeout, config)); - // upload a jar - .POST("/jars/upload", handler(new JarUploadHandler(uploadDir))) + // upload a jar + POST(router, new JarUploadHandler(uploadDir)); - // delete an uploaded jar from submission interface - .DELETE("/jars/:jarid", handler(new JarDeleteHandler(uploadDir))); + // delete an uploaded jar from submission interface + DELETE(router, new JarDeleteHandler(uploadDir)); } else { - router - // send an Access Denied message (sort of) - // Every other GET request will go to the File Server, which will not provide - // access to the jar directory anyway, because it doesn't exist in webRootDir. - .GET("/jars", handler(new JarAccessDeniedHandler())); + // send an Access Denied message + JarAccessDeniedHandler jad = new JarAccessDeniedHandler(); + GET(router, jad); + POST(router, jad); + DELETE(router, jad); } // this handler serves all the static contents @@ -526,6 +515,40 @@ public class WebRuntimeMonitor implements WebMonitor { } } + /** These methods are used in the route path setup. They register the given {@link RequestHandler} or + * {@link RuntimeMonitorHandlerBase} with the given {@link Router} for the respective REST method. + * The REST paths under which they are registered are defined by the handlers. **/ + + private void GET(Router router, RequestHandler handler) { + GET(router, handler(handler)); + } + + private void GET(Router router, RuntimeMonitorHandlerBase handler) { + for (String path : handler.getPaths()) { + router.GET(path, handler); + } + } + + private void DELETE(Router router, RequestHandler handler) { + DELETE(router, handler(handler)); + } + + private void DELETE(Router router, RuntimeMonitorHandlerBase handler) { + for (String path : handler.getPaths()) { + router.DELETE(path, handler); + } + } + + private void POST(Router router, RequestHandler handler) { + POST(router, handler(handler)); + } + + private void POST(Router router, RuntimeMonitorHandlerBase handler) { + for (String path : handler.getPaths()) { + router.POST(path, handler); + } + } + // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java index 99ef3d9..2bd055d 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java @@ -38,6 +38,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ public class ClusterOverviewHandler extends AbstractJsonRequestHandler { + private static final String CLUSTER_OVERVIEW_REST_PATH = "/overview"; + private static final String version = EnvironmentInformation.getVersion(); private static final String commitID = EnvironmentInformation.getRevisionInformation().commitId; @@ -49,6 +51,11 @@ public class ClusterOverviewHandler extends AbstractJsonRequestHandler { } @Override + public String[] getPaths() { + return new String[]{CLUSTER_OVERVIEW_REST_PATH}; + } + + @Override public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { // we need no parameters, get all requests try { http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java index b690c56..94b1c16 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java @@ -39,11 +39,18 @@ import static java.util.Objects.requireNonNull; */ public class CurrentJobIdsHandler extends AbstractJsonRequestHandler { + private static final String CURRENT_JOB_IDS_REST_PATH = "/jobs"; + private final FiniteDuration timeout; public CurrentJobIdsHandler(FiniteDuration timeout) { this.timeout = requireNonNull(timeout); } + + @Override + public String[] getPaths() { + return new String[]{CURRENT_JOB_IDS_REST_PATH}; + } @Override public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java index 07064da..8486a9c 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java @@ -38,6 +38,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler { + private static final String ALL_JOBS_REST_PATH = "/joboverview"; + private static final String RUNNING_JOBS_REST_PATH = "/joboverview/running"; + private static final String COMPLETED_JOBS_REST_PATH = "/joboverview/completed"; + private final FiniteDuration timeout; private final boolean includeRunningJobs; @@ -55,6 +59,18 @@ public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler { } @Override + public String[] getPaths() { + if (includeRunningJobs && includeFinishedJobs) { + return new String[]{ALL_JOBS_REST_PATH}; + } + if (includeRunningJobs) { + return new String[]{RUNNING_JOBS_REST_PATH}; + } else { + return new String[]{COMPLETED_JOBS_REST_PATH}; + } + } + + @Override public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { try { if (jobManager != null) { http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java index 6fe072b..49f4c26 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java @@ -32,6 +32,8 @@ import java.util.TimeZone; * and time zone of the server timestamps. */ public class DashboardConfigHandler extends AbstractJsonRequestHandler { + + private static String DASHBOARD_CONFIG_REST_PATH = "/config"; private final String configString; @@ -65,6 +67,11 @@ public class DashboardConfigHandler extends AbstractJsonRequestHandler { throw new RuntimeException(e.getMessage(), e); } } + + @Override + public String[] getPaths() { + return new String[]{DASHBOARD_CONFIG_REST_PATH}; + } @Override public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java index ba32d0d..f0e3faf 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java @@ -28,6 +28,17 @@ public class JarAccessDeniedHandler extends AbstractJsonRequestHandler { "available for this cluster. To enable it, set the configuration key ' jobmanager.web.submit.enable.'\"}"; @Override + public String[] getPaths() { + return new String[]{ + JarListHandler.JAR_LIST_REST_PATH, + JarPlanHandler.JAR_PLAN_REST_PATH, + JarRunHandler.JAR_RUN_REST_PATH, + JarUploadHandler.JAR_UPLOAD_REST_PATH, + JarDeleteHandler.JAR_DELETE_REST_PATH + }; + } + + @Override public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { return ERROR_MESSAGE; } http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java index ae959a5..f3bf231 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java @@ -31,6 +31,8 @@ import java.util.Map; */ public class JarDeleteHandler extends AbstractJsonRequestHandler { + static final String JAR_DELETE_REST_PATH = "/jars/:jarid"; + private final File jarDir; public JarDeleteHandler(File jarDirectory) { @@ -38,6 +40,11 @@ public class JarDeleteHandler extends AbstractJsonRequestHandler { } @Override + public String[] getPaths() { + return new String[]{JAR_DELETE_REST_PATH}; + } + + @Override public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { final String file = pathParams.get("jarid"); try { http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java index f3cdc30..81b5e34 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java @@ -33,6 +33,8 @@ import java.util.jar.Manifest; public class JarListHandler extends AbstractJsonRequestHandler { + static final String JAR_LIST_REST_PATH = "/jars"; + private final File jarDir; public JarListHandler(File jarDirectory) { @@ -40,6 +42,11 @@ public class JarListHandler extends AbstractJsonRequestHandler { } @Override + public String[] getPaths() { + return new String[]{JAR_LIST_REST_PATH}; + } + + @Override public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { try { StringWriter writer = new StringWriter(); http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java index bd0a6af..d121119 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java @@ -32,11 +32,18 @@ import java.util.Map; */ public class JarPlanHandler extends JarActionHandler { + static final String JAR_PLAN_REST_PATH = "/jars/:jarid/plan"; + public JarPlanHandler(File jarDirectory) { super(jarDirectory); } @Override + public String[] getPaths() { + return new String[]{JAR_PLAN_REST_PATH}; + } + + @Override public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { try { JarActionHandlerConfig config = JarActionHandlerConfig.fromParams(pathParams, queryParams); http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java index 474be33..5f39e19 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java @@ -38,6 +38,8 @@ import java.util.Map; */ public class JarRunHandler extends JarActionHandler { + static final String JAR_RUN_REST_PATH = "/jars/:jarid/run"; + private final FiniteDuration timeout; private final Configuration clientConfig; @@ -48,6 +50,11 @@ public class JarRunHandler extends JarActionHandler { } @Override + public String[] getPaths() { + return new String[]{JAR_RUN_REST_PATH}; + } + + @Override public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { try { JarActionHandlerConfig config = JarActionHandlerConfig.fromParams(pathParams, queryParams); http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java index 9a3b0e1..3d7cb8a 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java @@ -29,6 +29,8 @@ import java.util.UUID; */ public class JarUploadHandler extends AbstractJsonRequestHandler { + static final String JAR_UPLOAD_REST_PATH = "/jars/upload"; + private final File jarDir; public JarUploadHandler(File jarDir) { @@ -36,6 +38,11 @@ public class JarUploadHandler extends AbstractJsonRequestHandler { } @Override + public String[] getPaths() { + return new String[]{JAR_UPLOAD_REST_PATH}; + } + + @Override public String handleJsonRequest( Map<String, String> pathParams, Map<String, String> queryParams, http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java index 29613a0..7664153 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java @@ -30,12 +30,19 @@ import java.util.Map; * Request handler that returns the aggregated user accumulators of a job. */ public class JobAccumulatorsHandler extends AbstractExecutionGraphRequestHandler { + + private static final String JOB_ACCUMULATORS_REST_PATH = "/jobs/:jobid/accumulators"; public JobAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) { super(executionGraphHolder); } @Override + public String[] getPaths() { + return new String[]{JOB_ACCUMULATORS_REST_PATH}; + } + + @Override public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception { StringifiedAccumulatorResult[] allAccumulators = graph.getAccumulatorResultsStringified(); http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java index 9f35719..d9de7d7 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java @@ -30,6 +30,14 @@ import java.util.Map; */ public class JobCancellationHandler extends AbstractJsonRequestHandler { + private static final String JOB_CONCELLATION_REST_PATH = "/jobs/:jobid/cancel"; + private static final String JOB_CONCELLATION_YARN_REST_PATH = "/jobs/:jobid/yarn-cancel"; + + @Override + public String[] getPaths() { + return new String[]{JOB_CONCELLATION_REST_PATH, JOB_CONCELLATION_YARN_REST_PATH}; + } + @Override public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { try { http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java index 492ce76..b618d85 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java @@ -55,8 +55,11 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ public class JobCancellationWithSavepointHandlers { + private static final String CANCEL_WITH_SAVEPOINT_REST_PATH = "/jobs/:jobid/cancel-with-savepoint"; + private static final String CANCEL_WITH_SAVEPOINT_DIRECTORY_REST_PATH = "/jobs/:jobid/cancel-with-savepoint/target-directory/:targetDirectory"; + /** URL for in-progress cancellations. */ - public static final String IN_PROGRESS_URL = "/jobs/:jobid/cancel-with-savepoint/in-progress/:requestId"; + private static final String CANCELLATION_IN_PROGRESS_REST_PATH = "/jobs/:jobid/cancel-with-savepoint/in-progress/:requestId"; /** Encodings for String. */ private static final Charset ENCODING = Charset.forName("UTF-8"); @@ -127,6 +130,11 @@ public class JobCancellationWithSavepointHandlers { } @Override + public String[] getPaths() { + return new String[]{CANCEL_WITH_SAVEPOINT_REST_PATH, CANCEL_WITH_SAVEPOINT_DIRECTORY_REST_PATH}; + } + + @Override @SuppressWarnings("unchecked") public FullHttpResponse handleRequest( Map<String, String> pathParams, @@ -230,7 +238,7 @@ public class JobCancellationWithSavepointHandlers { } // In-progress location - String location = IN_PROGRESS_URL + String location = CANCELLATION_IN_PROGRESS_REST_PATH .replace(":jobid", jobId.toString()) .replace(":requestId", Long.toString(requestId)); @@ -279,6 +287,11 @@ public class JobCancellationWithSavepointHandlers { private final ArrayDeque<Tuple2<Long, Object>> recentlyCompleted = new ArrayDeque<>(NUM_GHOST_REQUEST_IDS); @Override + public String[] getPaths() { + return new String[]{CANCELLATION_IN_PROGRESS_REST_PATH}; + } + + @Override @SuppressWarnings("unchecked") public FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { try { http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java index 21639ef..459ca2a 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java @@ -31,11 +31,18 @@ import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; */ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler { + private static final String JOB_CONFIG_REST_PATH = "/jobs/:jobid/config"; + public JobConfigHandler(ExecutionGraphHolder executionGraphHolder) { super(executionGraphHolder); } @Override + public String[] getPaths() { + return new String[]{JOB_CONFIG_REST_PATH}; + } + + @Override public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception { StringWriter writer = new StringWriter(); http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java index 35e6ca7..7780e66 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java @@ -47,6 +47,9 @@ import java.util.Map; */ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler { + private static final String JOB_DETAILS_REST_PATH = "/jobs/:jobid"; + private static final String JOB_DETAILS_VERTICES_REST_PATH = "/jobs/:jobid/vertices"; + private final MetricFetcher fetcher; public JobDetailsHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) { @@ -55,6 +58,11 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler { } @Override + public String[] getPaths() { + return new String[]{JOB_DETAILS_REST_PATH, JOB_DETAILS_VERTICES_REST_PATH}; + } + + @Override public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception { final StringWriter writer = new StringWriter(); final JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java index 376cca4..3720dac 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java @@ -33,6 +33,8 @@ import java.util.Map; */ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler { + private static final String JOB_EXCEPTIONS_REST_PATH = "/jobs/:jobid/exceptions"; + private static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20; public JobExceptionsHandler(ExecutionGraphHolder executionGraphHolder) { @@ -40,6 +42,11 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler { } @Override + public String[] getPaths() { + return new String[]{JOB_EXCEPTIONS_REST_PATH}; + } + + @Override public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception { StringWriter writer = new StringWriter(); JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java index 11ca931..5fcf010 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java @@ -30,6 +30,8 @@ import java.util.Map; */ public class JobManagerConfigHandler extends AbstractJsonRequestHandler { + private static final String JOBMANAGER_CONFIG_REST_PATH = "/jobmanager/config"; + private final Configuration config; public JobManagerConfigHandler(Configuration config) { @@ -37,6 +39,11 @@ public class JobManagerConfigHandler extends AbstractJsonRequestHandler { } @Override + public String[] getPaths() { + return new String[]{JOBMANAGER_CONFIG_REST_PATH}; + } + + @Override public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { StringWriter writer = new StringWriter(); JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java index 64f7000..becc2e1 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java @@ -28,12 +28,18 @@ import java.util.Map; */ public class JobPlanHandler extends AbstractExecutionGraphRequestHandler { + private static final String JOB_PLAN_REST_PATH = "/jobs/:jobid/plan"; public JobPlanHandler(ExecutionGraphHolder executionGraphHolder) { super(executionGraphHolder); } @Override + public String[] getPaths() { + return new String[]{JOB_PLAN_REST_PATH}; + } + + @Override public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception { return graph.getJsonPlan(); } http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java index 0f8c958..c8ec689 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java @@ -30,6 +30,14 @@ import java.util.Map; */ public class JobStoppingHandler extends AbstractJsonRequestHandler { + private static final String JOB_STOPPING_REST_PATH = "/jobs/:jobid/stop"; + private static final String JOB_STOPPING_YARN_REST_PATH = "/jobs/:jobid/yarn-stop"; + + @Override + public String[] getPaths() { + return new String[]{JOB_STOPPING_REST_PATH, JOB_STOPPING_YARN_REST_PATH}; + } + @Override public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { try { http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java index ad4e207..ccfcbba 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java @@ -29,12 +29,19 @@ import java.util.Map; public class JobVertexAccumulatorsHandler extends AbstractJobVertexRequestHandler { + + private static final String JOB_VERTEX_ACCUMULATORS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/accumulators"; public JobVertexAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) { super(executionGraphHolder); } @Override + public String[] getPaths() { + return new String[]{JOB_VERTEX_ACCUMULATORS_REST_PATH}; + } + + @Override public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception { StringifiedAccumulatorResult[] accs = jobVertex.getAggregatedUserAccumulatorsStringified(); http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java index c5bacf2..52167e1 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java @@ -39,6 +39,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ public class JobVertexBackPressureHandler extends AbstractJobVertexRequestHandler { + private static final String JOB_VERTEX_BACKPRESSURE_REST_PATH = "/jobs/:jobid/vertices/:vertexid/backpressure"; + /** Back pressure stats tracker. */ private final BackPressureStatsTracker backPressureStatsTracker; @@ -57,6 +59,11 @@ public class JobVertexBackPressureHandler extends AbstractJobVertexRequestHandle } @Override + public String[] getPaths() { + return new String[]{JOB_VERTEX_BACKPRESSURE_REST_PATH}; + } + + @Override public String handleRequest( AccessExecutionJobVertex accessJobVertex, Map<String, String> params) throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java index 32626ba..0a07896 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java @@ -39,6 +39,8 @@ import java.util.Map; */ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler { + private static String JOB_VERTEX_DETAILS_REST_PATH = "/jobs/:jobid/vertices/:vertexid"; + private final MetricFetcher fetcher; public JobVertexDetailsHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) { @@ -47,6 +49,11 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler { } @Override + public String[] getPaths() { + return new String[]{JOB_VERTEX_DETAILS_REST_PATH}; + } + + @Override public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception { final long now = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java index f468d35..b3dabea 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java @@ -43,6 +43,8 @@ import java.util.Map.Entry; */ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandler { + private static final String JOB_VERTEX_TASKMANAGERS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/taskmanagers"; + private final MetricFetcher fetcher; public JobVertexTaskManagersHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) { @@ -51,6 +53,11 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle } @Override + public String[] getPaths() { + return new String[]{JOB_VERTEX_TASKMANAGERS_REST_PATH}; + } + + @Override public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception { // Build a map that groups tasks by TaskManager Map<String, List<AccessExecutionVertex>> taskManagerVertices = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java index c56cfc3..b6246e6 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java @@ -49,4 +49,11 @@ public interface RequestHandler { * with the exception stack trace. */ FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception; + + /** + * Returns an array of REST URL's under which this handler can be registered. + * + * @return array containing REST URL's under which this handler can be registered. + */ + String[] getPaths(); } http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java index 6d09513..4cf5f0f 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java @@ -28,12 +28,19 @@ import java.util.Map; * Request handler providing details about a single task execution attempt. */ public class SubtaskCurrentAttemptDetailsHandler extends SubtaskExecutionAttemptDetailsHandler { + + public static final String SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum"; public SubtaskCurrentAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) { super(executionGraphHolder, fetcher); } @Override + public String[] getPaths() { + return new String[]{SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH}; + } + + @Override public String handleRequest(AccessExecutionVertex vertex, Map<String, String> params) throws Exception { return handleRequest(vertex.getCurrentExecutionAttempt(), params); } http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java index e613efb..ba3a5ee 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java @@ -31,12 +31,19 @@ import java.util.Map; * via the "vertexid" parameter) in a specific job, defined via (defined voa the "jobid" parameter). */ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskAttemptRequestHandler { + + private static final String SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators"; public SubtaskExecutionAttemptAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) { super(executionGraphHolder); } @Override + public String[] getPaths() { + return new String[]{SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH}; + } + + @Override public String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception { final StringifiedAccumulatorResult[] accs = execAttempt.getUserAccumulatorsStringified(); http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java index da8db02..b753b6e 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java @@ -37,6 +37,8 @@ import java.util.Map; */ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemptRequestHandler { + private static final String SUBTASK_ATTEMPT_DETAILS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt"; + private final MetricFetcher fetcher; public SubtaskExecutionAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) { @@ -45,6 +47,11 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp } @Override + public String[] getPaths() { + return new String[]{SUBTASK_ATTEMPT_DETAILS_REST_PATH}; + } + + @Override public String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception { final ExecutionState status = execAttempt.getState(); final long now = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java index 892a606..222d474 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java @@ -33,12 +33,19 @@ import java.util.Map; * Request handler that returns the accumulators for all subtasks of job vertex. */ public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHandler { + + private static final String SUBTASKS_ALL_ACCUMULATORS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/accumulators"; public SubtasksAllAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) { super(executionGraphHolder); } @Override + public String[] getPaths() { + return new String[]{SUBTASKS_ALL_ACCUMULATORS_REST_PATH}; + } + + @Override public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception { StringWriter writer = new StringWriter(); JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java index 76349ee..e2e35e3 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java @@ -35,12 +35,18 @@ import java.util.Map; */ public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler { + private static final String SUBTASK_TIMES_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasktimes"; public SubtasksTimesHandler(ExecutionGraphHolder executionGraphHolder) { super(executionGraphHolder); } @Override + public String[] getPaths() { + return new String[]{SUBTASK_TIMES_REST_PATH}; + } + + @Override public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception { final long now = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java index 6583d3b..1002bf3 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java @@ -94,6 +94,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull; public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase { private static final Logger LOG = LoggerFactory.getLogger(TaskManagerLogHandler.class); + private static final String TASKMANAGER_LOG_REST_PATH = "/taskmanagers/:taskmanagerid/log"; + private static final String TASKMANAGER_OUT_REST_PATH = "/taskmanagers/:taskmanagerid/stdout"; + /** Keep track of last transmitted log, to clean up old ones */ private final HashMap<String, BlobKey> lastSubmittedLog = new HashMap<>(); private final HashMap<String, BlobKey> lastSubmittedStdout = new HashMap<>(); @@ -141,6 +144,15 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase { timeTimeout = Time.milliseconds(timeout.toMillis()); } + @Override + public String[] getPaths() { + if (serveLogFile) { + return new String[]{TASKMANAGER_LOG_REST_PATH}; + } else { + return new String[]{TASKMANAGER_OUT_REST_PATH}; + } + } + /** * Response when running with leading JobManager. */ http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java index c757f5c..a23e983 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java @@ -41,6 +41,9 @@ import static java.util.Objects.requireNonNull; public class TaskManagersHandler extends AbstractJsonRequestHandler { + private static final String TASKMANAGERS_REST_PATH = "/taskmanagers"; + private static final String TASKMANAGER_DETAILS_REST_PATH = "/taskmanagers/:taskmanagerid"; + public static final String TASK_MANAGER_ID_KEY = "taskmanagerid"; private final FiniteDuration timeout; @@ -53,6 +56,11 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler { } @Override + public String[] getPaths() { + return new String[]{TASKMANAGERS_REST_PATH, TASKMANAGER_DETAILS_REST_PATH}; + } + + @Override public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { try { if (jobManager != null) { http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java index be0d283..de40a4a 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java @@ -34,11 +34,18 @@ import java.util.Map; */ public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandler { + private static final String CHECKPOINT_CONFIG_REST_PATH = "/jobs/:jobid/checkpoints/config"; + public CheckpointConfigHandler(ExecutionGraphHolder executionGraphHolder) { super(executionGraphHolder); } @Override + public String[] getPaths() { + return new String[]{CHECKPOINT_CONFIG_REST_PATH}; + } + + @Override public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception { StringWriter writer = new StringWriter(); http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java index d461f03..e651824 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java @@ -38,6 +38,8 @@ import java.util.Map; */ public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequestHandler { + private static final String CHECKPOINT_STATS_DETAILS_REST_PATH = "/jobs/:jobid/checkpoints/details/:checkpointid"; + private final CheckpointStatsCache cache; public CheckpointStatsDetailsHandler(ExecutionGraphHolder executionGraphHolder, CheckpointStatsCache cache) { @@ -46,6 +48,11 @@ public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequest } @Override + public String[] getPaths() { + return new String[]{CHECKPOINT_STATS_DETAILS_REST_PATH}; + } + + @Override public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception { long checkpointId = parseCheckpointId(params); if (checkpointId == -1) { http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java index d55467f..15dd911 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java @@ -44,6 +44,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGraphRequestHandler { + private static final String CHECKPOINT_STATS_DETAILS_SUBTASKS_REST_PATH = "/jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid"; + private final CheckpointStatsCache cache; public CheckpointStatsDetailsSubtasksHandler(ExecutionGraphHolder executionGraphHolder, CheckpointStatsCache cache) { @@ -52,6 +54,11 @@ public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGrap } @Override + public String[] getPaths() { + return new String[]{CHECKPOINT_STATS_DETAILS_SUBTASKS_REST_PATH}; + } + + @Override public String handleJsonRequest( Map<String, String> pathParams, Map<String, String> queryParams, http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java index 404b2c7..6413806 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java @@ -43,11 +43,18 @@ import java.util.Map; */ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler { + private static final String CHECKPOINT_STATS_REST_PATH = "/jobs/:jobid/checkpoints"; + public CheckpointStatsHandler(ExecutionGraphHolder executionGraphHolder) { super(executionGraphHolder); } @Override + public String[] getPaths() { + return new String[]{CHECKPOINT_STATS_REST_PATH}; + } + + @Override public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception { StringWriter writer = new StringWriter(); JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java index 7452c71..f667ce5 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java @@ -31,11 +31,19 @@ import java.util.Map; * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } */ public class JobManagerMetricsHandler extends AbstractMetricsHandler { + + private static final String JOBMANAGER_METRICS_REST_PATH = "/jobmanager/metrics"; + public JobManagerMetricsHandler(MetricFetcher fetcher) { super(fetcher); } @Override + public String[] getPaths() { + return new String[]{JOBMANAGER_METRICS_REST_PATH}; + } + + @Override protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) { MetricStore.JobManagerMetricStore jobManager = metrics.getJobManagerMetricStore(); if (jobManager == null) { http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java index d66c954..26c9fa9 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java @@ -32,12 +32,18 @@ import java.util.Map; */ public class JobMetricsHandler extends AbstractMetricsHandler { public static final String PARAMETER_JOB_ID = "jobid"; + private static final String JOB_METRICS_REST_PATH = "/jobs/:jobid/metrics"; public JobMetricsHandler(MetricFetcher fetcher) { super(fetcher); } @Override + public String[] getPaths() { + return new String[]{JOB_METRICS_REST_PATH}; + } + + @Override protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) { MetricStore.JobMetricStore job = metrics.getJobMetricStore(pathParams.get(PARAMETER_JOB_ID)); return job != null http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java index 6fca771..3e838d7 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java @@ -32,12 +32,18 @@ import java.util.Map; */ public class JobVertexMetricsHandler extends AbstractMetricsHandler { public static final String PARAMETER_VERTEX_ID = "vertexid"; + private static final String JOB_VERTEX_METRICS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/metrics"; public JobVertexMetricsHandler(MetricFetcher fetcher) { super(fetcher); } @Override + public String[] getPaths() { + return new String[]{JOB_VERTEX_METRICS_REST_PATH}; + } + + @Override protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) { MetricStore.TaskMetricStore task = metrics.getTaskMetricStore( pathParams.get(JobMetricsHandler.PARAMETER_JOB_ID), http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java index f1b2e72..a74f5f2 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java @@ -33,11 +33,19 @@ import java.util.Map; * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } */ public class TaskManagerMetricsHandler extends AbstractMetricsHandler { + + private static final String TASKMANAGER_METRICS_REST_PATH = "/taskmanagers/:taskmanagerid/metrics"; + public TaskManagerMetricsHandler(MetricFetcher fetcher) { super(fetcher); } @Override + public String[] getPaths() { + return new String[]{TASKMANAGER_METRICS_REST_PATH}; + } + + @Override protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) { MetricStore.TaskManagerMetricStore taskManager = metrics.getTaskManagerMetricStore(pathParams.get(TaskManagersHandler.TASK_MANAGER_ID_KEY)); if (taskManager == null) { http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java new file mode 100644 index 0000000..018ffdd --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.handlers; + +import org.junit.Assert; +import org.junit.Test; +import scala.concurrent.duration.FiniteDuration; + +import java.util.concurrent.TimeUnit; + +public class ClusterOverviewHandlerTest { + @Test + public void testGetPaths() { + ClusterOverviewHandler handler = new ClusterOverviewHandler(new FiniteDuration(0, TimeUnit.SECONDS)); + String[] paths = handler.getPaths(); + Assert.assertEquals(1, paths.length); + Assert.assertEquals("/overview", paths[0]); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java new file mode 100644 index 0000000..e225648 --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.handlers; + +import org.junit.Assert; +import org.junit.Test; +import scala.concurrent.duration.FiniteDuration; + +import java.util.concurrent.TimeUnit; + +public class CurrentJobIdsHandlerTest { + @Test + public void testGetPaths() { + CurrentJobIdsHandler handler = new CurrentJobIdsHandler(new FiniteDuration(0, TimeUnit.SECONDS)); + String[] paths = handler.getPaths(); + Assert.assertEquals(1, paths.length); + Assert.assertEquals("/jobs", paths[0]); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java new file mode 100644 index 0000000..3207fec --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.handlers; + +import org.junit.Assert; +import org.junit.Test; +import scala.concurrent.duration.FiniteDuration; + +import java.util.concurrent.TimeUnit; + +public class CurrentJobsOverviewHandlerTest { + @Test + public void testGetPaths() { + CurrentJobsOverviewHandler handlerAll = new CurrentJobsOverviewHandler(new FiniteDuration(0, TimeUnit.SECONDS), true, true); + String[] pathsAll = handlerAll.getPaths(); + Assert.assertEquals(1, pathsAll.length); + Assert.assertEquals("/joboverview", pathsAll[0]); + + CurrentJobsOverviewHandler handlerRunning = new CurrentJobsOverviewHandler(new FiniteDuration(0, TimeUnit.SECONDS), true, false); + String[] pathsRunning = handlerRunning.getPaths(); + Assert.assertEquals(1, pathsRunning.length); + Assert.assertEquals("/joboverview/running", pathsRunning[0]); + + CurrentJobsOverviewHandler handlerCompleted = new CurrentJobsOverviewHandler(new FiniteDuration(0, TimeUnit.SECONDS), false, true); + String[] pathsCompleted = handlerCompleted.getPaths(); + Assert.assertEquals(1, pathsCompleted.length); + Assert.assertEquals("/joboverview/completed", pathsCompleted[0]); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java new file mode 100644 index 0000000..aa2d552 --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.handlers; + +import org.junit.Assert; +import org.junit.Test; + +public class DashboardConfigHandlerTest { + @Test + public void testGetPaths() { + DashboardConfigHandler handler = new DashboardConfigHandler(10000L); + String[] paths = handler.getPaths(); + Assert.assertEquals(1, paths.length); + Assert.assertEquals("/config", paths[0]); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/999bacef/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandlerTest.java new file mode 100644 index 0000000..e84926e --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandlerTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.handlers; + +import com.google.common.collect.Lists; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; + +public class JarAccessDeniedHandlerTest { + @Test + public void testGetPaths() { + JarAccessDeniedHandler handler = new JarAccessDeniedHandler(); + String[] paths = handler.getPaths(); + Assert.assertEquals(5, paths.length); + List<String> pathsList = Lists.newArrayList(paths); + Assert.assertTrue(pathsList.contains("/jars")); + Assert.assertTrue(pathsList.contains("/jars/upload")); + Assert.assertTrue(pathsList.contains("/jars/:jarid")); + Assert.assertTrue(pathsList.contains("/jars/:jarid/plan")); + Assert.assertTrue(pathsList.contains("/jars/:jarid/run")); + } +}
