This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ff3b6ce098c6735daa7acdc2babeb6da1e586cfb
Author: Lari Hotari <[email protected]>
AuthorDate: Sat Feb 8 19:01:07 2025 +0200

    [fix][broker] Fix rate limiter token bucket and clock consistency issues causing excessive throttling and connection timeouts (#23930)
    
    (cherry picked from commit f8e4c11b5cd94382a3493b3e129e46bfc6a0621d)
---
 microbench/README.md                               |  26 +++
 .../broker/qos/AsyncTokenBucketBenchmark.java      |  17 +-
 ...=> DefaultMonotonicSnapshotClockBenchmark.java} |  55 +++--
 .../apache/pulsar/broker/qos/AsyncTokenBucket.java | 127 ++++++----
 .../pulsar/broker/qos/AsyncTokenBucketBuilder.java |  34 +++
 .../broker/qos/DefaultMonotonicSnapshotClock.java  | 260 ++++++++++++++++++---
 .../broker/qos/DynamicRateAsyncTokenBucket.java    |   7 +-
 .../qos/DynamicRateAsyncTokenBucketBuilder.java    |   6 +-
 .../broker/qos/FinalRateAsyncTokenBucket.java      |   7 +-
 .../qos/FinalRateAsyncTokenBucketBuilder.java      |   2 +-
 .../pulsar/broker/service/BrokerService.java       |   6 +
 .../broker/service/PublishRateLimiterImpl.java     |  11 +-
 .../service/persistent/DispatchRateLimiter.java    |  39 +++-
 .../service/persistent/SubscribeRateLimiter.java   |   8 +-
 .../broker/auth/MockedPulsarServiceBaseTest.java   |  46 ++--
 .../pulsar/broker/qos/AsyncTokenBucketTest.java    | 143 +++++++++++-
 .../qos/DefaultMonotonicSnapshotClockTest.java     | 185 +++++++++++++++
 .../RGUsageMTAggrWaitForAllMsgsTest.java           |   5 +-
 .../broker/service/PublishRateLimiterTest.java     |   5 +-
 .../api/AbstractMessageDispatchThrottlingTest.java | 116 +++++++++
 .../client/api/MessageDispatchThrottlingTest.java  | 166 ++++---------
 .../SubscriptionMessageDispatchThrottlingTest.java |  57 ++---
 .../client/impl/MessagePublishThrottlingTest.java  |   2 +-
 23 files changed, 1026 insertions(+), 304 deletions(-)

diff --git a/microbench/README.md b/microbench/README.md
index 780e3a5a1d3..f50c3036ff4 100644
--- a/microbench/README.md
+++ b/microbench/README.md
@@ -41,3 +41,29 @@ For fast recompiling of the benchmarks (without compiling 
Pulsar modules) and cr
 mvn -Pmicrobench -pl microbench clean package
 ```
 
+### Running specific benchmarks
+
+Display help:
+
+```shell
+java -jar microbench/target/microbenchmarks.jar -h
+```
+
+Listing all benchmarks:
+
+```shell
+java -jar microbench/target/microbenchmarks.jar -l
+```
+
+Running specific benchmarks:
+
+```shell
+java -jar microbench/target/microbenchmarks.jar ".*BenchmarkName.*"
+```
+
+Checking which benchmarks match the pattern:
+
+```shell
+java -jar microbench/target/microbenchmarks.jar ".*BenchmarkName.*" -lp
+```
+
diff --git 
a/microbench/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBenchmark.java
 
b/microbench/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBenchmark.java
index 4c069e72ea3..1b210258f13 100644
--- 
a/microbench/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBenchmark.java
+++ 
b/microbench/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBenchmark.java
@@ -33,6 +33,7 @@ import org.openjdk.jmh.annotations.State;
 import org.openjdk.jmh.annotations.TearDown;
 import org.openjdk.jmh.annotations.Threads;
 import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
 
 @Fork(3)
 @BenchmarkMode(Mode.Throughput)
@@ -59,23 +60,29 @@ public class AsyncTokenBucketBenchmark {
     @Benchmark
     @Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
     @Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
-    public void consumeTokensBenchmark001Threads() {
-        asyncTokenBucket.consumeTokens(1);
+    public void consumeTokensBenchmark001Threads(Blackhole blackhole) {
+        consumeTokenAndGetTokens(blackhole);
     }
 
     @Threads(10)
     @Benchmark
     @Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
     @Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
-    public void consumeTokensBenchmark010Threads() {
-        asyncTokenBucket.consumeTokens(1);
+    public void consumeTokensBenchmark010Threads(Blackhole blackhole) {
+        consumeTokenAndGetTokens(blackhole);
     }
 
     @Threads(100)
     @Benchmark
     @Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
     @Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
-    public void consumeTokensBenchmark100Threads() {
+    public void consumeTokensBenchmark100Threads(Blackhole blackhole) {
+        consumeTokenAndGetTokens(blackhole);
+    }
+
+    private void consumeTokenAndGetTokens(Blackhole blackhole) {
         asyncTokenBucket.consumeTokens(1);
+        // blackhole is used to ensure that the compiler doesn't do dead code 
elimination
+        blackhole.consume(asyncTokenBucket.getTokens());
     }
 }
diff --git 
a/microbench/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBenchmark.java
 
b/microbench/src/main/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClockBenchmark.java
similarity index 58%
copy from 
microbench/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBenchmark.java
copy to 
microbench/src/main/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClockBenchmark.java
index 4c069e72ea3..d9054b8fe4b 100644
--- 
a/microbench/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBenchmark.java
+++ 
b/microbench/src/main/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClockBenchmark.java
@@ -28,27 +28,19 @@ import org.openjdk.jmh.annotations.Measurement;
 import org.openjdk.jmh.annotations.Mode;
 import org.openjdk.jmh.annotations.OutputTimeUnit;
 import org.openjdk.jmh.annotations.Scope;
-import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.annotations.State;
 import org.openjdk.jmh.annotations.TearDown;
 import org.openjdk.jmh.annotations.Threads;
 import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
 
 @Fork(3)
 @BenchmarkMode(Mode.Throughput)
 @OutputTimeUnit(TimeUnit.SECONDS)
 @State(Scope.Thread)
-public class AsyncTokenBucketBenchmark {
-    private AsyncTokenBucket asyncTokenBucket;
+public class DefaultMonotonicSnapshotClockBenchmark {
     private DefaultMonotonicSnapshotClock monotonicSnapshotClock =
-            new 
DefaultMonotonicSnapshotClock(TimeUnit.MILLISECONDS.toNanos(8), 
System::nanoTime);
-
-    @Setup(Level.Iteration)
-    public void setup() {
-        long ratePerSecond = 100_000_000;
-        asyncTokenBucket = 
AsyncTokenBucket.builder().rate(ratePerSecond).clock(monotonicSnapshotClock)
-                .initialTokens(2 * ratePerSecond).capacity(2 * 
ratePerSecond).build();
-    }
+            new 
DefaultMonotonicSnapshotClock(TimeUnit.MILLISECONDS.toNanos(1), 
System::nanoTime);
 
     @TearDown(Level.Iteration)
     public void teardown() {
@@ -59,23 +51,52 @@ public class AsyncTokenBucketBenchmark {
     @Benchmark
     @Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
     @Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
-    public void consumeTokensBenchmark001Threads() {
-        asyncTokenBucket.consumeTokens(1);
+    public void getTickNanos001Threads(Blackhole blackhole) {
+        consumeTokenAndGetTokens(blackhole, false);
+    }
+
+    @Threads(10)
+    @Benchmark
+    @Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
+    @Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
+    public void getTickNanos010Threads(Blackhole blackhole) {
+        consumeTokenAndGetTokens(blackhole, false);
+    }
+
+    @Threads(100)
+    @Benchmark
+    @Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
+    @Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
+    public void getTickNanos100Threads(Blackhole blackhole) {
+        consumeTokenAndGetTokens(blackhole, false);
+    }
+
+    @Threads(1)
+    @Benchmark
+    @Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
+    @Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
+    public void getTickNanosRequestSnapshot001Threads(Blackhole blackhole) {
+        consumeTokenAndGetTokens(blackhole, true);
     }
 
     @Threads(10)
     @Benchmark
     @Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
     @Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
-    public void consumeTokensBenchmark010Threads() {
-        asyncTokenBucket.consumeTokens(1);
+    public void getTickNanosRequestSnapshot010Threads(Blackhole blackhole) {
+        consumeTokenAndGetTokens(blackhole, true);
     }
 
     @Threads(100)
     @Benchmark
     @Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
     @Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
-    public void consumeTokensBenchmark100Threads() {
-        asyncTokenBucket.consumeTokens(1);
+    public void getTickNanosRequestSnapshot100Threads(Blackhole blackhole) {
+        consumeTokenAndGetTokens(blackhole, true);
+    }
+
+    private void consumeTokenAndGetTokens(Blackhole blackhole, boolean 
requestSnapshot) {
+        // blackhole is used to ensure that the compiler doesn't do dead code 
elimination
+        
blackhole.consume(monotonicSnapshotClock.getTickNanos(requestSnapshot));
     }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java
index ac9a1f03e59..8c43fa0a816 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java
@@ -42,6 +42,10 @@ import java.util.concurrent.atomic.LongAdder;
  * connection or client from the throttling queue to unthrottle. Before 
unthrottling, the application should check
  * for available tokens. If tokens are still not available, the application 
should continue with throttling and
  * repeat the throttling loop.
+ * <p>By default, the AsyncTokenBucket is eventually consistent. This means 
that the token balance is updated
+ * with added tokens and consumed tokens at most once during each "increment", 
when time advances more than the
+ * configured resolution. There are settings for configuring consistency, 
please see {@link AsyncTokenBucketBuilder}
+ * for details.
  * <p>This class does not produce side effects outside its own scope. It 
functions similarly to a stateful function,
  * akin to a counter function. In essence, it is a sophisticated counter. It 
can serve as a foundational component for
  * constructing higher-level asynchronous rate limiter implementations, which 
require side effects for throttling.
@@ -119,9 +123,28 @@ public abstract class AsyncTokenBucket {
      */
     private final LongAdder pendingConsumedTokens = new LongAdder();
 
-    protected AsyncTokenBucket(MonotonicSnapshotClock clockSource, long 
resolutionNanos) {
+    /**
+     * By default, AsyncTokenBucket is eventually consistent. This means that 
the consumed tokens are subtracted from
+     * the total amount of tokens at most once during each "increment", when 
time advances more than the configured
+     * resolution. This setting determines if the consumed tokens are 
subtracted from tokens balance consistently.
+     * For high performance, it is recommended to keep this setting as false.
+     */
+    private final boolean consistentConsumedTokens;
+    /**
+     * By default, AsyncTokenBucket is eventually consistent. This means that 
the added tokens are calculated based
+     * on elapsed time at most once during each "increment", when time 
advances more than the configured
+     * resolution. This setting determines if the added tokens are calculated 
and added to tokens balance consistently.
+     * For high performance, it is recommended to keep this setting as false.
+     */
+    private final boolean consistentAddedTokens;
+
+    protected AsyncTokenBucket(MonotonicSnapshotClock clockSource, long 
resolutionNanos,
+                               boolean consistentConsumedTokens, boolean 
consistentAddedTokens) {
         this.clockSource = clockSource;
         this.resolutionNanos = resolutionNanos;
+        this.lastNanos = Long.MIN_VALUE;
+        this.consistentConsumedTokens = consistentConsumedTokens;
+        this.consistentAddedTokens = consistentAddedTokens;
     }
 
     public static FinalRateAsyncTokenBucketBuilder builder() {
@@ -139,36 +162,46 @@ public abstract class AsyncTokenBucket {
     /**
      * Consumes tokens and possibly updates the tokens balance. New tokens are 
calculated and added to the current
      * tokens balance each time the update takes place. The update takes place 
once in every interval of the configured
-     * resolutionNanos or when the forceUpdateTokens parameter is true.
+     * resolutionNanos or when the forceConsistentTokens parameter is true.
      * When the tokens balance isn't updated, the consumed tokens are added to 
the pendingConsumedTokens LongAdder
      * counter which gets flushed the next time the tokens are updated. This 
makes the tokens balance
      * eventually consistent. The reason for this design choice is to optimize 
performance by preventing CAS loop
      * contention which could cause excessive CPU consumption.
      *
      * @param consumeTokens     number of tokens to consume, can be 0 to 
update the tokens balance
-     * @param forceUpdateTokens if true, the tokens are updated even if the 
configured resolution hasn't passed
+     * @param forceConsistentTokens if true, the token balance is updated 
consistently
      * @return the current number of tokens in the bucket or Long.MIN_VALUE 
when the number of tokens is unknown due
      * to eventual consistency
      */
-    private long consumeTokensAndMaybeUpdateTokensBalance(long consumeTokens, 
boolean forceUpdateTokens) {
+    private long consumeTokensAndMaybeUpdateTokensBalance(long consumeTokens, 
boolean forceConsistentTokens) {
         if (consumeTokens < 0) {
             throw new IllegalArgumentException("consumeTokens must be >= 0");
         }
-        long currentNanos = clockSource.getTickNanos(forceUpdateTokens);
+        boolean requestConsistentTickNanosSnapshot =
+                consistentAddedTokens || consistentConsumedTokens || 
forceConsistentTokens || resolutionNanos == 0;
+        long currentNanos = 
clockSource.getTickNanos(requestConsistentTickNanosSnapshot);
+        long newTokens = 0;
         // check if the tokens should be updated immediately
-        if (shouldUpdateTokensImmediately(currentNanos, forceUpdateTokens)) {
+        if (shouldAddTokensImmediately(currentNanos, forceConsistentTokens)) {
             // calculate the number of new tokens since the last update
-            long newTokens = calculateNewTokensSinceLastUpdate(currentNanos);
-            // calculate the total amount of tokens to consume in this update
+            newTokens = calculateNewTokensSinceLastUpdate(currentNanos, 
forceConsistentTokens);
+        }
+        // update tokens when there are new tokens, resolutionNanos is 0 (testing) or consistency is required
+        if (newTokens > 0 || resolutionNanos == 0 || consistentConsumedTokens 
|| forceConsistentTokens) {
             // flush the pendingConsumedTokens by calling "sumThenReset"
-            long totalConsumedTokens = consumeTokens + 
pendingConsumedTokens.sumThenReset();
-            // update the tokens and return the current token value
-            return TOKENS_UPDATER.updateAndGet(this,
-                    currentTokens ->
-                            // after adding new tokens, limit the tokens to 
the capacity
-                            Math.min(currentTokens + newTokens, getCapacity())
-                                    // subtract the consumed tokens
-                                    - totalConsumedTokens);
+            long currentPendingConsumedTokens = 
pendingConsumedTokens.sumThenReset();
+            // calculate the token delta by subtracting the consumed tokens 
from the new tokens
+            long tokenDelta = newTokens - currentPendingConsumedTokens;
+            if (tokenDelta != 0 || consumeTokens != 0) {
+                // update the tokens and return the current token value
+                return TOKENS_UPDATER.updateAndGet(this,
+                        // limit the tokens to the capacity of the bucket
+                        currentTokens -> Math.min(currentTokens + tokenDelta, 
getCapacity())
+                                // subtract the consumed tokens from the 
capped tokens
+                                - consumeTokens);
+            } else {
+                return tokens;
+            }
         } else {
             // eventual consistent fast path, tokens are not updated 
immediately
 
@@ -187,19 +220,19 @@ public abstract class AsyncTokenBucket {
      *
      * The tokens will be updated once every resolutionNanos nanoseconds.
      * This method checks if the configured resolutionNanos has passed since 
the last update.
-     * If the forceUpdateTokens is true, the tokens will be updated 
immediately.
+     * If the forceConsistentTokens is true, the tokens will be updated 
immediately.
      *
-     * @param currentNanos the current monotonic clock time in nanoseconds
-     * @param forceUpdateTokens if true, the tokens will be updated immediately
+     * @param currentNanos      the current monotonic clock time in nanoseconds
+     * @param forceConsistentTokens if true, the tokens are added even if the 
configured resolution hasn't fully passed
      * @return true if the tokens should be updated immediately, false 
otherwise
      */
-    private boolean shouldUpdateTokensImmediately(long currentNanos, boolean 
forceUpdateTokens) {
+    private boolean shouldAddTokensImmediately(long currentNanos, boolean 
forceConsistentTokens) {
         long currentIncrement = resolutionNanos != 0 ? currentNanos / 
resolutionNanos : 0;
         long currentLastIncrement = lastIncrement;
         return currentIncrement == 0
                 || (currentIncrement > currentLastIncrement
                 && LAST_INCREMENT_UPDATER.compareAndSet(this, 
currentLastIncrement, currentIncrement))
-                || forceUpdateTokens;
+                || consistentAddedTokens || forceConsistentTokens;
     }
 
     /**
@@ -209,10 +242,22 @@ public abstract class AsyncTokenBucket {
      * @param currentNanos the current monotonic clock time in nanoseconds
      * @return the number of new tokens to add since the last update
      */
-    private long calculateNewTokensSinceLastUpdate(long currentNanos) {
+    private long calculateNewTokensSinceLastUpdate(long currentNanos, boolean 
forceConsistentTokens) {
+        long previousLastNanos = lastNanos;
+        long newLastNanos;
+        // update lastNanos only if at least resolutionNanos/2 nanoseconds have passed since the last update
+        // unless consistency is needed
+        long minimumIncrementNanos = forceConsistentTokens || 
consistentAddedTokens ? 0L : resolutionNanos / 2;
+        if (currentNanos > previousLastNanos + minimumIncrementNanos) {
+            newLastNanos = currentNanos;
+        } else {
+            newLastNanos = previousLastNanos;
+        }
         long newTokens;
-        long previousLastNanos = LAST_NANOS_UPDATER.getAndSet(this, 
currentNanos);
-        if (previousLastNanos == 0) {
+        if (newLastNanos == previousLastNanos
+                // prevent races with a CAS update of lastNanos
+                || !LAST_NANOS_UPDATER.compareAndSet(this, previousLastNanos, 
newLastNanos)
+                || previousLastNanos == Long.MIN_VALUE) {
             newTokens = 0;
         } else {
             long durationNanos = currentNanos - previousLastNanos + 
REMAINDER_NANOS_UPDATER.getAndSet(this, 0);
@@ -267,15 +312,14 @@ public abstract class AsyncTokenBucket {
     }
 
     /**
-     * Returns the current token balance. When forceUpdateTokens is true, the 
tokens balance is updated before
-     * returning. If forceUpdateTokens is false, the tokens balance could be 
updated if the last updated happened
+     * Returns the current token balance. When the bucket is configured for consistent tokens, the balance is
+     * updated before returning. Otherwise, the tokens balance could be updated if the last update happened
      * more than resolutionNanos nanoseconds ago.
      *
-     * @param forceUpdateTokens if true, the tokens balance is updated before 
returning
      * @return the current token balance
      */
-    protected long tokens(boolean forceUpdateTokens) {
-        long currentTokens = consumeTokensAndMaybeUpdateTokensBalance(0, 
forceUpdateTokens);
+    private long tokens() {
+        long currentTokens = consumeTokensAndMaybeUpdateTokensBalance(0, 
false);
         if (currentTokens != Long.MIN_VALUE) {
             // when currentTokens isn't Long.MIN_VALUE, the current tokens 
balance is known
             return currentTokens;
@@ -295,7 +339,7 @@ public abstract class AsyncTokenBucket {
         long currentTokens = consumeTokensAndMaybeUpdateTokensBalance(0, true);
         if (currentTokens == Long.MIN_VALUE) {
             throw new IllegalArgumentException(
-                    "Unexpected result from updateAndConsumeTokens with 
forceUpdateTokens set to true");
+                    "Unexpected result from updateAndConsumeTokens with 
forceConsistentTokens set to true");
         }
         if (currentTokens > 0) {
             return 0L;
@@ -309,10 +353,11 @@ public abstract class AsyncTokenBucket {
 
     /**
      * Returns the current number of tokens in the bucket.
-     * The token balance is updated if the configured resolutionNanos has 
passed since the last update.
+     * The token balance is updated if the configured resolutionNanos has passed since the last update. When
+     * consistentConsumedTokens is true, pending consumed tokens are also subtracted on every call.
      */
     public final long getTokens() {
-        return tokens(false);
+        return tokens();
     }
 
     public abstract long getRate();
@@ -320,25 +365,13 @@ public abstract class AsyncTokenBucket {
     /**
      * Checks if the bucket contains tokens.
      * The token balance is updated before the comparison if the configured 
resolutionNanos has passed since the last
-     * update. It's possible that the returned result is not definite since 
the token balance is eventually consistent.
+     * update. It's possible that the returned result is not definite since 
the token balance is eventually consistent
+     * if consistentConsumedTokens is false.
      *
      * @return true if the bucket contains tokens, false otherwise
      */
     public boolean containsTokens() {
-        return containsTokens(false);
-    }
-
-    /**
-     * Checks if the bucket contains tokens.
-     * The token balance is updated before the comparison if the configured 
resolutionNanos has passed since the last
-     * update. The token balance is also updated when forceUpdateTokens is 
true.
-     * It's possible that the returned result is not definite since the token 
balance is eventually consistent.
-     *
-     * @param forceUpdateTokens if true, the token balance is updated before 
the comparison
-     * @return true if the bucket contains tokens, false otherwise
-     */
-    public boolean containsTokens(boolean forceUpdateTokens) {
-        return tokens(forceUpdateTokens) > 0;
+        return tokens() > 0;
     }
 
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBuilder.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBuilder.java
index ee256d5a37d..1c05f1a213e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBuilder.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBuilder.java
@@ -23,6 +23,8 @@ package org.apache.pulsar.broker.qos;
 public abstract class AsyncTokenBucketBuilder<SELF extends 
AsyncTokenBucketBuilder<SELF>> {
     protected MonotonicSnapshotClock clock = 
AsyncTokenBucket.DEFAULT_SNAPSHOT_CLOCK;
     protected long resolutionNanos = AsyncTokenBucket.defaultResolutionNanos;
+    protected boolean consistentConsumedTokens;
+    protected boolean consistentAddedTokens;
 
     protected AsyncTokenBucketBuilder() {
     }
@@ -31,15 +33,47 @@ public abstract class AsyncTokenBucketBuilder<SELF extends 
AsyncTokenBucketBuild
         return (SELF) this;
     }
 
+    /**
+     * Set the clock source for the token bucket. It's recommended to use the 
{@link DefaultMonotonicSnapshotClock}
+     * for most use cases.
+     */
     public SELF clock(MonotonicSnapshotClock clock) {
         this.clock = clock;
         return self();
     }
 
+    /**
+     * By default, AsyncTokenBucket is eventually consistent. This means that the token balance is updated when time
+     * advances more than the configured resolution. This setting determines 
the duration of the increment.
+     * Setting this value to 0 will make the token balance fully consistent. 
There's a performance trade-off
+     * when setting this value to 0.
+     */
     public SELF resolutionNanos(long resolutionNanos) {
         this.resolutionNanos = resolutionNanos;
         return self();
     }
 
+    /**
+     * By default, AsyncTokenBucket is eventually consistent. This means that 
the consumed tokens are subtracted from
+     * the total amount of tokens at most once during each "increment", when 
time advances more than the configured
+     * resolution. This setting determines if the consumed tokens are 
subtracted from tokens balance consistently.
+     * For high performance, it is recommended to keep this setting as false.
+     */
+    public SELF consistentConsumedTokens(boolean consistentConsumedTokens) {
+        this.consistentConsumedTokens = consistentConsumedTokens;
+        return self();
+    }
+
+    /**
+     * By default, AsyncTokenBucket is eventually consistent. This means that 
the added tokens are calculated based
+     * on elapsed time at most once during each "increment", when time 
advances more than the configured
+     * resolution. This setting determines if the added tokens are calculated 
and added to tokens balance consistently.
+     * For high performance, it is recommended to keep this setting as false.
+     */
+    public SELF consistentAddedTokens(boolean consistentAddedTokens) {
+        this.consistentAddedTokens = consistentAddedTokens;
+        return self();
+    }
+
     public abstract AsyncTokenBucket build();
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClock.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClock.java
index df3843921ed..23b9359c804 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClock.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClock.java
@@ -19,71 +19,269 @@
 
 package org.apache.pulsar.broker.qos;
 
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.LongConsumer;
 import java.util.function.LongSupplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Default implementation of {@link MonotonicSnapshotClock}.
+ * Default implementation of {@link MonotonicSnapshotClock} optimized for use 
with {@link AsyncTokenBucket}.
  *
- * Starts a daemon thread that updates the snapshot value periodically with a 
configured interval. The close method
- * should be called to stop the thread.
+ * <p>
+ * This class provides a monotonic snapshot value that consistently increases, 
ensuring reliable behavior
+ * even in environments where the underlying clock source may not be strictly 
monotonic across all CPUs,
+ * such as certain virtualized platforms.
+ * </p>
+ *
+ * <p>
+ * Upon instantiation, a daemon thread is launched to periodically update the 
snapshot value at a configured
+ * interval. It is essential to invoke the {@link #close()} method to 
gracefully terminate this thread when it is
+ * no longer needed.
+ * </p>
+ *
+ * <p>
+ * The {@link AsyncTokenBucket} utilizes this clock to obtain tick values. It 
does not require a consistent value on
+ * every retrieval. However, when a consistent snapshot is necessary, the 
{@link #getTickNanos(boolean)} method
+ * is called with the {@code requestSnapshot} parameter set to {@code true}.
+ * </p>
+ *
+ * <p>
+ * By employing a single thread to update the monotonic clock value, this 
implementation ensures that the snapshot
+ * value remains strictly increasing. This approach mitigates potential 
inconsistencies that may arise from clock
+ * source discrepancies across different CPUs.
+ * </p>
  */
 public class DefaultMonotonicSnapshotClock implements MonotonicSnapshotClock, 
AutoCloseable {
     private static final Logger LOG = 
LoggerFactory.getLogger(DefaultMonotonicSnapshotClock.class);
-    private final long sleepMillis;
-    private final int sleepNanos;
-    private final LongSupplier clockSource;
-    private final Thread thread;
+    private final TickUpdaterThread tickUpdaterThread;
     private volatile long snapshotTickNanos;
 
     public DefaultMonotonicSnapshotClock(long snapshotIntervalNanos, 
LongSupplier clockSource) {
         if (snapshotIntervalNanos < TimeUnit.MILLISECONDS.toNanos(1)) {
             throw new IllegalArgumentException("snapshotIntervalNanos must be 
at least 1 millisecond");
         }
-        this.sleepMillis = 
TimeUnit.NANOSECONDS.toMillis(snapshotIntervalNanos);
-        this.sleepNanos = (int) (snapshotIntervalNanos - 
TimeUnit.MILLISECONDS.toNanos(sleepMillis));
-        this.clockSource = clockSource;
-        updateSnapshotTickNanos();
-        thread = new Thread(this::snapshotLoop, getClass().getSimpleName() + 
"-update-loop");
-        thread.setDaemon(true);
-        thread.start();
+        tickUpdaterThread = new TickUpdaterThread(snapshotIntervalNanos,
+                Objects.requireNonNull(clockSource, "clockSource must not be 
null"), this::setSnapshotTickNanos);
+        tickUpdaterThread.start();
+    }
+
+    private void setSnapshotTickNanos(long snapshotTickNanos) {
+        this.snapshotTickNanos = snapshotTickNanos;
     }
 
     /** {@inheritDoc} */
     @Override
     public long getTickNanos(boolean requestSnapshot) {
         if (requestSnapshot) {
-            updateSnapshotTickNanos();
+            tickUpdaterThread.requestUpdateAndWait();
         }
         return snapshotTickNanos;
     }
 
-    private void updateSnapshotTickNanos() {
-        snapshotTickNanos = clockSource.getAsLong();
+    @Override
+    public void close() {
+        tickUpdaterThread.interrupt();
     }
 
-    private void snapshotLoop() {
-        try {
-            while (!Thread.currentThread().isInterrupted()) {
-                updateSnapshotTickNanos();
+    /**
+     * A thread that updates snapshotTickNanos value periodically with a 
configured interval.
+     * The thread is started when the DefaultMonotonicSnapshotClock is created 
and runs until the close method is
+     * called.
+     * A single thread is used to read the clock source value since on some 
hardware of virtualized platforms,
+     * System.nanoTime() isn't strictly monotonic across all CPUs. Reading by 
a single thread will improve the
+     * stability of the read value since a single thread is scheduled on a 
single CPU. If the thread is migrated
+     * to another CPU, the clock source value might leap backward or forward, 
but logic in this class will handle it.
+     */
+    private static class TickUpdaterThread extends Thread {
+        private final Object tickUpdateDelayMonitor = new Object();
+        private final Object tickUpdatedMonitor = new Object();
+        private final MonotonicLeapDetectingTickUpdater tickUpdater;
+        private volatile boolean running;
+        private boolean tickUpdateDelayMonitorNotified;
+        private AtomicLong requestCount = new AtomicLong();
+        private final long sleepMillis;
+        private final int sleepNanos;
+
+        TickUpdaterThread(long snapshotIntervalNanos, LongSupplier 
clockSource, LongConsumer setSnapshotTickNanos) {
+            super(DefaultMonotonicSnapshotClock.class.getSimpleName() + 
"-update-loop");
+            // set as daemon thread so that it doesn't prevent the JVM from 
exiting
+            setDaemon(true);
+            // set the highest priority
+            setPriority(MAX_PRIORITY);
+            this.sleepMillis = 
TimeUnit.NANOSECONDS.toMillis(snapshotIntervalNanos);
+            this.sleepNanos = (int) (snapshotIntervalNanos - 
TimeUnit.MILLISECONDS.toNanos(sleepMillis));
+            tickUpdater = new MonotonicLeapDetectingTickUpdater(clockSource, 
setSnapshotTickNanos,
+                    snapshotIntervalNanos);
+        }
+
+        @Override
+        public void run() {
+            try {
+                running = true;
+                long updatedForRequestCount = -1;
+                while (!isInterrupted()) {
+                    try {
+                        // track if the thread has waited for the whole 
duration of the snapshot interval
+                        // before updating the tick value
+                        boolean waitedSnapshotInterval = false;
+                        // sleep for the configured interval on a monitor that 
can be notified to stop the sleep
+                        // and update the tick value immediately. This is used 
in requestUpdate method.
+                        synchronized (tickUpdateDelayMonitor) {
+                            tickUpdateDelayMonitorNotified = false;
+                            // only wait if no explicit request has been made 
since the last update
+                            if (requestCount.get() == updatedForRequestCount) {
+                                // if no request has been made, sleep for the 
configured interval
+                                tickUpdateDelayMonitor.wait(sleepMillis, 
sleepNanos);
+                                waitedSnapshotInterval = 
!tickUpdateDelayMonitorNotified;
+                            }
+                        }
+                        updatedForRequestCount = requestCount.get();
+                        // update the tick value using the tick updater which 
will tolerate leaps backward
+                        tickUpdater.update(waitedSnapshotInterval);
+                        notifyAllTickUpdated();
+                    } catch (InterruptedException e) {
+                        interrupt();
+                        break;
+                    }
+                }
+            } catch (Throwable t) {
+                // report unexpected error since this would be a fatal error 
when the clock doesn't progress anymore
+                // this is very unlikely to happen, but it's better to log it 
in any case
+                LOG.error("Unexpected fatal error that stopped the clock.", t);
+            } finally {
+                LOG.info("DefaultMonotonicSnapshotClock's TickUpdaterThread 
stopped. {},tid={}", this, getId());
+                running = false;
+                notifyAllTickUpdated();
+            }
+        }
+
+        private void notifyAllTickUpdated() {
+            synchronized (tickUpdatedMonitor) {
+                // notify all threads that are waiting for the tick value to 
be updated
+                tickUpdatedMonitor.notifyAll();
+            }
+        }
+
+        public void requestUpdateAndWait() {
+            if (!running) {
+                synchronized (tickUpdater) {
+                    // thread has stopped running, fallback to update the 
value directly without optimizations
+                    tickUpdater.update(false);
+                }
+                return;
+            }
+            // increment the request count that ensures that the thread will 
update the tick value after this request
+            // was made also when there's a race condition between the request 
and the update
+            // this solution doesn't prevent all races, and it's not 
guaranteed that the tick value is always updated
+            // it will prevent the request having to wait for the delayed 
update cycle. This is sufficient for the
+            // use case.
+            requestCount.incrementAndGet();
+            synchronized (tickUpdatedMonitor) {
+                // notify the thread to stop waiting and update the tick value
+                synchronized (tickUpdateDelayMonitor) {
+                    tickUpdateDelayMonitorNotified = true;
+                    tickUpdateDelayMonitor.notify();
+                }
+                // wait until the tick value has been updated
                 try {
-                    Thread.sleep(sleepMillis, sleepNanos);
+                    tickUpdatedMonitor.wait();
                 } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                    break;
+                    currentThread().interrupt();
+                }
+            }
+        }
+
+        @Override
+        public synchronized void start() {
+            // wait until the thread is started and the tick value has been 
updated
+            synchronized (tickUpdatedMonitor) {
+                super.start();
+                try {
+                    tickUpdatedMonitor.wait();
+                } catch (InterruptedException e) {
+                    currentThread().interrupt();
                 }
             }
-        } catch (Throwable t) {
-            // report unexpected error since this would be a fatal error when 
the clock doesn't progress anymore
-            // this is very unlikely to happen, but it's better to log it in 
any case
-            LOG.error("Unexpected fatal error that stopped the clock.", t);
         }
     }
 
-    @Override
-    public void close() {
-        thread.interrupt();
+    /**
+     * Handles updating the tick value in a monotonic way so that the value is 
always increasing,
+     * regardless of leaps backward in the clock source value.
+     */
+    static class MonotonicLeapDetectingTickUpdater {
+        private final LongSupplier clockSource;
+        private final long snapshotInternalNanos;
+        private final long maxDeltaNanosForLeapDetection;
+        private final LongConsumer tickUpdatedCallback;
+        private long referenceClockSourceValue = Long.MIN_VALUE;
+        private long baseSnapshotTickNanos;
+        private long previousSnapshotTickNanos;
+
+        MonotonicLeapDetectingTickUpdater(LongSupplier clockSource, 
LongConsumer tickUpdatedCallback,
+                                          long snapshotInternalNanos) {
+            this.clockSource = clockSource;
+            this.snapshotInternalNanos = snapshotInternalNanos;
+            this.maxDeltaNanosForLeapDetection = 2 * snapshotInternalNanos;
+            this.tickUpdatedCallback = tickUpdatedCallback;
+        }
+
+        /**
+         * Updates the snapshot tick value. The tickUpdatedCallback is called 
if the value has changed.
+         * The value is updated in a monotonic way so that the value is always 
increasing, regardless of leaps backward
+         * in the clock source value.
+         * Leap detection is done by comparing the new value with the previous 
value and the maximum delta value.
+         *
+         * @param waitedSnapshotInterval if true, the method has waited for 
the snapshot interval since the previous
+         *                               call.
+         */
+        public void update(boolean waitedSnapshotInterval) {
+            // get the current clock source value
+            long clockValue = clockSource.getAsLong();
+
+            // Initialization on first call
+            if (referenceClockSourceValue == Long.MIN_VALUE) {
+                referenceClockSourceValue = clockValue;
+                baseSnapshotTickNanos = clockValue;
+                previousSnapshotTickNanos = clockValue;
+                // update the tick value using the callback
+                tickUpdatedCallback.accept(clockValue);
+                return;
+            }
+
+            // calculate the duration since the reference clock source value
+            // so that the snapshot value is always increasing and tolerates 
it when the clock source is not strictly
+            // monotonic across all CPUs and leaps backward
+            long durationSinceReference = clockValue - 
referenceClockSourceValue;
+            // calculate the new snapshot tick value as a duration since the 
reference clock source value
+            // and add it to the base snapshot tick value
+            long newSnapshotTickNanos = baseSnapshotTickNanos + 
durationSinceReference;
+
+            // reset the reference clock source value if the clock source 
value leaps backward
+            // more than the maximum delta value
+            if (newSnapshotTickNanos < previousSnapshotTickNanos - 
maxDeltaNanosForLeapDetection) {
+                // when the clock source value leaps backward, reset the 
reference value to the new value
+                // for future duration calculations
+                referenceClockSourceValue = clockValue;
+                // if the updater thread has waited for the snapshot interval 
since the previous call,
+                // increment the base snapshot tick value by the snapshot 
interval value
+                long incrementWhenLeapDetected = waitedSnapshotInterval ? 
snapshotInternalNanos : 0;
+                // set the base snapshot tick value to the new value
+                baseSnapshotTickNanos = previousSnapshotTickNanos + 
incrementWhenLeapDetected;
+                // set the new snapshot tick value to the base value
+                newSnapshotTickNanos = baseSnapshotTickNanos;
+            }
+
+            // update snapshotTickNanos value if the new value is greater than 
the previous value
+            if (newSnapshotTickNanos > previousSnapshotTickNanos) {
+                // store the previous value
+                previousSnapshotTickNanos = newSnapshotTickNanos;
+                // update the tick value using the callback
+                tickUpdatedCallback.accept(newSnapshotTickNanos);
+            }
+        }
     }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucket.java
index 8edc73d1f51..f2eae8aed8d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucket.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucket.java
@@ -34,15 +34,16 @@ public class DynamicRateAsyncTokenBucket extends AsyncTokenBucket {
 
     protected DynamicRateAsyncTokenBucket(double capacityFactor, LongSupplier rateFunction,
                                           MonotonicSnapshotClock clockSource, LongSupplier ratePeriodNanosFunction,
-                                          long resolutionNanos, double initialTokensFactor,
+                                          long resolutionNanos, boolean consistentConsumedTokens,
+                                          boolean consistentAddedTokens, double initialTokensFactor,
                                           double targetFillFactorAfterThrottling) {
-        super(clockSource, resolutionNanos);
+        super(clockSource, resolutionNanos, consistentConsumedTokens, consistentAddedTokens);
         this.capacityFactor = capacityFactor;
         this.rateFunction = rateFunction;
         this.ratePeriodNanosFunction = ratePeriodNanosFunction;
         this.targetFillFactorAfterThrottling = targetFillFactorAfterThrottling;
         this.tokens = (long) (rateFunction.getAsLong() * initialTokensFactor);
-        tokens(false);
+        getTokens();
     }
 
     @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucketBuilder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucketBuilder.java
index 22270484c72..8aebecddf90 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucketBuilder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucketBuilder.java
@@ -64,9 +64,7 @@ public class DynamicRateAsyncTokenBucketBuilder
     @Override
     public AsyncTokenBucket build() {
         return new DynamicRateAsyncTokenBucket(this.capacityFactor, this.rateFunction,
-                this.clock,
-                this.ratePeriodNanosFunction, this.resolutionNanos,
-                this.initialFillFactor,
-                targetFillFactorAfterThrottling);
+                this.clock, this.ratePeriodNanosFunction, this.resolutionNanos, this.consistentConsumedTokens,
+                this.consistentAddedTokens, this.initialFillFactor, this.targetFillFactorAfterThrottling);
     }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucket.java
index 627c5ee1334..d83290b723f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucket.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucket.java
@@ -30,15 +30,16 @@ class FinalRateAsyncTokenBucket extends AsyncTokenBucket {
     private final long targetAmountOfTokensAfterThrottling;
 
     protected FinalRateAsyncTokenBucket(long capacity, long rate, MonotonicSnapshotClock clockSource,
-                                        long ratePeriodNanos, long resolutionNanos, long initialTokens) {
-        super(clockSource, resolutionNanos);
+                                        long ratePeriodNanos, long resolutionNanos, boolean consistentConsumedTokens,
+                                        boolean consistentAddedTokens, long initialTokens) {
+        super(clockSource, resolutionNanos, consistentConsumedTokens, consistentAddedTokens);
         this.capacity = capacity;
         this.rate = rate;
         this.ratePeriodNanos = ratePeriodNanos != -1 ? ratePeriodNanos : ONE_SECOND_NANOS;
         // The target amount of tokens is the amount of tokens made available in the resolution duration
         this.targetAmountOfTokensAfterThrottling = Math.max(this.resolutionNanos * rate / ratePeriodNanos, 1);
         this.tokens = initialTokens;
-        tokens(false);
+        getTokens();
     }
 
     @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucketBuilder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucketBuilder.java
index ff4ed53c6c7..a292000eaa8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucketBuilder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucketBuilder.java
@@ -55,7 +55,7 @@ public class FinalRateAsyncTokenBucketBuilder
     public AsyncTokenBucket build() {
         return new FinalRateAsyncTokenBucket(this.capacity != null ? this.capacity : this.rate, this.rate,
                 this.clock,
-                this.ratePeriodNanos, this.resolutionNanos,
+                this.ratePeriodNanos, this.resolutionNanos, this.consistentConsumedTokens, this.consistentAddedTokens,
                 this.initialTokens != null ? this.initialTokens : this.rate
         );
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 89083b5416b..c123673dd32 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2426,6 +2426,12 @@ public class BrokerService implements Closeable {
 
 
     private void handleMetadataChanges(Notification n) {
+        if (!pulsar.isRunning()) {
+            // Ignore metadata changes when broker is not running
+            log.info("Ignoring metadata change since broker is not running 
(id={}, state={}) {}", pulsar.getBrokerId(),
+                    pulsar.getState(), n);
+            return;
+        }
         if (n.getType() == NotificationType.Modified && 
NamespaceResources.pathIsFromNamespace(n.getPath())) {
             NamespaceName ns = 
NamespaceResources.namespaceFromPath(n.getPath());
             handlePoliciesUpdates(ns);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java
index 8255d9b6931..90c8de5f97a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java
@@ -20,11 +20,11 @@
 package org.apache.pulsar.broker.service;
 
 import com.google.common.annotations.VisibleForTesting;
-import io.netty.channel.EventLoopGroup;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.qos.AsyncTokenBucket;
 import org.apache.pulsar.broker.qos.MonotonicSnapshotClock;
 import org.apache.pulsar.common.policies.data.Policies;
@@ -32,6 +32,7 @@ import org.apache.pulsar.common.policies.data.PublishRate;
 import org.jctools.queues.MessagePassingQueue;
 import org.jctools.queues.MpscUnboundedArrayQueue;
 
+@Slf4j
 public class PublishRateLimiterImpl implements PublishRateLimiter {
     private volatile AsyncTokenBucket tokenBucketOnMessage;
     private volatile AsyncTokenBucket tokenBucketOnByte;
@@ -80,7 +81,7 @@ public class PublishRateLimiterImpl implements PublishRateLimiter {
         // schedule unthrottling when the throttling count is incremented to 1
         // this is to avoid scheduling unthrottling multiple times for 
concurrent producers
         if (throttledProducersCount.incrementAndGet() == 1) {
-            EventLoopGroup executor = 
producer.getCnx().getBrokerService().executor();
+            ScheduledExecutorService executor = 
producer.getCnx().getBrokerService().executor().next();
             scheduleUnthrottling(executor, calculateThrottlingDurationNanos());
         }
     }
@@ -134,7 +135,11 @@ public class PublishRateLimiterImpl implements PublishRateLimiter {
             // unthrottle as many producers as possible while there are token 
available
             while ((throttlingDuration = calculateThrottlingDurationNanos()) 
== 0L
                     && (producer = unthrottlingQueue.poll()) != null) {
-                producer.decrementThrottleCount();
+                try {
+                    producer.decrementThrottleCount();
+                } catch (Exception e) {
+                    log.error("Failed to unthrottle producer {}", producer, e);
+                }
                 throttledProducersCount.decrementAndGet();
             }
             // if there are still producers to be unthrottled, schedule 
unthrottling again
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index b29cbcd660d..f43b134eb12 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -23,6 +23,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.qos.AsyncTokenBucket;
+import org.apache.pulsar.broker.qos.AsyncTokenBucketBuilder;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
@@ -76,7 +77,9 @@ public class DispatchRateLimiter {
      * @return
      */
     public long getAvailableDispatchRateLimitOnMsg() {
-        return dispatchRateLimiterOnMessage == null ? -1 : Math.max(dispatchRateLimiterOnMessage.getTokens(), 0);
+        AsyncTokenBucket localDispatchRateLimiterOnMessage = dispatchRateLimiterOnMessage;
+        return localDispatchRateLimiterOnMessage == null ? -1 :
+                Math.max(localDispatchRateLimiterOnMessage.getTokens(), 0);
     }
 
     /**
@@ -85,7 +88,8 @@ public class DispatchRateLimiter {
      * @return
      */
     public long getAvailableDispatchRateLimitOnByte() {
-        return dispatchRateLimiterOnByte == null ? -1 : Math.max(dispatchRateLimiterOnByte.getTokens(), 0);
+        AsyncTokenBucket localDispatchRateLimiterOnByte = dispatchRateLimiterOnByte;
+        return localDispatchRateLimiterOnByte == null ? -1 : Math.max(localDispatchRateLimiterOnByte.getTokens(), 0);
     }
 
     /**
@@ -95,11 +99,13 @@ public class DispatchRateLimiter {
      * @param byteSize
      */
     public void consumeDispatchQuota(long numberOfMessages, long byteSize) {
-        if (numberOfMessages > 0 && dispatchRateLimiterOnMessage != null) {
-            dispatchRateLimiterOnMessage.consumeTokens(numberOfMessages);
+        AsyncTokenBucket localDispatchRateLimiterOnMessage = dispatchRateLimiterOnMessage;
+        if (numberOfMessages > 0 && localDispatchRateLimiterOnMessage != null) {
+            localDispatchRateLimiterOnMessage.consumeTokens(numberOfMessages);
         }
-        if (byteSize > 0 && dispatchRateLimiterOnByte != null) {
-            dispatchRateLimiterOnByte.consumeTokens(byteSize);
+        AsyncTokenBucket localDispatchRateLimiterOnByte = dispatchRateLimiterOnByte;
+        if (byteSize > 0 && localDispatchRateLimiterOnByte != null) {
+            localDispatchRateLimiterOnByte.consumeTokens(byteSize);
         }
     }
 
@@ -221,13 +227,14 @@ public class DispatchRateLimiter {
         if (msgRate > 0) {
             if (dispatchRate.isRelativeToPublishRate()) {
                 this.dispatchRateLimiterOnMessage =
-                        AsyncTokenBucket.builderForDynamicRate()
+                        
configureAsyncTokenBucket(AsyncTokenBucket.builderForDynamicRate())
                                 .rateFunction(() -> 
getRelativeDispatchRateInMsg(dispatchRate))
                                 .ratePeriodNanosFunction(() -> ratePeriodNanos)
                                 .build();
             } else {
                 this.dispatchRateLimiterOnMessage =
-                        
AsyncTokenBucket.builder().rate(msgRate).ratePeriodNanos(ratePeriodNanos)
+                        configureAsyncTokenBucket(AsyncTokenBucket.builder())
+                                .rate(msgRate).ratePeriodNanos(ratePeriodNanos)
                                 .build();
             }
         } else {
@@ -238,13 +245,14 @@ public class DispatchRateLimiter {
         if (byteRate > 0) {
             if (dispatchRate.isRelativeToPublishRate()) {
                 this.dispatchRateLimiterOnByte =
-                        AsyncTokenBucket.builderForDynamicRate()
+                        
configureAsyncTokenBucket(AsyncTokenBucket.builderForDynamicRate())
                                 .rateFunction(() -> 
getRelativeDispatchRateInByte(dispatchRate))
                                 .ratePeriodNanosFunction(() -> ratePeriodNanos)
                                 .build();
             } else {
                 this.dispatchRateLimiterOnByte =
-                        
AsyncTokenBucket.builder().rate(byteRate).ratePeriodNanos(ratePeriodNanos)
+                        configureAsyncTokenBucket(AsyncTokenBucket.builder())
+                                
.rate(byteRate).ratePeriodNanos(ratePeriodNanos)
                                 .build();
             }
         } else {
@@ -252,6 +260,11 @@ public class DispatchRateLimiter {
         }
     }
 
+    private <T extends AsyncTokenBucketBuilder<T>> T configureAsyncTokenBucket(T builder) {
+        builder.clock(brokerService.getPulsar().getMonotonicSnapshotClock());
+        return builder;
+    }
+
     private long getRelativeDispatchRateInMsg(DispatchRate dispatchRate) {
         return (topic != null && dispatchRate != null)
                 ? (long) topic.getLastUpdatedAvgPublishRateInMsg() + 
dispatchRate.getDispatchThrottlingRateInMsg()
@@ -270,7 +283,8 @@ public class DispatchRateLimiter {
      * @return
      */
     public long getDispatchRateOnMsg() {
-        return dispatchRateLimiterOnMessage != null ? dispatchRateLimiterOnMessage.getRate() : -1;
+        AsyncTokenBucket localDispatchRateLimiterOnMessage = dispatchRateLimiterOnMessage;
+        return localDispatchRateLimiterOnMessage != null ? localDispatchRateLimiterOnMessage.getRate() : -1;
     }
 
     /**
@@ -279,7 +293,8 @@ public class DispatchRateLimiter {
      * @return
      */
     public long getDispatchRateOnByte() {
-        return dispatchRateLimiterOnByte != null ? dispatchRateLimiterOnByte.getRate() : -1;
+        AsyncTokenBucket localDispatchRateLimiterOnByte = dispatchRateLimiterOnByte;
+        return localDispatchRateLimiterOnByte != null ? localDispatchRateLimiterOnByte.getRate() : -1;
     }
 
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
index b1de10e73b7..0f98ab94142 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
@@ -70,7 +70,7 @@ public class SubscribeRateLimiter {
         if (tokenBucket == null) {
             return true;
         }
-        if (!tokenBucket.containsTokens(true)) {
+        if (!tokenBucket.containsTokens()) {
             return false;
         }
         tokenBucket.consumeTokens(1);
@@ -117,7 +117,11 @@ public class SubscribeRateLimiter {
         // update subscribe-rateLimiter
         if (ratePerConsumer > 0) {
             AsyncTokenBucket tokenBucket =
-                    
AsyncTokenBucket.builder().rate(ratePerConsumer).ratePeriodNanos(ratePeriodNanos).build();
+                    AsyncTokenBucket.builder()
+                            .consistentAddedTokens(true)
+                            .consistentConsumedTokens(true)
+                            
.clock(brokerService.getPulsar().getMonotonicSnapshotClock())
+                            
.rate(ratePerConsumer).ratePeriodNanos(ratePeriodNanos).build();
             this.subscribeRateLimiter.put(consumerIdentifier, tokenBucket);
         } else {
             // subscribe-rate should be disable and close
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index ed1f5ccc545..b0a4789b373 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -170,20 +170,26 @@ public abstract class MockedPulsarServiceBaseTest extends 
TestRetrySupport {
 
     protected final void internalSetup() throws Exception {
         init();
-        lookupUrl = new URI(brokerUrl.toString());
-        if (isTcpLookup) {
-            lookupUrl = new URI(pulsar.getBrokerServiceUrl());
-
+        lookupUrl = resolveLookupUrl();
+        if (isTcpLookup && enableBrokerGateway) {
             // setup port forwarding from the advertised port to the listen 
port
-            if (enableBrokerGateway) {
-                InetSocketAddress gatewayAddress = new 
InetSocketAddress(lookupUrl.getHost(), lookupUrl.getPort());
-                InetSocketAddress brokerAddress = new 
InetSocketAddress("127.0.0.1", pulsar.getBrokerListenPort().get());
-                brokerGateway = new PortForwarder(gatewayAddress, 
brokerAddress);
-            }
+            InetSocketAddress gatewayAddress = new 
InetSocketAddress(lookupUrl.getHost(), lookupUrl.getPort());
+            InetSocketAddress brokerAddress = new 
InetSocketAddress("127.0.0.1", pulsar.getBrokerListenPort().get());
+            brokerGateway = new PortForwarder(gatewayAddress, brokerAddress);
         }
         pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
     }
 
+    private URI resolveLookupUrl() {
+        if (isTcpLookup) {
+            return URI.create(pulsar.getBrokerServiceUrl());
+        } else {
+            return URI.create(brokerUrl != null
+                    ? brokerUrl.toString()
+                    : brokerUrlTls.toString());
+        }
+    }
+
     protected final void internalSetup(ServiceConfiguration 
serviceConfiguration) throws Exception {
         this.conf = serviceConfiguration;
         internalSetup();
@@ -228,11 +234,10 @@ public abstract class MockedPulsarServiceBaseTest extends 
TestRetrySupport {
 
     protected final void internalSetupForStatsTest() throws Exception {
         init();
-        String lookupUrl = brokerUrl.toString();
-        if (isTcpLookup) {
-            lookupUrl = new URI(pulsar.getBrokerServiceUrl()).toString();
+        if (pulsarClient != null) {
+            pulsarClient.shutdown();
         }
-        pulsarClient = newPulsarClient(lookupUrl, 1);
+        pulsarClient = newPulsarClient(resolveLookupUrl().toString(), 1);
     }
 
     protected void doInitConf() throws Exception {
@@ -357,6 +362,9 @@ public abstract class MockedPulsarServiceBaseTest extends 
TestRetrySupport {
     protected void restartBroker() throws Exception {
         stopBroker();
         startBroker();
+        if (pulsarClient == null) {
+            pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
+        }
     }
 
     protected void stopBroker() throws Exception {
@@ -381,12 +389,16 @@ public abstract class MockedPulsarServiceBaseTest extends 
TestRetrySupport {
         brokerUrl = pulsar.getWebServiceAddress() != null ? new 
URL(pulsar.getWebServiceAddress()) : null;
         brokerUrlTls = pulsar.getWebServiceAddressTls() != null ? new 
URL(pulsar.getWebServiceAddressTls()) : null;
 
-        if (admin != null) {
-            admin.close();
-            if (MockUtil.isMock(admin)) {
-                Mockito.reset(admin);
+        URI newLookupUrl = resolveLookupUrl();
+        if (lookupUrl == null || !newLookupUrl.equals(lookupUrl)) {
+            lookupUrl = newLookupUrl;
+            if (pulsarClient != null) {
+                pulsarClient.shutdown();
+                pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
             }
         }
+
+        closeAdmin();
         PulsarAdminBuilder pulsarAdminBuilder = 
PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null
                 ? brokerUrl.toString()
                 : brokerUrlTls.toString());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/AsyncTokenBucketTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/AsyncTokenBucketTest.java
index b446f9e902f..82793f2748d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/AsyncTokenBucketTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/AsyncTokenBucketTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.pulsar.broker.qos;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.testng.Assert.assertEquals;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -50,7 +51,8 @@ public class AsyncTokenBucketTest {
     @Test
     void shouldAddTokensWithConfiguredRate() {
         asyncTokenBucket =
-                
AsyncTokenBucket.builder().capacity(100).rate(10).initialTokens(0).clock(clockSource).build();
+                AsyncTokenBucket.builder().consistentConsumedTokens(true)
+                        
.capacity(100).rate(10).initialTokens(0).clock(clockSource).build();
         incrementSeconds(5);
         assertEquals(asyncTokenBucket.getTokens(), 50);
         incrementSeconds(1);
@@ -64,7 +66,7 @@ public class AsyncTokenBucketTest {
 
         // Consume all and verify none available and then wait 1 period and 
check replenished
         asyncTokenBucket.consumeTokens(100);
-        assertEquals(asyncTokenBucket.tokens(true), 0);
+        assertEquals(asyncTokenBucket.getTokens(), 0);
         incrementSeconds(1);
         assertEquals(asyncTokenBucket.getTokens(), 10);
     }
@@ -91,13 +93,148 @@ public class AsyncTokenBucketTest {
     @Test
     void shouldSupportFractionsAndRetainLeftoverWhenUpdatingTokens() {
         asyncTokenBucket =
-                
AsyncTokenBucket.builder().capacity(100).rate(10).initialTokens(0).clock(clockSource).build();
+                AsyncTokenBucket.builder().capacity(100)
+                        .resolutionNanos(TimeUnit.MILLISECONDS.toNanos(1))
+                        .rate(10)
+                        .initialTokens(0)
+                        .clock(clockSource)
+                        .build();
         for (int i = 0; i < 150; i++) {
             incrementMillis(1);
         }
         assertEquals(asyncTokenBucket.getTokens(), 1);
         incrementMillis(150);
         assertEquals(asyncTokenBucket.getTokens(), 3);
+        incrementMillis(1);
+        assertEquals(asyncTokenBucket.getTokens(), 3);
+        incrementMillis(99);
+        assertEquals(asyncTokenBucket.getTokens(), 4);
+    }
+
+    @Test
+    void shouldSupportFractionsAndRetainLeftoverWhenUpdatingTokens2() {
+        asyncTokenBucket =
+                AsyncTokenBucket.builder().capacity(100)
+                        .resolutionNanos(TimeUnit.MILLISECONDS.toNanos(1))
+                        .rate(1)
+                        .initialTokens(0)
+                        .clock(clockSource)
+                        .build();
+        for (int i = 0; i < 150; i++) {
+            incrementMillis(1);
+            assertEquals(asyncTokenBucket.getTokens(), 0);
+        }
+        incrementMillis(150);
+        assertEquals(asyncTokenBucket.getTokens(), 0);
+        incrementMillis(699);
+        assertEquals(asyncTokenBucket.getTokens(), 0);
+        incrementMillis(1);
+        assertEquals(asyncTokenBucket.getTokens(), 1);
+        incrementMillis(1000);
+        assertEquals(asyncTokenBucket.getTokens(), 2);
+    }
+
+    @Test
+    void shouldHandleNegativeBalanceWithEventuallyConsistentTokenUpdates() {
+        asyncTokenBucket =
+                AsyncTokenBucket.builder()
+                        // intentionally pick a coarse resolution
+                        .resolutionNanos(TimeUnit.SECONDS.toNanos(51))
+                        
.capacity(100).rate(10).initialTokens(0).clock(clockSource).build();
+        // assert that the token balance is 0 initially
+        assertThat(asyncTokenBucket.getTokens()).isEqualTo(0);
+
+        // consume tokens without exceeding the rate
+        for (int i = 0; i < 10000; i++) {
+            asyncTokenBucket.consumeTokens(500);
+            incrementSeconds(50);
+        }
+
+        // let 9 seconds pass
+        incrementSeconds(9);
+
+        // there should be 90 tokens available
+        assertThat(asyncTokenBucket.getTokens()).isEqualTo(90);
     }
 
+    @Test
+    void shouldNotExceedTokenBucketSizeWithNegativeTokens() {
+        asyncTokenBucket =
+                AsyncTokenBucket.builder()
+                        // intentionally pick a coarse resolution
+                        .resolutionNanos(TimeUnit.SECONDS.toNanos(51))
+                        
.capacity(100).rate(10).initialTokens(0).clock(clockSource).build();
+        // assert that the token balance is 0 initially
+        assertThat(asyncTokenBucket.getTokens()).isEqualTo(0);
+
+        // consume tokens without exceeding the rate
+        for (int i = 0; i < 100; i++) {
+            asyncTokenBucket.consumeTokens(600);
+            incrementSeconds(50);
+            // let tokens accumulate back to 0 every 10 seconds
+            if ((i + 1) % 10 == 0) {
+                incrementSeconds(100);
+            }
+        }
+
+        // let 9 seconds pass
+        incrementSeconds(9);
+
+        // there should be 90 tokens available
+        assertThat(asyncTokenBucket.getTokens()).isEqualTo(90);
+    }
+
+    @Test
+    void 
shouldAccuratelyCalculateTokensWhenTimeIsLaggingBehindInInconsistentUpdates() {
+        clockSource = requestSnapshot -> {
+          if (requestSnapshot) {
+              return manualClockSource.get();
+          } else {
+              // let the clock lag behind
+              return manualClockSource.get() - TimeUnit.SECONDS.toNanos(52);
+          }
+        };
+        incrementSeconds(1);
+        asyncTokenBucket =
+                
AsyncTokenBucket.builder().resolutionNanos(TimeUnit.SECONDS.toNanos(51))
+                        
.capacity(100).rate(10).initialTokens(100).clock(clockSource).build();
+        assertThat(asyncTokenBucket.getTokens()).isEqualTo(100);
+
+        // consume tokens without exceeding the rate
+        for (int i = 0; i < 10000; i++) {
+            asyncTokenBucket.consumeTokens(500);
+            incrementSeconds(i == 0 ? 40 : 50);
+        }
+
+        // let 9 seconds pass
+        incrementSeconds(9);
+
+        // there should be 90 tokens available
+        assertThat(asyncTokenBucket.getTokens()).isEqualTo(90);
+    }
+
+    @Test
+    void shouldHandleEventualConsistency() {
+        AtomicLong offset = new AtomicLong(0);
+        long resolutionNanos = TimeUnit.MILLISECONDS.toNanos(1);
+        DefaultMonotonicSnapshotClock monotonicSnapshotClock =
+                new DefaultMonotonicSnapshotClock(resolutionNanos,
+                        () -> offset.get() + manualClockSource.get());
+        long initialTokens = 500L;
+        asyncTokenBucket =
+                AsyncTokenBucket.builder()
+                        .consistentConsumedTokens(true)
+                        .resolutionNanos(resolutionNanos)
+                        
.capacity(100000).rate(1000).initialTokens(initialTokens).clock(monotonicSnapshotClock).build();
+        for (int i = 0; i < 100000; i++) {
+            // increment the clock by 1ms, since rate is 1000 tokens/s, this 
should make 1 token available
+            incrementMillis(1);
+            // consume 1 token
+            asyncTokenBucket.consumeTokens(1);
+        }
+        assertThat(asyncTokenBucket.getTokens())
+                // since the rate is 1/ms and the test increments the clock by 
1ms and consumes 1 token in each
+                // iteration, the tokens should be equal to the initial tokens
+                .isEqualTo(initialTokens);
+    }
 }
\ No newline at end of file
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClockTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClockTest.java
new file mode 100644
index 00000000000..0820b439915
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClockTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.qos;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.assertj.core.data.Offset;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class DefaultMonotonicSnapshotClockTest {
+    @DataProvider
+    private static Object[] booleanValues() {
+        return new Object[]{ true, false };
+    }
+
+    @Test(dataProvider = "booleanValues")
+    void testClockHandlesTimeLeapsBackwards(boolean requestSnapshot) throws 
InterruptedException {
+        long snapshotIntervalMillis = 5;
+        AtomicLong clockValue = new AtomicLong(1);
+        @Cleanup
+        DefaultMonotonicSnapshotClock clock =
+                new 
DefaultMonotonicSnapshotClock(Duration.ofMillis(snapshotIntervalMillis).toNanos(),
+                        clockValue::get);
+
+
+        long previousTick = -1;
+        boolean leapDirection = true;
+        for (int i = 0; i < 10000; i++) {
+            clockValue.addAndGet(TimeUnit.MILLISECONDS.toNanos(1));
+            long tick = clock.getTickNanos(requestSnapshot);
+            //log.info("i = {}, tick = {}", i, tick);
+            if ((i + 1) % 5 == 0) {
+                leapDirection = !leapDirection;
+                //log.info("Time leap 5 minutes backwards");
+                clockValue.addAndGet(-Duration.ofMinutes(5).toNanos());
+            }
+            if (previousTick != -1) {
+                assertThat(tick)
+                        .describedAs("i = %d, tick = %d, previousTick = %d", 
i, tick, previousTick)
+                        .isGreaterThanOrEqualTo(previousTick)
+                        .isCloseTo(previousTick,
+                                // when snapshot is requested, the time 
difference between the two ticks is accurate
+                                // otherwise allow time difference at most 4 
times the snapshot interval since the
+                                // clock is updated periodically by a 
background thread
+                                Offset.offset(TimeUnit.MILLISECONDS.toNanos(
+                                        requestSnapshot ? 1 : 4 * 
snapshotIntervalMillis)));
+            }
+            previousTick = tick;
+        }
+    }
+
+    @Test
+    void testRequestUpdate() throws InterruptedException {
+        @Cleanup
+        DefaultMonotonicSnapshotClock clock =
+                new 
DefaultMonotonicSnapshotClock(Duration.ofSeconds(5).toNanos(), 
System::nanoTime);
+        long tick1 = clock.getTickNanos(false);
+        long tick2 = clock.getTickNanos(true);
+        assertThat(tick2).isGreaterThan(tick1);
+    }
+
+    @Test
+    void testRequestingSnapshotAfterClosed() throws InterruptedException {
+        DefaultMonotonicSnapshotClock clock =
+                new 
DefaultMonotonicSnapshotClock(Duration.ofSeconds(5).toNanos(), 
System::nanoTime);
+        clock.close();
+        long tick1 = clock.getTickNanos(true);
+        Thread.sleep(10);
+        long tick2 = clock.getTickNanos(true);
+        assertThat(tick2).isGreaterThan(tick1);
+    }
+
+    @Test
+    void testConstructorValidation() {
+        assertThatThrownBy(() -> new DefaultMonotonicSnapshotClock(0, 
System::nanoTime))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage("snapshotIntervalNanos must be at least 1 
millisecond");
+        assertThatThrownBy(() -> new DefaultMonotonicSnapshotClock(-1, 
System::nanoTime))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage("snapshotIntervalNanos must be at least 1 
millisecond");
+        assertThatThrownBy(() -> new 
DefaultMonotonicSnapshotClock(TimeUnit.MILLISECONDS.toNanos(1), null))
+                .isInstanceOf(NullPointerException.class)
+                .hasMessage("clockSource must not be null");
+    }
+
+    @Test
+    void testFailureHandlingInClockSource() {
+        @Cleanup
+        DefaultMonotonicSnapshotClock clock =
+                new 
DefaultMonotonicSnapshotClock(Duration.ofSeconds(5).toNanos(), () -> {
+                    throw new RuntimeException("Test clock failure");
+                });
+        // the exception should be propagated
+        assertThatThrownBy(() -> clock.getTickNanos(true))
+                .isInstanceOf(RuntimeException.class)
+                .hasMessage("Test clock failure");
+    }
+
+    @Test
+    void testLeapDetectionIndependently() {
+        AtomicLong clockValue = new AtomicLong(0);
+        AtomicLong tickValue = new AtomicLong(0);
+        long expectedTickValue = 0;
+        long snapshotIntervalNanos = TimeUnit.MILLISECONDS.toNanos(1);
+        DefaultMonotonicSnapshotClock.MonotonicLeapDetectingTickUpdater 
updater =
+                new 
DefaultMonotonicSnapshotClock.MonotonicLeapDetectingTickUpdater(clockValue::get,
 tickValue::set,
+                        snapshotIntervalNanos);
+
+        updater.update(true);
+
+        // advance the clock
+        clockValue.addAndGet(snapshotIntervalNanos);
+        expectedTickValue += snapshotIntervalNanos;
+        updater.update(true);
+        assertThat(tickValue.get()).isEqualTo(expectedTickValue);
+
+        // simulate a leap backwards in time
+        clockValue.addAndGet(-10 * snapshotIntervalNanos);
+        expectedTickValue += snapshotIntervalNanos;
+        updater.update(true);
+        assertThat(tickValue.get()).isEqualTo(expectedTickValue);
+
+        // advance the clock
+        clockValue.addAndGet(snapshotIntervalNanos);
+        expectedTickValue += snapshotIntervalNanos;
+        updater.update(true);
+        assertThat(tickValue.get()).isEqualTo(expectedTickValue);
+
+        // simulate a leap backwards in time, without waiting a full snapshot 
interval
+        clockValue.addAndGet(-10 * snapshotIntervalNanos);
+        updater.update(false);
+        assertThat(tickValue.get()).isEqualTo(expectedTickValue);
+
+        // advance the clock
+        clockValue.addAndGet(snapshotIntervalNanos);
+        expectedTickValue += snapshotIntervalNanos;
+        updater.update(true);
+        assertThat(tickValue.get()).isEqualTo(expectedTickValue);
+
+        // simulate a small leap backwards in time which isn't detected, 
without waiting a full snapshot interval
+        clockValue.addAndGet(-1 * snapshotIntervalNanos);
+        updater.update(false);
+        assertThat(tickValue.get()).isEqualTo(expectedTickValue);
+        // clock doesn't advance for one snapshot interval
+        clockValue.addAndGet(snapshotIntervalNanos);
+        updater.update(false);
+        assertThat(tickValue.get()).isEqualTo(expectedTickValue);
+        // now the clock should advance again
+        clockValue.addAndGet(snapshotIntervalNanos);
+        expectedTickValue += snapshotIntervalNanos;
+        updater.update(false);
+        assertThat(tickValue.get()).isEqualTo(expectedTickValue);
+
+        // simulate a leap forward
+        clockValue.addAndGet(10 * snapshotIntervalNanos);
+        // no special handling for leap forward
+        expectedTickValue += 10 * snapshotIntervalNanos;
+        updater.update(true);
+        assertThat(tickValue.get()).isEqualTo(expectedTickValue);
+    }
+}
\ No newline at end of file
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
index 392ec0d3ff4..8343680f9bf 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.qos.AsyncTokenBucket;
 import 
org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount;
 import 
org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass;
 import 
org.apache.pulsar.broker.resourcegroup.ResourceGroupService.ResourceGroupUsageStatsType;
@@ -58,9 +59,10 @@ import org.testng.annotations.Test;
 @Slf4j
 @Test(groups = "flaky")
 public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase {
-    @BeforeClass
+    @BeforeClass(alwaysRun = true)
     @Override
     protected void setup() throws Exception {
+        AsyncTokenBucket.switchToConsistentTokensView();
         super.internalSetup();
         this.prepareForOps();
 
@@ -91,6 +93,7 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends 
ProducerConsumerBase {
     @Override
     protected void cleanup() throws Exception {
         super.internalCleanup();
+        AsyncTokenBucket.resetToDefaultEventualConsistentTokensView();
     }
 
     @Test
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java
index 2c44ba7e230..5c149d4e1e7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import io.netty.channel.EventLoop;
 import io.netty.channel.EventLoopGroup;
 import java.util.HashMap;
 import java.util.concurrent.TimeUnit;
@@ -73,7 +74,9 @@ public class PublishRateLimiterTest {
         when(transportCnx.getBrokerService()).thenReturn(brokerService);
         EventLoopGroup eventLoopGroup = mock(EventLoopGroup.class);
         when(brokerService.executor()).thenReturn(eventLoopGroup);
-        doReturn(null).when(eventLoopGroup).schedule(any(Runnable.class), 
anyLong(), any());
+        EventLoop eventLoop = mock(EventLoop.class);
+        when(eventLoopGroup.next()).thenReturn(eventLoop);
+        doReturn(null).when(eventLoop).schedule(any(Runnable.class), 
anyLong(), any());
         incrementSeconds(1);
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AbstractMessageDispatchThrottlingTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AbstractMessageDispatchThrottlingTest.java
new file mode 100644
index 00000000000..31c628b2bc4
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AbstractMessageDispatchThrottlingTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.pulsar.broker.qos.AsyncTokenBucket;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+
+public abstract class AbstractMessageDispatchThrottlingTest extends 
ProducerConsumerBase {
+    /** Returns a new array with the elements of {@code first} followed by those of {@code last}. */
+    public static <T> T[] merge(T[] first, T[] last) {
+        T[] result = Arrays.copyOf(first, first.length + last.length);
+        // copy all of 'last' (last.length elements, not first.length) right after 'first'
+        System.arraycopy(last, 0, result, first.length, last.length);
+        return result;
+    }
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        AsyncTokenBucket.switchToConsistentTokensView();
+        this.conf.setClusterName("test");
+        internalSetup();
+        producerBaseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        internalCleanup();
+        AsyncTokenBucket.resetToDefaultEventualConsistentTokensView();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    protected void reset() throws Exception {
+        pulsar.getConfiguration().setForceDeleteTenantAllowed(true);
+        pulsar.getConfiguration().setForceDeleteNamespaceAllowed(true);
+
+        for (String tenant : admin.tenants().getTenants()) {
+            for (String namespace : admin.namespaces().getNamespaces(tenant)) {
+                admin.namespaces().deleteNamespace(namespace, true);
+            }
+            admin.tenants().deleteTenant(tenant, true);
+        }
+
+        for (String cluster : admin.clusters().getClusters()) {
+            admin.clusters().deleteCluster(cluster);
+        }
+
+        pulsar.getConfiguration().setForceDeleteTenantAllowed(false);
+        pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false);
+
+        producerBaseSetup();
+    }
+
+    @DataProvider(name = "subscriptions")
+    public Object[][] subscriptionsProvider() {
+        return new Object[][]{new Object[]{SubscriptionType.Shared}, 
{SubscriptionType.Exclusive}};
+    }
+
+    @DataProvider(name = "dispatchRateType")
+    public Object[][] dispatchRateProvider() {
+        return new Object[][]{{DispatchRateType.messageRate}, 
{DispatchRateType.byteRate}};
+    }
+
+    @DataProvider(name = "subscriptionAndDispatchRateType")
+    public Object[][] subDisTypeProvider() {
+        List<Object[]> mergeList = new LinkedList<>();
+        for (Object[] sub : subscriptionsProvider()) {
+            for (Object[] dispatch : dispatchRateProvider()) {
+                mergeList.add(AbstractMessageDispatchThrottlingTest.merge(sub, 
dispatch));
+            }
+        }
+        return mergeList.toArray(new Object[0][0]);
+    }
+
+    protected void deactiveCursors(ManagedLedgerImpl ledger) throws Exception {
+        Field statsUpdaterField = 
BrokerService.class.getDeclaredField("statsUpdater");
+        statsUpdaterField.setAccessible(true);
+        ScheduledExecutorService statsUpdater = (ScheduledExecutorService) 
statsUpdaterField
+                .get(pulsar.getBrokerService());
+        statsUpdater.shutdownNow();
+        ledger.getCursors().forEach(cursor -> {
+            ledger.deactivateCursor(cursor);
+        });
+    }
+
+    enum DispatchRateType {
+        messageRate, byteRate;
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
index a544c7e13bc..5d6f0c519ab 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.api;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.spy;
@@ -27,15 +28,11 @@ import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.google.common.collect.Sets;
 import java.lang.reflect.Field;
-import java.util.Arrays;
 import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import lombok.Cleanup;
@@ -43,7 +40,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.cache.PendingReadsManager;
 import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl;
-import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -52,93 +49,17 @@ import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
-import org.apache.pulsar.broker.qos.AsyncTokenBucket;
+import org.assertj.core.data.Offset;
 import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
-@Test(groups = "flaky")
-public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
+@Test(groups = "broker-api")
+public class MessageDispatchThrottlingTest extends 
AbstractMessageDispatchThrottlingTest {
     private static final Logger log = 
LoggerFactory.getLogger(MessageDispatchThrottlingTest.class);
 
-    @BeforeClass
-    @Override
-    protected void setup() throws Exception {
-        AsyncTokenBucket.switchToConsistentTokensView();
-        this.conf.setClusterName("test");
-        super.internalSetup();
-        super.producerBaseSetup();
-    }
-
-    @AfterClass(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-        AsyncTokenBucket.resetToDefaultEventualConsistentTokensView();
-    }
-
-    @AfterMethod(alwaysRun = true)
-    protected void reset() throws Exception {
-        pulsar.getConfiguration().setForceDeleteTenantAllowed(true);
-        pulsar.getConfiguration().setForceDeleteNamespaceAllowed(true);
-
-        for (String tenant : admin.tenants().getTenants()) {
-            for (String namespace : admin.namespaces().getNamespaces(tenant)) {
-                admin.namespaces().deleteNamespace(namespace, true);
-            }
-            admin.tenants().deleteTenant(tenant, true);
-        }
-
-        for (String cluster : admin.clusters().getClusters()) {
-            admin.clusters().deleteCluster(cluster);
-        }
-
-        pulsar.getConfiguration().setForceDeleteTenantAllowed(false);
-        pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false);
-
-        super.producerBaseSetup();
-    }
-
-
-    @DataProvider(name = "subscriptions")
-    public Object[][] subscriptionsProvider() {
-        return new Object[][] { new Object[] { SubscriptionType.Shared }, { 
SubscriptionType.Exclusive } };
-    }
-
-    @DataProvider(name = "dispatchRateType")
-    public Object[][] dispatchRateProvider() {
-        return new Object[][] { { DispatchRateType.messageRate }, { 
DispatchRateType.byteRate } };
-    }
-
-    @DataProvider(name = "subscriptionAndDispatchRateType")
-    public Object[][] subDisTypeProvider() {
-        List<Object[]> mergeList = new LinkedList<>();
-        for (Object[] sub : subscriptionsProvider()) {
-            for (Object[] dispatch : dispatchRateProvider()) {
-                mergeList.add(merge(sub, dispatch));
-            }
-        }
-        return mergeList.toArray(new Object[0][0]);
-    }
-
-    public static <T> T[] merge(T[] first, T[] last) {
-        int totalLength = first.length + last.length;
-        T[] result = Arrays.copyOf(first, totalLength);
-        int offset = first.length;
-        System.arraycopy(last, 0, result, offset, first.length);
-        return result;
-    }
-
-    enum DispatchRateType {
-        messageRate, byteRate;
-    }
-
     /**
      * verifies: message-rate change gets reflected immediately into topic at 
runtime
      *
@@ -150,7 +71,7 @@ public class MessageDispatchThrottlingTest extends 
ProducerConsumerBase {
 
         log.info("-- Starting {} test --", methodName);
 
-        final String namespace = "my-property/throttling_ns";
+        final String namespace = 
BrokerTestUtil.newUniqueName("my-property/throttling_ns");
         final String topicName = "persistent://" + namespace + 
"/throttlingBlock";
 
         admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
@@ -220,7 +141,7 @@ public class MessageDispatchThrottlingTest extends 
ProducerConsumerBase {
     @SuppressWarnings("deprecation")
     @Test
     public void testSystemTopicDeliveryNonBlock() throws Exception {
-        final String namespace = "my-property/throttling_ns";
+        final String namespace = 
BrokerTestUtil.newUniqueName("my-property/throttling_ns");
         admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
         final String topicName = "persistent://" + namespace + "/" + 
UUID.randomUUID().toString().replaceAll("-", "");
         admin.topics().createNonPartitionedTopic(topicName);
@@ -264,7 +185,7 @@ public class MessageDispatchThrottlingTest extends 
ProducerConsumerBase {
             DispatchRateType dispatchRateType) throws Exception {
         log.info("-- Starting {} test --", methodName);
 
-        final String namespace = "my-property/throttling_ns";
+        final String namespace = 
BrokerTestUtil.newUniqueName("my-property/throttling_ns");
         final String topicName = "persistent://" + namespace + 
"/throttlingBlock";
 
         final int messageRate = 100;
@@ -332,7 +253,7 @@ public class MessageDispatchThrottlingTest extends 
ProducerConsumerBase {
     public void testClusterMsgByteRateLimitingClusterConfig() throws Exception 
{
         log.info("-- Starting {} test --", methodName);
 
-        final String namespace = "my-property/throttling_ns";
+        final String namespace = 
BrokerTestUtil.newUniqueName("my-property/throttling_ns");
         final String topicName = "persistent://" + namespace + 
"/throttlingBlock";
         final int messageRate = 5;
         final long byteRate = 1024 * 1024;// 1MB rate enough to let all msg to 
be delivered
@@ -407,7 +328,7 @@ public class MessageDispatchThrottlingTest extends 
ProducerConsumerBase {
             throws Exception {
         log.info("-- Starting {} test --", methodName);
 
-        final String namespace = "my-property/throttling_ns";
+        final String namespace = 
BrokerTestUtil.newUniqueName("my-property/throttling_ns");
         final String topicName = "persistent://" + namespace + 
"/throttlingAll";
 
         final int messageRate = 10;
@@ -475,7 +396,7 @@ public class MessageDispatchThrottlingTest extends 
ProducerConsumerBase {
         conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
         log.info("-- Starting {} test --", methodName);
 
-        final String namespace = "my-property/throttling_ns";
+        final String namespace = 
BrokerTestUtil.newUniqueName("my-property/throttling_ns");
         final String topicName = "persistent://" + namespace + 
"/throttlingAll";
         final String subscriptionName = "my-subscriber-name";
 
@@ -528,8 +449,9 @@ public class MessageDispatchThrottlingTest extends 
ProducerConsumerBase {
     public void testRateLimitingMultipleConsumers() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
-        final String namespace = "my-property/throttling_ns";
+        final String namespace = 
BrokerTestUtil.newUniqueName("my-property/throttling_ns");
         final String topicName = "persistent://" + namespace + 
"/throttlingMultipleConsumers";
+        conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
 
         final int messageRate = 5;
         DispatchRate dispatchRate = DispatchRate.builder()
@@ -540,7 +462,8 @@ public class MessageDispatchThrottlingTest extends 
ProducerConsumerBase {
         admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
         admin.namespaces().setDispatchRate(namespace, dispatchRate);
         // create producer and topic
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        @Cleanup
+        Producer<byte[]> producer = 
pulsarClient.newProducer().enableBatching(false).topic(topicName).create();
         PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
 
         Awaitility.await()
@@ -566,10 +489,15 @@ public class MessageDispatchThrottlingTest extends 
ProducerConsumerBase {
                         throw new RuntimeException(e);
                     }
                 });
+        @Cleanup
         Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
+        @Cleanup
         Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
+        @Cleanup
         Consumer<byte[]> consumer3 = consumerBuilder.subscribe();
+        @Cleanup
         Consumer<byte[]> consumer4 = consumerBuilder.subscribe();
+        @Cleanup
         Consumer<byte[]> consumer5 = consumerBuilder.subscribe();
 
         // deactive cursors
@@ -585,15 +513,10 @@ public class MessageDispatchThrottlingTest extends 
ProducerConsumerBase {
         Thread.sleep(1000);
 
         // rate limiter should have limited messages with at least 10% 
accuracy (or 2 messages if messageRate is low)
-        Assert.assertEquals(totalReceived.get(), messageRate, 
Math.max(messageRate / 10, 2));
+        assertThat(totalReceived.get()).isCloseTo(messageRate, 
Offset.offset(Math.max(messageRate / 10, 2)));
 
-        consumer1.close();
-        consumer2.close();
-        consumer3.close();
-        consumer4.close();
-        consumer5.close();
-        producer.close();
         log.info("-- Exiting {} test --", methodName);
+        conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(false);
     }
 
     @Test
@@ -602,7 +525,7 @@ public class MessageDispatchThrottlingTest extends 
ProducerConsumerBase {
 
         conf.setDispatchThrottlingOnBatchMessageEnabled(true);
 
-        final String namespace = "my-property/throttling_ns";
+        final String namespace = 
BrokerTestUtil.newUniqueName("my-property/throttling_ns");
         final String topicName = "persistent://" + namespace + 
"/throttlingMultipleConsumers";
 
         final int messageRate = 5;
@@ -614,6 +537,7 @@ public class MessageDispatchThrottlingTest extends 
ProducerConsumerBase {
         final int messagesPerBatch = 100;
         final int numProducedMessages = messageRate * messagesPerBatch;
         // create producer and topic
+        @Cleanup
         Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).enableBatching(true)
                 .batchingMaxPublishDelay(1, 
TimeUnit.SECONDS).batchingMaxMessages(messagesPerBatch).create();
         PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
@@ -634,10 +558,15 @@ public class MessageDispatchThrottlingTest extends 
ProducerConsumerBase {
                     log.debug("Received message [{}] in the listener", 
receivedMessage);
                     totalReceived.incrementAndGet();
                 });
+        @Cleanup
         Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
+        @Cleanup
         Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
+        @Cleanup
         Consumer<byte[]> consumer3 = consumerBuilder.subscribe();
+        @Cleanup
         Consumer<byte[]> consumer4 = consumerBuilder.subscribe();
+        @Cleanup
         Consumer<byte[]> consumer5 = consumerBuilder.subscribe();
 
         // deactive cursors
@@ -657,12 +586,6 @@ public class MessageDispatchThrottlingTest extends 
ProducerConsumerBase {
         // consumer should not have received all published message due to 
message-rate throttling
         Assert.assertEquals(totalReceived.get(), numProducedMessages);
 
-        consumer1.close();
-        consumer2.close();
-        consumer3.close();
-        consumer4.close();
-        consumer5.close();
-        producer.close();
         log.info("-- Exiting {} test --", methodName);
     }
 
@@ -670,7 +593,7 @@ public class MessageDispatchThrottlingTest extends 
ProducerConsumerBase {
     public void testClusterRateLimitingConfiguration(SubscriptionType 
subscription) throws Exception {
         log.info("-- Starting {} test --", methodName);
 
-        final String namespace = "my-property/throttling_ns";
+        final String namespace = 
BrokerTestUtil.newUniqueName("my-property/throttling_ns");
         final String topicName = "persistent://" + namespace + 
"/throttlingBlock";
         final int messageRate = 5;
 
@@ -688,12 +611,14 @@ public class MessageDispatchThrottlingTest extends 
ProducerConsumerBase {
 
         admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
         // create producer and topic
+        @Cleanup
         Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
         PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
         int numMessages = 500;
 
         final AtomicInteger totalReceived = new AtomicInteger(0);
 
+        @Cleanup
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
                 .subscriptionType(subscription).messageListener((c1, msg) -> {
                     Assert.assertNotNull(msg, "Message cannot be null");
@@ -716,8 +641,6 @@ public class MessageDispatchThrottlingTest extends 
ProducerConsumerBase {
         // consumer should not have received all published message due to 
message-rate throttling
         Assert.assertNotEquals(totalReceived.get(), numMessages);
 
-        consumer.close();
-        producer.close();
         
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerTopicInMsg",
                 Integer.toString(initValue));
         log.info("-- Exiting {} test --", methodName);
@@ -733,7 +656,7 @@ public class MessageDispatchThrottlingTest extends 
ProducerConsumerBase {
     public void testMessageByteRateThrottlingCombined(SubscriptionType 
subscription) throws Exception {
         log.info("-- Starting {} test --", methodName);
 
-        final String namespace = "my-property/throttling_ns";
+        final String namespace = 
BrokerTestUtil.newUniqueName("my-property/throttling_ns");
         final String topicName = "persistent://" + namespace + 
"/throttlingAll";
 
         final int messageRate = 5; // 5 msgs per second
@@ -803,7 +726,7 @@ public class MessageDispatchThrottlingTest extends 
ProducerConsumerBase {
     public void testGlobalNamespaceThrottling() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
-        final String namespace = "my-property/throttling_ns";
+        final String namespace = 
BrokerTestUtil.newUniqueName("my-property/throttling_ns");
         final String topicName = "persistent://" + namespace + 
"/throttlingBlock";
 
         final int messageRate = 5;
@@ -869,7 +792,7 @@ public class MessageDispatchThrottlingTest extends 
ProducerConsumerBase {
     public void testNonBacklogConsumerWithThrottlingEnabled(SubscriptionType 
subscription) throws Exception {
         log.info("-- Starting {} test --", methodName);
 
-        final String namespace = "my-property/throttling_ns";
+        final String namespace = 
BrokerTestUtil.newUniqueName("my-property/throttling_ns");
         final String topicName = "persistent://" + namespace + 
"/throttlingBlock";
 
         final int messageRate = 10;
@@ -948,7 +871,7 @@ public class MessageDispatchThrottlingTest extends 
ProducerConsumerBase {
     public void testClusterPolicyOverrideConfiguration() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
-        final String namespace = "my-property/throttling_ns";
+        final String namespace = 
BrokerTestUtil.newUniqueName("my-property/throttling_ns");
         final String topicName1 = "persistent://" + namespace + 
"/throttlingOverride1";
         final String topicName2 = "persistent://" + namespace + 
"/throttlingOverride2";
         final int clusterMessageRate = 100;
@@ -1018,7 +941,7 @@ public class MessageDispatchThrottlingTest extends 
ProducerConsumerBase {
     public void testClosingRateLimiter(SubscriptionType subscription) throws 
Exception {
         log.info("-- Starting {} test --", methodName);
 
-        final String namespace = "my-property/throttling_ns";
+        final String namespace = 
BrokerTestUtil.newUniqueName("my-property/throttling_ns");
         final String topicName = "persistent://" + namespace + 
"/closingRateLimiter" + subscription.name();
         final String subName = "mySubscription" + subscription.name();
 
@@ -1066,7 +989,7 @@ public class MessageDispatchThrottlingTest extends 
ProducerConsumerBase {
     @SuppressWarnings("deprecation")
     @Test
     public void testDispatchRateCompatibility2() throws Exception {
-        final String namespace = "my-property/dispatch-rate-compatibility";
+        final String namespace = 
BrokerTestUtil.newUniqueName("my-property/dispatch-rate-compatibility");
         final String topicName = "persistent://" + namespace + "/t1";
         final String cluster = "test";
         admin.namespaces().createNamespace(namespace, 
Sets.newHashSet(cluster));
@@ -1112,17 +1035,6 @@ public class MessageDispatchThrottlingTest extends 
ProducerConsumerBase {
         topic.close().get();
     }
 
-    protected void deactiveCursors(ManagedLedgerImpl ledger) throws Exception {
-        Field statsUpdaterField = 
BrokerService.class.getDeclaredField("statsUpdater");
-        statsUpdaterField.setAccessible(true);
-        ScheduledExecutorService statsUpdater = (ScheduledExecutorService) 
statsUpdaterField
-                .get(pulsar.getBrokerService());
-        statsUpdater.shutdownNow();
-        ledger.getCursors().forEach(cursor -> {
-            ledger.deactivateCursor(cursor);
-        });
-    }
-
     /**
      * It verifies that relative throttling at least dispatch messages as 
publish-rate.
      *
@@ -1133,7 +1045,7 @@ public class MessageDispatchThrottlingTest extends 
ProducerConsumerBase {
     public void testRelativeMessageRateLimitingThrottling(SubscriptionType 
subscription) throws Exception {
         log.info("-- Starting {} test --", methodName);
 
-        final String namespace = "my-property/relative_throttling_ns";
+        final String namespace = 
BrokerTestUtil.newUniqueName("my-property/relative_throttling_ns");
         final String topicName = "persistent://" + namespace + 
"/relative-throttle" + subscription;
 
         final int messageRate = 1;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
index 02de11a2bcc..8847cd176bb 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
@@ -18,8 +18,10 @@
  */
 package org.apache.pulsar.client.api;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.awaitility.Awaitility.await;
 import com.google.common.collect.Sets;
+import java.time.Duration;
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -30,6 +32,7 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleC
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.awaitility.Awaitility;
 import org.slf4j.Logger;
@@ -37,8 +40,8 @@ import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-@Test(groups = "flaky")
-public class SubscriptionMessageDispatchThrottlingTest extends 
MessageDispatchThrottlingTest {
+@Test(groups = "broker-api")
+public class SubscriptionMessageDispatchThrottlingTest extends 
AbstractMessageDispatchThrottlingTest {
     private static final Logger log = 
LoggerFactory.getLogger(SubscriptionMessageDispatchThrottlingTest.class);
 
     /**
@@ -241,7 +244,7 @@ public class SubscriptionMessageDispatchThrottlingTest 
extends MessageDispatchTh
         admin.namespaces().setSubscriptionDispatchRate(namespace, 
subscriptionDispatchRate);
         admin.namespaces().setDispatchRate(namespace, topicDispatchRate);
         long initBytes = 
pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInByte();
-        
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + 
brokerRate);
+        updateBrokerDispatchThrottlingRateInBytes(brokerRate);
 
         final int numProducedMessages = 30;
         final CountDownLatch latch = new CountDownLatch(numProducedMessages);
@@ -272,10 +275,11 @@ public class SubscriptionMessageDispatchThrottlingTest 
extends MessageDispatchTh
             Assert.fail("Should only have PersistentDispatcher in this test");
         }
         final DispatchRateLimiter subDispatchRateLimiter = subRateLimiter;
-        Awaitility.await().untilAsserted(() -> {
+        Awaitility.await().atMost(Duration.ofSeconds(15)).untilAsserted(() -> {
             DispatchRateLimiter brokerDispatchRateLimiter = 
pulsar.getBrokerService().getBrokerDispatchRateLimiter();
-            Assert.assertTrue(brokerDispatchRateLimiter != null
-                    && brokerDispatchRateLimiter.getDispatchRateOnByte() > 0);
+            assertThat(brokerDispatchRateLimiter)
+                    .isNotNull()
+                    .satisfies(l -> 
assertThat(l.getDispatchRateOnByte()).isEqualTo(brokerRate));
             DispatchRateLimiter topicDispatchRateLimiter = 
topic.getDispatchRateLimiter().orElse(null);
             Assert.assertTrue(topicDispatchRateLimiter != null
                     && topicDispatchRateLimiter.getDispatchRateOnByte() > 0);
@@ -301,10 +305,7 @@ public class SubscriptionMessageDispatchThrottlingTest 
extends MessageDispatchTh
         consumer.close();
         producer.close();
 
-        
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", 
Long.toString(initBytes));
-
-        admin.topics().delete(topicName, true);
-        admin.namespaces().deleteNamespace(namespace);
+        updateBrokerDispatchThrottlingRateInBytes(initBytes);
     }
 
     /**
@@ -401,7 +402,7 @@ public class SubscriptionMessageDispatchThrottlingTest 
extends MessageDispatchTh
     private void testDispatchRate(SubscriptionType subscription,
                                   int brokerRate, int topicRate, int subRate, 
int expectRate) throws Exception {
 
-        final String namespace = "my-property/throttling_ns";
+        final String namespace = 
BrokerTestUtil.newUniqueName("my-property/throttling_ns");
         final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ namespace + "/throttlingAll");
         final String subName = "my-subscriber-name-" + subscription;
 
@@ -419,7 +420,7 @@ public class SubscriptionMessageDispatchThrottlingTest 
extends MessageDispatchTh
         admin.namespaces().setSubscriptionDispatchRate(namespace, 
subscriptionDispatchRate);
         admin.namespaces().setDispatchRate(namespace, topicDispatchRate);
         long initBytes = 
pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInByte();
-        
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + 
brokerRate);
+        updateBrokerDispatchThrottlingRateInBytes(brokerRate);
 
         final int numProducedMessages = 30;
         final CountDownLatch latch = new CountDownLatch(numProducedMessages);
@@ -450,10 +451,11 @@ public class SubscriptionMessageDispatchThrottlingTest 
extends MessageDispatchTh
             Assert.fail("Should only have PersistentDispatcher in this test");
         }
         final DispatchRateLimiter subDispatchRateLimiter = subRateLimiter;
-        Awaitility.await().untilAsserted(() -> {
+        Awaitility.await().atMost(Duration.ofSeconds(15)).untilAsserted(() -> {
             DispatchRateLimiter brokerDispatchRateLimiter = 
pulsar.getBrokerService().getBrokerDispatchRateLimiter();
-            Assert.assertTrue(brokerDispatchRateLimiter != null
-                    && brokerDispatchRateLimiter.getDispatchRateOnByte() > 0);
+            assertThat(brokerDispatchRateLimiter)
+                    .isNotNull()
+                    .satisfies(l -> 
assertThat(l.getDispatchRateOnByte()).isEqualTo(brokerRate));
             DispatchRateLimiter topicDispatchRateLimiter = 
topic.getDispatchRateLimiter().orElse(null);
             Assert.assertTrue(topicDispatchRateLimiter != null
                     && topicDispatchRateLimiter.getDispatchRateOnByte() > 0);
@@ -482,9 +484,18 @@ public class SubscriptionMessageDispatchThrottlingTest 
extends MessageDispatchTh
 
         consumer.close();
         producer.close();
-        
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", 
Long.toString(initBytes));
-        admin.topics().delete(topicName, true);
-        admin.namespaces().deleteNamespace(namespace);
+        updateBrokerDispatchThrottlingRateInBytes(initBytes);
+    }
+
+    private void updateBrokerDispatchThrottlingRateInBytes(long bytes) throws 
PulsarAdminException {
+        
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", 
Long.toString(bytes));
+        long expectedBytes = bytes > 0L ? bytes : -1L;
+        await().untilAsserted(() -> {
+            DispatchRateLimiter brokerDispatchRateLimiter = 
pulsar.getBrokerService().getBrokerDispatchRateLimiter();
+            assertThat(brokerDispatchRateLimiter)
+                    .isNotNull()
+                    .satisfies(l -> 
assertThat(l.getDispatchRateOnByte()).isEqualTo(expectedBytes));
+        });
     }
 
     /**
@@ -537,7 +548,7 @@ public class SubscriptionMessageDispatchThrottlingTest 
extends MessageDispatchTh
 
         long initBytes = 
pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInByte();
         final int byteRate = 1000;
-        
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + 
byteRate);
+        updateBrokerDispatchThrottlingRateInBytes(byteRate);
 
         Awaitility.await().untilAsserted(() -> {
             
Assert.assertEquals(pulsar.getConfiguration().getDispatchThrottlingRateInByte(),
 byteRate);
@@ -576,12 +587,6 @@ public class SubscriptionMessageDispatchThrottlingTest 
extends MessageDispatchTh
         Producer<byte[]> producer1 = 
pulsarClient.newProducer().topic(topicName1).create();
         Producer<byte[]> producer2 = 
pulsarClient.newProducer().topic(topicName2).create();
 
-        Awaitility.await().untilAsserted(() -> {
-            DispatchRateLimiter rateLimiter = 
pulsar.getBrokerService().getBrokerDispatchRateLimiter();
-            Assert.assertTrue(rateLimiter != null
-                    && rateLimiter.getDispatchRateOnByte() > 0);
-        });
-
         long start = System.currentTimeMillis();
         // Asynchronously produce messages
         for (int i = 0; i < numProducedMessagesEachTopic; i++) {
@@ -600,7 +605,7 @@ public class SubscriptionMessageDispatchThrottlingTest 
extends MessageDispatchTh
         consumer2.close();
         producer1.close();
         producer2.close();
-        
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", 
Long.toString(initBytes));
+        updateBrokerDispatchThrottlingRateInBytes(initBytes);
         log.info("-- Exiting {} test --", methodName);
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessagePublishThrottlingTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessagePublishThrottlingTest.java
index 1c0ae5547d5..a848d68f37f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessagePublishThrottlingTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessagePublishThrottlingTest.java
@@ -41,7 +41,7 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-@Test
+@Test(groups = "broker-api")
 public class MessagePublishThrottlingTest extends ProducerConsumerBase {
     private static final Logger log = 
LoggerFactory.getLogger(MessagePublishThrottlingTest.class);
 

Reply via email to