This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b71f6113121940566006ed353ed11fdf456f07b0 Author: Xiaoyu Hou <[email protected]> AuthorDate: Sun Jun 26 16:30:19 2022 +0800 [fix][broker]Fix subscribe dispathcer limiter not be initialized (#16175) (cherry picked from commit 7afc41137b7efa331c66d3a13032f971512f6db6) --- .../pulsar/broker/service/BrokerService.java | 2 +- .../service/SubscribeDispatchLimiterTest.java | 108 +++++++++++++++++++++ 2 files changed, 109 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 1287076c10f..7b442698e92 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2302,7 +2302,7 @@ public class BrokerService implements Closeable { topic.getSubscriptions().forEach((subName, persistentSubscription) -> { Dispatcher dispatcher = persistentSubscription.getDispatcher(); if (dispatcher != null) { - dispatcher.getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate); + dispatcher.updateRateLimiter(); } }); }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscribeDispatchLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscribeDispatchLimiterTest.java new file mode 100644 index 00000000000..801e1be6a73 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscribeDispatchLimiterTest.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import java.util.Optional; +import lombok.Cleanup; +import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.api.Consumer; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class SubscribeDispatchLimiterTest extends BrokerTestBase { + @BeforeMethod + @Override + protected void setup() throws Exception { + conf.setDispatchThrottlingRatePerSubscriptionInMsg(0); + conf.setDispatchThrottlingRatePerSubscriptionInByte(0L); + super.baseSetup(); + } + + @AfterMethod(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testDispatchRateLimiterPerSubscriptionInMsgOnlyBrokerLevel() throws Exception { + final String topicName = "persistent://" + newTopicName(); + final String subscribeName = "cg_testDispatchRateLimiterPerSubscriptionInMsgOnlyBrokerLevel"; + + @Cleanup + Consumer<byte[]> consumer = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionName(subscribeName) + .subscribe(); + + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get(); + assertNotNull(topic); + + PersistentSubscription subscription = topic.getSubscriptions().get(subscribeName); + assertNotNull(subscription); + assertFalse(subscription.getDispatcher().getRateLimiter().isPresent()); + + admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerSubscriptionInMsg", "100"); + Awaitility.await().untilAsserted(() -> + assertEquals(pulsar.getConfig().getDispatchThrottlingRatePerSubscriptionInMsg(), 100) + ); + Awaitility.await().untilAsserted(() -> { + Optional<DispatchRateLimiter> limiterOpt = subscription.getDispatcher().getRateLimiter(); + assertTrue(limiterOpt.isPresent()); + assertEquals(limiterOpt.get().getDispatchRateOnMsg(), 100); + }); + } + + @Test + public void testDispatchRateLimiterPerSubscriptionInByteOnlyBrokerLevel() throws Exception { + final String topicName = "persistent://" + newTopicName(); + final String subscribeName = "testDispatchRateLimiterPerSubscriptionInByteOnlyBrokerLevel"; + + @Cleanup + Consumer<byte[]> consumer = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionName(subscribeName) + .subscribe(); + + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get(); + assertNotNull(topic); + + PersistentSubscription subscription = topic.getSubscriptions().get(subscribeName); + assertNotNull(subscription); + assertFalse(subscription.getDispatcher().getRateLimiter().isPresent()); + + admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerSubscriptionInByte", "1000"); + Awaitility.await().untilAsserted(() -> + assertEquals(pulsar.getConfig().getDispatchThrottlingRatePerSubscriptionInByte(), 1000) + ); + Awaitility.await().untilAsserted(() -> { + Optional<DispatchRateLimiter> limiterOpt = subscription.getDispatcher().getRateLimiter(); + assertTrue(limiterOpt.isPresent()); + assertEquals(limiterOpt.get().getDispatchRateOnByte(), 1000); + }); + } +}
