anvinjain commented on a change in pull request #11135:
URL: https://github.com/apache/pulsar/pull/11135#discussion_r660445457



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
##########
@@ -234,6 +238,19 @@ public void resetCloseFuture() {
         // noop
     }
 
+    protected static Pair<Integer, Integer> calculateToRead(int messagesToRead, int availablePermitsOnMsg,

Review comment:
       Maybe a better name, like `computeReadLimits`?
   Can the parameters be ordered as `messagesToRead, availableMsgPermits, 
bytesToRead, availableBytePermits`, so that each limit is grouped with its permits?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
##########
@@ -310,13 +312,15 @@ protected int calculateNumOfMessageToRead(int currentTotalAvailablePermits) {
                     }
                     topic.getBrokerService().executor().schedule(() -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS,
                             TimeUnit.MILLISECONDS);
-                    return -1;
+                    return new MutablePair<>(-1, -1);
                 } else {
-                    // if dispatch-rate is in msg then read only msg according to available permit
-                    long availablePermitsOnMsg = topicRateLimiter.getAvailableDispatchRateLimitOnMsg();
-                    if (availablePermitsOnMsg > 0) {
-                        messagesToRead = Math.min(messagesToRead, (int) availablePermitsOnMsg);
-                    }
+                    Pair<Integer, Integer> calculateResult = calculateToRead(messagesToRead,
+                            (int) topicRateLimiter.getAvailableDispatchRateLimitOnMsg(),
+                            (int) topicRateLimiter.getAvailableDispatchRateLimitOnByte(), bytesToRead);

Review comment:
       Should we be narrowing the byte permits from long to int here? It could 
cause bugs when a user sets a dispatch rate higher than 2GB (unlikely but possible).
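
   To illustrate the concern, here is a minimal standalone sketch of what a plain 
`(int)` cast does to a byte count above `Integer.MAX_VALUE`. The class and the 
`saturatedCast` helper are hypothetical and not part of this PR; keeping the value 
as `long` end to end would be another option.

```java
final class NarrowingCastDemo {
    // Hypothetical helper: clamp a long into the int range instead of letting it wrap.
    static int saturatedCast(long value) {
        if (value > Integer.MAX_VALUE) {
            return Integer.MAX_VALUE;
        }
        if (value < Integer.MIN_VALUE) {
            return Integer.MIN_VALUE;
        }
        return (int) value;
    }

    public static void main(String[] args) {
        long threeGiB = 3L * 1024 * 1024 * 1024; // 3221225472 bytes

        // A plain narrowing cast keeps only the low 32 bits, so the result is negative.
        System.out.println((int) threeGiB); // -1073741824

        // Clamping keeps the value positive, at the cost of capping it.
        System.out.println(saturatedCast(threeGiB)); // 2147483647
    }
}
```

   A negative result would fail a `> 0` permit check, so the byte limit could end up 
being ignored rather than applied.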

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
##########
@@ -234,6 +238,19 @@ public void resetCloseFuture() {
         // noop
     }
 
+    protected static Pair<Integer, Integer> calculateToRead(int messagesToRead, int availablePermitsOnMsg,
+                                                     int availablePermitsOnByte, int bytesToRead) {
+        if (availablePermitsOnMsg > 0) {
+            messagesToRead = Math.min(messagesToRead, availablePermitsOnMsg);
+        }
+
+        if (availablePermitsOnByte > 0) {
+            bytesToRead = availablePermitsOnByte;

Review comment:
       `dispatcherMaxReadSizeBytes` was used to limit the maximum number of bytes 
that can be read from bookies in one cursor read op. With this change, if a user 
configures a dispatch rate of, say, 100MB/s, bytesToRead can be 100MB, 
because we do not take Math.min here.
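
   A possible shape for the byte branch, mirroring what the message branch already 
does. This is only a sketch: `ReadLimitSketch` and `capBytesToRead` are hypothetical 
names, and `long` is used here on the assumption that byte counts stay un-narrowed.

```java
final class ReadLimitSketch {
    // Hypothetical variant of the byte branch: the permits can only lower the
    // per-read size, never raise it above the value passed in.
    static long capBytesToRead(long bytesToRead, long availablePermitsOnByte) {
        if (availablePermitsOnByte > 0) {
            return Math.min(bytesToRead, availablePermitsOnByte);
        }
        // No positive permit count available: leave the configured size untouched.
        return bytesToRead;
    }

    public static void main(String[] args) {
        long maxReadSize = 5L * 1024 * 1024;     // stand-in for the configured per-read cap
        long permits = 100L * 1024 * 1024;       // permits from a 100MB/s dispatch rate
        System.out.println(capBytesToRead(maxReadSize, permits)); // stays at 5242880
    }
}
```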

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
##########
@@ -263,8 +264,7 @@ public synchronized void readMoreEntries() {
                             consumerList.size());
                 }
                 havePendingRead = true;
-                cursor.asyncReadEntriesOrWait(messagesToRead, serviceConfig.getDispatcherMaxReadSizeBytes(),
-                        this,
+                cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this,

Review comment:
       bytesToRead can now be -1 (for example at line 315, where 
`new MutablePair<>(-1, -1)` is returned) when the dispatch limit is not set. This 
changes the default behaviour, because earlier the read was always capped by 
`serviceConfig.getDispatcherMaxReadSizeBytes()`.
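
   One way to keep the old default would be to treat a non-positive bytesToRead as 
"no limit computed" and fall back to the configured cap before calling the cursor. 
A minimal sketch, assuming a hypothetical helper `effectiveBytesToRead` whose second 
argument would be `serviceConfig.getDispatcherMaxReadSizeBytes()` at the call site:

```java
final class ReadSizeFallback {
    // Hypothetical helper: never hand the cursor a non-positive or over-cap byte count.
    static long effectiveBytesToRead(long bytesToRead, long dispatcherMaxReadSizeBytes) {
        if (bytesToRead <= 0) {
            // Covers the -1 sentinel: behave as before the change.
            return dispatcherMaxReadSizeBytes;
        }
        return Math.min(bytesToRead, dispatcherMaxReadSizeBytes);
    }

    public static void main(String[] args) {
        long cap = 5L * 1024 * 1024; // stand-in for the configured per-read cap
        System.out.println(effectiveBytesToRead(-1, cap));   // falls back to the cap
        System.out.println(effectiveBytesToRead(1024, cap)); // a smaller limit is kept
    }
}
```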



