This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fac14fbb759 [fix][test] Fix flaky test
ResourceGroupUsageAggregationTest. testProduceConsumeUsageOnRG (#17617)
fac14fbb759 is described below
commit fac14fbb75982cc229ded2128af6c9d5844f1ab8
Author: Jiwei Guo <[email protected]>
AuthorDate: Fri Sep 16 15:51:40 2022 +0800
[fix][test] Fix flaky test ResourceGroupUsageAggregationTest.
testProduceConsumeUsageOnRG (#17617)
---
.../broker/resourcegroup/ResourceGroupService.java | 11 +++
.../ResourceGroupUsageAggregationTest.java | 105 ++++++++++++---------
2 files changed, 70 insertions(+), 46 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
index 79cba28d374..4bb1bc8ab24 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.resourcegroup;
import static
org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
+import com.google.common.annotations.VisibleForTesting;
import io.prometheus.client.Counter;
import io.prometheus.client.Summary;
import java.util.Map;
@@ -829,4 +830,14 @@ public class ResourceGroupService {
.name("pulsar_resource_group_calculate_quota_secs")
.help("Time required to calculate quota of all resource groups, in
seconds.")
.register();
+
+ @VisibleForTesting
+ ConcurrentHashMap getTopicConsumeStats() {
+ return this.topicConsumeStats;
+ }
+
+ @VisibleForTesting
+ ConcurrentHashMap getTopicProduceStats() {
+ return this.topicProduceStats;
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java
index a86035b71e0..1fd9271d9bd 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java
@@ -24,6 +24,8 @@ import
org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCoun
import
org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass;
import
org.apache.pulsar.broker.resourcegroup.ResourceGroupService.ResourceGroupUsageStatsType;
import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.resource.usage.ResourceUsage;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
@@ -37,16 +39,16 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
+import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-
-import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@Slf4j
-@Test(groups = "flaky")
public class ResourceGroupUsageAggregationTest extends ProducerConsumerBase {
@BeforeClass
@Override
@@ -120,10 +122,11 @@ public class ResourceGroupUsageAggregationTest extends
ProducerConsumerBase {
.create();
Consumer<byte[]> consumer = null;
+ String subscriptionName = "my-subscription";
try {
consumer = pulsarClient.newConsumer()
.topic(topicString)
- .subscriptionName("my-subscription")
+ .subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Shared)
.subscribe();
} catch (PulsarClientException p) {
@@ -176,6 +179,20 @@ public class ResourceGroupUsageAggregationTest extends
ProducerConsumerBase {
true, true);
consumer.close();
+ // cleanup the topic data.
+ CompletableFuture<Optional<Topic>> topicFuture =
pulsar.getBrokerService().getTopics().remove(topicString);
+ if (topicFuture != null) {
+ Optional<Topic> optTopic = topicFuture.join();
+ if (optTopic.isPresent()) {
+ Topic topic = optTopic.get();
+ if (topic instanceof PersistentTopic) {
+ PersistentTopic persistentTopic = (PersistentTopic) topic;
+
persistentTopic.getSubscription(subscriptionName).deleteForcefully();
+ }
+ }
+ }
+ rgs.getTopicConsumeStats().clear();
+ rgs.getTopicProduceStats().clear();
rgs.unRegisterTenant(activeRgName, tenantString);
rgs.unRegisterNameSpace(activeRgName, NamespaceName.get(nsString));
@@ -192,48 +209,44 @@ public class ResourceGroupUsageAggregationTest extends
ProducerConsumerBase {
boolean checkProduce, boolean checkConsume)
throws
InterruptedException, PulsarAdminException {
BrokerService bs = pulsar.getBrokerService();
- Map<String, TopicStatsImpl> topicStatsMap = bs.getTopicStats();
- for (Map.Entry<String, TopicStatsImpl> entry :
topicStatsMap.entrySet()) {
- String mapTopicName = entry.getKey();
- if (mapTopicName.equals(topicString)) {
- TopicStatsImpl stats = entry.getValue();
- if (checkProduce) {
- Assert.assertTrue(stats.bytesInCounter >= sentNumBytes);
- Assert.assertEquals(sentNumMsgs, stats.msgInCounter);
- }
- if (checkConsume) {
- Assert.assertTrue(stats.bytesOutCounter >= recvdNumBytes);
- Assert.assertEquals(recvdNumMsgs, stats.msgOutCounter);
- }
-
- if (sentNumMsgs > 0 || recvdNumMsgs > 0) {
- rgs.aggregateResourceGroupLocalUsages(); // hack to
ensure aggregator calculation without waiting
- BytesAndMessagesCount prodCounts = rgs.getRGUsage(rgName,
ResourceGroupMonitoringClass.Publish,
- ResourceGroupUsageStatsType.Cumulative);
- BytesAndMessagesCount consCounts = rgs.getRGUsage(rgName,
ResourceGroupMonitoringClass.Dispatch,
- ResourceGroupUsageStatsType.Cumulative);
-
- // Re-do the getRGUsage.
- // The counts should be equal, since there wasn't any
intervening traffic on TEST_PRODUCE_CONSUME_TOPIC.
- BytesAndMessagesCount prodCounts1 = rgs.getRGUsage(rgName,
ResourceGroupMonitoringClass.Publish,
- ResourceGroupUsageStatsType.Cumulative);
- BytesAndMessagesCount consCounts1 = rgs.getRGUsage(rgName,
ResourceGroupMonitoringClass.Dispatch,
- ResourceGroupUsageStatsType.Cumulative);
-
- Assert.assertEquals(prodCounts1.bytes, prodCounts.bytes);
- Assert.assertEquals(prodCounts1.messages,
prodCounts.messages);
- Assert.assertEquals(consCounts1.bytes, consCounts.bytes);
- Assert.assertEquals(consCounts1.messages,
consCounts.messages);
-
- if (checkProduce) {
- Assert.assertTrue(prodCounts.bytes >= sentNumBytes);
- Assert.assertEquals(sentNumMsgs, prodCounts.messages);
- }
- if (checkConsume) {
- Assert.assertTrue(consCounts.bytes >= recvdNumBytes);
- Assert.assertEquals(recvdNumMsgs, consCounts.messages);
- }
- }
+ Awaitility.await().untilAsserted(() -> {
+ TopicStatsImpl topicStats = bs.getTopicStats().get(topicString);
+ Assert.assertNotNull(topicStats);
+ if (checkProduce) {
+ Assert.assertTrue(topicStats.bytesInCounter >= sentNumBytes);
+ Assert.assertEquals(sentNumMsgs, topicStats.msgInCounter);
+ }
+ if (checkConsume) {
+ Assert.assertTrue(topicStats.bytesOutCounter >= recvdNumBytes);
+ Assert.assertEquals(recvdNumMsgs, topicStats.msgOutCounter);
+ }
+ });
+ if (sentNumMsgs > 0 || recvdNumMsgs > 0) {
+ rgs.aggregateResourceGroupLocalUsages(); // hack to ensure
aggregator calculation without waiting
+ BytesAndMessagesCount prodCounts = rgs.getRGUsage(rgName,
ResourceGroupMonitoringClass.Publish,
+ ResourceGroupUsageStatsType.Cumulative);
+ BytesAndMessagesCount consCounts = rgs.getRGUsage(rgName,
ResourceGroupMonitoringClass.Dispatch,
+ ResourceGroupUsageStatsType.Cumulative);
+
+ // Re-do the getRGUsage.
+ // The counts should be equal, since there wasn't any intervening
traffic on TEST_PRODUCE_CONSUME_TOPIC.
+ BytesAndMessagesCount prodCounts1 = rgs.getRGUsage(rgName,
ResourceGroupMonitoringClass.Publish,
+ ResourceGroupUsageStatsType.Cumulative);
+ BytesAndMessagesCount consCounts1 = rgs.getRGUsage(rgName,
ResourceGroupMonitoringClass.Dispatch,
+ ResourceGroupUsageStatsType.Cumulative);
+
+ Assert.assertEquals(prodCounts1.bytes, prodCounts.bytes);
+ Assert.assertEquals(prodCounts1.messages, prodCounts.messages);
+ Assert.assertEquals(consCounts1.bytes, consCounts.bytes);
+ Assert.assertEquals(consCounts1.messages, consCounts.messages);
+
+ if (checkProduce) {
+ Assert.assertTrue(prodCounts.bytes >= sentNumBytes);
+ Assert.assertEquals(sentNumMsgs, prodCounts.messages);
+ }
+ if (checkConsume) {
+ Assert.assertTrue(consCounts.bytes >= recvdNumBytes);
+ Assert.assertEquals(recvdNumMsgs, consCounts.messages);
}
}
}