This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 18aa9eac63f363a88a12b5b2b678231db4c1b78b Author: Lari Hotari <[email protected]> AuthorDate: Tue Oct 28 17:58:41 2025 +0200 [fix][test] Fix flaky ReplicatorTest.testResumptionAfterBacklogRelaxed (#24904) (cherry picked from commit 3f80cd5f6587df457dfa83aca437abb659284c0a) --- .../pulsar/broker/service/ReplicatorTest.java | 135 ++++++++++----------- 1 file changed, 64 insertions(+), 71 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index f14cba61e60..b0a7b5598c6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -104,6 +104,7 @@ import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.PartitionedTopicStats; +import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.ReplicatorStats; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.TopicStats; @@ -972,6 +973,10 @@ public class ReplicatorTest extends ReplicatorTestBase { assertTrue(remoteClusters.contains("r1")); } + @DataProvider(name = "retentionPolicies") + public static Object[][] retentionPolicies() { + return new Object[][] { { RetentionPolicy.producer_exception }, { RetentionPolicy.producer_request_hold } }; + } /** * Issue #199 @@ -982,91 +987,79 @@ public class ReplicatorTest extends ReplicatorTestBase { * @throws Exception */ - @Test(timeOut = 60000, priority = -1) - public void testResumptionAfterBacklogRelaxed() throws Exception { - List<RetentionPolicy> policies = new ArrayList<>(); - policies.add(RetentionPolicy.producer_exception); - policies.add(RetentionPolicy.producer_request_hold); - - for (RetentionPolicy policy : policies) { - // Use 1Mb quota by default - admin1.namespaces().setBacklogQuota("pulsar/ns1", BacklogQuota.builder() - .limitSize(1 * 1024 * 1024) - .retentionPolicy(policy) - .build()); - Thread.sleep(200); - - TopicName dest = TopicName - .get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns1/%s-" + policy)); - - // Producer on r1 - @Cleanup - MessageProducer producer1 = new MessageProducer(url1, dest); + @Test(timeOut = 60000, priority = -1, dataProvider = "retentionPolicies") + public void testResumptionAfterBacklogRelaxed(RetentionPolicy policy) throws Exception { + // create a unique namespace for this test case to avoid flakiness + String namespace = newUniqueName("pulsar/testResumptionAfterBacklogRelaxed"); + Policies policies = new Policies(); + policies.backlog_quota_map = Map.of(BacklogQuota.BacklogQuotaType.destination_storage, BacklogQuota.builder() + // Use 1Mb quota by default + .limitSize(1 * 1024 * 1024) + .retentionPolicy(policy) + .build()); + policies.replication_clusters = Set.of("r1", "r2"); + admin1.namespaces().createNamespace(namespace, policies); + + TopicName dest = TopicName.get("persistent://" + namespace + "/" + policy); - // Consumer on r2 - @Cleanup - MessageConsumer consumer2 = new MessageConsumer(url2, dest); + // Producer on r1 + @Cleanup + MessageProducer producer1 = new MessageProducer(url1, dest); - // Replicator for r1 -> r2 - PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService() - .getTopicReference(dest.toString()).get(); - Replicator replicator = topic.getPersistentReplicator("r2"); + // Consumer on r2 + @Cleanup + MessageConsumer consumer2 = new MessageConsumer(url2, dest); - // Produce 1 message in r1. This message will be replicated immediately into r2 and it will become part of - // local backlog - producer1.produce(1); + // Replicator for r1 -> r2 + PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService() + .getTopicReference(dest.toString()).get(); + Replicator replicator = topic.getPersistentReplicator("r2"); - Thread.sleep(500); + // Produce 1 message in r1. This message will be replicated immediately into r2 and it will become part of + // local backlog + producer1.produce(1); - // Restrict backlog quota limit to 1 byte to stop replication - admin1.namespaces().setBacklogQuota("pulsar/ns1", BacklogQuota.builder() - .limitSize(1) - .retentionPolicy(policy) - .build()); + Awaitility.await().untilAsserted(() -> assertEquals(replicator.computeStats().replicationBacklog, 0)); + var attributes = Attributes.of( + OpenTelemetryAttributes.PULSAR_DOMAIN, dest.getDomain().value(), + OpenTelemetryAttributes.PULSAR_TENANT, dest.getTenant(), + OpenTelemetryAttributes.PULSAR_NAMESPACE, dest.getNamespace(), + OpenTelemetryAttributes.PULSAR_TOPIC, dest.getPartitionedTopicName(), + OpenTelemetryAttributes.PULSAR_REPLICATION_REMOTE_CLUSTER_NAME, cluster2 + ); + var metrics = metricReader1.collectAllMetrics(); + assertMetricLongSumValue(metrics, OpenTelemetryReplicatorStats.BACKLOG_COUNTER, attributes, 0); + assertMetricDoubleGaugeValue(metrics, OpenTelemetryReplicatorStats.DELAY_GAUGE, attributes, 0.0); - Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000); + // Restrict backlog quota limit to 1 byte to stop replication + admin1.namespaces().setBacklogQuota(namespace, BacklogQuota.builder() + .limitSize(1) + .retentionPolicy(policy) + .build()); - assertEquals(replicator.computeStats().replicationBacklog, 0); - var attributes = Attributes.of( - OpenTelemetryAttributes.PULSAR_DOMAIN, dest.getDomain().value(), - OpenTelemetryAttributes.PULSAR_TENANT, dest.getTenant(), - OpenTelemetryAttributes.PULSAR_NAMESPACE, dest.getNamespace(), - OpenTelemetryAttributes.PULSAR_TOPIC, dest.getPartitionedTopicName(), - OpenTelemetryAttributes.PULSAR_REPLICATION_REMOTE_CLUSTER_NAME, cluster2 - ); - var metrics = metricReader1.collectAllMetrics(); - assertMetricLongSumValue(metrics, OpenTelemetryReplicatorStats.BACKLOG_COUNTER, attributes, 0); - assertMetricDoubleGaugeValue(metrics, OpenTelemetryReplicatorStats.DELAY_GAUGE, attributes, 0.0); + Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000); - // Next message will not be replicated, because r2 has reached the quota - producer1.produce(1); - Thread.sleep(500); + // Next message will not be replicated, because r2 has reached the quota + producer1.produce(1); - assertEquals(replicator.computeStats().replicationBacklog, 1); - metrics = metricReader1.collectAllMetrics(); - assertMetricLongSumValue(metrics, OpenTelemetryReplicatorStats.BACKLOG_COUNTER, attributes, 1); - assertMetricDoubleGaugeValue(metrics, OpenTelemetryReplicatorStats.DELAY_GAUGE, attributes, - aDouble -> assertThat(aDouble).isPositive()); + Awaitility.await().untilAsserted(() -> assertEquals(replicator.computeStats().replicationBacklog, 1)); - // Consumer will now drain 1 message and the replication backlog will be cleared - consumer2.receive(1); + metrics = metricReader1.collectAllMetrics(); + assertMetricLongSumValue(metrics, OpenTelemetryReplicatorStats.BACKLOG_COUNTER, attributes, 1); + assertMetricDoubleGaugeValue(metrics, OpenTelemetryReplicatorStats.DELAY_GAUGE, attributes, + aDouble -> assertThat(aDouble).isPositive()); - // Wait until the 2nd message got delivered to consumer - consumer2.receive(1); + // Consumer will now drain 1 message and the replication backlog will be cleared + consumer2.receive(1); - int retry = 10; - for (int i = 0; i < retry && replicator.computeStats().replicationBacklog > 0; i++) { - if (i != retry - 1) { - Thread.sleep(100); - } - } + // Wait until the 2nd message got delivered to consumer + consumer2.receive(1); - assertEquals(replicator.computeStats().replicationBacklog, 0); - metrics = metricReader1.collectAllMetrics(); - assertMetricLongSumValue(metrics, OpenTelemetryReplicatorStats.BACKLOG_COUNTER, attributes, 0); - assertMetricDoubleGaugeValue(metrics, OpenTelemetryReplicatorStats.DELAY_GAUGE, attributes, 0.0); - } + Awaitility.await().untilAsserted(() -> assertEquals(replicator.computeStats().replicationBacklog, 0)); + metrics = metricReader1.collectAllMetrics(); + assertMetricLongSumValue(metrics, OpenTelemetryReplicatorStats.BACKLOG_COUNTER, attributes, 0); + assertMetricDoubleGaugeValue(metrics, OpenTelemetryReplicatorStats.DELAY_GAUGE, attributes, 0.0); } /**
