Repository: flink
Updated Branches:
  refs/heads/master 3bc9cad04 -> a582882da
[FLINK-4925] [metrics] Integrate meters into IOMetricGroups

This closes #2694.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a582882d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a582882d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a582882d

Branch: refs/heads/master
Commit: a582882dae125da2c4cef249428371c37fd64c53
Parents: 3bc9cad
Author: zentol <[email protected]>
Authored: Wed Oct 26 13:14:03 2016 +0200
Committer: zentol <[email protected]>
Committed: Fri Oct 28 11:11:00 2016 +0200

----------------------------------------------------------------------
 .../metrics/groups/OperatorIOMetricGroup.java   | 15 ++++++++++++++
 .../metrics/groups/TaskIOMetricGroup.java       | 21 ++++++++++++++++++++
 2 files changed, 36 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a582882d/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
index 8a69029..32611fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
@@ -18,6 +18,8 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
 
 /**
  * Metric group that contains shareable pre-defined IO-related metrics. The metrics registration is
@@ -28,10 +30,15 @@ public class OperatorIOMetricGroup extends ProxyMetricGroup<OperatorMetricGroup>
 	private final Counter numRecordsIn;
 	private final Counter numRecordsOut;
 
+	private final Meter numRecordsInRate;
+	private final Meter numRecordsOutRate;
+
 	public OperatorIOMetricGroup(OperatorMetricGroup parentMetricGroup) {
 		super(parentMetricGroup);
 		numRecordsIn = parentMetricGroup.counter("numRecordsIn");
 		numRecordsOut = parentMetricGroup.counter("numRecordsOut");
+		numRecordsInRate = parentMetricGroup.meter("numRecordsInPerSecond", new MeterView(numRecordsIn, 60));
+		numRecordsOutRate = parentMetricGroup.meter("numRecordsOutPerSecond", new MeterView(numRecordsOut, 60));
 	}
 
 	public Counter getNumRecordsInCounter() {
@@ -41,4 +48,12 @@ public class OperatorIOMetricGroup extends ProxyMetricGroup<OperatorMetricGroup>
 	public Counter getNumRecordsOutCounter() {
 		return numRecordsOut;
 	}
+
+	public Meter getNumRecordsInRateMeter() {
+		return numRecordsInRate;
+	}
+
+	public Meter getNumRecordsOutRate() {
+		return numRecordsOutRate;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a582882d/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
index a726c26..ab7ceb2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
 
 /**
  * Metric group that contains shareable pre-defined IO-related metrics. The metrics registration is
@@ -30,12 +32,19 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
 	private final Counter numBytesInLocal;
 	private final Counter numBytesInRemote;
 
+	private final Meter numBytesInRateLocal;
+	private final Meter numBytesInRateRemote;
+	private final Meter numBytesOutRate;
+
 	public TaskIOMetricGroup(TaskMetricGroup parent) {
 		super(parent);
 
 		this.numBytesOut = counter("numBytesOut");
 		this.numBytesInLocal = counter("numBytesInLocal");
 		this.numBytesInRemote = counter("numBytesInRemote");
+		this.numBytesOutRate = meter("numBytesOutPerSecond", new MeterView(numBytesOut, 60));
+		this.numBytesInRateLocal = meter("numBytesInLocalPerSecond", new MeterView(numBytesInLocal, 60));
+		this.numBytesInRateRemote = meter("numBytesInRemotePerSecond", new MeterView(numBytesInRemote, 60));
 	}
 
 	public Counter getNumBytesOutCounter() {
@@ -49,4 +58,16 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
 	public Counter getNumBytesInRemoteCounter() {
 		return numBytesInRemote;
 	}
+
+	public Meter getNumBytesInRateLocalMeter() {
+		return numBytesInRateLocal;
+	}
+
+	public Meter getNumBytesInRateRemoteMeter() {
+		return numBytesInRateRemote;
+	}
+
+	public Meter getNumBytesOutRateMeter() {
+		return numBytesOutRate;
+	}
 }
