Repository: flink Updated Branches: refs/heads/master 5b54009eb -> 344fe94db
[FLINK-4923] [metrics] Expose Task's input/output buffer queue lengths and bufferPool usage as a metrics This closes #2693 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/344fe94d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/344fe94d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/344fe94d Branch: refs/heads/master Commit: 344fe94db96ea44fa7714e105631eb192bb382e4 Parents: 5b54009 Author: zhuhaifengleon <[email protected]> Authored: Wed Oct 26 15:48:31 2016 +0800 Committer: Stephan Ewen <[email protected]> Committed: Sun Oct 30 15:00:28 2016 +0100 ---------------------------------------------------------------------- .../partition/PipelinedSubpartition.java | 5 + .../io/network/partition/ResultPartition.java | 14 +++ .../network/partition/ResultSubpartition.java | 2 + .../partition/SpillableSubpartition.java | 5 + .../partition/consumer/SingleInputGate.java | 24 ++++ .../metrics/groups/TaskIOMetricGroup.java | 121 +++++++++++++++++++ .../apache/flink/runtime/taskmanager/Task.java | 11 ++ .../runtime/taskmanager/TaskAsyncCallTest.java | 7 +- 8 files changed, 183 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/344fe94d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java index 2d7097d..266f581 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java @@ -220,4 +220,9 @@ class PipelinedSubpartition extends ResultSubpartition { throw new IllegalStateException("Already registered listener."); } } + + @Override + public int getNumberOfQueuedBuffers() { + return buffers.size(); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/344fe94d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java index 5bbfab1..f06cb43 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java @@ -220,6 +220,10 @@ public class ResultPartition implements BufferPoolOwner { return bufferPool; } + public BufferPool getBufferPool() { + return bufferPool; + } + public int getTotalNumberOfBuffers() { return totalNumberOfBuffers; } @@ -228,6 +232,16 @@ public class ResultPartition implements BufferPoolOwner { return totalNumberOfBytes; } + public int getNumberOfQueuedBuffers() { + int totalBuffers = 0; + + for (ResultSubpartition subpartition : subpartitions) { + totalBuffers += subpartition.getNumberOfQueuedBuffers(); + } + + return totalBuffers; + } + // ------------------------------------------------------------------------ /** http://git-wip-us.apache.org/repos/asf/flink/blob/344fe94d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java index b7ca9c4..31c8f73 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java @@ -83,4 +83,6 @@ public abstract class ResultSubpartition { abstract public boolean isReleased(); + abstract public int getNumberOfQueuedBuffers(); + } http://git-wip-us.apache.org/repos/asf/flink/blob/344fe94d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java index 3e4692a..3f19559 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java @@ -227,4 +227,9 @@ class SpillableSubpartition extends ResultSubpartition { getTotalNumberOfBuffers(), getTotalNumberOfBytes(), isFinished, readView != null, spillWriter != null); } + + @Override + public int getNumberOfQueuedBuffers() { + return buffers.size(); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/344fe94d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index f4e4325..af5fd89 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -212,6 +212,10 @@ public class SingleInputGate implements InputGate { return bufferPool; } + public BufferPool getBufferPool() { + return bufferPool; + } + @Override public int getPageSize() { if (bufferPool != null) { @@ -222,6 +226,26 @@ public class SingleInputGate implements InputGate { } } + public int getNumberOfQueuedBuffers() { + // re-try 3 times, if fails, return 0 for "unknown" + for (int retry = 0; retry < 3; retry++) { + try { + int totalBuffers = 0; + + for (InputChannel channel : inputChannels.values()) { + if (channel instanceof RemoteInputChannel) { + totalBuffers += ((RemoteInputChannel) channel).getNumberOfQueuedBuffers(); + } + } + + return totalBuffers; + } + catch (Exception ignored) {} + } + + return 0; + } + // ------------------------------------------------------------------------ // Setup/Life-cycle // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/344fe94d/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java index ab7ceb2..b2884ec 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java @@ -19,8 +19,13 @@ package org.apache.flink.runtime.metrics.groups; import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.MeterView; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.io.network.partition.ResultPartition; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; +import org.apache.flink.runtime.taskmanager.Task; /** * Metric group that contains shareable pre-defined IO-related metrics. The metrics registration is @@ -36,6 +41,8 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> { private final Meter numBytesInRateRemote; private final Meter numBytesOutRate; + private final MetricGroup buffers; + public TaskIOMetricGroup(TaskMetricGroup parent) { super(parent); @@ -45,6 +52,8 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> { this.numBytesOutRate = meter("numBytesOutPerSecond", new MeterView(numBytesOut, 60)); this.numBytesInRateLocal = meter("numBytesInLocalPerSecond", new MeterView(numBytesInLocal, 60)); this.numBytesInRateRemote = meter("numBytesInRemotePerSecond", new MeterView(numBytesInRemote, 60)); + + this.buffers = addGroup("buffers"); } public Counter getNumBytesOutCounter() { @@ -70,4 +79,116 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> { public Meter getNumBytesOutRateMeter() { return numBytesOutRate; } + + public MetricGroup getBuffersGroup() { + return buffers; + } + + // ------------------------------------------------------------------------ + // metrics of Buffers group + // ------------------------------------------------------------------------ + + /** + * Input received buffers gauge of a task + */ + public static final class InputBuffersGauge implements Gauge<Integer> { + + private final Task task; + + public InputBuffersGauge(Task task) { + this.task = task; + } + + @Override + public Integer getValue() { + int totalBuffers = 0; + + for (SingleInputGate inputGate : task.getAllInputGates()) { + totalBuffers += inputGate.getNumberOfQueuedBuffers(); + } + + return totalBuffers; + } + } + + /** + * Output produced buffers gauge of a task + */ + public static final class OutputBuffersGauge implements Gauge<Integer> { + + private final Task task; + + public OutputBuffersGauge(Task task) { + this.task = task; + } + + @Override + public Integer getValue() { + int totalBuffers = 0; + + for (ResultPartition producedPartition : task.getProducedPartitions()) { + totalBuffers += producedPartition.getNumberOfQueuedBuffers(); + } + + return totalBuffers; + } + } + + /** + * Input buffer pool usage gauge of a task + */ + public static final class InputBufferPoolUsageGauge implements Gauge<Float> { + + private final Task task; + + public InputBufferPoolUsageGauge(Task task) { + this.task = task; + } + + @Override + public Float getValue() { + int availableBuffers = 0; + int bufferPoolSize = 0; + + for (SingleInputGate inputGate : task.getAllInputGates()) { + availableBuffers += inputGate.getBufferPool().getNumberOfAvailableMemorySegments(); + bufferPoolSize += inputGate.getBufferPool().getNumBuffers(); + } + + if (bufferPoolSize != 0) { + return ((float)(bufferPoolSize - availableBuffers)) / bufferPoolSize; + } else { + return 0.0f; + } + } + } + + /** + * Output buffer pool usage gauge of a task + */ + public static final class OutputBufferPoolUsageGauge implements Gauge<Float> { + + private final Task task; + + public OutputBufferPoolUsageGauge(Task task) { + this.task = task; + } + + @Override + public Float getValue() { + int availableBuffers = 0; + int bufferPoolSize = 0; + + for (ResultPartition resultPartition : task.getProducedPartitions()) { + availableBuffers += resultPartition.getBufferPool().getNumberOfAvailableMemorySegments(); + bufferPoolSize += resultPartition.getBufferPool().getNumBuffers(); + } + + if (bufferPoolSize != 0) { + return ((float)(bufferPoolSize - availableBuffers)) / bufferPoolSize; + } else { + return 0.0f; + } + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/344fe94d/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index f09e88a..7ce9b0b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -26,6 +26,7 @@ import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.fs.Path; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; @@ -57,6 +58,7 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; import org.apache.flink.runtime.jobgraph.tasks.StoppableTask; import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.TaskStateHandles; @@ -353,6 +355,15 @@ public class Task implements Runnable, TaskActions { // finally, create the executing thread, but do not start it executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask); + + // add metrics for buffers + if (this.metrics != null && this.metrics.getIOMetricGroup() != null) { + MetricGroup bufferMetrics = this.metrics.getIOMetricGroup().getBuffersGroup(); + bufferMetrics.gauge("inputQueueLength", new TaskIOMetricGroup.InputBuffersGauge(this)); + bufferMetrics.gauge("outputQueueLength", new TaskIOMetricGroup.OutputBuffersGauge(this)); + bufferMetrics.gauge("inPoolUsage", new TaskIOMetricGroup.InputBufferPoolUsageGauge(this)); + bufferMetrics.gauge("outPoolUsage", new TaskIOMetricGroup.OutputBufferPoolUsageGauge(this)); + } } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/344fe94d/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java index 7dd67ed..ed107c7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java @@ -44,19 +44,14 @@ import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.query.TaskKvStateRegistry; -import org.apache.flink.runtime.state.ChainedStateHandle; -import org.apache.flink.runtime.state.KeyGroupsStateHandle; -import org.apache.flink.runtime.state.OperatorStateHandle; -import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.TaskStateHandles; import org.apache.flink.util.SerializedValue; + import org.junit.Before; import org.junit.Test; import java.net.URL; -import java.util.Collection; import java.util.Collections; -import java.util.List; import java.util.concurrent.Executor; import static org.junit.Assert.assertFalse;
