Copilot commented on code in PR #25061:
URL: https://github.com/apache/pulsar/pull/25061#discussion_r2607298726


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java:
##########
@@ -84,6 +84,14 @@ protected DispatchRateLimiter(BrokerService brokerService) {
      */
     public abstract void consumeDispatchQuota(long numberOfMessages, long 
byteSize);
 
+    /**
+     * It tries to acquire msg and bytes permits from rate-limiter and returns 
if acquired permits succeed.
+     *
+     * @param numberOfMessages
+     * @param byteSize

Review Comment:
   The Javadoc is missing the `@return` tag to document the return value.
   
   Suggested addition:
   ```java
   /**
    * It tries to acquire msg and bytes permits from rate-limiter and returns
    * if acquired permits succeed.
    *
    * @param numberOfMessages the number of messages to acquire permits for
    * @param byteSize the number of bytes to acquire permits for
    * @return true if permits were successfully acquired, false otherwise
    */
   ```
   ```suggestion
        * @param numberOfMessages the number of messages to acquire permits for
        * @param byteSize the number of bytes to acquire permits for
        * @return true if permits were successfully acquired, false otherwise
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java:
##########
@@ -160,30 +159,34 @@ public int filterEntriesForConsumer(@Nullable 
MessageMetadata[] metadataArray, i
                 this.filterProcessedMsgs.add(entryMsgCnt);
             }
 
+            boolean filtered = false;
             EntryFilter.FilterResult filterResult = runFiltersForEntry(entry, 
msgMetadata, consumer);
             if (filterResult == EntryFilter.FilterResult.REJECT) {
                 entriesToFiltered.add(entry.getPosition());
-                entries.set(i, null);
                 // FilterResult will be always `ACCEPTED` when there is No 
Filter
                 // dont need to judge whether `hasFilter` is true or not.
                 this.filterRejectedMsgs.add(entryMsgCnt);
-                filteredEntryCount++;
-                filteredMessageCount += entryMsgCnt;
-                filteredBytesCount += metadataAndPayload.readableBytes();
-                entry.release();
-                continue;
+                filtered = true;
             } else if (filterResult == EntryFilter.FilterResult.RESCHEDULE) {
                 entriesToRedeliver.add(entry.getPosition());
-                entries.set(i, null);
                 // FilterResult will be always `ACCEPTED` when there is No 
Filter
                 // dont need to judge whether `hasFilter` is true or not.
                 this.filterRescheduledMsgs.add(entryMsgCnt);
-                filteredEntryCount++;
-                filteredMessageCount += entryMsgCnt;
-                filteredBytesCount += metadataAndPayload.readableBytes();
+                filtered = true;
+            }
+
+            if (filtered) {
+                if 
(serviceConfig.isDispatchThrottlingForFilteredEntriesEnabled()) {
+                    if 
(!tryAcquirePermitsForDeliveredMessages(subscription.getTopic(), cursor, 
entryMsgCnt,
+                            metadataAndPayload.readableBytes())) {

Review Comment:
   Critical bug: When rate limiting fails for a filtered entry, the code breaks 
without releasing/nulling the entry. This causes the entry's position to be 
added twice to `entriesToRedeliver` (or to both `entriesToFiltered` and 
`entriesToRedeliver`). The position was already added at line 165 or 171, then 
gets added again at line 313 when processing unhandled entries.
   
   The fix is to release and null the entry before breaking, similar to how 
it's done for non-filtered entries:
   ```java
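   if (filtered) {
       // Read the size before releasing; the buffer should not be touched after release.
       // `entryBytes` is a local introduced here for illustration.
       int entryBytes = metadataAndPayload.readableBytes();
       entries.set(i, null);
       entry.release();
       if (serviceConfig.isDispatchThrottlingForFilteredEntriesEnabled()) {
           if (!tryAcquirePermitsForDeliveredMessages(subscription.getTopic(), cursor,
                   entryMsgCnt, entryBytes)) {
               break; // do not process further entries
           }
       }
       continue;
   }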
   ```
   ```suggestion
                               metadataAndPayload.readableBytes())) {
                           entries.set(i, null);
                           entry.release();
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiterAsyncTokenBucketImpl.java:
##########
@@ -73,14 +73,21 @@ public long getAvailableDispatchRateLimitOnByte() {
      */
     @Override
     public void consumeDispatchQuota(long numberOfMessages, long byteSize) {
+        tryConsumeDispatchQuota(numberOfMessages, byteSize);
+    }
+

Review Comment:
   The Javadoc is missing a description and the `@return` tag. The method 
should document its return value.
   
   Suggested addition:
   ```java
   /**
    * It tries to acquire msg and bytes permits from rate-limiter and returns
    * if acquired permits succeed.
    * @param numberOfMessages the number of messages to acquire permits for
    * @param byteSize the number of bytes to acquire permits for
    * @return true if permits were successfully acquired, false otherwise
    */
   ```
   ```suggestion
   
       /**
        * Tries to acquire message and byte permits from the rate-limiter and
        * returns whether the acquisition succeeded.
        *
        * @param numberOfMessages the number of message permits to acquire
        * @param byteSize the number of byte permits to acquire
        * @return true if permits were successfully acquired, false otherwise
        */
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java:
##########
@@ -332,17 +348,18 @@ private void 
individualAcknowledgeMessageIfNeeded(List<Position> positions, Map<
         }
     }
 
-    protected void acquirePermitsForDeliveredMessages(Topic topic, 
ManagedCursor cursor, long totalEntries,
-                                                      long totalMessagesSent, 
long totalBytesSent) {
+    private boolean tryAcquirePermitsForDeliveredMessages(
+            Topic topic, ManagedCursor cursor, long totalMessagesSent, long 
totalBytesSent) {
         if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled()
                 || (cursor != null && !cursor.isActive())) {
-            long permits = dispatchThrottlingOnBatchMessageEnabled ? 
totalEntries : totalMessagesSent;
-            topic.getBrokerDispatchRateLimiter().ifPresent(rateLimiter ->
-                    rateLimiter.consumeDispatchQuota(permits, totalBytesSent));
-            topic.getDispatchRateLimiter().ifPresent(rateLimter ->
-                    rateLimter.consumeDispatchQuota(permits, totalBytesSent));
-            getRateLimiter().ifPresent(rateLimiter -> 
rateLimiter.consumeDispatchQuota(permits, totalBytesSent));
+            long permits = dispatchThrottlingOnBatchMessageEnabled ? 1 : 
totalMessagesSent;
+            return topic.getBrokerDispatchRateLimiter().map(l -> 
l.tryConsumeDispatchQuota(permits, totalBytesSent))
+                    .orElse(true)
+                    && topic.getDispatchRateLimiter().map(l -> 
l.tryConsumeDispatchQuota(permits, totalBytesSent))
+                    .orElse(true)
+                    && getRateLimiter().map(l -> 
l.tryConsumeDispatchQuota(permits, totalBytesSent)).orElse(true);

Review Comment:
   Critical bug (non-atomic acquisition): when multiple rate limiters are
   present, tokens can be consumed from earlier limiters even if a later limiter
   fails, because the `&&` chain short-circuits only after the earlier calls have
   already consumed. For example, if the broker rate limiter succeeds (consuming
   tokens) but the topic rate limiter fails, the method returns false and the
   message isn't dispatched, yet the broker limiter's tokens are already gone.
   This causes incorrect rate limiting and defeats the purpose of this PR.
   
   This needs a two-phase approach:
   1. First check all limiters without consuming
   2. Only consume from all if all checks pass
   
   Or use a rollback mechanism to restore consumed tokens if any limiter fails.
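
The two-phase approach above can be sketched as follows. This is a minimal, single-threaded illustration, not Pulsar code: `SimpleLimiter`, `hasCapacity`, `consume`, and `tryAcquireAll` are hypothetical names, and the real `DispatchRateLimiter` may not expose a separate check step.

```java
import java.util.List;

public class TwoPhaseAcquireSketch {

    /** Hypothetical limiter with separate check and consume steps (not Pulsar's API). */
    static final class SimpleLimiter {
        private long msgTokens;
        private long byteTokens;

        SimpleLimiter(long msgTokens, long byteTokens) {
            this.msgTokens = msgTokens;
            this.byteTokens = byteTokens;
        }

        /** Phase 1: report whether enough tokens exist, without consuming any. */
        boolean hasCapacity(long msgs, long bytes) {
            return msgTokens >= msgs && byteTokens >= bytes;
        }

        /** Phase 2: actually debit the tokens. */
        void consume(long msgs, long bytes) {
            msgTokens -= msgs;
            byteTokens -= bytes;
        }

        long availableMessages() {
            return msgTokens;
        }
    }

    /** Consumes from every limiter only if all of them have capacity. */
    static boolean tryAcquireAll(List<SimpleLimiter> limiters, long msgs, long bytes) {
        for (SimpleLimiter limiter : limiters) {
            if (!limiter.hasCapacity(msgs, bytes)) {
                return false; // nothing has been consumed yet
            }
        }
        for (SimpleLimiter limiter : limiters) {
            limiter.consume(msgs, bytes);
        }
        return true;
    }

    public static void main(String[] args) {
        SimpleLimiter broker = new SimpleLimiter(10, 1000);
        SimpleLimiter topic = new SimpleLimiter(0, 1000); // topic limiter is exhausted
        boolean acquired = tryAcquireAll(List.of(broker, topic), 1, 100);
        System.out.println(acquired + ", broker tokens left: " + broker.availableMessages());
    }
}
```

As written, the check and the consume are not atomic with respect to other threads. If limiters are shared across dispatchers, this would need a lock around both phases or the rollback variant instead.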



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiterClassicImpl.java:
##########
@@ -74,14 +74,26 @@ public long getAvailableDispatchRateLimitOnByte() {
      */
     @Override
     public void consumeDispatchQuota(long numberOfMessages, long byteSize) {
+        tryConsumeDispatchQuota(numberOfMessages, byteSize);
+    }
+
+    /**
+     * It acquires msg and bytes permits from rate-limiter and returns if 
acquired permits succeed.
+     * @param numberOfMessages
+     * @param byteSize

Review Comment:
   The Javadoc is missing the `@return` tag to document the return value. The 
method returns a boolean indicating whether the quota was successfully acquired.
   
   Suggested addition:
   ```java
   /**
    * It tries to acquire msg and bytes permits from rate-limiter and returns
    * if acquired permits succeed.
    * @param numberOfMessages the number of messages to acquire permits for
    * @param byteSize the number of bytes to acquire permits for
    * @return true if permits were successfully acquired, false otherwise
    */
   ```
   ```suggestion
        * @param numberOfMessages the number of messages to acquire permits for
        * @param byteSize the number of bytes to acquire permits for
        * @return true if permits were successfully acquired, false otherwise
   ```


