Repository: flink
Updated Branches:
  refs/heads/master 5b54009eb -> 344fe94db


[FLINK-4923] [metrics] Expose Task's input/output buffer queue lengths and
buffer pool usage as metrics

This closes #2693


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/344fe94d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/344fe94d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/344fe94d

Branch: refs/heads/master
Commit: 344fe94db96ea44fa7714e105631eb192bb382e4
Parents: 5b54009
Author: zhuhaifengleon <[email protected]>
Authored: Wed Oct 26 15:48:31 2016 +0800
Committer: Stephan Ewen <[email protected]>
Committed: Sun Oct 30 15:00:28 2016 +0100

----------------------------------------------------------------------
 .../partition/PipelinedSubpartition.java        |   5 +
 .../io/network/partition/ResultPartition.java   |  14 +++
 .../network/partition/ResultSubpartition.java   |   2 +
 .../partition/SpillableSubpartition.java        |   5 +
 .../partition/consumer/SingleInputGate.java     |  24 ++++
 .../metrics/groups/TaskIOMetricGroup.java       | 121 +++++++++++++++++++
 .../apache/flink/runtime/taskmanager/Task.java  |  11 ++
 .../runtime/taskmanager/TaskAsyncCallTest.java  |   7 +-
 8 files changed, 183 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/344fe94d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index 2d7097d..266f581 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -220,4 +220,17 @@ class PipelinedSubpartition extends ResultSubpartition {
 			throw new IllegalStateException("Already registered listener.");
 		}
 	}
+
+	/**
+	 * Returns the number of buffers currently queued in this subpartition.
+	 *
+	 * <p>Metric gauges may poll this while the task is adding buffers, so the size is read
+	 * while holding the monitor of {@code buffers}.
+	 */
+	@Override
+	public int getNumberOfQueuedBuffers() {
+		synchronized (buffers) {
+			return buffers.size();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/344fe94d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 5bbfab1..f06cb43 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -220,6 +220,13 @@ public class ResultPartition implements BufferPoolOwner {
 		return bufferPool;
 	}
 
+	/**
+	 * Returns the buffer pool of this partition.
+	 */
+	public BufferPool getBufferPool() {
+		return bufferPool;
+	}
+
 	public int getTotalNumberOfBuffers() {
 		return totalNumberOfBuffers;
 	}
@@ -228,6 +235,17 @@ public class ResultPartition implements BufferPoolOwner {
 		return totalNumberOfBytes;
 	}
 
+	/**
+	 * Returns the number of buffers currently queued, summed over all subpartitions.
+	 */
+	public int getNumberOfQueuedBuffers() {
+		int queued = 0;
+		for (ResultSubpartition sub : subpartitions) {
+			queued += sub.getNumberOfQueuedBuffers();
+		}
+		return queued;
+	}
+
 	// ------------------------------------------------------------------------
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/344fe94d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
index b7ca9c4..31c8f73 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
@@ -83,4 +83,9 @@ public abstract class ResultSubpartition {
 
 	abstract public boolean isReleased();
 
+	/**
+	 * Returns the number of buffers currently queued in this subpartition.
+	 */
+	abstract public int getNumberOfQueuedBuffers();
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/344fe94d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index 3e4692a..3f19559 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -227,4 +227,13 @@ class SpillableSubpartition extends ResultSubpartition {
 				getTotalNumberOfBuffers(), getTotalNumberOfBytes(), isFinished, readView != null,
 				spillWriter != null);
 	}
+
+	/**
+	 * Returns the number of buffers currently queued in this subpartition. The queue is read
+	 * without synchronization, so the value is only a best-effort snapshot for metrics.
+	 */
+	@Override
+	public int getNumberOfQueuedBuffers() {
+		return buffers.size();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/344fe94d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index f4e4325..af5fd89 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -212,6 +212,13 @@ public class SingleInputGate implements InputGate {
 		return bufferPool;
 	}
 
+	/**
+	 * Returns the buffer pool of this input gate. May return {@code null}; callers must check.
+	 */
+	public BufferPool getBufferPool() {
+		return bufferPool;
+	}
+
 	@Override
 	public int getPageSize() {
 		if (bufferPool != null) {
@@ -222,6 +229,32 @@ public class SingleInputGate implements InputGate {
 		}
 	}
 
+	/**
+	 * Returns the number of buffers queued in the remote input channels of this gate.
+	 *
+	 * <p>Best effort: the sum is attempted up to three times; if every attempt throws, 0 is
+	 * returned to signal "unknown".
+	 */
+	public int getNumberOfQueuedBuffers() {
+		int attempt = 0;
+		while (attempt < 3) {
+			attempt++;
+			try {
+				int queued = 0;
+				for (InputChannel channel : inputChannels.values()) {
+					if (channel instanceof RemoteInputChannel) {
+						queued += ((RemoteInputChannel) channel).getNumberOfQueuedBuffers();
+					}
+				}
+				return queued;
+			}
+			catch (Exception ignored) {
+				// deliberately swallowed: try again, and give up with 0 after the last attempt
+			}
+		}
+		return 0;
+	}
+
 	// ------------------------------------------------------------------------
 	// Setup/Life-cycle
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/344fe94d/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
index ab7ceb2..b2884ec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
@@ -19,8 +19,14 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.MeterView;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.taskmanager.Task;
 
 /**
  * Metric group that contains shareable pre-defined IO-related metrics. The metrics registration is
@@ -36,6 +42,9 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
 	private final Meter numBytesInRateRemote;
 	private final Meter numBytesOutRate;
 
+	/** Sub-group "buffers" in which the buffer queue and buffer pool gauges are registered. */
+	private final MetricGroup buffers;
+
 	public TaskIOMetricGroup(TaskMetricGroup parent) {
 		super(parent);
 
@@ -45,6 +54,8 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
 		this.numBytesOutRate = meter("numBytesOutPerSecond", new MeterView(numBytesOut, 60));
 		this.numBytesInRateLocal = meter("numBytesInLocalPerSecond", new MeterView(numBytesInLocal, 60));
 		this.numBytesInRateRemote = meter("numBytesInRemotePerSecond", new MeterView(numBytesInRemote, 60));
+
+		this.buffers = addGroup("buffers");
 	}
 
 	public Counter getNumBytesOutCounter() {
@@ -70,4 +81,132 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
 	public Meter getNumBytesOutRateMeter() {
 		return numBytesOutRate;
 	}
+
+	/**
+	 * Returns the "buffers" sub-group in which buffer-related gauges are registered.
+	 */
+	public MetricGroup getBuffersGroup() {
+		return buffers;
+	}
+
+	// ------------------------------------------------------------------------
+	//  metrics of Buffers group
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gauge for the number of buffers queued in all input gates of a task.
+	 */
+	public static final class InputBuffersGauge implements Gauge<Integer> {
+
+		private final Task task;
+
+		public InputBuffersGauge(Task task) {
+			this.task = task;
+		}
+
+		@Override
+		public Integer getValue() {
+			int totalBuffers = 0;
+
+			for (SingleInputGate inputGate : task.getAllInputGates()) {
+				totalBuffers += inputGate.getNumberOfQueuedBuffers();
+			}
+
+			return totalBuffers;
+		}
+	}
+
+	/**
+	 * Gauge for the number of buffers queued in all result partitions produced by a task.
+	 */
+	public static final class OutputBuffersGauge implements Gauge<Integer> {
+
+		private final Task task;
+
+		public OutputBuffersGauge(Task task) {
+			this.task = task;
+		}
+
+		@Override
+		public Integer getValue() {
+			int totalBuffers = 0;
+
+			for (ResultPartition producedPartition : task.getProducedPartitions()) {
+				totalBuffers += producedPartition.getNumberOfQueuedBuffers();
+			}
+
+			return totalBuffers;
+		}
+	}
+
+	/**
+	 * Gauge for the fraction of used buffers over the buffer pools of all input gates of a task.
+	 * Gates without a buffer pool are skipped; 0 is reported when the total pool size is 0.
+	 */
+	public static final class InputBufferPoolUsageGauge implements Gauge<Float> {
+
+		private final Task task;
+
+		public InputBufferPoolUsageGauge(Task task) {
+			this.task = task;
+		}
+
+		@Override
+		public Float getValue() {
+			int availableBuffers = 0;
+			int bufferPoolSize = 0;
+
+			for (SingleInputGate inputGate : task.getAllInputGates()) {
+				BufferPool bufferPool = inputGate.getBufferPool();
+				// the pool may still be unset when this gauge is polled; skip such gates
+				if (bufferPool != null) {
+					availableBuffers += bufferPool.getNumberOfAvailableMemorySegments();
+					bufferPoolSize += bufferPool.getNumBuffers();
+				}
+			}
+
+			return usageRatio(availableBuffers, bufferPoolSize);
+		}
+	}
+
+	/**
+	 * Gauge for the fraction of used buffers over the buffer pools of all produced partitions.
+	 * Partitions without a buffer pool are skipped; 0 is reported when the total pool size is 0.
+	 */
+	public static final class OutputBufferPoolUsageGauge implements Gauge<Float> {
+
+		private final Task task;
+
+		public OutputBufferPoolUsageGauge(Task task) {
+			this.task = task;
+		}
+
+		@Override
+		public Float getValue() {
+			int availableBuffers = 0;
+			int bufferPoolSize = 0;
+
+			for (ResultPartition resultPartition : task.getProducedPartitions()) {
+				BufferPool bufferPool = resultPartition.getBufferPool();
+				// the pool may still be unset when this gauge is polled; skip such partitions
+				if (bufferPool != null) {
+					availableBuffers += bufferPool.getNumberOfAvailableMemorySegments();
+					bufferPoolSize += bufferPool.getNumBuffers();
+				}
+			}
+
+			return usageRatio(availableBuffers, bufferPoolSize);
+		}
+	}
+
+	/**
+	 * Returns the fraction of used buffers, {@code (poolSize - available) / poolSize},
+	 * or 0 if the pool size is 0.
+	 */
+	private static float usageRatio(int availableBuffers, int bufferPoolSize) {
+		if (bufferPoolSize == 0) {
+			return 0.0f;
+		}
+		return ((float) (bufferPoolSize - availableBuffers)) / bufferPoolSize;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/344fe94d/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index f09e88a..7ce9b0b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -57,6 +58,7 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.TaskStateHandles;
@@ -353,6 +355,16 @@ public class Task implements Runnable, TaskActions {
 
 		// finally, create the executing thread, but do not start it
 		executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask);
+
+		// expose the input/output buffer queue lengths and buffer pool usage as gauges
+		TaskIOMetricGroup ioMetrics = (this.metrics == null) ? null : this.metrics.getIOMetricGroup();
+		if (ioMetrics != null) {
+			MetricGroup bufferMetrics = ioMetrics.getBuffersGroup();
+			bufferMetrics.gauge("inputQueueLength", new TaskIOMetricGroup.InputBuffersGauge(this));
+			bufferMetrics.gauge("outputQueueLength", new TaskIOMetricGroup.OutputBuffersGauge(this));
+			bufferMetrics.gauge("inPoolUsage", new TaskIOMetricGroup.InputBufferPoolUsageGauge(this));
+			bufferMetrics.gauge("outPoolUsage", new TaskIOMetricGroup.OutputBufferPoolUsageGauge(this));
+		}
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/344fe94d/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 7dd67ed..ed107c7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -44,19 +44,14 @@ import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.util.SerializedValue;
+
 import org.junit.Before;
 import org.junit.Test;
 
 import java.net.URL;
-import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
 import java.util.concurrent.Executor;
 
 import static org.junit.Assert.assertFalse;

Reply via email to