Repository: flink Updated Branches: refs/heads/master b6a1b6e9d -> 0e5a157ae
[FLINK-9339][REST] Register subtask accumulator handler under correct URL This closes #5997. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0e5a157a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0e5a157a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0e5a157a Branch: refs/heads/master Commit: 0e5a157ae299055e7d2b3b83a82f6d719acad0ea Parents: b6a1b6e Author: yanghua <[email protected]> Authored: Sat May 12 19:10:54 2018 +0800 Committer: zentol <[email protected]> Committed: Sat May 12 19:42:13 2018 +0200 ---------------------------------------------------------------------- .../runtime/webmonitor/WebMonitorEndpoint.java | 90 +++++++++----------- 1 file changed, 39 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0e5a157a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index b524cd7..5cb57d3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java @@ -67,8 +67,6 @@ import org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler import org.apache.flink.runtime.rest.handler.job.metrics.SubtaskMetricsHandler; import org.apache.flink.runtime.rest.handler.job.metrics.TaskManagerMetricsHandler; import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingHandlers; -import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingStatusHeaders; -import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingTriggerHeaders; import org.apache.flink.runtime.rest.handler.job.savepoints.SavepointDisposalHandlers; import org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers; import org.apache.flink.runtime.rest.handler.legacy.ConstantTextHandler; @@ -107,19 +105,9 @@ import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistic import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatisticsHeaders; import org.apache.flink.runtime.rest.messages.cluster.ShutdownHeaders; import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders; -import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders; import org.apache.flink.runtime.rest.messages.job.SubtaskCurrentAttemptDetailsHeaders; import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptAccumulatorsHeaders; import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsHeaders; -import org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsHeaders; -import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsHeaders; -import org.apache.flink.runtime.rest.messages.job.metrics.JobVertexMetricsHeaders; -import org.apache.flink.runtime.rest.messages.job.metrics.SubtaskMetricsHeaders; -import org.apache.flink.runtime.rest.messages.job.metrics.TaskManagerMetricsHeaders; -import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusHeaders; -import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalTriggerHeaders; -import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders; -import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsHeaders; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerLogFileHeaders; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerStdoutFileHeaders; @@ -582,53 +570,53 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp optWebContent = Optional.empty(); } - handlers.add(Tuple2.of(ClusterOverviewHeaders.getInstance(), clusterOverviewHandler)); - handlers.add(Tuple2.of(ClusterConfigurationInfoHeaders.getInstance(), clusterConfigurationHandler)); - handlers.add(Tuple2.of(DashboardConfigurationHeaders.getInstance(), dashboardConfigHandler)); - handlers.add(Tuple2.of(JobIdsWithStatusesOverviewHeaders.getInstance(), jobIdsHandler)); - handlers.add(Tuple2.of(JobsOverviewHeaders.getInstance(), jobsOverviewHandler)); - handlers.add(Tuple2.of(JobConfigHeaders.getInstance(), jobConfigHandler)); - handlers.add(Tuple2.of(CheckpointConfigHeaders.getInstance(), checkpointConfigHandler)); - handlers.add(Tuple2.of(CheckpointingStatisticsHeaders.getInstance(), checkpointStatisticsHandler)); - handlers.add(Tuple2.of(CheckpointStatisticDetailsHeaders.getInstance(), checkpointStatisticDetailsHandler)); - handlers.add(Tuple2.of(JobPlanHeaders.getInstance(), jobPlanHandler)); - handlers.add(Tuple2.of(TaskCheckpointStatisticsHeaders.getInstance(), taskCheckpointStatisticDetailsHandler)); - handlers.add(Tuple2.of(JobExceptionsHeaders.getInstance(), jobExceptionsHandler)); - handlers.add(Tuple2.of(JobVertexAccumulatorsHeaders.getInstance(), jobVertexAccumulatorsHandler)); - handlers.add(Tuple2.of(JobVertexAccumulatorsHeaders.getInstance(), subtasksAllAccumulatorsHandler)); - handlers.add(Tuple2.of(JobDetailsHeaders.getInstance(), jobDetailsHandler)); - handlers.add(Tuple2.of(JobAccumulatorsHeaders.getInstance(), jobAccumulatorsHandler)); - handlers.add(Tuple2.of(TaskManagersHeaders.getInstance(), taskManagersHandler)); - handlers.add(Tuple2.of(TaskManagerDetailsHeaders.getInstance(), taskManagerDetailsHandler)); - handlers.add(Tuple2.of(SubtasksTimesHeaders.getInstance(), subtasksTimesHandler)); - handlers.add(Tuple2.of(JobVertexMetricsHeaders.getInstance(), jobVertexMetricsHandler)); - handlers.add(Tuple2.of(JobMetricsHeaders.getInstance(), jobMetricsHandler)); - handlers.add(Tuple2.of(SubtaskMetricsHeaders.getInstance(), subtaskMetricsHandler)); - handlers.add(Tuple2.of(TaskManagerMetricsHeaders.getInstance(), taskManagerMetricsHandler)); - handlers.add(Tuple2.of(JobManagerMetricsHeaders.getInstance(), jobManagerMetricsHandler)); + handlers.add(Tuple2.of(clusterOverviewHandler.getMessageHeaders(), clusterOverviewHandler)); + handlers.add(Tuple2.of(clusterConfigurationHandler.getMessageHeaders(), clusterConfigurationHandler)); + handlers.add(Tuple2.of(dashboardConfigHandler.getMessageHeaders(), dashboardConfigHandler)); + handlers.add(Tuple2.of(jobIdsHandler.getMessageHeaders(), jobIdsHandler)); + handlers.add(Tuple2.of(jobsOverviewHandler.getMessageHeaders(), jobsOverviewHandler)); + handlers.add(Tuple2.of(jobConfigHandler.getMessageHeaders(), jobConfigHandler)); + handlers.add(Tuple2.of(checkpointConfigHandler.getMessageHeaders(), checkpointConfigHandler)); + handlers.add(Tuple2.of(checkpointStatisticsHandler.getMessageHeaders(), checkpointStatisticsHandler)); + handlers.add(Tuple2.of(checkpointStatisticDetailsHandler.getMessageHeaders(), checkpointStatisticDetailsHandler)); + handlers.add(Tuple2.of(jobPlanHandler.getMessageHeaders(), jobPlanHandler)); + handlers.add(Tuple2.of(taskCheckpointStatisticDetailsHandler.getMessageHeaders(), taskCheckpointStatisticDetailsHandler)); + handlers.add(Tuple2.of(jobExceptionsHandler.getMessageHeaders(), jobExceptionsHandler)); + handlers.add(Tuple2.of(jobVertexAccumulatorsHandler.getMessageHeaders(), jobVertexAccumulatorsHandler)); + handlers.add(Tuple2.of(subtasksAllAccumulatorsHandler.getMessageHeaders(), subtasksAllAccumulatorsHandler)); + handlers.add(Tuple2.of(jobDetailsHandler.getMessageHeaders(), jobDetailsHandler)); + handlers.add(Tuple2.of(jobAccumulatorsHandler.getMessageHeaders(), jobAccumulatorsHandler)); + handlers.add(Tuple2.of(taskManagersHandler.getMessageHeaders(), taskManagersHandler)); + handlers.add(Tuple2.of(taskManagerDetailsHandler.getMessageHeaders(), taskManagerDetailsHandler)); + handlers.add(Tuple2.of(subtasksTimesHandler.getMessageHeaders(), subtasksTimesHandler)); + handlers.add(Tuple2.of(jobVertexMetricsHandler.getMessageHeaders(), jobVertexMetricsHandler)); + handlers.add(Tuple2.of(jobMetricsHandler.getMessageHeaders(), jobMetricsHandler)); + handlers.add(Tuple2.of(subtaskMetricsHandler.getMessageHeaders(), subtaskMetricsHandler)); + handlers.add(Tuple2.of(taskManagerMetricsHandler.getMessageHeaders(), taskManagerMetricsHandler)); + handlers.add(Tuple2.of(jobManagerMetricsHandler.getMessageHeaders(), jobManagerMetricsHandler)); handlers.add(Tuple2.of(aggregatingTaskManagersMetricsHandler.getMessageHeaders(), aggregatingTaskManagersMetricsHandler)); handlers.add(Tuple2.of(aggregatingJobsMetricsHandler.getMessageHeaders(), aggregatingJobsMetricsHandler)); handlers.add(Tuple2.of(aggregatingSubtasksMetricsHandler.getMessageHeaders(), aggregatingSubtasksMetricsHandler)); - handlers.add(Tuple2.of(JobExecutionResultHeaders.getInstance(), jobExecutionResultHandler)); - handlers.add(Tuple2.of(SavepointTriggerHeaders.getInstance(), savepointTriggerHandler)); - handlers.add(Tuple2.of(SavepointStatusHeaders.getInstance(), savepointStatusHandler)); - handlers.add(Tuple2.of(SubtaskExecutionAttemptDetailsHeaders.getInstance(), subtaskExecutionAttemptDetailsHandler)); - handlers.add(Tuple2.of(SubtaskExecutionAttemptAccumulatorsHeaders.getInstance(), subtaskExecutionAttemptAccumulatorsHandler)); - handlers.add(Tuple2.of(SubtaskCurrentAttemptDetailsHeaders.getInstance(), subtaskCurrentAttemptDetailsHandler)); - handlers.add(Tuple2.of(JobVertexTaskManagersHeaders.getInstance(), jobVertexTaskManagersHandler)); - handlers.add(Tuple2.of(JobVertexBackPressureHeaders.getInstance(), jobVertexBackPressureHandler)); - handlers.add(Tuple2.of(JobTerminationHeaders.getInstance(), jobCancelTerminationHandler)); - handlers.add(Tuple2.of(JobVertexDetailsHeaders.getInstance(), jobVertexDetailsHandler)); - handlers.add(Tuple2.of(RescalingTriggerHeaders.getInstance(), rescalingTriggerHandler)); - handlers.add(Tuple2.of(RescalingStatusHeaders.getInstance(), rescalingStatusHandler)); - handlers.add(Tuple2.of(SavepointDisposalTriggerHeaders.getInstance(), savepointDisposalTriggerHandler)); - handlers.add(Tuple2.of(SavepointDisposalStatusHeaders.getInstance(), savepointDisposalStatusHandler)); + handlers.add(Tuple2.of(jobExecutionResultHandler.getMessageHeaders(), jobExecutionResultHandler)); + handlers.add(Tuple2.of(savepointTriggerHandler.getMessageHeaders(), savepointTriggerHandler)); + handlers.add(Tuple2.of(savepointStatusHandler.getMessageHeaders(), savepointStatusHandler)); + handlers.add(Tuple2.of(subtaskExecutionAttemptDetailsHandler.getMessageHeaders(), subtaskExecutionAttemptDetailsHandler)); + handlers.add(Tuple2.of(subtaskExecutionAttemptAccumulatorsHandler.getMessageHeaders(), subtaskExecutionAttemptAccumulatorsHandler)); + handlers.add(Tuple2.of(subtaskCurrentAttemptDetailsHandler.getMessageHeaders(), subtaskCurrentAttemptDetailsHandler)); + handlers.add(Tuple2.of(jobVertexTaskManagersHandler.getMessageHeaders(), jobVertexTaskManagersHandler)); + handlers.add(Tuple2.of(jobVertexBackPressureHandler.getMessageHeaders(), jobVertexBackPressureHandler)); + handlers.add(Tuple2.of(jobCancelTerminationHandler.getMessageHeaders(), jobCancelTerminationHandler)); + handlers.add(Tuple2.of(jobVertexDetailsHandler.getMessageHeaders(), jobVertexDetailsHandler)); + handlers.add(Tuple2.of(rescalingTriggerHandler.getMessageHeaders(), rescalingTriggerHandler)); + handlers.add(Tuple2.of(rescalingStatusHandler.getMessageHeaders(), rescalingStatusHandler)); + handlers.add(Tuple2.of(savepointDisposalTriggerHandler.getMessageHeaders(), savepointDisposalTriggerHandler)); + handlers.add(Tuple2.of(savepointDisposalStatusHandler.getMessageHeaders(), savepointDisposalStatusHandler)); // TODO: Remove once the Yarn proxy can forward all REST verbs handlers.add(Tuple2.of(YarnCancelJobTerminationHeaders.getInstance(), jobCancelTerminationHandler)); handlers.add(Tuple2.of(YarnStopJobTerminationHeaders.getInstance(), jobStopTerminationHandler)); - handlers.add(Tuple2.of(ShutdownHeaders.getInstance(), shutdownHandler)); + handlers.add(Tuple2.of(shutdownHandler.getMessageHeaders(), shutdownHandler)); optWebContent.ifPresent( webContent -> {
