This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 761724e3895 [fix][broker] Dynamic update broker-level subscribe-rate 
limiter (#14890)
761724e3895 is described below

commit 761724e3895ae754b39dbc81935517cddd94e50a
Author: Xiaoyu Hou <[email protected]>
AuthorDate: Fri Apr 8 15:34:23 2022 +0800

    [fix][broker] Dynamic update broker-level subscribe-rate limiter (#14890)
    
    ### Motivation
    
     `subscribeThrottlingRatePerConsumer` and 
`subscribeRatePeriodPerConsumerInSecond` are dynamic configurations, but 
changing them does not dynamically update the broker-level subscribe-rate limiter
    
    ### Modifications
    - Invoked 
`org.apache.pulsar.broker.service.BrokerService#registerConfigurationListener` 
to register listeners
    for `subscribeThrottlingRatePerConsumer` and 
`subscribeRatePeriodPerConsumerInSecond`
    - Also added 
`org.apache.pulsar.broker.service.AbstractTopic#updateBrokerSubscribeRate` to update 
the broker-level subscribe rate
---
 .../pulsar/broker/service/AbstractTopic.java       |  5 ++
 .../pulsar/broker/service/BrokerService.java       | 16 +++++
 .../org/apache/pulsar/broker/service/Topic.java    |  5 ++
 .../broker/service/persistent/PersistentTopic.java | 12 ++++
 .../pulsar/broker/service/SubscribeRateTest.java   | 80 ++++++++++++++++++++++
 5 files changed, 118 insertions(+)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index d1d36061a63..b853f665043 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -1201,4 +1201,9 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
         topicPolicies.getPublishRate().updateBrokerValue(
             publishRateInBroker(brokerService.pulsar().getConfiguration()));
     }
+
+    public void updateBrokerSubscribeRate() {
+        topicPolicies.getSubscribeRate().updateBrokerValue(
+            subscribeRateInBroker(brokerService.pulsar().getConfiguration()));
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 0e66d3246ec..353496ecec3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2285,6 +2285,12 @@ public class BrokerService implements Closeable {
             maxPublishRatePerTopicInMessages -> 
updateMaxPublishRatePerTopicInMessages()
         );
 
+        // add listener to update subscribe-rate dynamic config
+        registerConfigurationListener("subscribeThrottlingRatePerConsumer",
+            subscribeThrottlingRatePerConsumer -> updateSubscribeRate());
+        registerConfigurationListener("subscribeRatePeriodPerConsumerInSecond",
+            subscribeRatePeriodPerConsumerInSecond -> updateSubscribeRate());
+
         // add listener to notify broker publish-rate dynamic config
         
registerConfigurationListener("brokerPublisherThrottlingMaxMessageRate",
                 (brokerPublisherThrottlingMaxMessageRate) ->
@@ -2331,6 +2337,16 @@ public class BrokerService implements Closeable {
             }));
     }
 
+    private void updateSubscribeRate() {
+        this.pulsar().getExecutor().submit(() ->
+            forEachTopic(topic -> {
+                if (topic instanceof PersistentTopic) {
+                    ((PersistentTopic) topic).updateBrokerSubscribeRate();
+                    ((PersistentTopic) topic).updateSubscribeRateLimiter();
+                }
+            }));
+    }
+
     private void updateBrokerPublisherThrottlingMaxRate() {
         int currentMaxMessageRate = 
pulsar.getConfiguration().getBrokerPublisherThrottlingMaxMessageRate();
         long currentMaxByteRate = 
pulsar.getConfiguration().getBrokerPublisherThrottlingMaxByteRate();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 472bf4efdb8..20d316cb542 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -25,6 +25,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
+import org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter;
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.NamespaceStats;
 import org.apache.pulsar.client.api.MessageId;
@@ -285,6 +286,10 @@ public interface Topic {
         return Optional.empty();
     }
 
+    default Optional<SubscribeRateLimiter> getSubscribeRateLimiter() {
+        return Optional.empty();
+    }
+
     default Optional<DispatchRateLimiter> getBrokerDispatchRateLimiter() {
         return Optional.empty();
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 2ee66239b8f..ffeed8bd2ea 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.service.persistent;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.commons.lang3.StringUtils.isBlank;
+import static 
org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter.isSubscribeRateEnabled;
 import static 
org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
 import static 
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
 import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
@@ -137,6 +138,7 @@ import 
org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.LedgerI
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.TransactionBufferStats;
 import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
@@ -443,6 +445,16 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         }
     }
 
+    public void updateSubscribeRateLimiter() {
+        SubscribeRate subscribeRate = this.getSubscribeRate();
+        if (isSubscribeRateEnabled(subscribeRate)) {
+            subscribeRateLimiter = Optional.of(subscribeRateLimiter.orElse(new 
SubscribeRateLimiter(this)));
+        } else {
+            subscribeRateLimiter = Optional.empty();
+        }
+        subscribeRateLimiter.ifPresent(limiter -> 
limiter.onSubscribeRateUpdate(subscribeRate));
+    }
+
     private void asyncAddEntry(ByteBuf headersAndPayload, PublishContext 
publishContext) {
         if (brokerService.isBrokerEntryMetadataEnabled()) {
             ledger.asyncAddEntry(headersAndPayload,
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscribeRateTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscribeRateTest.java
new file mode 100644
index 00000000000..76399f32f7d
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscribeRateTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter;
+import org.apache.pulsar.client.api.Producer;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class SubscribeRateTest extends BrokerTestBase {
+
+    @Override
+    protected void setup() throws Exception {
+        //No-op
+    }
+
+    @Override
+    protected void cleanup() throws Exception {
+        //No-op
+    }
+
+    @Test
+    public void testBrokerLevelSubscribeRateDynamicUpdate() throws Exception {
+        conf.setTopicLevelPoliciesEnabled(true);
+        conf.setSystemTopicEnabled(true);
+        conf.setMaxPendingPublishRequestsPerConnection(0);
+        super.baseSetup();
+        final String topic = 
"persistent://prop/ns-abc/testBrokerLevelSubscribeRateDynamicUpdate";
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topic)
+            .producerName("producer-name")
+            .create();
+
+        Topic topicRef = 
pulsar.getBrokerService().getTopicReference(topic).get();
+        Assert.assertNotNull(topicRef);
+        Assert.assertFalse(topicRef.getSubscribeRateLimiter().isPresent());
+
+        final int ratePerConsumer = 10;
+        final int ratePeriod = 60;
+
+        String defaultRatePerConsumer = 
admin.brokers().getRuntimeConfigurations().get("subscribeThrottlingRatePerConsumer");
+        String defaultRatePeriod = 
admin.brokers().getRuntimeConfigurations().get("subscribeRatePeriodPerConsumerInSecond");
+        Assert.assertNotNull(defaultRatePerConsumer);
+        Assert.assertNotNull(defaultRatePeriod);
+        Assert.assertNotEquals(ratePerConsumer, 
Integer.parseInt(defaultRatePerConsumer));
+        Assert.assertNotEquals(ratePeriod, 
Integer.parseInt(defaultRatePeriod));
+
+        // subscribeThrottlingRatePerConsumer
+        
admin.brokers().updateDynamicConfiguration("subscribeThrottlingRatePerConsumer",
 ratePerConsumer + "");
+        Awaitility.await().untilAsserted(() -> 
Assert.assertTrue(topicRef.getSubscribeRateLimiter().isPresent()));
+        SubscribeRateLimiter limiter = 
topicRef.getSubscribeRateLimiter().get();
+        
Assert.assertEquals(limiter.getSubscribeRate().subscribeThrottlingRatePerConsumer,
 ratePerConsumer);
+        Assert.assertEquals(limiter.getSubscribeRate().ratePeriodInSecond, 30);
+
+        // subscribeRatePeriodPerConsumerInSecond
+        
admin.brokers().updateDynamicConfiguration("subscribeRatePeriodPerConsumerInSecond",
 ratePeriod + "");
+        Awaitility.await().untilAsserted(() -> 
Assert.assertEquals(limiter.getSubscribeRate().ratePeriodInSecond, ratePeriod));
+        
Assert.assertEquals(limiter.getSubscribeRate().subscribeThrottlingRatePerConsumer,
 ratePerConsumer);
+
+        producer.close();
+    }
+}

Reply via email to