This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f3d7c767ca6 [fix][test] Fix flaky test ReplicatorTest.testResumptionAfterBacklogRelaxed (#25950)
f3d7c767ca6 is described below
commit f3d7c767ca6ca1f2d5017da615225ea7f7528def
Author: void-ptr974 <[email protected]>
AuthorDate: Mon Jun 8 23:11:08 2026 +0800
[fix][test] Fix flaky test ReplicatorTest.testResumptionAfterBacklogRelaxed (#25950)
Co-authored-by: Lari Hotari <[email protected]>
---
.../pulsar/broker/service/ReplicatorTest.java | 62 +++++++++++++++++-----
1 file changed, 48 insertions(+), 14 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 9753e68dfb1..10d616a2b0f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -28,6 +28,7 @@ import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
@@ -993,7 +994,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
* @throws Exception
*/
- @Test(timeOut = 60000, priority = -1, dataProvider = "retentionPolicies")
+ @Test(timeOut = 120000, priority = -1, dataProvider = "retentionPolicies")
public void testResumptionAfterBacklogRelaxed(RetentionPolicy policy)
throws Exception {
// create a unique namespace for this test case to avoid flakiness
String namespace =
newUniqueName("pulsar/testResumptionAfterBacklogRelaxed");
@@ -1020,12 +1021,20 @@ public class ReplicatorTest extends ReplicatorTestBase {
PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService()
.getTopicReference(dest.toString()).get();
Replicator replicator = topic.getPersistentReplicator("r2");
+ PersistentTopic remoteTopic = (PersistentTopic)
pulsar2.getBrokerService()
+ .getTopicReference(dest.toString()).get();
- // Produce 1 message in r1. This message will be replicated immediately into r2 and it will become part of
- // local backlog
- producer1.produce(1);
+ // Produce messages in r1. These messages will be replicated immediately into r2 and become
+ // part of the local backlog there.
+ int remoteBacklogMessages = 2;
+ producer1.produce(remoteBacklogMessages);
Awaitility.await().untilAsserted(() ->
assertEquals(replicator.computeStats().replicationBacklog, 0));
+ long[] remoteBacklogSize = new long[1];
+ Awaitility.await().untilAsserted(() -> {
+ remoteBacklogSize[0] =
admin2.topics().getStats(dest.toString()).getBacklogSize();
+ assertTrue(remoteBacklogSize[0] > 1);
+ });
var attributes = Attributes.of(
OpenTelemetryAttributes.PULSAR_DOMAIN,
dest.getDomain().value(),
OpenTelemetryAttributes.PULSAR_TENANT, dest.getTenant(),
@@ -1037,13 +1046,21 @@ public class ReplicatorTest extends ReplicatorTestBase {
assertMetricLongSumValue(metrics,
OpenTelemetryReplicatorStats.BACKLOG_COUNTER, attributes, 0);
assertMetricDoubleGaugeValue(metrics,
OpenTelemetryReplicatorStats.DELAY_GAUGE, attributes, 0.0);
- // Restrict backlog quota limit to 1 byte to stop replication
+ // Restrict the backlog quota to just below the current r2 backlog. This stops replication
+ // now, but leaves enough room for one small test message after the existing backlog drains.
admin1.namespaces().setBacklogQuota(namespace, BacklogQuota.builder()
- .limitSize(1)
+ .limitSize(remoteBacklogSize[0] - 1)
.retentionPolicy(policy)
.build());
- Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
+ Awaitility.await().atMost(TIME_TO_CHECK_BACKLOG_QUOTA * 6L,
TimeUnit.SECONDS)
+ .pollInterval(1, TimeUnit.SECONDS)
+ .ignoreExceptions()
+ .untilAsserted(() -> {
+ admin2.brokers().backlogQuotaCheck();
+ assertTrue(remoteTopic.isSizeBacklogExceeded());
+ assertFalse(replicator.isConnected());
+ });
// Next message will not be replicated, because r2 has reached the quota
producer1.produce(1);
@@ -1057,15 +1074,32 @@ public class ReplicatorTest extends ReplicatorTestBase {
assertMetricDoubleGaugeValue(metrics,
OpenTelemetryReplicatorStats.DELAY_GAUGE, attributes,
aDouble -> assertThat(aDouble).isGreaterThanOrEqualTo(0.0));
- // Consumer will now drain 1 message and the replication backlog will be cleared
- consumer2.receive(1);
+ // Consumer will now drain the existing messages and the remote backlog quota will be relaxed.
+ consumer2.receive(remoteBacklogMessages);
- // Wait until the 2nd message got delivered to consumer.
- // Use a longer timeout because the replicator needs time to detect that the backlog quota
- // has been relaxed (checked every TIME_TO_CHECK_BACKLOG_QUOTA seconds) and resume replication.
- consumer2.receive(1, TIME_TO_CHECK_BACKLOG_QUOTA * 4);
+ // Wait until the remote topic is actually below the size quota before nudging the replicator.
+ // Otherwise the next producer-create attempt can race with stale backlog accounting and consume
+ // another reconnect backoff window.
+ Awaitility.await().atMost(TIME_TO_CHECK_BACKLOG_QUOTA * 6L,
TimeUnit.SECONDS)
+ .pollInterval(1, TimeUnit.SECONDS)
+ .ignoreExceptions()
+ .untilAsserted(() -> {
+ admin2.brokers().backlogQuotaCheck();
+ assertFalse(remoteTopic.isSizeBacklogExceeded());
+ });
+
+ // The replicator might already be waiting in an exponential reconnect backoff after the quota
+ // rejection. startProducer() is idempotent and lets the test observe recovery instead of waiting
+ // for the next scheduled retry. Once the backlog is zero, the second message has been persisted
+ // on r2 and can be consumed normally.
+ Awaitility.await().atMost(TIME_TO_CHECK_BACKLOG_QUOTA * 8L,
TimeUnit.SECONDS)
+ .pollInterval(1, TimeUnit.SECONDS)
+ .untilAsserted(() -> {
+ replicator.startProducer();
+ assertEquals(replicator.computeStats().replicationBacklog,
0);
+ });
+ consumer2.receive(1);
- Awaitility.await().untilAsserted(() ->
assertEquals(replicator.computeStats().replicationBacklog, 0));
metrics = metricReader1.collectAllMetrics();
assertMetricLongSumValue(metrics,
OpenTelemetryReplicatorStats.BACKLOG_COUNTER, attributes, 0);
assertMetricDoubleGaugeValue(metrics,
OpenTelemetryReplicatorStats.DELAY_GAUGE, attributes, 0.0);