This is an automated email from the ASF dual-hosted git repository.
cckellogg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7b992cf - Handle ChildrenChanged metadata-store event. (#11732)
7b992cf is described below
commit 7b992cff3b489f741910e17fa0495a5d2c998a24
Author: Bharani Chadalavada <[email protected]>
AuthorDate: Tue Aug 24 11:56:34 2021 -0700
- Handle ChildrenChanged metadata-store event. (#11732)
- Update the publish rate limiter correctly.
- Invoke rateLimitFunction when the rate limiter is closed.
Co-authored-by: Bharani Chadalavada <[email protected]>
---
.../resourcegroup/ResourceGroupConfigListener.java | 3 +-
.../ResourceGroupNamespaceConfigListener.java | 4 +--
.../resourcegroup/ResourceGroupPublishLimiter.java | 38 +++++++++++++++++-----
.../org/apache/pulsar/common/util/RateLimiter.java | 4 +++
4 files changed, 37 insertions(+), 12 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListener.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListener.java
index f229301..b924f7b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListener.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListener.java
@@ -144,7 +144,8 @@ public class ResourceGroupConfigListener implements
Consumer<Notification> {
LOG.info("Metadata store notification: Path {}, Type {}", notifyPath,
notification.getType());
String rgName = notifyPath.substring(notifyPath.lastIndexOf('/') + 1);
- if (notification.getType() == NotificationType.ChildrenChanged) {
+ if ((notification.getType() == NotificationType.ChildrenChanged)
+ || (notification.getType() == NotificationType.Created)) {
loadAllResourceGroups();
} else if (!RESOURCEGROUPS.equals(rgName)) {
switch (notification.getType()) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupNamespaceConfigListener.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupNamespaceConfigListener.java
index 61f272f..1489737 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupNamespaceConfigListener.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupNamespaceConfigListener.java
@@ -116,11 +116,11 @@ public class ResourceGroupNamespaceConfigListener
implements Consumer<Notificati
}
try {
if (delete) {
- LOG.info("Unregistering namespace {}, resource group {}", ns,
current.resourceGroupName);
+ LOG.info("Unregistering namespace {} from resource group {}",
ns, current.resourceGroupName);
rgService.unRegisterNameSpace(current.resourceGroupName, ns);
}
if (add) {
- LOG.info("Registering namespace {} from resource group {}",
ns, policy.resource_group_name);
+ LOG.info("Registering namespace {} with resource group {}",
ns, policy.resource_group_name);
rgService.registerNameSpace(policy.resource_group_name, ns);
}
} catch (PulsarAdminException e) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java
index bed31d2..2e48445 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java
@@ -31,7 +31,7 @@ import org.apache.pulsar.common.util.RateLimitFunction;
import org.apache.pulsar.common.util.RateLimiter;
public class ResourceGroupPublishLimiter implements PublishRateLimiter,
RateLimitFunction, AutoCloseable {
- protected volatile int publishMaxMessageRate = 0;
+ protected volatile long publishMaxMessageRate = 0;
protected volatile long publishMaxByteRate = 0;
protected volatile boolean publishThrottlingEnabled = false;
private volatile RateLimiter publishRateLimiterOnMessage;
@@ -76,8 +76,7 @@ public class ResourceGroupPublishLimiter implements
PublishRateLimiter, RateLimi
}
public void update(BytesAndMessagesCount maxPublishRate) {
- this.publishMaxMessageRate = (int) maxPublishRate.messages;
- this.publishMaxByteRate = maxPublishRate.bytes;
+ update(maxPublishRate.messages, maxPublishRate.bytes);
}
public BytesAndMessagesCount getResourceGroupPublishValues() {
@@ -88,12 +87,21 @@ public class ResourceGroupPublishLimiter implements
PublishRateLimiter, RateLimi
}
public void update(ResourceGroup resourceGroup) {
+ long publishRateInMsgs = 0, publishRateInBytes = 0;
+ if (resourceGroup != null) {
+ publishRateInBytes = resourceGroup.getPublishRateInBytes();
+ publishRateInMsgs = resourceGroup.getPublishRateInMsgs();
+ }
+
+ update(publishRateInMsgs, publishRateInBytes);
+ }
+
+ public void update(long publishRateInMsgs, long publishRateInBytes) {
replaceLimiters(() -> {
- if (resourceGroup != null
- && (resourceGroup.getPublishRateInMsgs() > 0 ||
resourceGroup.getPublishRateInBytes() > 0)) {
+ if (publishRateInMsgs > 0 || publishRateInBytes > 0) {
this.publishThrottlingEnabled = true;
- this.publishMaxMessageRate =
Math.max(resourceGroup.getPublishRateInMsgs(), 0);
- this.publishMaxByteRate =
Math.max(resourceGroup.getPublishRateInBytes(), 0);
+ this.publishMaxMessageRate = Math.max(publishRateInMsgs, 0);
+ this.publishMaxByteRate = Math.max(publishRateInBytes, 0);
if (this.publishMaxMessageRate > 0) {
publishRateLimiterOnMessage = RateLimiter.builder()
.scheduledExecutorService(scheduledExecutorService)
@@ -160,12 +168,24 @@ public class ResourceGroupPublishLimiter implements
PublishRateLimiter, RateLimi
@Override
public void close() {
- this.apply();
- replaceLimiters(null);
+ // Unblock any waiting producers and consumers first.
+ // This needs to be done before replacing the limiters with null.
+ this.apply();
+ replaceLimiters(null);
}
@Override
public void apply() {
+ // Make sure that both the rate limiters are applied before opening the flood gates.
+ RateLimiter currentTopicPublishRateLimiterOnMessage =
publishRateLimiterOnMessage;
+ RateLimiter currentTopicPublishRateLimiterOnByte =
publishRateLimiterOnByte;
+ if ((currentTopicPublishRateLimiterOnMessage != null
+ &&
currentTopicPublishRateLimiterOnMessage.getAvailablePermits() <= 0)
+ || (currentTopicPublishRateLimiterOnByte != null
+ && currentTopicPublishRateLimiterOnByte.getAvailablePermits()
<= 0)) {
+ return;
+ }
+
for (Map.Entry<String, RateLimitFunction> entry:
rateLimitFunctionMap.entrySet()) {
entry.getValue().apply();
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
index 61a9177..6f2d899 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
@@ -106,6 +106,10 @@ public class RateLimiter implements AutoCloseable{
renewTask.cancel(false);
}
isClosed = true;
+ // If there is a rate-limit function registered, invoke it to unblock.
+ if (rateLimitFunction != null) {
+ rateLimitFunction.apply();
+ }
}
}