This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0b27235a051f240125b986750c753d3210be1a3c
Author: congbo <[email protected]>
AuthorDate: Tue Jul 6 10:28:43 2021 +0800

    [Broker] Fix broker dispatch byte rate limiter. (#11135)
    
    ## Motivation
    fix https://github.com/apache/pulsar/issues/11044
    now dispatcher byte rate limit don't limit every cursor read. When cursor 
read always use `ServiceConfiguration.dispatcherMaxReadSizeBytes` to read. It 
will cause that  dispatcher read entries by 
`ServiceConfiguration.dispatcherMaxReadSizeBytes` to read every time.
    
    (cherry picked from commit ce6be124c9c86a6e10604ff44dc817f5d6f13c0e)
---
 .../broker/service/AbstractBaseDispatcher.java     | 19 +++++++-
 .../AbstractDispatcherMultipleConsumers.java       |  5 ++-
 .../AbstractDispatcherSingleActiveConsumer.java    |  6 ++-
 .../NonPersistentDispatcherMultipleConsumers.java  |  5 +--
 ...onPersistentDispatcherSingleActiveConsumer.java |  6 +--
 .../service/persistent/DispatchRateLimiter.java    |  9 ++++
 .../PersistentDispatcherMultipleConsumers.java     | 50 ++++++++++++----------
 .../PersistentDispatcherSingleActiveConsumer.java  | 49 ++++++++++++---------
 ...istentStreamingDispatcherMultipleConsumers.java | 10 +++--
 ...entStreamingDispatcherSingleActiveConsumer.java | 11 +++--
 .../streamingdispatch/StreamingEntryReader.java    |  4 +-
 .../client/api/MessageDispatchThrottlingTest.java  | 42 +++++++-----------
 12 files changed, 122 insertions(+), 94 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index bb10df4..3646ae6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -29,6 +29,7 @@ import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -47,8 +48,11 @@ public abstract class AbstractBaseDispatcher implements 
Dispatcher {
 
     protected final Subscription subscription;
 
-    protected AbstractBaseDispatcher(Subscription subscription) {
+    protected final ServiceConfiguration serviceConfig;
+
+    protected AbstractBaseDispatcher(Subscription subscription, 
ServiceConfiguration serviceConfig) {
         this.subscription = subscription;
+        this.serviceConfig = serviceConfig;
     }
 
     /**
@@ -234,6 +238,19 @@ public abstract class AbstractBaseDispatcher implements 
Dispatcher {
         // noop
     }
 
+    protected static Pair<Integer, Long> computeReadLimits(int messagesToRead, 
int availablePermitsOnMsg,
+                                                           long bytesToRead, 
long availablePermitsOnByte) {
+        if (availablePermitsOnMsg > 0) {
+            messagesToRead = Math.min(messagesToRead, availablePermitsOnMsg);
+        }
+
+        if (availablePermitsOnByte > 0) {
+            bytesToRead = Math.min(bytesToRead, availablePermitsOnByte);
+        }
+
+        return Pair.of(messagesToRead, bytesToRead);
+    }
+
     protected byte[] peekStickyKey(ByteBuf metadataAndPayload) {
         return Commands.peekStickyKey(metadataAndPayload, 
subscription.getTopicName(), subscription.getName());
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java
index 2f6b9a6..ad98059 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java
@@ -23,6 +23,7 @@ import com.carrotsearch.hppc.ObjectSet;
 import java.util.Random;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import 
org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.slf4j.Logger;
@@ -46,8 +47,8 @@ public abstract class AbstractDispatcherMultipleConsumers 
extends AbstractBaseDi
 
     private Random random = new Random(42);
 
-    protected AbstractDispatcherMultipleConsumers(Subscription subscription) {
-        super(subscription);
+    protected AbstractDispatcherMultipleConsumers(Subscription subscription, 
ServiceConfiguration serviceConfig) {
+        super(subscription, serviceConfig);
     }
 
     public boolean isConsumerConnected() {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index b142c51..e73daaa 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -26,6 +26,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
@@ -55,8 +56,9 @@ public abstract class AbstractDispatcherSingleActiveConsumer 
extends AbstractBas
     private volatile int isClosed = FALSE;
 
     public AbstractDispatcherSingleActiveConsumer(SubType subscriptionType, 
int partitionIndex,
-            String topicName, Subscription subscription) {
-        super(subscription);
+                                                  String topicName, 
Subscription subscription,
+                                                  ServiceConfiguration 
serviceConfig) {
+        super(subscription, serviceConfig);
         this.topicName = topicName;
         this.consumers = new CopyOnWriteArrayList<>();
         this.partitionIndex = partitionIndex;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index 1a19186..5688683 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -23,7 +23,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import org.apache.bookkeeper.mledger.Entry;
-import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
@@ -56,16 +55,14 @@ public class NonPersistentDispatcherMultipleConsumers 
extends AbstractDispatcher
     @SuppressWarnings("unused")
     private volatile int totalAvailablePermits = 0;
 
-    private final ServiceConfiguration serviceConfig;
     private final RedeliveryTracker redeliveryTracker;
 
     public NonPersistentDispatcherMultipleConsumers(NonPersistentTopic topic, 
Subscription subscription) {
-        super(subscription);
+        super(subscription, 
topic.getBrokerService().pulsar().getConfiguration());
         this.topic = topic;
         this.subscription = subscription;
         this.name = topic.getName() + " / " + subscription.getName();
         this.msgDrop = new Rate();
-        this.serviceConfig = 
topic.getBrokerService().pulsar().getConfiguration();
         this.redeliveryTracker = 
RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index a720767..6094ab7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.broker.service.nonpersistent;
 import java.util.List;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Entry;
-import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.EntryBatchSizes;
@@ -40,16 +39,15 @@ public final class 
NonPersistentDispatcherSingleActiveConsumer extends AbstractD
     private final NonPersistentTopic topic;
     private final Rate msgDrop;
     private final Subscription subscription;
-    private final ServiceConfiguration serviceConfig;
     private final RedeliveryTracker redeliveryTracker;
 
     public NonPersistentDispatcherSingleActiveConsumer(SubType 
subscriptionType, int partitionIndex,
                                                        NonPersistentTopic 
topic, Subscription subscription) {
-        super(subscriptionType, partitionIndex, topic.getName(), subscription);
+        super(subscriptionType, partitionIndex, topic.getName(), subscription,
+                topic.getBrokerService().pulsar().getConfiguration());
         this.topic = topic;
         this.subscription = subscription;
         this.msgDrop = new Rate();
-        this.serviceConfig = 
topic.getBrokerService().pulsar().getConfiguration();
         this.redeliveryTracker = 
RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index 8cf4427..994d274 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -76,6 +76,15 @@ public class DispatchRateLimiter {
     }
 
     /**
+     * returns available byte-permit if msg-dispatch-throttling is enabled 
else it returns -1.
+     *
+     * @return
+     */
+    public long getAvailableDispatchRateLimitOnByte() {
+        return dispatchRateLimiterOnByte == null ? -1 : 
dispatchRateLimiterOnByte.getAvailablePermits();
+    }
+
+    /**
      * It acquires msg and bytes permits from rate-limiter and returns if 
acquired permits succeed.
      *
      * @param msgPermits
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 3426355..58663f4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -38,7 +38,7 @@ import 
org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadE
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
 import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -105,7 +105,6 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
             BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER =
             
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
                     "blockedDispatcherOnUnackedMsgs");
-    protected final ServiceConfiguration serviceConfig;
     protected Optional<DispatchRateLimiter> dispatchRateLimiter = 
Optional.empty();
 
     protected enum ReadType {
@@ -114,8 +113,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
 
     public PersistentDispatcherMultipleConsumers(PersistentTopic topic, 
ManagedCursor cursor,
                                                  Subscription subscription) {
-        super(subscription);
-        this.serviceConfig = 
topic.getBrokerService().pulsar().getConfiguration();
+        super(subscription, 
topic.getBrokerService().pulsar().getConfiguration());
         this.cursor = cursor;
         this.lastIndividualDeletedRangeFromCursorRecovery = 
cursor.getLastIndividualDeletedRange();
         this.name = topic.getName() + " / " + Codec.decode(cursor.getName());
@@ -223,9 +221,11 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
         int firstAvailableConsumerPermits = getFirstAvailableConsumerPermits();
         int currentTotalAvailablePermits = Math.max(totalAvailablePermits, 
firstAvailableConsumerPermits);
         if (currentTotalAvailablePermits > 0 && firstAvailableConsumerPermits 
> 0) {
-            int messagesToRead = 
calculateNumOfMessageToRead(currentTotalAvailablePermits);
+            Pair<Integer, Long> calculateResult = 
calculateToRead(currentTotalAvailablePermits);
+            int messagesToRead = calculateResult.getLeft();
+            long bytesToRead = calculateResult.getRight();
 
-            if (-1 == messagesToRead) {
+            if (messagesToRead == -1 || bytesToRead == -1) {
                 // Skip read as topic/dispatcher has exceed the dispatch rate 
or previous pending read hasn't complete.
                 return;
             }
@@ -262,8 +262,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
                             consumerList.size());
                 }
                 havePendingRead = true;
-                cursor.asyncReadEntriesOrWait(messagesToRead, 
serviceConfig.getDispatcherMaxReadSizeBytes(),
-                        this,
+                cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, 
this,
                         ReadType.Normal, topic.getMaxReadPosition());
             } else {
                 log.debug("[{}] Cannot schedule next read until previous one 
is done", name);
@@ -275,8 +274,10 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
         }
     }
 
-    protected int calculateNumOfMessageToRead(int 
currentTotalAvailablePermits) {
+    // left pair is messagesToRead, right pair is bytesToRead
+    protected Pair<Integer, Long> calculateToRead(int 
currentTotalAvailablePermits) {
         int messagesToRead = Math.min(currentTotalAvailablePermits, 
readBatchSize);
+        long bytesToRead = serviceConfig.getDispatcherMaxReadSizeBytes();
 
         Consumer c = getRandomConsumer();
         // if turn on precise dispatcher flow control, adjust the record to 
read
@@ -309,13 +310,15 @@ public class PersistentDispatcherMultipleConsumers 
extends AbstractDispatcherMul
                     }
                     topic.getBrokerService().executor().schedule(() -> 
readMoreEntries(), MESSAGE_RATE_BACKOFF_MS,
                             TimeUnit.MILLISECONDS);
-                    return -1;
+                    return Pair.of(-1, -1L);
                 } else {
-                    // if dispatch-rate is in msg then read only msg according 
to available permit
-                    long availablePermitsOnMsg = 
topicRateLimiter.getAvailableDispatchRateLimitOnMsg();
-                    if (availablePermitsOnMsg > 0) {
-                        messagesToRead = Math.min(messagesToRead, (int) 
availablePermitsOnMsg);
-                    }
+                    Pair<Integer, Long> calculateResult = 
computeReadLimits(messagesToRead,
+                            (int) 
topicRateLimiter.getAvailableDispatchRateLimitOnMsg(),
+                            bytesToRead, 
topicRateLimiter.getAvailableDispatchRateLimitOnByte());
+
+                    messagesToRead = calculateResult.getLeft();
+                    bytesToRead = calculateResult.getRight();
+
                 }
             }
 
@@ -330,13 +333,14 @@ public class PersistentDispatcherMultipleConsumers 
extends AbstractDispatcherMul
                     }
                     topic.getBrokerService().executor().schedule(() -> 
readMoreEntries(), MESSAGE_RATE_BACKOFF_MS,
                             TimeUnit.MILLISECONDS);
-                    return -1;
+                    return Pair.of(-1, -1L);
                 } else {
-                    // if dispatch-rate is in msg then read only msg according 
to available permit
-                    long availablePermitsOnMsg = 
dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg();
-                    if (availablePermitsOnMsg > 0) {
-                        messagesToRead = Math.min(messagesToRead, (int) 
availablePermitsOnMsg);
-                    }
+                    Pair<Integer, Long> calculateResult = 
computeReadLimits(messagesToRead,
+                            (int) 
dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg(),
+                            bytesToRead, 
dispatchRateLimiter.get().getAvailableDispatchRateLimitOnByte());
+
+                    messagesToRead = calculateResult.getLeft();
+                    bytesToRead = calculateResult.getRight();
                 }
             }
 
@@ -346,11 +350,11 @@ public class PersistentDispatcherMultipleConsumers 
extends AbstractDispatcherMul
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Skipping replay while awaiting previous read 
to complete", name);
             }
-            return -1;
+            return Pair.of(-1, -1L);
         }
 
         // If messagesToRead is 0 or less, correct it to 1 to prevent 
IllegalArgumentException
-        return Math.max(messagesToRead, 1);
+        return Pair.of(Math.max(messagesToRead, 1), Math.max(bytesToRead, 1));
     }
 
     protected Set<? extends Position> asyncReplayEntries(Set<? extends 
Position> positions) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 93c2d25..2bd94ff 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -34,7 +34,7 @@ import 
org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadE
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.mledger.util.SafeRun;
-import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Dispatcher;
@@ -67,19 +67,18 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
     protected volatile int readBatchSize;
     protected final Backoff readFailureBackoff = new Backoff(15, 
TimeUnit.SECONDS,
             1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
-    protected final ServiceConfiguration serviceConfig;
     private volatile ScheduledFuture<?> readOnActiveConsumerTask = null;
 
     private final RedeliveryTracker redeliveryTracker;
 
     public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, 
SubType subscriptionType, int partitionIndex,
                                                     PersistentTopic topic, 
Subscription subscription) {
-        super(subscriptionType, partitionIndex, topic.getName(), subscription);
+        super(subscriptionType, partitionIndex, topic.getName(), subscription,
+                topic.getBrokerService().pulsar().getConfiguration());
         this.topic = topic;
         this.name = topic.getName() + " / " + (cursor.getName() != null ? 
Codec.decode(cursor.getName())
                 : ""/* NonDurableCursor doesn't have name */);
         this.cursor = cursor;
-        this.serviceConfig = 
topic.getBrokerService().pulsar().getConfiguration();
         this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
         this.redeliveryTracker = 
RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
         this.initializeDispatchRateLimiterIfNeeded(Optional.empty());
@@ -324,9 +323,11 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
         }
 
         if (consumer.getAvailablePermits() > 0) {
-            int messagesToRead = calculateNumOfMessageToRead(consumer);
+            Pair<Integer, Long> calculateResult = calculateToRead(consumer);
+            int messagesToRead = calculateResult.getLeft();
+            long bytesToRead = calculateResult.getRight();
 
-            if (-1 == messagesToRead) {
+            if (-1 == messagesToRead || bytesToRead == -1) {
                 // Skip read as topic/dispatcher has exceed the dispatch rate.
                 return;
             }
@@ -340,7 +341,7 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
                 topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, 
messagesToRead, this, consumer);
             } else {
                 cursor.asyncReadEntriesOrWait(messagesToRead,
-                        serviceConfig.getDispatcherMaxReadSizeBytes(), this, 
consumer, topic.getMaxReadPosition());
+                        bytesToRead, this, consumer, 
topic.getMaxReadPosition());
             }
         } else {
             if (log.isDebugEnabled()) {
@@ -349,7 +350,7 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
         }
     }
 
-    protected int calculateNumOfMessageToRead(Consumer consumer) {
+    protected Pair<Integer, Long> calculateToRead(Consumer consumer) {
         int availablePermits = consumer.getAvailablePermits();
         if (!consumer.isWritable()) {
             // If the connection is not currently writable, we issue the read 
request anyway, but for a single
@@ -360,6 +361,7 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
         }
 
         int messagesToRead = Math.min(availablePermits, readBatchSize);
+        long bytesToRead = serviceConfig.getDispatcherMaxReadSizeBytes();
         // if turn of precise dispatcher flow control, adjust the records to 
read
         if (consumer.isPreciseDispatcherFlowControl()) {
             int avgMessagesPerEntry = consumer.getAvgMessagesPerEntry();
@@ -391,13 +393,16 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
                             }
                         }
                     }, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
-                    return -1;
+                    return Pair.of(-1, -1L);
                 } else {
-                    // if dispatch-rate is in msg then read only msg according 
to available permit
-                    long availablePermitsOnMsg = 
topicRateLimiter.getAvailableDispatchRateLimitOnMsg();
-                    if (availablePermitsOnMsg > 0) {
-                        messagesToRead = Math.min(messagesToRead, (int) 
availablePermitsOnMsg);
-                    }
+
+                    Pair<Integer, Long> calculateResult = 
computeReadLimits(messagesToRead,
+                            (int) 
topicRateLimiter.getAvailableDispatchRateLimitOnMsg(),
+                            bytesToRead, 
topicRateLimiter.getAvailableDispatchRateLimitOnByte());
+
+                    messagesToRead = calculateResult.getLeft();
+                    bytesToRead = calculateResult.getRight();
+
                 }
             }
 
@@ -421,19 +426,21 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
                             }
                         }
                     }, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
-                    return -1;
+                    return Pair.of(-1, -1L);
                 } else {
-                    // if dispatch-rate is in msg then read only msg according 
to available permit
-                    long subPermitsOnMsg = 
dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg();
-                    if (subPermitsOnMsg > 0) {
-                        messagesToRead = Math.min(messagesToRead, (int) 
subPermitsOnMsg);
-                    }
+
+                    Pair<Integer, Long> calculateResult = 
computeReadLimits(messagesToRead,
+                            (int) 
dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg(),
+                            bytesToRead, 
dispatchRateLimiter.get().getAvailableDispatchRateLimitOnByte());
+
+                    messagesToRead = calculateResult.getLeft();
+                    bytesToRead = calculateResult.getRight();
                 }
             }
         }
 
         // If messagesToRead is 0 or less, correct it to 1 to prevent 
IllegalArgumentException
-        return Math.max(messagesToRead, 1);
+        return Pair.of(Math.max(messagesToRead, 1), Math.max(bytesToRead, 1));
     }
 
     @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
index 666bb98..f7d47e6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
@@ -29,6 +29,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.mledger.util.SafeRun;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Subscription;
 import 
org.apache.pulsar.broker.service.streamingdispatch.PendingReadEntryRequest;
@@ -139,9 +140,10 @@ public class 
PersistentStreamingDispatcherMultipleConsumers extends PersistentDi
         // totalAvailablePermits may be updated by other threads
         int currentTotalAvailablePermits = totalAvailablePermits;
         if (currentTotalAvailablePermits > 0 && 
isAtleastOneConsumerAvailable()) {
-            int messagesToRead = 
calculateNumOfMessageToRead(currentTotalAvailablePermits);
-
-            if (-1 == messagesToRead) {
+            Pair<Integer, Long> calculateResult = 
calculateToRead(currentTotalAvailablePermits);
+            int messagesToRead = calculateResult.getLeft();
+            long bytesToRead = calculateResult.getRight();
+            if (-1 == messagesToRead || bytesToRead == -1) {
                 // Skip read as topic/dispatcher has exceed the dispatch rate 
or previous pending read hasn't complete.
                 return;
             }
@@ -178,7 +180,7 @@ public class PersistentStreamingDispatcherMultipleConsumers 
extends PersistentDi
                             consumerList.size());
                 }
                 havePendingRead = true;
-                streamingEntryReader.asyncReadEntries(messagesToRead, 
serviceConfig.getDispatcherMaxReadSizeBytes(),
+                streamingEntryReader.asyncReadEntries(messagesToRead, 
bytesToRead,
                         ReadType.Normal);
             } else {
                 log.debug("[{}] Cannot schedule next read until previous one 
is done", name);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
index b4e4ed3..82e8d6d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
@@ -28,6 +28,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.mledger.util.SafeRun;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
 import org.apache.pulsar.broker.service.EntryBatchSizes;
@@ -179,9 +180,12 @@ public class 
PersistentStreamingDispatcherSingleActiveConsumer extends Persisten
         }
 
         if (!havePendingRead && consumer.getAvailablePermits() > 0) {
-            int messagesToRead = calculateNumOfMessageToRead(consumer);
+            Pair<Integer, Long> calculateResult = calculateToRead(consumer);
+            int messagesToRead = calculateResult.getLeft();
+            long bytesToRead = calculateResult.getRight();
 
-            if (-1 == messagesToRead) {
+
+            if (-1 == messagesToRead || bytesToRead == -1) {
                 // Skip read as topic/dispatcher has exceed the dispatch rate.
                 return;
             }
@@ -195,8 +199,7 @@ public class 
PersistentStreamingDispatcherSingleActiveConsumer extends Persisten
             if (consumer.readCompacted()) {
                 topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, 
messagesToRead, this, consumer);
             } else {
-                streamingEntryReader.asyncReadEntries(messagesToRead, 
serviceConfig.getDispatcherMaxReadSizeBytes(),
-                        consumer);
+                streamingEntryReader.asyncReadEntries(messagesToRead, 
bytesToRead, consumer);
             }
         } else {
             if (log.isDebugEnabled()) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
index 51356c9..12f5600 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
@@ -68,7 +68,7 @@ public class StreamingEntryReader implements 
AsyncCallbacks.ReadEntryCallback, W
     private static final AtomicReferenceFieldUpdater<StreamingEntryReader, 
State> STATE_UPDATER =
             AtomicReferenceFieldUpdater.newUpdater(StreamingEntryReader.class, 
State.class, "state");
 
-    private volatile int maxReadSizeByte;
+    private volatile long maxReadSizeByte;
 
     private final Backoff readFailureBackoff = new Backoff(10, 
TimeUnit.MILLISECONDS,
             1, TimeUnit.SECONDS, 0, TimeUnit.MILLISECONDS);
@@ -81,7 +81,7 @@ public class StreamingEntryReader implements 
AsyncCallbacks.ReadEntryCallback, W
      * @param maxReadSizeByte maximum byte will be read from ledger.
      * @param ctx Context send along with read request.
      */
-    public synchronized void asyncReadEntries(int numEntriesToRead, int 
maxReadSizeByte, Object ctx) {
+    public synchronized void asyncReadEntries(int numEntriesToRead, long 
maxReadSizeByte, Object ctx) {
         if (STATE_UPDATER.compareAndSet(this, State.Canceling, 
State.Canceled)) {
             internalCancelReadRequests();
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
index fc3cfaf..f605028 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
@@ -43,6 +43,7 @@ import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
+import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -413,12 +414,15 @@ public class MessageDispatchThrottlingTest extends 
ProducerConsumerBase {
      */
     @Test(dataProvider = "subscriptions", timeOut = 5000)
     public void 
testBytesRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionType 
subscription) throws Exception {
+        conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
         log.info("-- Starting {} test --", methodName);
 
         final String namespace = "my-property/throttling_ns";
         final String topicName = "persistent://" + namespace + 
"/throttlingAll";
+        final String subscriptionName = "my-subscriber-name";
 
-        final int byteRate = 100;
+        //
+        final int byteRate = 250;
         DispatchRate dispatchRate = DispatchRate.builder()
                 .dispatchThrottlingRateInMsg(-1)
                 .dispatchThrottlingRateInByte(byteRate)
@@ -426,47 +430,31 @@ public class MessageDispatchThrottlingTest extends 
ProducerConsumerBase {
                 .build();
         admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
         admin.namespaces().setDispatchRate(namespace, dispatchRate);
+        admin.topics().createSubscription(topicName, subscriptionName, 
MessageId.earliest);
         // create producer and topic
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).enableBatching(false).create();
         PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
-        boolean isMessageRateUpdate = false;
-        int retry = 5;
-        for (int i = 0; i < retry; i++) {
-            if (topic.getDispatchRateLimiter().get().getDispatchRateOnByte() > 
0) {
-                isMessageRateUpdate = true;
-                break;
-            } else {
-                if (i != retry - 1) {
-                    Thread.sleep(100);
-                }
-            }
-        }
-        Assert.assertTrue(isMessageRateUpdate);
+        Awaitility.await().until(() -> 
topic.getDispatchRateLimiter().get().getDispatchRateOnByte() > 0);
         Assert.assertEquals(admin.namespaces().getDispatchRate(namespace), 
dispatchRate);
 
         final int numProducedMessages = 20;
-        final CountDownLatch latch = new CountDownLatch(numProducedMessages);
 
         final AtomicInteger totalReceived = new AtomicInteger(0);
 
-        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
+        for (int i = 0; i < numProducedMessages; i++) {
+            producer.send(new byte[99]);
+        }
+
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
                 .subscriptionType(subscription).messageListener((c1, msg) -> {
                     Assert.assertNotNull(msg, "Message cannot be null");
                     String receivedMessage = new String(msg.getData());
                     log.debug("Received message [{}] in the listener", 
receivedMessage);
                     totalReceived.incrementAndGet();
-                    latch.countDown();
                 }).subscribe();
-        // deactive cursors
-        deactiveCursors((ManagedLedgerImpl) topic.getManagedLedger());
 
-        // Asynchronously produce messages
-        for (int i = 0; i < numProducedMessages; i++) {
-            producer.send(new byte[byteRate / 10]);
-        }
-
-        latch.await();
-        Assert.assertEquals(totalReceived.get(), numProducedMessages);
+        Awaitility.await().atLeast(3, TimeUnit.SECONDS)
+                .atMost(5, TimeUnit.SECONDS).until(() -> totalReceived.get() > 
6 && totalReceived.get() < 10);
 
         consumer.close();
         producer.close();

Reply via email to