This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 669c6426f8e [fix][test] Fix multiple ByteBuf leaks in tests (#24281)
669c6426f8e is described below
commit 669c6426f8e0b7aa3e839c59e59e4686651b4476
Author: Lari Hotari <[email protected]>
AuthorDate: Thu May 8 22:32:30 2025 +0300
[fix][test] Fix multiple ByteBuf leaks in tests (#24281)
---
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 7 +++
.../apache/pulsar/broker/service/ServerCnx.java | 5 ++
.../ManagedLedgerInterceptorImplTest.java | 8 ++-
.../broker/service/AbstractBaseDispatcherTest.java | 17 +++++--
.../service/PersistentMessageFinderTest.java | 6 ++-
.../pulsar/broker/service/PersistentTopicTest.java | 17 ++++++-
.../pulsar/broker/service/ServerCnxTest.java | 59 +++++++++++++++-------
.../broker/service/SharedConsumerAssignorTest.java | 25 +++++++--
.../service/persistent/MessageDuplicationTest.java | 27 +++++++++-
...ReplicatedSubscriptionsSnapshotBuilderTest.java | 39 +++++++++++---
.../pulsar/common/compression/CommandsTest.java | 3 +-
.../common/compression/CompressorCodecTest.java | 2 +
.../pulsar/common/protocol/CommandUtilsTests.java | 21 ++------
.../apache/pulsar/common/protocol/MarkersTest.java | 6 +++
14 files changed, 184 insertions(+), 58 deletions(-)
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index bcaed2d7322..ce8b0334226 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -2899,6 +2899,9 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
EntryImpl entry5 = EntryImpl.create(markDeletedPosition.getLedgerId(),
markDeletedPosition.getEntryId() + 7,
ByteBufAllocator.DEFAULT.buffer(0));
List<Entry> entries = Lists.newArrayList(entry1, entry2, entry3,
entry4, entry5);
+ // release data buffers since EntryImpl.create will retain the buffer
+ entries.forEach(entry -> entry.getDataBuffer().release());
+
c1.trimDeletedEntries(entries);
assertEquals(entries.size(), 1);
assertEquals(entries.get(0).getPosition(),
PositionFactory.create(markDeletedPosition.getLedgerId(),
@@ -2908,6 +2911,9 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
assertEquals(entry2.refCnt(), 0);
assertEquals(entry3.refCnt(), 0);
assertEquals(entry4.refCnt(), 0);
+
+ // release remaining entry
+ entries.forEach(Entry::release);
}
@Test(timeOut = 20000)
@@ -5286,6 +5292,7 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
@Test
void testForceCursorRecovery() throws Exception {
TestPulsarMockBookKeeper bk = new TestPulsarMockBookKeeper(executor);
+ factory.shutdown();
factory = new ManagedLedgerFactoryImpl(metadataStore, bk);
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setLedgerForceRecovery(true);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 4cc8ae178c3..f9605449b1f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -3778,4 +3778,9 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
public void decrementThrottleCount() {
throttleTracker.decrementThrottleCount();
}
+
+ @VisibleForTesting
+ void setAuthState(AuthenticationState authState) {
+ this.authState = authState;
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest.java
index c9eb8b6ef7c..8d50d91a1ca 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest.java
@@ -124,10 +124,12 @@ public class ManagedLedgerInterceptorImplTest extends
MockedBookKeeperTestCase
assertEquals(19, ((ManagedLedgerInterceptorImpl)
ledger.getManagedLedgerInterceptor()).getIndex());
List<Entry> entryList = cursor.readEntries(numberOfEntries);
for (int i = 0 ; i < numberOfEntries; i ++) {
+ Entry entry = entryList.get(i);
BrokerEntryMetadata metadata =
-
Commands.parseBrokerEntryMetadataIfExist(entryList.get(i).getDataBuffer());
+
Commands.parseBrokerEntryMetadataIfExist(entry.getDataBuffer());
assertNotNull(metadata);
assertEquals(metadata.getIndex(), (i + 1) * MOCK_BATCH_SIZE - 1);
+ entry.release();
}
cursor.close();
@@ -151,7 +153,9 @@ public class ManagedLedgerInterceptorImplTest extends
MockedBookKeeperTestCase
ledger.addEntry("Test Message".getBytes());
factory.getEntryCacheManager().clear();
List<Entry> entryList = cursor.readEntries(1);
- String message = new String(entryList.get(0).getData());
+ Entry entry = entryList.get(0);
+ String message = new String(entry.getData());
+ entry.release();
Assert.assertTrue(message.equals("Test Message"));
cursor.close();
ledger.close();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
index 23f7278b254..0d27b8714ba 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
@@ -108,7 +108,9 @@ public class AbstractBaseDispatcherTest {
List<Entry> entries = new ArrayList<>();
- Entry e = EntryImpl.create(1, 2, createMessage("message1", 1));
+ ByteBuf message = createMessage("message1", 1);
+ Entry e = EntryImpl.create(1, 2, message);
+ message.release();
long expectedBytePermits = e.getLength();
entries.add(e);
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
@@ -124,7 +126,9 @@ public class AbstractBaseDispatcherTest {
@Test
public void testFilterEntriesForConsumerOfTxnMsgAbort() {
List<Entry> entries = new ArrayList<>();
- entries.add(EntryImpl.create(1, 1, createTnxAbortMessage("message1",
1)));
+ ByteBuf message = createTnxAbortMessage("message1", 1);
+ entries.add(EntryImpl.create(1, 1, message));
+ message.release();
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
@@ -140,7 +144,9 @@ public class AbstractBaseDispatcherTest {
when(mockTopic.isTxnAborted(any(TxnID.class), any())).thenReturn(true);
List<Entry> entries = new ArrayList<>();
- entries.add(EntryImpl.create(1, 1, createTnxMessage("message1", 1)));
+ ByteBuf message = createTnxMessage("message1", 1);
+ entries.add(EntryImpl.create(1, 1, message));
+ message.release();
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
@@ -154,6 +160,7 @@ public class AbstractBaseDispatcherTest {
ByteBuf markerMessage =
Markers.newReplicatedSubscriptionsSnapshotRequest("testSnapshotId",
"testSourceCluster");
entries.add(EntryImpl.create(1, 1, markerMessage));
+ markerMessage.release();
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
@@ -164,7 +171,9 @@ public class AbstractBaseDispatcherTest {
@Test
public void testFilterEntriesForConsumerOfDelayedMsg() {
List<Entry> entries = new ArrayList<>();
- entries.add(EntryImpl.create(1, 1, createDelayedMessage("message1",
1)));
+ ByteBuf message = createDelayedMessage("message1", 1);
+ entries.add(EntryImpl.create(1, 1, message));
+ message.release();
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index bab21fbc001..f69eb1be432 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -98,7 +98,8 @@ public class PersistentMessageFinderTest extends
MockedBookKeeperTestCase {
headers.writeInt(msgMetadataSize);
messageMetadata.writeTo(headers);
ByteBuf headersAndPayload =
ByteBufPair.coalesce(ByteBufPair.get(headers, data));
- byte[] byteMessage = headersAndPayload.nioBuffer().array();
+ byte[] byteMessage = new byte[headersAndPayload.readableBytes()];
+ headersAndPayload.readBytes(byteMessage);
headersAndPayload.release();
return byteMessage;
}
@@ -123,7 +124,8 @@ public class PersistentMessageFinderTest extends
MockedBookKeeperTestCase {
public static byte[] appendBrokerTimestamp(ByteBuf headerAndPayloads)
throws Exception {
ByteBuf msgWithEntryMeta =
Commands.addBrokerEntryMetadata(headerAndPayloads,
getBrokerEntryMetadataInterceptors(), 1);
- byte[] byteMessage = msgWithEntryMeta.nioBuffer().array();
+ byte[] byteMessage = new byte[msgWithEntryMeta.readableBytes()];
+ msgWithEntryMeta.readBytes(byteMessage);
msgWithEntryMeta.release();
return byteMessage;
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 0863e1ec5c4..a22e090451e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -47,9 +47,11 @@ import static org.testng.Assert.fail;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
+import io.netty.util.ReferenceCountUtil;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.InetAddress;
@@ -171,6 +173,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
private EventLoopGroup eventLoopGroup;
private ManagedLedgerFactory managedLedgerFactory;
+ private ChannelHandlerContext ctx;
@BeforeMethod(alwaysRun = true)
public void setup() throws Exception {
@@ -208,7 +211,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
doReturn(new InetSocketAddress("localhost",
1234)).when(serverCnx).clientAddress();
doReturn(new PulsarCommandSenderImpl(null, serverCnx))
.when(serverCnx).getCommandSender();
- ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+ ctx = mock(ChannelHandlerContext.class);
Channel channel = mock(Channel.class);
eventLoopGroup = new DefaultEventLoopGroup();
@@ -2274,16 +2277,26 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
@Test
public void testSendProducerTxnPrechecks() throws Exception {
PersistentTopic topic = mock(PersistentTopic.class);
+ CountDownLatch latch = new CountDownLatch(1);
+ doAnswer(invocation -> {
+ Object msg = invocation.getArgument(0);
+ ReferenceCountUtil.safeRelease(msg);
+ latch.countDown();
+ return mock(ChannelFuture.class);
+ }).when(ctx).writeAndFlush(any(), any());
String role = "appid1";
Producer producer1 = new Producer(topic, serverCnx, 1 /* producer id
*/, "prod-name",
role, false, null, SchemaVersion.Latest, 0, true,
ProducerAccessMode.Shared, Optional.empty(), true);
producer1.close(false).get();
+ ByteBuf headersAndPayload = Unpooled.wrappedBuffer("test".getBytes());
producer1.publishTxnMessage(
new TxnID(1L, 0L),
- 1, 1, 1, null, 1, false, false
+ 1, 1, 1, headersAndPayload, 1, false, false
);
verify(topic, times(0)).publishTxnMessage(any(), any(), any());
+ // wait for the writeAndFlush to be called so that ByteBuf leak isn't
reported
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 15060dceb2a..b10c0023d06 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -72,6 +72,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import javax.naming.AuthenticationException;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
@@ -96,9 +97,11 @@ import
org.apache.pulsar.broker.auth.MockAuthenticationProvider;
import org.apache.pulsar.broker.auth.MockAuthorizationProvider;
import org.apache.pulsar.broker.auth.MockMultiStageAuthenticationProvider;
import org.apache.pulsar.broker.auth.MockMutableAuthenticationProvider;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.broker.namespace.NamespaceService;
@@ -117,7 +120,6 @@ import
org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse;
import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxnResponse;
import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
-import org.apache.pulsar.common.api.proto.CommandAuthResponse;
import org.apache.pulsar.common.api.proto.CommandCloseProducer;
import org.apache.pulsar.common.api.proto.CommandConnected;
import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse;
@@ -332,7 +334,7 @@ public class ServerCnxTest {
assertEquals(serverCnx.getState(), State.Connected);
assertEquals(serverCnx.getProxyVersion(), "my-pulsar-proxy");
- channel.finish();
+ channel.finishAndReleaseAll();
}
@DataProvider(name = "clientVersions")
@@ -3094,7 +3096,7 @@ public class ServerCnxTest {
Object response = getResponse();
assertTrue(response instanceof CommandSuccess);
- channel.finish();
+ channel.finishAndReleaseAll();
}
@Test(timeOut = 30000)
@@ -3399,21 +3401,42 @@ public class ServerCnxTest {
}
@Test
- public void testHandleAuthResponseWithoutClientVersion() {
- ServerCnx cnx = mock(ServerCnx.class, CALLS_REAL_METHODS);
- CommandAuthResponse authResponse = mock(CommandAuthResponse.class);
- org.apache.pulsar.common.api.proto.AuthData authData =
mock(org.apache.pulsar.common.api.proto.AuthData.class);
- when(authResponse.getResponse()).thenReturn(authData);
- when(authResponse.hasResponse()).thenReturn(true);
- when(authResponse.getResponse().hasAuthMethodName()).thenReturn(true);
- when(authResponse.getResponse().hasAuthData()).thenReturn(true);
- when(authResponse.hasClientVersion()).thenReturn(false);
- try {
- cnx.handleAuthResponse(authResponse);
- } catch (Exception ignore) {
- }
- verify(authResponse, times(1)).hasClientVersion();
- verify(authResponse, times(0)).getClientVersion();
+ public void testHandleAuthResponseWithoutClientVersion() throws Exception {
+ resetChannel();
+ // use a dummy authentication provider
+ AuthenticationProvider authenticationProvider = new
AuthenticationProvider() {
+ @Override
+ public void initialize(ServiceConfiguration config) throws
IOException {
+
+ }
+
+ @Override
+ public String authenticate(AuthenticationDataSource authData)
throws AuthenticationException {
+ return "role";
+ }
+
+ @Override
+ public String getAuthMethodName() {
+ return "dummy";
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+ };
+ AuthData clientData = AuthData.of(new byte[0]);
+ AuthenticationState authenticationState =
+ authenticationProvider.newAuthState(clientData, null, null);
+ // inject the AuthenticationState instance so that auth response can
be processed
+ serverCnx.setAuthState(authenticationState);
+ // send the auth response with no client version
+ String clientVersion = null;
+ ByteBuf authResponse =
+ Commands.newAuthResponse("token", clientData,
Commands.getCurrentProtocolVersion(), clientVersion);
+ channel.writeInbound(authResponse);
+ CommandConnected response = (CommandConnected) getResponse();
+ assertNotNull(response);
}
@Test(expectedExceptions = IllegalArgumentException.class)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java
index 101bd1b21b4..1b253df0f37 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java
@@ -29,6 +29,8 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -37,6 +39,7 @@ import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -46,6 +49,8 @@ public class SharedConsumerAssignorTest {
private final ConsumerSelector roundRobinConsumerSelector = new
ConsumerSelector();
private final List<EntryAndMetadata> entryAndMetadataList = new
ArrayList<>();
private final List<EntryAndMetadata> replayQueue = new ArrayList<>();
+ private final Queue<EntryAndMetadata> cleanupQueue = new
LinkedBlockingDeque<>();
+
private SharedConsumerAssignor assignor;
@BeforeMethod
@@ -77,6 +82,16 @@ public class SharedConsumerAssignorTest {
// P.S. In the table above, The uuid represents the
"<producer-name>-<sequence-id>" for non-chunks
assertEquals(toString(entryAndMetadataList), Arrays.asList(
"0:0@A-0", "0:1@A-1-0-3", "0:2@A-1-1-3", "0:3@B-0",
"0:4@B-1-0-2", "0:5@A-1-2-3", "0:6@B-1-1-2"));
+ entryAndMetadataList.forEach(entry ->
assertEquals(entry.getDataBuffer().refCnt(), 1));
+ cleanupQueue.addAll(entryAndMetadataList);
+ }
+
+ @AfterMethod
+ public void releaseEntries() {
+ EntryAndMetadata entry;
+ while ((entry = cleanupQueue.poll()) != null) {
+ entry.release();
+ }
}
@Test
@@ -199,9 +214,13 @@ public class SharedConsumerAssignorTest {
private static EntryAndMetadata createEntryAndMetadata(final long entryId,
final
MessageMetadata metadata) {
- final ByteBuf payload = Commands.serializeMetadataAndPayload(
- Commands.ChecksumType.Crc32c, metadata,
PulsarByteBufAllocator.DEFAULT.buffer());
- return EntryAndMetadata.create(EntryImpl.create(0L, entryId, payload));
+ ByteBuf payload = PulsarByteBufAllocator.DEFAULT.buffer();
+ final ByteBuf data = Commands.serializeMetadataAndPayload(
+ Commands.ChecksumType.Crc32c, metadata, payload);
+ payload.release();
+ EntryAndMetadata entryAndMetadata =
EntryAndMetadata.create(EntryImpl.create(0L, entryId, data));
+ data.release();
+ return entryAndMetadata;
}
private static MessageMetadata createMetadata(final String producerName,
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
index 342e44b962c..64866537d29 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
@@ -35,6 +35,7 @@ import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import io.netty.channel.EventLoopGroup;
import java.lang.reflect.Field;
import java.util.Map;
@@ -99,20 +100,24 @@ public class MessageDuplicationTest extends BrokerTestBase
{
MessageDeduplication.MessageDupStatus status =
messageDeduplication.isDuplicate(publishContext1, byteBuf1);
assertEquals(status, MessageDeduplication.MessageDupStatus.NotDup);
+ byteBuf1.release();
Long lastSequenceIdPushed =
messageDeduplication.highestSequencedPushed.get(producerName1);
assertNotNull(lastSequenceIdPushed);
assertEquals(lastSequenceIdPushed.longValue(), 0);
status = messageDeduplication.isDuplicate(publishContext2, byteBuf2);
+ byteBuf2.release();
assertEquals(status, MessageDeduplication.MessageDupStatus.NotDup);
lastSequenceIdPushed =
messageDeduplication.highestSequencedPushed.get(producerName2);
assertNotNull(lastSequenceIdPushed);
assertEquals(lastSequenceIdPushed.longValue(), 1);
+
byteBuf1 = getMessage(producerName1, 1);
publishContext1 = getPublishContext(producerName1, 1);
status = messageDeduplication.isDuplicate(publishContext1, byteBuf1);
+ byteBuf1.release();
assertEquals(status, MessageDeduplication.MessageDupStatus.NotDup);
lastSequenceIdPushed =
messageDeduplication.highestSequencedPushed.get(producerName1);
assertNotNull(lastSequenceIdPushed);
@@ -121,6 +126,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
byteBuf1 = getMessage(producerName1, 5);
publishContext1 = getPublishContext(producerName1, 5);
status = messageDeduplication.isDuplicate(publishContext1, byteBuf1);
+ byteBuf1.release();
assertEquals(status, MessageDeduplication.MessageDupStatus.NotDup);
lastSequenceIdPushed =
messageDeduplication.highestSequencedPushed.get(producerName1);
assertNotNull(lastSequenceIdPushed);
@@ -129,6 +135,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
byteBuf1 = getMessage(producerName1, 0);
publishContext1 = getPublishContext(producerName1, 0);
status = messageDeduplication.isDuplicate(publishContext1, byteBuf1);
+ byteBuf1.release();
// should expect unknown because highestSequencePersisted is empty
assertEquals(status, MessageDeduplication.MessageDupStatus.Unknown);
lastSequenceIdPushed =
messageDeduplication.highestSequencedPushed.get(producerName1);
@@ -141,6 +148,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
byteBuf1 = getMessage(producerName1, 0);
publishContext1 = getPublishContext(producerName1, 0);
status = messageDeduplication.isDuplicate(publishContext1, byteBuf1);
+ byteBuf1.release();
// now that highestSequencedPersisted, message with seqId of zero can
be classified as a dup
assertEquals(status, MessageDeduplication.MessageDupStatus.Dup);
lastSequenceIdPushed =
messageDeduplication.highestSequencedPushed.get(producerName1);
@@ -160,6 +168,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
publishContext1 = getPublishContext(producerName1, 4, 8);
status = messageDeduplication.isDuplicate(publishContext1, byteBuf1);
+ byteBuf1.release();
assertEquals(status, MessageDeduplication.MessageDupStatus.Unknown);
lastSequenceIdPushed =
messageDeduplication.highestSequencedPushed.get(producerName1);
assertNotNull(lastSequenceIdPushed);
@@ -296,9 +305,12 @@ public class MessageDuplicationTest extends BrokerTestBase
{
lastSequenceIdPushed =
messageDeduplication.highestSequencedPushed.get(producerName2);
assertNotNull(lastSequenceIdPushed);
assertEquals(lastSequenceIdPushed.longValue(), 1);
+ byteBuf2.release();
+
lastSequenceIdPushed =
messageDeduplication.highestSequencedPersisted.get(producerName2);
assertNotNull(lastSequenceIdPushed);
assertEquals(lastSequenceIdPushed.longValue(), 1);
+ byteBuf1.release();
byteBuf1 = getMessage(producerName1, 1);
publishContext1 = getPublishContext(producerName1, 1);
@@ -311,6 +323,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
lastSequenceIdPushed =
messageDeduplication.highestSequencedPersisted.get(producerName1);
assertNotNull(lastSequenceIdPushed);
assertEquals(lastSequenceIdPushed.longValue(), 1);
+ byteBuf1.release();
byteBuf1 = getMessage(producerName1, 5);
publishContext1 = getPublishContext(producerName1, 5);
@@ -323,6 +336,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
lastSequenceIdPushed =
messageDeduplication.highestSequencedPersisted.get(producerName1);
assertNotNull(lastSequenceIdPushed);
assertEquals(lastSequenceIdPushed.longValue(), 5);
+ byteBuf1.release();
// publish dup
byteBuf1 = getMessage(producerName1, 0);
@@ -333,6 +347,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
assertNotNull(lastSequenceIdPushed);
assertEquals(lastSequenceIdPushed.longValue(), 5);
verify(publishContext1, times(1)).completed(eq(null), eq(-1L),
eq(-1L));
+ byteBuf1.release();
// publish message unknown dup status
byteBuf1 = getMessage(producerName1, 6);
@@ -346,6 +361,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
lastSequenceIdPushed =
messageDeduplication.highestSequencedPersisted.get(producerName1);
assertNotNull(lastSequenceIdPushed);
assertEquals(lastSequenceIdPushed.longValue(), 5);
+ byteBuf1.release();
// publish same message again
byteBuf1 = getMessage(producerName1, 6);
@@ -356,6 +372,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
// complete seq 6 message eventually
persistentTopic.addComplete(PositionFactory.create(0, 5), null,
publishContext1);
+ byteBuf1.release();
// simulate failure
byteBuf1 = getMessage(producerName1, 7);
@@ -376,6 +393,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
lastSequenceIdPushed =
messageDeduplication.highestSequencedPersisted.get(producerName2);
assertEquals(lastSequenceIdPushed.longValue(), 1);
verify(messageDeduplication, times(1)).resetHighestSequenceIdPushed();
+ byteBuf1.release();
// try dup
byteBuf1 = getMessage(producerName1, 6);
@@ -386,6 +404,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
lastSequenceIdPushed =
messageDeduplication.highestSequencedPushed.get(producerName1);
assertNotNull(lastSequenceIdPushed);
assertEquals(lastSequenceIdPushed.longValue(), 6);
+ byteBuf1.release();
// try new message
byteBuf1 = getMessage(producerName1, 8);
@@ -399,6 +418,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
lastSequenceIdPushed =
messageDeduplication.highestSequencedPersisted.get(producerName1);
assertNotNull(lastSequenceIdPushed);
assertEquals(lastSequenceIdPushed.longValue(), 8);
+ byteBuf1.release();
}
@@ -408,8 +428,11 @@ public class MessageDuplicationTest extends BrokerTestBase
{
.setSequenceId(seqId)
.setPublishTime(System.currentTimeMillis());
- return serializeMetadataAndPayload(
- Commands.ChecksumType.Crc32c, messageMetadata,
io.netty.buffer.Unpooled.copiedBuffer(new byte[0]));
+ ByteBuf payload = Unpooled.copiedBuffer(new byte[0]);
+ ByteBuf byteBuf = serializeMetadataAndPayload(
+ Commands.ChecksumType.Crc32c, messageMetadata, payload);
+ payload.release();
+ return byteBuf;
}
public Topic.PublishContext getPublishContext(String producerName, long
seqId) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java
index fa409832fc1..25179c63c61 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java
@@ -26,6 +26,7 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import io.netty.buffer.ByteBuf;
+import java.io.IOException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.List;
@@ -37,6 +38,7 @@ import
org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshotRequest
import
org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshotResponse;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -49,6 +51,7 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest {
private ServiceConfiguration conf;
private ReplicatedSubscriptionsController controller;
private List<ByteBuf> markers;
+ private List<ByteBuf> releaseQueue;
@BeforeMethod
public void setup() {
@@ -59,6 +62,7 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest {
conf.setReplicatedSubscriptionsSnapshotTimeoutSeconds(3);
markers = new ArrayList<>();
+ releaseQueue = new ArrayList<>();
controller = mock(ReplicatedSubscriptionsController.class);
when(controller.localCluster()).thenReturn(localCluster);
@@ -72,6 +76,16 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest {
.writeMarker(any(ByteBuf.class));
}
+ @AfterMethod
+ public void tearDown() {
+ if (markers != null) {
+ markers.forEach(ByteBuf::release);
+ }
+ if (releaseQueue != null) {
+ releaseQueue.forEach(ByteBuf::release);
+ }
+ }
+
@Test
public void testBuildSnapshotWith2Clusters() throws Exception {
ReplicatedSubscriptionsSnapshotBuilder builder = new
ReplicatedSubscriptionsSnapshotBuilder(controller,
@@ -84,8 +98,7 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest {
// Should have sent out a marker to initiate the snapshot
assertEquals(markers.size(), 1);
- ReplicatedSubscriptionsSnapshotRequest request = Markers
-
.parseReplicatedSubscriptionsSnapshotRequest(markers.remove(0));
+ ReplicatedSubscriptionsSnapshotRequest request =
parseReplicatedSubscriptionsSnapshotRequest();
assertEquals(request.getSourceCluster(), localCluster);
// Simulate the responses coming back
@@ -100,7 +113,7 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest {
// At this point the snapshot should be created
assertEquals(markers.size(), 1);
- ReplicatedSubscriptionsSnapshot snapshot =
Markers.parseReplicatedSubscriptionsSnapshot(markers.remove(0));
+ ReplicatedSubscriptionsSnapshot snapshot =
parseReplicatedSubscriptionsSnapshot() ;
assertEquals(snapshot.getClustersCount(), 1);
assertEquals(snapshot.getClusterAt(0).getCluster(), "b");
assertEquals(snapshot.getClusterAt(0).getMessageId().getLedgerId(),
11);
@@ -110,6 +123,19 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest {
assertEquals(snapshot.getLocalMessageId().getEntryId(), 1);
}
+ private ReplicatedSubscriptionsSnapshotRequest
parseReplicatedSubscriptionsSnapshotRequest()
+ throws IOException {
+ ByteBuf byteBuf = markers.remove(0);
+ releaseQueue.add(byteBuf);
+ return Markers.parseReplicatedSubscriptionsSnapshotRequest(byteBuf);
+ }
+
+ private ReplicatedSubscriptionsSnapshot
parseReplicatedSubscriptionsSnapshot() throws IOException {
+ ByteBuf byteBuf = markers.remove(0);
+ releaseQueue.add(byteBuf);
+ return Markers.parseReplicatedSubscriptionsSnapshot(byteBuf);
+ }
+
@Test
public void testBuildSnapshotWith3Clusters() throws Exception {
ReplicatedSubscriptionsSnapshotBuilder builder = new
ReplicatedSubscriptionsSnapshotBuilder(controller,
@@ -122,8 +148,7 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest {
// Should have sent out a marker to initiate the snapshot
assertEquals(markers.size(), 1);
- ReplicatedSubscriptionsSnapshotRequest request = Markers
-
.parseReplicatedSubscriptionsSnapshotRequest(markers.remove(0));
+ ReplicatedSubscriptionsSnapshotRequest request =
parseReplicatedSubscriptionsSnapshotRequest();
assertEquals(request.getSourceCluster(), localCluster);
// Simulate the responses coming back
@@ -150,7 +175,7 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest {
// Since we have 2 remote clusters, a 2nd round of snapshot will be
taken
assertEquals(markers.size(), 1);
- request =
Markers.parseReplicatedSubscriptionsSnapshotRequest(markers.remove(0));
+ request = parseReplicatedSubscriptionsSnapshotRequest();
assertEquals(request.getSourceCluster(), localCluster);
// Responses coming back
@@ -177,7 +202,7 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest {
// At this point the snapshot should be created
assertEquals(markers.size(), 1);
- ReplicatedSubscriptionsSnapshot snapshot =
Markers.parseReplicatedSubscriptionsSnapshot(markers.remove(0));
+ ReplicatedSubscriptionsSnapshot snapshot =
parseReplicatedSubscriptionsSnapshot();
assertEquals(snapshot.getClustersCount(), 2);
assertEquals(snapshot.getClusterAt(0).getCluster(), "b");
assertEquals(snapshot.getClusterAt(0).getMessageId().getLedgerId(),
11);
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java
index bbbdcd74a4d..d7aa97589d5 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java
@@ -52,7 +52,6 @@ public class CommandsTest {
.setSequenceId(sequenceId);
int expectedChecksum = computeChecksum(messageMetadata, data);
ByteBufPair clientCommand = Commands.newSend(1, 0, 1,
ChecksumType.Crc32c, messageMetadata, data);
- clientCommand.retain();
ByteBuf receivedBuf = ByteBufPair.coalesce(clientCommand);
System.err.println(ByteBufUtil.prettyHexDump(receivedBuf));
receivedBuf.skipBytes(4); //skip [total-size]
@@ -78,7 +77,7 @@ public class CommandsTest {
metadata = Commands.parseMessageMetadata(receivedBuf);
// verify metadata parsing
assertEquals(metadata.getProducerName(), producerName);
-
+ receivedBuf.release();
}
private int computeChecksum(MessageMetadata msgMetadata, ByteBuf
compressedPayload) throws IOException {
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java
index 402696ea2af..6f4adddaf41 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java
@@ -139,6 +139,8 @@ public class CompressorCodecTest {
ByteBuf uncompressed = codec.decode(compressed, 0);
assertEquals(uncompressed, Unpooled.EMPTY_BUFFER);
+ compressed.release();
+ uncompressed.release();
}
@Test(dataProvider = "codecAndText")
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java
index 4523a5cc97b..38f70858138 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java
@@ -18,23 +18,18 @@
*/
package org.apache.pulsar.common.protocol;
-import static
org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
-
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
-
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
import org.apache.pulsar.common.api.proto.CommandProducer;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
-import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor;
import org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils;
import org.testng.Assert;
@@ -129,6 +124,7 @@ public class CommandUtilsTests {
byte[] content = new byte[b1.readableBytes() + b2.readableBytes()];
b3.readBytes(content);
assertEquals(HEAD + TAIL, new String(content, StandardCharsets.UTF_8));
+ b3.release();
}
@Test
@@ -149,6 +145,7 @@ public class CommandUtilsTests {
byte [] content = new
byte[dataWithBrokerEntryMetadata.readableBytes()];
dataWithBrokerEntryMetadata.readBytes(content);
assertTrue(new String(content, StandardCharsets.UTF_8).endsWith(data));
+ dataWithBrokerEntryMetadata.release();
}
@Test
@@ -158,13 +155,13 @@ public class CommandUtilsTests {
byteBuf.writeBytes(data.getBytes(StandardCharsets.UTF_8));
ByteBuf dataWithBrokerEntryMetadata =
Commands.addBrokerEntryMetadata(byteBuf,
getBrokerEntryMetadataInterceptors(), 11);
-
Commands.skipBrokerEntryMetadataIfExist(dataWithBrokerEntryMetadata);
assertEquals(data.length(),
dataWithBrokerEntryMetadata.readableBytes());
byte [] content = new
byte[dataWithBrokerEntryMetadata.readableBytes()];
dataWithBrokerEntryMetadata.readBytes(content);
assertEquals(new String(content, StandardCharsets.UTF_8), data);
+ dataWithBrokerEntryMetadata.release();
}
@Test
@@ -185,6 +182,7 @@ public class CommandUtilsTests {
byte [] content = new
byte[dataWithBrokerEntryMetadata.readableBytes()];
dataWithBrokerEntryMetadata.readBytes(content);
assertEquals(new String(content, StandardCharsets.UTF_8), data);
+ dataWithBrokerEntryMetadata.release();
}
@Test
@@ -217,6 +215,7 @@ public class CommandUtilsTests {
byte [] content = new
byte[dataWithBrokerEntryMetadata.readableBytes()];
dataWithBrokerEntryMetadata.readBytes(content);
assertEquals(new String(content, StandardCharsets.UTF_8), data);
+ dataWithBrokerEntryMetadata.release();
}
public Set<BrokerEntryMetadataInterceptor>
getBrokerEntryMetadataInterceptors() {
@@ -226,14 +225,4 @@ public class CommandUtilsTests {
return
BrokerEntryMetadataUtils.loadBrokerEntryMetadataInterceptors(interceptorNames,
Thread.currentThread().getContextClassLoader());
}
-
-
- public ByteBuf getMessage(String producerName, long seqId) {
- MessageMetadata messageMetadata = new MessageMetadata()
- .setProducerName(producerName).setSequenceId(seqId)
- .setPublishTime(System.currentTimeMillis());
-
- return serializeMetadataAndPayload(
- Commands.ChecksumType.Crc32c, messageMetadata,
io.netty.buffer.Unpooled.copiedBuffer(new byte[0]));
- }
}
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/MarkersTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/MarkersTest.java
index fe8a393b114..e1baf73497e 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/MarkersTest.java
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/MarkersTest.java
@@ -47,6 +47,7 @@ public class MarkersTest {
assertEquals(request.getSnapshotId(), "sid");
assertEquals(request.getSourceCluster(), "us-west");
+ buf.release();
}
@Test
@@ -63,6 +64,7 @@ public class MarkersTest {
assertEquals(response.getCluster().getCluster(), "us-east");
assertEquals(response.getCluster().getMessageId().getLedgerId(), 5);
assertEquals(response.getCluster().getMessageId().getEntryId(), 7);
+ buf.release();
}
@Test
@@ -91,6 +93,7 @@ public class MarkersTest {
assertEquals(snapshot.getClusterAt(1).getCluster(), "us-east");
assertEquals(snapshot.getClusterAt(1).getMessageId().getLedgerId(),
10);
assertEquals(snapshot.getClusterAt(1).getMessageId().getEntryId(), 11);
+ buf.release();
}
@Test
@@ -115,6 +118,7 @@ public class MarkersTest {
assertEquals(snapshot.getClusterAt(1).getCluster(), "us-east");
assertEquals(snapshot.getClusterAt(1).getMessageId().getLedgerId(),
10);
assertEquals(snapshot.getClusterAt(1).getMessageId().getEntryId(), 11);
+ buf.release();
}
@Test
@@ -130,6 +134,7 @@ public class MarkersTest {
assertEquals(msgMetadata.getSequenceId(), sequenceId);
assertEquals(msgMetadata.getTxnidMostBits(), mostBits);
assertEquals(msgMetadata.getTxnidLeastBits(), leastBits);
+ buf.release();
}
@Test
@@ -146,6 +151,7 @@ public class MarkersTest {
assertEquals(msgMetadata.getSequenceId(), sequenceId);
assertEquals(msgMetadata.getTxnidMostBits(), mostBits);
assertEquals(msgMetadata.getTxnidLeastBits(), leastBits);
+ buf.release();
}
}