This is an automated email from the ASF dual-hosted git repository.

cckellogg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3dd897b  Fix couple of issues in resource group rate limiter. (#11916)
3dd897b is described below

commit 3dd897b3c2d7c4e8f1881ecff923a67dcc7ba256
Author: Bharani Chadalavada <[email protected]>
AuthorDate: Thu Sep 9 10:05:21 2021 -0700

    Fix couple of issues in resource group rate limiter. (#11916)
    
    Co-authored-by: Bharani Chadalavada <[email protected]>
---
 .../apache/pulsar/broker/resourcegroup/ResourceGroupService.java    | 6 ++----
 .../pulsar/broker/resourcegroup/ResourceQuotaCalculatorImpl.java    | 5 +++--
 2 files changed, 5 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
index 846c7f05..a7dbf37 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
@@ -615,15 +615,13 @@ public class ResourceGroupService {
                     localUsageStats = 
resourceGroup.getLocalUsageStatsFromBrokerReports(monClass);
                     confCounts = resourceGroup.getConfLimits(monClass);
 
-                    val globUsageBytesArray = new long[1];
-                    globUsageBytesArray[0] = globalUsageStats.bytes;
+                    long[] globUsageBytesArray = new long[] { 
globalUsageStats.bytes };
                     updatedQuota.bytes = 
this.quotaCalculator.computeLocalQuota(
                             confCounts.bytes,
                             localUsageStats.bytes,
                             globUsageBytesArray);
 
-                    val globUsageMessagesArray = new long[1];
-                    globUsageMessagesArray[0] = globalUsageStats.messages;
+                    long[] globUsageMessagesArray = new long[] 
{globalUsageStats.messages };
                     updatedQuota.messages = 
this.quotaCalculator.computeLocalQuota(
                             confCounts.messages,
                             localUsageStats.messages,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImpl.java
index 66d6cb6..ca83cae 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImpl.java
@@ -79,9 +79,10 @@ public class ResourceQuotaCalculatorImpl implements 
ResourceQuotaCalculator {
         // New quota is the old usage incremented by any residual as a ratio 
of the local usage to the total usage.
         // This should result in the calculatedQuota increasing 
proportionately if total usage is less than the
         // configured usage, and reducing proportionately if the total usage 
is greater than the configured usage.
-        // Capped to zero, to prevent negative setting of quota.
+        // Capped to 1, to prevent negative or zero setting of quota.
+        // the rate limiter code assumes that rate value of 0 or less to mean 
that no rate limit should be applied
         float myUsageFraction = (float) myUsage / totalUsage;
-        float calculatedQuota = max(myUsage + residual * myUsageFraction, 0);
+        float calculatedQuota = max(myUsage + residual * myUsageFraction, 1);
 
         val longCalculatedQuota = (long) calculatedQuota;
         log.info("computeLocalQuota: myUsage={}, totalUsage={}, myFraction={}; 
newQuota returned={} [long: {}]",

Reply via email to