This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 088d16517126d7128e1f072d588b9ce0f4787ae6 Author: Anton Kalashnikov <[email protected]> AuthorDate: Wed Jul 28 18:12:28 2021 +0200 [FLINK-23453][refactor] Made a safe method for getting the number of buffers in the queue visible in the interface. --- .../runtime/io/network/partition/BoundedBlockingSubpartition.java | 5 +++++ .../partition/BoundedBlockingSubpartitionDirectTransferReader.java | 5 +++++ .../io/network/partition/BoundedBlockingSubpartitionReader.java | 5 +++++ .../runtime/io/network/partition/NoOpResultSubpartitionView.java | 5 +++++ .../flink/runtime/io/network/partition/PipelinedSubpartition.java | 7 +++++-- .../runtime/io/network/partition/PipelinedSubpartitionView.java | 5 +++++ .../flink/runtime/io/network/partition/ResultSubpartition.java | 3 +++ .../flink/runtime/io/network/partition/ResultSubpartitionView.java | 2 ++ .../runtime/io/network/partition/SortMergeSubpartitionReader.java | 7 +++++++ .../flink/runtime/io/network/netty/CancelPartitionRequestTest.java | 5 +++++ .../flink/runtime/io/network/partition/InputGateFairnessTest.java | 4 ++-- .../flink/runtime/io/network/partition/ResultPartitionTest.java | 4 ++-- 12 files changed, 51 insertions(+), 6 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java index 2c5fd18..275b563 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java @@ -267,6 +267,11 @@ final class BoundedBlockingSubpartition extends ResultSubpartition { } @Override + public int getNumberOfQueuedBuffers() { + return 0; + } + + @Override protected long getTotalNumberOfBuffers() { return numBuffersAndEventsWritten; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionDirectTransferReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionDirectTransferReader.java index d903428..db716f2 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionDirectTransferReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionDirectTransferReader.java @@ -149,6 +149,11 @@ public class BoundedBlockingSubpartitionDirectTransferReader implements ResultSu } @Override + public int getNumberOfQueuedBuffers() { + return parent.getNumberOfQueuedBuffers(); + } + + @Override public void notifyDataAvailable() { throw new UnsupportedOperationException("Method should never be called."); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java index 2fbf5ad..88a294f 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java @@ -182,6 +182,11 @@ final class BoundedBlockingSubpartitionReader implements ResultSubpartitionView } @Override + public int getNumberOfQueuedBuffers() { + return parent.getNumberOfQueuedBuffers(); + } + + @Override public String toString() { return String.format( "Blocking Subpartition Reader: ID=%s, index=%d", diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/NoOpResultSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/NoOpResultSubpartitionView.java index 35d6921..5bf8f07 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/NoOpResultSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/NoOpResultSubpartitionView.java @@ -59,4 +59,9 @@ public class NoOpResultSubpartitionView implements ResultSubpartitionView { public int unsynchronizedGetNumberOfQueuedBuffers() { return 0; } + + @Override + public int getNumberOfQueuedBuffers() { + return 0; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java index 9cb88fc..692a2fa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java @@ -443,8 +443,11 @@ public class PipelinedSubpartition extends ResultSubpartition // ------------------------------------------------------------------------ - int getCurrentNumberOfBuffers() { - return buffers.size(); + @Override + public int getNumberOfQueuedBuffers() { + synchronized (buffers) { + return buffers.size(); + } } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java index 577835f..3335f25 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java @@ -100,6 +100,11 @@ public class PipelinedSubpartitionView implements ResultSubpartitionView { } @Override + public int getNumberOfQueuedBuffers() { + return parent.getNumberOfQueuedBuffers(); + } + + @Override public String toString() { return String.format( "%s(index: %d) of ResultPartition %s", diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java index b06dbef..37ef834 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java @@ -106,6 +106,9 @@ public abstract class ResultSubpartition { */ public abstract int unsynchronizedGetNumberOfQueuedBuffers(); + /** Get the current size of the queue. */ + public abstract int getNumberOfQueuedBuffers(); + // ------------------------------------------------------------------------ /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java index e3e305c..eddf661 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java @@ -61,6 +61,8 @@ public interface ResultSubpartitionView { int unsynchronizedGetNumberOfQueuedBuffers(); + int getNumberOfQueuedBuffers(); + /** * Availability of the {@link ResultSubpartitionView} and the backlog in the corresponding * {@link ResultSubpartition}. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java index db6d268..80e62d9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java @@ -234,4 +234,11 @@ class SortMergeSubpartitionReader public int unsynchronizedGetNumberOfQueuedBuffers() { return Math.max(0, buffersRead.size()); } + + @Override + public int getNumberOfQueuedBuffers() { + synchronized (lock) { + return buffersRead.size(); + } + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java index 7780b96..126fdbf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java @@ -242,6 +242,11 @@ public class CancelPartitionRequestTest { } @Override + public int getNumberOfQueuedBuffers() { + return 0; + } + + @Override public Throwable getFailureCause() { return null; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java index b634ebb..eae2a78 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java @@ -116,7 +116,7 @@ public class InputGateFairnessTest { int max = 0; for (PipelinedSubpartition source : sources) { - int size = source.getCurrentNumberOfBuffers(); + int size = source.getNumberOfQueuedBuffers(); min = Math.min(min, size); max = Math.max(max, size); } @@ -181,7 +181,7 @@ public class InputGateFairnessTest { int max = 0; for (PipelinedSubpartition source : sources) { - int size = source.getCurrentNumberOfBuffers(); + int size = source.getNumberOfQueuedBuffers(); min = Math.min(min, size); max = Math.max(max, size); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java index 3441f23..1bdd9fc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java @@ -600,7 +600,7 @@ public class ResultPartitionTest { // emit the second record, record length = bufferSize bufferWritingResultPartition.emitRecord(ByteBuffer.allocate(bufferSize), 0); } finally { - assertEquals(2, pipelinedSubpartition.getCurrentNumberOfBuffers()); + assertEquals(2, pipelinedSubpartition.getNumberOfQueuedBuffers()); assertEquals(0, pipelinedSubpartition.getNextBuffer().getPartialRecordLength()); assertEquals( partialLength, pipelinedSubpartition.getNextBuffer().getPartialRecordLength()); @@ -623,7 +623,7 @@ public class ResultPartitionTest { bufferWritingResultPartition.subpartitions) { PipelinedSubpartition pipelinedSubpartition = (PipelinedSubpartition) resultSubpartition; - assertEquals(2, pipelinedSubpartition.getCurrentNumberOfBuffers()); + assertEquals(2, pipelinedSubpartition.getNumberOfQueuedBuffers()); assertEquals(0, pipelinedSubpartition.getNextBuffer().getPartialRecordLength()); assertEquals( partialLength,
