This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e3b0f44c7bd [improve][test] Reduce the time consumption of
BacklogQuotaManagerTest (#16550)
e3b0f44c7bd is described below
commit e3b0f44c7bd08343d8dc786962d91d3449736923
Author: lipenghui <[email protected]>
AuthorDate: Wed Jul 13 08:03:48 2022 +0800
[improve][test] Reduce the time consumption of BacklogQuotaManagerTest
(#16550)
---
.../broker/service/BacklogQuotaManagerTest.java | 61 +++++++++++++++-------
1 file changed, 41 insertions(+), 20 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index e046966025d..f51dcf42c5d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
@@ -59,7 +60,9 @@ import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -74,7 +77,7 @@ public class BacklogQuotaManagerTest {
LocalBookkeeperEnsemble bkEnsemble;
- private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 3;
+ private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 2;
private static final int MAX_ENTRIES_PER_LEDGER = 5;
@DataProvider(name = "backlogQuotaSizeGB")
@@ -82,7 +85,7 @@ public class BacklogQuotaManagerTest {
return new Object[][] { { true }, { false } };
}
- @BeforeMethod
+ @BeforeClass
void setup() throws Exception {
try {
// start local bookie and zookeeper
@@ -106,6 +109,7 @@ public class BacklogQuotaManagerTest {
config.setAllowAutoTopicCreationType("non-partitioned");
config.setSystemTopicEnabled(false);
config.setTopicLevelPoliciesEnabled(false);
+ config.setForceDeleteNamespaceAllowed(true);
pulsar = new PulsarService(config);
pulsar.start();
@@ -116,19 +120,13 @@ public class BacklogQuotaManagerTest {
admin.clusters().createCluster("usc",
ClusterData.builder().serviceUrl(adminUrl.toString()).build());
admin.tenants().createTenant("prop",
new TenantInfoImpl(Sets.newHashSet("appid1"),
Sets.newHashSet("usc")));
- admin.namespaces().createNamespace("prop/ns-quota");
-
admin.namespaces().setNamespaceReplicationClusters("prop/ns-quota",
Sets.newHashSet("usc"));
- admin.namespaces().createNamespace("prop/quotahold");
-
admin.namespaces().setNamespaceReplicationClusters("prop/quotahold",
Sets.newHashSet("usc"));
- admin.namespaces().createNamespace("prop/quotaholdasync");
-
admin.namespaces().setNamespaceReplicationClusters("prop/quotaholdasync",
Sets.newHashSet("usc"));
} catch (Throwable t) {
LOG.error("Error setting up broker test", t);
fail("Broker test setup failed");
}
}
- @AfterMethod(alwaysRun = true)
+ @AfterClass(alwaysRun = true)
void shutdown() throws Exception {
try {
if (admin != null) {
@@ -149,6 +147,24 @@ public class BacklogQuotaManagerTest {
}
}
+ @BeforeMethod(alwaysRun = true)
+ void createNamespaces() throws PulsarAdminException {
+ config.setPreciseTimeBasedBacklogQuotaCheck(false);
+ admin.namespaces().createNamespace("prop/ns-quota");
+ admin.namespaces().setNamespaceReplicationClusters("prop/ns-quota",
Sets.newHashSet("usc"));
+ admin.namespaces().createNamespace("prop/quotahold");
+ admin.namespaces().setNamespaceReplicationClusters("prop/quotahold",
Sets.newHashSet("usc"));
+ admin.namespaces().createNamespace("prop/quotaholdasync");
+
admin.namespaces().setNamespaceReplicationClusters("prop/quotaholdasync",
Sets.newHashSet("usc"));
+ }
+
+ @AfterMethod(alwaysRun = true)
+ void clearNamespaces() throws PulsarAdminException {
+ admin.namespaces().deleteNamespace("prop/ns-quota", true);
+ admin.namespaces().deleteNamespace("prop/quotahold", true);
+ admin.namespaces().deleteNamespace("prop/quotaholdasync", true);
+ }
+
private void rolloverStats() {
pulsar.getBrokerService().updateRates();
}
@@ -195,11 +211,12 @@ public class BacklogQuotaManagerTest {
assertEquals(nonDurableSubscriptionBacklog, MAX_ENTRIES_PER_LEDGER,
"non-durable subscription backlog is [" +
nonDurableSubscriptionBacklog + "]");
+ MessageIdImpl msgId = null;
try {
// try to send over backlog quota and make sure it fails
for (int i = 0; i < numMsgs; i++) {
content[0] = (byte) (content[0] + 1);
- MessageId msgId = producer.send(content);
+ msgId = (MessageIdImpl) producer.send(content);
}
} catch (PulsarClientException ce) {
fail("Should not have gotten exception: " + ce.getMessage());
@@ -209,6 +226,7 @@ public class BacklogQuotaManagerTest {
// However, trimming of ledgers are piggy packed onto ledger
operations.
// So if there isn't new data coming in, trimming never occurs.
// We need to trigger trimming on a schedule to actually delete
all remaining ledgers
+ MessageIdImpl finalMsgId = msgId;
Awaitility.await().untilAsserted(() -> {
// make sure ledgers are trimmed
PersistentTopicInternalStats internalStats =
@@ -218,7 +236,7 @@ public class BacklogQuotaManagerTest {
assertEquals(internalStats.ledgers.size(), 1);
// check if its the expected ledger id given
MAX_ENTRIES_PER_LEDGER
- assertEquals(internalStats.ledgers.get(0).ledgerId, (2 *
numMsgs / MAX_ENTRIES_PER_LEDGER) - 1);
+ assertEquals(internalStats.ledgers.get(0).ledgerId,
finalMsgId.getLedgerId());
});
// check reader can still read with out error
@@ -269,17 +287,19 @@ public class BacklogQuotaManagerTest {
assertEquals(stats.getSubscriptions().size(), 1);
long nonDurableSubscriptionBacklog =
stats.getSubscriptions().values().iterator().next().getMsgBacklog();
assertEquals(nonDurableSubscriptionBacklog, MAX_ENTRIES_PER_LEDGER,
- "non-durable subscription backlog is [" +
nonDurableSubscriptionBacklog + "]"); ;
+ "non-durable subscription backlog is [" +
nonDurableSubscriptionBacklog + "]");
+ MessageIdImpl messageId = null;
try {
// try to send over backlog quota and make sure it fails
for (int i = 0; i < numMsgs; i++) {
content[0] = (byte) (content[0] + 1);
- producer.send(content);
+ messageId = (MessageIdImpl) producer.send(content);
}
} catch (PulsarClientException ce) {
fail("Should not have gotten exception: " + ce.getMessage());
}
+ MessageIdImpl finalMessageId = messageId;
Awaitility.await().untilAsserted(() -> {
// make sure ledgers are trimmed
PersistentTopicInternalStats internalStats =
admin.topics().getInternalStats(topic1, false);
@@ -288,7 +308,7 @@ public class BacklogQuotaManagerTest {
assertEquals(internalStats.ledgers.size(), 1);
// check if its the expected ledger id given
MAX_ENTRIES_PER_LEDGER
- assertEquals(internalStats.ledgers.get(0).ledgerId, (2 *
numMsgs / MAX_ENTRIES_PER_LEDGER) - 1);
+ assertEquals(internalStats.ledgers.get(0).ledgerId,
finalMessageId.getLedgerId());
});
// check reader can still read with out error
@@ -650,11 +670,6 @@ public class BacklogQuotaManagerTest {
public void testConsumerBacklogEvictionWithAckTimeQuota() throws Exception
{
assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
Maps.newHashMap());
- admin.namespaces().setBacklogQuota("prop/ns-quota",
- BacklogQuota.builder()
- .limitTime(2 * TIME_TO_CHECK_BACKLOG_QUOTA)
-
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
- .build(), BacklogQuota.BacklogQuotaType.message_age);
@Cleanup
PulsarClient client =
PulsarClient.builder().serviceUrl(adminUrl.toString()).build();
@@ -702,6 +717,12 @@ public class BacklogQuotaManagerTest {
assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 14);
});
+ admin.namespaces().setBacklogQuota("prop/ns-quota",
+ BacklogQuota.builder()
+ .limitTime(2 * TIME_TO_CHECK_BACKLOG_QUOTA)
+
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
+ .build(), BacklogQuota.BacklogQuotaType.message_age);
+
Awaitility.await()
.pollInterval(Duration.ofSeconds(1))
.atMost(Duration.ofSeconds(4 * TIME_TO_CHECK_BACKLOG_QUOTA))
@@ -1267,7 +1288,7 @@ public class BacklogQuotaManagerTest {
client.close();
}
- @Test(dataProvider = "backlogQuotaSizeGB")
+ @Test(dataProvider = "backlogQuotaSizeGB", priority = 1)
public void testBacklogQuotaInGB(boolean backlogQuotaSizeGB) throws
Exception {
pulsar.close();