This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new bad7120 [Cherry-pick #11614] Fix subscribeRateLimiter cannot be disabled (#12109)
bad7120 is described below
commit bad71201189aaacfe39e737d4bc0c6baeebb5759
Author: Jiwei Guo <[email protected]>
AuthorDate: Fri Sep 24 17:55:23 2021 +0800
[Cherry-pick #11614] Fix subscribeRateLimiter cannot be disabled (#12109)
---
.../broker/service/persistent/PersistentTopic.java | 27 +++++++++-----
.../service/persistent/SubscribeRateLimiter.java | 2 +-
.../pulsar/broker/admin/TopicPoliciesTest.java | 42 +++++++++++++++++++++-
3 files changed, 60 insertions(+), 11 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 371971a..83615f2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -265,7 +265,7 @@ public class PersistentTopic extends AbstractTopic
this.backloggedCursorThresholdEntries =
brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
this.transactionCompletableFuture = new CompletableFuture<>();
- initializeDispatchRateLimiterIfNeeded(Optional.empty());
+ initializeRateLimiterIfNeeded(Optional.empty());
registerTopicPolicyListener();
this.compactedTopic = new
CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());
@@ -360,16 +360,18 @@ public class PersistentTopic extends AbstractTopic
}
}
- private void initializeDispatchRateLimiterIfNeeded(Optional<Policies>
policies) {
+ private void initializeRateLimiterIfNeeded(Optional<Policies> policies) {
synchronized (dispatchRateLimiter) {
// dispatch rate limiter for topic
if (!dispatchRateLimiter.isPresent() && DispatchRateLimiter
.isDispatchRateNeeded(brokerService, policies, topic,
Type.TOPIC)) {
this.dispatchRateLimiter = Optional.of(new
DispatchRateLimiter(this, Type.TOPIC));
}
- if (!subscribeRateLimiter.isPresent() && SubscribeRateLimiter
- .isDispatchRateNeeded(brokerService, policies, topic)) {
+ boolean isDispatchRateNeeded =
SubscribeRateLimiter.isDispatchRateNeeded(brokerService, policies, topic);
+ if (!subscribeRateLimiter.isPresent() && isDispatchRateNeeded) {
this.subscribeRateLimiter = Optional.of(new
SubscribeRateLimiter(this));
+ } else if (!isDispatchRateNeeded) {
+ this.subscribeRateLimiter = Optional.empty();
}
// dispatch rate limiter for each subscription
@@ -720,8 +722,8 @@ public class PersistentTopic extends AbstractTopic
if (cnx.clientAddress() != null &&
cnx.clientAddress().toString().contains(":")) {
SubscribeRateLimiter.ConsumerIdentifier consumer = new
SubscribeRateLimiter.ConsumerIdentifier(
cnx.clientAddress().toString().split(":")[0],
consumerName, consumerId);
- if (subscribeRateLimiter.isPresent() &&
!subscribeRateLimiter.get().subscribeAvailable(consumer)
- || !subscribeRateLimiter.get().tryAcquire(consumer)) {
+ if (subscribeRateLimiter.isPresent() &&
(!subscribeRateLimiter.get().subscribeAvailable(consumer)
+ || !subscribeRateLimiter.get().tryAcquire(consumer))) {
log.warn("[{}] Failed to create subscription for {} {} limited
by {}, available {}",
topic, subscriptionName, consumer,
subscribeRateLimiter.get().getSubscribeRate(),
subscribeRateLimiter.get().getAvailableSubscribeRateLimit(consumer));
@@ -2409,7 +2411,7 @@ public class PersistentTopic extends AbstractTopic
cfg.isBrokerDeleteInactiveTopicsEnabled());
}
- initializeDispatchRateLimiterIfNeeded(Optional.ofNullable(data));
+ initializeRateLimiterIfNeeded(Optional.ofNullable(data));
this.updateMaxPublishRate(data);
@@ -3122,10 +3124,17 @@ public class PersistentTopic extends AbstractTopic
}
private void
initializeTopicSubscribeRateLimiterIfNeeded(Optional<TopicPolicies> policies) {
+ if (!policies.isPresent()) {
+ return;
+ }
synchronized (subscribeRateLimiter) {
- if (!subscribeRateLimiter.isPresent() && policies.isPresent()
- && policies.get().getSubscribeRate() != null) {
+ if (!subscribeRateLimiter.isPresent()
+ && policies.get().getSubscribeRate() != null
+ &&
policies.get().getSubscribeRate().subscribeThrottlingRatePerConsumer > 0) {
this.subscribeRateLimiter = Optional.of(new
SubscribeRateLimiter(this));
+ } else if (!policies.get().isSubscribeRateSet()
+ ||
policies.get().getSubscribeRate().subscribeThrottlingRatePerConsumer <= 0) {
+ this.subscribeRateLimiter = Optional.empty();
}
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
index a13328c..c10b40f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
@@ -223,7 +223,7 @@ public class SubscribeRateLimiter {
SubscribeRate subscribeRate =
getPoliciesSubscribeRate(serviceConfig.getClusterName(), policies, topicName);
if (subscribeRate == null) {
return serviceConfig.getSubscribeThrottlingRatePerConsumer() > 0
- ||
serviceConfig.getSubscribeRatePeriodPerConsumerInSecond() > 0;
+ &&
serviceConfig.getSubscribeRatePeriodPerConsumerInSecond() > 0;
}
return true;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index b570bff..215b899 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -32,6 +32,7 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -47,6 +48,7 @@ import
org.apache.pulsar.broker.service.PublishRateLimiterImpl;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
@@ -1510,6 +1512,44 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
}
@Test
+ public void testDisableSubscribeRate() throws Exception {
+
assertEquals(pulsar.getConfiguration().getSubscribeThrottlingRatePerConsumer(),
0);
+ admin.topics().createNonPartitionedTopic(persistenceTopic);
+ admin.lookups().lookupTopic(persistenceTopic);
+ PersistentTopic topic = (PersistentTopic)
pulsar.getBrokerService().getTopicIfExists(persistenceTopic).get().get();
+ Field field =
PersistentTopic.class.getDeclaredField("subscribeRateLimiter");
+ field.setAccessible(true);
+ Optional<SubscribeRateLimiter> limiter =
(Optional<SubscribeRateLimiter>) field.get(topic);
+ // sub rate limiter should be null by default
+ assertFalse(limiter.isPresent());
+
+ // Enable / Disable subscribe rate in namespace-level
+ final SubscribeRate subscribeRate = new SubscribeRate(1, 100);
+ admin.namespaces().setSubscribeRate(myNamespace, subscribeRate);
+ Awaitility.await().untilAsserted(()-> {
+ Optional<SubscribeRateLimiter> limiter1 =
(Optional<SubscribeRateLimiter>) field.get(topic);
+ assertTrue(limiter1.isPresent());
+ });
+ admin.namespaces().removeSubscribeRate(myNamespace);
+ Awaitility.await().untilAsserted(()-> {
+ Optional<SubscribeRateLimiter> limiter2 =
(Optional<SubscribeRateLimiter>) field.get(topic);
+ assertFalse(limiter2.isPresent());
+ });
+
+ // Enable / Disable subscribe rate in topic-level
+ admin.topics().setSubscribeRate(persistenceTopic, subscribeRate);
+ Awaitility.await().untilAsserted(()-> {
+ Optional<SubscribeRateLimiter> limiter1 =
(Optional<SubscribeRateLimiter>) field.get(topic);
+ assertTrue(limiter1.isPresent());
+ });
+ admin.topics().removeSubscribeRate(persistenceTopic);
+ Awaitility.await().untilAsserted(()-> {
+ Optional<SubscribeRateLimiter> limiter2 =
(Optional<SubscribeRateLimiter>) field.get(topic);
+ assertFalse(limiter2.isPresent());
+ });
+ }
+
+ @Test
public void testGetSetSubscribeRate() throws Exception {
admin.topics().createPartitionedTopic(persistenceTopic, 2);
@@ -1657,7 +1697,7 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
@Test
public void testRemoveSubscribeRate() throws Exception {
admin.topics().createPartitionedTopic(persistenceTopic, 2);
-
+ pulsarClient.newProducer().topic(persistenceTopic).create().close();
SubscribeRate subscribeRate = new SubscribeRate(2, 30);
log.info("Subscribe Rate: {} will set to the topic: {}",
subscribeRate, persistenceTopic);
admin.topics().setSubscribeRate(persistenceTopic, subscribeRate);