This is an automated email from the ASF dual-hosted git repository.
zhijiang pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new 2a6fb9a [FLINK-12576][Network,Metrics] Take LocalInputChannel into
account when computing inputQueueLength
2a6fb9a is described below
commit 2a6fb9af7bca5a55d1dc9c55b779eea38b43e1a2
Author: Aitozi <[email protected]>
AuthorDate: Mon Aug 5 19:25:29 2019 +0800
[FLINK-12576][Network,Metrics] Take LocalInputChannel into account when
computing inputQueueLength
Currently inputQueueLength ignores LocalInputChannels
(SingleInputGate#getNumberOfQueuedBuffers). This can be misleading when
looking for the causes of back pressure (e.g. if a task is back-pressuring the
whole Flink job, but there is data skew and only local input channels are
being used).
---
docs/monitoring/metrics.md | 2 +-
docs/monitoring/metrics.zh.md | 2 +-
.../BoundedBlockingSubpartitionReader.java | 5 +++
.../partition/NoOpResultSubpartitionView.java | 5 +++
.../partition/PipelinedSubpartitionView.java | 5 +++
.../network/partition/ResultSubpartitionView.java | 2 +
.../network/partition/consumer/InputChannel.java | 8 ++++
.../partition/consumer/LocalInputChannel.java | 11 ++++++
.../partition/consumer/RemoteInputChannel.java | 1 +
.../partition/consumer/SingleInputGate.java | 4 +-
.../network/netty/CancelPartitionRequestTest.java | 5 +++
.../partition/consumer/LocalInputChannelTest.java | 1 +
.../partition/consumer/SingleInputGateTest.java | 44 ++++++++++++++++++++++
13 files changed, 90 insertions(+), 5 deletions(-)
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 211ccfa..9b5c504 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -1031,7 +1031,7 @@ Thus, in order to infer the metric identifier:
<th rowspan="8">Task</th>
<td rowspan="4">buffers</td>
<td>inputQueueLength</td>
- <td>The number of queued input buffers.</td>
+ <td>The number of queued input buffers (ignores LocalInputChannels
that use blocking subpartitions).</td>
<td>Gauge</td>
</tr>
<tr>
diff --git a/docs/monitoring/metrics.zh.md b/docs/monitoring/metrics.zh.md
index 44b4806..d262c44 100644
--- a/docs/monitoring/metrics.zh.md
+++ b/docs/monitoring/metrics.zh.md
@@ -1030,7 +1030,7 @@ Thus, in order to infer the metric identifier:
<th rowspan="8">Task</th>
<td rowspan="4">buffers</td>
<td>inputQueueLength</td>
- <td>The number of queued input buffers.</td>
+ <td>The number of queued input buffers (ignores LocalInputChannels
that use blocking subpartitions).</td>
<td>Gauge</td>
</tr>
<tr>
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
index 63e5e22..2da9534 100755
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
@@ -167,6 +167,11 @@ final class BoundedBlockingSubpartitionReader implements
ResultSubpartitionView
}
@Override
+ public int unsynchronizedGetNumberOfQueuedBuffers() {
+ return parent.unsynchronizedGetNumberOfQueuedBuffers();
+ }
+
+ @Override
public String toString() {
return String.format("Blocking Subpartition Reader: ID=%s,
index=%d",
parent.parent.getPartitionId(),
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/NoOpResultSubpartitionView.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/NoOpResultSubpartitionView.java
index f3ba1e3..b961ab6 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/NoOpResultSubpartitionView.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/NoOpResultSubpartitionView.java
@@ -61,4 +61,9 @@ public class NoOpResultSubpartitionView implements
ResultSubpartitionView {
public boolean isAvailable() {
return false;
}
+
+ @Override
+ public int unsynchronizedGetNumberOfQueuedBuffers() {
+ return 0;
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
index 9d08358..94ada2d 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
@@ -91,6 +91,11 @@ class PipelinedSubpartitionView implements
ResultSubpartitionView {
}
@Override
+ public int unsynchronizedGetNumberOfQueuedBuffers() {
+ return parent.unsynchronizedGetNumberOfQueuedBuffers();
+ }
+
+ @Override
public String toString() {
return String.format("PipelinedSubpartitionView(index: %d) of
ResultPartition %s",
parent.index,
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
index a755955..49ff4f8 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
@@ -60,4 +60,6 @@ public interface ResultSubpartitionView {
boolean nextBufferIsEvent();
boolean isAvailable();
+
+ int unsynchronizedGetNumberOfQueuedBuffers();
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index c0a204b..a0d3cb5 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -249,6 +249,14 @@ public abstract class InputChannel {
}
//
------------------------------------------------------------------------
+ // Metric-related methods
+ //
------------------------------------------------------------------------
+
+ public int unsynchronizedGetNumberOfQueuedBuffers() {
+ return 0;
+ }
+
+ //
------------------------------------------------------------------------
/**
* A combination of a {@link Buffer} and a flag indicating availability
of further buffers,
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 3a54310..4c7b098 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -261,6 +261,17 @@ public class LocalInputChannel extends InputChannel
implements BufferAvailabilit
}
@Override
+ public int unsynchronizedGetNumberOfQueuedBuffers() {
+ ResultSubpartitionView view = subpartitionView;
+
+ if (view != null) {
+ return view.unsynchronizedGetNumberOfQueuedBuffers();
+ }
+
+ return 0;
+ }
+
+ @Override
public String toString() {
return "LocalInputChannel [" + partitionId + "]";
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 58b36c3..8ffca16 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -430,6 +430,7 @@ public class RemoteInputChannel extends InputChannel
implements BufferRecycler,
}
}
+ @Override
public int unsynchronizedGetNumberOfQueuedBuffers() {
return Math.max(0, receivedBuffers.size());
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index fd40c94..13b56df 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -279,9 +279,7 @@ public class SingleInputGate extends InputGate {
int totalBuffers = 0;
for (InputChannel channel :
inputChannels.values()) {
- if (channel instanceof
RemoteInputChannel) {
- totalBuffers +=
((RemoteInputChannel) channel).getNumberOfQueuedBuffers();
- }
+ totalBuffers +=
channel.unsynchronizedGetNumberOfQueuedBuffers();
}
return totalBuffers;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
index 58c02df..3640864 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
@@ -225,6 +225,11 @@ public class CancelPartitionRequestTest {
}
@Override
+ public int unsynchronizedGetNumberOfQueuedBuffers() {
+ return 0;
+ }
+
+ @Override
public Throwable getFailureCause() {
return null;
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index fd7cdd1..dc8b501 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -558,5 +558,6 @@ public class LocalInputChannelTest {
return null;
}
+
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 20eae98..057ac5e 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -28,6 +28,7 @@ import
org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
@@ -41,6 +42,7 @@ import
org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import
org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.io.network.util.TestTaskEvent;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
@@ -559,6 +561,48 @@ public class SingleInputGateTest extends InputGateTestBase
{
}
}
+ @Test
+ public void testQueuedBuffers() throws Exception {
+ final NettyShuffleEnvironment network =
createNettyShuffleEnvironment();
+
+ final ResultPartition resultPartition = new
ResultPartitionBuilder()
+
.setResultPartitionManager(network.getResultPartitionManager())
+
.setupBufferPoolFactoryFromNettyShuffleEnvironment(network)
+ .build();
+
+ final SingleInputGate inputGate = createInputGate(network, 2,
ResultPartitionType.PIPELINED);
+
+ final ResultPartitionID localResultPartitionId =
resultPartition.getPartitionId();
+
+ final RemoteInputChannel remoteInputChannel =
InputChannelBuilder.newBuilder()
+ .setChannelIndex(1)
+ .setupFromNettyShuffleEnvironment(network)
+ .setConnectionManager(new TestingConnectionManager())
+ .buildRemoteAndSetToGate(inputGate);
+
+ InputChannelBuilder.newBuilder()
+ .setChannelIndex(0)
+ .setPartitionId(localResultPartitionId)
+ .setupFromNettyShuffleEnvironment(network)
+ .setConnectionManager(new TestingConnectionManager())
+ .buildLocalAndSetToGate(inputGate);
+
+ try {
+ resultPartition.setup();
+ inputGate.setup();
+
+
remoteInputChannel.onBuffer(TestBufferFactory.createBuffer(1), 0, 0);
+ assertEquals(1, inputGate.getNumberOfQueuedBuffers());
+
+
resultPartition.addBufferConsumer(BufferBuilderTestUtils.createFilledBufferConsumer(1),
0);
+ assertEquals(2, inputGate.getNumberOfQueuedBuffers());
+ } finally {
+ resultPartition.release();
+ inputGate.close();
+ network.close();
+ }
+ }
+
/**
* Tests that if the {@link PartitionNotFoundException} is set onto one
{@link InputChannel},
* then it would be thrown directly via {@link
SingleInputGate#getNextBufferOrEvent()}. So we