lhotari commented on PR #23930:
URL: https://github.com/apache/pulsar/pull/23930#issuecomment-2640930867

   > Although clock sources aren't necessarily monotonic on all platforms, it 
seems that the root cause of this issue could be different. The 
DefaultMonotonicSnapshotClock was originally added to address a performance 
bottleneck in calling `System.nanoTime()` on macOS. This wasn't a significant 
bottleneck on Linux. The solution was to update the value in a single 
thread and make the rate limiter eventually consistent. The problem seen in 
issue #23920 could occur when this thread gets starved and the updates are 
delayed. This PR already contains 2 mitigations for this problem: raising 
the thread priority to `MAX_PRIORITY` and changing how new tokens 
are added to the bucket when the token update happens. However, I'll need to 
find a solution that properly avoids the problem without adding 
contention, since contention would give the rate limiting solution a 
relatively high overhead.
   
   I found a solution to the problem. It's possible to detect leaps backwards, 
but forward leaps should be left unhandled: the tick updater thread could 
starve, and time must be able to leap forward when the tick finally 
gets updated.
   Time leaping forward would cause issues with the changes in this PR. Before 
the changes made in this PR, there was an issue which caused negative tokens. 
I'll continue adding more tests to describe the scenario.
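
   To make the backward/forward distinction concrete, here is a minimal sketch of a guard that ignores backward leaps and lets forward leaps through. `BackwardLeapGuard` and `tickNanos()` are hypothetical names used only for illustration; they are not Pulsar APIs, and the actual change to DefaultMonotonicSnapshotClock may look different.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical helper, not part of Pulsar: wraps a nanosecond clock source
// and never returns a value older than one it has already returned.
final class BackwardLeapGuard {
    private final LongSupplier source;
    private final AtomicLong lastNanos;

    BackwardLeapGuard(LongSupplier source) {
        this.source = source;
        this.lastNanos = new AtomicLong(source.getAsLong());
    }

    long tickNanos() {
        long observed = source.getAsLong();
        // Compare by difference so the check also holds if the nanosecond
        // counter overflows. A backward leap keeps the previous value;
        // a forward leap of any size is accepted as-is.
        return lastNanos.updateAndGet(prev -> observed - prev > 0 ? observed : prev);
    }
}
```

   With a source that reports 100, then 90, then 250, the guard returns 100 for the backward reading and 250 for the forward one, so a starved updater thread can still catch up in a single step.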
   
   My current assumption is that the minimal fix for #23920 is this diff:
   
   ```patch
   diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java
   index ac9a1f03e59..36edb1c5d8a 100644
   --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java
   +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java
   @@ -159,16 +159,15 @@ public abstract class AsyncTokenBucket {
            if (shouldUpdateTokensImmediately(currentNanos, forceUpdateTokens)) {
                // calculate the number of new tokens since the last update
                long newTokens = calculateNewTokensSinceLastUpdate(currentNanos);
   -            // calculate the total amount of tokens to consume in this update
                // flush the pendingConsumedTokens by calling "sumThenReset"
   -            long totalConsumedTokens = consumeTokens + pendingConsumedTokens.sumThenReset();
   +            long currentPendingConsumedTokens = pendingConsumedTokens.sumThenReset();
                // update the tokens and return the current token value
                return TOKENS_UPDATER.updateAndGet(this,
   -                    currentTokens ->
   -                            // after adding new tokens, limit the tokens to the capacity
   -                            Math.min(currentTokens + newTokens, getCapacity())
   -                                    // subtract the consumed tokens
   -                                    - totalConsumedTokens);
   +                    // after adding new tokens subtract the pending consumed tokens and
   +                    // limit the tokens to the capacity of the bucket
   +                    currentTokens -> Math.min(currentTokens + newTokens - currentPendingConsumedTokens, getCapacity())
   +                            // subtract the consumed tokens
   +                            - consumeTokens);
            } else {
                // eventual consistent fast path, tokens are not updated immediately
   ```
   
   The problem with the previous solution was that it would first add the new 
tokens and cap the sum to the capacity, and only then subtract the consumed 
tokens. With the eventual consistency logic in AsyncTokenBucket, that's the 
wrong order. I'll provide a separate explanation for this in later updates.
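
   As a small worked example of the ordering difference in the diff, the two update expressions can be compared side by side. `TokenUpdateOrder` and its method names are hypothetical and exist only for this illustration; the expressions themselves are copied from the removed and added lines of the diff. The interpretation in the note that follows is one possible reading, not the separate explanation mentioned above.

```java
// Hypothetical illustration class, not part of Pulsar.
final class TokenUpdateOrder {
    // Expression removed by the diff: cap to capacity first,
    // then subtract both pending and current consumption.
    static long capThenSubtract(long currentTokens, long newTokens,
                                long pendingConsumed, long consumeTokens, long capacity) {
        return Math.min(currentTokens + newTokens, capacity)
                - (consumeTokens + pendingConsumed);
    }

    // Expression added by the diff: subtract pending consumption before
    // capping, then subtract the tokens consumed by the current call.
    static long subtractPendingThenCap(long currentTokens, long newTokens,
                                       long pendingConsumed, long consumeTokens, long capacity) {
        return Math.min(currentTokens + newTokens - pendingConsumed, capacity)
                - consumeTokens;
    }
}
```

   With a capacity of 100, 100 current tokens, 50 new tokens, 50 pending consumed tokens and nothing consumed by the current call, the removed expression yields 50 while the added one yields 100. In the first case the 50 new tokens are discarded by the cap before the 50 tokens that were consumed during the same interval are charged against the bucket.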

