Repository: flink
Updated Branches:
  refs/heads/master 3bc9cad04 -> a582882da


[FLINK-4925] [metrics] Integrate meters into IOMetricGroups

This closes #2694.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a582882d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a582882d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a582882d

Branch: refs/heads/master
Commit: a582882dae125da2c4cef249428371c37fd64c53
Parents: 3bc9cad
Author: zentol <[email protected]>
Authored: Wed Oct 26 13:14:03 2016 +0200
Committer: zentol <[email protected]>
Committed: Fri Oct 28 11:11:00 2016 +0200

----------------------------------------------------------------------
 .../metrics/groups/OperatorIOMetricGroup.java   | 15 ++++++++++++++
 .../metrics/groups/TaskIOMetricGroup.java       | 21 ++++++++++++++++++++
 2 files changed, 36 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a582882d/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
index 8a69029..32611fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
@@ -18,6 +18,8 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
 
 /**
  * Metric group that contains shareable pre-defined IO-related metrics. The metrics registration is
@@ -28,10 +30,15 @@ public class OperatorIOMetricGroup extends ProxyMetricGroup<OperatorMetricGroup>
        private final Counter numRecordsIn;
        private final Counter numRecordsOut;
 
+       private final Meter numRecordsInRate;
+       private final Meter numRecordsOutRate;
+
        public OperatorIOMetricGroup(OperatorMetricGroup parentMetricGroup) {
                super(parentMetricGroup);
                numRecordsIn = parentMetricGroup.counter("numRecordsIn");
                numRecordsOut = parentMetricGroup.counter("numRecordsOut");
+               numRecordsInRate = parentMetricGroup.meter("numRecordsInPerSecond", new MeterView(numRecordsIn, 60));
+               numRecordsOutRate = parentMetricGroup.meter("numRecordsOutPerSecond", new MeterView(numRecordsOut, 60));
        }
 
        public Counter getNumRecordsInCounter() {
@@ -41,4 +48,12 @@ public class OperatorIOMetricGroup extends ProxyMetricGroup<OperatorMetricGroup>
        public Counter getNumRecordsOutCounter() {
                return numRecordsOut;
        }
+
+       public Meter getNumRecordsInRateMeter() {
+               return numRecordsInRate;
+       }
+
+       public Meter getNumRecordsOutRate() {
+               return numRecordsOutRate;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a582882d/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
index a726c26..ab7ceb2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
 
 /**
  * Metric group that contains shareable pre-defined IO-related metrics. The metrics registration is
@@ -30,12 +32,19 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
        private final Counter numBytesInLocal;
        private final Counter numBytesInRemote;
 
+       private final Meter numBytesInRateLocal;
+       private final Meter numBytesInRateRemote;
+       private final Meter numBytesOutRate;
+
        public TaskIOMetricGroup(TaskMetricGroup parent) {
                super(parent);
 
                this.numBytesOut = counter("numBytesOut");
                this.numBytesInLocal = counter("numBytesInLocal");
                this.numBytesInRemote = counter("numBytesInRemote");
+               this.numBytesOutRate = meter("numBytesOutPerSecond", new MeterView(numBytesOut, 60));
+               this.numBytesInRateLocal = meter("numBytesInLocalPerSecond", new MeterView(numBytesInLocal, 60));
+               this.numBytesInRateRemote = meter("numBytesInRemotePerSecond", new MeterView(numBytesInRemote, 60));
        }
 
        public Counter getNumBytesOutCounter() {
@@ -49,4 +58,16 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
        public Counter getNumBytesInRemoteCounter() {
                return numBytesInRemote;
        }
+
+       public Meter getNumBytesInRateLocalMeter() {
+               return numBytesInRateLocal;
+       }
+
+       public Meter getNumBytesInRateRemoteMeter() {
+               return numBytesInRateRemote;
+       }
+
+       public Meter getNumBytesOutRateMeter() {
+               return numBytesOutRate;
+       }
 }

Reply via email to