This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 086906d Fix collection get bug in ResourceGroupService (#12499)
086906d is described below
commit 086906d08620eaa2e267fb39abcfe73793cda544
Author: ZhangJian He <[email protected]>
AuthorDate: Thu Nov 4 22:57:50 2021 +0800
Fix collection get bug in ResourceGroupService (#12499)
---
.../broker/resourcegroup/ResourceGroupService.java | 3 +-
.../RGUsageMTAggrWaitForAllMsgsTest.java | 41 +++++++++++-----------
.../ResourceGroupRateLimiterTest.java | 1 -
.../resourcegroup/ResourceGroupServiceTest.java | 4 +--
4 files changed, 22 insertions(+), 27 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
index 677c04a..34826f8 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
@@ -323,8 +323,7 @@ public class ResourceGroupService {
protected boolean incrementUsage(String tenantName, String nsName,
ResourceGroupMonitoringClass monClass,
BytesAndMessagesCount incStats) throws
PulsarAdminException {
- final String tenantAndNsString = tenantName + "/" + nsName;
- final ResourceGroup nsRG =
this.namespaceToRGsMap.get(tenantAndNsString);
+ final ResourceGroup nsRG =
this.namespaceToRGsMap.get(NamespaceName.get(tenantName, nsName));
final ResourceGroup tenantRG = this.tenantToRGsMap.get(tenantName);
if (tenantRG == null && nsRG == null) {
return false;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
index 1ea83a0..d3874fa 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
@@ -517,11 +517,18 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends
ProducerConsumerBase {
Assert.assertEquals(recvdNumMsgs, TotalExpectedMessagesToReceive);
Assert.assertEquals(numConsumerExceptions, 0);
+ boolean tenantRGEqualsNsRG = tenantRGEqualsNamespaceRG(topicStrings);
+ // If the tenant and NS are on different RGs, the bytes/messages get
counted once on the
+ // tenant RG, and again on the namespace RG. This double-counting is
avoided if tenant-RG == NS-RG.
+ // This is a known (and discussed) artifact in the implementation.
+ // 'ScaleFactor' is a way to incorporate that effect in the
verification.
+ final int scaleFactor = tenantRGEqualsNsRG ? 1 : 2;
+
// Verify producer and consumer side stats.
- this.verifyRGProdConsStats(topicStrings, sentNumBytes, sentNumMsgs,
recvdNumBytes, recvdNumMsgs, true, true);
+ this.verifyRGProdConsStats(topicStrings, sentNumBytes, sentNumMsgs,
recvdNumBytes, recvdNumMsgs, scaleFactor, true, true);
// Verify the metrics corresponding to the operations in this test.
- this.verifyRGMetrics(topicStrings, sentNumBytes, sentNumMsgs,
recvdNumBytes, recvdNumMsgs, true, true);
+ this.verifyRGMetrics(sentNumBytes, sentNumMsgs, recvdNumBytes,
recvdNumMsgs, scaleFactor, true, true);
unRegisterTenantsAndNamespaces(topicStrings);
// destroyTopics can be called after createTopics() is added back
@@ -535,9 +542,9 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends
ProducerConsumerBase {
private void verifyRGProdConsStats(String[] topicStrings,
int sentNumBytes, int sentNumMsgs,
int recvdNumBytes, int recvdNumMsgs,
- boolean checkProduce, boolean
checkConsume) throws Exception {
+ int scaleFactor, boolean checkProduce,
+ boolean checkConsume) throws Exception {
- boolean tenantRGEqualsNsRG = tenantRGEqualsNamespaceRG(topicStrings);
BrokerService bs = pulsar.getBrokerService();
Map<String, TopicStatsImpl> topicStatsMap = bs.getTopicStats();
@@ -557,12 +564,6 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends
ProducerConsumerBase {
BytesAndMessagesCount totalNsRGConsCounts = new
BytesAndMessagesCount();
BytesAndMessagesCount prodCounts, consCounts;
- // If the tenant and NS are on different RGs, the bytes/messages get
counted once on the
- // tenant RG, and again on the namespace RG. This double-counting is
avoided if tenant-RG == NS-RG.
- // This is a known (and discussed) artifact in the implementation.
- // 'ScaleFactor' is a way to incorporate that effect in the
verification.
- final int scaleFactor = tenantRGEqualsNsRG ? 1 : 2;
-
// Since the following walk is on topics, keep track of the RGs for
which we have already gathered stats,
// so that we do not double-accumulate stats if multiple topics refer
to the same RG.
HashSet<String> RGsWithPublishStatsGathered = new HashSet<>();
@@ -643,24 +644,22 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends
ProducerConsumerBase {
if (checkProduce) {
prodCounts =
ResourceGroup.accumulateBMCount(totalTenantRGProdCounts, totalNsRGProdCounts);
- Assert.assertEquals(prodCounts.messages, sentNumMsgs);
+ Assert.assertEquals(prodCounts.messages, sentNumMsgs *
scaleFactor);
Assert.assertTrue(prodCounts.bytes >= ExpectedNumBytesSent);
}
if (checkConsume) {
consCounts =
ResourceGroup.accumulateBMCount(totalTenantRGConsCounts, totalNsRGConsCounts);
- Assert.assertEquals(consCounts.messages, recvdNumMsgs);
+ Assert.assertEquals(consCounts.messages, recvdNumMsgs *
scaleFactor);
Assert.assertTrue(consCounts.bytes >= ExpectedNumBytesReceived);
}
}
// Check the metrics for the RGs involved
- private void verifyRGMetrics(String[] topicStrings,
- int sentNumBytes, int sentNumMsgs,
+ private void verifyRGMetrics(int sentNumBytes, int sentNumMsgs,
int recvdNumBytes, int recvdNumMsgs,
- boolean checkProduce, boolean checkConsume)
throws Exception {
-
- tenantRGEqualsNamespaceRG(topicStrings);
+ int scaleFactor, boolean checkProduce,
+ boolean checkConsume) throws Exception {
final int ExpectedNumBytesSent = sentNumBytes +
PER_MESSAGE_METADATA_OHEAD * sentNumMsgs;
final int ExpectedNumBytesReceived = recvdNumBytes +
PER_MESSAGE_METADATA_OHEAD * recvdNumMsgs;
long totalTenantRegisters = 0;
@@ -729,12 +728,12 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends
ProducerConsumerBase {
// So, we take the residuals into account when comparing against
the expected.
if (checkProduce && mc == ResourceGroupMonitoringClass.Publish) {
Assert.assertEquals(totalUsedMessages[mcIdx] -
residualSentNumMessages,
- sentNumMsgs);
+ sentNumMsgs * scaleFactor);
Assert.assertTrue(totalUsedBytes[mcIdx] - residualSentNumBytes
>= ExpectedNumBytesSent);
} else if (checkConsume && mc ==
ResourceGroupMonitoringClass.Dispatch) {
Assert.assertEquals(totalUsedMessages[mcIdx] -
residualRecvdNumMessages,
- recvdNumMsgs);
+ recvdNumMsgs * scaleFactor);
Assert.assertTrue(totalUsedBytes[mcIdx] - residualRecvdNumBytes
>= ExpectedNumBytesReceived);
}
@@ -745,9 +744,9 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends
ProducerConsumerBase {
// Update the residuals for next round of tests.
residualSentNumBytes += sentNumBytes;
- residualSentNumMessages += sentNumMsgs;
+ residualSentNumMessages += sentNumMsgs * scaleFactor;
residualRecvdNumBytes += recvdNumBytes;
- residualRecvdNumMessages += recvdNumMsgs;
+ residualRecvdNumMessages += recvdNumMsgs * scaleFactor;
Assert.assertEquals(totalUpdates, 0); // currently, we don't update
the RGs in this UT
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupRateLimiterTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupRateLimiterTest.java
index a7b42d0..395f086 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupRateLimiterTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupRateLimiterTest.java
@@ -45,7 +45,6 @@ public class ResourceGroupRateLimiterTest extends
BrokerTestBase {
new org.apache.pulsar.common.policies.data.ResourceGroup();
final String namespaceName = "prop/ns-abc";
final String persistentTopicString = "persistent://prop/ns-abc/test-topic";
- final String nonPersistentTopicString =
"non-persistent://prop/ns-abc/test-topic";
final int MESSAGE_SIZE = 10;
@BeforeClass
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java
index a0350b0..4b84760 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java
@@ -131,10 +131,9 @@ public class ResourceGroupServiceTest extends
MockedPulsarServiceBaseTest {
rgs.unRegisterNameSpace(rgName, tenantAndNamespaceName);
// The overhead of a RG lookup
- ResourceGroup retRG;
mSecsStart = System.currentTimeMillis();
for (int ix = 0; ix < numPerfTestIterations; ix++) {
- retRG = rgs.resourceGroupGet(rg.resourceGroupName);
+ rgs.resourceGroupGet(rg.resourceGroupName);
}
mSecsEnd = System.currentTimeMillis();
diffMsecs = mSecsEnd - mSecsStart;
@@ -221,7 +220,6 @@ public class ResourceGroupServiceTest extends
MockedPulsarServiceBaseTest {
// maxUsageReportSuppressRounds iterations. So, if we run for
maxUsageReportSuppressRounds iterations,
// we should see needToReportLocalUsage() return true at least
once.
Set<Boolean> myBoolSet = new HashSet<>();
- myBoolSet.clear();
for (int idx = 0; idx <
ResourceGroupService.MaxUsageReportSuppressRounds; idx++) {
needToReport = retRG.setUsageInMonitoredEntity(monClass,
nwUsage);
myBoolSet.add(needToReport);