This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new e038e0441e7 [fix][broker] Stop to retry to read entries if the 
replicator has terminated (#24880)
e038e0441e7 is described below

commit e038e0441e7e398c26f5b3dfa82099534f978b32
Author: fengyubiao <[email protected]>
AuthorDate: Fri Oct 24 16:10:41 2025 +0800

    [fix][broker] Stop to retry to read entries if the replicator has 
terminated (#24880)
---
 .../service/persistent/PersistentReplicator.java   | 12 ++++++-
 .../PersistentReplicatorInflightTaskTest.java      | 41 ++++++++++++++++++++++
 2 files changed, 52 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 559b39e0fb4..bda5acce236 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -279,6 +279,9 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
     }
 
     protected void readMoreEntries() {
+        if (state.equals(Terminated) || state.equals(Terminating)) {
+            return;
+        }
         // Acquire permits and check state of producer.
         InFlightTask newInFlightTask = acquirePermitsIfNotFetchingSchema();
         if (newInFlightTask == null) {
@@ -963,7 +966,9 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
     protected boolean hasPendingRead() {
         synchronized (inFlightTasks) {
             for (InFlightTask task : inFlightTasks) {
-                if (task.readPos != null && task.entries == null) {
+                // Call "getReadPos()" rather than reading the "readPos" field directly, so that the test
+                // "testReplicationTaskStoppedAfterTopicClosed" can count the calls to "readMoreEntries".
+                if (task.getReadPos() != null && task.entries == null) {
                     // Skip the current reading if there is a pending cursor 
reading.
                     return true;
                 }
@@ -971,4 +976,9 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
         }
         return false;
     }
+
+    @VisibleForTesting
+    String getReplicatorId() {
+        return  replicatorId;
+    }
 }
\ No newline at end of file
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java
index 68f50aa9e87..b23b4565e5f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java
@@ -18,12 +18,17 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
@@ -34,6 +39,8 @@ import 
org.apache.pulsar.broker.service.BrokerServiceInternalMethodInvoker;
 import org.apache.pulsar.broker.service.OneWayReplicatorTestBase;
 import 
org.apache.pulsar.broker.service.persistent.PersistentReplicator.InFlightTask;
 import org.apache.pulsar.client.api.MessageId;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -66,6 +73,40 @@ public class PersistentReplicatorInflightTaskTest extends 
OneWayReplicatorTestBa
         admin1.topics().createSubscription(topicName, subscriptionName, 
MessageId.earliest);
     }
 
+    @Test
+    public void testReplicationTaskStoppedAfterTopicClosed() throws Exception {
+        // Close a topic, which has enabled replication.
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ replicatedNamespace + "/tp_");
+        admin1.topics().createNonPartitionedTopic(topicName);
+        waitReplicatorStarted(topicName, pulsar2);
+        PersistentTopic topic = (PersistentTopic) 
pulsar1.getBrokerService().getTopic(topicName, false)
+                .join().get();
+        PersistentReplicator replicator = (PersistentReplicator) 
topic.getReplicators().get(cluster2);
+        admin1.topics().unload(topicName);
+
+        // Inject a task into "inFlightTasks" to count how many times the method "replicator.readMoreEntries"
+        // has been called.
+        AtomicInteger counter = new AtomicInteger();
+        InFlightTask injectedTask = new InFlightTask(PositionFactory.create(1, 
1), 1, replicator.getReplicatorId());
+        injectedTask.setEntries(Collections.emptyList());
+        InFlightTask spyTask = spy(injectedTask);
+        replicator.inFlightTasks.add(spyTask);
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws 
Throwable {
+                counter.incrementAndGet();
+                return invocationOnMock.callRealMethod();
+            }
+        }).when(spyTask).getReadPos();
+
+        // Verify: no task is scheduled to retry reading entries for replication.
+        // Call "readMoreEntries" to trigger the issue.
+        replicator.readMoreEntries();
+        Thread.sleep(PersistentTopic.MESSAGE_RATE_BACKOFF_MS * 10);
+        assertEquals(replicator.getState(), 
AbstractReplicator.State.Terminated);
+        assertTrue(counter.get() <= 1);
+    }
+
     @Test
     public void testCreateOrRecycleInFlightTaskIntoQueue() throws Exception {
         log.info("Starting testCreateOrRecycleInFlightTaskIntoQueue");

Reply via email to