This is an automated email from the ASF dual-hosted git repository.

nodece pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f9f5b49d63 [improve][broker] Prevent stale replicator pending reads after termination (#25767)
8f9f5b49d63 is described below

commit 8f9f5b49d631e235e86d79e48a63722e74db4413
Author: sinan liu <[email protected]>
AuthorDate: Mon May 18 09:49:50 2026 +0800

    [improve][broker] Prevent stale replicator pending reads after termination (#25767)
---
 .../service/persistent/PersistentReplicator.java   | 24 +++++++--
 .../PersistentReplicatorInflightTaskTest.java      | 60 ++++++++++++++++++++++
 2 files changed, 80 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index bd68d2fb980..8433d9b9c7b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -494,7 +494,7 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
 
     @Override
     public void readEntriesFailed(ManagedLedgerException exception, Object 
ctx) {
-        InFlightTask inFlightTask = (InFlightTask) ctx;
+        completeFailedReadTask(ctx);
         if (state != Started) {
             log.info("Replicator was disconnected while reading entries, 
stopping reads");
             return;
@@ -515,14 +515,12 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
             terminate();
             return;
         } else if (!(exception instanceof TooManyRequestsException)) {
-            inFlightTask.setEntries(Collections.emptyList());
             log.error()
                     .attr("ctx", ctx)
                     .attr("waitTimeSec", waitTimeMillis / 1000.0)
                     .exception(exception)
                     .log("Error reading entries, retrying");
         } else {
-            inFlightTask.setEntries(Collections.emptyList());
             log.debug()
                     .attr("ctx", ctx)
                     .attr("waitTimeSec", waitTimeMillis / 1000.0)
@@ -533,6 +531,24 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
         brokerService.executor().schedule(this::readMoreEntries, 
waitTimeMillis, TimeUnit.MILLISECONDS);
     }
 
+    private void completeFailedReadTask(Object ctx) {
+        if (!(ctx instanceof InFlightTask)) {
+            log.error()
+                    .attr("ctx", ctx)
+                    .log("Unexpected read entries failed context");
+            return;
+        }
+
+        InFlightTask inFlightTask = (InFlightTask) ctx;
+        if (inFlightTask.entries == null) {
+            inFlightTask.setEntries(Collections.emptyList());
+        } else {
+            log.error()
+                    .attr("inFlightTask", inFlightTask)
+                    .log("Unexpected completed in-flight task in read entries 
failed callback");
+        }
+    }
+
     public CompletableFuture<Void> clearBacklog() {
         CompletableFuture<Void> future = new CompletableFuture<>();
 
@@ -991,4 +1007,4 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
     String getReplicatorId() {
         return  replicatorId;
     }
-}
\ No newline at end of file
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java
index 0499dd90e99..478437ffde0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java
@@ -28,17 +28,25 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import lombok.CustomLog;
 import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.PositionFactory;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerTest;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.service.AbstractReplicator;
 import org.apache.pulsar.broker.service.BrokerServiceInternalMethodInvoker;
 import org.apache.pulsar.broker.service.OneWayReplicatorTestBase;
 import 
org.apache.pulsar.broker.service.persistent.PersistentReplicator.InFlightTask;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import org.awaitility.Awaitility;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.testng.Assert;
@@ -107,6 +115,58 @@ public class PersistentReplicatorInflightTaskTest extends 
OneWayReplicatorTestBa
         assertTrue(counter.get() <= 1);
     }
 
+    @Test
+    public void 
testReadEntriesFailedCompletesInFlightTaskAfterReplicatorTerminated() throws 
Exception {
+        String topicName = BrokerTestUtil.newUniqueName("persistent://" + 
nonReplicatedNamespace + "/tp_");
+        CountDownLatch readStarted = new CountDownLatch(1);
+        CountDownLatch failRead = new CountDownLatch(1);
+        Producer<String> producer = null;
+        try {
+            admin1.topics().createNonPartitionedTopic(topicName);
+            admin2.topics().createNonPartitionedTopic(topicName);
+            producer = 
client1.newProducer(Schema.STRING).topic(topicName).create();
+            producer.send("msg");
+
+            PersistentTopic topic = (PersistentTopic) 
pulsar1.getBrokerService().getTopic(topicName, false)
+                    .join().get();
+            ManagedLedgerImpl ml = (ManagedLedgerImpl) 
topic.getManagedLedger();
+            ManagedLedgerTest.makeReadEntryProbFail(ml, () -> {
+                readStarted.countDown();
+                try {
+                    if (!failRead.await(30, TimeUnit.SECONDS)) {
+                        return new ManagedLedgerException("Timed out waiting 
to fail read entries");
+                    }
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    return new ManagedLedgerException(e);
+                }
+                return new 
ManagedLedgerException.TooManyRequestsException("mocked read failure");
+            });
+
+            pulsar1.getConfig().setReplicationStartAt("earliest");
+            admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1, cluster2));
+            assertTrue(readStarted.await(30, TimeUnit.SECONDS));
+
+            PersistentReplicator replicator = (PersistentReplicator) 
topic.getReplicators().get(cluster2);
+            Assert.assertNotNull(replicator, "Replicator should not be null");
+            Assert.assertTrue(replicator.hasPendingRead());
+
+            replicator.terminate();
+            Assert.assertTrue(replicator.getState() != 
AbstractReplicator.State.Started);
+            failRead.countDown();
+
+            Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() ->
+                    Assert.assertFalse(replicator.hasPendingRead()));
+        } finally {
+            failRead.countDown();
+            if (producer != null) {
+                producer.close();
+            }
+            admin1.topics().delete(topicName, true);
+            admin2.topics().delete(topicName, true);
+        }
+    }
+
     @Test
     public void testCreateOrRecycleInFlightTaskIntoQueue() throws Exception {
         log.info("Starting testCreateOrRecycleInFlightTaskIntoQueue");

Reply via email to