This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 2a6fb9a  [FLINK-12576][Network,Metrics] Take LocalInputChannel into account when computing inputQueueLength
2a6fb9a is described below

commit 2a6fb9af7bca5a55d1dc9c55b779eea38b43e1a2
Author: Aitozi <[email protected]>
AuthorDate: Mon Aug 5 19:25:29 2019 +0800

    [FLINK-12576][Network,Metrics] Take LocalInputChannel into account when computing inputQueueLength
    
    Currently, inputQueueLength ignores LocalInputChannels (see
    SingleInputGate#getNumberOfQueuedBuffers). This can be misleading when
    looking for the cause of back pressure: if a task is back-pressuring the
    whole Flink job, but due to data skew only local input channels are being
    used, the metric reports zero queued input buffers.
---
 docs/monitoring/metrics.md                         |  2 +-
 docs/monitoring/metrics.zh.md                      |  2 +-
 .../BoundedBlockingSubpartitionReader.java         |  5 +++
 .../partition/NoOpResultSubpartitionView.java      |  5 +++
 .../partition/PipelinedSubpartitionView.java       |  5 +++
 .../network/partition/ResultSubpartitionView.java  |  2 +
 .../network/partition/consumer/InputChannel.java   |  8 ++++
 .../partition/consumer/LocalInputChannel.java      | 11 ++++++
 .../partition/consumer/RemoteInputChannel.java     |  1 +
 .../partition/consumer/SingleInputGate.java        |  4 +-
 .../network/netty/CancelPartitionRequestTest.java  |  5 +++
 .../partition/consumer/LocalInputChannelTest.java  |  1 +
 .../partition/consumer/SingleInputGateTest.java    | 44 ++++++++++++++++++++++
 13 files changed, 90 insertions(+), 5 deletions(-)

diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 211ccfa..9b5c504 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -1031,7 +1031,7 @@ Thus, in order to infer the metric identifier:
       <th rowspan="8">Task</th>
       <td rowspan="4">buffers</td>
       <td>inputQueueLength</td>
-      <td>The number of queued input buffers.</td>
+      <td>The number of queued input buffers (ignores LocalInputChannels which are using blocking subpartitions).</td>
       <td>Gauge</td>
     </tr>
     <tr>
diff --git a/docs/monitoring/metrics.zh.md b/docs/monitoring/metrics.zh.md
index 44b4806..d262c44 100644
--- a/docs/monitoring/metrics.zh.md
+++ b/docs/monitoring/metrics.zh.md
@@ -1030,7 +1030,7 @@ Thus, in order to infer the metric identifier:
       <th rowspan="8">Task</th>
       <td rowspan="4">buffers</td>
       <td>inputQueueLength</td>
-      <td>The number of queued input buffers.</td>
+      <td>The number of queued input buffers (ignores LocalInputChannels which are using blocking subpartitions).</td>
       <td>Gauge</td>
     </tr>
     <tr>
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
index 63e5e22..2da9534 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
@@ -167,6 +167,11 @@ final class BoundedBlockingSubpartitionReader implements 
ResultSubpartitionView
        }
 
        @Override
+       public int unsynchronizedGetNumberOfQueuedBuffers() {
+               return parent.unsynchronizedGetNumberOfQueuedBuffers();
+       }
+
+       @Override
        public String toString() {
                return String.format("Blocking Subpartition Reader: ID=%s, 
index=%d",
                                parent.parent.getPartitionId(),
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/NoOpResultSubpartitionView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/NoOpResultSubpartitionView.java
index f3ba1e3..b961ab6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/NoOpResultSubpartitionView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/NoOpResultSubpartitionView.java
@@ -61,4 +61,9 @@ public class NoOpResultSubpartitionView implements 
ResultSubpartitionView {
        public boolean isAvailable() {
                return false;
        }
+
+       @Override
+       public int unsynchronizedGetNumberOfQueuedBuffers() {
+               return 0;
+       }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
index 9d08358..94ada2d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
@@ -91,6 +91,11 @@ class PipelinedSubpartitionView implements 
ResultSubpartitionView {
        }
 
        @Override
+       public int unsynchronizedGetNumberOfQueuedBuffers() {
+               return parent.unsynchronizedGetNumberOfQueuedBuffers();
+       }
+
+       @Override
        public String toString() {
                return String.format("PipelinedSubpartitionView(index: %d) of 
ResultPartition %s",
                                parent.index,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
index a755955..49ff4f8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
@@ -60,4 +60,6 @@ public interface ResultSubpartitionView {
        boolean nextBufferIsEvent();
 
        boolean isAvailable();
+
+       int unsynchronizedGetNumberOfQueuedBuffers();
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index c0a204b..a0d3cb5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -249,6 +249,14 @@ public abstract class InputChannel {
        }
 
        // 
------------------------------------------------------------------------
+       // Metric related method
+       // 
------------------------------------------------------------------------
+
+       public int unsynchronizedGetNumberOfQueuedBuffers() {
+               return 0;
+       }
+
+       // 
------------------------------------------------------------------------
 
        /**
         * A combination of a {@link Buffer} and a flag indicating availability 
of further buffers,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 3a54310..4c7b098 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -261,6 +261,17 @@ public class LocalInputChannel extends InputChannel 
implements BufferAvailabilit
        }
 
        @Override
+       public int unsynchronizedGetNumberOfQueuedBuffers() {
+               ResultSubpartitionView view = subpartitionView;
+
+               if (view != null) {
+                       return view.unsynchronizedGetNumberOfQueuedBuffers();
+               }
+
+               return 0;
+       }
+
+       @Override
        public String toString() {
                return "LocalInputChannel [" + partitionId + "]";
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 58b36c3..8ffca16 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -430,6 +430,7 @@ public class RemoteInputChannel extends InputChannel 
implements BufferRecycler,
                }
        }
 
+       @Override
        public int unsynchronizedGetNumberOfQueuedBuffers() {
                return Math.max(0, receivedBuffers.size());
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index fd40c94..13b56df 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -279,9 +279,7 @@ public class SingleInputGate extends InputGate {
                                int totalBuffers = 0;
 
                                for (InputChannel channel : 
inputChannels.values()) {
-                                       if (channel instanceof 
RemoteInputChannel) {
-                                               totalBuffers += 
((RemoteInputChannel) channel).getNumberOfQueuedBuffers();
-                                       }
+                                       totalBuffers += 
channel.unsynchronizedGetNumberOfQueuedBuffers();
                                }
 
                                return  totalBuffers;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
index 58c02df..3640864 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
@@ -225,6 +225,11 @@ public class CancelPartitionRequestTest {
                }
 
                @Override
+               public int unsynchronizedGetNumberOfQueuedBuffers() {
+                       return 0;
+               }
+
+               @Override
                public Throwable getFailureCause() {
                        return null;
                }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index fd7cdd1..dc8b501 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -558,5 +558,6 @@ public class LocalInputChannelTest {
 
                        return null;
                }
+
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 20eae98..057ac5e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
@@ -41,6 +42,7 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import 
org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import org.apache.flink.runtime.io.network.util.TestBufferFactory;
 import org.apache.flink.runtime.io.network.util.TestTaskEvent;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
@@ -559,6 +561,48 @@ public class SingleInputGateTest extends InputGateTestBase 
{
                }
        }
 
+       @Test
+       public void testQueuedBuffers() throws Exception {
+               final NettyShuffleEnvironment network = 
createNettyShuffleEnvironment();
+
+               final ResultPartition resultPartition = new 
ResultPartitionBuilder()
+                       
.setResultPartitionManager(network.getResultPartitionManager())
+                       
.setupBufferPoolFactoryFromNettyShuffleEnvironment(network)
+                       .build();
+
+               final SingleInputGate inputGate = createInputGate(network, 2, 
ResultPartitionType.PIPELINED);
+
+               final ResultPartitionID localResultPartitionId = 
resultPartition.getPartitionId();
+
+               final RemoteInputChannel remoteInputChannel = 
InputChannelBuilder.newBuilder()
+                       .setChannelIndex(1)
+                       .setupFromNettyShuffleEnvironment(network)
+                       .setConnectionManager(new TestingConnectionManager())
+                       .buildRemoteAndSetToGate(inputGate);
+
+               InputChannelBuilder.newBuilder()
+                       .setChannelIndex(0)
+                       .setPartitionId(localResultPartitionId)
+                       .setupFromNettyShuffleEnvironment(network)
+                       .setConnectionManager(new TestingConnectionManager())
+                       .buildLocalAndSetToGate(inputGate);
+
+               try {
+                       resultPartition.setup();
+                       inputGate.setup();
+
+                       
remoteInputChannel.onBuffer(TestBufferFactory.createBuffer(1), 0, 0);
+                       assertEquals(1, inputGate.getNumberOfQueuedBuffers());
+
+                       
resultPartition.addBufferConsumer(BufferBuilderTestUtils.createFilledBufferConsumer(1),
 0);
+                       assertEquals(2, inputGate.getNumberOfQueuedBuffers());
+               } finally {
+                       resultPartition.release();
+                       inputGate.close();
+                       network.close();
+               }
+       }
+
        /**
         * Tests that if the {@link PartitionNotFoundException} is set onto one 
{@link InputChannel},
         * then it would be thrown directly via {@link 
SingleInputGate#getNextBufferOrEvent()}. So we

Reply via email to