This is an automated email from the ASF dual-hosted git repository.
nodece pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ac53a9e52ff [fix][broker] Fix ShadowReplicator source entry buffer leak
(#25732)
ac53a9e52ff is described below
commit ac53a9e52ff8a9215d3e508a94cda0ea87e49cf5
Author: void-ptr974 <[email protected]>
AuthorDate: Thu May 14 15:52:10 2026 +0800
[fix][broker] Fix ShadowReplicator source entry buffer leak (#25732)
---
.../service/persistent/ShadowReplicator.java | 2 -
.../service/persistent/ShadowReplicatorTest.java | 61 +++++++++++++++++++++-
2 files changed, 60 insertions(+), 3 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
index 667c8598529..5d6f6f688cb 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
@@ -134,8 +134,6 @@ public class ShadowReplicator extends PersistentReplicator {
msg.getMessageBuilder().addProperty().setKey(MSG_PROP_REPL_SOURCE_POSITION)
.setValue(String.format("%s:%s", entry.getLedgerId(),
entry.getEntryId()));
- headersAndPayload.retain();
-
// Increment pending messages for messages produced locally
producer.sendAsync(msg, ProducerSendCallback.create(this,
entry, msg, inFlightTask));
atLeastOneMessageSentForReplication = true;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java
index 40ff0633f4f..21ead24cdd4 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java
@@ -19,14 +19,20 @@
package org.apache.pulsar.broker.service.persistent;
import static
org.apache.pulsar.broker.service.persistent.BrokerServicePersistInternalMethodInvoker.ensureNoBacklogByInflightTask;
+import static
org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
import static org.testng.Assert.assertTrue;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import java.nio.charset.StandardCharsets;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.CustomLog;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerTestBase;
@@ -38,9 +44,11 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.schema.Schemas;
import org.awaitility.Awaitility;
import org.testng.Assert;
@@ -146,6 +154,39 @@ public class ShadowReplicatorTest extends BrokerTestBase {
Assert.assertEquals(shadowMessage.getMessageId(),
sourceMessage.getMessageId());
}
+ @Test
+ public void testShadowReplicatorReleasesSourceEntryBuffer() throws
Exception {
+ String sourceTopicName =
BrokerTestUtil.newUniqueName("persistent://prop1/ns-source/source-topic");
+ String shadowTopicName =
BrokerTestUtil.newUniqueName("persistent://prop1/ns-shadow/shadow-topic");
+
+ admin.topics().createNonPartitionedTopic(sourceTopicName);
+ admin.topics().createShadowTopic(shadowTopicName, sourceTopicName);
+ admin.topics().setShadowTopics(sourceTopicName,
Lists.newArrayList(shadowTopicName));
+
+ PersistentTopic sourceTopic =
+ (PersistentTopic)
pulsar.getBrokerService().getTopicIfExists(sourceTopicName).get().get();
+ Awaitility.await().untilAsserted(() ->
Assert.assertEquals(sourceTopic.getShadowReplicators().size(), 1));
+ ShadowReplicator replicator = (ShadowReplicator)
sourceTopic.getShadowReplicators().get(shadowTopicName);
+ Awaitility.await().untilAsserted(() ->
+ Assert.assertEquals(String.valueOf(replicator.getState()),
"Started"));
+
+ Entry entry = createEntry(1, 0, "ref-count-check", 1);
+ ByteBuf entryBuffer = entry.getDataBuffer();
+ Assert.assertEquals(entryBuffer.refCnt(), 1);
+
+ List<Entry> entries = Lists.newArrayList(entry);
+ PersistentReplicator.InFlightTask inFlightTask =
+ new PersistentReplicator.InFlightTask(
+ entry.getPosition(), entries.size(),
replicator.getReplicatorId());
+ inFlightTask.setEntries(entries);
+ Assert.assertTrue(replicator.replicateEntries(entries, inFlightTask));
+
+ Awaitility.await().untilAsserted(() -> {
+ Assert.assertTrue(inFlightTask.isDone());
+ Assert.assertEquals(entryBuffer.refCnt(), 0);
+ });
+ }
+
private static PersistentReplicator getAnyShadowReplicator(TopicName
topicName, PulsarService pulsar) {
PersistentTopic persistentTopic =
(PersistentTopic)
pulsar.getBrokerService().getTopic(topicName.toString(), false).join().get();
@@ -196,4 +237,22 @@ public class ShadowReplicatorTest extends BrokerTestBase {
ensureNoBacklogByInflightTask(replicator);
}
-}
\ No newline at end of file
+ private Entry createEntry(long ledgerId, long entryId, String message,
long sequenceId) {
+ ByteBuf headersAndPayload = createMessage(message, sequenceId);
+ Entry entry = EntryImpl.create(ledgerId, entryId, headersAndPayload);
+ headersAndPayload.release();
+ return entry;
+ }
+
+ private ByteBuf createMessage(String message, long sequenceId) {
+ MessageMetadata messageMetadata = new MessageMetadata()
+ .setSequenceId(sequenceId)
+ .setProducerName("testProducer")
+ .setPublishTime(System.currentTimeMillis());
+ ByteBuf payload =
Unpooled.copiedBuffer(message.getBytes(StandardCharsets.UTF_8));
+ ByteBuf headersAndPayload =
serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata,
payload);
+ payload.release();
+ return headersAndPayload;
+ }
+
+}