lhotari commented on PR #23930:
URL: https://github.com/apache/pulsar/pull/23930#issuecomment-2640930867
> Although clock sources aren't necessarily monotonic on all platforms, it seems that the root cause of this issue could be different.
> The DefaultMonotonicSnapshotClock was originally added to address a performance bottleneck in calling `System.nanoTime()` on macOS; this wasn't a significant bottleneck on Linux. The solution was to update the value in a single thread and accept eventual consistency in the rate limiter. The problem seen in issue #23920 could occur when this thread gets starved and the updates are delayed.
> This PR already contains two mitigations for this problem: raising the thread priority to `MAX_PRIORITY` and changing the logic for how new tokens are added to the bucket when the token update happens. However, I'll need to find a solution that properly avoids the problem without adding contention, since contention would give the rate limiting solution a relatively high overhead.
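For context, the eventual-consistency approach described in the quoted comment can be sketched roughly like this (class and method names are illustrative, not Pulsar's actual `AsyncTokenBucket` API): consumers record consumed tokens into a `LongAdder` without contention, and a single updater thread periodically folds the pending sum into the token balance.

```java
import java.util.concurrent.atomic.LongAdder;

// Rough sketch of the single-writer, eventually consistent token bucket idea.
// Names are illustrative; this is not the real AsyncTokenBucket implementation.
public class EventuallyConsistentBucketSketch {
    private final LongAdder pendingConsumedTokens = new LongAdder();
    private volatile long tokens;
    private final long capacity;

    public EventuallyConsistentBucketSketch(long capacity) {
        this.capacity = capacity;
        this.tokens = capacity;
    }

    /** Fast path for consumers: a contention-free LongAdder increment, no CAS loop. */
    public void consume(long n) {
        pendingConsumedTokens.add(n);
    }

    /** Periodic update, run from a single thread: fold pending consumption into the balance. */
    public void update(long newTokens) {
        long pending = pendingConsumedTokens.sumThenReset();
        tokens = Math.min(tokens + newTokens - pending, capacity);
    }

    public long tokens() {
        return tokens;
    }
}
```

The fast path stays cheap because readers never contend on the balance field; the trade-off is that the visible token count lags until the next update, which is exactly why a starved updater thread becomes a problem.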
I found a solution to the problem. It's possible to detect backward leaps, but forward leaps should remain unhandled: the tick updater thread could starve, and when it finally runs, time must be allowed to leap forward.
Time leaping forward would cause issues with the changes in this PR. Before the changes made in this PR, there was an issue that caused negative token counts. I'll continue adding more tests to describe the scenario.
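The "clamp backward leaps, allow forward leaps" idea can be sketched in a few lines (this is an assumption about the shape of the fix, not the actual DefaultMonotonicSnapshotClock code):

```java
import java.util.concurrent.atomic.AtomicLong;

// Minimal sketch of a snapshot clock that ignores backward leaps of the
// underlying clock source while still accepting arbitrarily large forward
// jumps (e.g. after the updater thread has been starved).
public class LeapSafeSnapshotClock {
    private final AtomicLong snapshotNanos = new AtomicLong(Long.MIN_VALUE);

    /** Called periodically by a single updater thread with a fresh clock reading. */
    public void updateSnapshot(long readNanos) {
        // Math::max means the snapshot only ever moves forward.
        snapshotNanos.accumulateAndGet(readNanos, Math::max);
    }

    /** Read the latest snapshot; monotonically non-decreasing by construction. */
    public long getTickNanos() {
        return snapshotNanos.get();
    }
}
```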
My current assumption is that the minimal fix for #23920 is this diff:
```patch
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java
index ac9a1f03e59..36edb1c5d8a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java
@@ -159,16 +159,15 @@ public abstract class AsyncTokenBucket {
         if (shouldUpdateTokensImmediately(currentNanos, forceUpdateTokens)) {
             // calculate the number of new tokens since the last update
             long newTokens = calculateNewTokensSinceLastUpdate(currentNanos);
-            // calculate the total amount of tokens to consume in this update
             // flush the pendingConsumedTokens by calling "sumThenReset"
-            long totalConsumedTokens = consumeTokens + pendingConsumedTokens.sumThenReset();
+            long currentPendingConsumedTokens = pendingConsumedTokens.sumThenReset();
             // update the tokens and return the current token value
             return TOKENS_UPDATER.updateAndGet(this,
-                    currentTokens ->
-                            // after adding new tokens, limit the tokens to the capacity
-                            Math.min(currentTokens + newTokens, getCapacity())
-                                    // subtract the consumed tokens
-                                    - totalConsumedTokens);
+                    // after adding new tokens subtract the pending consumed tokens and
+                    // limit the tokens to the capacity of the bucket
+                    currentTokens -> Math.min(currentTokens + newTokens - currentPendingConsumedTokens, getCapacity())
+                            // subtract the consumed tokens
+                            - consumeTokens);
         } else {
             // eventual consistent fast path, tokens are not updated immediately
```
The problem with the previous solution was that it would first add the new tokens and cap the result to the capacity, and only then subtract the consumed tokens. With the eventual-consistency logic in AsyncTokenBucket, that ordering is wrong. I'll provide a separate explanation for this in later updates.
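A small numeric example makes the ordering issue concrete (the numbers are hypothetical, chosen only to illustrate the difference between the two formulas in the diff above): if the bucket is full and the updater thread was starved, pending consumption accumulated during the same interval in which new tokens were earned. Capping before subtracting discards the earned tokens that should offset that consumption, so the balance dips (and with larger pending sums could go negative).

```java
// Demonstrates why Math.min must be applied after subtracting the pending
// consumed tokens, not before. Values are hypothetical for illustration.
public class TokenCapOrderDemo {
    // previous logic: add new tokens, cap to capacity, then subtract consumption
    static long capBeforeSubtract(long current, long added, long consumed, long capacity) {
        return Math.min(current + added, capacity) - consumed;
    }

    // fixed logic: subtract pending consumption first, then cap to capacity
    static long subtractBeforeCap(long current, long added, long consumed, long capacity) {
        return Math.min(current + added - consumed, capacity);
    }

    public static void main(String[] args) {
        long capacity = 100;
        long current = 100;   // bucket was full
        long added = 50;      // tokens earned while the updater thread was starved
        long consumed = 50;   // tokens consumed during the same interval

        System.out.println(capBeforeSubtract(current, added, consumed, capacity));  // 50
        System.out.println(subtractBeforeCap(current, added, consumed, capacity));  // 100
    }
}
```

In this scenario the earned and consumed tokens cancel out, so the balance should stay at 100; the old formula caps away the earned tokens first and incorrectly lands at 50.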
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]