This is an automated email from the ASF dual-hosted git repository.
RocMarshal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 9a63a472e56 [hotfix][runtime/test] Enhance the test case
CreditBasedPartitionRequestClientHandlerTest#testReceiveCompressedBuffer for
checking totalQueueSizeInBytes (#28355)
9a63a472e56 is described below
commit 9a63a472e56941b05d48fece3dc0ed7a8b3ab229
Author: Yuepeng Pan <[email protected]>
AuthorDate: Tue Jun 9 19:14:51 2026 +0800
[hotfix][runtime/test] Enhance the test case
CreditBasedPartitionRequestClientHandlerTest#testReceiveCompressedBuffer for
checking totalQueueSizeInBytes (#28355)
---
.../partition/consumer/RemoteInputChannel.java | 7 ++++++-
...editBasedPartitionRequestClientHandlerTest.java | 23 ++++++++++++----------
2 files changed, 19 insertions(+), 11 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 66a7d500140..645446120d0 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -431,7 +431,12 @@ public class RemoteInputChannel extends InputChannel {
@VisibleForTesting
public Buffer getNextReceivedBuffer() {
final SequenceBuffer sequenceBuffer = receivedBuffers.poll();
- return sequenceBuffer != null ? sequenceBuffer.buffer : null;
+ if (sequenceBuffer != null) {
+ totalQueueSizeInBytes -= sequenceBuffer.buffer.getSize();
+ return sequenceBuffer.buffer;
+ } else {
+ return null;
+ }
}
@VisibleForTesting
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
index d96ed78b6a0..d4b162304b8 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
@@ -98,15 +98,15 @@ class CreditBasedPartitionRequestClientHandlerTest {
private static Stream<Arguments> bufferDescriptorsWithCompression() {
return Stream.of(
- Arguments.of(false, 1, "LZ4"),
- Arguments.of(false, 1, "LZO"),
- Arguments.of(false, 1, "ZSTD"),
- Arguments.of(true, 1, "LZ4"),
- Arguments.of(true, 1, "LZO"),
- Arguments.of(true, 1, "ZSTD"),
- Arguments.of(true, 3, "LZ4"),
- Arguments.of(true, 3, "LZO"),
- Arguments.of(true, 3, "ZSTD"));
+ Arguments.of(false, 1, "LZ4", 0L),
+ Arguments.of(false, 1, "LZO", 0L),
+ Arguments.of(false, 1, "ZSTD", 0L),
+ Arguments.of(true, 1, "LZ4", 0L),
+ Arguments.of(true, 1, "LZO", 0L),
+ Arguments.of(true, 1, "ZSTD", 0L),
+ Arguments.of(true, 3, "LZ4", 44L),
+ Arguments.of(true, 3, "LZO", 52L),
+ Arguments.of(true, 3, "ZSTD", 62L));
}
/**
@@ -237,7 +237,8 @@ class CreditBasedPartitionRequestClientHandlerTest {
void testReceiveCompressedBuffer(
final boolean isFullyFilled,
final int numOfPartialBuffers,
- final String compressionCodec)
+ final String compressionCodec,
+ final long expectedTotalQueueSizeInBytesOfInputChannel)
throws Exception {
int bufferSize = 1024;
BufferCompressor compressor =
@@ -275,6 +276,8 @@ class CreditBasedPartitionRequestClientHandlerTest {
handler.channelRead(null, bufferResponse);
Buffer receivedBuffer = inputChannel.getNextReceivedBuffer();
+ assertThat(inputChannel.unsynchronizedGetSizeOfQueuedBuffers())
+ .isEqualTo(expectedTotalQueueSizeInBytesOfInputChannel);
assertThat(receivedBuffer).isNotNull();
assertThat(receivedBuffer.isCompressed()).isTrue();
receivedBuffer.recycleBuffer();