[FLINK-4772] [metrics] Store metrics as strings in MetricStore

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e30e7a61
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e30e7a61
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e30e7a61

Branch: refs/heads/master
Commit: e30e7a6192d921d7a3c94beb178eb2c7b9ba74c0
Parents: 3137bf7
Author: zentol <[email protected]>
Authored: Fri Oct 7 10:11:31 2016 +0200
Committer: zentol <[email protected]>
Committed: Fri Oct 21 13:42:18 2016 +0200

----------------------------------------------------------------------
 .../metrics/AbstractMetricsHandler.java         |  6 ++--
 .../metrics/JobManagerMetricsHandler.java       |  2 +-
 .../webmonitor/metrics/JobMetricsHandler.java   |  2 +-
 .../metrics/JobVertexMetricsHandler.java        |  2 +-
 .../runtime/webmonitor/metrics/MetricStore.java | 36 ++++++++++----------
 .../metrics/TaskManagerMetricsHandler.java      |  2 +-
 .../metrics/JobManagerMetricsHandlerTest.java   |  6 ++--
 .../metrics/JobMetricsHandlerTest.java          |  6 ++--
 .../metrics/JobVertexMetricsHandlerTest.java    |  8 ++---
 .../webmonitor/metrics/MetricFetcherTest.java   | 28 +++++++--------
 .../webmonitor/metrics/MetricStoreTest.java     | 10 +++---
 .../metrics/TaskManagerMetricsHandlerTest.java  |  6 ++--
 12 files changed, 57 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e30e7a61/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
index 54e4b6f..8374523 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
@@ -61,7 +61,7 @@ public abstract class AbstractMetricsHandler implements 
RequestHandler {
         * @param metrics MetricStore containing all metrics
         * @return Map containing metrics, or null if no metric exists
         */
-       protected abstract Map<String, Object> getMapFor(Map<String, String> 
pathParams, MetricStore metrics);
+       protected abstract Map<String, String> getMapFor(Map<String, String> 
pathParams, MetricStore metrics);
 
        private String getMetricsValues(Map<String, String> pathParams, String 
requestedMetricsList) throws IOException {
                if (requestedMetricsList.isEmpty()) {
@@ -73,7 +73,7 @@ public abstract class AbstractMetricsHandler implements 
RequestHandler {
                }
                MetricStore metricStore = fetcher.getMetricStore();
                synchronized (metricStore) {
-                       Map<String, Object> metrics = getMapFor(pathParams, 
metricStore);
+                       Map<String, String> metrics = getMapFor(pathParams, 
metricStore);
                        if (metrics == null) {
                                return "";
                        }
@@ -102,7 +102,7 @@ public abstract class AbstractMetricsHandler implements 
RequestHandler {
        private String getAvailableMetricsList(Map<String, String> pathParams) 
throws IOException {
                MetricStore metricStore = fetcher.getMetricStore();
                synchronized (metricStore) {
-                       Map<String, Object> metrics = getMapFor(pathParams, 
metricStore);
+                       Map<String, String> metrics = getMapFor(pathParams, 
metricStore);
                        if (metrics == null) {
                                return "";
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e30e7a61/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
index 7435643..54d6aea 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
@@ -36,7 +36,7 @@ public class JobManagerMetricsHandler extends 
AbstractMetricsHandler {
        }
 
        @Override
-       protected Map<String, Object> getMapFor(Map<String, String> pathParams, 
MetricStore metrics) {
+       protected Map<String, String> getMapFor(Map<String, String> pathParams, 
MetricStore metrics) {
                MetricStore.JobManagerMetricStore jobManager = 
metrics.jobManager;
                if (jobManager == null) {
                        return null;

http://git-wip-us.apache.org/repos/asf/flink/blob/e30e7a61/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
index b54799d..cdaae2c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
@@ -38,7 +38,7 @@ public class JobMetricsHandler extends AbstractMetricsHandler 
{
        }
 
        @Override
-       protected Map<String, Object> getMapFor(Map<String, String> pathParams, 
MetricStore metrics) {
+       protected Map<String, String> getMapFor(Map<String, String> pathParams, 
MetricStore metrics) {
                MetricStore.JobMetricStore job = 
metrics.jobs.get(pathParams.get(PARAMETER_JOB_ID));
                if (job == null) {
                        return null;

http://git-wip-us.apache.org/repos/asf/flink/blob/e30e7a61/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
index 73b8bb0..1b92b47 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
@@ -38,7 +38,7 @@ public class JobVertexMetricsHandler extends 
AbstractMetricsHandler {
        }
 
        @Override
-       protected Map<String, Object> getMapFor(Map<String, String> pathParams, 
MetricStore metrics) {
+       protected Map<String, String> getMapFor(Map<String, String> pathParams, 
MetricStore metrics) {
                MetricStore.JobMetricStore job = 
metrics.jobs.get(pathParams.get(JobMetricsHandler.PARAMETER_JOB_ID));
                if (job == null) {
                        return null;

http://git-wip-us.apache.org/repos/asf/flink/blob/e30e7a61/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
index 5df63c6..c1b2bec 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
@@ -129,11 +129,11 @@ public class MetricStore {
                }
        }
 
-       private void addMetric(Map<String, Object> target, String name, 
MetricDump metric) {
+       private void addMetric(Map<String, String> target, String name, 
MetricDump metric) {
                switch (metric.getCategory()) {
                        case METRIC_CATEGORY_COUNTER:
                                MetricDump.CounterDump counter = 
(MetricDump.CounterDump) metric;
-                               target.put(name, counter.count);
+                               target.put(name, String.valueOf(counter.count));
                                break;
                        case METRIC_CATEGORY_GAUGE:
                                MetricDump.GaugeDump gauge = 
(MetricDump.GaugeDump) metric;
@@ -141,21 +141,21 @@ public class MetricStore {
                                break;
                        case METRIC_CATEGORY_HISTOGRAM:
                                MetricDump.HistogramDump histogram = 
(MetricDump.HistogramDump) metric;
-                               target.put(name + "_min", histogram.min);
-                               target.put(name + "_max", histogram.max);
-                               target.put(name + "_mean", histogram.mean);
-                               target.put(name + "_median", histogram.median);
-                               target.put(name + "_stddev", histogram.stddev);
-                               target.put(name + "_p75", histogram.p75);
-                               target.put(name + "_p90", histogram.p90);
-                               target.put(name + "_p95", histogram.p95);
-                               target.put(name + "_p98", histogram.p98);
-                               target.put(name + "_p99", histogram.p99);
-                               target.put(name + "_p999", histogram.p999);
+                               target.put(name + "_min", 
String.valueOf(histogram.min));
+                               target.put(name + "_max", 
String.valueOf(histogram.max));
+                               target.put(name + "_mean", 
String.valueOf(histogram.mean));
+                               target.put(name + "_median", 
String.valueOf(histogram.median));
+                               target.put(name + "_stddev", 
String.valueOf(histogram.stddev));
+                               target.put(name + "_p75", 
String.valueOf(histogram.p75));
+                               target.put(name + "_p90", 
String.valueOf(histogram.p90));
+                               target.put(name + "_p95", 
String.valueOf(histogram.p95));
+                               target.put(name + "_p98", 
String.valueOf(histogram.p98));
+                               target.put(name + "_p99", 
String.valueOf(histogram.p99));
+                               target.put(name + "_p999", 
String.valueOf(histogram.p999));
                                break;
                        case METRIC_CATEGORY_METER:
                                MetricDump.MeterDump meter = 
(MetricDump.MeterDump) metric;
-                               target.put(name, meter.rate);
+                               target.put(name, String.valueOf(meter.rate));
                                break;
                }
        }
@@ -164,21 +164,21 @@ public class MetricStore {
         * Sub-structure containing metrics of the JobManager.
         */
        static class JobManagerMetricStore {
-               public final Map<String, Object> metrics = new HashMap<>();
+               public final Map<String, String> metrics = new HashMap<>();
        }
 
        /**
         * Sub-structure containing metrics of a single TaskManager.
         */
        static class TaskManagerMetricStore {
-               public final Map<String, Object> metrics = new HashMap<>();
+               public final Map<String, String> metrics = new HashMap<>();
        }
 
        /**
         * Sub-structure containing metrics of a single Job.
         */
        static class JobMetricStore {
-               public final Map<String, Object> metrics = new HashMap<>();
+               public final Map<String, String> metrics = new HashMap<>();
                public final Map<String, TaskMetricStore> tasks = new 
HashMap<>();
        }
 
@@ -186,6 +186,6 @@ public class MetricStore {
         * Sub-structure containing metrics of a single Task.
         */
        static class TaskMetricStore {
-               public final Map<String, Object> metrics = new HashMap<>();
+               public final Map<String, String> metrics = new HashMap<>();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e30e7a61/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
index fea3d07..e4e8b00 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
@@ -38,7 +38,7 @@ public class TaskManagerMetricsHandler extends 
AbstractMetricsHandler {
        }
 
        @Override
-       protected Map<String, Object> getMapFor(Map<String, String> pathParams, 
MetricStore metrics) {
+       protected Map<String, String> getMapFor(Map<String, String> pathParams, 
MetricStore metrics) {
                MetricStore.TaskManagerMetricStore taskManager = 
metrics.taskManagers.get(pathParams.get(PARAMETER_TM_ID));
                if (taskManager == null) {
                        return null;

http://git-wip-us.apache.org/repos/asf/flink/blob/e30e7a61/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
index 9757574..d0ffc81 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
@@ -40,9 +40,9 @@ public class JobManagerMetricsHandlerTest extends TestLogger {
 
                Map<String, String> pathParams = new HashMap<>();
 
-               Map<String, Object> metrics = handler.getMapFor(pathParams, 
store);
+               Map<String, String> metrics = handler.getMapFor(pathParams, 
store);
 
-               assertEquals(0L, metrics.get("abc.metric1"));
+               assertEquals("0", metrics.get("abc.metric1"));
        }
 
        @Test
@@ -54,7 +54,7 @@ public class JobManagerMetricsHandlerTest extends TestLogger {
 
                Map<String, String> pathParams = new HashMap<>();
 
-               Map<String, Object> metrics = handler.getMapFor(pathParams, 
store);
+               Map<String, String> metrics = handler.getMapFor(pathParams, 
store);
 
                assertNotNull(metrics);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e30e7a61/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
index c0cc345..9391dc0 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
@@ -42,9 +42,9 @@ public class JobMetricsHandlerTest extends TestLogger {
                Map<String, String> pathParams = new HashMap<>();
                pathParams.put(PARAMETER_JOB_ID, "jobid");
 
-               Map<String, Object> metrics = handler.getMapFor(pathParams, 
store);
+               Map<String, String> metrics = handler.getMapFor(pathParams, 
store);
 
-               assertEquals(2L, metrics.get("abc.metric3"));
+               assertEquals("2", metrics.get("abc.metric3"));
        }
 
        @Test
@@ -56,7 +56,7 @@ public class JobMetricsHandlerTest extends TestLogger {
 
                Map<String, String> pathParams = new HashMap<>();
 
-               Map<String, Object> metrics = handler.getMapFor(pathParams, 
store);
+               Map<String, String> metrics = handler.getMapFor(pathParams, 
store);
 
                assertNull(metrics);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e30e7a61/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
index d6e5ca7..a7f9084 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
@@ -44,11 +44,11 @@ public class JobVertexMetricsHandlerTest extends TestLogger 
{
                pathParams.put(PARAMETER_JOB_ID, "jobid");
                pathParams.put(PARAMETER_VERTEX_ID, "taskid");
 
-               Map<String, Object> metrics = handler.getMapFor(pathParams, 
store);
+               Map<String, String> metrics = handler.getMapFor(pathParams, 
store);
 
-               assertEquals(3L, metrics.get("8.abc.metric4"));
+               assertEquals("3", metrics.get("8.abc.metric4"));
 
-               assertEquals(4L, metrics.get("8.opname.abc.metric5"));
+               assertEquals("4", metrics.get("8.opname.abc.metric5"));
        }
 
        @Test
@@ -60,7 +60,7 @@ public class JobVertexMetricsHandlerTest extends TestLogger {
 
                Map<String, String> pathParams = new HashMap<>();
 
-               Map<String, Object> metrics = handler.getMapFor(pathParams, 
store);
+               Map<String, String> metrics = handler.getMapFor(pathParams, 
store);
 
                assertNull(metrics);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e30e7a61/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
index 356ce67..14cbeac 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
@@ -144,22 +144,22 @@ public class MetricFetcherTest extends TestLogger {
                fetcher.update();
                MetricStore store = fetcher.getMetricStore();
                synchronized (store) {
-                       assertEquals(7L, 
store.jobManager.metrics.get("abc.hist_min"));
-                       assertEquals(6L, 
store.jobManager.metrics.get("abc.hist_max"));
-                       assertEquals(4.0, 
store.jobManager.metrics.get("abc.hist_mean"));
-                       assertEquals(0.5, 
store.jobManager.metrics.get("abc.hist_median"));
-                       assertEquals(5.0, 
store.jobManager.metrics.get("abc.hist_stddev"));
-                       assertEquals(0.75, 
store.jobManager.metrics.get("abc.hist_p75"));
-                       assertEquals(0.9, 
store.jobManager.metrics.get("abc.hist_p90"));
-                       assertEquals(0.95, 
store.jobManager.metrics.get("abc.hist_p95"));
-                       assertEquals(0.98, 
store.jobManager.metrics.get("abc.hist_p98"));
-                       assertEquals(0.99, 
store.jobManager.metrics.get("abc.hist_p99"));
-                       assertEquals(0.999, 
store.jobManager.metrics.get("abc.hist_p999"));
+                       assertEquals("7", 
store.jobManager.metrics.get("abc.hist_min"));
+                       assertEquals("6", 
store.jobManager.metrics.get("abc.hist_max"));
+                       assertEquals("4.0", 
store.jobManager.metrics.get("abc.hist_mean"));
+                       assertEquals("0.5", 
store.jobManager.metrics.get("abc.hist_median"));
+                       assertEquals("5.0", 
store.jobManager.metrics.get("abc.hist_stddev"));
+                       assertEquals("0.75", 
store.jobManager.metrics.get("abc.hist_p75"));
+                       assertEquals("0.9", 
store.jobManager.metrics.get("abc.hist_p90"));
+                       assertEquals("0.95", 
store.jobManager.metrics.get("abc.hist_p95"));
+                       assertEquals("0.98", 
store.jobManager.metrics.get("abc.hist_p98"));
+                       assertEquals("0.99", 
store.jobManager.metrics.get("abc.hist_p99"));
+                       assertEquals("0.999", 
store.jobManager.metrics.get("abc.hist_p999"));
 
                        assertEquals("x", 
store.taskManagers.get(tmID.toString()).metrics.get("abc.gauge"));
-                       assertEquals(5.0, 
store.jobs.get(jobID.toString()).metrics.get("abc.jc"));
-                       assertEquals(2L, 
store.jobs.get(jobID.toString()).tasks.get("taskid").metrics.get("2.abc.tc"));
-                       assertEquals(1L, 
store.jobs.get(jobID.toString()).tasks.get("taskid").metrics.get("2.opname.abc.oc"));
+                       assertEquals("5.0", 
store.jobs.get(jobID.toString()).metrics.get("abc.jc"));
+                       assertEquals("2", 
store.jobs.get(jobID.toString()).tasks.get("taskid").metrics.get("2.abc.tc"));
+                       assertEquals("1", 
store.jobs.get(jobID.toString()).tasks.get("taskid").metrics.get("2.opname.abc.oc"));
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e30e7a61/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java
index 9dc2929..ee46494 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java
@@ -31,11 +31,11 @@ public class MetricStoreTest extends TestLogger {
        public void testAdd() throws IOException {
                MetricStore store = setupStore(new MetricStore());
 
-               assertEquals(0L, store.jobManager.metrics.get("abc.metric1"));
-               assertEquals(1L, 
store.taskManagers.get("tmid").metrics.get("abc.metric2"));
-               assertEquals(2L, 
store.jobs.get("jobid").metrics.get("abc.metric3"));
-               assertEquals(3L, 
store.jobs.get("jobid").tasks.get("taskid").metrics.get("8.abc.metric4"));
-               assertEquals(4L, 
store.jobs.get("jobid").tasks.get("taskid").metrics.get("8.opname.abc.metric5"));
+               assertEquals("0", store.jobManager.metrics.get("abc.metric1"));
+               assertEquals("1", 
store.taskManagers.get("tmid").metrics.get("abc.metric2"));
+               assertEquals("2", 
store.jobs.get("jobid").metrics.get("abc.metric3"));
+               assertEquals("3", 
store.jobs.get("jobid").tasks.get("taskid").metrics.get("8.abc.metric4"));
+               assertEquals("4", 
store.jobs.get("jobid").tasks.get("taskid").metrics.get("8.opname.abc.metric5"));
        }
 
        @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/e30e7a61/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
index 6299a56..a410404 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
@@ -42,9 +42,9 @@ public class TaskManagerMetricsHandlerTest extends TestLogger 
{
                Map<String, String> pathParams = new HashMap<>();
                pathParams.put(PARAMETER_TM_ID, "tmid");
 
-               Map<String, Object> metrics = handler.getMapFor(pathParams, 
store);
+               Map<String, String> metrics = handler.getMapFor(pathParams, 
store);
 
-               assertEquals(1L, metrics.get("abc.metric2"));
+               assertEquals("1", metrics.get("abc.metric2"));
        }
 
        @Test
@@ -56,7 +56,7 @@ public class TaskManagerMetricsHandlerTest extends TestLogger 
{
 
                Map<String, String> pathParams = new HashMap<>();
 
-               Map<String, Object> metrics = handler.getMapFor(pathParams, 
store);
+               Map<String, String> metrics = handler.getMapFor(pathParams, 
store);
 
                assertNull(metrics);
        }

Reply via email to