This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 8eb11ef1aad8307be7bc72501ab24d9c86a9706a Author: lipenghui <[email protected]> AuthorDate: Wed Jul 13 08:03:48 2022 +0800 [improve][test] Reduce the time consumption of BacklogQuotaManagerTest (#16550) (cherry picked from commit e3b0f44c7bd08343d8dc786962d91d3449736923) --- .../broker/service/BacklogQuotaManagerTest.java | 61 +++++++++++++++------- 1 file changed, 41 insertions(+), 20 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java index 585c051c3f0..88f13b7324c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java @@ -49,6 +49,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; @@ -59,7 +60,9 @@ import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -74,7 +77,7 @@ public class BacklogQuotaManagerTest { LocalBookkeeperEnsemble bkEnsemble; - private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 3; + private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 2; private static final int MAX_ENTRIES_PER_LEDGER = 5; @DataProvider(name = "backlogQuotaSizeGB") @@ -82,7 +85,7 @@ public class BacklogQuotaManagerTest { return new Object[][] { { true }, { false } }; } - @BeforeMethod + @BeforeClass void setup() throws Exception { try { // start local bookie and zookeeper @@ -103,6 +106,7 @@ public class BacklogQuotaManagerTest { config.setManagedLedgerMaxEntriesPerLedger(MAX_ENTRIES_PER_LEDGER); config.setManagedLedgerMinLedgerRolloverTimeMinutes(0); config.setAllowAutoTopicCreationType("non-partitioned"); + config.setForceDeleteNamespaceAllowed(true); pulsar = new PulsarService(config); pulsar.start(); @@ -113,19 +117,13 @@ public class BacklogQuotaManagerTest { admin.clusters().createCluster("usc", ClusterData.builder().serviceUrl(adminUrl.toString()).build()); admin.tenants().createTenant("prop", new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("usc"))); - admin.namespaces().createNamespace("prop/ns-quota"); - admin.namespaces().setNamespaceReplicationClusters("prop/ns-quota", Sets.newHashSet("usc")); - admin.namespaces().createNamespace("prop/quotahold"); - admin.namespaces().setNamespaceReplicationClusters("prop/quotahold", Sets.newHashSet("usc")); - admin.namespaces().createNamespace("prop/quotaholdasync"); - admin.namespaces().setNamespaceReplicationClusters("prop/quotaholdasync", Sets.newHashSet("usc")); } catch (Throwable t) { LOG.error("Error setting up broker test", t); fail("Broker test setup failed"); } } - @AfterMethod(alwaysRun = true) + @AfterClass(alwaysRun = true) void shutdown() throws Exception { try { if (admin != null) { @@ -146,6 +144,24 @@ public class BacklogQuotaManagerTest { } } + @BeforeMethod(alwaysRun = true) + void createNamespaces() throws PulsarAdminException { + config.setPreciseTimeBasedBacklogQuotaCheck(false); + admin.namespaces().createNamespace("prop/ns-quota"); + admin.namespaces().setNamespaceReplicationClusters("prop/ns-quota", Sets.newHashSet("usc")); + admin.namespaces().createNamespace("prop/quotahold"); + admin.namespaces().setNamespaceReplicationClusters("prop/quotahold", Sets.newHashSet("usc")); + admin.namespaces().createNamespace("prop/quotaholdasync"); + admin.namespaces().setNamespaceReplicationClusters("prop/quotaholdasync", Sets.newHashSet("usc")); + } + + @AfterMethod(alwaysRun = true) + void clearNamespaces() throws PulsarAdminException { + admin.namespaces().deleteNamespace("prop/ns-quota", true); + admin.namespaces().deleteNamespace("prop/quotahold", true); + admin.namespaces().deleteNamespace("prop/quotaholdasync", true); + } + private void rolloverStats() { pulsar.getBrokerService().updateRates(); } @@ -192,11 +208,12 @@ public class BacklogQuotaManagerTest { assertEquals(nonDurableSubscriptionBacklog, MAX_ENTRIES_PER_LEDGER, "non-durable subscription backlog is [" + nonDurableSubscriptionBacklog + "]"); + MessageIdImpl msgId = null; try { // try to send over backlog quota and make sure it fails for (int i = 0; i < numMsgs; i++) { content[0] = (byte) (content[0] + 1); - MessageId msgId = producer.send(content); + msgId = (MessageIdImpl) producer.send(content); } } catch (PulsarClientException ce) { fail("Should not have gotten exception: " + ce.getMessage()); @@ -206,6 +223,7 @@ public class BacklogQuotaManagerTest { // However, trimming of ledgers are piggy packed onto ledger operations. // So if there isn't new data coming in, trimming never occurs. // We need to trigger trimming on a schedule to actually delete all remaining ledgers + MessageIdImpl finalMsgId = msgId; Awaitility.await().untilAsserted(() -> { // make sure ledgers are trimmed PersistentTopicInternalStats internalStats = @@ -215,7 +233,7 @@ public class BacklogQuotaManagerTest { assertEquals(internalStats.ledgers.size(), 1); // check if its the expected ledger id given MAX_ENTRIES_PER_LEDGER - assertEquals(internalStats.ledgers.get(0).ledgerId, (2 * numMsgs / MAX_ENTRIES_PER_LEDGER) - 1); + assertEquals(internalStats.ledgers.get(0).ledgerId, finalMsgId.getLedgerId()); }); // check reader can still read with out error @@ -266,17 +284,19 @@ public class BacklogQuotaManagerTest { assertEquals(stats.getSubscriptions().size(), 1); long nonDurableSubscriptionBacklog = stats.getSubscriptions().values().iterator().next().getMsgBacklog(); assertEquals(nonDurableSubscriptionBacklog, MAX_ENTRIES_PER_LEDGER, - "non-durable subscription backlog is [" + nonDurableSubscriptionBacklog + "]"); ; + "non-durable subscription backlog is [" + nonDurableSubscriptionBacklog + "]"); + MessageIdImpl messageId = null; try { // try to send over backlog quota and make sure it fails for (int i = 0; i < numMsgs; i++) { content[0] = (byte) (content[0] + 1); - producer.send(content); + messageId = (MessageIdImpl) producer.send(content); } } catch (PulsarClientException ce) { fail("Should not have gotten exception: " + ce.getMessage()); } + MessageIdImpl finalMessageId = messageId; Awaitility.await().untilAsserted(() -> { // make sure ledgers are trimmed PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topic1, false); @@ -285,7 +305,7 @@ public class BacklogQuotaManagerTest { assertEquals(internalStats.ledgers.size(), 1); // check if its the expected ledger id given MAX_ENTRIES_PER_LEDGER - assertEquals(internalStats.ledgers.get(0).ledgerId, (2 * numMsgs / MAX_ENTRIES_PER_LEDGER) - 1); + assertEquals(internalStats.ledgers.get(0).ledgerId, finalMessageId.getLedgerId()); }); // check reader can still read with out error @@ -645,11 +665,6 @@ public class BacklogQuotaManagerTest { public void testConsumerBacklogEvictionWithAckTimeQuota() throws Exception { assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), Maps.newHashMap()); - admin.namespaces().setBacklogQuota("prop/ns-quota", - BacklogQuota.builder() - .limitTime(2 * TIME_TO_CHECK_BACKLOG_QUOTA) - .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) - .build(), BacklogQuota.BacklogQuotaType.message_age); @Cleanup PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).build(); @@ -697,6 +712,12 @@ public class BacklogQuotaManagerTest { assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 14); }); + admin.namespaces().setBacklogQuota("prop/ns-quota", + BacklogQuota.builder() + .limitTime(2 * TIME_TO_CHECK_BACKLOG_QUOTA) + .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) + .build(), BacklogQuota.BacklogQuotaType.message_age); + Awaitility.await() .pollInterval(Duration.ofSeconds(1)) .atMost(Duration.ofSeconds(4 * TIME_TO_CHECK_BACKLOG_QUOTA)) @@ -1278,7 +1299,7 @@ public class BacklogQuotaManagerTest { client.close(); } - @Test(dataProvider = "backlogQuotaSizeGB") + @Test(dataProvider = "backlogQuotaSizeGB", priority = 1) public void testBacklogQuotaInGB(boolean backlogQuotaSizeGB) throws Exception { pulsar.close();
