This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new bad7120  [Cherry-pick #11614] Fix subscribeRateLimiter cannot be disabled (#12109)
bad7120 is described below

commit bad71201189aaacfe39e737d4bc0c6baeebb5759
Author: Jiwei Guo <[email protected]>
AuthorDate: Fri Sep 24 17:55:23 2021 +0800

    [Cherry-pick #11614] Fix subscribeRateLimiter cannot be disabled (#12109)
---
 .../broker/service/persistent/PersistentTopic.java | 27 +++++++++-----
 .../service/persistent/SubscribeRateLimiter.java   |  2 +-
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 42 +++++++++++++++++++++-
 3 files changed, 60 insertions(+), 11 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 371971a..83615f2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -265,7 +265,7 @@ public class PersistentTopic extends AbstractTopic
         this.backloggedCursorThresholdEntries =
                 
brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
         this.transactionCompletableFuture = new CompletableFuture<>();
-        initializeDispatchRateLimiterIfNeeded(Optional.empty());
+        initializeRateLimiterIfNeeded(Optional.empty());
         registerTopicPolicyListener();
 
         this.compactedTopic = new 
CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());
@@ -360,16 +360,18 @@ public class PersistentTopic extends AbstractTopic
         }
     }
 
-    private void initializeDispatchRateLimiterIfNeeded(Optional<Policies> 
policies) {
+    private void initializeRateLimiterIfNeeded(Optional<Policies> policies) {
         synchronized (dispatchRateLimiter) {
             // dispatch rate limiter for topic
             if (!dispatchRateLimiter.isPresent() && DispatchRateLimiter
                     .isDispatchRateNeeded(brokerService, policies, topic, 
Type.TOPIC)) {
                 this.dispatchRateLimiter = Optional.of(new 
DispatchRateLimiter(this, Type.TOPIC));
             }
-            if (!subscribeRateLimiter.isPresent() && SubscribeRateLimiter
-                    .isDispatchRateNeeded(brokerService, policies, topic)) {
+            boolean isDispatchRateNeeded = 
SubscribeRateLimiter.isDispatchRateNeeded(brokerService, policies, topic);
+            if (!subscribeRateLimiter.isPresent() && isDispatchRateNeeded) {
                 this.subscribeRateLimiter = Optional.of(new 
SubscribeRateLimiter(this));
+            }  else if (!isDispatchRateNeeded) {
+                this.subscribeRateLimiter = Optional.empty();
             }
 
             // dispatch rate limiter for each subscription
@@ -720,8 +722,8 @@ public class PersistentTopic extends AbstractTopic
         if (cnx.clientAddress() != null && 
cnx.clientAddress().toString().contains(":")) {
             SubscribeRateLimiter.ConsumerIdentifier consumer = new 
SubscribeRateLimiter.ConsumerIdentifier(
                     cnx.clientAddress().toString().split(":")[0], 
consumerName, consumerId);
-            if (subscribeRateLimiter.isPresent() && 
!subscribeRateLimiter.get().subscribeAvailable(consumer)
-                    || !subscribeRateLimiter.get().tryAcquire(consumer)) {
+            if (subscribeRateLimiter.isPresent() && 
(!subscribeRateLimiter.get().subscribeAvailable(consumer)
+                    || !subscribeRateLimiter.get().tryAcquire(consumer))) {
                 log.warn("[{}] Failed to create subscription for {} {} limited 
by {}, available {}",
                         topic, subscriptionName, consumer, 
subscribeRateLimiter.get().getSubscribeRate(),
                         
subscribeRateLimiter.get().getAvailableSubscribeRateLimit(consumer));
@@ -2409,7 +2411,7 @@ public class PersistentTopic extends AbstractTopic
                     cfg.isBrokerDeleteInactiveTopicsEnabled());
         }
 
-        initializeDispatchRateLimiterIfNeeded(Optional.ofNullable(data));
+        initializeRateLimiterIfNeeded(Optional.ofNullable(data));
 
         this.updateMaxPublishRate(data);
 
@@ -3122,10 +3124,17 @@ public class PersistentTopic extends AbstractTopic
     }
 
     private void 
initializeTopicSubscribeRateLimiterIfNeeded(Optional<TopicPolicies> policies) {
+        if (!policies.isPresent()) {
+            return;
+        }
         synchronized (subscribeRateLimiter) {
-            if (!subscribeRateLimiter.isPresent() && policies.isPresent()
-                    && policies.get().getSubscribeRate() != null) {
+            if (!subscribeRateLimiter.isPresent()
+                    && policies.get().getSubscribeRate() != null
+                    && 
policies.get().getSubscribeRate().subscribeThrottlingRatePerConsumer > 0) {
                 this.subscribeRateLimiter = Optional.of(new 
SubscribeRateLimiter(this));
+            } else if (!policies.get().isSubscribeRateSet()
+                    || 
policies.get().getSubscribeRate().subscribeThrottlingRatePerConsumer <= 0) {
+                this.subscribeRateLimiter = Optional.empty();
             }
         }
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
index a13328c..c10b40f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
@@ -223,7 +223,7 @@ public class SubscribeRateLimiter {
         SubscribeRate subscribeRate = 
getPoliciesSubscribeRate(serviceConfig.getClusterName(), policies, topicName);
         if (subscribeRate == null) {
             return serviceConfig.getSubscribeThrottlingRatePerConsumer() > 0
-                    || 
serviceConfig.getSubscribeRatePeriodPerConsumerInSecond() > 0;
+                    && 
serviceConfig.getSubscribeRatePeriodPerConsumerInSecond() > 0;
         }
         return true;
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index b570bff..215b899 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -32,6 +32,7 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -47,6 +48,7 @@ import 
org.apache.pulsar.broker.service.PublishRateLimiterImpl;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
@@ -1510,6 +1512,44 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
     }
 
     @Test
+    public void testDisableSubscribeRate() throws Exception {
+        
assertEquals(pulsar.getConfiguration().getSubscribeThrottlingRatePerConsumer(), 
0);
+        admin.topics().createNonPartitionedTopic(persistenceTopic);
+        admin.lookups().lookupTopic(persistenceTopic);
+        PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getTopicIfExists(persistenceTopic).get().get();
+        Field field = 
PersistentTopic.class.getDeclaredField("subscribeRateLimiter");
+        field.setAccessible(true);
+        Optional<SubscribeRateLimiter> limiter = 
(Optional<SubscribeRateLimiter>) field.get(topic);
+        // sub rate limiter should be null by default
+        assertFalse(limiter.isPresent());
+
+        // Enable / Disable subscribe rate in namespace-level
+        final SubscribeRate subscribeRate = new SubscribeRate(1, 100);
+        admin.namespaces().setSubscribeRate(myNamespace, subscribeRate);
+        Awaitility.await().untilAsserted(()-> {
+            Optional<SubscribeRateLimiter> limiter1 = 
(Optional<SubscribeRateLimiter>) field.get(topic);
+            assertTrue(limiter1.isPresent());
+        });
+        admin.namespaces().removeSubscribeRate(myNamespace);
+        Awaitility.await().untilAsserted(()-> {
+            Optional<SubscribeRateLimiter> limiter2 = 
(Optional<SubscribeRateLimiter>) field.get(topic);
+            assertFalse(limiter2.isPresent());
+        });
+
+        // Enable / Disable subscribe rate in topic-level
+        admin.topics().setSubscribeRate(persistenceTopic, subscribeRate);
+        Awaitility.await().untilAsserted(()-> {
+            Optional<SubscribeRateLimiter> limiter1 = 
(Optional<SubscribeRateLimiter>) field.get(topic);
+            assertTrue(limiter1.isPresent());
+        });
+        admin.topics().removeSubscribeRate(persistenceTopic);
+        Awaitility.await().untilAsserted(()-> {
+            Optional<SubscribeRateLimiter> limiter2 = 
(Optional<SubscribeRateLimiter>) field.get(topic);
+            assertFalse(limiter2.isPresent());
+        });
+    }
+
+    @Test
     public void testGetSetSubscribeRate() throws Exception {
         admin.topics().createPartitionedTopic(persistenceTopic, 2);
 
@@ -1657,7 +1697,7 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
     @Test
     public void testRemoveSubscribeRate() throws Exception {
         admin.topics().createPartitionedTopic(persistenceTopic, 2);
-
+        pulsarClient.newProducer().topic(persistenceTopic).create().close();
         SubscribeRate subscribeRate = new SubscribeRate(2, 30);
         log.info("Subscribe Rate: {} will set to the topic: {}", 
subscribeRate, persistenceTopic);
         admin.topics().setSubscribeRate(persistenceTopic, subscribeRate);

Reply via email to