This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f3d7c767ca6 [fix][test] Fix flaky test ReplicatorTest.testResumptionAfterBacklogRelaxed (#25950)
f3d7c767ca6 is described below

commit f3d7c767ca6ca1f2d5017da615225ea7f7528def
Author: void-ptr974 <[email protected]>
AuthorDate: Mon Jun 8 23:11:08 2026 +0800

    [fix][test] Fix flaky test ReplicatorTest.testResumptionAfterBacklogRelaxed (#25950)
    
    Co-authored-by: Lari Hotari <[email protected]>
---
 .../pulsar/broker/service/ReplicatorTest.java      | 62 +++++++++++++++++-----
 1 file changed, 48 insertions(+), 14 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 9753e68dfb1..10d616a2b0f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -28,6 +28,7 @@ import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
@@ -993,7 +994,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
      * @throws Exception
      */
 
-    @Test(timeOut = 60000, priority = -1, dataProvider = "retentionPolicies")
+    @Test(timeOut = 120000, priority = -1, dataProvider = "retentionPolicies")
     public void testResumptionAfterBacklogRelaxed(RetentionPolicy policy) 
throws Exception {
         // create a unique namespace for this test case to avoid flakiness
         String namespace = 
newUniqueName("pulsar/testResumptionAfterBacklogRelaxed");
@@ -1020,12 +1021,20 @@ public class ReplicatorTest extends ReplicatorTestBase {
         PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService()
                 .getTopicReference(dest.toString()).get();
         Replicator replicator = topic.getPersistentReplicator("r2");
+        PersistentTopic remoteTopic = (PersistentTopic) 
pulsar2.getBrokerService()
+                .getTopicReference(dest.toString()).get();
 
-        // Produce 1 message in r1. This message will be replicated 
immediately into r2 and it will become part of
-        // local backlog
-        producer1.produce(1);
+        // Produce messages in r1. These messages will be replicated 
immediately into r2 and become
+        // part of the local backlog there.
+        int remoteBacklogMessages = 2;
+        producer1.produce(remoteBacklogMessages);
 
         Awaitility.await().untilAsserted(() -> 
assertEquals(replicator.computeStats().replicationBacklog, 0));
+        long[] remoteBacklogSize = new long[1];
+        Awaitility.await().untilAsserted(() -> {
+            remoteBacklogSize[0] = 
admin2.topics().getStats(dest.toString()).getBacklogSize();
+            assertTrue(remoteBacklogSize[0] > 1);
+        });
         var attributes = Attributes.of(
                 OpenTelemetryAttributes.PULSAR_DOMAIN, 
dest.getDomain().value(),
                 OpenTelemetryAttributes.PULSAR_TENANT, dest.getTenant(),
@@ -1037,13 +1046,21 @@ public class ReplicatorTest extends ReplicatorTestBase {
         assertMetricLongSumValue(metrics, 
OpenTelemetryReplicatorStats.BACKLOG_COUNTER, attributes, 0);
         assertMetricDoubleGaugeValue(metrics, 
OpenTelemetryReplicatorStats.DELAY_GAUGE, attributes, 0.0);
 
-        // Restrict backlog quota limit to 1 byte to stop replication
+        // Restrict the backlog quota to just below the current r2 backlog. 
This stops replication
+        // now, but leaves enough room for one small test message after the 
existing backlog drains.
         admin1.namespaces().setBacklogQuota(namespace, BacklogQuota.builder()
-                .limitSize(1)
+                .limitSize(remoteBacklogSize[0] - 1)
                 .retentionPolicy(policy)
                 .build());
 
-        Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
+        Awaitility.await().atMost(TIME_TO_CHECK_BACKLOG_QUOTA * 6L, 
TimeUnit.SECONDS)
+                .pollInterval(1, TimeUnit.SECONDS)
+                .ignoreExceptions()
+                .untilAsserted(() -> {
+                    admin2.brokers().backlogQuotaCheck();
+                    assertTrue(remoteTopic.isSizeBacklogExceeded());
+                    assertFalse(replicator.isConnected());
+                });
 
         // Next message will not be replicated, because r2 has reached the 
quota
         producer1.produce(1);
@@ -1057,15 +1074,32 @@ public class ReplicatorTest extends ReplicatorTestBase {
         assertMetricDoubleGaugeValue(metrics, 
OpenTelemetryReplicatorStats.DELAY_GAUGE, attributes,
                 aDouble -> assertThat(aDouble).isGreaterThanOrEqualTo(0.0));
 
-        // Consumer will now drain 1 message and the replication backlog will 
be cleared
-        consumer2.receive(1);
+        // Consumer will now drain the existing messages and the remote 
backlog quota will be relaxed.
+        consumer2.receive(remoteBacklogMessages);
 
-        // Wait until the 2nd message got delivered to consumer.
-        // Use a longer timeout because the replicator needs time to detect 
that the backlog quota
-        // has been relaxed (checked every TIME_TO_CHECK_BACKLOG_QUOTA 
seconds) and resume replication.
-        consumer2.receive(1, TIME_TO_CHECK_BACKLOG_QUOTA * 4);
+        // Wait until the remote topic is actually below the size quota before 
nudging the replicator.
+        // Otherwise the next producer-create attempt can race with stale 
backlog accounting and consume
+        // another reconnect backoff window.
+        Awaitility.await().atMost(TIME_TO_CHECK_BACKLOG_QUOTA * 6L, 
TimeUnit.SECONDS)
+                .pollInterval(1, TimeUnit.SECONDS)
+                .ignoreExceptions()
+                .untilAsserted(() -> {
+                    admin2.brokers().backlogQuotaCheck();
+                    assertFalse(remoteTopic.isSizeBacklogExceeded());
+                });
+
+        // The replicator might already be waiting in an exponential reconnect 
backoff after the quota
+        // rejection. startProducer() is idempotent and lets the test observe 
recovery instead of waiting
+        // for the next scheduled retry. Once the backlog is zero, the second 
message has been persisted
+        // on r2 and can be consumed normally.
+        Awaitility.await().atMost(TIME_TO_CHECK_BACKLOG_QUOTA * 8L, 
TimeUnit.SECONDS)
+                .pollInterval(1, TimeUnit.SECONDS)
+                .untilAsserted(() -> {
+                    replicator.startProducer();
+                    assertEquals(replicator.computeStats().replicationBacklog, 
0);
+                });
+        consumer2.receive(1);
 
-        Awaitility.await().untilAsserted(() -> 
assertEquals(replicator.computeStats().replicationBacklog, 0));
         metrics = metricReader1.collectAllMetrics();
         assertMetricLongSumValue(metrics, 
OpenTelemetryReplicatorStats.BACKLOG_COUNTER, attributes, 0);
         assertMetricDoubleGaugeValue(metrics, 
OpenTelemetryReplicatorStats.DELAY_GAUGE, attributes, 0.0);

Reply via email to