This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ed68ec106fe [fix][broker] Fix closing of AbstractDispatcherSingleActiveConsumer and reduce flakiness of test (#21736)
ed68ec106fe is described below

commit ed68ec106fed34ea39fc52810e270094b35e2ca3
Author: Lari Hotari <[email protected]>
AuthorDate: Tue Dec 19 16:13:14 2023 +0200

    [fix][broker] Fix closing of AbstractDispatcherSingleActiveConsumer and reduce flakiness of test (#21736)
---
 .../AbstractDispatcherSingleActiveConsumer.java    |  2 ++
 .../apache/pulsar/broker/service/Dispatcher.java   |  4 +++-
 .../NonPersistentDispatcherMultipleConsumers.java  |  2 ++
 .../PersistentDispatcherMultipleConsumers.java     |  7 ------
 .../PersistentDispatcherSingleActiveConsumer.java  | 15 ------------
 .../SubscriptionMessageDispatchThrottlingTest.java | 28 ++++++++++++----------
 6 files changed, 23 insertions(+), 35 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 5e74158c9c2..4a8a805f16b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -36,6 +36,7 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
 import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
+import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
 import org.apache.pulsar.client.impl.Murmur3Hash32;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -261,6 +262,7 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
     public CompletableFuture<Void> close(boolean disconnectConsumers,
                                          Optional<BrokerLookupData> assignedBrokerLookupData) {
         IS_CLOSED_UPDATER.set(this, TRUE);
+        getRateLimiter().ifPresent(DispatchRateLimiter::close);
         return disconnectConsumers
                 ? disconnectAllConsumers(false, assignedBrokerLookupData) : CompletableFuture.completedFuture(null);
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
index bdea106171b..08e5caaa2dd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.pulsar.broker.service;
 
 import java.util.List;
@@ -101,7 +102,8 @@ public interface Dispatcher {
     }
 
     default void updateRateLimiter() {
-        //No-op
+        initializeDispatchRateLimiterIfNeeded();
+        getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate);
     }
 
     default boolean initializeDispatchRateLimiterIfNeeded() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index 0a9ec9fbb57..29bca715741 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.broker.service.RedeliveryTracker;
 import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled;
 import org.apache.pulsar.broker.service.SendMessageInfo;
 import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.stats.Rate;
@@ -131,6 +132,7 @@ public class NonPersistentDispatcherMultipleConsumers 
extends AbstractDispatcher
     public CompletableFuture<Void> close(boolean disconnectConsumers,
                                          Optional<BrokerLookupData> 
assignedBrokerLookupData) {
         IS_CLOSED_UPDATER.set(this, TRUE);
+        getRateLimiter().ifPresent(DispatchRateLimiter::close);
         return disconnectConsumers
                 ? disconnectAllConsumers(false, assignedBrokerLookupData) : 
CompletableFuture.completedFuture(null);
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 6001561d72a..cafc398a3c8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -1026,13 +1026,6 @@ public class PersistentDispatcherMultipleConsumers 
extends AbstractDispatcherMul
         return dispatchRateLimiter;
     }
 
-    @Override
-    public void updateRateLimiter() {
-        if (!initializeDispatchRateLimiterIfNeeded()) {
-            this.dispatchRateLimiter.ifPresent(DispatchRateLimiter::updateDispatchRate);
-        }
-    }
-
     @Override
     public boolean initializeDispatchRateLimiterIfNeeded() {
         if (!dispatchRateLimiter.isPresent() && 
DispatchRateLimiter.isDispatchRateEnabled(
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 54c039d632f..806773af451 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -25,7 +25,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -560,13 +559,6 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
         return dispatchRateLimiter;
     }
 
-    @Override
-    public void updateRateLimiter() {
-        if (!initializeDispatchRateLimiterIfNeeded()) {
-            this.dispatchRateLimiter.ifPresent(DispatchRateLimiter::updateDispatchRate);
-        }
-    }
-
     @Override
     public boolean initializeDispatchRateLimiterIfNeeded() {
         if (!dispatchRateLimiter.isPresent() && 
DispatchRateLimiter.isDispatchRateEnabled(
@@ -578,13 +570,6 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
         return false;
     }
 
-    @Override
-    public CompletableFuture<Void> close() {
-        IS_CLOSED_UPDATER.set(this, TRUE);
-        dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
-        return disconnectAllConsumers();
-    }
-
     @Override
     public boolean checkAndUnblockIfStuck() {
         Consumer consumer = ACTIVE_CONSUMER_UPDATER.get(this);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
index 6304ed82d4f..02de11a2bcc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.client.api;
 
 import static org.awaitility.Awaitility.await;
 import com.google.common.collect.Sets;
-import java.time.Duration;
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -48,7 +47,7 @@ public class SubscriptionMessageDispatchThrottlingTest 
extends MessageDispatchTh
      * @param subscription
      * @throws Exception
      */
-    @Test(dataProvider = "subscriptionAndDispatchRateType", timeOut = 5000)
+    @Test(dataProvider = "subscriptionAndDispatchRateType", timeOut = 30000)
     public void testMessageRateLimitingNotReceiveAllMessages(SubscriptionType 
subscription,
                                                              DispatchRateType 
dispatchRateType) throws Exception {
         log.info("-- Starting {} test --", methodName);
@@ -144,7 +143,7 @@ public class SubscriptionMessageDispatchThrottlingTest 
extends MessageDispatchTh
      * @param subscription
      * @throws Exception
      */
-    @Test(dataProvider = "subscriptions", timeOut = 5000)
+    @Test(dataProvider = "subscriptions", timeOut = 30000)
     public void 
testMessageRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionType 
subscription)
         throws Exception {
         log.info("-- Starting {} test --", methodName);
@@ -218,7 +217,7 @@ public class SubscriptionMessageDispatchThrottlingTest 
extends MessageDispatchTh
         log.info("-- Exiting {} test --", methodName);
     }
 
-    @Test(dataProvider = "subscriptions", timeOut = 30000, invocationCount = 
15)
+    @Test(dataProvider = "subscriptions", timeOut = 30000)
     private void testMessageNotDuplicated(SubscriptionType subscription) 
throws Exception {
         int brokerRate = 1000;
         int topicRate = 5000;
@@ -273,7 +272,7 @@ public class SubscriptionMessageDispatchThrottlingTest 
extends MessageDispatchTh
             Assert.fail("Should only have PersistentDispatcher in this test");
         }
         final DispatchRateLimiter subDispatchRateLimiter = subRateLimiter;
-        Awaitility.await().atMost(Duration.ofMillis(500)).untilAsserted(() -> {
+        Awaitility.await().untilAsserted(() -> {
             DispatchRateLimiter brokerDispatchRateLimiter = 
pulsar.getBrokerService().getBrokerDispatchRateLimiter();
             Assert.assertTrue(brokerDispatchRateLimiter != null
                     && brokerDispatchRateLimiter.getDispatchRateOnByte() > 0);
@@ -320,7 +319,7 @@ public class SubscriptionMessageDispatchThrottlingTest 
extends MessageDispatchTh
      * @param subscription
      * @throws Exception
      */
-    @Test(dataProvider = "subscriptions", timeOut = 5000)
+    @Test(dataProvider = "subscriptions", timeOut = 30000)
     public void 
testBytesRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionType 
subscription) throws Exception {
         log.info("-- Starting {} test --", methodName);
 
@@ -451,7 +450,7 @@ public class SubscriptionMessageDispatchThrottlingTest 
extends MessageDispatchTh
             Assert.fail("Should only have PersistentDispatcher in this test");
         }
         final DispatchRateLimiter subDispatchRateLimiter = subRateLimiter;
-        Awaitility.await().atMost(Duration.ofMillis(500)).untilAsserted(() -> {
+        Awaitility.await().untilAsserted(() -> {
             DispatchRateLimiter brokerDispatchRateLimiter = 
pulsar.getBrokerService().getBrokerDispatchRateLimiter();
             Assert.assertTrue(brokerDispatchRateLimiter != null
                     && brokerDispatchRateLimiter.getDispatchRateOnByte() > 0);
@@ -526,7 +525,7 @@ public class SubscriptionMessageDispatchThrottlingTest 
extends MessageDispatchTh
      * @param subscription
      * @throws Exception
      */
-    @Test(dataProvider = "subscriptions", timeOut = 8000)
+    @Test(dataProvider = "subscriptions", timeOut = 30000)
     public void 
testBrokerBytesRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionType 
subscription) throws Exception {
         log.info("-- Starting {} test --", methodName);
 
@@ -539,6 +538,11 @@ public class SubscriptionMessageDispatchThrottlingTest 
extends MessageDispatchTh
         long initBytes = 
pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInByte();
         final int byteRate = 1000;
         
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + 
byteRate);
+
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertEquals(pulsar.getConfiguration().getDispatchThrottlingRateInByte(), byteRate);
+        });
+
         admin.namespaces().createNamespace(namespace1, 
Sets.newHashSet("test"));
         admin.namespaces().createNamespace(namespace2, 
Sets.newHashSet("test"));
 
@@ -572,7 +576,7 @@ public class SubscriptionMessageDispatchThrottlingTest 
extends MessageDispatchTh
         Producer<byte[]> producer1 = 
pulsarClient.newProducer().topic(topicName1).create();
         Producer<byte[]> producer2 = 
pulsarClient.newProducer().topic(topicName2).create();
 
-        Awaitility.await().atMost(Duration.ofMillis(500)).untilAsserted(() -> {
+        Awaitility.await().untilAsserted(() -> {
             DispatchRateLimiter rateLimiter = 
pulsar.getBrokerService().getBrokerDispatchRateLimiter();
             Assert.assertTrue(rateLimiter != null
                     && rateLimiter.getDispatchRateOnByte() > 0);
@@ -605,7 +609,7 @@ public class SubscriptionMessageDispatchThrottlingTest 
extends MessageDispatchTh
      *
      * @throws Exception
      */
-    @Test(timeOut = 5000)
+    @Test(timeOut = 30000)
     public void testRateLimitingMultipleConsumers() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
@@ -691,7 +695,7 @@ public class SubscriptionMessageDispatchThrottlingTest 
extends MessageDispatchTh
     }
 
 
-    @Test(dataProvider = "subscriptions", timeOut = 5000)
+    @Test(dataProvider = "subscriptions", timeOut = 30000)
     public void testClusterRateLimitingConfiguration(SubscriptionType 
subscription) throws Exception {
         log.info("-- Starting {} test --", methodName);
 
@@ -868,7 +872,7 @@ public class SubscriptionMessageDispatchThrottlingTest 
extends MessageDispatchTh
         log.info("-- Exiting {} test --", methodName);
     }
 
-    @Test(dataProvider = "subscriptions", timeOut = 11000)
+    @Test(dataProvider = "subscriptions", timeOut = 30000)
     public void testClosingRateLimiter(SubscriptionType subscription) throws 
Exception {
         log.info("-- Starting {} test --", methodName);
 

Reply via email to