This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 263a90b [PIP-82] [pulsar-broker] Add publish ratelimiter to resource
group (#11184)
263a90b is described below
commit 263a90badbad3425599fe57bc2586fe37be6c9e6
Author: Bharani Chadalavada <[email protected]>
AuthorDate: Thu Jul 15 10:40:41 2021 -0700
[PIP-82] [pulsar-broker] Add publish ratelimiter to resource group (#11184)
* Add ratelimiter to resource group and attach namespace to resource group
rate limiter.
Co-authored-by: Bharani Chadalavada <[email protected]>
---
.../pulsar/broker/resourcegroup/ResourceGroup.java | 10 ++
.../resourcegroup/ResourceGroupPublishLimiter.java | 151 ++++++++++++++++++++
.../broker/resourcegroup/ResourceGroupService.java | 6 +
.../pulsar/broker/service/AbstractTopic.java | 54 ++++++-
.../apache/pulsar/broker/service/ServerCnx.java | 8 ++
.../org/apache/pulsar/broker/service/Topic.java | 4 +
.../service/nonpersistent/NonPersistentTopic.java | 3 +
.../broker/service/persistent/PersistentTopic.java | 3 +
.../ResourceGroupConfigListenerTest.java | 10 +-
.../ResourceGroupRateLimiterTest.java | 155 +++++++++++++++++++++
10 files changed, 394 insertions(+), 10 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java
index 5560216..0a48bee 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java
@@ -24,6 +24,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import lombok.Getter;
import
org.apache.pulsar.broker.resourcegroup.ResourceGroupService.ResourceGroupOpStatus;
import org.apache.pulsar.broker.service.resource.usage.NetworkUsage;
import org.apache.pulsar.broker.service.resource.usage.ResourceUsage;
@@ -42,6 +43,7 @@ import org.slf4j.LoggerFactory;
* publish, another one for dispatch, etc.
*/
public class ResourceGroup {
+
/**
* Convenience class for bytes and messages counts, which are used
together in a lot of the following code.
*/
@@ -78,6 +80,9 @@ public class ResourceGroup {
this.setResourceGroupMonitoringClassFields();
this.setResourceGroupConfigParameters(rgConfig);
this.setDefaultResourceUsageTransportHandlers();
+ this.resourceGroupPublishLimiter = new
ResourceGroupPublishLimiter(rgConfig, rgs.getPulsar().getExecutor());
+ log.info("attaching publish rate limiter {} to {} get {}",
this.resourceGroupPublishLimiter.toString(), name,
+ this.getResourceGroupPublishLimiter());
}
// ctor for overriding the transport-manager fill/set buffer.
@@ -90,6 +95,7 @@ public class ResourceGroup {
this.resourceGroupName = rgName;
this.setResourceGroupMonitoringClassFields();
this.setResourceGroupConfigParameters(rgConfig);
+ this.resourceGroupPublishLimiter = new
ResourceGroupPublishLimiter(rgConfig, rgs.getPulsar().getExecutor());
this.ruPublisher = rgPublisher;
this.ruConsumer = rgConsumer;
}
@@ -99,6 +105,7 @@ public class ResourceGroup {
public ResourceGroup(ResourceGroup other) {
this.resourceGroupName = other.resourceGroupName;
this.rgs = other.rgs;
+ this.resourceGroupPublishLimiter = other.resourceGroupPublishLimiter;
this.setResourceGroupMonitoringClassFields();
// ToDo: copy the monitoring class fields, and ruPublisher/ruConsumer
from other, if required.
@@ -534,6 +541,9 @@ public class ResourceGroup {
.help("Number of times local usage was reported (vs. suppressed
due to negligible change)")
.labelNames(resourceGroupMontoringclassLabels)
.register();
+ // Publish rate limiter for the resource group
+ @Getter
+ protected ResourceGroupPublishLimiter resourceGroupPublishLimiter;
protected static class PerMonitoringClassFields {
// This lock covers all the "local" counts (i.e., except for the
per-broker usage stats).
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java
new file mode 100644
index 0000000..e0df150
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resourcegroup;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.service.PublishRateLimiter;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.PublishRate;
+import org.apache.pulsar.common.policies.data.ResourceGroup;
+import org.apache.pulsar.common.util.RateLimitFunction;
+import org.apache.pulsar.common.util.RateLimiter;
+
+/**
+ * Publish rate limiter owned by a resource group.
+ *
+ * <p>Throttling is decided solely by {@link #tryAcquire(int, long)}, which consults one
+ * {@link RateLimiter} for message count and one for byte count. The counter-based
+ * {@link PublishRateLimiter} callbacks are implemented as no-ops. Callers register a
+ * {@link RateLimitFunction} under a name; every registered function is invoked by {@link #apply()},
+ * and {@link #apply()} is also the callback handed to each {@link RateLimiter} created here.
+ */
+public class ResourceGroupPublishLimiter implements PublishRateLimiter, RateLimitFunction, AutoCloseable {
+    // Configured limits; 0 means "no limit" for that dimension.
+    protected volatile int publishMaxMessageRate = 0;
+    protected volatile long publishMaxByteRate = 0;
+    protected volatile boolean publishThrottlingEnabled = false;
+    // Either limiter is null while its dimension is unlimited, while an update is in progress,
+    // and after close().
+    private volatile RateLimiter publishRateLimiterOnMessage;
+    private volatile RateLimiter publishRateLimiterOnByte;
+    // Kept for the "pass the executor" TODOs in update(ResourceGroup); not used yet.
+    private final ScheduledExecutorService scheduledExecutorService;
+
+    // Callbacks keyed by the name supplied at registration time.
+    ConcurrentHashMap<String, RateLimitFunction> rateLimitFunctionMap = new ConcurrentHashMap<>();
+
+    /**
+     * Creates the limiter and applies the publish rates found in {@code resourceGroup}.
+     *
+     * @param resourceGroup resource group configuration; {@code null} disables throttling
+     * @param scheduledExecutorService executor retained for future use by the underlying limiters
+     */
+    public ResourceGroupPublishLimiter(ResourceGroup resourceGroup,
+                                       ScheduledExecutorService scheduledExecutorService) {
+        this.scheduledExecutorService = scheduledExecutorService;
+        update(resourceGroup);
+    }
+
+    @Override
+    public void checkPublishRate() {
+        // No-op
+    }
+
+    @Override
+    public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) {
+        // No-op
+    }
+
+    @Override
+    public boolean resetPublishCount() {
+        return true;
+    }
+
+    @Override
+    public boolean isPublishRateExceeded() {
+        return false;
+    }
+
+    @Override
+    public void update(Policies policies, String clusterName) {
+        // No-op
+    }
+
+    @Override
+    public void update(PublishRate maxPublishRate) {
+        // No-op
+    }
+
+    /**
+     * Replaces the current limiters with ones built from {@code resourceGroup}.
+     *
+     * <p>A limiter is created only for a dimension whose configured rate is positive. When
+     * {@code resourceGroup} is {@code null} or neither rate is positive, throttling is disabled.
+     *
+     * @param resourceGroup resource group configuration, may be {@code null}
+     */
+    public void update(ResourceGroup resourceGroup) {
+        replaceLimiters(() -> {
+            if (resourceGroup != null
+                    && (resourceGroup.getPublishRateInMsgs() > 0 || resourceGroup.getPublishRateInBytes() > 0)) {
+                this.publishThrottlingEnabled = true;
+                this.publishMaxMessageRate = Math.max(resourceGroup.getPublishRateInMsgs(), 0);
+                this.publishMaxByteRate = Math.max(resourceGroup.getPublishRateInBytes(), 0);
+                if (this.publishMaxMessageRate > 0) {
+                    // TODO: pass the executor
+                    publishRateLimiterOnMessage =
+                            new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS, this::apply);
+                }
+                if (this.publishMaxByteRate > 0) {
+                    // TODO: pass the executor
+                    publishRateLimiterOnByte =
+                            new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS, this::apply);
+                }
+            } else {
+                this.publishMaxMessageRate = 0;
+                this.publishMaxByteRate = 0;
+                this.publishThrottlingEnabled = false;
+                publishRateLimiterOnMessage = null;
+                publishRateLimiterOnByte = null;
+            }
+        });
+    }
+
+    /**
+     * Tries to take permits for a publish of {@code numbers} messages totalling {@code bytes} bytes.
+     *
+     * @param numbers number of messages being published
+     * @param bytes total size of those messages
+     * @return {@code true} when every configured limiter granted the permits (or none is configured)
+     */
+    public boolean tryAcquire(int numbers, long bytes) {
+        // Read each volatile field exactly once: replaceLimiters() nulls these fields, so re-reading
+        // after the null check could dereference null when update()/close() runs concurrently.
+        // NOTE(review): a limiter closed by a concurrent update may still be invoked through the
+        // snapshot - confirm RateLimiter.tryAcquire tolerates being called after close().
+        final RateLimiter messageLimiter = publishRateLimiterOnMessage;
+        final RateLimiter byteLimiter = publishRateLimiterOnByte;
+        return (messageLimiter == null || messageLimiter.tryAcquire(numbers))
+                && (byteLimiter == null || byteLimiter.tryAcquire(bytes));
+    }
+
+    /**
+     * Registers a callback to be run by {@link #apply()}; an existing entry with the same name is replaced.
+     *
+     * @param name key identifying the callback
+     * @param func callback to invoke
+     */
+    public void registerRateLimitFunction(String name, RateLimitFunction func) {
+        rateLimitFunctionMap.put(name, func);
+    }
+
+    /**
+     * Removes the callback registered under {@code name}, if any.
+     *
+     * @param name key used at registration
+     */
+    public void unregisterRateLimitFunction(String name) {
+        rateLimitFunctionMap.remove(name);
+    }
+
+    /**
+     * Detaches the current limiters, runs {@code updater} (which may install new ones) and finally
+     * closes the detached limiters.
+     *
+     * @param updater installs the replacement limiters; {@code null} just removes the current ones
+     */
+    private void replaceLimiters(Runnable updater) {
+        RateLimiter previousPublishRateLimiterOnMessage = publishRateLimiterOnMessage;
+        publishRateLimiterOnMessage = null;
+        RateLimiter previousPublishRateLimiterOnByte = publishRateLimiterOnByte;
+        publishRateLimiterOnByte = null;
+        try {
+            if (updater != null) {
+                updater.run();
+            }
+        } finally {
+            // Close previous limiters to prevent resource leakages.
+            // Delay closing of previous limiters after new ones are in place so that updating the limiter
+            // doesn't cause unavailability.
+            if (previousPublishRateLimiterOnMessage != null) {
+                previousPublishRateLimiterOnMessage.close();
+            }
+            if (previousPublishRateLimiterOnByte != null) {
+                previousPublishRateLimiterOnByte.close();
+            }
+        }
+    }
+
+    /**
+     * Removes and closes both limiters, then runs every registered callback once.
+     */
+    @Override
+    public void close() {
+        // Remove the limiters before firing the callbacks: once they are gone tryAcquire() always
+        // succeeds, so nothing can be throttled again after the callbacks have run. Firing the
+        // callbacks first would leave a window in which a publish is throttled by a limiter that is
+        // about to be closed and therefore never triggers another callback.
+        replaceLimiters(null);
+        this.apply();
+    }
+
+    /**
+     * Invokes every registered {@link RateLimitFunction}.
+     */
+    @Override
+    public void apply() {
+        for (Map.Entry<String, RateLimitFunction> entry : rateLimitFunctionMap.entrySet()) {
+            entry.getValue().apply();
+        }
+    }
+}
\ No newline at end of file
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
index 058f16a..aaee8b8 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
@@ -26,6 +26,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import lombok.Getter;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import
org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount;
@@ -163,6 +164,8 @@ public class ResourceGroupService {
throw new PulsarAdminException(errMesg);
}
+ rg.resourceGroupPublishLimiter.close();
+ rg.resourceGroupPublishLimiter = null;
resourceGroupsMap.remove(name);
}
@@ -651,7 +654,10 @@ public class ResourceGroupService {
}
private static final Logger log =
LoggerFactory.getLogger(ResourceGroupService.class);
+
+ @Getter
private final PulsarService pulsar;
+
protected final ResourceQuotaCalculator quotaCalculator;
private ResourceUsageTransportManager resourceUsageTransportManagerMgr;
private final ResourceGroupConfigListener rgConfigListener;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index f98b57b..65f70ff 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -35,10 +35,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import lombok.Getter;
import org.apache.bookkeeper.mledger.util.StatsBuckets;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.resourcegroup.ResourceGroup;
+import org.apache.pulsar.broker.resourcegroup.ResourceGroupPublishLimiter;
import
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import
org.apache.pulsar.broker.service.BrokerServiceException.ProducerBusyException;
import
org.apache.pulsar.broker.service.BrokerServiceException.ProducerFencedException;
@@ -103,8 +106,13 @@ public abstract class AbstractTopic implements Topic {
protected volatile PublishRateLimiter topicPublishRateLimiter;
+ protected volatile ResourceGroupPublishLimiter resourceGroupPublishLimiter;
+
protected boolean preciseTopicPublishRateLimitingEnable;
+ @Getter
+ protected boolean resourceGroupRateLimitingEnabled;
+
private LongAdder bytesInCounter = new LongAdder();
private LongAdder msgInCounter = new LongAdder();
@@ -746,6 +754,17 @@ public abstract class AbstractTopic implements Topic {
}
@Override
+ public boolean isResourceGroupPublishRateExceeded(int numMessages, int
bytes) {
+ return this.resourceGroupRateLimitingEnabled
+ && !this.resourceGroupPublishLimiter.tryAcquire(numMessages,
bytes);
+ }
+
+ @Override
+ public boolean isResourceGroupRateLimitingEnabled() {
+ return this.resourceGroupRateLimitingEnabled;
+ }
+
+ @Override
public boolean isTopicPublishRateExceeded(int numberMessages, int bytes) {
// whether topic publish rate exceed if precise rate limit is enable
return preciseTopicPublishRateLimitingEnable &&
!this.topicPublishRateLimiter.tryAcquire(numberMessages, bytes);
@@ -787,14 +806,39 @@ public abstract class AbstractTopic implements Topic {
//both namespace-level and topic-level policy are not set, try to use
broker-level policy
ServiceConfiguration serviceConfiguration =
brokerService.pulsar().getConfiguration();
- if (publishRate == null) {
+ if (publishRate != null) {
+ //publishRate is not null , use namespace-level policy
+ updatePublishDispatcher(publishRate);
+ } else {
PublishRate brokerPublishRate = new
PublishRate(serviceConfiguration.getMaxPublishRatePerTopicInMessages()
- , serviceConfiguration.getMaxPublishRatePerTopicInBytes());
+ , serviceConfiguration.getMaxPublishRatePerTopicInBytes());
updatePublishDispatcher(brokerPublishRate);
- return;
}
- //publishRate is not null , use namespace-level policy
- updatePublishDispatcher(publishRate);
+
+ // attach the resource-group level rate limiters, if set
+ String rgName = policies != null && policies.resource_group_name !=
null
+ ? policies.resource_group_name
+ : null;
+ if (rgName != null) {
+ final ResourceGroup resourceGroup =
+
brokerService.getPulsar().getResourceGroupServiceManager().resourceGroupGet(rgName);
+ if (resourceGroup != null) {
+ this.resourceGroupRateLimitingEnabled = true;
+ this.resourceGroupPublishLimiter =
resourceGroup.getResourceGroupPublishLimiter();
+
this.resourceGroupPublishLimiter.registerRateLimitFunction(this.getName(),
+ () -> this.enableCnxAutoRead());
+ log.info("Using resource group {} rate limiter for topic {}",
rgName, topic);
+ return;
+ }
+ } else {
+ if (this.resourceGroupRateLimitingEnabled) {
+
this.resourceGroupPublishLimiter.unregisterRateLimitFunction(this.getName());
+ this.resourceGroupPublishLimiter = null;
+ this.resourceGroupRateLimitingEnabled = false;
+ }
+ /* Namespace detached from resource group. Enable the producer
read */
+ enableProducerReadForPublishRateLimiting();
+ }
}
public long getMsgInCounter() {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index fe5f087..9b51158 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2211,6 +2211,14 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
}
isPublishRateExceeded =
producer.getTopic().isBrokerPublishRateExceeded();
} else {
+ if (producer.getTopic().isResourceGroupRateLimitingEnabled()) {
+ final boolean resourceGroupPublishRateExceeded =
+
producer.getTopic().isResourceGroupPublishRateExceeded(numMessages, msgSize);
+ if (resourceGroupPublishRateExceeded) {
+ producer.getTopic().disableCnxAutoRead();
+ return;
+ }
+ }
isPublishRateExceeded =
producer.getTopic().isPublishRateExceeded();
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index e0cc873..6c4d694 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -170,6 +170,10 @@ public interface Topic {
boolean isTopicPublishRateExceeded(int msgSize, int numMessages);
+ boolean isResourceGroupRateLimitingEnabled();
+
+ boolean isResourceGroupPublishRateExceeded(int msgSize, int numMessages);
+
boolean isBrokerPublishRateExceeded();
void disableCnxAutoRead();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 2751d1e..8f9ba86 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -445,6 +445,9 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic {
replicators.forEach((cluster, replicator) ->
futures.add(replicator.disconnect()));
producers.values().forEach(producer ->
futures.add(producer.disconnect()));
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
+ if (this.resourceGroupPublishLimiter != null) {
+
this.resourceGroupPublishLimiter.unregisterRateLimitFunction(this.getName());
+ }
CompletableFuture<Void> clientCloseFuture =
closeWithoutWaitingClientDisconnect ?
CompletableFuture.completedFuture(null)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 0f0d967..1a84258 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1150,6 +1150,9 @@ public class PersistentTopic extends AbstractTopic
replicators.forEach((cluster, replicator) ->
futures.add(replicator.disconnect()));
producers.values().forEach(producer ->
futures.add(producer.disconnect()));
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
+ if (this.resourceGroupPublishLimiter != null) {
+
this.resourceGroupPublishLimiter.unregisterRateLimitFunction(this.getName());
+ }
CompletableFuture<Void> clientCloseFuture =
closeWithoutWaitingClientDisconnect
? CompletableFuture.completedFuture(null)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListenerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListenerTest.java
index 3071edb..1a85aee 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListenerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListenerTest.java
@@ -115,18 +115,18 @@ public class ResourceGroupConfigListenerTest extends
MockedPulsarServiceBaseTest
@Test
public void testResourceGroupAttachToNamespace() throws Exception {
createResourceGroup(rgName, testAddRg);
-
admin.tenants().createTenant(tenantName,
new TenantInfoImpl(Sets.newHashSet("fake-admin-role"),
Sets.newHashSet(clusterName)));
admin.namespaces().createNamespace(namespaceName);
- admin.namespaces().setNamespaceResourceGroup(namespaceName, rgName);
+ admin.namespaces().setNamespaceResourceGroup(namespaceName, rgName);
Awaitility.await().untilAsserted(() ->
- assertNotNull(pulsar
- .getResourceGroupServiceManager()
- .getNamespaceResourceGroup(namespaceName)));
+
assertNotNull(pulsar.getResourceGroupServiceManager().getNamespaceResourceGroup(namespaceName)));
admin.namespaces().removeNamespaceResourceGroup(namespaceName);
+ Awaitility.await().untilAsserted(() ->
+
assertNull(pulsar.getResourceGroupServiceManager().getNamespaceResourceGroup(namespaceName)));
+
admin.namespaces().deleteNamespace(namespaceName);
deleteResourceGroup(rgName);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupRateLimiterTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupRateLimiterTest.java
new file mode 100644
index 0000000..b51ebf6
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupRateLimiterTest.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resourcegroup;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Checks that the publish rate configured on a resource group throttles producers of a namespace
+ * attached to that group, and that throttling stops once the namespace is detached.
+ */
+public class ResourceGroupRateLimiterTest extends BrokerTestBase {
+
+    final String rgName = "testRG";
+    org.apache.pulsar.common.policies.data.ResourceGroup testAddRg =
+            new org.apache.pulsar.common.policies.data.ResourceGroup();
+    final String namespaceName = "prop/ns-abc";
+    final String persistentTopicString = "persistent://prop/ns-abc/test-topic";
+    final String nonPersistentTopicString = "non-persistent://prop/ns-abc/test-topic";
+    // Payload size per message; prepareData() also uses it as the byte-rate limit.
+    final int MESSAGE_SIZE = 10;
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        conf.setMaxPendingPublishRequestsPerConnection(0);
+        super.baseSetup();
+        prepareData();
+
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    /**
+     * Creates the resource group through the admin API and waits until the broker's resource group
+     * service reports it.
+     */
+    public void createResourceGroup(String rgName, org.apache.pulsar.common.policies.data.ResourceGroup rg)
+            throws PulsarAdminException {
+        admin.resourcegroups().createResourceGroup(rgName, rg);
+
+        Awaitility.await().untilAsserted(() -> {
+            final org.apache.pulsar.broker.resourcegroup.ResourceGroup resourceGroup = pulsar
+                    .getResourceGroupServiceManager().resourceGroupGet(rgName);
+            assertNotNull(resourceGroup);
+            assertEquals(rgName, resourceGroup.resourceGroupName);
+        });
+
+    }
+
+    /**
+     * Deletes the resource group through the admin API and waits (up to one second) until the
+     * broker's resource group service no longer reports it.
+     */
+    public void deleteResourceGroup(String rgName) throws PulsarAdminException {
+        admin.resourcegroups().deleteResourceGroup(rgName);
+        Awaitility.await().atMost(1, TimeUnit.SECONDS)
+                .untilAsserted(() ->
+                        assertNull(pulsar.getResourceGroupServiceManager().resourceGroupGet(rgName)));
+    }
+
+    /**
+     * Runs the throttling scenario against {@code topicString}: attach the namespace to the resource
+     * group, verify that the second publish inside one interval times out, verify that publishing
+     * works again in a later interval, then detach and verify that publishes are no longer limited.
+     *
+     * @param topicString fully qualified name of the topic to publish to
+     */
+    public void testRateLimit(String topicString) throws PulsarAdminException, PulsarClientException,
+            InterruptedException, ExecutionException, TimeoutException {
+        createResourceGroup(rgName, testAddRg);
+        admin.namespaces().setNamespaceResourceGroup(namespaceName, rgName);
+
+        Awaitility.await().untilAsserted(() ->
+                assertNotNull(pulsar.getResourceGroupServiceManager()
+                        .getNamespaceResourceGroup(namespaceName)));
+
+        Awaitility.await().untilAsserted(() ->
+                assertNotNull(pulsar.getResourceGroupServiceManager()
+                        .resourceGroupGet(rgName).getResourceGroupPublishLimiter()));
+
+        // Publish to the topic under test (previously the persistent topic was always used, so the
+        // non-persistent invocation never exercised a non-persistent topic). A creation failure
+        // propagates with its original cause; the method already declares PulsarClientException.
+        final Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicString)
+                .create();
+
+        MessageId messageId = null;
+        try {
+            // first will be success
+            messageId = producer.sendAsync(new byte[MESSAGE_SIZE]).get(100, TimeUnit.MILLISECONDS);
+            Assert.assertNotNull(messageId);
+        } catch (TimeoutException e) {
+            Assert.fail("should not fail");
+        }
+
+        // Second message should fail with timeout.
+        Assert.assertThrows(TimeoutException.class, () -> {
+            producer.sendAsync(new byte[MESSAGE_SIZE]).get(500, TimeUnit.MILLISECONDS);
+        });
+
+        // In the next interval, the above message will be accepted. Wait for one more second (total 2s),
+        // to publish the next message.
+        Thread.sleep(2000);
+
+        try {
+            // third one should succeed
+            messageId = producer.sendAsync(new byte[MESSAGE_SIZE]).get(100, TimeUnit.MILLISECONDS);
+            Assert.assertNotNull(messageId);
+        } catch (TimeoutException e) {
+            Assert.fail("should not fail");
+        }
+
+        // Now detach the namespace
+        admin.namespaces().removeNamespaceResourceGroup(namespaceName);
+        deleteResourceGroup(rgName);
+
+        // No rate limits should be applied.
+        for (int i = 0; i < 5; i++) {
+            messageId = producer.sendAsync(new byte[MESSAGE_SIZE]).get(100, TimeUnit.MILLISECONDS);
+            Assert.assertNotNull(messageId);
+        }
+        producer.close();
+    }
+
+    @Test
+    public void testResourceGroupPublishRateLimit() throws Exception {
+        testRateLimit(persistentTopicString);
+        testRateLimit(nonPersistentTopicString);
+    }
+
+    /**
+     * Configures the resource group used by the test: one message and {@code MESSAGE_SIZE} bytes per
+     * publish interval, with dispatch rates set to -1.
+     */
+    private void prepareData() {
+        testAddRg.setPublishRateInBytes(MESSAGE_SIZE);
+        testAddRg.setPublishRateInMsgs(1);
+        testAddRg.setDispatchRateInMsgs(-1);
+        testAddRg.setDispatchRateInBytes(-1);
+    }
+}