This is an automated email from the ASF dual-hosted git repository.
nodece pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8f9f5b49d63 [improve][broker] Prevent stale replicator pending reads after termination (#25767)
8f9f5b49d63 is described below
commit 8f9f5b49d631e235e86d79e48a63722e74db4413
Author: sinan liu <[email protected]>
AuthorDate: Mon May 18 09:49:50 2026 +0800
[improve][broker] Prevent stale replicator pending reads after termination (#25767)
---
.../service/persistent/PersistentReplicator.java | 24 +++++++--
.../PersistentReplicatorInflightTaskTest.java | 60 ++++++++++++++++++++++
2 files changed, 80 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index bd68d2fb980..8433d9b9c7b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -494,7 +494,7 @@ public abstract class PersistentReplicator extends AbstractReplicator
@Override
public void readEntriesFailed(ManagedLedgerException exception, Object
ctx) {
- InFlightTask inFlightTask = (InFlightTask) ctx;
+ completeFailedReadTask(ctx);
if (state != Started) {
log.info("Replicator was disconnected while reading entries,
stopping reads");
return;
@@ -515,14 +515,12 @@ public abstract class PersistentReplicator extends AbstractReplicator
terminate();
return;
} else if (!(exception instanceof TooManyRequestsException)) {
- inFlightTask.setEntries(Collections.emptyList());
log.error()
.attr("ctx", ctx)
.attr("waitTimeSec", waitTimeMillis / 1000.0)
.exception(exception)
.log("Error reading entries, retrying");
} else {
- inFlightTask.setEntries(Collections.emptyList());
log.debug()
.attr("ctx", ctx)
.attr("waitTimeSec", waitTimeMillis / 1000.0)
@@ -533,6 +531,24 @@ public abstract class PersistentReplicator extends AbstractReplicator
brokerService.executor().schedule(this::readMoreEntries,
waitTimeMillis, TimeUnit.MILLISECONDS);
}
+    /**
+     * Completes the {@link InFlightTask} of a failed read with an empty entry list.
+     *
+     * <p>{@code readEntriesFailed} calls this before it checks the replicator state, so the task is
+     * completed even when the replicator is no longer {@code Started} and the callback returns early.
+     * A context that is not an {@code InFlightTask}, or a task whose entries were already set, is
+     * logged as unexpected and left untouched.
+     *
+     * @param ctx the context object passed to {@code readEntriesFailed}
+     */
+    private void completeFailedReadTask(Object ctx) {
+        if (!(ctx instanceof InFlightTask)) {
+            log.error()
+                    .attr("ctx", ctx)
+                    .log("Unexpected read entries failed context");
+            return;
+        }
+
+        InFlightTask inFlightTask = (InFlightTask) ctx;
+        if (inFlightTask.entries == null) {
+            inFlightTask.setEntries(Collections.emptyList());
+        } else {
+            // Entries were already set for this task, so they are not overwritten; report the anomaly.
+            log.error()
+                    .attr("inFlightTask", inFlightTask)
+                    .log("Unexpected completed in-flight task in read entries failed callback");
+        }
+    }
+
public CompletableFuture<Void> clearBacklog() {
CompletableFuture<Void> future = new CompletableFuture<>();
@@ -991,4 +1007,4 @@ public abstract class PersistentReplicator extends AbstractReplicator
String getReplicatorId() {
return replicatorId;
}
-}
\ No newline at end of file
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java
index 0499dd90e99..478437ffde0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java
@@ -28,17 +28,25 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.CustomLog;
import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerTest;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.BrokerServiceInternalMethodInvoker;
import org.apache.pulsar.broker.service.OneWayReplicatorTestBase;
import
org.apache.pulsar.broker.service.persistent.PersistentReplicator.InFlightTask;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import org.awaitility.Awaitility;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testng.Assert;
@@ -107,6 +115,58 @@ public class PersistentReplicatorInflightTaskTest extends OneWayReplicatorTestBa
assertTrue(counter.get() <= 1);
}
+    /**
+     * Verifies that a read which fails after the replicator was terminated still completes its
+     * in-flight task, so {@code hasPendingRead()} eventually returns {@code false}.
+     */
+    @Test
+    public void testReadEntriesFailedCompletesInFlightTaskAfterReplicatorTerminated() throws Exception {
+        String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_");
+        CountDownLatch readStarted = new CountDownLatch(1);
+        CountDownLatch failRead = new CountDownLatch(1);
+        // pulsar1's config may be shared with other tests in this class; restore it in finally.
+        String originalReplicationStartAt = pulsar1.getConfig().getReplicationStartAt();
+        Producer<String> producer = null;
+        try {
+            admin1.topics().createNonPartitionedTopic(topicName);
+            admin2.topics().createNonPartitionedTopic(topicName);
+            producer = client1.newProducer(Schema.STRING).topic(topicName).create();
+            producer.send("msg");
+
+            PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false)
+                    .join().get();
+            ManagedLedgerImpl ml = (ManagedLedgerImpl) topic.getManagedLedger();
+            // The injected failure signals readStarted, then blocks until failRead is released.
+            ManagedLedgerTest.makeReadEntryProbFail(ml, () -> {
+                readStarted.countDown();
+                try {
+                    if (!failRead.await(30, TimeUnit.SECONDS)) {
+                        return new ManagedLedgerException("Timed out waiting to fail read entries");
+                    }
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    return new ManagedLedgerException(e);
+                }
+                return new ManagedLedgerException.TooManyRequestsException("mocked read failure");
+            });
+
+            pulsar1.getConfig().setReplicationStartAt("earliest");
+            admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2));
+            assertTrue(readStarted.await(30, TimeUnit.SECONDS));
+
+            PersistentReplicator replicator = (PersistentReplicator) topic.getReplicators().get(cluster2);
+            Assert.assertNotNull(replicator, "Replicator should not be null");
+            Assert.assertTrue(replicator.hasPendingRead());
+
+            // Terminate while the read is still blocked, then let it fail.
+            replicator.terminate();
+            Assert.assertNotEquals(replicator.getState(), AbstractReplicator.State.Started);
+            failRead.countDown();
+
+            Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() ->
+                    Assert.assertFalse(replicator.hasPendingRead()));
+        } finally {
+            // Release a still-blocked read even if an assertion above failed.
+            failRead.countDown();
+            pulsar1.getConfig().setReplicationStartAt(originalReplicationStartAt);
+            if (producer != null) {
+                producer.close();
+            }
+            admin1.topics().delete(topicName, true);
+            admin2.topics().delete(topicName, true);
+        }
+    }
+
@Test
public void testCreateOrRecycleInFlightTaskIntoQueue() throws Exception {
log.info("Starting testCreateOrRecycleInFlightTaskIntoQueue");