This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 086906d  Fix collection get bug in ResourceGroupService (#12499)
086906d is described below

commit 086906d08620eaa2e267fb39abcfe73793cda544
Author: ZhangJian He <[email protected]>
AuthorDate: Thu Nov 4 22:57:50 2021 +0800

    Fix collection get bug in ResourceGroupService (#12499)
---
 .../broker/resourcegroup/ResourceGroupService.java |  3 +-
 .../RGUsageMTAggrWaitForAllMsgsTest.java           | 41 +++++++++++-----------
 .../ResourceGroupRateLimiterTest.java              |  1 -
 .../resourcegroup/ResourceGroupServiceTest.java    |  4 +--
 4 files changed, 22 insertions(+), 27 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
index 677c04a..34826f8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
@@ -323,8 +323,7 @@ public class ResourceGroupService {
     protected boolean incrementUsage(String tenantName, String nsName,
                                   ResourceGroupMonitoringClass monClass,
                                   BytesAndMessagesCount incStats) throws 
PulsarAdminException {
-        final String tenantAndNsString = tenantName + "/" + nsName;
-        final ResourceGroup nsRG = 
this.namespaceToRGsMap.get(tenantAndNsString);
+        final ResourceGroup nsRG = 
this.namespaceToRGsMap.get(NamespaceName.get(tenantName, nsName));
         final ResourceGroup tenantRG = this.tenantToRGsMap.get(tenantName);
         if (tenantRG == null && nsRG == null) {
             return false;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
index 1ea83a0..d3874fa 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
@@ -517,11 +517,18 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends 
ProducerConsumerBase {
         Assert.assertEquals(recvdNumMsgs, TotalExpectedMessagesToReceive);
         Assert.assertEquals(numConsumerExceptions, 0);
 
+        boolean tenantRGEqualsNsRG = tenantRGEqualsNamespaceRG(topicStrings);
+        // If the tenant and NS are on different RGs, the bytes/messages get 
counted once on the
+        // tenant RG, and again on the namespace RG. This double-counting is 
avoided if tenant-RG == NS-RG.
+        // This is a known (and discussed) artifact in the implementation.
+        // 'ScaleFactor' is a way to incorporate that effect in the 
verification.
+        final int scaleFactor = tenantRGEqualsNsRG ? 1 : 2;
+
         // Verify producer and consumer side stats.
-        this.verifyRGProdConsStats(topicStrings, sentNumBytes, sentNumMsgs, 
recvdNumBytes, recvdNumMsgs, true, true);
+        this.verifyRGProdConsStats(topicStrings, sentNumBytes, sentNumMsgs, 
recvdNumBytes, recvdNumMsgs, scaleFactor, true, true);
 
         // Verify the metrics corresponding to the operations in this test.
-        this.verifyRGMetrics(topicStrings, sentNumBytes, sentNumMsgs, 
recvdNumBytes, recvdNumMsgs, true, true);
+        this.verifyRGMetrics(sentNumBytes, sentNumMsgs, recvdNumBytes, 
recvdNumMsgs, scaleFactor, true, true);
 
         unRegisterTenantsAndNamespaces(topicStrings);
         // destroyTopics can be called after createTopics() is added back
@@ -535,9 +542,9 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends 
ProducerConsumerBase {
     private void verifyRGProdConsStats(String[] topicStrings,
                                        int sentNumBytes, int sentNumMsgs,
                                        int recvdNumBytes, int recvdNumMsgs,
-                                       boolean checkProduce, boolean 
checkConsume) throws Exception {
+                                       int scaleFactor, boolean checkProduce,
+                                       boolean checkConsume) throws Exception {
 
-        boolean tenantRGEqualsNsRG = tenantRGEqualsNamespaceRG(topicStrings);
         BrokerService bs = pulsar.getBrokerService();
         Map<String, TopicStatsImpl> topicStatsMap = bs.getTopicStats();
 
@@ -557,12 +564,6 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends 
ProducerConsumerBase {
         BytesAndMessagesCount totalNsRGConsCounts = new 
BytesAndMessagesCount();
         BytesAndMessagesCount prodCounts, consCounts;
 
-        // If the tenant and NS are on different RGs, the bytes/messages get 
counted once on the
-        // tenant RG, and again on the namespace RG. This double-counting is 
avoided if tenant-RG == NS-RG.
-        // This is a known (and discussed) artifact in the implementation.
-        // 'ScaleFactor' is a way to incorporate that effect in the 
verification.
-        final int scaleFactor = tenantRGEqualsNsRG ? 1 : 2;
-
         // Since the following walk is on topics, keep track of the RGs for 
which we have already gathered stats,
         // so that we do not double-accumulate stats if multiple topics refer 
to the same RG.
         HashSet<String> RGsWithPublishStatsGathered = new HashSet<>();
@@ -643,24 +644,22 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends 
ProducerConsumerBase {
 
         if (checkProduce) {
             prodCounts = 
ResourceGroup.accumulateBMCount(totalTenantRGProdCounts, totalNsRGProdCounts);
-            Assert.assertEquals(prodCounts.messages, sentNumMsgs);
+            Assert.assertEquals(prodCounts.messages, sentNumMsgs * 
scaleFactor);
             Assert.assertTrue(prodCounts.bytes >= ExpectedNumBytesSent);
         }
 
         if (checkConsume) {
             consCounts = 
ResourceGroup.accumulateBMCount(totalTenantRGConsCounts, totalNsRGConsCounts);
-            Assert.assertEquals(consCounts.messages, recvdNumMsgs);
+            Assert.assertEquals(consCounts.messages, recvdNumMsgs * 
scaleFactor);
             Assert.assertTrue(consCounts.bytes >= ExpectedNumBytesReceived);
         }
     }
 
     // Check the metrics for the RGs involved
-    private void verifyRGMetrics(String[] topicStrings,
-                                 int sentNumBytes, int sentNumMsgs,
+    private void verifyRGMetrics(int sentNumBytes, int sentNumMsgs,
                                  int recvdNumBytes, int recvdNumMsgs,
-                                 boolean checkProduce, boolean checkConsume) 
throws Exception {
-
-        tenantRGEqualsNamespaceRG(topicStrings);
+                                 int scaleFactor, boolean checkProduce,
+                                 boolean checkConsume) throws Exception {
         final int ExpectedNumBytesSent = sentNumBytes + 
PER_MESSAGE_METADATA_OHEAD * sentNumMsgs;
         final int ExpectedNumBytesReceived = recvdNumBytes + 
PER_MESSAGE_METADATA_OHEAD * recvdNumMsgs;
         long totalTenantRegisters = 0;
@@ -729,12 +728,12 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends 
ProducerConsumerBase {
             // So, we take the residuals into account when comparing against 
the expected.
             if (checkProduce && mc == ResourceGroupMonitoringClass.Publish) {
                 Assert.assertEquals(totalUsedMessages[mcIdx] - 
residualSentNumMessages,
-                        sentNumMsgs);
+                        sentNumMsgs * scaleFactor);
                 Assert.assertTrue(totalUsedBytes[mcIdx] - residualSentNumBytes
                         >= ExpectedNumBytesSent);
             } else if (checkConsume && mc == 
ResourceGroupMonitoringClass.Dispatch) {
                 Assert.assertEquals(totalUsedMessages[mcIdx] - 
residualRecvdNumMessages,
-                        recvdNumMsgs);
+                        recvdNumMsgs * scaleFactor);
                 Assert.assertTrue(totalUsedBytes[mcIdx] - residualRecvdNumBytes
                         >= ExpectedNumBytesReceived);
             }
@@ -745,9 +744,9 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends 
ProducerConsumerBase {
 
         // Update the residuals for next round of tests.
         residualSentNumBytes += sentNumBytes;
-        residualSentNumMessages += sentNumMsgs;
+        residualSentNumMessages += sentNumMsgs * scaleFactor;
         residualRecvdNumBytes += recvdNumBytes;
-        residualRecvdNumMessages += recvdNumMsgs;
+        residualRecvdNumMessages += recvdNumMsgs * scaleFactor;
 
         Assert.assertEquals(totalUpdates, 0);  // currently, we don't update 
the RGs in this UT
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupRateLimiterTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupRateLimiterTest.java
index a7b42d0..395f086 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupRateLimiterTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupRateLimiterTest.java
@@ -45,7 +45,6 @@ public class ResourceGroupRateLimiterTest extends 
BrokerTestBase {
     new org.apache.pulsar.common.policies.data.ResourceGroup();
     final String namespaceName = "prop/ns-abc";
     final String persistentTopicString = "persistent://prop/ns-abc/test-topic";
-    final String nonPersistentTopicString = 
"non-persistent://prop/ns-abc/test-topic";
     final int MESSAGE_SIZE = 10;
 
     @BeforeClass
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java
index a0350b0..4b84760 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java
@@ -131,10 +131,9 @@ public class ResourceGroupServiceTest extends 
MockedPulsarServiceBaseTest {
         rgs.unRegisterNameSpace(rgName, tenantAndNamespaceName);
 
         // The overhead of a RG lookup
-        ResourceGroup retRG;
         mSecsStart = System.currentTimeMillis();
         for (int ix = 0; ix < numPerfTestIterations; ix++) {
-            retRG = rgs.resourceGroupGet(rg.resourceGroupName);
+            rgs.resourceGroupGet(rg.resourceGroupName);
         }
         mSecsEnd = System.currentTimeMillis();
         diffMsecs = mSecsEnd - mSecsStart;
@@ -221,7 +220,6 @@ public class ResourceGroupServiceTest extends 
MockedPulsarServiceBaseTest {
             // maxUsageReportSuppressRounds iterations. So, if we run for 
maxUsageReportSuppressRounds iterations,
             // we should see needToReportLocalUsage() return true at least 
once.
             Set<Boolean> myBoolSet = new HashSet<>();
-            myBoolSet.clear();
             for (int idx = 0; idx < 
ResourceGroupService.MaxUsageReportSuppressRounds; idx++) {
                 needToReport = retRG.setUsageInMonitoredEntity(monClass, 
nwUsage);
                 myBoolSet.add(needToReport);

Reply via email to