This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit efa28d74a451ea1d2215f838aa88ae19de06a774 Author: Yan Zhao <[email protected]> AuthorDate: Mon Apr 25 22:17:52 2022 +0800 [fix] [broker] Fix problem at RateLimiter#tryAcquire (#15306) (cherry picked from commit 84b65598481fd9bbb6e06e2deb335222a04b9c6b) --- .../org/apache/pulsar/common/util/RateLimiter.java | 3 +-- .../apache/pulsar/common/util/RateLimiterTest.java | 20 +++++++++++++++++++- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java index 20ca181c400..8f02bcc0e5c 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java @@ -189,8 +189,7 @@ public class RateLimiter implements AutoCloseable{ canAcquire = acquirePermit < 0 || acquiredPermits < this.permits; } else { // acquired-permits can't be larger than the rate - if (acquirePermit > this.permits) { - acquiredPermits = this.permits; + if (acquirePermit + acquiredPermits > this.permits) { return false; } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java index 788ab749390..57090fcc7b7 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java @@ -133,6 +133,24 @@ public class RateLimiterTest { rate.close(); } + @Test + public void testTryAcquireMoreThanPermits() { + final long rateTimeMSec = 1000; + RateLimiter rate = RateLimiter.builder().permits(3).rateTime(rateTimeMSec).timeUnit(TimeUnit.MILLISECONDS) + .build(); + assertTrue(rate.tryAcquire(2)); + assertEquals(rate.getAvailablePermits(), 1); + + //try to acquire failed, not decrease availablePermits. + assertFalse(rate.tryAcquire(2)); + assertEquals(rate.getAvailablePermits(), 1); + + assertTrue(rate.tryAcquire(1)); + assertEquals(rate.getAvailablePermits(), 0); + + rate.close(); + } + @Test public void testMultipleTryAcquire() { final long rateTimeMSec = 1000; @@ -189,7 +207,7 @@ public class RateLimiterTest { Thread.sleep(rateTimeMSec); // check after three rate-time: acquiredPermits is 0 - assertEquals(rate.getAvailablePermits() > 0, true); + assertTrue(rate.getAvailablePermits() > 0); rate.close(); }
