This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 0c6e809  [Branch-2.7] Fix broker dispatch byte rate limiter (#11249)
0c6e809 is described below

commit 0c6e809c1ba0d667dd1a0ef87f49eb5a38b631f6
Author: ran <[email protected]>
AuthorDate: Fri Jul 9 10:26:34 2021 +0800

    [Branch-2.7] Fix broker dispatch byte rate limiter (#11249)
    
    The PR #11135 couldn't be cherry-picked to branch-2.7, because there are 
too many conflicts.
    
    ## Motivation
    
    fix https://github.com/apache/pulsar/issues/11044
    Currently the dispatcher byte rate limit is not applied to each cursor 
read. The cursor always reads with 
`ServiceConfiguration.dispatcherMaxReadSizeBytes` as its byte limit, so the 
dispatcher reads up to `ServiceConfiguration.dispatcherMaxReadSizeBytes` on 
every read, regardless of the byte permits available in the rate limiter.
    
    ## Implementation
    
    The byte size of each cursor read is now calculated before reading: it 
starts at `dispatcherMaxReadSizeBytes` and is capped by the byte permits 
available in the dispatcher rate limiter.
---
 .../broker/service/AbstractBaseDispatcher.java     | 20 ++++++++++-
 .../AbstractDispatcherMultipleConsumers.java       |  5 +--
 .../AbstractDispatcherSingleActiveConsumer.java    |  5 +--
 .../NonPersistentDispatcherMultipleConsumers.java  |  5 +--
 ...onPersistentDispatcherSingleActiveConsumer.java |  6 ++--
 .../service/persistent/DispatchRateLimiter.java    |  9 +++++
 .../PersistentDispatcherMultipleConsumers.java     | 32 ++++++++---------
 .../PersistentDispatcherSingleActiveConsumer.java  | 34 +++++++++---------
 .../client/api/MessageDispatchThrottlingTest.java  | 41 ++++++++--------------
 .../SubscriptionMessageDispatchThrottlingTest.java |  8 ++---
 10 files changed, 89 insertions(+), 76 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 997ce5c..627e4e8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -32,6 +32,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.common.api.proto.PulsarApi;
@@ -48,8 +49,11 @@ public abstract class AbstractBaseDispatcher implements 
Dispatcher {
 
     protected final Subscription subscription;
 
-    protected AbstractBaseDispatcher(Subscription subscription) {
+    protected final ServiceConfiguration serviceConfig;
+
+    protected AbstractBaseDispatcher(Subscription subscription, 
ServiceConfiguration serviceConfig) {
         this.subscription = subscription;
+        this.serviceConfig = serviceConfig;
     }
 
     /**
@@ -235,4 +239,18 @@ public abstract class AbstractBaseDispatcher implements 
Dispatcher {
             }
         });
     }
+
+    protected static Pair<Integer, Long> computeReadLimits(int messagesToRead, 
int availablePermitsOnMsg,
+                                                           long bytesToRead, 
long availablePermitsOnByte) {
+        if (availablePermitsOnMsg > 0) {
+            messagesToRead = Math.min(messagesToRead, availablePermitsOnMsg);
+        }
+
+        if (availablePermitsOnByte > 0) {
+            bytesToRead = Math.min(bytesToRead, availablePermitsOnByte);
+        }
+
+        return Pair.of(messagesToRead, bytesToRead);
+    }
+
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java
index 434ff2d..44162d9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java
@@ -26,6 +26,7 @@ import java.util.Random;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
+import org.apache.pulsar.broker.ServiceConfiguration;
 import 
org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.slf4j.Logger;
@@ -47,8 +48,8 @@ public abstract class AbstractDispatcherMultipleConsumers 
extends AbstractBaseDi
 
     private Random random = new Random(42);
 
-    protected AbstractDispatcherMultipleConsumers(Subscription subscription) {
-        super(subscription);
+    protected AbstractDispatcherMultipleConsumers(Subscription subscription, 
ServiceConfiguration serviceConfig) {
+        super(subscription, serviceConfig);
     }
 
     public boolean isConsumerConnected() {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 559ba90..a321b64 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
+import org.apache.pulsar.broker.ServiceConfiguration;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
@@ -56,8 +57,8 @@ public abstract class AbstractDispatcherSingleActiveConsumer 
extends AbstractBas
     private volatile int isClosed = FALSE;
 
     public AbstractDispatcherSingleActiveConsumer(SubType subscriptionType, 
int partitionIndex,
-            String topicName, Subscription subscription) {
-        super(subscription);
+            String topicName, Subscription subscription, ServiceConfiguration 
serviceConfig) {
+        super(subscription, serviceConfig);
         this.topicName = topicName;
         this.consumers = new CopyOnWriteArrayList<>();
         this.partitionIndex = partitionIndex;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index 2943636..3bf1f9d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -24,7 +24,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.apache.bookkeeper.mledger.Entry;
-import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
@@ -56,16 +55,14 @@ public class NonPersistentDispatcherMultipleConsumers 
extends AbstractDispatcher
     @SuppressWarnings("unused")
     private volatile int totalAvailablePermits = 0;
 
-    private final ServiceConfiguration serviceConfig;
     private final RedeliveryTracker redeliveryTracker;
 
     public NonPersistentDispatcherMultipleConsumers(NonPersistentTopic topic, 
Subscription subscription) {
-        super(subscription);
+        super(subscription, 
topic.getBrokerService().pulsar().getConfiguration());
         this.topic = topic;
         this.subscription = subscription;
         this.name = topic.getName() + " / " + subscription.getName();
         this.msgDrop = new Rate();
-        this.serviceConfig = 
topic.getBrokerService().pulsar().getConfiguration();
         this.redeliveryTracker = 
RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index 1b51e0d..69e9a95 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -25,7 +25,6 @@ import static 
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import java.util.List;
 
 import org.apache.bookkeeper.mledger.Entry;
-import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
 import org.apache.pulsar.broker.service.Consumer;
@@ -47,16 +46,15 @@ public final class 
NonPersistentDispatcherSingleActiveConsumer extends AbstractD
     private final NonPersistentTopic topic;
     private final Rate msgDrop;
     private final Subscription subscription;
-    private final ServiceConfiguration serviceConfig;
     private final RedeliveryTracker redeliveryTracker;
 
     public NonPersistentDispatcherSingleActiveConsumer(SubType 
subscriptionType, int partitionIndex,
             NonPersistentTopic topic, Subscription subscription) {
-        super(subscriptionType, partitionIndex, topic.getName(), subscription);
+        super(subscriptionType, partitionIndex, topic.getName(), subscription,
+                topic.getBrokerService().pulsar().getConfiguration());
         this.topic = topic;
         this.subscription = subscription;
         this.msgDrop = new Rate();
-        this.serviceConfig = 
topic.getBrokerService().pulsar().getConfiguration();
         this.redeliveryTracker = 
RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index 2661727..b2447a2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -407,6 +407,15 @@ public class DispatchRateLimiter {
                 || dispatchRate.dispatchThrottlingRateInByte > 0);
     }
 
+    /**
+     * returns available byte-permit if msg-dispatch-throttling is enabled 
else it returns -1.
+     *
+     * @return
+     */
+    public long getAvailableDispatchRateLimitOnByte() {
+        return dispatchRateLimiterOnByte == null ? -1 : 
dispatchRateLimiterOnByte.getAvailablePermits();
+    }
+
     public void close() {
         // close rate-limiter
         if (dispatchRateLimiterOnMessage != null) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 3f61156..6b2b4ec 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -43,7 +43,7 @@ import 
org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadE
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
 import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
@@ -103,7 +103,6 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
     private volatile int blockedDispatcherOnUnackedMsgs = FALSE;
     private static final 
AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers> 
BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER =
             
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
 "blockedDispatcherOnUnackedMsgs");
-    protected final ServiceConfiguration serviceConfig;
     protected Optional<DispatchRateLimiter> dispatchRateLimiter = 
Optional.empty();
 
     enum ReadType {
@@ -111,8 +110,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
     }
 
     public PersistentDispatcherMultipleConsumers(PersistentTopic topic, 
ManagedCursor cursor, Subscription subscription) {
-        super(subscription);
-        this.serviceConfig = 
topic.getBrokerService().pulsar().getConfiguration();
+        super(subscription, 
topic.getBrokerService().pulsar().getConfiguration());
         this.cursor = cursor;
         this.lastIndividualDeletedRangeFromCursorRecovery = 
cursor.getLastIndividualDeletedRange();
         this.name = topic.getName() + " / " + Codec.decode(cursor.getName());
@@ -250,6 +248,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
         int currentTotalAvailablePermits = totalAvailablePermits;
         if (currentTotalAvailablePermits > 0 && 
isAtleastOneConsumerAvailable()) {
             int messagesToRead = Math.min(currentTotalAvailablePermits, 
readBatchSize);
+            long bytesToRead = serviceConfig.getDispatcherMaxReadSizeBytes();
 
             Consumer c = getRandomConsumer();
             // if turn on precise dispatcher flow control, adjust the record 
to read
@@ -281,11 +280,12 @@ public class PersistentDispatcherMultipleConsumers 
extends AbstractDispatcherMul
                                 TimeUnit.MILLISECONDS);
                         return;
                     } else {
-                        // if dispatch-rate is in msg then read only msg 
according to available permit
-                        long availablePermitsOnMsg = 
topicRateLimiter.getAvailableDispatchRateLimitOnMsg();
-                        if (availablePermitsOnMsg > 0) {
-                            messagesToRead = Math.min(messagesToRead, (int) 
availablePermitsOnMsg);
-                        }
+                        Pair<Integer, Long> calculateResult = 
computeReadLimits(
+                                messagesToRead, (int) 
topicRateLimiter.getAvailableDispatchRateLimitOnMsg(),
+                                bytesToRead, 
topicRateLimiter.getAvailableDispatchRateLimitOnByte());
+
+                        messagesToRead = calculateResult.getLeft();
+                        bytesToRead = calculateResult.getRight();
                     }
                 }
 
@@ -300,11 +300,11 @@ public class PersistentDispatcherMultipleConsumers 
extends AbstractDispatcherMul
                             TimeUnit.MILLISECONDS);
                         return;
                     } else {
-                        // if dispatch-rate is in msg then read only msg 
according to available permit
-                        long availablePermitsOnMsg = 
dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg();
-                        if (availablePermitsOnMsg > 0) {
-                            messagesToRead = Math.min(messagesToRead, (int) 
availablePermitsOnMsg);
-                        }
+                        Pair<Integer, Long> calculateResult = 
computeReadLimits(
+                                messagesToRead, (int) 
dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg(),
+                                bytesToRead, 
dispatchRateLimiter.get().getAvailableDispatchRateLimitOnByte());
+                        messagesToRead = calculateResult.getLeft();
+                        bytesToRead = calculateResult.getRight();
                     }
                 }
 
@@ -319,6 +319,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
 
             // If messagesToRead is 0 or less, correct it to 1 to prevent 
IllegalArgumentException
             messagesToRead = Math.max(messagesToRead, 1);
+            bytesToRead = Math.max(bytesToRead, 1);
             Set<PositionImpl> messagesToReplayNow = 
getMessagesToReplayNow(messagesToRead);
 
             if (!messagesToReplayNow.isEmpty()) {
@@ -351,8 +352,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
                             consumerList.size());
                 }
                 havePendingRead = true;
-                cursor.asyncReadEntriesOrWait(messagesToRead, 
serviceConfig.getDispatcherMaxReadSizeBytes(), this,
-                        ReadType.Normal);
+                cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, 
this, ReadType.Normal);
             } else {
                 log.debug("[{}] Cannot schedule next read until previous one 
is done", name);
             }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 2930a33..4be78e8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -38,7 +38,7 @@ import 
org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadE
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.mledger.util.SafeRun;
-import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
 import org.apache.pulsar.broker.service.Consumer;
@@ -74,7 +74,6 @@ public final class PersistentDispatcherSingleActiveConsumer 
extends AbstractDisp
 
     private volatile int readBatchSize;
     private final Backoff readFailureBackoff = new Backoff(15, 
TimeUnit.SECONDS, 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
-    private final ServiceConfiguration serviceConfig;
     private volatile ScheduledFuture<?> readOnActiveConsumerTask = null;
 
     LongPairSet messagesToRedeliver;
@@ -83,12 +82,12 @@ public final class PersistentDispatcherSingleActiveConsumer 
extends AbstractDisp
 
     public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, 
SubType subscriptionType, int partitionIndex,
             PersistentTopic topic, Subscription subscription) {
-        super(subscriptionType, partitionIndex, topic.getName(), subscription);
+        super(subscriptionType, partitionIndex, topic.getName(), subscription,
+                topic.getBrokerService().pulsar().getConfiguration());
         this.topic = topic;
         this.name = topic.getName() + " / " + (cursor.getName() != null ? 
Codec.decode(cursor.getName())
                 : ""/* NonDurableCursor doesn't have name */);
         this.cursor = cursor;
-        this.serviceConfig = 
topic.getBrokerService().pulsar().getConfiguration();
         this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
         this.redeliveryTracker = 
RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
         this.initializeDispatchRateLimiterIfNeeded(Optional.empty());
@@ -383,6 +382,7 @@ public final class PersistentDispatcherSingleActiveConsumer 
extends AbstractDisp
             }
 
             int messagesToRead = Math.min(availablePermits, readBatchSize);
+            long bytesToRead = serviceConfig.getDispatcherMaxReadSizeBytes();
             // if turn of precise dispatcher flow control, adjust the records 
to read
             if (consumer.isPreciseDispatcherFlowControl()) {
                 int avgMessagesPerEntry = consumer.getAvgMessagesPerEntry();
@@ -415,11 +415,12 @@ public final class 
PersistentDispatcherSingleActiveConsumer extends AbstractDisp
                         }, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
                         return;
                     } else {
-                        // if dispatch-rate is in msg then read only msg 
according to available permit
-                        long availablePermitsOnMsg = 
topicRateLimiter.getAvailableDispatchRateLimitOnMsg();
-                        if (availablePermitsOnMsg > 0) {
-                            messagesToRead = Math.min(messagesToRead, (int) 
availablePermitsOnMsg);
-                        }
+                        Pair<Integer, Long> calculateResult = 
computeReadLimits(
+                                messagesToRead, (int) 
topicRateLimiter.getAvailableDispatchRateLimitOnMsg(),
+                                bytesToRead, 
topicRateLimiter.getAvailableDispatchRateLimitOnByte());
+
+                        messagesToRead = calculateResult.getLeft();
+                        bytesToRead = calculateResult.getRight();
                     }
                 }
 
@@ -443,18 +444,19 @@ public final class 
PersistentDispatcherSingleActiveConsumer extends AbstractDisp
                         }, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
                         return;
                     } else {
-                        // if dispatch-rate is in msg then read only msg 
according to available permit
-                        long subPermitsOnMsg = 
dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg();
-                        if (subPermitsOnMsg > 0) {
-                            messagesToRead = Math.min(messagesToRead, (int) 
subPermitsOnMsg);
-                        }
+                        Pair<Integer, Long> calculateResult = 
computeReadLimits(
+                                messagesToRead, (int) 
dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg(),
+                                bytesToRead, 
dispatchRateLimiter.get().getAvailableDispatchRateLimitOnByte());
+
+                        messagesToRead = calculateResult.getLeft();
+                        bytesToRead = calculateResult.getRight();
                     }
                 }
             }
 
             // If messagesToRead is 0 or less, correct it to 1 to prevent 
IllegalArgumentException
             messagesToRead = Math.max(messagesToRead, 1);
-
+            bytesToRead = Math.max(bytesToRead, 1);
             // Schedule read
             if (log.isDebugEnabled()) {
                 log.debug("[{}-{}] Schedule read of {} messages", name, 
consumer, messagesToRead);
@@ -479,7 +481,7 @@ public final class PersistentDispatcherSingleActiveConsumer 
extends AbstractDisp
             } else if (consumer.readCompacted()) {
                 topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, 
messagesToRead, this, consumer);
             } else {
-                cursor.asyncReadEntriesOrWait(messagesToRead, 
serviceConfig.getDispatcherMaxReadSizeBytes(), this, consumer);
+                cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, 
this, consumer);
             }
         } else {
             if (log.isDebugEnabled()) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
index 1383bc3..ffdd8f3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
@@ -41,6 +41,7 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -390,56 +391,42 @@ public class MessageDispatchThrottlingTest extends 
ProducerConsumerBase {
      */
     @Test(dataProvider = "subscriptions", timeOut = 5000)
     public void 
testBytesRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionType 
subscription) throws Exception {
+        conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
         log.info("-- Starting {} test --", methodName);
 
         final String namespace = "my-property/throttling_ns";
         final String topicName = "persistent://" + namespace + 
"/throttlingAll";
+        final String subscriptionName = "my-subscriber-name";
 
-        final int byteRate = 100;
+        final int byteRate = 250;
         DispatchRate dispatchRate = new DispatchRate(-1, byteRate, 1);
         admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
         admin.namespaces().setDispatchRate(namespace, dispatchRate);
+        admin.topics().createSubscription(topicName, subscriptionName, 
MessageId.earliest);
         // create producer and topic
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).enableBatching(false).create();
         PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
-        boolean isMessageRateUpdate = false;
-        int retry = 5;
-        for (int i = 0; i < retry; i++) {
-            if (topic.getDispatchRateLimiter().get().getDispatchRateOnByte() > 
0) {
-                isMessageRateUpdate = true;
-                break;
-            } else {
-                if (i != retry - 1) {
-                    Thread.sleep(100);
-                }
-            }
-        }
-        Assert.assertTrue(isMessageRateUpdate);
+        Awaitility.await().until(() -> 
topic.getDispatchRateLimiter().get().getDispatchRateOnByte() > 0);
         Assert.assertEquals(admin.namespaces().getDispatchRate(namespace), 
dispatchRate);
 
         final int numProducedMessages = 20;
-        final CountDownLatch latch = new CountDownLatch(numProducedMessages);
 
         final AtomicInteger totalReceived = new AtomicInteger(0);
 
-        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
+        for (int i = 0; i < numProducedMessages; i++) {
+            producer.send(new byte[99]);
+        }
+
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
                 .subscriptionType(subscription).messageListener((c1, msg) -> {
                     Assert.assertNotNull(msg, "Message cannot be null");
                     String receivedMessage = new String(msg.getData());
                     log.debug("Received message [{}] in the listener", 
receivedMessage);
                     totalReceived.incrementAndGet();
-                    latch.countDown();
                 }).subscribe();
-        // deactive cursors
-        deactiveCursors((ManagedLedgerImpl) topic.getManagedLedger());
 
-        // Asynchronously produce messages
-        for (int i = 0; i < numProducedMessages; i++) {
-            producer.send(new byte[byteRate / 10]);
-        }
-
-        latch.await();
-        Assert.assertEquals(totalReceived.get(), numProducedMessages);
+        Awaitility.await().atLeast(3, TimeUnit.SECONDS)
+                .atMost(5, TimeUnit.SECONDS).until(() -> totalReceived.get() > 
6 && totalReceived.get() < 10);
 
         consumer.close();
         producer.close();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
index 7ed88f5..f678959 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
@@ -210,8 +210,8 @@ public class SubscriptionMessageDispatchThrottlingTest 
extends MessageDispatchTh
      * verify rate-limiting should throttle message-dispatching based on 
byte-rate
      *
      * <pre>
-     *  1. dispatch-byte-rate = 100 bytes/sec
-     *  2. send 30 msgs : each with 10 byte
+     *  1. dispatch-byte-rate = 1500 bytes/sec
+     *  2. send 30 msgs : each with 150 byte
      *  3. it should take up to 2 second to receive all messages
      * </pre>
      *
@@ -226,7 +226,7 @@ public class SubscriptionMessageDispatchThrottlingTest 
extends MessageDispatchTh
         final String topicName = "persistent://" + namespace + 
"/throttlingAll-" + System.nanoTime();
         final String subName = "my-subscriber-name-" + subscription;
 
-        final int byteRate = 100;
+        final int byteRate = 1500;
         DispatchRate dispatchRate = new DispatchRate(-1, byteRate, 1);
         admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
         admin.namespaces().setSubscriptionDispatchRate(namespace, 
dispatchRate);
@@ -241,7 +241,7 @@ public class SubscriptionMessageDispatchThrottlingTest 
extends MessageDispatchTh
             .subscriptionType(subscription).messageListener((c1, msg) -> {
                 Assert.assertNotNull(msg, "Message cannot be null");
                 String receivedMessage = new String(msg.getData());
-                log.debug("Received message [{}] in the listener", 
receivedMessage);
+                log.info("Received message [{}] in the listener", 
receivedMessage);
                 totalReceived.incrementAndGet();
                 latch.countDown();
             }).subscribe();

Reply via email to