Repository: flink Updated Branches: refs/heads/master 3137bf774 -> 0d2903541
[FLINK-4775] [metrics] Simplify MetricStore access Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0d290354 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0d290354 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0d290354 Branch: refs/heads/master Commit: 0d290354179a5ea3a11040a2ed7e218263bc474b Parents: e30e7a6 Author: zentol <[email protected]> Authored: Fri Oct 7 10:16:49 2016 +0200 Committer: zentol <[email protected]> Committed: Fri Oct 21 13:42:18 2016 +0200 ---------------------------------------------------------------------- .../metrics/JobManagerMetricsHandler.java | 2 +- .../webmonitor/metrics/JobMetricsHandler.java | 10 +- .../metrics/JobVertexMetricsHandler.java | 17 +-- .../runtime/webmonitor/metrics/MetricStore.java | 125 +++++++++++++++++-- .../metrics/TaskManagerMetricsHandler.java | 2 +- .../webmonitor/metrics/MetricFetcherTest.java | 8 +- .../webmonitor/metrics/MetricStoreTest.java | 10 +- 7 files changed, 134 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0d290354/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java index 54d6aea..7452c71 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java @@ -37,7 +37,7 @@ public class JobManagerMetricsHandler extends AbstractMetricsHandler { @Override protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) { - MetricStore.JobManagerMetricStore jobManager = metrics.jobManager; + MetricStore.JobManagerMetricStore jobManager = metrics.getJobManagerMetricStore(); if (jobManager == null) { return null; } else { http://git-wip-us.apache.org/repos/asf/flink/blob/0d290354/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java index cdaae2c..d66c954 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java @@ -39,11 +39,9 @@ public class JobMetricsHandler extends AbstractMetricsHandler { @Override protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) { - MetricStore.JobMetricStore job = metrics.jobs.get(pathParams.get(PARAMETER_JOB_ID)); - if (job == null) { - return null; - } else { - return job.metrics; - } + MetricStore.JobMetricStore job = metrics.getJobMetricStore(pathParams.get(PARAMETER_JOB_ID)); + return job != null + ? job.metrics + : null; } } http://git-wip-us.apache.org/repos/asf/flink/blob/0d290354/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java index 1b92b47..6fca771 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java @@ -39,16 +39,11 @@ public class JobVertexMetricsHandler extends AbstractMetricsHandler { @Override protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) { - MetricStore.JobMetricStore job = metrics.jobs.get(pathParams.get(JobMetricsHandler.PARAMETER_JOB_ID)); - if (job == null) { - return null; - } else { - MetricStore.TaskMetricStore task = job.tasks.get(pathParams.get(PARAMETER_VERTEX_ID)); - if (task == null) { - return null; - } else { - return task.metrics; - } - } + MetricStore.TaskMetricStore task = metrics.getTaskMetricStore( + pathParams.get(JobMetricsHandler.PARAMETER_JOB_ID), + pathParams.get(PARAMETER_VERTEX_ID)); + return task != null + ? task.metrics + : null; } } http://git-wip-us.apache.org/repos/asf/flink/blob/0d290354/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java index c1b2bec..989145b 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java @@ -47,17 +47,21 @@ public class MetricStore { final Map<String, TaskManagerMetricStore> taskManagers = new HashMap<>(); final Map<String, JobMetricStore> jobs = new HashMap<>(); + // ----------------------------------------------------------------------------------------------------------------- + // Adding metrics + // ----------------------------------------------------------------------------------------------------------------- public void add(MetricDump metric) { try { QueryScopeInfo info = metric.scopeInfo; TaskManagerMetricStore tm; JobMetricStore job; TaskMetricStore task; + SubtaskMetricStore subtask; String name = info.scope.isEmpty() ? metric.name : info.scope + "." + metric.name; - + if (name.isEmpty()) { // malformed transmission return; } @@ -96,10 +100,18 @@ public class MetricStore { task = new TaskMetricStore(); job.tasks.put(taskInfo.vertexID, task); } + subtask = task.subtasks.get(taskInfo.subtaskIndex); + if (subtask == null) { + subtask = new SubtaskMetricStore(); + task.subtasks.put(taskInfo.subtaskIndex, subtask); + } /** - * As the WebInterface task metric queries currently do not account for subtasks we don't - * divide by subtask and instead use the concatenation of subtask index and metric name as the name. + * The duplication is intended. Metrics scoped by subtask are useful for several job/task handlers, + * while the WebInterface task metric queries currently do not account for subtasks, so we don't + * divide by subtask and instead use the concatenation of subtask index and metric name as the name + * for thos. */ + addMetric(subtask.metrics, name, metric); addMetric(task.metrics, taskInfo.subtaskIndex + "." + name, metric); break; case INFO_CATEGORY_OPERATOR: @@ -160,32 +172,121 @@ public class MetricStore { } } + // ----------------------------------------------------------------------------------------------------------------- + // Accessors for sub MetricStores + // ----------------------------------------------------------------------------------------------------------------- + /** - * Sub-structure containing metrics of the JobManager. + * Returns the {@link JobManagerMetricStore}. + * + * @return JobManagerMetricStore + */ + public JobManagerMetricStore getJobManagerMetricStore() { + return jobManager; + } + + /** + * Returns the {@link TaskManagerMetricStore} for the given taskmanager ID. + * + * @param tmID taskmanager ID + * @return TaskManagerMetricStore for the given ID, or null if no store for the given argument exists + */ + public TaskManagerMetricStore getTaskManagerMetricStore(String tmID) { + return taskManagers.get(tmID); + } + + /** + * Returns the {@link JobMetricStore} for the given job ID. + * + * @param jobID job ID + * @return JobMetricStore for the given ID, or null if no store for the given argument exists + */ + public JobMetricStore getJobMetricStore(String jobID) { + return jobs.get(jobID); + } + + /** + * Returns the {@link TaskMetricStore} for the given job/task ID. + * + * @param jobID job ID + * @param taskID task ID + * @return TaskMetricStore for given IDs, or null if no store for the given arguments exists + */ + public TaskMetricStore getTaskMetricStore(String jobID, String taskID) { + JobMetricStore job = getJobMetricStore(jobID); + if (job == null) { + return null; + } + return job.getTaskMetricStore(taskID); + } + + /** + * Returns the {@link SubtaskMetricStore} for the given job/task ID and subtask index. + * + * @param jobID job ID + * @param taskID task ID + * @param subtaskIndex subtask index + * @return SubtaskMetricStore for the given IDs and index, or null if no store for the given arguments exists */ - static class JobManagerMetricStore { + public SubtaskMetricStore getSubtaskMetricStore(String jobID, String taskID, int subtaskIndex) { + TaskMetricStore task = getTaskMetricStore(jobID, taskID); + if (task == null) { + return null; + } + return task.getSubtaskMetricStore(subtaskIndex); + } + + // ----------------------------------------------------------------------------------------------------------------- + // sub MetricStore classes + // ----------------------------------------------------------------------------------------------------------------- + private static abstract class ComponentMetricStore { public final Map<String, String> metrics = new HashMap<>(); + + public String getMetric(String name, String defaultValue) { + String value = this.metrics.get(name); + return value != null + ? value + : defaultValue; + } + } + + /** + * Sub-structure containing metrics of the JobManager. + */ + public static class JobManagerMetricStore extends ComponentMetricStore { } /** * Sub-structure containing metrics of a single TaskManager. */ - static class TaskManagerMetricStore { - public final Map<String, String> metrics = new HashMap<>(); + public static class TaskManagerMetricStore extends ComponentMetricStore { } /** * Sub-structure containing metrics of a single Job. */ - static class JobMetricStore { - public final Map<String, String> metrics = new HashMap<>(); - public final Map<String, TaskMetricStore> tasks = new HashMap<>(); + public static class JobMetricStore extends ComponentMetricStore { + private final Map<String, TaskMetricStore> tasks = new HashMap<>(); + + public TaskMetricStore getTaskMetricStore(String taskID) { + return tasks.get(taskID); + } } /** * Sub-structure containing metrics of a single Task. */ - static class TaskMetricStore { - public final Map<String, String> metrics = new HashMap<>(); + public static class TaskMetricStore extends ComponentMetricStore { + private final Map<Integer, SubtaskMetricStore> subtasks = new HashMap<>(); + + public SubtaskMetricStore getSubtaskMetricStore(int subtaskIndex) { + return subtasks.get(subtaskIndex); + } + } + + /** + * Sub-structure containing metrics of a single Subtask. + */ + public static class SubtaskMetricStore extends ComponentMetricStore { } } http://git-wip-us.apache.org/repos/asf/flink/blob/0d290354/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java index e4e8b00..a69b676 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java @@ -39,7 +39,7 @@ public class TaskManagerMetricsHandler extends AbstractMetricsHandler { @Override protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) { - MetricStore.TaskManagerMetricStore taskManager = metrics.taskManagers.get(pathParams.get(PARAMETER_TM_ID)); + MetricStore.TaskManagerMetricStore taskManager = metrics.getTaskManagerMetricStore(pathParams.get(PARAMETER_TM_ID)); if (taskManager == null) { return null; } else { http://git-wip-us.apache.org/repos/asf/flink/blob/0d290354/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java index 14cbeac..3061346 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java @@ -156,10 +156,10 @@ public class MetricFetcherTest extends TestLogger { assertEquals("0.99", store.jobManager.metrics.get("abc.hist_p99")); assertEquals("0.999", store.jobManager.metrics.get("abc.hist_p999")); - assertEquals("x", store.taskManagers.get(tmID.toString()).metrics.get("abc.gauge")); - assertEquals("5.0", store.jobs.get(jobID.toString()).metrics.get("abc.jc")); - assertEquals("2", store.jobs.get(jobID.toString()).tasks.get("taskid").metrics.get("2.abc.tc")); - assertEquals("1", store.jobs.get(jobID.toString()).tasks.get("taskid").metrics.get("2.opname.abc.oc")); + assertEquals("x", store.getTaskManagerMetricStore(tmID.toString()).metrics.get("abc.gauge")); + assertEquals("5.0", store.getJobMetricStore(jobID.toString()).metrics.get("abc.jc")); + assertEquals("2", store.getTaskMetricStore(jobID.toString(), "taskid").metrics.get("2.abc.tc")); + assertEquals("1", store.getTaskMetricStore(jobID.toString(), "taskid").metrics.get("2.opname.abc.oc")); } } http://git-wip-us.apache.org/repos/asf/flink/blob/0d290354/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java index ee46494..c71f015 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java @@ -31,11 +31,11 @@ public class MetricStoreTest extends TestLogger { public void testAdd() throws IOException { MetricStore store = setupStore(new MetricStore()); - assertEquals("0", store.jobManager.metrics.get("abc.metric1")); - assertEquals("1", store.taskManagers.get("tmid").metrics.get("abc.metric2")); - assertEquals("2", store.jobs.get("jobid").metrics.get("abc.metric3")); - assertEquals("3", store.jobs.get("jobid").tasks.get("taskid").metrics.get("8.abc.metric4")); - assertEquals("4", store.jobs.get("jobid").tasks.get("taskid").metrics.get("8.opname.abc.metric5")); + assertEquals("0", store.getJobManagerMetricStore().getMetric("abc.metric1", "-1")); + assertEquals("1", store.getTaskManagerMetricStore("tmid").getMetric("abc.metric2", "-1")); + assertEquals("2", store.getJobMetricStore("jobid").getMetric("abc.metric3", "-1")); + assertEquals("3", store.getTaskMetricStore("jobid", "taskid").getMetric("8.abc.metric4", "-1")); + assertEquals("4", store.getTaskMetricStore("jobid", "taskid").getMetric("8.opname.abc.metric5", "-1")); } @Test
