This is an automated email from the ASF dual-hosted git repository.

poorbarcode pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new efc9299dcd6 [fix][broker]Replication is stuck because failed to read entries (#25625)
efc9299dcd6 is described below

commit efc9299dcd67630373316a9689e06e43bbabe0fc
Author: fengyubiao <[email protected]>
AuthorDate: Wed May 6 00:42:40 2026 +0800

    [fix][broker]Replication is stuck because failed to read entries (#25625)
---
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 21 ++++++-
 .../service/persistent/PersistentReplicator.java   |  3 +
 .../broker/service/OneWayReplicatorTest.java       | 72 ++++++++++++++++++++++
 ...OneWayReplicatorUsingGlobalPartitionedTest.java |  6 ++
 .../service/OneWayReplicatorUsingGlobalZKTest.java |  6 ++
 5 files changed, 107 insertions(+), 1 deletion(-)

diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 7e756f1d994..ad2a2a79d3f 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -22,6 +22,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyMap;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
@@ -87,6 +88,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
+import java.util.function.Supplier;
 import lombok.Cleanup;
 import lombok.CustomLog;
 import lombok.Data;
@@ -167,7 +169,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
     }
 
-    private void makeAddEntryTimeout(ManagedLedgerImpl ml, AtomicBoolean addEntryFinished) throws Exception {
+    public static void makeAddEntryTimeout(ManagedLedgerImpl ml, AtomicBoolean addEntryFinished) throws Exception {
         LedgerHandle currentLedger = ml.currentLedger;
         final LedgerHandle spyLedgerHandle = spy(currentLedger);
         doAnswer(invocation -> {
@@ -183,6 +185,23 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         ml.currentLedger = spyLedgerHandle;
     }
 
+    public static void makeReadEntryProbFail(ManagedLedgerImpl ml, Supplier<ManagedLedgerException> errorOrNot)
+            throws Exception {
+        ml.entryCache.clear();
+        LedgerHandle currentLedger = ml.currentLedger;
+        final LedgerHandle spyLedgerHandle = spy(currentLedger);
+        doAnswer(invocation -> {
+            long ledgerId = (long) invocation.getArguments()[0];
+            long entryId = (long) invocation.getArguments()[1];
+            ManagedLedgerException mightError = errorOrNot.get();
+            if (mightError != null) {
+                return CompletableFuture.failedFuture(mightError);
+            }
+            return currentLedger.readUnconfirmedAsync(ledgerId, entryId);
+        }).when(spyLedgerHandle).readUnconfirmedAsync(anyLong(), anyLong());
+        ml.currentLedger = spyLedgerHandle;
+    }
+
     @Data
     private static class DeleteLedgerInfo{
         volatile boolean hasCalled;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 0a6caea067c..bd68d2fb980 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -494,6 +494,7 @@ public abstract class PersistentReplicator extends AbstractReplicator
 
     @Override
     public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+        InFlightTask inFlightTask = (InFlightTask) ctx;
         if (state != Started) {
             log.info("Replicator was disconnected while reading entries, stopping reads");
             return;
@@ -514,12 +515,14 @@ public abstract class PersistentReplicator extends AbstractReplicator
             terminate();
             return;
         } else if (!(exception instanceof TooManyRequestsException)) {
+            inFlightTask.setEntries(Collections.emptyList());
             log.error()
                     .attr("ctx", ctx)
                     .attr("waitTimeSec", waitTimeMillis / 1000.0)
                     .exception(exception)
                     .log("Error reading entries, retrying");
         } else {
+            inFlightTask.setEntries(Collections.emptyList());
             log.debug()
                     .attr("ctx", ctx)
                     .attr("waitTimeSec", waitTimeMillis / 1000.0)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index b0b12bd8f2c..0645877c144 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -76,6 +76,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerTest;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.resources.ClusterResources;
@@ -648,6 +649,77 @@ public class OneWayReplicatorTest extends OneWayReplicatorTestBase {
         });
     }
 
+    @Test(timeOut = 45 * 1000)
+    public void testProbBKErrorWhenReplicating() throws Exception {
+        // creates topics.
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_");
+        final String subscription = "s1";
+        final int totalMsg = 10_000;
+        admin1.topics().createNonPartitionedTopic(topicName);
+        admin2.topics().createNonPartitionedTopic(topicName);
+        RetentionPolicies retentionPolicies = new RetentionPolicies(10, -1);
+        admin1.topicPolicies().setRetention(topicName, retentionPolicies);
+        admin2.topicPolicies().setRetention(topicName, retentionPolicies);
+        PersistentTopic topic1 = (PersistentTopic) broker1.getTopic(topicName, false).join().get();
+        ManagedLedgerImpl ml1 = (ManagedLedgerImpl) topic1.getManagedLedger();
+        PersistentTopic topic2 = (PersistentTopic) broker2.getTopic(topicName, false).join().get();
+        Awaitility.await().untilAsserted(() -> {
+            HierarchyTopicPolicies policies1 = topic1.getHierarchyTopicPolicies();
+            HierarchyTopicPolicies policies2 = topic2.getHierarchyTopicPolicies();
+            assertEquals(policies1.getRetentionPolicies().get().getRetentionTimeInMinutes(), 10);
+            assertEquals(policies2.getRetentionPolicies().get().getRetentionTimeInMinutes(), 10);
+        });
+        // Publishes messages.
+        Producer<String> producer1 = client1.newProducer(Schema.STRING).topic(topicName).create();
+        Set<String> msgPublished = new HashSet<>();
+        for (int i = 0; i < totalMsg; i++) {
+            msgPublished.add("msg" + i);
+            producer1.send("msg" + i);
+        }
+
+        // Inject a probable error.
+        AtomicInteger roundrobin = new  AtomicInteger();
+        Supplier<ManagedLedgerException> bkErrorOrNot = () -> {
+            if (roundrobin.incrementAndGet() % 2 == 0) {
+                return null;
+            }
+            return new ManagedLedgerException.TooManyRequestsException("mocked error");
+        };
+        ManagedLedgerTest.makeReadEntryProbFail(ml1, bkErrorOrNot);
+
+        // Verify: the replication will finish even though received ManagedLedgerException.TooManyRequestsException.
+        pulsar1.getConfig().setReplicationStartAt("earliest");
+        admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2));
+        waitReplicatorStarted(topicName);
+        Awaitility.await().atMost(Duration.ofSeconds(600)).pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> {
+            TopicStats topicStats = admin1.topics().getStats(topicName);
+            assertEquals(topicStats.getReplication().get(cluster2).getReplicationBacklog(), 0);
+        });
+
+        // Verify: messages were replicated.
+        admin2.topics().createSubscription(topicName, subscription, MessageId.earliest);
+        Set<String> received = new HashSet<>();
+        Consumer<String> consumer2 = client2.newConsumer(Schema.STRING)
+                .subscriptionName(subscription).topic(topicName).subscribe();
+        while (true) {
+            Message<String> msg = consumer2.receive(2, TimeUnit.SECONDS);
+            if (msg == null) {
+                break;
+            }
+            received.add(msg.getValue());
+        }
+        assertEquals(received.size(), msgPublished.size());
+        assertEquals(received, msgPublished);
+
+        // cleanup.
+        producer1.close();
+        consumer2.close();
+        admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1));
+        waitReplicatorStopped(topicName, false);
+        admin1.topics().delete(topicName);
+        admin2.topics().delete(topicName);
+    }
+
     /**
      * Since {@link NonPersistentReplicator} never implement the rate limitation, the config
      * "replicationProducerQueueSize" should not affect {@link NonPersistentReplicator}.
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
index 7f6124e62a7..f828f92f7db 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
@@ -110,6 +110,12 @@ public class OneWayReplicatorUsingGlobalPartitionedTest extends OneWayReplicator
         super.testReplicatorProducerStatInTopic();
     }
 
+    @Override
+    @Test(enabled = false)
+    public void testProbBKErrorWhenReplicating() throws Exception {
+        super.testProbBKErrorWhenReplicating();
+    }
+
     @Override
     @Test(enabled = false)
     public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
index de7e0cf0a3e..4db26758beb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
@@ -321,6 +321,12 @@ public class OneWayReplicatorUsingGlobalZKTest extends OneWayReplicatorTest {
         super.testReplicatorProducerStatInTopic();
     }
 
+    @Override
+    @Test(enabled = false)
+    public void testProbBKErrorWhenReplicating() throws Exception {
+        super.testProbBKErrorWhenReplicating();
+    }
+
     @Test(enabled = false)
     public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception {
         super.testReplicatorProducerStatInTopic();

Reply via email to