[
https://issues.apache.org/jira/browse/FLINK-5870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15888276#comment-15888276
]
ASF GitHub Bot commented on FLINK-5870:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/3376#discussion_r103485781
--- Diff:
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
---
@@ -256,117 +256,106 @@ public WebRuntimeMonitor(
RuntimeMonitorHandler triggerHandler =
handler(cancelWithSavepoint.getTriggerHandler());
RuntimeMonitorHandler inProgressHandler =
handler(cancelWithSavepoint.getInProgressHandler());
- router = new Router()
- // config how to interact with this web server
- .GET("/config", handler(new
DashboardConfigHandler(cfg.getRefreshInterval())))
-
- // the overview - how many task managers, slots, free
slots, ...
- .GET("/overview", handler(new
ClusterOverviewHandler(DEFAULT_REQUEST_TIMEOUT)))
-
- // job manager configuration
- .GET("/jobmanager/config", handler(new
JobManagerConfigHandler(config)))
-
- // overview over jobs
- .GET("/joboverview", handler(new
CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, true, true)))
- .GET("/joboverview/running", handler(new
CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, true, false)))
- .GET("/joboverview/completed", handler(new
CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, false, true)))
-
- .GET("/jobs", handler(new
CurrentJobIdsHandler(DEFAULT_REQUEST_TIMEOUT)))
-
- .GET("/jobs/:jobid", handler(new
JobDetailsHandler(currentGraphs, metricFetcher)))
- .GET("/jobs/:jobid/vertices", handler(new
JobDetailsHandler(currentGraphs, metricFetcher)))
-
- .GET("/jobs/:jobid/vertices/:vertexid", handler(new
JobVertexDetailsHandler(currentGraphs, metricFetcher)))
- .GET("/jobs/:jobid/vertices/:vertexid/subtasktimes",
handler(new SubtasksTimesHandler(currentGraphs)))
- .GET("/jobs/:jobid/vertices/:vertexid/taskmanagers",
handler(new JobVertexTaskManagersHandler(currentGraphs, metricFetcher)))
- .GET("/jobs/:jobid/vertices/:vertexid/accumulators",
handler(new JobVertexAccumulatorsHandler(currentGraphs)))
- .GET("/jobs/:jobid/vertices/:vertexid/backpressure",
handler(new JobVertexBackPressureHandler(
- currentGraphs,
-
backPressureStatsTracker,
- refreshInterval)))
- .GET("/jobs/:jobid/vertices/:vertexid/metrics",
handler(new JobVertexMetricsHandler(metricFetcher)))
-
.GET("/jobs/:jobid/vertices/:vertexid/subtasks/accumulators", handler(new
SubtasksAllAccumulatorsHandler(currentGraphs)))
-
.GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum", handler(new
SubtaskCurrentAttemptDetailsHandler(currentGraphs, metricFetcher)))
-
.GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt",
handler(new SubtaskExecutionAttemptDetailsHandler(currentGraphs,
metricFetcher)))
-
.GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators",
handler(new SubtaskExecutionAttemptAccumulatorsHandler(currentGraphs)))
-
- .GET("/jobs/:jobid/plan", handler(new
JobPlanHandler(currentGraphs)))
- .GET("/jobs/:jobid/config", handler(new
JobConfigHandler(currentGraphs)))
- .GET("/jobs/:jobid/exceptions", handler(new
JobExceptionsHandler(currentGraphs)))
- .GET("/jobs/:jobid/accumulators", handler(new
JobAccumulatorsHandler(currentGraphs)))
- .GET("/jobs/:jobid/metrics", handler(new
JobMetricsHandler(metricFetcher)))
-
- .GET("/taskmanagers", handler(new
TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT, metricFetcher)))
- .GET("/taskmanagers/:" +
TaskManagersHandler.TASK_MANAGER_ID_KEY, handler(new
TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT, metricFetcher)))
- .GET("/taskmanagers/:" +
TaskManagersHandler.TASK_MANAGER_ID_KEY + "/log",
- new TaskManagerLogHandler(retriever, context,
jobManagerAddressPromise.future(), timeout,
- TaskManagerLogHandler.FileMode.LOG,
config, enableSSL))
- .GET("/taskmanagers/:" +
TaskManagersHandler.TASK_MANAGER_ID_KEY + "/stdout",
- new TaskManagerLogHandler(retriever, context,
jobManagerAddressPromise.future(), timeout,
- TaskManagerLogHandler.FileMode.STDOUT,
config, enableSSL))
- .GET("/taskmanagers/:" +
TaskManagersHandler.TASK_MANAGER_ID_KEY + "/metrics", handler(new
TaskManagerMetricsHandler(metricFetcher)))
+ router = new Router();
+ // config how to interact with this web server
+ GET(router, new
DashboardConfigHandler(cfg.getRefreshInterval()));
+
+ // the overview - how many task managers, slots, free slots, ...
+ GET(router, new
ClusterOverviewHandler(DEFAULT_REQUEST_TIMEOUT));
+
+ // job manager configuration
+ GET(router, new JobManagerConfigHandler(config));
+
+ // overview over jobs
+ GET(router, new
CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, true, true));
+ GET(router, new
CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, true, false));
+ GET(router, new
CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, false, true));
+
+ GET(router, new CurrentJobIdsHandler(DEFAULT_REQUEST_TIMEOUT));
+
+ GET(router, new JobDetailsHandler(currentGraphs,
metricFetcher));
+
+ GET(router, new JobVertexDetailsHandler(currentGraphs,
metricFetcher));
+ GET(router, new SubtasksTimesHandler(currentGraphs));
+ GET(router, new JobVertexTaskManagersHandler(currentGraphs,
metricFetcher));
+ GET(router, new JobVertexAccumulatorsHandler(currentGraphs));
+ GET(router, new JobVertexBackPressureHandler(currentGraphs,
backPressureStatsTracker, refreshInterval));
+ GET(router, new JobVertexMetricsHandler(metricFetcher));
+ GET(router, new SubtasksAllAccumulatorsHandler(currentGraphs));
+ GET(router, new
SubtaskCurrentAttemptDetailsHandler(currentGraphs, metricFetcher));
+ GET(router, new
SubtaskExecutionAttemptDetailsHandler(currentGraphs, metricFetcher));
+ GET(router, new
SubtaskExecutionAttemptAccumulatorsHandler(currentGraphs));
+
+ GET(router, new JobPlanHandler(currentGraphs));
+ GET(router, new JobConfigHandler(currentGraphs));
+ GET(router, new JobExceptionsHandler(currentGraphs));
+ GET(router, new JobAccumulatorsHandler(currentGraphs));
+ GET(router, new JobMetricsHandler(metricFetcher));
+
+ GET(router, new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT,
metricFetcher));
+ GET(router, new TaskManagerLogHandler(retriever, context,
jobManagerAddressPromise.future(), timeout,
+ TaskManagerLogHandler.FileMode.LOG, config,
enableSSL));
+ GET(router, new TaskManagerLogHandler(retriever, context,
jobManagerAddressPromise.future(), timeout,
+ TaskManagerLogHandler.FileMode.STDOUT, config,
enableSSL));
+ GET(router, new TaskManagerMetricsHandler(metricFetcher));
+ router
// log and stdout
.GET("/jobmanager/log", logFiles.logFile == null ? new
ConstantTextHandler("(log file unavailable)") :
new StaticFileServerHandler(retriever,
jobManagerAddressPromise.future(), timeout, logFiles.logFile,
enableSSL))
.GET("/jobmanager/stdout", logFiles.stdOutFile == null
? new ConstantTextHandler("(stdout file unavailable)") :
new StaticFileServerHandler(retriever,
jobManagerAddressPromise.future(), timeout, logFiles.stdOutFile,
- enableSSL))
-
- .GET("/jobmanager/metrics", handler(new
JobManagerMetricsHandler(metricFetcher)))
+ enableSSL));
- // Cancel a job via GET (for proper integration with
YARN this has to be performed via GET)
- .GET("/jobs/:jobid/yarn-cancel", handler(new
JobCancellationHandler()))
+ GET(router, new JobManagerMetricsHandler(metricFetcher));
- // DELETE is the preferred way of canceling a job
(Rest-conform)
- .DELETE("/jobs/:jobid/cancel", handler(new
JobCancellationHandler()))
+ // Cancel a job via GET (for proper integration with YARN this
has to be performed via GET)
+ GET(router, new JobCancellationHandler());
+ // DELETE is the preferred way of canceling a job (Rest-conform)
+ DELETE(router, new JobCancellationHandler());
- .GET("/jobs/:jobid/cancel-with-savepoint",
triggerHandler)
-
.GET("/jobs/:jobid/cancel-with-savepoint/target-directory/:targetDirectory",
triggerHandler)
-
.GET(JobCancellationWithSavepointHandlers.IN_PROGRESS_URL, inProgressHandler)
+ GET(router, triggerHandler);
+ GET(router, inProgressHandler);
- // stop a job via GET (for proper integration with YARN
this has to be performed via GET)
- .GET("/jobs/:jobid/yarn-stop", handler(new
JobStoppingHandler()))
-
- // DELETE is the preferred way of stopping a job
(Rest-conform)
- .DELETE("/jobs/:jobid/stop", handler(new
JobStoppingHandler()));
+ // stop a job via GET (for proper integration with YARN this
has to be performed via GET)
+ GET(router, new JobStoppingHandler());
+ // DELETE is the preferred way of stopping a job (Rest-conform)
+ DELETE(router, new JobStoppingHandler());
int maxCachedEntries = config.getInteger(
ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE);
CheckpointStatsCache cache = new
CheckpointStatsCache(maxCachedEntries);
// Register the checkpoint stats handlers
- router
- .GET("/jobs/:jobid/checkpoints", handler(new
CheckpointStatsHandler(currentGraphs)))
- .GET("/jobs/:jobid/checkpoints/config", handler(new
CheckpointConfigHandler(currentGraphs)))
- .GET("/jobs/:jobid/checkpoints/details/:checkpointid",
handler(new CheckpointStatsDetailsHandler(currentGraphs, cache)))
-
.GET("/jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid",
handler(new CheckpointStatsDetailsSubtasksHandler(currentGraphs, cache)));
+ GET(router, new CheckpointStatsHandler(currentGraphs));
+ GET(router, new CheckpointConfigHandler(currentGraphs));
+ GET(router, new CheckpointStatsDetailsHandler(currentGraphs,
cache));
+ GET(router, new
CheckpointStatsDetailsSubtasksHandler(currentGraphs, cache));
if (webSubmitAllow) {
- router
- // fetch the list of uploaded jars.
- .GET("/jars", handler(new
JarListHandler(uploadDir)))
+ // fetch the list of uploaded jars.
+ GET(router, new JarListHandler(uploadDir));
- // get plan for an uploaded jar
- .GET("/jars/:jarid/plan", handler(new
JarPlanHandler(uploadDir)))
+ // get plan for an uploaded jar
+ GET(router, new JarPlanHandler(uploadDir));
- // run a jar
- .POST("/jars/:jarid/run", handler(new
JarRunHandler(uploadDir, timeout, config)))
+ // run a jar
+ POST(router, new JarRunHandler(uploadDir, timeout,
config));
- // upload a jar
- .POST("/jars/upload", handler(new
JarUploadHandler(uploadDir)))
+ // upload a jar
+ POST(router, new JarUploadHandler(uploadDir));
- // delete an uploaded jar from submission
interface
- .DELETE("/jars/:jarid", handler(new
JarDeleteHandler(uploadDir)));
+ // delete an uploaded jar from submission interface
+ DELETE(router, new JarDeleteHandler(uploadDir));
} else {
- router
- // send an Access Denied message (sort of)
- // Every other GET request will go to the File
Server, which will not provide
- // access to the jar directory anyway, because
it doesn't exist in webRootDir.
- .GET("/jars", handler(new
JarAccessDeniedHandler()));
+ // send an Access Denied message
+ JarAccessDeniedHandler jad = new
JarAccessDeniedHandler();
+ GET(router, jad);
+ POST(router, jad);
--- End diff --
I think they got a 404 previously as unmatched paths are handled by the
StaticFileServer.
> Make handlers aware of their REST URLs
> --------------------------------------
>
> Key: FLINK-5870
> URL: https://issues.apache.org/jira/browse/FLINK-5870
> Project: Flink
> Issue Type: Improvement
> Components: Webfrontend
> Reporter: Chesnay Schepler
> Assignee: Chesnay Schepler
> Fix For: 1.3.0
>
>
> The handlers in the WebRuntimeMonitor are currently unaware of the actual
> REST URL used. The handlers are simply registered under a given URL, without
> any guarantee that the handler can actually deal with that URL.
> I propose to let handlers themselves specify under which URL's they are
> supposed to be reachable. This provides are tighter coupling between URL and
> handler.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)