This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new e038e0441e7 [fix][broker] Stop to retry to read entries if the
replicator has terminated (#24880)
e038e0441e7 is described below
commit e038e0441e7e398c26f5b3dfa82099534f978b32
Author: fengyubiao <[email protected]>
AuthorDate: Fri Oct 24 16:10:41 2025 +0800
[fix][broker] Stop to retry to read entries if the replicator has
terminated (#24880)
---
.../service/persistent/PersistentReplicator.java | 12 ++++++-
.../PersistentReplicatorInflightTaskTest.java | 41 ++++++++++++++++++++++
2 files changed, 52 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 559b39e0fb4..bda5acce236 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -279,6 +279,9 @@ public abstract class PersistentReplicator extends AbstractReplicator
}
protected void readMoreEntries() {
+ if (state.equals(Terminated) || state.equals(Terminating)) {
+ return;
+ }
// Acquire permits and check state of producer.
InFlightTask newInFlightTask = acquirePermitsIfNotFetchingSchema();
if (newInFlightTask == null) {
@@ -963,7 +966,9 @@ public abstract class PersistentReplicator extends AbstractReplicator
protected boolean hasPendingRead() {
synchronized (inFlightTasks) {
for (InFlightTask task : inFlightTasks) {
- if (task.readPos != null && task.entries == null) {
+ // The purpose of calling "getReadPos" instead of calling "readPos" is to make the test
+ // "testReplicationTaskStoppedAfterTopicClosed" can counter the calling times of "readMoreEntries".
+ if (task.getReadPos() != null && task.entries == null) {
// Skip the current reading if there is a pending cursor reading.
return true;
}
@@ -971,4 +976,9 @@ public abstract class PersistentReplicator extends AbstractReplicator
}
return false;
}
+
+ @VisibleForTesting
+ String getReplicatorId() {
+ return replicatorId;
+ }
}
\ No newline at end of file
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java
index 68f50aa9e87..b23b4565e5f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java
@@ -18,12 +18,17 @@
*/
package org.apache.pulsar.broker.service.persistent;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
@@ -34,6 +39,8 @@ import org.apache.pulsar.broker.service.BrokerServiceInternalMethodInvoker;
import org.apache.pulsar.broker.service.OneWayReplicatorTestBase;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator.InFlightTask;
import org.apache.pulsar.client.api.MessageId;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -66,6 +73,40 @@ public class PersistentReplicatorInflightTaskTest extends OneWayReplicatorTestBa
admin1.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);
}
+ @Test
+ public void testReplicationTaskStoppedAfterTopicClosed() throws Exception {
+ // Close a topic, which has enabled replication.
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
+ admin1.topics().createNonPartitionedTopic(topicName);
+ waitReplicatorStarted(topicName, pulsar2);
+ PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false)
+ .join().get();
+ PersistentReplicator replicator = (PersistentReplicator) topic.getReplicators().get(cluster2);
+ admin1.topics().unload(topicName);
+
+ // Inject a task into the "inFlightTasks" to calculate how many times the method "replicator.readMoreEntries"
+ // has been called.
+ AtomicInteger counter = new AtomicInteger();
+ InFlightTask injectedTask = new InFlightTask(PositionFactory.create(1, 1), 1, replicator.getReplicatorId());
+ injectedTask.setEntries(Collections.emptyList());
+ InFlightTask spyTask = spy(injectedTask);
+ replicator.inFlightTasks.add(spyTask);
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ counter.incrementAndGet();
+ return invocationOnMock.callRealMethod();
+ }
+ }).when(spyTask).getReadPos();
+
+ // Verify: there is no scheduled task to retry to read entries to replicate.
+ // Call "readMoreEntries" to make the issue happen.
+ replicator.readMoreEntries();
+ Thread.sleep(PersistentTopic.MESSAGE_RATE_BACKOFF_MS * 10);
+ assertEquals(replicator.getState(), AbstractReplicator.State.Terminated);
+ assertTrue(counter.get() <= 1);
+ }
+
@Test
public void testCreateOrRecycleInFlightTaskIntoQueue() throws Exception {
log.info("Starting testCreateOrRecycleInFlightTaskIntoQueue");