This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 3f9824ac655a8e146013b999e31840bb35378274 Author: Lari Hotari <[email protected]> AuthorDate: Tue Oct 28 19:10:36 2025 +0200 [fix][test] Fix flaky ReplicatorTest.testResumptionAfterBacklogRelaxed (#24904) (cherry picked from commit 3f80cd5f6587df457dfa83aca437abb659284c0a) --- .../pulsar/broker/service/ReplicatorTest.java | 101 ++++++++++----------- 1 file changed, 47 insertions(+), 54 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 5820ef3d8e4..a5b7b4f3e6b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -99,6 +99,7 @@ import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.PartitionedTopicStats; +import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.ReplicatorStats; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.TopicStats; @@ -911,6 +912,10 @@ public class ReplicatorTest extends ReplicatorTestBase { assertTrue(remoteClusters.contains("r1")); } + @DataProvider(name = "retentionPolicies") + public static Object[][] retentionPolicies() { + return new Object[][] { { RetentionPolicy.producer_exception }, { RetentionPolicy.producer_request_hold } }; + } /** * Issue #199 @@ -921,73 +926,61 @@ public class ReplicatorTest extends ReplicatorTestBase { * @throws Exception */ - @Test(timeOut = 60000, priority = -1) - public void testResumptionAfterBacklogRelaxed() throws Exception { - List<RetentionPolicy> policies = new ArrayList<>(); - policies.add(RetentionPolicy.producer_exception); - policies.add(RetentionPolicy.producer_request_hold); - - for (RetentionPolicy policy : policies) { - // Use 1Mb quota by default - admin1.namespaces().setBacklogQuota("pulsar/ns1", BacklogQuota.builder() - .limitSize(1 * 1024 * 1024) - .retentionPolicy(policy) - .build()); - Thread.sleep(200); - - TopicName dest = TopicName - .get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns1/%s-" + policy)); - - // Producer on r1 - @Cleanup - MessageProducer producer1 = new MessageProducer(url1, dest); + @Test(timeOut = 60000, priority = -1, dataProvider = "retentionPolicies") + public void testResumptionAfterBacklogRelaxed(RetentionPolicy policy) throws Exception { + // create a unique namespace for this test case to avoid flakiness + String namespace = newUniqueName("pulsar/testResumptionAfterBacklogRelaxed"); + Policies policies = new Policies(); + policies.backlog_quota_map = Map.of(BacklogQuota.BacklogQuotaType.destination_storage, BacklogQuota.builder() + // Use 1Mb quota by default + .limitSize(1 * 1024 * 1024) + .retentionPolicy(policy) + .build()); + policies.replication_clusters = Set.of("r1", "r2"); + admin1.namespaces().createNamespace(namespace, policies); + + TopicName dest = TopicName.get("persistent://" + namespace + "/" + policy); - // Consumer on r2 - @Cleanup - MessageConsumer consumer2 = new MessageConsumer(url2, dest); - - // Replicator for r1 -> r2 - PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get(); - Replicator replicator = topic.getPersistentReplicator("r2"); + // Producer on r1 + @Cleanup + MessageProducer producer1 = new MessageProducer(url1, dest); - // Produce 1 message in r1. This message will be replicated immediately into r2 and it will become part of - // local backlog - producer1.produce(1); + // Consumer on r2 + @Cleanup + MessageConsumer consumer2 = new MessageConsumer(url2, dest); - Thread.sleep(500); + // Replicator for r1 -> r2 + PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService() + .getTopicReference(dest.toString()).get(); + Replicator replicator = topic.getPersistentReplicator("r2"); - // Restrict backlog quota limit to 1 byte to stop replication - admin1.namespaces().setBacklogQuota("pulsar/ns1", BacklogQuota.builder() - .limitSize(1) - .retentionPolicy(policy) - .build()); + // Produce 1 message in r1. This message will be replicated immediately into r2 and it will become part of + // local backlog + producer1.produce(1); - Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000); + Awaitility.await().untilAsserted(() -> assertEquals(replicator.getStats().replicationBacklog, 0)); - assertEquals(replicator.getStats().replicationBacklog, 0); + // Restrict backlog quota limit to 1 byte to stop replication + admin1.namespaces().setBacklogQuota(namespace, BacklogQuota.builder() + .limitSize(1) + .retentionPolicy(policy) + .build()); - // Next message will not be replicated, because r2 has reached the quota - producer1.produce(1); + Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000); - Thread.sleep(500); - assertEquals(replicator.getStats().replicationBacklog, 1); + // Next message will not be replicated, because r2 has reached the quota + producer1.produce(1); - // Consumer will now drain 1 message and the replication backlog will be cleared - consumer2.receive(1); + Awaitility.await().untilAsserted(() -> assertEquals(replicator.getStats().replicationBacklog, 1)); - // Wait until the 2nd message got delivered to consumer - consumer2.receive(1); + // Consumer will now drain 1 message and the replication backlog will be cleared + consumer2.receive(1); - int retry = 10; - for (int i = 0; i < retry && replicator.getStats().replicationBacklog > 0; i++) { - if (i != retry - 1) { - Thread.sleep(100); - } - } + // Wait until the 2nd message got delivered to consumer + consumer2.receive(1); - assertEquals(replicator.getStats().replicationBacklog, 0); - } + Awaitility.await().untilAsserted(() -> assertEquals(replicator.getStats().replicationBacklog, 0)); } /**
