This is an automated email from the ASF dual-hosted git repository.

nodece pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ac53a9e52ff [fix][broker] Fix ShadowReplicator source entry buffer leak 
(#25732)
ac53a9e52ff is described below

commit ac53a9e52ff8a9215d3e508a94cda0ea87e49cf5
Author: void-ptr974 <[email protected]>
AuthorDate: Thu May 14 15:52:10 2026 +0800

    [fix][broker] Fix ShadowReplicator source entry buffer leak (#25732)
---
 .../service/persistent/ShadowReplicator.java       |  2 -
 .../service/persistent/ShadowReplicatorTest.java   | 61 +++++++++++++++++++++-
 2 files changed, 60 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
index 667c8598529..5d6f6f688cb 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
@@ -134,8 +134,6 @@ public class ShadowReplicator extends PersistentReplicator {
                 
msg.getMessageBuilder().addProperty().setKey(MSG_PROP_REPL_SOURCE_POSITION)
                         .setValue(String.format("%s:%s", entry.getLedgerId(), 
entry.getEntryId()));
 
-                headersAndPayload.retain();
-
                 // Increment pending messages for messages produced locally
                 producer.sendAsync(msg, ProducerSendCallback.create(this, 
entry, msg, inFlightTask));
                 atLeastOneMessageSentForReplication = true;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java
index 40ff0633f4f..21ead24cdd4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java
@@ -19,14 +19,20 @@
 package org.apache.pulsar.broker.service.persistent;
 
 import static 
org.apache.pulsar.broker.service.persistent.BrokerServicePersistInternalMethodInvoker.ensureNoBacklogByInflightTask;
+import static 
org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
 import static org.testng.Assert.assertTrue;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import java.nio.charset.StandardCharsets;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.CustomLog;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.BrokerTestBase;
@@ -38,9 +44,11 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ReplicatorStats;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.schema.Schemas;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
@@ -146,6 +154,39 @@ public class ShadowReplicatorTest extends BrokerTestBase {
         Assert.assertEquals(shadowMessage.getMessageId(), 
sourceMessage.getMessageId());
     }
 
+    @Test
+    public void testShadowReplicatorReleasesSourceEntryBuffer() throws 
Exception {
+        String sourceTopicName = 
BrokerTestUtil.newUniqueName("persistent://prop1/ns-source/source-topic");
+        String shadowTopicName = 
BrokerTestUtil.newUniqueName("persistent://prop1/ns-shadow/shadow-topic");
+
+        admin.topics().createNonPartitionedTopic(sourceTopicName);
+        admin.topics().createShadowTopic(shadowTopicName, sourceTopicName);
+        admin.topics().setShadowTopics(sourceTopicName, 
Lists.newArrayList(shadowTopicName));
+
+        PersistentTopic sourceTopic =
+                (PersistentTopic) 
pulsar.getBrokerService().getTopicIfExists(sourceTopicName).get().get();
+        Awaitility.await().untilAsserted(() -> 
Assert.assertEquals(sourceTopic.getShadowReplicators().size(), 1));
+        ShadowReplicator replicator = (ShadowReplicator) 
sourceTopic.getShadowReplicators().get(shadowTopicName);
+        Awaitility.await().untilAsserted(() ->
+                Assert.assertEquals(String.valueOf(replicator.getState()), 
"Started"));
+
+        Entry entry = createEntry(1, 0, "ref-count-check", 1);
+        ByteBuf entryBuffer = entry.getDataBuffer();
+        Assert.assertEquals(entryBuffer.refCnt(), 1);
+
+        List<Entry> entries = Lists.newArrayList(entry);
+        PersistentReplicator.InFlightTask inFlightTask =
+                new PersistentReplicator.InFlightTask(
+                        entry.getPosition(), entries.size(), 
replicator.getReplicatorId());
+        inFlightTask.setEntries(entries);
+        Assert.assertTrue(replicator.replicateEntries(entries, inFlightTask));
+
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertTrue(inFlightTask.isDone());
+            Assert.assertEquals(entryBuffer.refCnt(), 0);
+        });
+    }
+
     private static PersistentReplicator getAnyShadowReplicator(TopicName 
topicName, PulsarService pulsar) {
         PersistentTopic persistentTopic =
                 (PersistentTopic) 
pulsar.getBrokerService().getTopic(topicName.toString(), false).join().get();
@@ -196,4 +237,22 @@ public class ShadowReplicatorTest extends BrokerTestBase {
         ensureNoBacklogByInflightTask(replicator);
     }
 
-}
\ No newline at end of file
+    private Entry createEntry(long ledgerId, long entryId, String message, 
long sequenceId) {
+        ByteBuf headersAndPayload = createMessage(message, sequenceId);
+        Entry entry = EntryImpl.create(ledgerId, entryId, headersAndPayload);
+        headersAndPayload.release();
+        return entry;
+    }
+
+    private ByteBuf createMessage(String message, long sequenceId) {
+        MessageMetadata messageMetadata = new MessageMetadata()
+                .setSequenceId(sequenceId)
+                .setProducerName("testProducer")
+                .setPublishTime(System.currentTimeMillis());
+        ByteBuf payload = 
Unpooled.copiedBuffer(message.getBytes(StandardCharsets.UTF_8));
+        ByteBuf headersAndPayload = 
serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, 
payload);
+        payload.release();
+        return headersAndPayload;
+    }
+
+}

Reply via email to