This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ca02e4c32e06f0c1d136ab41b8d3c98433483922 Author: Lari Hotari <[email protected]> AuthorDate: Thu May 8 22:32:30 2025 +0300 [fix][test] Fix multiple ByteBuf leaks in tests (#24281) (cherry picked from commit 669c6426f8e0b7aa3e839c59e59e4686651b4476) --- .../bookkeeper/mledger/impl/ManagedCursorTest.java | 6 +++ .../apache/pulsar/broker/service/ServerCnx.java | 5 ++ .../ManagedLedgerInterceptorImplTest.java | 8 ++- .../broker/service/AbstractBaseDispatcherTest.java | 17 +++++-- .../service/PersistentMessageFinderTest.java | 6 ++- .../pulsar/broker/service/PersistentTopicTest.java | 17 ++++++- .../pulsar/broker/service/ServerCnxTest.java | 59 +++++++++++++++------- .../broker/service/SharedConsumerAssignorTest.java | 25 +++++++-- .../service/persistent/MessageDuplicationTest.java | 27 +++++++++- ...ReplicatedSubscriptionsSnapshotBuilderTest.java | 39 +++++++++++--- .../pulsar/common/compression/CommandsTest.java | 3 +- .../common/compression/CompressorCodecTest.java | 2 + .../pulsar/common/protocol/CommandUtilsTests.java | 21 ++------ .../apache/pulsar/common/protocol/MarkersTest.java | 6 +++ 14 files changed, 183 insertions(+), 58 deletions(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index a057232ce38..47497a37617 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -2866,6 +2866,9 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { EntryImpl entry5 = EntryImpl.create(markDeletedPosition.getLedgerId(), markDeletedPosition.getEntryId() + 7, ByteBufAllocator.DEFAULT.buffer(0)); List<Entry> entries = Lists.newArrayList(entry1, entry2, entry3, entry4, entry5); + // release data buffers since EntryImpl.create will retain the buffer + entries.forEach(entry -> entry.getDataBuffer().release()); + c1.trimDeletedEntries(entries); assertEquals(entries.size(), 1); assertEquals(entries.get(0).getPosition(), PositionImpl.get(markDeletedPosition.getLedgerId(), @@ -2875,6 +2878,9 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { assertEquals(entry2.refCnt(), 0); assertEquals(entry3.refCnt(), 0); assertEquals(entry4.refCnt(), 0); + + // release remaining entry + entries.forEach(Entry::release); } @Test(timeOut = 20000) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index dc35784fd97..423ba729ddd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -3672,4 +3672,9 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { protected void setAuthRole(String authRole) { this.authRole = authRole; } + + @VisibleForTesting + void setAuthState(AuthenticationState authState) { + this.authState = authState; + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest.java index 8b49cc4bc52..455dcbbf413 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest.java @@ -129,10 +129,12 @@ public class ManagedLedgerInterceptorImplTest extends MockedBookKeeperTestCase assertEquals(19, ((ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor()).getIndex()); List<Entry> entryList = cursor.readEntries(numberOfEntries); for (int i = 0 ; i < numberOfEntries; i ++) { + Entry entry = entryList.get(i); BrokerEntryMetadata metadata = - Commands.parseBrokerEntryMetadataIfExist(entryList.get(i).getDataBuffer()); + Commands.parseBrokerEntryMetadataIfExist(entry.getDataBuffer()); assertNotNull(metadata); assertEquals(metadata.getIndex(), (i + 1) * MOCK_BATCH_SIZE - 1); + entry.release(); } cursor.close(); @@ -156,7 +158,9 @@ public class ManagedLedgerInterceptorImplTest extends MockedBookKeeperTestCase ledger.addEntry("Test Message".getBytes()); factory.getEntryCacheManager().clear(); List<Entry> entryList = cursor.readEntries(1); - String message = new String(entryList.get(0).getData()); + Entry entry = entryList.get(0); + String message = new String(entry.getData()); + entry.release(); Assert.assertTrue(message.equals("Test Message")); cursor.close(); ledger.close(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java index 332cccc2d2c..fa551dc5f49 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java @@ -107,7 +107,9 @@ public class AbstractBaseDispatcherTest { List<Entry> entries = new ArrayList<>(); - Entry e = EntryImpl.create(1, 2, createMessage("message1", 1)); + ByteBuf message = createMessage("message1", 1); + Entry e = EntryImpl.create(1, 2, message); + message.release(); long expectedBytePermits = e.getLength(); entries.add(e); SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal(); @@ -123,7 +125,9 @@ public class AbstractBaseDispatcherTest { @Test public void testFilterEntriesForConsumerOfTxnMsgAbort() { List<Entry> entries = new ArrayList<>(); - entries.add(EntryImpl.create(1, 1, createTnxAbortMessage("message1", 1))); + ByteBuf message = createTnxAbortMessage("message1", 1); + entries.add(EntryImpl.create(1, 1, message)); + message.release(); SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal(); EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size()); @@ -139,7 +143,9 @@ public class AbstractBaseDispatcherTest { when(mockTopic.isTxnAborted(any(TxnID.class), any())).thenReturn(true); List<Entry> entries = new ArrayList<>(); - entries.add(EntryImpl.create(1, 1, createTnxMessage("message1", 1))); + ByteBuf message = createTnxMessage("message1", 1); + entries.add(EntryImpl.create(1, 1, message)); + message.release(); SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal(); EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size()); @@ -153,6 +159,7 @@ public class AbstractBaseDispatcherTest { ByteBuf markerMessage = Markers.newReplicatedSubscriptionsSnapshotRequest("testSnapshotId", "testSourceCluster"); entries.add(EntryImpl.create(1, 1, markerMessage)); + markerMessage.release(); SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal(); EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size()); @@ -163,7 +170,9 @@ public class AbstractBaseDispatcherTest { @Test public void testFilterEntriesForConsumerOfDelayedMsg() { List<Entry> entries = new ArrayList<>(); - entries.add(EntryImpl.create(1, 1, createDelayedMessage("message1", 1))); + ByteBuf message = createDelayedMessage("message1", 1); + entries.add(EntryImpl.create(1, 1, message)); + message.release(); SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal(); EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index 8b33672e140..aee9225fc8a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -97,7 +97,8 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase { headers.writeInt(msgMetadataSize); messageMetadata.writeTo(headers); ByteBuf headersAndPayload = ByteBufPair.coalesce(ByteBufPair.get(headers, data)); - byte[] byteMessage = headersAndPayload.nioBuffer().array(); + byte[] byteMessage = new byte[headersAndPayload.readableBytes()]; + headersAndPayload.readBytes(byteMessage); headersAndPayload.release(); return byteMessage; } @@ -122,7 +123,8 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase { public static byte[] appendBrokerTimestamp(ByteBuf headerAndPayloads) throws Exception { ByteBuf msgWithEntryMeta = Commands.addBrokerEntryMetadata(headerAndPayloads, getBrokerEntryMetadataInterceptors(), 1); - byte[] byteMessage = msgWithEntryMeta.nioBuffer().array(); + byte[] byteMessage = new byte[msgWithEntryMeta.readableBytes()]; + msgWithEntryMeta.readBytes(byteMessage); msgWithEntryMeta.release(); return byteMessage; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 7317629c646..aa07b56d569 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -45,8 +45,10 @@ import static org.testng.Assert.fail; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.DefaultEventLoop; +import io.netty.util.ReferenceCountUtil; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.net.InetAddress; @@ -163,6 +165,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { private BrokerService brokerService; + private ChannelHandlerContext ctx; @BeforeMethod(alwaysRun = true) public void setup() throws Exception { @@ -199,7 +202,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { doReturn(new InetSocketAddress("localhost", 1234)).when(serverCnx).clientAddress(); doReturn(new PulsarCommandSenderImpl(null, serverCnx)) .when(serverCnx).getCommandSender(); - ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + ctx = mock(ChannelHandlerContext.class); Channel channel = mock(Channel.class); doReturn(spy(DefaultEventLoop.class)).when(channel).eventLoop(); doReturn(channel).when(ctx).channel(); @@ -2254,16 +2257,26 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { @Test public void testSendProducerTxnPrechecks() throws Exception { PersistentTopic topic = mock(PersistentTopic.class); + CountDownLatch latch = new CountDownLatch(1); + doAnswer(invocation -> { + Object msg = invocation.getArgument(0); + ReferenceCountUtil.safeRelease(msg); + latch.countDown(); + return mock(ChannelFuture.class); + }).when(ctx).writeAndFlush(any(), any()); String role = "appid1"; Producer producer1 = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name", role, false, null, SchemaVersion.Latest, 0, true, ProducerAccessMode.Shared, Optional.empty(), true); producer1.close(false).get(); + ByteBuf headersAndPayload = Unpooled.wrappedBuffer("test".getBytes()); producer1.publishTxnMessage( new TxnID(1L, 0L), - 1, 1, 1, null, 1, false, false + 1, 1, 1, headersAndPayload, 1, false, false ); verify(topic, times(0)).publishTxnMessage(any(), any(), any()); + // wait for the writeAndFlush to be called so that ByteBuf leak isn't reported + assertTrue(latch.await(5, TimeUnit.SECONDS)); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 65ac7549a6c..59215ab2316 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -72,6 +72,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; +import javax.naming.AuthenticationException; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; @@ -90,6 +91,7 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.auth.MockAlwaysExpiredAuthenticationProvider; import org.apache.pulsar.broker.auth.MockAuthorizationProvider; import org.apache.pulsar.broker.auth.MockMutableAuthenticationProvider; +import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription; import org.apache.pulsar.broker.testcontext.PulsarTestContext; import org.apache.pulsar.broker.TransactionMetadataStoreService; @@ -97,6 +99,7 @@ import org.apache.pulsar.broker.auth.MockAuthenticationProvider; import org.apache.pulsar.broker.auth.MockMultiStageAuthenticationProvider; import org.apache.pulsar.broker.authentication.AuthenticationProvider; import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.broker.authentication.AuthenticationState; import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider; import org.apache.pulsar.broker.namespace.NamespaceService; @@ -115,7 +118,6 @@ import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse; import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxnResponse; import org.apache.pulsar.common.api.proto.CommandAuthChallenge; -import org.apache.pulsar.common.api.proto.CommandAuthResponse; import org.apache.pulsar.common.api.proto.CommandCloseProducer; import org.apache.pulsar.common.api.proto.CommandConnected; import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse; @@ -329,7 +331,7 @@ public class ServerCnxTest { assertEquals(serverCnx.getState(), State.Connected); assertEquals(serverCnx.getProxyVersion(), "my-pulsar-proxy"); - channel.finish(); + channel.finishAndReleaseAll(); } @DataProvider(name = "clientVersions") @@ -3090,7 +3092,7 @@ public class ServerCnxTest { Object response = getResponse(); assertTrue(response instanceof CommandSuccess); - channel.finish(); + channel.finishAndReleaseAll(); } @Test(timeOut = 30000) @@ -3395,21 +3397,42 @@ public class ServerCnxTest { } @Test - public void testHandleAuthResponseWithoutClientVersion() { - ServerCnx cnx = mock(ServerCnx.class, CALLS_REAL_METHODS); - CommandAuthResponse authResponse = mock(CommandAuthResponse.class); - org.apache.pulsar.common.api.proto.AuthData authData = mock(org.apache.pulsar.common.api.proto.AuthData.class); - when(authResponse.getResponse()).thenReturn(authData); - when(authResponse.hasResponse()).thenReturn(true); - when(authResponse.getResponse().hasAuthMethodName()).thenReturn(true); - when(authResponse.getResponse().hasAuthData()).thenReturn(true); - when(authResponse.hasClientVersion()).thenReturn(false); - try { - cnx.handleAuthResponse(authResponse); - } catch (Exception ignore) { - } - verify(authResponse, times(1)).hasClientVersion(); - verify(authResponse, times(0)).getClientVersion(); + public void testHandleAuthResponseWithoutClientVersion() throws Exception { + resetChannel(); + // use a dummy authentication provider + AuthenticationProvider authenticationProvider = new AuthenticationProvider() { + @Override + public void initialize(ServiceConfiguration config) throws IOException { + + } + + @Override + public String authenticate(AuthenticationDataSource authData) throws AuthenticationException { + return "role"; + } + + @Override + public String getAuthMethodName() { + return "dummy"; + } + + @Override + public void close() throws IOException { + + } + }; + AuthData clientData = AuthData.of(new byte[0]); + AuthenticationState authenticationState = + authenticationProvider.newAuthState(clientData, null, null); + // inject the AuthenticationState instance so that auth response can be processed + serverCnx.setAuthState(authenticationState); + // send the auth response with no client version + String clientVersion = null; + ByteBuf authResponse = + Commands.newAuthResponse("token", clientData, Commands.getCurrentProtocolVersion(), clientVersion); + channel.writeInbound(authResponse); + CommandConnected response = (CommandConnected) getResponse(); + assertNotNull(response); } @Test(expectedExceptions = IllegalArgumentException.class) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java index 101bd1b21b4..1b253df0f37 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java @@ -29,6 +29,8 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Queue; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -37,6 +39,7 @@ import org.apache.bookkeeper.mledger.impl.EntryImpl; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.protocol.Commands; +import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -46,6 +49,8 @@ public class SharedConsumerAssignorTest { private final ConsumerSelector roundRobinConsumerSelector = new ConsumerSelector(); private final List<EntryAndMetadata> entryAndMetadataList = new ArrayList<>(); private final List<EntryAndMetadata> replayQueue = new ArrayList<>(); + private final Queue<EntryAndMetadata> cleanupQueue = new LinkedBlockingDeque<>(); + private SharedConsumerAssignor assignor; @BeforeMethod @@ -77,6 +82,16 @@ public class SharedConsumerAssignorTest { // P.S. In the table above, The uuid represents the "<producer-name>-<sequence-id>" for non-chunks assertEquals(toString(entryAndMetadataList), Arrays.asList( "0:0@A-0", "0:1@A-1-0-3", "0:2@A-1-1-3", "0:3@B-0", "0:4@B-1-0-2", "0:5@A-1-2-3", "0:6@B-1-1-2")); + entryAndMetadataList.forEach(entry -> assertEquals(entry.getDataBuffer().refCnt(), 1)); + cleanupQueue.addAll(entryAndMetadataList); + } + + @AfterMethod + public void releaseEntries() { + EntryAndMetadata entry; + while ((entry = cleanupQueue.poll()) != null) { + entry.release(); + } } @Test @@ -199,9 +214,13 @@ public class SharedConsumerAssignorTest { private static EntryAndMetadata createEntryAndMetadata(final long entryId, final MessageMetadata metadata) { - final ByteBuf payload = Commands.serializeMetadataAndPayload( - Commands.ChecksumType.Crc32c, metadata, PulsarByteBufAllocator.DEFAULT.buffer()); - return EntryAndMetadata.create(EntryImpl.create(0L, entryId, payload)); + ByteBuf payload = PulsarByteBufAllocator.DEFAULT.buffer(); + final ByteBuf data = Commands.serializeMetadataAndPayload( + Commands.ChecksumType.Crc32c, metadata, payload); + payload.release(); + EntryAndMetadata entryAndMetadata = EntryAndMetadata.create(EntryImpl.create(0L, entryId, data)); + data.release(); + return entryAndMetadata; } private static MessageMetadata createMetadata(final String producerName, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java index 0f7be405b39..605d863b4c4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java @@ -34,6 +34,7 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.EventLoopGroup; import java.lang.reflect.Field; import java.util.Map; @@ -97,20 +98,24 @@ public class MessageDuplicationTest extends BrokerTestBase { MessageDeduplication.MessageDupStatus status = messageDeduplication.isDuplicate(publishContext1, byteBuf1); assertEquals(status, MessageDeduplication.MessageDupStatus.NotDup); + byteBuf1.release(); Long lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1); assertNotNull(lastSequenceIdPushed); assertEquals(lastSequenceIdPushed.longValue(), 0); status = messageDeduplication.isDuplicate(publishContext2, byteBuf2); + byteBuf2.release(); assertEquals(status, MessageDeduplication.MessageDupStatus.NotDup); lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName2); assertNotNull(lastSequenceIdPushed); assertEquals(lastSequenceIdPushed.longValue(), 1); + byteBuf1 = getMessage(producerName1, 1); publishContext1 = getPublishContext(producerName1, 1); status = messageDeduplication.isDuplicate(publishContext1, byteBuf1); + byteBuf1.release(); assertEquals(status, MessageDeduplication.MessageDupStatus.NotDup); lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1); assertNotNull(lastSequenceIdPushed); @@ -119,6 +124,7 @@ public class MessageDuplicationTest extends BrokerTestBase { byteBuf1 = getMessage(producerName1, 5); publishContext1 = getPublishContext(producerName1, 5); status = messageDeduplication.isDuplicate(publishContext1, byteBuf1); + byteBuf1.release(); assertEquals(status, MessageDeduplication.MessageDupStatus.NotDup); lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1); assertNotNull(lastSequenceIdPushed); @@ -127,6 +133,7 @@ public class MessageDuplicationTest extends BrokerTestBase { byteBuf1 = getMessage(producerName1, 0); publishContext1 = getPublishContext(producerName1, 0); status = messageDeduplication.isDuplicate(publishContext1, byteBuf1); + byteBuf1.release(); // should expect unknown because highestSequencePersisted is empty assertEquals(status, MessageDeduplication.MessageDupStatus.Unknown); lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1); @@ -139,6 +146,7 @@ public class MessageDuplicationTest extends BrokerTestBase { byteBuf1 = getMessage(producerName1, 0); publishContext1 = getPublishContext(producerName1, 0); status = messageDeduplication.isDuplicate(publishContext1, byteBuf1); + byteBuf1.release(); // now that highestSequencedPersisted, message with seqId of zero can be classified as a dup assertEquals(status, MessageDeduplication.MessageDupStatus.Dup); lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1); @@ -158,6 +166,7 @@ public class MessageDuplicationTest extends BrokerTestBase { publishContext1 = getPublishContext(producerName1, 4, 8); status = messageDeduplication.isDuplicate(publishContext1, byteBuf1); + byteBuf1.release(); assertEquals(status, MessageDeduplication.MessageDupStatus.Unknown); lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1); assertNotNull(lastSequenceIdPushed); @@ -293,9 +302,12 @@ public class MessageDuplicationTest extends BrokerTestBase { lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName2); assertNotNull(lastSequenceIdPushed); assertEquals(lastSequenceIdPushed.longValue(), 1); + byteBuf2.release(); + lastSequenceIdPushed = messageDeduplication.highestSequencedPersisted.get(producerName2); assertNotNull(lastSequenceIdPushed); assertEquals(lastSequenceIdPushed.longValue(), 1); + byteBuf1.release(); byteBuf1 = getMessage(producerName1, 1); publishContext1 = getPublishContext(producerName1, 1); @@ -308,6 +320,7 @@ public class MessageDuplicationTest extends BrokerTestBase { lastSequenceIdPushed = messageDeduplication.highestSequencedPersisted.get(producerName1); assertNotNull(lastSequenceIdPushed); assertEquals(lastSequenceIdPushed.longValue(), 1); + byteBuf1.release(); byteBuf1 = getMessage(producerName1, 5); publishContext1 = getPublishContext(producerName1, 5); @@ -320,6 +333,7 @@ public class MessageDuplicationTest extends BrokerTestBase { lastSequenceIdPushed = messageDeduplication.highestSequencedPersisted.get(producerName1); assertNotNull(lastSequenceIdPushed); assertEquals(lastSequenceIdPushed.longValue(), 5); + byteBuf1.release(); // publish dup byteBuf1 = getMessage(producerName1, 0); @@ -330,6 +344,7 @@ public class MessageDuplicationTest extends BrokerTestBase { assertNotNull(lastSequenceIdPushed); assertEquals(lastSequenceIdPushed.longValue(), 5); verify(publishContext1, times(1)).completed(eq(null), eq(-1L), eq(-1L)); + byteBuf1.release(); // publish message unknown dup status byteBuf1 = getMessage(producerName1, 6); @@ -343,6 +358,7 @@ public class MessageDuplicationTest extends BrokerTestBase { lastSequenceIdPushed = messageDeduplication.highestSequencedPersisted.get(producerName1); assertNotNull(lastSequenceIdPushed); assertEquals(lastSequenceIdPushed.longValue(), 5); + byteBuf1.release(); // publish same message again byteBuf1 = getMessage(producerName1, 6); @@ -353,6 +369,7 @@ public class MessageDuplicationTest extends BrokerTestBase { // complete seq 6 message eventually persistentTopic.addComplete(new PositionImpl(0, 5), null, publishContext1); + byteBuf1.release(); // simulate failure byteBuf1 = getMessage(producerName1, 7); @@ -373,6 +390,7 @@ public class MessageDuplicationTest extends BrokerTestBase { lastSequenceIdPushed = messageDeduplication.highestSequencedPersisted.get(producerName2); assertEquals(lastSequenceIdPushed.longValue(), 1); verify(messageDeduplication, times(1)).resetHighestSequenceIdPushed(); + byteBuf1.release(); // try dup byteBuf1 = getMessage(producerName1, 6); @@ -383,6 +401,7 @@ public class MessageDuplicationTest extends BrokerTestBase { lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1); assertNotNull(lastSequenceIdPushed); assertEquals(lastSequenceIdPushed.longValue(), 6); + byteBuf1.release(); // try new message byteBuf1 = getMessage(producerName1, 8); @@ -396,6 +415,7 @@ public class MessageDuplicationTest extends BrokerTestBase { lastSequenceIdPushed = messageDeduplication.highestSequencedPersisted.get(producerName1); assertNotNull(lastSequenceIdPushed); assertEquals(lastSequenceIdPushed.longValue(), 8); + byteBuf1.release(); } @@ -405,8 +425,11 @@ public class MessageDuplicationTest extends BrokerTestBase { .setSequenceId(seqId) .setPublishTime(System.currentTimeMillis()); - return serializeMetadataAndPayload( - Commands.ChecksumType.Crc32c, messageMetadata, io.netty.buffer.Unpooled.copiedBuffer(new byte[0])); + ByteBuf payload = Unpooled.copiedBuffer(new byte[0]); + ByteBuf byteBuf = serializeMetadataAndPayload( + Commands.ChecksumType.Crc32c, messageMetadata, payload); + payload.release(); + return byteBuf; } public Topic.PublishContext getPublishContext(String producerName, long seqId) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java index 4ae923bd244..35577850a51 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java @@ -28,6 +28,7 @@ import static org.testng.Assert.assertTrue; import io.netty.buffer.ByteBuf; +import java.io.IOException; import java.time.Clock; import java.util.ArrayList; @@ -41,6 +42,7 @@ import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshotRequest import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshotResponse; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.Markers; +import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -53,6 +55,7 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest { private ServiceConfiguration conf; private ReplicatedSubscriptionsController controller; private List<ByteBuf> markers; + private List<ByteBuf> releaseQueue; @BeforeMethod public void setup() { @@ -63,6 +66,7 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest { conf.setReplicatedSubscriptionsSnapshotTimeoutSeconds(3); markers = new ArrayList<>(); + releaseQueue = new ArrayList<>(); controller = mock(ReplicatedSubscriptionsController.class); when(controller.localCluster()).thenReturn(localCluster); @@ -75,6 +79,16 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest { .writeMarker(any(ByteBuf.class)); } + @AfterMethod + public void tearDown() { + if (markers != null) { + markers.forEach(ByteBuf::release); + } + if (releaseQueue != null) { + releaseQueue.forEach(ByteBuf::release); + } + } + @Test public void testBuildSnapshotWith2Clusters() throws Exception { List<String> remoteClusters = Collections.singletonList("b"); @@ -88,8 +102,7 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest { // Should have sent out a marker to initiate the snapshot assertEquals(markers.size(), 1); - ReplicatedSubscriptionsSnapshotRequest request = Markers - .parseReplicatedSubscriptionsSnapshotRequest(markers.remove(0)); + ReplicatedSubscriptionsSnapshotRequest request = parseReplicatedSubscriptionsSnapshotRequest(); assertEquals(request.getSourceCluster(), localCluster); // Simulate the responses coming back @@ -104,7 +117,7 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest { // At this point the snapshot should be created assertEquals(markers.size(), 1); - ReplicatedSubscriptionsSnapshot snapshot = Markers.parseReplicatedSubscriptionsSnapshot(markers.remove(0)); + ReplicatedSubscriptionsSnapshot snapshot = parseReplicatedSubscriptionsSnapshot() ; assertEquals(snapshot.getClustersCount(), 1); assertEquals(snapshot.getClusterAt(0).getCluster(), "b"); assertEquals(snapshot.getClusterAt(0).getMessageId().getLedgerId(), 11); @@ -114,6 +127,19 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest { assertEquals(snapshot.getLocalMessageId().getEntryId(), 1); } + private ReplicatedSubscriptionsSnapshotRequest parseReplicatedSubscriptionsSnapshotRequest() + throws IOException { + ByteBuf byteBuf = markers.remove(0); + releaseQueue.add(byteBuf); + return Markers.parseReplicatedSubscriptionsSnapshotRequest(byteBuf); + } + + private ReplicatedSubscriptionsSnapshot parseReplicatedSubscriptionsSnapshot() throws IOException { + ByteBuf byteBuf = markers.remove(0); + releaseQueue.add(byteBuf); + return Markers.parseReplicatedSubscriptionsSnapshot(byteBuf); + } + @Test public void testBuildSnapshotWith3Clusters() throws Exception { List<String> remoteClusters = Arrays.asList("b", "c"); @@ -127,8 +153,7 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest { // Should have sent out a marker to initiate the snapshot assertEquals(markers.size(), 1); - ReplicatedSubscriptionsSnapshotRequest request = Markers - .parseReplicatedSubscriptionsSnapshotRequest(markers.remove(0)); + ReplicatedSubscriptionsSnapshotRequest request = parseReplicatedSubscriptionsSnapshotRequest(); assertEquals(request.getSourceCluster(), localCluster); // Simulate the responses coming back @@ -155,7 +180,7 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest { // Since we have 2 remote clusters, a 2nd round of snapshot will be taken assertEquals(markers.size(), 1); - request = Markers.parseReplicatedSubscriptionsSnapshotRequest(markers.remove(0)); + request = parseReplicatedSubscriptionsSnapshotRequest(); assertEquals(request.getSourceCluster(), localCluster); // Responses coming back @@ -182,7 +207,7 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest { // At this point the snapshot should be created assertEquals(markers.size(), 1); - ReplicatedSubscriptionsSnapshot snapshot = Markers.parseReplicatedSubscriptionsSnapshot(markers.remove(0)); + ReplicatedSubscriptionsSnapshot snapshot = parseReplicatedSubscriptionsSnapshot(); assertEquals(snapshot.getClustersCount(), 2); assertEquals(snapshot.getClusterAt(0).getCluster(), "b"); assertEquals(snapshot.getClusterAt(0).getMessageId().getLedgerId(), 11); diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java index 42f1a581002..ffc1081380e 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java @@ -52,7 +52,6 @@ public class CommandsTest { .setSequenceId(sequenceId); int expectedChecksum = computeChecksum(messageMetadata, data); ByteBufPair clientCommand = Commands.newSend(1, 0, 1, ChecksumType.Crc32c, messageMetadata, data); - clientCommand.retain(); ByteBuf receivedBuf = ByteBufPair.coalesce(clientCommand); System.err.println(ByteBufUtil.prettyHexDump(receivedBuf)); receivedBuf.skipBytes(4); //skip [total-size] @@ -78,7 +77,7 @@ public class CommandsTest { metadata = Commands.parseMessageMetadata(receivedBuf); // verify metadata parsing assertEquals(metadata.getProducerName(), producerName); - + receivedBuf.release(); } private int computeChecksum(MessageMetadata msgMetadata, ByteBuf compressedPayload) throws IOException { diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java index 402696ea2af..6f4adddaf41 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java @@ -139,6 +139,8 @@ public class CompressorCodecTest { ByteBuf uncompressed = codec.decode(compressed, 0); assertEquals(uncompressed, Unpooled.EMPTY_BUFFER); + compressed.release(); + uncompressed.release(); } @Test(dataProvider = "codecAndText") diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java index 4523a5cc97b..38f70858138 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java @@ -18,23 +18,18 @@ */ package org.apache.pulsar.common.protocol; -import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; - import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; - import java.nio.charset.StandardCharsets; import java.util.HashSet; import java.util.Map; import java.util.Set; - import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.proto.BrokerEntryMetadata; import org.apache.pulsar.common.api.proto.CommandProducer; import org.apache.pulsar.common.api.proto.CommandSubscribe; -import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor; import org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils; import org.testng.Assert; @@ -129,6 +124,7 @@ public class CommandUtilsTests { byte[] content = new byte[b1.readableBytes() + b2.readableBytes()]; b3.readBytes(content); assertEquals(HEAD + TAIL, new String(content, StandardCharsets.UTF_8)); + b3.release(); } @Test @@ -149,6 +145,7 @@ public class CommandUtilsTests { byte [] content = new byte[dataWithBrokerEntryMetadata.readableBytes()]; dataWithBrokerEntryMetadata.readBytes(content); assertTrue(new String(content, StandardCharsets.UTF_8).endsWith(data)); + dataWithBrokerEntryMetadata.release(); } @Test @@ -158,13 +155,13 @@ public class CommandUtilsTests { byteBuf.writeBytes(data.getBytes(StandardCharsets.UTF_8)); ByteBuf dataWithBrokerEntryMetadata = Commands.addBrokerEntryMetadata(byteBuf, getBrokerEntryMetadataInterceptors(), 11); - Commands.skipBrokerEntryMetadataIfExist(dataWithBrokerEntryMetadata); assertEquals(data.length(), dataWithBrokerEntryMetadata.readableBytes()); byte [] content = new byte[dataWithBrokerEntryMetadata.readableBytes()]; dataWithBrokerEntryMetadata.readBytes(content); assertEquals(new String(content, StandardCharsets.UTF_8), data); + dataWithBrokerEntryMetadata.release(); } @Test @@ -185,6 +182,7 @@ public class CommandUtilsTests { byte [] content = new byte[dataWithBrokerEntryMetadata.readableBytes()]; dataWithBrokerEntryMetadata.readBytes(content); assertEquals(new String(content, StandardCharsets.UTF_8), data); + dataWithBrokerEntryMetadata.release(); } @Test @@ -217,6 +215,7 @@ public class CommandUtilsTests { byte [] content = new byte[dataWithBrokerEntryMetadata.readableBytes()]; dataWithBrokerEntryMetadata.readBytes(content); assertEquals(new String(content, StandardCharsets.UTF_8), data); + dataWithBrokerEntryMetadata.release(); } public Set<BrokerEntryMetadataInterceptor> getBrokerEntryMetadataInterceptors() { @@ -226,14 +225,4 @@ public class CommandUtilsTests { return BrokerEntryMetadataUtils.loadBrokerEntryMetadataInterceptors(interceptorNames, Thread.currentThread().getContextClassLoader()); } - - - public ByteBuf getMessage(String producerName, long seqId) { - MessageMetadata messageMetadata = new MessageMetadata() - .setProducerName(producerName).setSequenceId(seqId) - .setPublishTime(System.currentTimeMillis()); - - return serializeMetadataAndPayload( - Commands.ChecksumType.Crc32c, messageMetadata, io.netty.buffer.Unpooled.copiedBuffer(new byte[0])); - } } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/MarkersTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/MarkersTest.java index fe8a393b114..e1baf73497e 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/MarkersTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/MarkersTest.java @@ -47,6 +47,7 @@ public class MarkersTest { assertEquals(request.getSnapshotId(), "sid"); assertEquals(request.getSourceCluster(), "us-west"); + buf.release(); } @Test @@ -63,6 +64,7 @@ public class MarkersTest { assertEquals(response.getCluster().getCluster(), "us-east"); assertEquals(response.getCluster().getMessageId().getLedgerId(), 5); assertEquals(response.getCluster().getMessageId().getEntryId(), 7); + buf.release(); } @Test @@ -91,6 +93,7 @@ public class MarkersTest { assertEquals(snapshot.getClusterAt(1).getCluster(), "us-east"); assertEquals(snapshot.getClusterAt(1).getMessageId().getLedgerId(), 10); assertEquals(snapshot.getClusterAt(1).getMessageId().getEntryId(), 11); + buf.release(); } @Test @@ -115,6 +118,7 @@ public class MarkersTest { assertEquals(snapshot.getClusterAt(1).getCluster(), "us-east"); assertEquals(snapshot.getClusterAt(1).getMessageId().getLedgerId(), 10); assertEquals(snapshot.getClusterAt(1).getMessageId().getEntryId(), 11); + buf.release(); } @Test @@ -130,6 +134,7 @@ public class MarkersTest { assertEquals(msgMetadata.getSequenceId(), sequenceId); assertEquals(msgMetadata.getTxnidMostBits(), mostBits); assertEquals(msgMetadata.getTxnidLeastBits(), leastBits); + buf.release(); } @Test @@ -146,6 +151,7 @@ public class MarkersTest { assertEquals(msgMetadata.getSequenceId(), sequenceId); assertEquals(msgMetadata.getTxnidMostBits(), mostBits); assertEquals(msgMetadata.getTxnidLeastBits(), leastBits); + buf.release(); } }
