Copilot commented on code in PR #25061:
URL: https://github.com/apache/pulsar/pull/25061#discussion_r2609000769
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java:
##########
@@ -332,17 +349,23 @@ private void
individualAcknowledgeMessageIfNeeded(List<Position> positions, Map<
}
}
- protected void acquirePermitsForDeliveredMessages(Topic topic,
ManagedCursor cursor, long totalEntries,
- long totalMessagesSent,
long totalBytesSent) {
+ private boolean tryAcquirePermitsForDeliveredMessages(
+ Topic topic, ManagedCursor cursor, long totalMessagesSent, long
totalBytesSent) {
Review Comment:
As with `totalMessagesSent`, the parameter name `totalBytesSent` is misleading
since this method is now called per-entry rather than once after processing all
entries. Consider renaming it to `byteCount` or `numBytes` to better reflect
that it represents the byte count for a single entry, not a cumulative total.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiterClassicImpl.java:
##########
@@ -74,14 +74,26 @@ public long getAvailableDispatchRateLimitOnByte() {
*/
@Override
public void consumeDispatchQuota(long numberOfMessages, long byteSize) {
+ tryConsumeDispatchQuota(numberOfMessages, byteSize);
+ }
+
+ /**
+ * It acquires msg and bytes permits from rate-limiter and returns if
acquired permits succeed.
+ * @param numberOfMessages
+ * @param byteSize
+ */
+ @Override
+ public boolean tryConsumeDispatchQuota(long numberOfMessages, long
byteSize) {
+ boolean res = true;
RateLimiter localDispatchRateLimiterOnMessage =
dispatchRateLimiterOnMessage;
if (numberOfMessages > 0 && localDispatchRateLimiterOnMessage != null)
{
- localDispatchRateLimiterOnMessage.tryAcquire(numberOfMessages);
+ res &=
localDispatchRateLimiterOnMessage.tryAcquire(numberOfMessages);
}
RateLimiter localDispatchRateLimiterOnByte = dispatchRateLimiterOnByte;
if (byteSize > 0 && localDispatchRateLimiterOnByte != null) {
- localDispatchRateLimiterOnByte.tryAcquire(byteSize);
+ res &= localDispatchRateLimiterOnByte.tryAcquire(byteSize);
}
+ return res;
Review Comment:
The method consumes tokens from the limiters sequentially, so if the message
limiter succeeds but the byte limiter fails, the message tokens have already
been consumed. The result is conservative: some quota is spent without any
message being dispatched. Consider documenting this behavior in the method's
Javadoc, or alternatively, check the availability of all limiters before
consuming any tokens to avoid unnecessary quota consumption.
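A minimal sketch of the two options, to make the difference concrete. The
`Bucket` class and both helper methods are hypothetical stand-ins written for
this illustration; they are not Pulsar's `RateLimiter` API, and the
class-level lock is only one assumed way to keep the check and the consume
step together.

```java
public class QuotaCheckSketch {

    /** Hypothetical stand-in for a rate limiter; NOT Pulsar's RateLimiter. */
    static final class Bucket {
        private long tokens;

        Bucket(long tokens) {
            this.tokens = tokens;
        }

        long available() {
            return tokens;
        }

        /** Takes n tokens if present; leaves the bucket untouched otherwise. */
        boolean tryAcquire(long n) {
            if (n > tokens) {
                return false;
            }
            tokens -= n;
            return true;
        }
    }

    /** Mirrors the shape in the diff: each limiter is consumed in turn. */
    static boolean consumeSequentially(Bucket msgs, Bucket bytes,
                                       long numMessages, long numBytes) {
        boolean res = true;
        res &= msgs.tryAcquire(numMessages); // may succeed ...
        res &= bytes.tryAcquire(numBytes);   // ... even when this one fails
        return res;
    }

    /** Alternative: check both limiters first, consume only if both can pay. */
    static synchronized boolean checkThenConsume(Bucket msgs, Bucket bytes,
                                                 long numMessages, long numBytes) {
        if (msgs.available() < numMessages || bytes.available() < numBytes) {
            return false; // nothing has been consumed
        }
        msgs.tryAcquire(numMessages);
        bytes.tryAcquire(numBytes);
        return true;
    }
}
```

With 10 message tokens and 5 byte tokens, a request for 4 messages and 8 bytes
fails in both variants, but only `consumeSequentially` leaves the message
bucket reduced. Whether a real limiter can expose a cheap, race-free
availability check is a separate question that this sketch does not answer.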
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java:
##########
@@ -332,17 +349,23 @@ private void
individualAcknowledgeMessageIfNeeded(List<Position> positions, Map<
}
}
- protected void acquirePermitsForDeliveredMessages(Topic topic,
ManagedCursor cursor, long totalEntries,
- long totalMessagesSent,
long totalBytesSent) {
+ private boolean tryAcquirePermitsForDeliveredMessages(
+ Topic topic, ManagedCursor cursor, long totalMessagesSent, long
totalBytesSent) {
Review Comment:
The parameter name `totalMessagesSent` is misleading since this method is
now called per-entry rather than once after processing all entries. Consider
renaming to `messageCount` or `numMessages` to better reflect that it
represents the count for a single entry, not a cumulative total.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java:
##########
@@ -84,6 +84,14 @@ protected DispatchRateLimiter(BrokerService brokerService) {
*/
public abstract void consumeDispatchQuota(long numberOfMessages, long
byteSize);
+ /**
+ * It tries to acquire msg and bytes permits from rate-limiter and returns
if acquired permits succeed.
+ *
+ * @param numberOfMessages
+ * @param byteSize
+ */
+ public abstract boolean tryConsumeDispatchQuota(long numberOfMessages,
long byteSize);
Review Comment:
The Javadoc is incomplete: the `@param` tags have no descriptions and there is
no `@return` tag for the boolean result. Describe what `numberOfMessages` and
`byteSize` represent, and state what the return value means.
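One possible shape for the completed Javadoc, shown on a hypothetical
`QuotaLimiter` interface rather than on the real abstract class. The wording of
the descriptions is an assumption drawn from the implementations in this diff
(which skip a limiter when its argument is not positive), not text from the PR.

```java
public class DispatchQuotaJavadocSketch {

    /** Hypothetical stand-in for the abstract limiter touched by the diff. */
    interface QuotaLimiter {
        /**
         * Tries to acquire message and byte permits from the rate limiter.
         *
         * @param numberOfMessages number of messages to acquire permits for;
         *                         a value that is not positive skips the message limiter
         * @param byteSize         size in bytes to acquire permits for;
         *                         a value that is not positive skips the byte limiter
         * @return {@code true} if every limiter that was consulted granted the
         *         request, {@code false} otherwise
         */
        boolean tryConsumeDispatchQuota(long numberOfMessages, long byteSize);
    }

    public static void main(String[] args) {
        // Toy implementation for illustration only: fixed per-call caps.
        QuotaLimiter limiter = (messages, bytes) -> messages <= 10 && bytes <= 1024;
        System.out.println(limiter.tryConsumeDispatchQuota(1, 512));
    }
}
```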
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiterAsyncTokenBucketImpl.java:
##########
@@ -73,14 +73,21 @@ public long getAvailableDispatchRateLimitOnByte() {
*/
@Override
public void consumeDispatchQuota(long numberOfMessages, long byteSize) {
+ tryConsumeDispatchQuota(numberOfMessages, byteSize);
+ }
+
+ @Override
+ public boolean tryConsumeDispatchQuota(long numberOfMessages, long
byteSize) {
+ boolean res = true;
AsyncTokenBucket localDispatchRateLimiterOnMessage =
dispatchRateLimiterOnMessage;
if (numberOfMessages > 0 && localDispatchRateLimiterOnMessage != null)
{
- localDispatchRateLimiterOnMessage.consumeTokens(numberOfMessages);
+ res &=
localDispatchRateLimiterOnMessage.consumeTokensAndCheckIfContainsTokens(numberOfMessages);
}
AsyncTokenBucket localDispatchRateLimiterOnByte =
dispatchRateLimiterOnByte;
if (byteSize > 0 && localDispatchRateLimiterOnByte != null) {
- localDispatchRateLimiterOnByte.consumeTokens(byteSize);
+ res &=
localDispatchRateLimiterOnByte.consumeTokensAndCheckIfContainsTokens(byteSize);
}
+ return res;
}
Review Comment:
The method consumes tokens from the limiters sequentially, so if the message
limiter succeeds but the byte limiter fails, the message tokens have already
been consumed. The result is conservative: some quota is spent without any
message being dispatched. Consider documenting this behavior in the method's
Javadoc, or alternatively, check the availability of all limiters before
consuming any tokens to avoid unnecessary quota consumption.