This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.5 by this push:
     new b954364  [FLINK-10022][network][metrics] add metrics for input/output buffers (#6551)
b954364 is described below

commit b954364b6a2a8bc00286bab919433885008c7ce0
Author: Nico Kruber <[email protected]>
AuthorDate: Tue Aug 14 21:50:58 2018 +0200

    [FLINK-10022][network][metrics] add metrics for input/output buffers (#6551)
    
    [FLINK-10022][network][metrics] add metrics for input/output buffers
    
    This closes #6551.
---
 docs/monitoring/metrics.md                         | 31 ++++++++++++++++++++++
 .../io/network/api/writer/RecordWriter.java        |  4 +++
 .../network/partition/consumer/InputChannel.java   |  6 ++++-
 .../partition/consumer/LocalInputChannel.java      |  3 ++-
 .../partition/consumer/RemoteInputChannel.java     |  3 ++-
 .../partition/consumer/UnknownInputChannel.java    |  2 +-
 .../apache/flink/runtime/metrics/MetricNames.java  |  8 ++++++
 .../runtime/metrics/groups/TaskIOMetricGroup.java  | 26 ++++++++++++++++++
 .../partition/consumer/InputChannelTest.java       |  2 +-
 .../partition/consumer/TestInputChannel.java       |  2 +-
 .../metrics/groups/TaskIOMetricGroupTest.java      |  6 +++++
 11 files changed, 87 insertions(+), 6 deletions(-)

diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index bf0aee1..bac40f0 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -1218,6 +1218,27 @@ Thus, in order to infer the metric identifier:
       <td>Meter</td>
     </tr>
     <tr>
+      <th rowspan="6"><strong>Task</strong></th>
+      <td>numBuffersInLocal</td>
+      <td>The total number of network buffers this task has read from a local source.</td>
+      <td>Counter</td>
+    </tr>
+    <tr>
+      <td>numBuffersInLocalPerSecond</td>
+      <td>The number of network buffers this task reads from a local source per second.</td>
+      <td>Meter</td>
+    </tr>
+    <tr>
+      <td>numBuffersInRemote</td>
+      <td>The total number of network buffers this task has read from a remote source.</td>
+      <td>Counter</td>
+    </tr>
+    <tr>
+      <td>numBuffersInRemotePerSecond</td>
+      <td>The number of network buffers this task reads from a remote source per second.</td>
+      <td>Meter</td>
+    </tr>
+    <tr>
       <td>numBytesOut</td>
       <td>The total number of bytes this task has emitted.</td>
       <td>Counter</td>
@@ -1228,6 +1249,16 @@ Thus, in order to infer the metric identifier:
       <td>Meter</td>
     </tr>
     <tr>
+      <td>numBuffersOut</td>
+      <td>The total number of network buffers this task has emitted.</td>
+      <td>Counter</td>
+    </tr>
+    <tr>
+      <td>numBuffersOutPerSecond</td>
+      <td>The number of network buffers this task emits per second.</td>
+      <td>Meter</td>
+    </tr>
+    <tr>
       <th rowspan="6"><strong>Task/Operator</strong></th>
       <td>numRecordsIn</td>
       <td>The total number of records this operator/task has received.</td>
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index e3a8e49..970795c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -71,6 +71,8 @@ public class RecordWriter<T extends IOReadableWritable> {
 
        private Counter numBytesOut = new SimpleCounter();
 
+       private Counter numBuffersOut = new SimpleCounter();
+
        public RecordWriter(ResultPartitionWriter writer) {
                this(writer, new RoundRobinChannelSelector<T>());
        }
@@ -184,6 +186,7 @@ public class RecordWriter<T extends IOReadableWritable> {
      */
        public void setMetricGroup(TaskIOMetricGroup metrics) {
                numBytesOut = metrics.getNumBytesOutCounter();
+               numBuffersOut = metrics.getNumBuffersOutCounter();
        }
 
        /**
@@ -200,6 +203,7 @@ public class RecordWriter<T extends IOReadableWritable> {
                bufferBuilders[targetChannel] = Optional.empty();
 
                numBytesOut.inc(bufferBuilder.finish());
+               numBuffersOut.inc();
                serializer.clear();
                return true;
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index 3ce5866..2a7cedf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -64,6 +64,8 @@ public abstract class InputChannel {
 
        protected final Counter numBytesIn;
 
+       protected final Counter numBuffersIn;
+
        /** The current backoff (in ms) */
        private int currentBackoff;
 
@@ -73,7 +75,8 @@ public abstract class InputChannel {
                        ResultPartitionID partitionId,
                        int initialBackoff,
                        int maxBackoff,
-                       Counter numBytesIn) {
+                       Counter numBytesIn,
+                       Counter numBuffersIn) {
 
                checkArgument(channelIndex >= 0);
 
@@ -91,6 +94,7 @@ public abstract class InputChannel {
                this.currentBackoff = initial == 0 ? -1 : 0;
 
                this.numBytesIn = numBytesIn;
+               this.numBuffersIn = numBuffersIn;
        }
 
        // 
------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index f9c75ad..4b3a8ff 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -84,7 +84,7 @@ public class LocalInputChannel extends InputChannel 
implements BufferAvailabilit
                int maxBackoff,
                TaskIOMetricGroup metrics) {
 
-               super(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, metrics.getNumBytesInLocalCounter());
+               super(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, metrics.getNumBytesInLocalCounter(), metrics.getNumBuffersInLocalCounter());
 
                this.partitionManager = checkNotNull(partitionManager);
                this.taskEventDispatcher = checkNotNull(taskEventDispatcher);
@@ -194,6 +194,7 @@ public class LocalInputChannel extends InputChannel 
implements BufferAvailabilit
                }
 
                numBytesIn.inc(next.buffer().getSizeUnsafe());
+               numBuffersIn.inc();
                return Optional.of(new BufferAndAvailability(next.buffer(), 
next.isMoreAvailable(), next.buffersInBacklog()));
        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index b94f48a..28f3020 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -123,7 +123,7 @@ public class RemoteInputChannel extends InputChannel 
implements BufferRecycler,
                int maxBackoff,
                TaskIOMetricGroup metrics) {
 
-               super(inputGate, channelIndex, partitionId, initialBackOff, maxBackoff, metrics.getNumBytesInRemoteCounter());
+               super(inputGate, channelIndex, partitionId, initialBackOff, maxBackoff, metrics.getNumBytesInRemoteCounter(), metrics.getNumBuffersInRemoteCounter());
 
                this.connectionId = checkNotNull(connectionId);
                this.connectionManager = checkNotNull(connectionManager);
@@ -199,6 +199,7 @@ public class RemoteInputChannel extends InputChannel 
implements BufferRecycler,
                }
 
                numBytesIn.inc(next.getSizeUnsafe());
+               numBuffersIn.inc();
                return Optional.of(new BufferAndAvailability(next, remaining > 
0, getSenderBacklog()));
        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
index 1101f66..20a7aed 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
@@ -61,7 +61,7 @@ class UnknownInputChannel extends InputChannel {
                        int maxBackoff,
                        TaskIOMetricGroup metrics) {
 
-               super(gate, channelIndex, partitionId, initialBackoff, maxBackoff, null);
+               super(gate, channelIndex, partitionId, initialBackoff, maxBackoff, null, null);
 
                this.partitionManager = checkNotNull(partitionManager);
                this.taskEventDispatcher = checkNotNull(taskEventDispatcher);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
index d15a0f1..c106c39 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
@@ -40,6 +40,14 @@ public class MetricNames {
        public static final String IO_NUM_BYTES_IN_REMOTE_RATE = 
IO_NUM_BYTES_IN_REMOTE + SUFFIX_RATE;
        public static final String IO_NUM_BYTES_OUT_RATE = IO_NUM_BYTES_OUT + 
SUFFIX_RATE;
 
+       public static final String IO_NUM_BUFFERS_IN = "numBuffersIn";
+       public static final String IO_NUM_BUFFERS_IN_LOCAL = IO_NUM_BUFFERS_IN 
+ "Local";
+       public static final String IO_NUM_BUFFERS_IN_REMOTE = IO_NUM_BUFFERS_IN 
+ "Remote";
+       public static final String IO_NUM_BUFFERS_OUT = "numBuffersOut";
+       public static final String IO_NUM_BUFFERS_IN_LOCAL_RATE = 
IO_NUM_BUFFERS_IN_LOCAL + SUFFIX_RATE;
+       public static final String IO_NUM_BUFFERS_IN_REMOTE_RATE = 
IO_NUM_BUFFERS_IN_REMOTE + SUFFIX_RATE;
+       public static final String IO_NUM_BUFFERS_OUT_RATE = IO_NUM_BUFFERS_OUT 
+ SUFFIX_RATE;
+
        public static final String IO_CURRENT_INPUT_WATERMARK = 
"currentInputWatermark";
        public static final String IO_CURRENT_INPUT_1_WATERMARK = 
"currentInput1Watermark";
        public static final String IO_CURRENT_INPUT_2_WATERMARK = 
"currentInput2Watermark";
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
index e12ecd7..79c5e8f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
@@ -44,12 +44,18 @@ public class TaskIOMetricGroup extends 
ProxyMetricGroup<TaskMetricGroup> {
        private final Counter numBytesInRemote;
        private final SumCounter numRecordsIn;
        private final SumCounter numRecordsOut;
+       private final Counter numBuffersOut;
+       private final Counter numBuffersInLocal;
+       private final Counter numBuffersInRemote;
 
        private final Meter numBytesInRateLocal;
        private final Meter numBytesInRateRemote;
        private final Meter numBytesOutRate;
        private final Meter numRecordsInRate;
        private final Meter numRecordsOutRate;
+       private final Meter numBuffersOutRate;
+       private final Meter numBuffersInRateLocal;
+       private final Meter numBuffersInRateRemote;
 
        public TaskIOMetricGroup(TaskMetricGroup parent) {
                super(parent);
@@ -60,10 +66,18 @@ public class TaskIOMetricGroup extends 
ProxyMetricGroup<TaskMetricGroup> {
                this.numBytesOutRate = meter(MetricNames.IO_NUM_BYTES_OUT_RATE, 
new MeterView(numBytesOut, 60));
                this.numBytesInRateLocal = 
meter(MetricNames.IO_NUM_BYTES_IN_LOCAL_RATE, new MeterView(numBytesInLocal, 
60));
                this.numBytesInRateRemote = 
meter(MetricNames.IO_NUM_BYTES_IN_REMOTE_RATE, new MeterView(numBytesInRemote, 
60));
+
                this.numRecordsIn = counter(MetricNames.IO_NUM_RECORDS_IN, new 
SumCounter());
                this.numRecordsOut = counter(MetricNames.IO_NUM_RECORDS_OUT, 
new SumCounter());
                this.numRecordsInRate = 
meter(MetricNames.IO_NUM_RECORDS_IN_RATE, new MeterView(numRecordsIn, 60));
                this.numRecordsOutRate = 
meter(MetricNames.IO_NUM_RECORDS_OUT_RATE, new MeterView(numRecordsOut, 60));
+
+               this.numBuffersOut = counter(MetricNames.IO_NUM_BUFFERS_OUT);
+               this.numBuffersInLocal = 
counter(MetricNames.IO_NUM_BUFFERS_IN_LOCAL);
+               this.numBuffersInRemote = 
counter(MetricNames.IO_NUM_BUFFERS_IN_REMOTE);
+               this.numBuffersOutRate = 
meter(MetricNames.IO_NUM_BUFFERS_OUT_RATE, new MeterView(numBuffersOut, 60));
+               this.numBuffersInRateLocal = 
meter(MetricNames.IO_NUM_BUFFERS_IN_LOCAL_RATE, new 
MeterView(numBuffersInLocal, 60));
+               this.numBuffersInRateRemote = 
meter(MetricNames.IO_NUM_BUFFERS_IN_REMOTE_RATE, new 
MeterView(numBuffersInRemote, 60));
        }
 
        public IOMetrics createSnapshot() {
@@ -93,6 +107,18 @@ public class TaskIOMetricGroup extends 
ProxyMetricGroup<TaskMetricGroup> {
                return numRecordsOut;
        }
 
+       public Counter getNumBuffersOutCounter() {
+               return numBuffersOut;
+       }
+
+       public Counter getNumBuffersInLocalCounter() {
+               return numBuffersInLocal;
+       }
+
+       public Counter getNumBuffersInRemoteCounter() {
+               return numBuffersInRemote;
+       }
+
        public Meter getNumBytesInLocalRateMeter() {
                return numBytesInRateLocal;
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
index abadddf..d757aa9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
@@ -120,7 +120,7 @@ public class InputChannelTest {
                        int initialBackoff,
                        int maxBackoff) {
 
-                       super(inputGate, channelIndex, partitionId, 
initialBackoff, maxBackoff, new SimpleCounter());
+                       super(inputGate, channelIndex, partitionId, 
initialBackoff, maxBackoff, new SimpleCounter(), new SimpleCounter());
                }
 
                @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
index 80e07f5..ac3f0ff 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
@@ -48,7 +48,7 @@ public class TestInputChannel extends InputChannel {
        private boolean isReleased = false;
 
        TestInputChannel(SingleInputGate inputGate, int channelIndex) {
-               super(inputGate, channelIndex, new ResultPartitionID(), 0, 0, 
new SimpleCounter());
+               super(inputGate, channelIndex, new ResultPartitionID(), 0, 0, 
new SimpleCounter(), new SimpleCounter());
        }
 
        public TestInputChannel read(Buffer buffer) throws IOException, 
InterruptedException {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
index f23b2f5..b02be74 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
@@ -54,6 +54,9 @@ public class TaskIOMetricGroupTest {
                taskIO.getNumBytesInLocalCounter().inc(100L);
                taskIO.getNumBytesInRemoteCounter().inc(150L);
                taskIO.getNumBytesOutCounter().inc(250L);
+               taskIO.getNumBuffersInLocalCounter().inc(1L);
+               taskIO.getNumBuffersInRemoteCounter().inc(2L);
+               taskIO.getNumBuffersOutCounter().inc(3L);
 
                IOMetrics io = taskIO.createSnapshot();
                assertEquals(32L, io.getNumRecordsIn());
@@ -61,5 +64,8 @@ public class TaskIOMetricGroupTest {
                assertEquals(100L, io.getNumBytesInLocal());
                assertEquals(150L, io.getNumBytesInRemote());
                assertEquals(250L, io.getNumBytesOut());
+               assertEquals(1L, 
taskIO.getNumBuffersInLocalCounter().getCount());
+               assertEquals(2L, 
taskIO.getNumBuffersInRemoteCounter().getCount());
+               assertEquals(3L, taskIO.getNumBuffersOutCounter().getCount());
        }
 }

Reply via email to