Copilot commented on code in PR #25061:
URL: https://github.com/apache/pulsar/pull/25061#discussion_r2609000769


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java:
##########
@@ -332,17 +349,23 @@ private void individualAcknowledgeMessageIfNeeded(List<Position> positions, Map<
         }
     }
 
-    protected void acquirePermitsForDeliveredMessages(Topic topic, ManagedCursor cursor, long totalEntries,
-                                                      long totalMessagesSent, long totalBytesSent) {
+    private boolean tryAcquirePermitsForDeliveredMessages(
+            Topic topic, ManagedCursor cursor, long totalMessagesSent, long totalBytesSent) {

Review Comment:
   Like `totalMessagesSent`, the parameter name `totalBytesSent` is misleading: 
this method is now called once per entry rather than once after all entries 
have been processed. Consider renaming it to `byteCount` or `numBytes` to 
reflect that it is the byte count for a single entry, not a cumulative total.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiterClassicImpl.java:
##########
@@ -74,14 +74,26 @@ public long getAvailableDispatchRateLimitOnByte() {
      */
     @Override
     public void consumeDispatchQuota(long numberOfMessages, long byteSize) {
+        tryConsumeDispatchQuota(numberOfMessages, byteSize);
+    }
+
+    /**
+     * It acquires msg and bytes permits from rate-limiter and returns if acquired permits succeed.
+     * @param numberOfMessages
+     * @param byteSize
+     */
+    @Override
+    public boolean tryConsumeDispatchQuota(long numberOfMessages, long byteSize) {
+        boolean res = true;
         RateLimiter localDispatchRateLimiterOnMessage = dispatchRateLimiterOnMessage;
         if (numberOfMessages > 0 && localDispatchRateLimiterOnMessage != null) {
-            localDispatchRateLimiterOnMessage.tryAcquire(numberOfMessages);
+            res &= localDispatchRateLimiterOnMessage.tryAcquire(numberOfMessages);
         }
         RateLimiter localDispatchRateLimiterOnByte = dispatchRateLimiterOnByte;
         if (byteSize > 0 && localDispatchRateLimiterOnByte != null) {
-            localDispatchRateLimiterOnByte.tryAcquire(byteSize);
+            res &= localDispatchRateLimiterOnByte.tryAcquire(byteSize);
         }
+        return res;

Review Comment:
   The method consumes tokens from the two limiters sequentially, so if the 
message limiter succeeds but the byte limiter fails, the message tokens have 
already been consumed. The result is conservative: some quota is spent without 
any messages being dispatched. Consider documenting this behavior in the 
method's Javadoc, or alternatively, check that every limiter has enough tokens 
before consuming from any of them to avoid unnecessary quota consumption.
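
   The "check before consuming" alternative could look roughly like the sketch 
below. It is only an illustration: `SimpleBucket`, `TwoLimiterQuota`, and their 
methods are hypothetical stand-ins, not Pulsar's `RateLimiter` or 
`AsyncTokenBucket`, and the sketch assumes all callers go through the same 
synchronized object so that the check and the consume happen atomically.

```java
/** Hypothetical sketch; none of these types are Pulsar classes. */
class DispatchQuotaSketch {

    /** Minimal stand-in for a rate limiter: a fixed pool of permits, no refill. */
    static final class SimpleBucket {
        private long available;

        SimpleBucket(long permits) {
            this.available = permits;
        }

        boolean canAcquire(long n) {
            return available >= n;
        }

        /** Takes n permits only if at least n are available. */
        boolean tryAcquire(long n) {
            if (available < n) {
                return false;
            }
            available -= n;
            return true;
        }

        long available() {
            return available;
        }
    }

    /** Pairs a message bucket with a byte bucket, like the dispatch limiter in the diff. */
    static final class TwoLimiterQuota {
        final SimpleBucket messages;
        final SimpleBucket bytes;

        TwoLimiterQuota(long messagePermits, long bytePermits) {
            this.messages = new SimpleBucket(messagePermits);
            this.bytes = new SimpleBucket(bytePermits);
        }

        /** Same shape as the reviewed method: message permits may be spent even on failure. */
        synchronized boolean tryConsumeSequential(long numMessages, long numBytes) {
            boolean res = true;
            if (numMessages > 0) {
                res &= messages.tryAcquire(numMessages);
            }
            if (numBytes > 0) {
                res &= bytes.tryAcquire(numBytes);
            }
            return res;
        }

        /** Checks both buckets first and consumes only if both can satisfy the request. */
        synchronized boolean tryConsumeAllOrNothing(long numMessages, long numBytes) {
            if (numMessages > 0 && !messages.canAcquire(numMessages)) {
                return false;
            }
            if (numBytes > 0 && !bytes.canAcquire(numBytes)) {
                return false;
            }
            if (numMessages > 0) {
                messages.tryAcquire(numMessages);
            }
            if (numBytes > 0) {
                bytes.tryAcquire(numBytes);
            }
            return true;
        }
    }
}
```

   With 10 message permits and 100 byte permits, a request for 5 messages and 
500 bytes leaves 5 message permits after `tryConsumeSequential` but all 10 
after `tryConsumeAllOrNothing`. Real limiters are shared and often lock-free, 
so a separate check followed by a consume would need additional coordination 
to avoid races; that cost is part of the trade-off against simply documenting 
the behavior.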



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java:
##########
@@ -332,17 +349,23 @@ private void individualAcknowledgeMessageIfNeeded(List<Position> positions, Map<
         }
     }
 
-    protected void acquirePermitsForDeliveredMessages(Topic topic, ManagedCursor cursor, long totalEntries,
-                                                      long totalMessagesSent, long totalBytesSent) {
+    private boolean tryAcquirePermitsForDeliveredMessages(
+            Topic topic, ManagedCursor cursor, long totalMessagesSent, long totalBytesSent) {

Review Comment:
   The parameter name `totalMessagesSent` is misleading since this method is 
now called per-entry rather than once after processing all entries. Consider 
renaming to `messageCount` or `numMessages` to better reflect that it 
represents the count for a single entry, not a cumulative total.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java:
##########
@@ -84,6 +84,14 @@ protected DispatchRateLimiter(BrokerService brokerService) {
      */
     public abstract void consumeDispatchQuota(long numberOfMessages, long byteSize);
 
+    /**
+     * It tries to acquire msg and bytes permits from rate-limiter and returns if acquired permits succeed.
+     *
+     * @param numberOfMessages
+     * @param byteSize
+     */
+    public abstract boolean tryConsumeDispatchQuota(long numberOfMessages, long byteSize);

Review Comment:
   The Javadoc is incomplete and missing parameter descriptions. The @param 
tags should have descriptions for what `numberOfMessages` and `byteSize` 
represent.
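
   As a rough illustration of what a completed Javadoc might look like, here 
is a sketch on a hypothetical stand-in class (`DispatchQuotaDocSketch` is not 
part of Pulsar). The parameter wording is an assumption inferred from the 
`> 0` guards in the implementations in this PR, and the `@return` tag is an 
addition beyond what this comment asks for.

```java
/** Hypothetical stand-in for the abstract class in the diff; only the Javadoc shape matters. */
abstract class DispatchQuotaDocSketch {

    /**
     * Tries to acquire message and byte permits from the rate limiter.
     *
     * @param numberOfMessages number of messages to acquire permits for;
     *                         assumed to be skipped when not positive
     * @param byteSize         total size in bytes to acquire permits for;
     *                         assumed to be skipped when not positive
     * @return {@code true} if every requested permit was acquired,
     *         {@code false} otherwise
     */
    public abstract boolean tryConsumeDispatchQuota(long numberOfMessages, long byteSize);
}
```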



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiterAsyncTokenBucketImpl.java:
##########
@@ -73,14 +73,21 @@ public long getAvailableDispatchRateLimitOnByte() {
      */
     @Override
     public void consumeDispatchQuota(long numberOfMessages, long byteSize) {
+        tryConsumeDispatchQuota(numberOfMessages, byteSize);
+    }
+
+    @Override
+    public boolean tryConsumeDispatchQuota(long numberOfMessages, long byteSize) {
+        boolean res = true;
         AsyncTokenBucket localDispatchRateLimiterOnMessage = dispatchRateLimiterOnMessage;
         if (numberOfMessages > 0 && localDispatchRateLimiterOnMessage != null) {
-            localDispatchRateLimiterOnMessage.consumeTokens(numberOfMessages);
+            res &= localDispatchRateLimiterOnMessage.consumeTokensAndCheckIfContainsTokens(numberOfMessages);
         }
         AsyncTokenBucket localDispatchRateLimiterOnByte = dispatchRateLimiterOnByte;
         if (byteSize > 0 && localDispatchRateLimiterOnByte != null) {
-            localDispatchRateLimiterOnByte.consumeTokens(byteSize);
+            res &= localDispatchRateLimiterOnByte.consumeTokensAndCheckIfContainsTokens(byteSize);
         }
+        return res;
     }

Review Comment:
   The method consumes tokens from the two limiters sequentially, so if the 
message limiter succeeds but the byte limiter fails, the message tokens have 
already been consumed. The result is conservative: some quota is spent without 
any messages being dispatched. Consider documenting this behavior in the 
method's Javadoc, or alternatively, check that every limiter has enough tokens 
before consuming from any of them to avoid unnecessary quota consumption.



