Repository: flink
Updated Branches:
  refs/heads/master 365cd987c -> 47db9cb1a


[FLINK-4906] [metrics] Introduce constants for IO metrics

This closes #2980.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dc5dd510
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dc5dd510
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dc5dd510

Branch: refs/heads/master
Commit: dc5dd5106738e393761a62a56d9e684c722c516f
Parents: 4befbb8
Author: zentol <[email protected]>
Authored: Fri Dec 9 14:10:10 2016 +0100
Committer: zentol <[email protected]>
Committed: Mon Dec 12 12:45:26 2016 +0100

----------------------------------------------------------------------
 .../webmonitor/handlers/JobDetailsHandler.java  |  9 ++---
 .../handlers/JobVertexDetailsHandler.java       |  9 ++---
 .../handlers/JobVertexTaskManagersHandler.java  |  9 ++---
 .../SubtaskExecutionAttemptDetailsHandler.java  |  9 ++---
 .../flink/runtime/metrics/MetricNames.java      | 38 ++++++++++++++++++++
 .../metrics/groups/OperatorIOMetricGroup.java   |  9 ++---
 .../metrics/groups/TaskIOMetricGroup.java       | 21 +++++------
 7 files changed, 74 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dc5dd510/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
index 6de6dc5..35e6ca7 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.IOMetrics;
 import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
 import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
@@ -161,10 +162,10 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
                                        fetcher.update();
                                        MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(graph.getJobID().toString(), ejv.getJobVertexId().toString(), vertex.getParallelSubtaskIndex());
                                        if (metrics != null) {
-                                               numBytesIn += Long.valueOf(metrics.getMetric("numBytesInLocal", "0")) + Long.valueOf(metrics.getMetric("numBytesInRemote", "0"));
-                                               numBytesOut += Long.valueOf(metrics.getMetric("numBytesOut", "0"));
-                                               numRecordsIn += Long.valueOf(metrics.getMetric("numRecordsIn", "0"));
-                                               numRecordsOut += Long.valueOf(metrics.getMetric("numRecordsOut", "0"));
+                                               numBytesIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0")) + Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0"));
+                                               numBytesOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0"));
+                                               numRecordsIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0"));
+                                               numRecordsOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0"));
                                        }
                                }
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/dc5dd510/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
index 14dcd39..32626ba 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
@@ -99,10 +100,10 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
                                fetcher.update();
                                MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(params.get("jobid"), jobVertex.getJobVertexId().toString(), vertex.getParallelSubtaskIndex());
                                if (metrics != null) {
-                                       numBytesIn += Long.valueOf(metrics.getMetric("numBytesInLocal", "0")) + Long.valueOf(metrics.getMetric("numBytesInRemote", "0"));
-                                       numBytesOut += Long.valueOf(metrics.getMetric("numBytesOut", "0"));
-                                       numRecordsIn += Long.valueOf(metrics.getMetric("numRecordsIn", "0"));
-                                       numRecordsOut += Long.valueOf(metrics.getMetric("numRecordsOut", "0"));
+                                       numBytesIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0")) + Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0"));
+                                       numBytesOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0"));
+                                       numRecordsIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0"));
+                                       numRecordsOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0"));
                                }
                        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dc5dd510/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
index c1fabf8..f468d35 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
@@ -120,10 +121,10 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
                                        fetcher.update();
                                        MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(params.get("jobid"), params.get("vertexid"), vertex.getParallelSubtaskIndex());
                                        if (metrics != null) {
-                                               numBytesIn += Long.valueOf(metrics.getMetric("numBytesInLocal", "0")) + Long.valueOf(metrics.getMetric("numBytesInRemote", "0"));
-                                               numBytesOut += Long.valueOf(metrics.getMetric("numBytesOut", "0"));
-                                               numRecordsIn += Long.valueOf(metrics.getMetric("numRecordsIn", "0"));
-                                               numRecordsOut += Long.valueOf(metrics.getMetric("numRecordsOut", "0"));
+                                               numBytesIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0")) + Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0"));
+                                               numBytesOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0"));
+                                               numRecordsIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0"));
+                                               numRecordsOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0"));
                                        }
                                }
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/dc5dd510/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
index ca9c7ad..da8db02 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
@@ -86,10 +87,10 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp
                        fetcher.update();
                        MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(params.get("jobid"), params.get("vertexid"), execAttempt.getParallelSubtaskIndex());
                        if (metrics != null) {
-                               numBytesIn = Long.valueOf(metrics.getMetric("numBytesInLocal", "0")) + Long.valueOf(metrics.getMetric("numBytesInRemote", "0"));
-                               numBytesOut = Long.valueOf(metrics.getMetric("numBytesOut", "0"));
-                               numRecordsIn = Long.valueOf(metrics.getMetric("numRecordsIn", "0"));
-                               numRecordsOut = Long.valueOf(metrics.getMetric("numRecordsOut", "0"));
+                               numBytesIn = Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0")) + Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0"));
+                               numBytesOut = Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0"));
+                               numRecordsIn = Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0"));
+                               numRecordsOut = Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0"));
                        }
                }
                

http://git-wip-us.apache.org/repos/asf/flink/blob/dc5dd510/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
new file mode 100644
index 0000000..9202ca1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics;
+
+public class MetricNames {
+       private MetricNames() {
+       }
+
+       private static final String SUFFIX_RATE = "PerSecond";
+
+       public static final String IO_NUM_RECORDS_IN = "numRecordsIn";
+       public static final String IO_NUM_RECORDS_OUT = "numRecordsOut";
+       public static final String IO_NUM_RECORDS_IN_RATE = IO_NUM_RECORDS_IN + SUFFIX_RATE;
+       public static final String IO_NUM_RECORDS_OUT_RATE = IO_NUM_RECORDS_OUT + SUFFIX_RATE;
+
+       public static final String IO_NUM_BYTES_IN = "numBytesIn";
+       public static final String IO_NUM_BYTES_IN_LOCAL = IO_NUM_BYTES_IN + "Local";
+       public static final String IO_NUM_BYTES_IN_REMOTE = IO_NUM_BYTES_IN + "Remote";
+       public static final String IO_NUM_BYTES_OUT = "numBytesOut";
+       public static final String IO_NUM_BYTES_IN_LOCAL_RATE = IO_NUM_BYTES_IN_LOCAL + SUFFIX_RATE;
+       public static final String IO_NUM_BYTES_IN_REMOTE_RATE = IO_NUM_BYTES_IN_REMOTE + SUFFIX_RATE;
+       public static final String IO_NUM_BYTES_OUT_RATE = IO_NUM_BYTES_OUT + SUFFIX_RATE;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dc5dd510/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
index 2e321fe..5bf7d1f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.metrics.groups;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.MeterView;
+import org.apache.flink.runtime.metrics.MetricNames;
 
 /**
  * Metric group that contains shareable pre-defined IO-related metrics. The metrics registration is
@@ -35,10 +36,10 @@ public class OperatorIOMetricGroup extends ProxyMetricGroup<OperatorMetricGroup>
 
        public OperatorIOMetricGroup(OperatorMetricGroup parentMetricGroup) {
                super(parentMetricGroup);
-               numRecordsIn = parentMetricGroup.counter("numRecordsIn");
-               numRecordsOut = parentMetricGroup.counter("numRecordsOut");
-               numRecordsInRate = parentMetricGroup.meter("numRecordsInPerSecond", new MeterView(numRecordsIn, 60));
-               numRecordsOutRate = parentMetricGroup.meter("numRecordsOutPerSecond", new MeterView(numRecordsOut, 60));
+               numRecordsIn = parentMetricGroup.counter(MetricNames.IO_NUM_RECORDS_IN);
+               numRecordsOut = parentMetricGroup.counter(MetricNames.IO_NUM_RECORDS_OUT);
+               numRecordsInRate = parentMetricGroup.meter(MetricNames.IO_NUM_RECORDS_IN_RATE, new MeterView(numRecordsIn, 60));
+               numRecordsOutRate = parentMetricGroup.meter(MetricNames.IO_NUM_RECORDS_OUT_RATE, new MeterView(numRecordsOut, 60));
        }
 
        public Counter getNumRecordsInCounter() {

http://git-wip-us.apache.org/repos/asf/flink/blob/dc5dd510/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
index c5296fb..fcea098 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
@@ -26,6 +26,7 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.executiongraph.IOMetrics;
 
@@ -53,16 +54,16 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
        public TaskIOMetricGroup(TaskMetricGroup parent) {
                super(parent);
 
-               this.numBytesOut = counter("numBytesOut");
-               this.numBytesInLocal = counter("numBytesInLocal");
-               this.numBytesInRemote = counter("numBytesInRemote");
-               this.numBytesOutRate = meter("numBytesOutPerSecond", new MeterView(numBytesOut, 60));
-               this.numBytesInRateLocal = meter("numBytesInLocalPerSecond", new MeterView(numBytesInLocal, 60));
-               this.numBytesInRateRemote = meter("numBytesInRemotePerSecond", new MeterView(numBytesInRemote, 60));
-               this.numRecordsIn = counter("numRecordsIn", new SumCounter());
-               this.numRecordsOut = counter("numRecordsOut", new SumCounter());
-               this.numRecordsInRate = meter("numRecordsInPerSecond", new MeterView(numRecordsIn, 60));
-               this.numRecordsOutRate = meter("numRecordsOutPerSecond", new MeterView(numRecordsOut, 60));
+               this.numBytesOut = counter(MetricNames.IO_NUM_BYTES_OUT);
+               this.numBytesInLocal = counter(MetricNames.IO_NUM_BYTES_IN_LOCAL);
+               this.numBytesInRemote = counter(MetricNames.IO_NUM_BYTES_IN_REMOTE);
+               this.numBytesOutRate = meter(MetricNames.IO_NUM_BYTES_OUT_RATE, new MeterView(numBytesOut, 60));
+               this.numBytesInRateLocal = meter(MetricNames.IO_NUM_BYTES_IN_LOCAL_RATE, new MeterView(numBytesInLocal, 60));
+               this.numBytesInRateRemote = meter(MetricNames.IO_NUM_BYTES_IN_REMOTE_RATE, new MeterView(numBytesInRemote, 60));
+               this.numRecordsIn = counter(MetricNames.IO_NUM_RECORDS_IN, new SumCounter());
+               this.numRecordsOut = counter(MetricNames.IO_NUM_RECORDS_OUT, new SumCounter());
+               this.numRecordsInRate = meter(MetricNames.IO_NUM_RECORDS_IN_RATE, new MeterView(numRecordsIn, 60));
+               this.numRecordsOutRate = meter(MetricNames.IO_NUM_RECORDS_OUT_RATE, new MeterView(numRecordsOut, 60));
        }
 
        public IOMetrics createSnapshot() {

Reply via email to