Repository: flink
Updated Branches:
  refs/heads/master 3137bf774 -> 0d2903541


[FLINK-4775] [metrics] Simplify MetricStore access


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0d290354
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0d290354
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0d290354

Branch: refs/heads/master
Commit: 0d290354179a5ea3a11040a2ed7e218263bc474b
Parents: e30e7a6
Author: zentol <[email protected]>
Authored: Fri Oct 7 10:16:49 2016 +0200
Committer: zentol <[email protected]>
Committed: Fri Oct 21 13:42:18 2016 +0200

----------------------------------------------------------------------
 .../metrics/JobManagerMetricsHandler.java       |   2 +-
 .../webmonitor/metrics/JobMetricsHandler.java   |  10 +-
 .../metrics/JobVertexMetricsHandler.java        |  17 +--
 .../runtime/webmonitor/metrics/MetricStore.java | 125 +++++++++++++++++--
 .../metrics/TaskManagerMetricsHandler.java      |   2 +-
 .../webmonitor/metrics/MetricFetcherTest.java   |   8 +-
 .../webmonitor/metrics/MetricStoreTest.java     |  10 +-
 7 files changed, 134 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0d290354/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
index 54d6aea..7452c71 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
@@ -37,7 +37,7 @@ public class JobManagerMetricsHandler extends 
AbstractMetricsHandler {
 
        @Override
        protected Map<String, String> getMapFor(Map<String, String> pathParams, 
MetricStore metrics) {
-               MetricStore.JobManagerMetricStore jobManager = 
metrics.jobManager;
+               MetricStore.JobManagerMetricStore jobManager = 
metrics.getJobManagerMetricStore();
                if (jobManager == null) {
                        return null;
                } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/0d290354/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
index cdaae2c..d66c954 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
@@ -39,11 +39,9 @@ public class JobMetricsHandler extends 
AbstractMetricsHandler {
 
        @Override
        protected Map<String, String> getMapFor(Map<String, String> pathParams, 
MetricStore metrics) {
-               MetricStore.JobMetricStore job = 
metrics.jobs.get(pathParams.get(PARAMETER_JOB_ID));
-               if (job == null) {
-                       return null;
-               } else {
-                       return job.metrics;
-               }
+               MetricStore.JobMetricStore job = 
metrics.getJobMetricStore(pathParams.get(PARAMETER_JOB_ID));
+               return job != null
+                       ? job.metrics
+                       : null;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0d290354/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
index 1b92b47..6fca771 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
@@ -39,16 +39,11 @@ public class JobVertexMetricsHandler extends 
AbstractMetricsHandler {
 
        @Override
        protected Map<String, String> getMapFor(Map<String, String> pathParams, 
MetricStore metrics) {
-               MetricStore.JobMetricStore job = 
metrics.jobs.get(pathParams.get(JobMetricsHandler.PARAMETER_JOB_ID));
-               if (job == null) {
-                       return null;
-               } else {
-                       MetricStore.TaskMetricStore task = 
job.tasks.get(pathParams.get(PARAMETER_VERTEX_ID));
-                       if (task == null) {
-                               return null;
-                       } else {
-                               return task.metrics;
-                       }
-               }
+               MetricStore.TaskMetricStore task = metrics.getTaskMetricStore(
+                       pathParams.get(JobMetricsHandler.PARAMETER_JOB_ID),
+                       pathParams.get(PARAMETER_VERTEX_ID));
+               return task != null
+                       ? task.metrics
+                       : null;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0d290354/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
index c1b2bec..989145b 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
@@ -47,17 +47,21 @@ public class MetricStore {
        final Map<String, TaskManagerMetricStore> taskManagers = new 
HashMap<>();
        final Map<String, JobMetricStore> jobs = new HashMap<>();
 
+       // 
-----------------------------------------------------------------------------------------------------------------
+       // Adding metrics
+       // 
-----------------------------------------------------------------------------------------------------------------
        public void add(MetricDump metric) {
                try {
                        QueryScopeInfo info = metric.scopeInfo;
                        TaskManagerMetricStore tm;
                        JobMetricStore job;
                        TaskMetricStore task;
+                       SubtaskMetricStore subtask;
 
                        String name = info.scope.isEmpty()
                                ? metric.name
                                : info.scope + "." + metric.name;
-                       
+
                        if (name.isEmpty()) { // malformed transmission
                                return;
                        }
@@ -96,10 +100,18 @@ public class MetricStore {
                                                task = new TaskMetricStore();
                                                
job.tasks.put(taskInfo.vertexID, task);
                                        }
+                                       subtask = 
task.subtasks.get(taskInfo.subtaskIndex);
+                                       if (subtask == null) {
+                                               subtask = new 
SubtaskMetricStore();
+                                               
task.subtasks.put(taskInfo.subtaskIndex, subtask);
+                                       }
                                        /**
-                                        * As the WebInterface task metric 
queries currently do not account for subtasks we don't 
-                                        * divide by subtask and instead use 
the concatenation of subtask index and metric name as the name. 
+                                        * The duplication is intended. Metrics 
scoped by subtask are useful for several job/task handlers,
+                                        * while the WebInterface task metric 
queries currently do not account for subtasks, so we don't 
+                                        * divide by subtask and instead use 
the concatenation of subtask index and metric name as the name
+                                        * for those.
                                         */
+                                       addMetric(subtask.metrics, name, 
metric);
                                        addMetric(task.metrics, 
taskInfo.subtaskIndex + "." + name, metric);
                                        break;
                                case INFO_CATEGORY_OPERATOR:
@@ -160,32 +172,121 @@ public class MetricStore {
                }
        }
 
+       // 
-----------------------------------------------------------------------------------------------------------------
+       // Accessors for sub MetricStores
+       // 
-----------------------------------------------------------------------------------------------------------------
+
        /**
-        * Sub-structure containing metrics of the JobManager.
+        * Returns the {@link JobManagerMetricStore}.
+        *
+        * @return JobManagerMetricStore
+        */
+       public JobManagerMetricStore getJobManagerMetricStore() {
+               return jobManager;
+       }
+
+       /**
+        * Returns the {@link TaskManagerMetricStore} for the given taskmanager 
ID.
+        *
+        * @param tmID taskmanager ID
+        * @return TaskManagerMetricStore for the given ID, or null if no store 
for the given argument exists
+        */
+       public TaskManagerMetricStore getTaskManagerMetricStore(String tmID) {
+               return taskManagers.get(tmID);
+       }
+
+       /**
+        * Returns the {@link JobMetricStore} for the given job ID.
+        *
+        * @param jobID job ID
+        * @return JobMetricStore for the given ID, or null if no store for the 
given argument exists
+        */
+       public JobMetricStore getJobMetricStore(String jobID) {
+               return jobs.get(jobID);
+       }
+
+       /**
+        * Returns the {@link TaskMetricStore} for the given job/task ID.
+        *
+        * @param jobID  job ID
+        * @param taskID task ID
+        * @return TaskMetricStore for given IDs, or null if no store for the 
given arguments exists
+        */
+       public TaskMetricStore getTaskMetricStore(String jobID, String taskID) {
+               JobMetricStore job = getJobMetricStore(jobID);
+               if (job == null) {
+                       return null;
+               }
+               return job.getTaskMetricStore(taskID);
+       }
+
+       /**
+        * Returns the {@link SubtaskMetricStore} for the given job/task ID and 
subtask index.
+        *
+        * @param jobID        job ID
+        * @param taskID       task ID
+        * @param subtaskIndex subtask index
+        * @return SubtaskMetricStore for the given IDs and index, or null if 
no store for the given arguments exists
         */
-       static class JobManagerMetricStore {
+       public SubtaskMetricStore getSubtaskMetricStore(String jobID, String 
taskID, int subtaskIndex) {
+               TaskMetricStore task = getTaskMetricStore(jobID, taskID);
+               if (task == null) {
+                       return null;
+               }
+               return task.getSubtaskMetricStore(subtaskIndex);
+       }
+
+       // 
-----------------------------------------------------------------------------------------------------------------
+       // sub MetricStore classes
+       // 
-----------------------------------------------------------------------------------------------------------------
+       private static abstract class ComponentMetricStore {
                public final Map<String, String> metrics = new HashMap<>();
+
+               public String getMetric(String name, String defaultValue) {
+                       String value = this.metrics.get(name);
+                       return value != null
+                               ? value
+                               : defaultValue;
+               }
+       }
+
+       /**
+        * Sub-structure containing metrics of the JobManager.
+        */
+       public static class JobManagerMetricStore extends ComponentMetricStore {
        }
 
        /**
         * Sub-structure containing metrics of a single TaskManager.
         */
-       static class TaskManagerMetricStore {
-               public final Map<String, String> metrics = new HashMap<>();
+       public static class TaskManagerMetricStore extends ComponentMetricStore 
{
        }
 
        /**
         * Sub-structure containing metrics of a single Job.
         */
-       static class JobMetricStore {
-               public final Map<String, String> metrics = new HashMap<>();
-               public final Map<String, TaskMetricStore> tasks = new 
HashMap<>();
+       public static class JobMetricStore extends ComponentMetricStore {
+               private final Map<String, TaskMetricStore> tasks = new 
HashMap<>();
+
+               public TaskMetricStore getTaskMetricStore(String taskID) {
+                       return tasks.get(taskID);
+               }
        }
 
        /**
         * Sub-structure containing metrics of a single Task.
         */
-       static class TaskMetricStore {
-               public final Map<String, String> metrics = new HashMap<>();
+       public static class TaskMetricStore extends ComponentMetricStore {
+               private final Map<Integer, SubtaskMetricStore> subtasks = new 
HashMap<>();
+
+               public SubtaskMetricStore getSubtaskMetricStore(int 
subtaskIndex) {
+                       return subtasks.get(subtaskIndex);
+               }
+       }
+
+       /**
+        * Sub-structure containing metrics of a single Subtask.
+        */
+       public static class SubtaskMetricStore extends ComponentMetricStore {
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0d290354/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
index e4e8b00..a69b676 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
@@ -39,7 +39,7 @@ public class TaskManagerMetricsHandler extends 
AbstractMetricsHandler {
 
        @Override
        protected Map<String, String> getMapFor(Map<String, String> pathParams, 
MetricStore metrics) {
-               MetricStore.TaskManagerMetricStore taskManager = 
metrics.taskManagers.get(pathParams.get(PARAMETER_TM_ID));
+               MetricStore.TaskManagerMetricStore taskManager = 
metrics.getTaskManagerMetricStore(pathParams.get(PARAMETER_TM_ID));
                if (taskManager == null) {
                        return null;
                } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/0d290354/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
index 14cbeac..3061346 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
@@ -156,10 +156,10 @@ public class MetricFetcherTest extends TestLogger {
                        assertEquals("0.99", 
store.jobManager.metrics.get("abc.hist_p99"));
                        assertEquals("0.999", 
store.jobManager.metrics.get("abc.hist_p999"));
 
-                       assertEquals("x", 
store.taskManagers.get(tmID.toString()).metrics.get("abc.gauge"));
-                       assertEquals("5.0", 
store.jobs.get(jobID.toString()).metrics.get("abc.jc"));
-                       assertEquals("2", 
store.jobs.get(jobID.toString()).tasks.get("taskid").metrics.get("2.abc.tc"));
-                       assertEquals("1", 
store.jobs.get(jobID.toString()).tasks.get("taskid").metrics.get("2.opname.abc.oc"));
+                       assertEquals("x", 
store.getTaskManagerMetricStore(tmID.toString()).metrics.get("abc.gauge"));
+                       assertEquals("5.0", 
store.getJobMetricStore(jobID.toString()).metrics.get("abc.jc"));
+                       assertEquals("2", 
store.getTaskMetricStore(jobID.toString(), "taskid").metrics.get("2.abc.tc"));
+                       assertEquals("1", 
store.getTaskMetricStore(jobID.toString(), 
"taskid").metrics.get("2.opname.abc.oc"));
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0d290354/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java
index ee46494..c71f015 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java
@@ -31,11 +31,11 @@ public class MetricStoreTest extends TestLogger {
        public void testAdd() throws IOException {
                MetricStore store = setupStore(new MetricStore());
 
-               assertEquals("0", store.jobManager.metrics.get("abc.metric1"));
-               assertEquals("1", 
store.taskManagers.get("tmid").metrics.get("abc.metric2"));
-               assertEquals("2", 
store.jobs.get("jobid").metrics.get("abc.metric3"));
-               assertEquals("3", 
store.jobs.get("jobid").tasks.get("taskid").metrics.get("8.abc.metric4"));
-               assertEquals("4", 
store.jobs.get("jobid").tasks.get("taskid").metrics.get("8.opname.abc.metric5"));
+               assertEquals("0", 
store.getJobManagerMetricStore().getMetric("abc.metric1", "-1"));
+               assertEquals("1", 
store.getTaskManagerMetricStore("tmid").getMetric("abc.metric2", "-1"));
+               assertEquals("2", 
store.getJobMetricStore("jobid").getMetric("abc.metric3", "-1"));
+               assertEquals("3", store.getTaskMetricStore("jobid", 
"taskid").getMetric("8.abc.metric4", "-1"));
+               assertEquals("4", store.getTaskMetricStore("jobid", 
"taskid").getMetric("8.opname.abc.metric5", "-1"));
        }
 
        @Test

Reply via email to