This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 088d16517126d7128e1f072d588b9ce0f4787ae6
Author: Anton Kalashnikov <[email protected]>
AuthorDate: Wed Jul 28 18:12:28 2021 +0200

    [FLINK-23453][refactor] Made a safe method for getting the number of buffers in the queue visible in the interface.
---
 .../runtime/io/network/partition/BoundedBlockingSubpartition.java  | 5 +++++
 .../partition/BoundedBlockingSubpartitionDirectTransferReader.java | 5 +++++
 .../io/network/partition/BoundedBlockingSubpartitionReader.java    | 5 +++++
 .../runtime/io/network/partition/NoOpResultSubpartitionView.java   | 5 +++++
 .../flink/runtime/io/network/partition/PipelinedSubpartition.java  | 7 +++++--
 .../runtime/io/network/partition/PipelinedSubpartitionView.java    | 5 +++++
 .../flink/runtime/io/network/partition/ResultSubpartition.java     | 3 +++
 .../flink/runtime/io/network/partition/ResultSubpartitionView.java | 2 ++
 .../runtime/io/network/partition/SortMergeSubpartitionReader.java  | 7 +++++++
 .../flink/runtime/io/network/netty/CancelPartitionRequestTest.java | 5 +++++
 .../flink/runtime/io/network/partition/InputGateFairnessTest.java  | 4 ++--
 .../flink/runtime/io/network/partition/ResultPartitionTest.java    | 4 ++--
 12 files changed, 51 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
index 2c5fd18..275b563 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
@@ -267,6 +267,11 @@ final class BoundedBlockingSubpartition extends ResultSubpartition {
     }
 
     @Override
+    public int getNumberOfQueuedBuffers() {
+        return 0;
+    }
+
+    @Override
     protected long getTotalNumberOfBuffers() {
         return numBuffersAndEventsWritten;
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionDirectTransferReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionDirectTransferReader.java
index d903428..db716f2 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionDirectTransferReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionDirectTransferReader.java
@@ -149,6 +149,11 @@ public class BoundedBlockingSubpartitionDirectTransferReader implements ResultSu
     }
 
     @Override
+    public int getNumberOfQueuedBuffers() {
+        return parent.getNumberOfQueuedBuffers();
+    }
+
+    @Override
     public void notifyDataAvailable() {
         throw new UnsupportedOperationException("Method should never be called.");
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
index 2fbf5ad..88a294f 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
@@ -182,6 +182,11 @@ final class BoundedBlockingSubpartitionReader implements ResultSubpartitionView
     }
 
     @Override
+    public int getNumberOfQueuedBuffers() {
+        return parent.getNumberOfQueuedBuffers();
+    }
+
+    @Override
     public String toString() {
         return String.format(
                 "Blocking Subpartition Reader: ID=%s, index=%d",
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/NoOpResultSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/NoOpResultSubpartitionView.java
index 35d6921..5bf8f07 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/NoOpResultSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/NoOpResultSubpartitionView.java
@@ -59,4 +59,9 @@ public class NoOpResultSubpartitionView implements ResultSubpartitionView {
     public int unsynchronizedGetNumberOfQueuedBuffers() {
         return 0;
     }
+
+    @Override
+    public int getNumberOfQueuedBuffers() {
+        return 0;
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index 9cb88fc..692a2fa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -443,8 +443,11 @@ public class PipelinedSubpartition extends ResultSubpartition
 
     // ------------------------------------------------------------------------
 
-    int getCurrentNumberOfBuffers() {
-        return buffers.size();
+    @Override
+    public int getNumberOfQueuedBuffers() {
+        synchronized (buffers) {
+            return buffers.size();
+        }
     }
 
     // ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
index 577835f..3335f25 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
@@ -100,6 +100,11 @@ public class PipelinedSubpartitionView implements ResultSubpartitionView {
     }
 
     @Override
+    public int getNumberOfQueuedBuffers() {
+        return parent.getNumberOfQueuedBuffers();
+    }
+
+    @Override
     public String toString() {
         return String.format(
                 "%s(index: %d) of ResultPartition %s",
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
index b06dbef..37ef834 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
@@ -106,6 +106,9 @@ public abstract class ResultSubpartition {
      */
     public abstract int unsynchronizedGetNumberOfQueuedBuffers();
 
+    /** Get the current size of the queue. */
+    public abstract int getNumberOfQueuedBuffers();
+
     // ------------------------------------------------------------------------
 
     /**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
index e3e305c..eddf661 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
@@ -61,6 +61,8 @@ public interface ResultSubpartitionView {
 
     int unsynchronizedGetNumberOfQueuedBuffers();
 
+    int getNumberOfQueuedBuffers();
+
     /**
      * Availability of the {@link ResultSubpartitionView} and the backlog in the corresponding
      * {@link ResultSubpartition}.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java
index db6d268..80e62d9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java
@@ -234,4 +234,11 @@ class SortMergeSubpartitionReader
     public int unsynchronizedGetNumberOfQueuedBuffers() {
         return Math.max(0, buffersRead.size());
     }
+
+    @Override
+    public int getNumberOfQueuedBuffers() {
+        synchronized (lock) {
+            return buffersRead.size();
+        }
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
index 7780b96..126fdbf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
@@ -242,6 +242,11 @@ public class CancelPartitionRequestTest {
         }
 
         @Override
+        public int getNumberOfQueuedBuffers() {
+            return 0;
+        }
+
+        @Override
         public Throwable getFailureCause() {
             return null;
         }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
index b634ebb..eae2a78 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
@@ -116,7 +116,7 @@ public class InputGateFairnessTest {
             int max = 0;
 
             for (PipelinedSubpartition source : sources) {
-                int size = source.getCurrentNumberOfBuffers();
+                int size = source.getNumberOfQueuedBuffers();
                 min = Math.min(min, size);
                 max = Math.max(max, size);
             }
@@ -181,7 +181,7 @@ public class InputGateFairnessTest {
                 int max = 0;
 
                 for (PipelinedSubpartition source : sources) {
-                    int size = source.getCurrentNumberOfBuffers();
+                    int size = source.getNumberOfQueuedBuffers();
                     min = Math.min(min, size);
                     max = Math.max(max, size);
                 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index 3441f23..1bdd9fc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -600,7 +600,7 @@ public class ResultPartitionTest {
             // emit the second record, record length = bufferSize
             bufferWritingResultPartition.emitRecord(ByteBuffer.allocate(bufferSize), 0);
         } finally {
-            assertEquals(2, pipelinedSubpartition.getCurrentNumberOfBuffers());
+            assertEquals(2, pipelinedSubpartition.getNumberOfQueuedBuffers());
             assertEquals(0, pipelinedSubpartition.getNextBuffer().getPartialRecordLength());
             assertEquals(
                     partialLength, pipelinedSubpartition.getNextBuffer().getPartialRecordLength());
@@ -623,7 +623,7 @@ public class ResultPartitionTest {
                     bufferWritingResultPartition.subpartitions) {
                 PipelinedSubpartition pipelinedSubpartition =
                         (PipelinedSubpartition) resultSubpartition;
-                assertEquals(2, pipelinedSubpartition.getCurrentNumberOfBuffers());
+                assertEquals(2, pipelinedSubpartition.getNumberOfQueuedBuffers());
                 assertEquals(0, pipelinedSubpartition.getNextBuffer().getPartialRecordLength());
                 assertEquals(
                         partialLength,

Reply via email to