This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b5166bc633f211ffc78355508edf090c74322a7c Author: Lari Hotari <lhot...@users.noreply.github.com> AuthorDate: Thu May 8 13:32:20 2025 +0300 [fix][misc] Fix ByteBuf leaks in tests by making ByteBufPair.coalesce release the input ByteBufPair (#24273) (cherry picked from commit 7eeeaea3b237b9e79b22940071e313a9990ec872) --- .../org/apache/pulsar/common/protocol/ByteBufPair.java | 10 +++++++++- .../org/apache/pulsar/common/compression/CommandsTest.java | 6 +++--- .../org/apache/pulsar/common/protocol/ByteBufPairTest.java | 14 ++++++++++++-- 3 files changed, 24 insertions(+), 6 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java index 6c4f42fcf88..5dbf07f8e83 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java @@ -82,13 +82,21 @@ public final class ByteBufPair extends AbstractReferenceCounted { } /** - * @return a single buffer with the content of both individual buffers + * Combines the content of both buffers into a single {@link ByteBuf}. + * + * <p>This method creates a new {@link ByteBuf} with the combined readable content + * of the two buffers in the given {@link ByteBufPair}. The original buffer is + * released after the data is written into the new buffer. + * + * @param pair the {@link ByteBufPair} containing the two buffers to be coalesced + * @return a new {@link ByteBuf} containing the combined content of both buffers */ @VisibleForTesting public static ByteBuf coalesce(ByteBufPair pair) { ByteBuf b = Unpooled.buffer(pair.readableBytes()); b.writeBytes(pair.b1, pair.b1.readerIndex(), pair.b1.readableBytes()); b.writeBytes(pair.b2, pair.b2.readerIndex(), pair.b2.readableBytes()); + pair.release(); return b; } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java index a1f79b7ae7f..bbbdcd74a4d 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java @@ -88,9 +88,9 @@ public class CommandsTest { metaPayloadFrame.writeInt(metadataSize); msgMetadata.writeTo(metaPayloadFrame); ByteBuf payload = compressedPayload.copy(); - ByteBufPair metaPayloadBuf = ByteBufPair.get(metaPayloadFrame, payload); - int computedChecksum = Crc32cIntChecksum.computeChecksum(ByteBufPair.coalesce(metaPayloadBuf)); - metaPayloadBuf.release(); + ByteBuf byteBuf = ByteBufPair.coalesce(ByteBufPair.get(metaPayloadFrame, payload)); + int computedChecksum = Crc32cIntChecksum.computeChecksum(byteBuf); + byteBuf.release(); return computedChecksum; } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/ByteBufPairTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/ByteBufPairTest.java index 4af7560e6f6..4939ce9255e 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/ByteBufPairTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/ByteBufPairTest.java @@ -22,11 +22,10 @@ import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; - import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; - import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.testng.annotations.Test; @@ -83,4 +82,15 @@ public class ByteBufPairTest { assertEquals(b2.refCnt(), 0); } + @Test + public void testCoalesce() { + ByteBuf b1 = Unpooled.wrappedBuffer("hello".getBytes()); + ByteBuf b2 = Unpooled.wrappedBuffer("world".getBytes()); + ByteBufPair buf = ByteBufPair.get(b1, b2); + ByteBuf coalesced = ByteBufPair.coalesce(buf); + assertEquals(b1.refCnt(), 0); + assertEquals(b2.refCnt(), 0); + assertEquals(new String(ByteBufUtil.getBytes(coalesced)), "helloworld"); + coalesced.release(); + } }