This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new dcfe765ad0a [improve][test] Reduce the time consumption of
BacklogQuotaManagerTest (#16550)
dcfe765ad0a is described below
commit dcfe765ad0a76e3d0b55fd744e9c62ca062ea09c
Author: lipenghui <[email protected]>
AuthorDate: Wed Jul 13 08:03:48 2022 +0800
[improve][test] Reduce the time consumption of BacklogQuotaManagerTest
(#16550)
(cherry picked from commit e3b0f44c7bd08343d8dc786962d91d3449736923)
---
.../broker/service/BacklogQuotaManagerTest.java | 62 +++++++++++++++-------
1 file changed, 43 insertions(+), 19 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index c824683f09f..68139b99dcb 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
@@ -48,6 +49,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
@@ -58,7 +60,9 @@ import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -73,7 +77,7 @@ public class BacklogQuotaManagerTest {
LocalBookkeeperEnsemble bkEnsemble;
- private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 3;
+ private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 2;
private static final int MAX_ENTRIES_PER_LEDGER = 5;
@DataProvider(name = "backlogQuotaSizeGB")
@@ -81,7 +85,7 @@ public class BacklogQuotaManagerTest {
return new Object[][] { { true }, { false } };
}
- @BeforeMethod
+ @BeforeClass
void setup() throws Exception {
try {
// start local bookie and zookeeper
@@ -102,6 +106,9 @@ public class BacklogQuotaManagerTest {
config.setManagedLedgerMaxEntriesPerLedger(MAX_ENTRIES_PER_LEDGER);
config.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
config.setAllowAutoTopicCreationType("non-partitioned");
+ config.setSystemTopicEnabled(false);
+ config.setTopicLevelPoliciesEnabled(false);
+ config.setForceDeleteNamespaceAllowed(true);
pulsar = new PulsarService(config);
pulsar.start();
@@ -112,19 +119,13 @@ public class BacklogQuotaManagerTest {
admin.clusters().createCluster("usc",
ClusterData.builder().serviceUrl(adminUrl.toString()).build());
admin.tenants().createTenant("prop",
new TenantInfoImpl(Sets.newHashSet("appid1"),
Sets.newHashSet("usc")));
- admin.namespaces().createNamespace("prop/ns-quota");
-
admin.namespaces().setNamespaceReplicationClusters("prop/ns-quota",
Sets.newHashSet("usc"));
- admin.namespaces().createNamespace("prop/quotahold");
-
admin.namespaces().setNamespaceReplicationClusters("prop/quotahold",
Sets.newHashSet("usc"));
- admin.namespaces().createNamespace("prop/quotaholdasync");
-
admin.namespaces().setNamespaceReplicationClusters("prop/quotaholdasync",
Sets.newHashSet("usc"));
} catch (Throwable t) {
LOG.error("Error setting up broker test", t);
fail("Broker test setup failed");
}
}
- @AfterMethod(alwaysRun = true)
+ @AfterClass(alwaysRun = true)
void shutdown() throws Exception {
try {
if (admin != null) {
@@ -145,6 +146,24 @@ public class BacklogQuotaManagerTest {
}
}
+ @BeforeMethod(alwaysRun = true)
+ void createNamespaces() throws PulsarAdminException {
+ config.setPreciseTimeBasedBacklogQuotaCheck(false);
+ admin.namespaces().createNamespace("prop/ns-quota");
+ admin.namespaces().setNamespaceReplicationClusters("prop/ns-quota",
Sets.newHashSet("usc"));
+ admin.namespaces().createNamespace("prop/quotahold");
+ admin.namespaces().setNamespaceReplicationClusters("prop/quotahold",
Sets.newHashSet("usc"));
+ admin.namespaces().createNamespace("prop/quotaholdasync");
+
admin.namespaces().setNamespaceReplicationClusters("prop/quotaholdasync",
Sets.newHashSet("usc"));
+ }
+
+ @AfterMethod(alwaysRun = true)
+ void clearNamespaces() throws PulsarAdminException {
+ admin.namespaces().deleteNamespace("prop/ns-quota", true);
+ admin.namespaces().deleteNamespace("prop/quotahold", true);
+ admin.namespaces().deleteNamespace("prop/quotaholdasync", true);
+ }
+
private void rolloverStats() {
pulsar.getBrokerService().updateRates();
}
@@ -193,11 +212,12 @@ public class BacklogQuotaManagerTest {
assertEquals(nonDurableSubscriptionBacklog, MAX_ENTRIES_PER_LEDGER,
"non-durable subscription backlog is [" +
nonDurableSubscriptionBacklog + "]");
+ MessageIdImpl msgId = null;
try {
// try to send over backlog quota and make sure it fails
for (int i = 0; i < numMsgs; i++) {
content[0] = (byte) (content[0] + 1);
- MessageId msgId = producer.send(content);
+ msgId = (MessageIdImpl) producer.send(content);
}
} catch (PulsarClientException ce) {
fail("Should not have gotten exception: " + ce.getMessage());
@@ -207,6 +227,7 @@ public class BacklogQuotaManagerTest {
// However, trimming of ledgers are piggy packed onto ledger
operations.
// So if there isn't new data coming in, trimming never occurs.
// We need to trigger trimming on a schedule to actually delete
all remaining ledgers
+ MessageIdImpl finalMsgId = msgId;
Awaitility.await().untilAsserted(() -> {
// make sure ledgers are trimmed
PersistentTopicInternalStats internalStats =
@@ -216,7 +237,7 @@ public class BacklogQuotaManagerTest {
assertEquals(internalStats.ledgers.size(), 1);
// check if its the expected ledger id given
MAX_ENTRIES_PER_LEDGER
- assertEquals(internalStats.ledgers.get(0).ledgerId, (2 *
numMsgs / MAX_ENTRIES_PER_LEDGER) - 1);
+ assertEquals(internalStats.ledgers.get(0).ledgerId,
finalMsgId.getLedgerId());
});
// check reader can still read with out error
@@ -264,16 +285,18 @@ public class BacklogQuotaManagerTest {
long nonDurableSubscriptionBacklog =
stats.getSubscriptions().values().iterator().next().getMsgBacklog();
assertEquals(nonDurableSubscriptionBacklog, MAX_ENTRIES_PER_LEDGER,
"non-durable subscription backlog is [" +
nonDurableSubscriptionBacklog + "]");
+ MessageIdImpl messageId = null;
try {
// try to send over backlog quota and make sure it fails
for (int i = 0; i < numMsgs; i++) {
content[0] = (byte) (content[0] + 1);
- producer.send(content);
+ messageId = (MessageIdImpl) producer.send(content);
}
} catch (PulsarClientException ce) {
fail("Should not have gotten exception: " + ce.getMessage());
}
+ MessageIdImpl finalMessageId = messageId;
Awaitility.await().untilAsserted(() -> {
// make sure ledgers are trimmed
PersistentTopicInternalStats internalStats =
admin.topics().getInternalStats(topic1, false);
@@ -282,7 +305,7 @@ public class BacklogQuotaManagerTest {
assertEquals(internalStats.ledgers.size(), 1);
// check if its the expected ledger id given
MAX_ENTRIES_PER_LEDGER
- assertEquals(internalStats.ledgers.get(0).ledgerId, (2 *
numMsgs / MAX_ENTRIES_PER_LEDGER) - 1);
+ assertEquals(internalStats.ledgers.get(0).ledgerId,
finalMessageId.getLedgerId());
});
// check reader can still read with out error
@@ -648,11 +671,6 @@ public class BacklogQuotaManagerTest {
public void testConsumerBacklogEvictionWithAckTimeQuota() throws Exception
{
assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
Maps.newHashMap());
- admin.namespaces().setBacklogQuota("prop/ns-quota",
- BacklogQuota.builder()
- .limitTime(2 * TIME_TO_CHECK_BACKLOG_QUOTA)
-
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
- .build(), BacklogQuota.BacklogQuotaType.message_age);
@Cleanup
PulsarClient client =
PulsarClient.builder().serviceUrl(adminUrl.toString()).build();
@@ -700,6 +718,12 @@ public class BacklogQuotaManagerTest {
assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 14);
});
+ admin.namespaces().setBacklogQuota("prop/ns-quota",
+ BacklogQuota.builder()
+ .limitTime(2 * TIME_TO_CHECK_BACKLOG_QUOTA)
+
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
+ .build(), BacklogQuota.BacklogQuotaType.message_age);
+
Awaitility.await()
.pollInterval(Duration.ofSeconds(1))
.atMost(Duration.ofSeconds(4 * TIME_TO_CHECK_BACKLOG_QUOTA))
@@ -1283,7 +1307,7 @@ public class BacklogQuotaManagerTest {
client.close();
}
- @Test(dataProvider = "backlogQuotaSizeGB")
+ @Test(dataProvider = "backlogQuotaSizeGB", priority = 1)
public void testBacklogQuotaInGB(boolean backlogQuotaSizeGB) throws
Exception {
pulsar.close();