This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 761724e3895 [fix][broker] Dynamic update broker-level subscribe-rate
limter (#14890)
761724e3895 is described below
commit 761724e3895ae754b39dbc81935517cddd94e50a
Author: Xiaoyu Hou <[email protected]>
AuthorDate: Fri Apr 8 15:34:23 2022 +0800
[fix][broker] Dynamic update broker-level subscribe-rate limter (#14890)
### Motivation
`subscribeThrottlingRatePerConsumer` and
`subscribeRatePeriodPerConsumerInSecond` are dynamic configurations, but
changing them does not dynamically update the broker-level subscribe-rate limiter
### Modifications
- Invoke
`org.apache.pulsar.broker.service.BrokerService#registerConfigurationListener`
to register listeners
for `subscribeThrottlingRatePerConsumer` and
`subscribeRatePeriodPerConsumerInSecond`
- Also add
`org.apache.pulsar.broker.service.AbstractTopic#updateBrokerSubscribeRate` to update
the broker-level subscribe rate
---
.../pulsar/broker/service/AbstractTopic.java | 5 ++
.../pulsar/broker/service/BrokerService.java | 16 +++++
.../org/apache/pulsar/broker/service/Topic.java | 5 ++
.../broker/service/persistent/PersistentTopic.java | 12 ++++
.../pulsar/broker/service/SubscribeRateTest.java | 80 ++++++++++++++++++++++
5 files changed, 118 insertions(+)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index d1d36061a63..b853f665043 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -1201,4 +1201,9 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
topicPolicies.getPublishRate().updateBrokerValue(
publishRateInBroker(brokerService.pulsar().getConfiguration()));
}
+
+ public void updateBrokerSubscribeRate() {
+ topicPolicies.getSubscribeRate().updateBrokerValue(
+ subscribeRateInBroker(brokerService.pulsar().getConfiguration()));
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 0e66d3246ec..353496ecec3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2285,6 +2285,12 @@ public class BrokerService implements Closeable {
maxPublishRatePerTopicInMessages ->
updateMaxPublishRatePerTopicInMessages()
);
+ // add listener to update subscribe-rate dynamic config
+ registerConfigurationListener("subscribeThrottlingRatePerConsumer",
+ subscribeThrottlingRatePerConsumer -> updateSubscribeRate());
+ registerConfigurationListener("subscribeRatePeriodPerConsumerInSecond",
+ subscribeRatePeriodPerConsumerInSecond -> updateSubscribeRate());
+
// add listener to notify broker publish-rate dynamic config
registerConfigurationListener("brokerPublisherThrottlingMaxMessageRate",
(brokerPublisherThrottlingMaxMessageRate) ->
@@ -2331,6 +2337,16 @@ public class BrokerService implements Closeable {
}));
}
+ private void updateSubscribeRate() {
+ this.pulsar().getExecutor().submit(() ->
+ forEachTopic(topic -> {
+ if (topic instanceof PersistentTopic) {
+ ((PersistentTopic) topic).updateBrokerSubscribeRate();
+ ((PersistentTopic) topic).updateSubscribeRateLimiter();
+ }
+ }));
+ }
+
private void updateBrokerPublisherThrottlingMaxRate() {
int currentMaxMessageRate =
pulsar.getConfiguration().getBrokerPublisherThrottlingMaxMessageRate();
long currentMaxByteRate =
pulsar.getConfiguration().getBrokerPublisherThrottlingMaxByteRate();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 472bf4efdb8..20d316cb542 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -25,6 +25,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
+import org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.client.api.MessageId;
@@ -285,6 +286,10 @@ public interface Topic {
return Optional.empty();
}
+ default Optional<SubscribeRateLimiter> getSubscribeRateLimiter() {
+ return Optional.empty();
+ }
+
default Optional<DispatchRateLimiter> getBrokerDispatchRateLimiter() {
return Optional.empty();
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 2ee66239b8f..ffeed8bd2ea 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.service.persistent;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.commons.lang3.StringUtils.isBlank;
+import static
org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter.isSubscribeRateEnabled;
import static
org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
import static
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
@@ -137,6 +138,7 @@ import
org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.LedgerI
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
@@ -443,6 +445,16 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
}
}
+ public void updateSubscribeRateLimiter() {
+ SubscribeRate subscribeRate = this.getSubscribeRate();
+ if (isSubscribeRateEnabled(subscribeRate)) {
+ subscribeRateLimiter = Optional.of(subscribeRateLimiter.orElse(new
SubscribeRateLimiter(this)));
+ } else {
+ subscribeRateLimiter = Optional.empty();
+ }
+ subscribeRateLimiter.ifPresent(limiter ->
limiter.onSubscribeRateUpdate(subscribeRate));
+ }
+
private void asyncAddEntry(ByteBuf headersAndPayload, PublishContext
publishContext) {
if (brokerService.isBrokerEntryMetadataEnabled()) {
ledger.asyncAddEntry(headersAndPayload,
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscribeRateTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscribeRateTest.java
new file mode 100644
index 00000000000..76399f32f7d
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscribeRateTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter;
+import org.apache.pulsar.client.api.Producer;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class SubscribeRateTest extends BrokerTestBase {
+
+ @Override
+ protected void setup() throws Exception {
+ //No-op
+ }
+
+ @Override
+ protected void cleanup() throws Exception {
+ //No-op
+ }
+
+ @Test
+ public void testBrokerLevelSubscribeRateDynamicUpdate() throws Exception {
+ conf.setTopicLevelPoliciesEnabled(true);
+ conf.setSystemTopicEnabled(true);
+ conf.setMaxPendingPublishRequestsPerConnection(0);
+ super.baseSetup();
+ final String topic =
"persistent://prop/ns-abc/testBrokerLevelSubscribeRateDynamicUpdate";
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .producerName("producer-name")
+ .create();
+
+ Topic topicRef =
pulsar.getBrokerService().getTopicReference(topic).get();
+ Assert.assertNotNull(topicRef);
+ Assert.assertFalse(topicRef.getSubscribeRateLimiter().isPresent());
+
+ final int ratePerConsumer = 10;
+ final int ratePeriod = 60;
+
+ String defaultRatePerConsumer =
admin.brokers().getRuntimeConfigurations().get("subscribeThrottlingRatePerConsumer");
+ String defaultRatePeriod =
admin.brokers().getRuntimeConfigurations().get("subscribeRatePeriodPerConsumerInSecond");
+ Assert.assertNotNull(defaultRatePerConsumer);
+ Assert.assertNotNull(defaultRatePeriod);
+ Assert.assertNotEquals(ratePerConsumer,
Integer.parseInt(defaultRatePerConsumer));
+ Assert.assertNotEquals(ratePeriod,
Integer.parseInt(defaultRatePeriod));
+
+ // subscribeThrottlingRatePerConsumer
+
admin.brokers().updateDynamicConfiguration("subscribeThrottlingRatePerConsumer",
ratePerConsumer + "");
+ Awaitility.await().untilAsserted(() ->
Assert.assertTrue(topicRef.getSubscribeRateLimiter().isPresent()));
+ SubscribeRateLimiter limiter =
topicRef.getSubscribeRateLimiter().get();
+
Assert.assertEquals(limiter.getSubscribeRate().subscribeThrottlingRatePerConsumer,
ratePerConsumer);
+ Assert.assertEquals(limiter.getSubscribeRate().ratePeriodInSecond, 30);
+
+ // subscribeRatePeriodPerConsumerInSecond
+
admin.brokers().updateDynamicConfiguration("subscribeRatePeriodPerConsumerInSecond",
ratePeriod + "");
+ Awaitility.await().untilAsserted(() ->
Assert.assertEquals(limiter.getSubscribeRate().ratePeriodInSecond, ratePeriod));
+
Assert.assertEquals(limiter.getSubscribeRate().subscribeThrottlingRatePerConsumer,
ratePerConsumer);
+
+ producer.close();
+ }
+}