This is an automated email from the ASF dual-hosted git repository.

RocMarshal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a63a472e56 [hotfix][runtime/test] Enhance the test case 
CreditBasedPartitionRequestClientHandlerTest#testReceiveCompressedBuffer for 
checking totalQueueSizeInBytes (#28355)
9a63a472e56 is described below

commit 9a63a472e56941b05d48fece3dc0ed7a8b3ab229
Author: Yuepeng Pan <[email protected]>
AuthorDate: Tue Jun 9 19:14:51 2026 +0800

    [hotfix][runtime/test] Enhance the test case 
CreditBasedPartitionRequestClientHandlerTest#testReceiveCompressedBuffer for 
checking totalQueueSizeInBytes (#28355)
---
 .../partition/consumer/RemoteInputChannel.java     |  7 ++++++-
 ...editBasedPartitionRequestClientHandlerTest.java | 23 ++++++++++++----------
 2 files changed, 19 insertions(+), 11 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 66a7d500140..645446120d0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -431,7 +431,12 @@ public class RemoteInputChannel extends InputChannel {
     @VisibleForTesting
     public Buffer getNextReceivedBuffer() {
         final SequenceBuffer sequenceBuffer = receivedBuffers.poll();
-        return sequenceBuffer != null ? sequenceBuffer.buffer : null;
+        if (sequenceBuffer != null) {
+            totalQueueSizeInBytes -= sequenceBuffer.buffer.getSize();
+            return sequenceBuffer.buffer;
+        } else {
+            return null;
+        }
     }
 
     @VisibleForTesting
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
index d96ed78b6a0..d4b162304b8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
@@ -98,15 +98,15 @@ class CreditBasedPartitionRequestClientHandlerTest {
 
     private static Stream<Arguments> bufferDescriptorsWithCompression() {
         return Stream.of(
-                Arguments.of(false, 1, "LZ4"),
-                Arguments.of(false, 1, "LZO"),
-                Arguments.of(false, 1, "ZSTD"),
-                Arguments.of(true, 1, "LZ4"),
-                Arguments.of(true, 1, "LZO"),
-                Arguments.of(true, 1, "ZSTD"),
-                Arguments.of(true, 3, "LZ4"),
-                Arguments.of(true, 3, "LZO"),
-                Arguments.of(true, 3, "ZSTD"));
+                Arguments.of(false, 1, "LZ4", 0L),
+                Arguments.of(false, 1, "LZO", 0L),
+                Arguments.of(false, 1, "ZSTD", 0L),
+                Arguments.of(true, 1, "LZ4", 0L),
+                Arguments.of(true, 1, "LZO", 0L),
+                Arguments.of(true, 1, "ZSTD", 0L),
+                Arguments.of(true, 3, "LZ4", 44L),
+                Arguments.of(true, 3, "LZO", 52L),
+                Arguments.of(true, 3, "ZSTD", 62L));
     }
 
     /**
@@ -237,7 +237,8 @@ class CreditBasedPartitionRequestClientHandlerTest {
     void testReceiveCompressedBuffer(
             final boolean isFullyFilled,
             final int numOfPartialBuffers,
-            final String compressionCodec)
+            final String compressionCodec,
+            final long expectedTotalQueueSizeInBytesOfInputChannel)
             throws Exception {
         int bufferSize = 1024;
         BufferCompressor compressor =
@@ -275,6 +276,8 @@ class CreditBasedPartitionRequestClientHandlerTest {
             handler.channelRead(null, bufferResponse);
 
             Buffer receivedBuffer = inputChannel.getNextReceivedBuffer();
+            assertThat(inputChannel.unsynchronizedGetSizeOfQueuedBuffers())
+                    .isEqualTo(expectedTotalQueueSizeInBytesOfInputChannel);
             assertThat(receivedBuffer).isNotNull();
             assertThat(receivedBuffer.isCompressed()).isTrue();
             receivedBuffer.recycleBuffer();

Reply via email to