This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 669c6426f8e [fix][test] Fix multiple ByteBuf leaks in tests (#24281)
669c6426f8e is described below

commit 669c6426f8e0b7aa3e839c59e59e4686651b4476
Author: Lari Hotari <[email protected]>
AuthorDate: Thu May 8 22:32:30 2025 +0300

    [fix][test] Fix multiple ByteBuf leaks in tests (#24281)
---
 .../bookkeeper/mledger/impl/ManagedCursorTest.java |  7 +++
 .../apache/pulsar/broker/service/ServerCnx.java    |  5 ++
 .../ManagedLedgerInterceptorImplTest.java          |  8 ++-
 .../broker/service/AbstractBaseDispatcherTest.java | 17 +++++--
 .../service/PersistentMessageFinderTest.java       |  6 ++-
 .../pulsar/broker/service/PersistentTopicTest.java | 17 ++++++-
 .../pulsar/broker/service/ServerCnxTest.java       | 59 +++++++++++++++-------
 .../broker/service/SharedConsumerAssignorTest.java | 25 +++++++--
 .../service/persistent/MessageDuplicationTest.java | 27 +++++++++-
 ...ReplicatedSubscriptionsSnapshotBuilderTest.java | 39 +++++++++++---
 .../pulsar/common/compression/CommandsTest.java    |  3 +-
 .../common/compression/CompressorCodecTest.java    |  2 +
 .../pulsar/common/protocol/CommandUtilsTests.java  | 21 ++------
 .../apache/pulsar/common/protocol/MarkersTest.java |  6 +++
 14 files changed, 184 insertions(+), 58 deletions(-)

diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index bcaed2d7322..ce8b0334226 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -2899,6 +2899,9 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         EntryImpl entry5 = EntryImpl.create(markDeletedPosition.getLedgerId(), 
markDeletedPosition.getEntryId() + 7,
                 ByteBufAllocator.DEFAULT.buffer(0));
         List<Entry> entries = Lists.newArrayList(entry1, entry2, entry3, 
entry4, entry5);
+        // release data buffers since EntryImpl.create will retain the buffer
+        entries.forEach(entry -> entry.getDataBuffer().release());
+
         c1.trimDeletedEntries(entries);
         assertEquals(entries.size(), 1);
         assertEquals(entries.get(0).getPosition(), 
PositionFactory.create(markDeletedPosition.getLedgerId(),
@@ -2908,6 +2911,9 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         assertEquals(entry2.refCnt(), 0);
         assertEquals(entry3.refCnt(), 0);
         assertEquals(entry4.refCnt(), 0);
+
+        // release remaining entry
+        entries.forEach(Entry::release);
     }
 
     @Test(timeOut = 20000)
@@ -5286,6 +5292,7 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
     @Test
     void testForceCursorRecovery() throws Exception {
         TestPulsarMockBookKeeper bk = new TestPulsarMockBookKeeper(executor);
+        factory.shutdown();
         factory = new ManagedLedgerFactoryImpl(metadataStore, bk);
         ManagedLedgerConfig config = new ManagedLedgerConfig();
         config.setLedgerForceRecovery(true);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 4cc8ae178c3..f9605449b1f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -3778,4 +3778,9 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
     public void decrementThrottleCount() {
         throttleTracker.decrementThrottleCount();
     }
+
+    @VisibleForTesting
+    void setAuthState(AuthenticationState authState) {
+        this.authState = authState;
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest.java
index c9eb8b6ef7c..8d50d91a1ca 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest.java
@@ -124,10 +124,12 @@ public class ManagedLedgerInterceptorImplTest  extends 
MockedBookKeeperTestCase
         assertEquals(19, ((ManagedLedgerInterceptorImpl) 
ledger.getManagedLedgerInterceptor()).getIndex());
         List<Entry> entryList = cursor.readEntries(numberOfEntries);
         for (int i = 0 ; i < numberOfEntries; i ++) {
+            Entry entry = entryList.get(i);
             BrokerEntryMetadata metadata =
-                    
Commands.parseBrokerEntryMetadataIfExist(entryList.get(i).getDataBuffer());
+                    
Commands.parseBrokerEntryMetadataIfExist(entry.getDataBuffer());
             assertNotNull(metadata);
             assertEquals(metadata.getIndex(), (i + 1) * MOCK_BATCH_SIZE - 1);
+            entry.release();
         }
 
         cursor.close();
@@ -151,7 +153,9 @@ public class ManagedLedgerInterceptorImplTest  extends 
MockedBookKeeperTestCase
         ledger.addEntry("Test Message".getBytes());
         factory.getEntryCacheManager().clear();
         List<Entry> entryList = cursor.readEntries(1);
-        String message = new String(entryList.get(0).getData());
+        Entry entry = entryList.get(0);
+        String message = new String(entry.getData());
+        entry.release();
         Assert.assertTrue(message.equals("Test Message"));
         cursor.close();
         ledger.close();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
index 23f7278b254..0d27b8714ba 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
@@ -108,7 +108,9 @@ public class AbstractBaseDispatcherTest {
 
         List<Entry> entries = new ArrayList<>();
 
-        Entry e = EntryImpl.create(1, 2, createMessage("message1", 1));
+        ByteBuf message = createMessage("message1", 1);
+        Entry e = EntryImpl.create(1, 2, message);
+        message.release();
         long expectedBytePermits = e.getLength();
         entries.add(e);
         SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
@@ -124,7 +126,9 @@ public class AbstractBaseDispatcherTest {
     @Test
     public void testFilterEntriesForConsumerOfTxnMsgAbort() {
         List<Entry> entries = new ArrayList<>();
-        entries.add(EntryImpl.create(1, 1, createTnxAbortMessage("message1", 
1)));
+        ByteBuf message = createTnxAbortMessage("message1", 1);
+        entries.add(EntryImpl.create(1, 1, message));
+        message.release();
 
         SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
         EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
@@ -140,7 +144,9 @@ public class AbstractBaseDispatcherTest {
         when(mockTopic.isTxnAborted(any(TxnID.class), any())).thenReturn(true);
 
         List<Entry> entries = new ArrayList<>();
-        entries.add(EntryImpl.create(1, 1, createTnxMessage("message1", 1)));
+        ByteBuf message = createTnxMessage("message1", 1);
+        entries.add(EntryImpl.create(1, 1, message));
+        message.release();
 
         SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
         EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
@@ -154,6 +160,7 @@ public class AbstractBaseDispatcherTest {
         ByteBuf markerMessage =
                 
Markers.newReplicatedSubscriptionsSnapshotRequest("testSnapshotId", 
"testSourceCluster");
         entries.add(EntryImpl.create(1, 1, markerMessage));
+        markerMessage.release();
 
         SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
         EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
@@ -164,7 +171,9 @@ public class AbstractBaseDispatcherTest {
     @Test
     public void testFilterEntriesForConsumerOfDelayedMsg() {
         List<Entry> entries = new ArrayList<>();
-        entries.add(EntryImpl.create(1, 1, createDelayedMessage("message1", 
1)));
+        ByteBuf message = createDelayedMessage("message1", 1);
+        entries.add(EntryImpl.create(1, 1, message));
+        message.release();
 
         SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
         EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index bab21fbc001..f69eb1be432 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -98,7 +98,8 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
         headers.writeInt(msgMetadataSize);
         messageMetadata.writeTo(headers);
         ByteBuf headersAndPayload = 
ByteBufPair.coalesce(ByteBufPair.get(headers, data));
-        byte[] byteMessage = headersAndPayload.nioBuffer().array();
+        byte[] byteMessage = new byte[headersAndPayload.readableBytes()];
+        headersAndPayload.readBytes(byteMessage);
         headersAndPayload.release();
         return byteMessage;
     }
@@ -123,7 +124,8 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
     public static byte[] appendBrokerTimestamp(ByteBuf headerAndPayloads) 
throws Exception {
         ByteBuf msgWithEntryMeta =
                 Commands.addBrokerEntryMetadata(headerAndPayloads, 
getBrokerEntryMetadataInterceptors(), 1);
-        byte[] byteMessage = msgWithEntryMeta.nioBuffer().array();
+        byte[] byteMessage = new byte[msgWithEntryMeta.readableBytes()];
+        msgWithEntryMeta.readBytes(byteMessage);
         msgWithEntryMeta.release();
         return byteMessage;
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 0863e1ec5c4..a22e090451e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -47,9 +47,11 @@ import static org.testng.Assert.fail;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.DefaultEventLoopGroup;
 import io.netty.channel.EventLoopGroup;
+import io.netty.util.ReferenceCountUtil;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.net.InetAddress;
@@ -171,6 +173,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
 
     private EventLoopGroup eventLoopGroup;
     private ManagedLedgerFactory managedLedgerFactory;
+    private ChannelHandlerContext ctx;
 
     @BeforeMethod(alwaysRun = true)
     public void setup() throws Exception {
@@ -208,7 +211,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         doReturn(new InetSocketAddress("localhost", 
1234)).when(serverCnx).clientAddress();
         doReturn(new PulsarCommandSenderImpl(null, serverCnx))
                 .when(serverCnx).getCommandSender();
-        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+        ctx = mock(ChannelHandlerContext.class);
         Channel channel = mock(Channel.class);
 
         eventLoopGroup = new DefaultEventLoopGroup();
@@ -2274,16 +2277,26 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
     @Test
     public void testSendProducerTxnPrechecks() throws Exception {
         PersistentTopic topic = mock(PersistentTopic.class);
+        CountDownLatch latch = new CountDownLatch(1);
+        doAnswer(invocation -> {
+            Object msg = invocation.getArgument(0);
+            ReferenceCountUtil.safeRelease(msg);
+            latch.countDown();
+            return mock(ChannelFuture.class);
+        }).when(ctx).writeAndFlush(any(), any());
         String role = "appid1";
         Producer producer1 = new Producer(topic, serverCnx, 1 /* producer id 
*/, "prod-name",
                 role, false, null, SchemaVersion.Latest, 0, true,
                 ProducerAccessMode.Shared, Optional.empty(), true);
         producer1.close(false).get();
+        ByteBuf headersAndPayload = Unpooled.wrappedBuffer("test".getBytes());
         producer1.publishTxnMessage(
                 new TxnID(1L, 0L),
-                1, 1, 1, null, 1, false, false
+                1, 1, 1, headersAndPayload, 1, false, false
         );
         verify(topic, times(0)).publishTxnMessage(any(), any(), any());
+        // wait for the writeAndFlush to be called so that ByteBuf leak isn't reported
+        assertTrue(latch.await(5, TimeUnit.SECONDS));
     }
 
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 15060dceb2a..b10c0023d06 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -72,6 +72,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import javax.naming.AuthenticationException;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
@@ -96,9 +97,11 @@ import 
org.apache.pulsar.broker.auth.MockAuthenticationProvider;
 import org.apache.pulsar.broker.auth.MockAuthorizationProvider;
 import org.apache.pulsar.broker.auth.MockMultiStageAuthenticationProvider;
 import org.apache.pulsar.broker.auth.MockMutableAuthenticationProvider;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
 import org.apache.pulsar.broker.authentication.AuthenticationProvider;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.broker.authentication.AuthenticationState;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
 import org.apache.pulsar.broker.namespace.NamespaceService;
@@ -117,7 +120,6 @@ import 
org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse;
 import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxnResponse;
 import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
-import org.apache.pulsar.common.api.proto.CommandAuthResponse;
 import org.apache.pulsar.common.api.proto.CommandCloseProducer;
 import org.apache.pulsar.common.api.proto.CommandConnected;
 import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse;
@@ -332,7 +334,7 @@ public class ServerCnxTest {
 
         assertEquals(serverCnx.getState(), State.Connected);
         assertEquals(serverCnx.getProxyVersion(), "my-pulsar-proxy");
-        channel.finish();
+        channel.finishAndReleaseAll();
     }
 
     @DataProvider(name = "clientVersions")
@@ -3094,7 +3096,7 @@ public class ServerCnxTest {
         Object response = getResponse();
         assertTrue(response instanceof CommandSuccess);
 
-        channel.finish();
+        channel.finishAndReleaseAll();
     }
 
     @Test(timeOut = 30000)
@@ -3399,21 +3401,42 @@ public class ServerCnxTest {
     }
 
     @Test
-    public void testHandleAuthResponseWithoutClientVersion() {
-        ServerCnx cnx = mock(ServerCnx.class, CALLS_REAL_METHODS);
-        CommandAuthResponse authResponse = mock(CommandAuthResponse.class);
-        org.apache.pulsar.common.api.proto.AuthData authData = 
mock(org.apache.pulsar.common.api.proto.AuthData.class);
-        when(authResponse.getResponse()).thenReturn(authData);
-        when(authResponse.hasResponse()).thenReturn(true);
-        when(authResponse.getResponse().hasAuthMethodName()).thenReturn(true);
-        when(authResponse.getResponse().hasAuthData()).thenReturn(true);
-        when(authResponse.hasClientVersion()).thenReturn(false);
-        try {
-            cnx.handleAuthResponse(authResponse);
-        } catch (Exception ignore) {
-        }
-        verify(authResponse, times(1)).hasClientVersion();
-        verify(authResponse, times(0)).getClientVersion();
+    public void testHandleAuthResponseWithoutClientVersion() throws Exception {
+        resetChannel();
+        // use a dummy authentication provider
+        AuthenticationProvider authenticationProvider = new 
AuthenticationProvider() {
+            @Override
+            public void initialize(ServiceConfiguration config) throws 
IOException {
+
+            }
+
+            @Override
+            public String authenticate(AuthenticationDataSource authData) 
throws AuthenticationException {
+                return "role";
+            }
+
+            @Override
+            public String getAuthMethodName() {
+                return "dummy";
+            }
+
+            @Override
+            public void close() throws IOException {
+
+            }
+        };
+        AuthData clientData = AuthData.of(new byte[0]);
+        AuthenticationState authenticationState =
+                authenticationProvider.newAuthState(clientData, null, null);
+        // inject the AuthenticationState instance so that auth response can be processed
+        serverCnx.setAuthState(authenticationState);
+        // send the auth response with no client version
+        String clientVersion = null;
+        ByteBuf authResponse =
+                Commands.newAuthResponse("token", clientData, 
Commands.getCurrentProtocolVersion(), clientVersion);
+        channel.writeInbound(authResponse);
+        CommandConnected response = (CommandConnected) getResponse();
+        assertNotNull(response);
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java
index 101bd1b21b4..1b253df0f37 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java
@@ -29,6 +29,8 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -37,6 +39,7 @@ import org.apache.bookkeeper.mledger.impl.EntryImpl;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.protocol.Commands;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -46,6 +49,8 @@ public class SharedConsumerAssignorTest {
     private final ConsumerSelector roundRobinConsumerSelector = new 
ConsumerSelector();
     private final List<EntryAndMetadata> entryAndMetadataList = new 
ArrayList<>();
     private final List<EntryAndMetadata> replayQueue = new ArrayList<>();
+    private final Queue<EntryAndMetadata> cleanupQueue = new 
LinkedBlockingDeque<>();
+
     private SharedConsumerAssignor assignor;
 
     @BeforeMethod
@@ -77,6 +82,16 @@ public class SharedConsumerAssignorTest {
         // P.S. In the table above, The uuid represents the "<producer-name>-<sequence-id>" for non-chunks
         assertEquals(toString(entryAndMetadataList), Arrays.asList(
                 "0:0@A-0", "0:1@A-1-0-3", "0:2@A-1-1-3", "0:3@B-0", 
"0:4@B-1-0-2", "0:5@A-1-2-3", "0:6@B-1-1-2"));
+        entryAndMetadataList.forEach(entry -> 
assertEquals(entry.getDataBuffer().refCnt(), 1));
+        cleanupQueue.addAll(entryAndMetadataList);
+    }
+
+    @AfterMethod
+    public void releaseEntries() {
+        EntryAndMetadata entry;
+        while ((entry = cleanupQueue.poll()) != null) {
+            entry.release();
+        }
     }
 
     @Test
@@ -199,9 +214,13 @@ public class SharedConsumerAssignorTest {
 
     private static EntryAndMetadata createEntryAndMetadata(final long entryId,
                                                            final 
MessageMetadata metadata) {
-        final ByteBuf payload = Commands.serializeMetadataAndPayload(
-                Commands.ChecksumType.Crc32c, metadata, 
PulsarByteBufAllocator.DEFAULT.buffer());
-        return EntryAndMetadata.create(EntryImpl.create(0L, entryId, payload));
+        ByteBuf payload = PulsarByteBufAllocator.DEFAULT.buffer();
+        final ByteBuf data = Commands.serializeMetadataAndPayload(
+                Commands.ChecksumType.Crc32c, metadata, payload);
+        payload.release();
+        EntryAndMetadata entryAndMetadata = 
EntryAndMetadata.create(EntryImpl.create(0L, entryId, data));
+        data.release();
+        return entryAndMetadata;
     }
 
     private static MessageMetadata createMetadata(final String producerName,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
index 342e44b962c..64866537d29 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
@@ -35,6 +35,7 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import io.netty.channel.EventLoopGroup;
 import java.lang.reflect.Field;
 import java.util.Map;
@@ -99,20 +100,24 @@ public class MessageDuplicationTest extends BrokerTestBase 
{
 
         MessageDeduplication.MessageDupStatus status = 
messageDeduplication.isDuplicate(publishContext1, byteBuf1);
         assertEquals(status, MessageDeduplication.MessageDupStatus.NotDup);
+        byteBuf1.release();
 
         Long lastSequenceIdPushed = 
messageDeduplication.highestSequencedPushed.get(producerName1);
         assertNotNull(lastSequenceIdPushed);
         assertEquals(lastSequenceIdPushed.longValue(), 0);
 
         status = messageDeduplication.isDuplicate(publishContext2, byteBuf2);
+        byteBuf2.release();
         assertEquals(status, MessageDeduplication.MessageDupStatus.NotDup);
         lastSequenceIdPushed = 
messageDeduplication.highestSequencedPushed.get(producerName2);
         assertNotNull(lastSequenceIdPushed);
         assertEquals(lastSequenceIdPushed.longValue(), 1);
 
+
         byteBuf1 = getMessage(producerName1, 1);
         publishContext1 = getPublishContext(producerName1, 1);
         status = messageDeduplication.isDuplicate(publishContext1, byteBuf1);
+        byteBuf1.release();
         assertEquals(status, MessageDeduplication.MessageDupStatus.NotDup);
         lastSequenceIdPushed = 
messageDeduplication.highestSequencedPushed.get(producerName1);
         assertNotNull(lastSequenceIdPushed);
@@ -121,6 +126,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
         byteBuf1 = getMessage(producerName1, 5);
         publishContext1 = getPublishContext(producerName1, 5);
         status = messageDeduplication.isDuplicate(publishContext1, byteBuf1);
+        byteBuf1.release();
         assertEquals(status, MessageDeduplication.MessageDupStatus.NotDup);
         lastSequenceIdPushed = 
messageDeduplication.highestSequencedPushed.get(producerName1);
         assertNotNull(lastSequenceIdPushed);
@@ -129,6 +135,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
         byteBuf1 = getMessage(producerName1, 0);
         publishContext1 = getPublishContext(producerName1, 0);
         status = messageDeduplication.isDuplicate(publishContext1, byteBuf1);
+        byteBuf1.release();
         // should expect unknown because highestSequencePersisted is empty
         assertEquals(status, MessageDeduplication.MessageDupStatus.Unknown);
         lastSequenceIdPushed = 
messageDeduplication.highestSequencedPushed.get(producerName1);
@@ -141,6 +148,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
         byteBuf1 = getMessage(producerName1, 0);
         publishContext1 = getPublishContext(producerName1, 0);
         status = messageDeduplication.isDuplicate(publishContext1, byteBuf1);
+        byteBuf1.release();
         // now that highestSequencedPersisted, message with seqId of zero can be classified as a dup
         assertEquals(status, MessageDeduplication.MessageDupStatus.Dup);
         lastSequenceIdPushed = 
messageDeduplication.highestSequencedPushed.get(producerName1);
@@ -160,6 +168,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
 
         publishContext1 = getPublishContext(producerName1, 4, 8);
         status = messageDeduplication.isDuplicate(publishContext1, byteBuf1);
+        byteBuf1.release();
         assertEquals(status, MessageDeduplication.MessageDupStatus.Unknown);
         lastSequenceIdPushed = 
messageDeduplication.highestSequencedPushed.get(producerName1);
         assertNotNull(lastSequenceIdPushed);
@@ -296,9 +305,12 @@ public class MessageDuplicationTest extends BrokerTestBase 
{
         lastSequenceIdPushed = 
messageDeduplication.highestSequencedPushed.get(producerName2);
         assertNotNull(lastSequenceIdPushed);
         assertEquals(lastSequenceIdPushed.longValue(), 1);
+        byteBuf2.release();
+
         lastSequenceIdPushed = 
messageDeduplication.highestSequencedPersisted.get(producerName2);
         assertNotNull(lastSequenceIdPushed);
         assertEquals(lastSequenceIdPushed.longValue(), 1);
+        byteBuf1.release();
 
         byteBuf1 = getMessage(producerName1, 1);
         publishContext1 = getPublishContext(producerName1, 1);
@@ -311,6 +323,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
         lastSequenceIdPushed = 
messageDeduplication.highestSequencedPersisted.get(producerName1);
         assertNotNull(lastSequenceIdPushed);
         assertEquals(lastSequenceIdPushed.longValue(), 1);
+        byteBuf1.release();
 
         byteBuf1 = getMessage(producerName1, 5);
         publishContext1 = getPublishContext(producerName1, 5);
@@ -323,6 +336,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
         lastSequenceIdPushed = 
messageDeduplication.highestSequencedPersisted.get(producerName1);
         assertNotNull(lastSequenceIdPushed);
         assertEquals(lastSequenceIdPushed.longValue(), 5);
+        byteBuf1.release();
 
         // publish dup
         byteBuf1 = getMessage(producerName1, 0);
@@ -333,6 +347,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
         assertNotNull(lastSequenceIdPushed);
         assertEquals(lastSequenceIdPushed.longValue(), 5);
         verify(publishContext1, times(1)).completed(eq(null), eq(-1L), 
eq(-1L));
+        byteBuf1.release();
 
         // publish message unknown dup status
         byteBuf1 = getMessage(producerName1, 6);
@@ -346,6 +361,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
         lastSequenceIdPushed = 
messageDeduplication.highestSequencedPersisted.get(producerName1);
         assertNotNull(lastSequenceIdPushed);
         assertEquals(lastSequenceIdPushed.longValue(), 5);
+        byteBuf1.release();
 
         // publish same message again
         byteBuf1 = getMessage(producerName1, 6);
@@ -356,6 +372,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
 
         // complete seq 6 message eventually
         persistentTopic.addComplete(PositionFactory.create(0, 5), null, 
publishContext1);
+        byteBuf1.release();
 
         // simulate failure
         byteBuf1 = getMessage(producerName1, 7);
@@ -376,6 +393,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
         lastSequenceIdPushed = 
messageDeduplication.highestSequencedPersisted.get(producerName2);
         assertEquals(lastSequenceIdPushed.longValue(), 1);
         verify(messageDeduplication, times(1)).resetHighestSequenceIdPushed();
+        byteBuf1.release();
 
         // try dup
         byteBuf1 = getMessage(producerName1, 6);
@@ -386,6 +404,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
         lastSequenceIdPushed = 
messageDeduplication.highestSequencedPushed.get(producerName1);
         assertNotNull(lastSequenceIdPushed);
         assertEquals(lastSequenceIdPushed.longValue(), 6);
+        byteBuf1.release();
 
         // try new message
         byteBuf1 = getMessage(producerName1, 8);
@@ -399,6 +418,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
         lastSequenceIdPushed = 
messageDeduplication.highestSequencedPersisted.get(producerName1);
         assertNotNull(lastSequenceIdPushed);
         assertEquals(lastSequenceIdPushed.longValue(), 8);
+        byteBuf1.release();
 
     }
 
@@ -408,8 +428,11 @@ public class MessageDuplicationTest extends BrokerTestBase 
{
                 .setSequenceId(seqId)
                 .setPublishTime(System.currentTimeMillis());
 
-        return serializeMetadataAndPayload(
-                Commands.ChecksumType.Crc32c, messageMetadata, 
io.netty.buffer.Unpooled.copiedBuffer(new byte[0]));
+        ByteBuf payload = Unpooled.copiedBuffer(new byte[0]);
+        ByteBuf byteBuf = serializeMetadataAndPayload(
+                Commands.ChecksumType.Crc32c, messageMetadata, payload);
+        payload.release();
+        return byteBuf;
     }
 
     public Topic.PublishContext getPublishContext(String producerName, long 
seqId) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java
index fa409832fc1..25179c63c61 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java
@@ -26,6 +26,7 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import io.netty.buffer.ByteBuf;
+import java.io.IOException;
 import java.time.Clock;
 import java.util.ArrayList;
 import java.util.List;
@@ -37,6 +38,7 @@ import 
org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshotRequest
 import 
org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshotResponse;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Markers;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -49,6 +51,7 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest {
     private ServiceConfiguration conf;
     private ReplicatedSubscriptionsController controller;
     private List<ByteBuf> markers;
+    private List<ByteBuf> releaseQueue;
 
     @BeforeMethod
     public void setup() {
@@ -59,6 +62,7 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest {
         conf.setReplicatedSubscriptionsSnapshotTimeoutSeconds(3);
 
         markers = new ArrayList<>();
+        releaseQueue = new ArrayList<>();
 
         controller = mock(ReplicatedSubscriptionsController.class);
         when(controller.localCluster()).thenReturn(localCluster);
@@ -72,6 +76,16 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest {
                 .writeMarker(any(ByteBuf.class));
     }
 
+    @AfterMethod
+    public void tearDown() {
+        if (markers != null) {
+            markers.forEach(ByteBuf::release);
+        }
+        if (releaseQueue != null) {
+            releaseQueue.forEach(ByteBuf::release);
+        }
+    }
+
     @Test
     public void testBuildSnapshotWith2Clusters() throws Exception {
         ReplicatedSubscriptionsSnapshotBuilder builder = new 
ReplicatedSubscriptionsSnapshotBuilder(controller,
@@ -84,8 +98,7 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest {
 
         // Should have sent out a marker to initiate the snapshot
         assertEquals(markers.size(), 1);
-        ReplicatedSubscriptionsSnapshotRequest request = Markers
-                
.parseReplicatedSubscriptionsSnapshotRequest(markers.remove(0));
+        ReplicatedSubscriptionsSnapshotRequest request = 
parseReplicatedSubscriptionsSnapshotRequest();
         assertEquals(request.getSourceCluster(), localCluster);
 
         // Simulate the responses coming back
@@ -100,7 +113,7 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest {
 
         // At this point the snapshot should be created
         assertEquals(markers.size(), 1);
-        ReplicatedSubscriptionsSnapshot snapshot = 
Markers.parseReplicatedSubscriptionsSnapshot(markers.remove(0));
+        ReplicatedSubscriptionsSnapshot snapshot = 
parseReplicatedSubscriptionsSnapshot() ;
         assertEquals(snapshot.getClustersCount(), 1);
         assertEquals(snapshot.getClusterAt(0).getCluster(), "b");
         assertEquals(snapshot.getClusterAt(0).getMessageId().getLedgerId(), 
11);
@@ -110,6 +123,19 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest {
         assertEquals(snapshot.getLocalMessageId().getEntryId(), 1);
     }
 
+    private ReplicatedSubscriptionsSnapshotRequest 
parseReplicatedSubscriptionsSnapshotRequest()
+            throws IOException {
+        ByteBuf byteBuf = markers.remove(0);
+        releaseQueue.add(byteBuf);
+        return Markers.parseReplicatedSubscriptionsSnapshotRequest(byteBuf);
+    }
+
+    private ReplicatedSubscriptionsSnapshot 
parseReplicatedSubscriptionsSnapshot() throws IOException {
+        ByteBuf byteBuf = markers.remove(0);
+        releaseQueue.add(byteBuf);
+        return Markers.parseReplicatedSubscriptionsSnapshot(byteBuf);
+    }
+
     @Test
     public void testBuildSnapshotWith3Clusters() throws Exception {
         ReplicatedSubscriptionsSnapshotBuilder builder = new 
ReplicatedSubscriptionsSnapshotBuilder(controller,
@@ -122,8 +148,7 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest {
 
         // Should have sent out a marker to initiate the snapshot
         assertEquals(markers.size(), 1);
-        ReplicatedSubscriptionsSnapshotRequest request = Markers
-                
.parseReplicatedSubscriptionsSnapshotRequest(markers.remove(0));
+        ReplicatedSubscriptionsSnapshotRequest request = 
parseReplicatedSubscriptionsSnapshotRequest();
         assertEquals(request.getSourceCluster(), localCluster);
 
         // Simulate the responses coming back
@@ -150,7 +175,7 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest {
 
         // Since we have 2 remote clusters, a 2nd round of snapshot will be 
taken
         assertEquals(markers.size(), 1);
-        request = 
Markers.parseReplicatedSubscriptionsSnapshotRequest(markers.remove(0));
+        request = parseReplicatedSubscriptionsSnapshotRequest();
         assertEquals(request.getSourceCluster(), localCluster);
 
         // Responses coming back
@@ -177,7 +202,7 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest {
 
         // At this point the snapshot should be created
         assertEquals(markers.size(), 1);
-        ReplicatedSubscriptionsSnapshot snapshot = 
Markers.parseReplicatedSubscriptionsSnapshot(markers.remove(0));
+        ReplicatedSubscriptionsSnapshot snapshot = 
parseReplicatedSubscriptionsSnapshot();
         assertEquals(snapshot.getClustersCount(), 2);
         assertEquals(snapshot.getClusterAt(0).getCluster(), "b");
         assertEquals(snapshot.getClusterAt(0).getMessageId().getLedgerId(), 
11);
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java
index bbbdcd74a4d..d7aa97589d5 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java
@@ -52,7 +52,6 @@ public class CommandsTest {
                 .setSequenceId(sequenceId);
         int expectedChecksum = computeChecksum(messageMetadata, data);
         ByteBufPair clientCommand = Commands.newSend(1, 0, 1, 
ChecksumType.Crc32c, messageMetadata, data);
-        clientCommand.retain();
         ByteBuf receivedBuf = ByteBufPair.coalesce(clientCommand);
         System.err.println(ByteBufUtil.prettyHexDump(receivedBuf));
         receivedBuf.skipBytes(4); //skip [total-size]
@@ -78,7 +77,7 @@ public class CommandsTest {
         metadata = Commands.parseMessageMetadata(receivedBuf);
         // verify metadata parsing
         assertEquals(metadata.getProducerName(), producerName);
-
+        receivedBuf.release();
     }
 
     private int computeChecksum(MessageMetadata msgMetadata, ByteBuf 
compressedPayload) throws IOException {
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java
index 402696ea2af..6f4adddaf41 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java
@@ -139,6 +139,8 @@ public class CompressorCodecTest {
         ByteBuf uncompressed = codec.decode(compressed, 0);
 
         assertEquals(uncompressed, Unpooled.EMPTY_BUFFER);
+        compressed.release();
+        uncompressed.release();
     }
 
     @Test(dataProvider = "codecAndText")
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java
index 4523a5cc97b..38f70858138 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java
@@ -18,23 +18,18 @@
  */
 package org.apache.pulsar.common.protocol;
 
-import static 
org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.CompositeByteBuf;
-
 import java.nio.charset.StandardCharsets;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
 import org.apache.pulsar.common.api.proto.CommandProducer;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
-import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor;
 import org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils;
 import org.testng.Assert;
@@ -129,6 +124,7 @@ public class CommandUtilsTests {
         byte[] content = new byte[b1.readableBytes() + b2.readableBytes()];
         b3.readBytes(content);
         assertEquals(HEAD + TAIL, new String(content, StandardCharsets.UTF_8));
+        b3.release();
     }
 
     @Test
@@ -149,6 +145,7 @@ public class CommandUtilsTests {
         byte [] content = new 
byte[dataWithBrokerEntryMetadata.readableBytes()];
         dataWithBrokerEntryMetadata.readBytes(content);
         assertTrue(new String(content, StandardCharsets.UTF_8).endsWith(data));
+        dataWithBrokerEntryMetadata.release();
     }
 
     @Test
@@ -158,13 +155,13 @@ public class CommandUtilsTests {
         byteBuf.writeBytes(data.getBytes(StandardCharsets.UTF_8));
         ByteBuf dataWithBrokerEntryMetadata =
                 Commands.addBrokerEntryMetadata(byteBuf, 
getBrokerEntryMetadataInterceptors(), 11);
-
         Commands.skipBrokerEntryMetadataIfExist(dataWithBrokerEntryMetadata);
         assertEquals(data.length(), 
dataWithBrokerEntryMetadata.readableBytes());
 
         byte [] content = new 
byte[dataWithBrokerEntryMetadata.readableBytes()];
         dataWithBrokerEntryMetadata.readBytes(content);
         assertEquals(new String(content, StandardCharsets.UTF_8), data);
+        dataWithBrokerEntryMetadata.release();
     }
 
     @Test
@@ -185,6 +182,7 @@ public class CommandUtilsTests {
         byte [] content = new 
byte[dataWithBrokerEntryMetadata.readableBytes()];
         dataWithBrokerEntryMetadata.readBytes(content);
         assertEquals(new String(content, StandardCharsets.UTF_8), data);
+        dataWithBrokerEntryMetadata.release();
     }
 
     @Test
@@ -217,6 +215,7 @@ public class CommandUtilsTests {
         byte [] content = new 
byte[dataWithBrokerEntryMetadata.readableBytes()];
         dataWithBrokerEntryMetadata.readBytes(content);
         assertEquals(new String(content, StandardCharsets.UTF_8), data);
+        dataWithBrokerEntryMetadata.release();
     }
 
     public Set<BrokerEntryMetadataInterceptor> 
getBrokerEntryMetadataInterceptors() {
@@ -226,14 +225,4 @@ public class CommandUtilsTests {
         return 
BrokerEntryMetadataUtils.loadBrokerEntryMetadataInterceptors(interceptorNames,
                 Thread.currentThread().getContextClassLoader());
     }
-
-
-    public ByteBuf getMessage(String producerName, long seqId) {
-        MessageMetadata messageMetadata = new MessageMetadata()
-                .setProducerName(producerName).setSequenceId(seqId)
-                .setPublishTime(System.currentTimeMillis());
-
-        return serializeMetadataAndPayload(
-                Commands.ChecksumType.Crc32c, messageMetadata, 
io.netty.buffer.Unpooled.copiedBuffer(new byte[0]));
-    }
 }
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/MarkersTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/MarkersTest.java
index fe8a393b114..e1baf73497e 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/MarkersTest.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/MarkersTest.java
@@ -47,6 +47,7 @@ public class MarkersTest {
 
         assertEquals(request.getSnapshotId(), "sid");
         assertEquals(request.getSourceCluster(), "us-west");
+        buf.release();
     }
 
     @Test
@@ -63,6 +64,7 @@ public class MarkersTest {
         assertEquals(response.getCluster().getCluster(), "us-east");
         assertEquals(response.getCluster().getMessageId().getLedgerId(), 5);
         assertEquals(response.getCluster().getMessageId().getEntryId(), 7);
+        buf.release();
     }
 
     @Test
@@ -91,6 +93,7 @@ public class MarkersTest {
         assertEquals(snapshot.getClusterAt(1).getCluster(), "us-east");
         assertEquals(snapshot.getClusterAt(1).getMessageId().getLedgerId(), 
10);
         assertEquals(snapshot.getClusterAt(1).getMessageId().getEntryId(), 11);
+        buf.release();
     }
 
     @Test
@@ -115,6 +118,7 @@ public class MarkersTest {
         assertEquals(snapshot.getClusterAt(1).getCluster(), "us-east");
         assertEquals(snapshot.getClusterAt(1).getMessageId().getLedgerId(), 
10);
         assertEquals(snapshot.getClusterAt(1).getMessageId().getEntryId(), 11);
+        buf.release();
     }
 
     @Test
@@ -130,6 +134,7 @@ public class MarkersTest {
         assertEquals(msgMetadata.getSequenceId(), sequenceId);
         assertEquals(msgMetadata.getTxnidMostBits(), mostBits);
         assertEquals(msgMetadata.getTxnidLeastBits(), leastBits);
+        buf.release();
     }
 
     @Test
@@ -146,6 +151,7 @@ public class MarkersTest {
         assertEquals(msgMetadata.getSequenceId(), sequenceId);
         assertEquals(msgMetadata.getTxnidMostBits(), mostBits);
         assertEquals(msgMetadata.getTxnidLeastBits(), leastBits);
+        buf.release();
     }
 
 }


Reply via email to