This is an automated email from the ASF dual-hosted git repository.

cckellogg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b992cf  - Handle ChildrenChanged metadata-store event. (#11732)
7b992cf is described below

commit 7b992cff3b489f741910e17fa0495a5d2c998a24
Author: Bharani Chadalavada <[email protected]>
AuthorDate: Tue Aug 24 11:56:34 2021 -0700

    - Handle ChildrenChanged metadata-store event. (#11732)
    
    - Update the publish rate limiter correctly.
    - Invoke rateLimitFunction when the rate limiter is closed.
    
    Co-authored-by: Bharani Chadalavada <[email protected]>
---
 .../resourcegroup/ResourceGroupConfigListener.java |  3 +-
 .../ResourceGroupNamespaceConfigListener.java      |  4 +--
 .../resourcegroup/ResourceGroupPublishLimiter.java | 38 +++++++++++++++++-----
 .../org/apache/pulsar/common/util/RateLimiter.java |  4 +++
 4 files changed, 37 insertions(+), 12 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListener.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListener.java
index f229301..b924f7b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListener.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListener.java
@@ -144,7 +144,8 @@ public class ResourceGroupConfigListener implements 
Consumer<Notification> {
         LOG.info("Metadata store notification: Path {}, Type {}", notifyPath, 
notification.getType());
 
         String rgName = notifyPath.substring(notifyPath.lastIndexOf('/') + 1);
-        if (notification.getType() == NotificationType.ChildrenChanged) {
+        if ((notification.getType() == NotificationType.ChildrenChanged)
+            || (notification.getType() == NotificationType.Created)) {
             loadAllResourceGroups();
         } else if (!RESOURCEGROUPS.equals(rgName)) {
             switch (notification.getType()) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupNamespaceConfigListener.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupNamespaceConfigListener.java
index 61f272f..1489737 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupNamespaceConfigListener.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupNamespaceConfigListener.java
@@ -116,11 +116,11 @@ public class ResourceGroupNamespaceConfigListener 
implements Consumer<Notificati
         }
         try {
             if (delete) {
-                LOG.info("Unregistering namespace {}, resource group {}", ns, 
current.resourceGroupName);
+                LOG.info("Unregistering namespace {} from resource group {}", 
ns, current.resourceGroupName);
                 rgService.unRegisterNameSpace(current.resourceGroupName, ns);
             }
             if (add) {
-                LOG.info("Registering namespace {} from resource group {}", 
ns, policy.resource_group_name);
+                LOG.info("Registering namespace {} with resource group {}", 
ns, policy.resource_group_name);
                 rgService.registerNameSpace(policy.resource_group_name, ns);
             }
         } catch (PulsarAdminException e) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java
index bed31d2..2e48445 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java
@@ -31,7 +31,7 @@ import org.apache.pulsar.common.util.RateLimitFunction;
 import org.apache.pulsar.common.util.RateLimiter;
 
 public class ResourceGroupPublishLimiter implements PublishRateLimiter, 
RateLimitFunction, AutoCloseable  {
-    protected volatile int publishMaxMessageRate = 0;
+    protected volatile long publishMaxMessageRate = 0;
     protected volatile long publishMaxByteRate = 0;
     protected volatile boolean publishThrottlingEnabled = false;
     private volatile RateLimiter publishRateLimiterOnMessage;
@@ -76,8 +76,7 @@ public class ResourceGroupPublishLimiter implements 
PublishRateLimiter, RateLimi
     }
 
     public void update(BytesAndMessagesCount maxPublishRate) {
-        this.publishMaxMessageRate = (int) maxPublishRate.messages;
-        this.publishMaxByteRate = maxPublishRate.bytes;
+        update(maxPublishRate.messages, maxPublishRate.bytes);
     }
 
     public BytesAndMessagesCount getResourceGroupPublishValues() {
@@ -88,12 +87,21 @@ public class ResourceGroupPublishLimiter implements 
PublishRateLimiter, RateLimi
     }
 
     public void update(ResourceGroup resourceGroup) {
+        long publishRateInMsgs = 0, publishRateInBytes = 0;
+        if (resourceGroup != null) {
+            publishRateInBytes = resourceGroup.getPublishRateInBytes();
+            publishRateInMsgs = resourceGroup.getPublishRateInMsgs();
+        }
+
+        update(publishRateInMsgs, publishRateInBytes);
+    }
+
+    public void update(long publishRateInMsgs, long publishRateInBytes) {
         replaceLimiters(() -> {
-            if (resourceGroup != null
-                && (resourceGroup.getPublishRateInMsgs() > 0 || 
resourceGroup.getPublishRateInBytes() > 0)) {
+            if (publishRateInMsgs > 0 || publishRateInBytes > 0) {
                 this.publishThrottlingEnabled = true;
-                this.publishMaxMessageRate = 
Math.max(resourceGroup.getPublishRateInMsgs(), 0);
-                this.publishMaxByteRate = 
Math.max(resourceGroup.getPublishRateInBytes(), 0);
+                this.publishMaxMessageRate = Math.max(publishRateInMsgs, 0);
+                this.publishMaxByteRate = Math.max(publishRateInBytes, 0);
                 if (this.publishMaxMessageRate > 0) {
                     publishRateLimiterOnMessage = RateLimiter.builder()
                             .scheduledExecutorService(scheduledExecutorService)
@@ -160,12 +168,24 @@ public class ResourceGroupPublishLimiter implements 
PublishRateLimiter, RateLimi
 
     @Override
     public void close() {
-      this.apply();
-      replaceLimiters(null);
+        // Unblock any waiting producers and consumers first.
+        // This needs to be done before replacing the limiters with null.
+        this.apply();
+        replaceLimiters(null);
     }
 
     @Override
     public void apply() {
+        // Make sure that both the rate limiters are applied before opening 
the flood gates.
+        RateLimiter currentTopicPublishRateLimiterOnMessage = 
publishRateLimiterOnMessage;
+        RateLimiter currentTopicPublishRateLimiterOnByte = 
publishRateLimiterOnByte;
+        if ((currentTopicPublishRateLimiterOnMessage != null
+                && 
currentTopicPublishRateLimiterOnMessage.getAvailablePermits() <= 0)
+            || (currentTopicPublishRateLimiterOnByte != null
+                && currentTopicPublishRateLimiterOnByte.getAvailablePermits() 
<= 0)) {
+            return;
+        }
+
         for (Map.Entry<String, RateLimitFunction> entry: 
rateLimitFunctionMap.entrySet()) {
             entry.getValue().apply();
         }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
index 61a9177..6f2d899 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
@@ -106,6 +106,10 @@ public class RateLimiter implements AutoCloseable{
                 renewTask.cancel(false);
             }
             isClosed = true;
+            // If there is a ratelimit function registered, invoke it to 
unblock.
+            if (rateLimitFunction != null) {
+                rateLimitFunction.apply();
+            }
         }
     }
 

Reply via email to