This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit aeda5ca1b28e7dceaeb21772420eb0fe6b7d001b Author: sinan liu <[email protected]> AuthorDate: Mon May 18 09:49:50 2026 +0800 [improve][broker] Prevent stale replicator pending reads after termination (#25767) (cherry picked from commit 8f9f5b49d631e235e86d79e48a63722e74db4413) --- .../service/persistent/PersistentReplicator.java | 21 ++++++-- .../PersistentReplicatorInflightTaskTest.java | 60 ++++++++++++++++++++++ 2 files changed, 77 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index fc338c02e4d..ecb88818b05 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -496,7 +496,7 @@ public abstract class PersistentReplicator extends AbstractReplicator @Override public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { - InFlightTask inFlightTask = (InFlightTask) ctx; + completeFailedReadTask(ctx); if (state != Started) { log.info("[{}] Replicator was disconnected while reading entries." + " Stop reading. Replicator state: {}", @@ -517,11 +517,9 @@ public abstract class PersistentReplicator extends AbstractReplicator terminate(); return; } else if (!(exception instanceof TooManyRequestsException)) { - inFlightTask.setEntries(Collections.emptyList()); log.error("[{}] Error reading entries at {}. Retrying to read in {}s. ({})", replicatorId, ctx, waitTimeMillis / 1000.0, exception.getMessage(), exception); } else { - inFlightTask.setEntries(Collections.emptyList()); log.debug("[{}] Throttled by bookies while reading at {}. Retrying to read in {}s. ({})", replicatorId, ctx, waitTimeMillis / 1000.0, exception.getMessage(), exception); @@ -530,6 +528,21 @@ public abstract class PersistentReplicator extends AbstractReplicator brokerService.executor().schedule(this::readMoreEntries, waitTimeMillis, TimeUnit.MILLISECONDS); } + private void completeFailedReadTask(Object ctx) { + if (!(ctx instanceof InFlightTask)) { + log.error("[{}] Unexpected read entries failed context {}", replicatorId, ctx); + return; + } + + InFlightTask inFlightTask = (InFlightTask) ctx; + if (inFlightTask.entries == null) { + inFlightTask.setEntries(Collections.emptyList()); + } else { + log.error("[{}] Unexpected completed in-flight task in read entries failed callback. inFlightTask={}", + replicatorId, inFlightTask); + } + } + public CompletableFuture<Void> clearBacklog() { CompletableFuture<Void> future = new CompletableFuture<>(); @@ -985,4 +998,4 @@ public abstract class PersistentReplicator extends AbstractReplicator String getReplicatorId() { return replicatorId; } -} \ No newline at end of file +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java index e8a7a11a2db..c825d49caa6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java @@ -28,17 +28,25 @@ import java.util.Arrays; import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerTest; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.AbstractReplicator; import org.apache.pulsar.broker.service.BrokerServiceInternalMethodInvoker; import org.apache.pulsar.broker.service.OneWayReplicatorTestBase; import org.apache.pulsar.broker.service.persistent.PersistentReplicator.InFlightTask; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.Schema; +import org.awaitility.Awaitility; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.testng.Assert; @@ -107,6 +115,58 @@ public class PersistentReplicatorInflightTaskTest extends OneWayReplicatorTestBa assertTrue(counter.get() <= 1); } + @Test + public void testReadEntriesFailedCompletesInFlightTaskAfterReplicatorTerminated() throws Exception { + String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_"); + CountDownLatch readStarted = new CountDownLatch(1); + CountDownLatch failRead = new CountDownLatch(1); + Producer<String> producer = null; + try { + admin1.topics().createNonPartitionedTopic(topicName); + admin2.topics().createNonPartitionedTopic(topicName); + producer = client1.newProducer(Schema.STRING).topic(topicName).create(); + producer.send("msg"); + + PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false) + .join().get(); + ManagedLedgerImpl ml = (ManagedLedgerImpl) topic.getManagedLedger(); + ManagedLedgerTest.makeReadEntryProbFail(ml, () -> { + readStarted.countDown(); + try { + if (!failRead.await(30, TimeUnit.SECONDS)) { + return new ManagedLedgerException("Timed out waiting to fail read entries"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return new ManagedLedgerException(e); + } + return new ManagedLedgerException.TooManyRequestsException("mocked read failure"); + }); + + pulsar1.getConfig().setReplicationStartAt("earliest"); + admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2)); + assertTrue(readStarted.await(30, TimeUnit.SECONDS)); + + PersistentReplicator replicator = (PersistentReplicator) topic.getReplicators().get(cluster2); + Assert.assertNotNull(replicator, "Replicator should not be null"); + Assert.assertTrue(replicator.hasPendingRead()); + + replicator.terminate(); + Assert.assertTrue(replicator.getState() != AbstractReplicator.State.Started); + failRead.countDown(); + + Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> + Assert.assertFalse(replicator.hasPendingRead())); + } finally { + failRead.countDown(); + if (producer != null) { + producer.close(); + } + admin1.topics().delete(topicName, true); + admin2.topics().delete(topicName, true); + } + } + @Test public void testCreateOrRecycleInFlightTaskIntoQueue() throws Exception { log.info("Starting testCreateOrRecycleInFlightTaskIntoQueue");
