Copilot commented on code in PR #25061:
URL: https://github.com/apache/pulsar/pull/25061#discussion_r2607298726
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java:
##########
@@ -84,6 +84,14 @@ protected DispatchRateLimiter(BrokerService brokerService) {
*/
public abstract void consumeDispatchQuota(long numberOfMessages, long
byteSize);
+ /**
+ * It tries to acquire msg and bytes permits from rate-limiter and returns
if acquired permits succeed.
+ *
+ * @param numberOfMessages
+ * @param byteSize
Review Comment:
The Javadoc is missing the `@return` tag to document the return value.
Suggested addition:
```java
/**
* Tries to acquire message and byte permits from the rate limiter and returns whether the acquisition succeeded.
*
* @param numberOfMessages the number of messages to acquire permits for
* @param byteSize the number of bytes to acquire permits for
* @return true if permits were successfully acquired, false otherwise
*/
```
```suggestion
* @param numberOfMessages the number of messages to acquire permits for
* @param byteSize the number of bytes to acquire permits for
* @return true if permits were successfully acquired, false otherwise
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java:
##########
@@ -160,30 +159,34 @@ public int filterEntriesForConsumer(@Nullable
MessageMetadata[] metadataArray, i
this.filterProcessedMsgs.add(entryMsgCnt);
}
+ boolean filtered = false;
EntryFilter.FilterResult filterResult = runFiltersForEntry(entry,
msgMetadata, consumer);
if (filterResult == EntryFilter.FilterResult.REJECT) {
entriesToFiltered.add(entry.getPosition());
- entries.set(i, null);
// FilterResult will be always `ACCEPTED` when there is No
Filter
// dont need to judge whether `hasFilter` is true or not.
this.filterRejectedMsgs.add(entryMsgCnt);
- filteredEntryCount++;
- filteredMessageCount += entryMsgCnt;
- filteredBytesCount += metadataAndPayload.readableBytes();
- entry.release();
- continue;
+ filtered = true;
} else if (filterResult == EntryFilter.FilterResult.RESCHEDULE) {
entriesToRedeliver.add(entry.getPosition());
- entries.set(i, null);
// FilterResult will be always `ACCEPTED` when there is No
Filter
// dont need to judge whether `hasFilter` is true or not.
this.filterRescheduledMsgs.add(entryMsgCnt);
- filteredEntryCount++;
- filteredMessageCount += entryMsgCnt;
- filteredBytesCount += metadataAndPayload.readableBytes();
+ filtered = true;
+ }
+
+ if (filtered) {
+ if
(serviceConfig.isDispatchThrottlingForFilteredEntriesEnabled()) {
+ if
(!tryAcquirePermitsForDeliveredMessages(subscription.getTopic(), cursor,
entryMsgCnt,
+ metadataAndPayload.readableBytes())) {
Review Comment:
Critical bug: when the rate limiter rejects a filtered entry, the loop breaks
without releasing the entry or setting its slot to null. As a result, the
entry's position ends up twice in `entriesToRedeliver` (or once in
`entriesToFiltered` and once in `entriesToRedeliver`): it was already added at
line 165 or 171, and it is added again at line 313 when the unhandled entries
are processed.
The fix is to release the entry and null its slot before breaking, as is
already done for non-filtered entries:
```java
if (filtered) {
    // Read the size first: the buffer should not be touched after entry.release().
    int filteredBytes = metadataAndPayload.readableBytes();
    entries.set(i, null);
    entry.release();
    if (serviceConfig.isDispatchThrottlingForFilteredEntriesEnabled()
            && !tryAcquirePermitsForDeliveredMessages(subscription.getTopic(), cursor,
                    entryMsgCnt, filteredBytes)) {
        break; // do not process further entries
    }
    continue;
}
```
```suggestion
metadataAndPayload.readableBytes())) {
entries.set(i, null);
entry.release();
```
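To make the failure mode concrete, here is a simplified model of the loop, not Pulsar code. Strings stand in for positions, the class `FilteredEntryBreakDemo` and its `nullOnBreak` flag are hypothetical, and the post-loop pass over "unhandled" entries is modeled from the description above rather than copied from `AbstractBaseDispatcher`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class FilteredEntryBreakDemo {
    /**
     * Simulates a dispatch loop that records a rescheduled entry and then
     * breaks because throttling refused the permits. nullOnBreak toggles the
     * proposed fix (clearing the slot before the break).
     */
    static List<String> redeliverPositions(List<String> entries, boolean nullOnBreak) {
        List<String> toRedeliver = new ArrayList<>();
        for (int i = 0; i < entries.size(); i++) {
            String position = entries.get(i);
            toRedeliver.add(position); // the filter asked for RESCHEDULE
            boolean permitsAcquired = false; // pretend the limiter is exhausted
            if (!permitsAcquired) {
                if (nullOnBreak) {
                    entries.set(i, null); // mark the entry as already handled
                }
                break;
            }
        }
        // Post-loop pass: every entry still in the list counts as unhandled.
        for (String position : entries) {
            if (position != null) {
                toRedeliver.add(position);
            }
        }
        return toRedeliver;
    }

    public static void main(String[] args) {
        // Without the fix the first position is recorded twice.
        System.out.println(redeliverPositions(
                new ArrayList<>(Arrays.asList("1:0", "1:1")), false)); // [1:0, 1:0, 1:1]
        // With the fix each position is recorded once.
        System.out.println(redeliverPositions(
                new ArrayList<>(Arrays.asList("1:0", "1:1")), true)); // [1:0, 1:1]
    }
}
```

In the real dispatcher the entry would also need to be released when its slot is cleared; the model leaves that out because plain strings have no reference count.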
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiterAsyncTokenBucketImpl.java:
##########
@@ -73,14 +73,21 @@ public long getAvailableDispatchRateLimitOnByte() {
*/
@Override
public void consumeDispatchQuota(long numberOfMessages, long byteSize) {
+ tryConsumeDispatchQuota(numberOfMessages, byteSize);
+ }
+
Review Comment:
The Javadoc is missing a description and the `@return` tag. The method
should document its return value.
Suggested addition:
```java
/**
* Tries to acquire message and byte permits from the rate limiter and returns whether the acquisition succeeded.
* @param numberOfMessages the number of messages to acquire permits for
* @param byteSize the number of bytes to acquire permits for
* @return true if permits were successfully acquired, false otherwise
*/
```
```suggestion
/**
* Tries to acquire message and byte permits from the rate-limiter and returns whether the acquisition succeeded.
*
* @param numberOfMessages the number of message permits to acquire
* @param byteSize the number of byte permits to acquire
* @return true if permits were successfully acquired, false otherwise
*/
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java:
##########
@@ -332,17 +348,18 @@ private void
individualAcknowledgeMessageIfNeeded(List<Position> positions, Map<
}
}
- protected void acquirePermitsForDeliveredMessages(Topic topic,
ManagedCursor cursor, long totalEntries,
- long totalMessagesSent,
long totalBytesSent) {
+ private boolean tryAcquirePermitsForDeliveredMessages(
+ Topic topic, ManagedCursor cursor, long totalMessagesSent, long
totalBytesSent) {
if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled()
|| (cursor != null && !cursor.isActive())) {
- long permits = dispatchThrottlingOnBatchMessageEnabled ?
totalEntries : totalMessagesSent;
- topic.getBrokerDispatchRateLimiter().ifPresent(rateLimiter ->
- rateLimiter.consumeDispatchQuota(permits, totalBytesSent));
- topic.getDispatchRateLimiter().ifPresent(rateLimter ->
- rateLimter.consumeDispatchQuota(permits, totalBytesSent));
- getRateLimiter().ifPresent(rateLimiter ->
rateLimiter.consumeDispatchQuota(permits, totalBytesSent));
+ long permits = dispatchThrottlingOnBatchMessageEnabled ? 1 :
totalMessagesSent;
+ return topic.getBrokerDispatchRateLimiter().map(l ->
l.tryConsumeDispatchQuota(permits, totalBytesSent))
+ .orElse(true)
+ && topic.getDispatchRateLimiter().map(l ->
l.tryConsumeDispatchQuota(permits, totalBytesSent))
+ .orElse(true)
+ && getRateLimiter().map(l ->
l.tryConsumeDispatchQuota(permits, totalBytesSent)).orElse(true);
Review Comment:
Critical bug (partial token consumption): when multiple rate limiters are
present, tokens can be consumed from earlier limiters even if a later limiter
fails, because the `&&` chain evaluates the limiters in order and stops at the
first failure. For example, if the broker rate limiter succeeds (consuming
tokens) but the topic rate limiter fails, the method returns false and the
message isn't dispatched, yet the broker limiter's tokens are already gone.
This causes incorrect rate limiting and defeats the purpose of this PR.
This needs a two-phase approach:
1. First check all limiters without consuming.
2. Consume from all of them only if every check passes.
Alternatively, use a rollback mechanism to restore consumed tokens if any
limiter fails.
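A minimal sketch of the rollback variant follows. It assumes a hypothetical `SimpleLimiter` with `tryConsume` and `refund` methods; neither the class nor `refund` is part of Pulsar's `DispatchRateLimiter` API, so whether the real limiters can return tokens is an open question for this PR.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical minimal limiter; a stand-in, not Pulsar's DispatchRateLimiter. */
final class SimpleLimiter {
    private long msgTokens;
    private long byteTokens;

    SimpleLimiter(long msgTokens, long byteTokens) {
        this.msgTokens = msgTokens;
        this.byteTokens = byteTokens;
    }

    /** Takes both message and byte permits from this limiter, or neither. */
    synchronized boolean tryConsume(long msgs, long bytes) {
        if (msgTokens < msgs || byteTokens < bytes) {
            return false;
        }
        msgTokens -= msgs;
        byteTokens -= bytes;
        return true;
    }

    /** Returns permits taken by an earlier successful tryConsume (assumed API). */
    synchronized void refund(long msgs, long bytes) {
        msgTokens += msgs;
        byteTokens += bytes;
    }

    synchronized long availableMessages() {
        return msgTokens;
    }
}

final class AllOrNothingAcquire {
    /** Acquires from every limiter, refunding earlier ones if a later one refuses. */
    static boolean tryAcquireAll(List<SimpleLimiter> limiters, long msgs, long bytes) {
        List<SimpleLimiter> acquired = new ArrayList<>();
        for (SimpleLimiter limiter : limiters) {
            if (!limiter.tryConsume(msgs, bytes)) {
                for (SimpleLimiter done : acquired) {
                    done.refund(msgs, bytes); // undo the partial consumption
                }
                return false;
            }
            acquired.add(limiter);
        }
        return true;
    }

    public static void main(String[] args) {
        SimpleLimiter broker = new SimpleLimiter(10, 1_000);
        SimpleLimiter topic = new SimpleLimiter(0, 1_000); // no message permits left
        boolean ok = tryAcquireAll(List.of(broker, topic), 5, 100);
        System.out.println(ok + " " + broker.availableMessages()); // false 10
    }
}
```

Rollback is not fully atomic across limiters: between a `tryConsume` and its `refund`, concurrent callers briefly see fewer tokens than are really available. That errs on the side of throttling too much. A check-then-consume variant has the opposite exposure, since another thread can take tokens between the check and the consume.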
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiterClassicImpl.java:
##########
@@ -74,14 +74,26 @@ public long getAvailableDispatchRateLimitOnByte() {
*/
@Override
public void consumeDispatchQuota(long numberOfMessages, long byteSize) {
+ tryConsumeDispatchQuota(numberOfMessages, byteSize);
+ }
+
+ /**
+ * It acquires msg and bytes permits from rate-limiter and returns if
acquired permits succeed.
+ * @param numberOfMessages
+ * @param byteSize
Review Comment:
The Javadoc is missing the `@return` tag to document the return value. The
method returns a boolean indicating whether the quota was successfully acquired.
Suggested addition:
```java
/**
* Tries to acquire message and byte permits from the rate limiter and returns whether the acquisition succeeded.
* @param numberOfMessages the number of messages to acquire permits for
* @param byteSize the number of bytes to acquire permits for
* @return true if permits were successfully acquired, false otherwise
*/
```
```suggestion
* @param numberOfMessages the number of messages to acquire permits for
* @param byteSize the number of bytes to acquire permits for
* @return true if permits were successfully acquired, false otherwise
```