This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 02fc06e  [Issue 8599] Fix DispatchRateLimiter does not take effect 
(#8611)
02fc06e is described below

commit 02fc06e78583aa763772490650faabd4fc220e6c
Author: WangJialing <[email protected]>
AuthorDate: Sun May 23 10:24:55 2021 +0800

    [Issue 8599] Fix DispatchRateLimiter does not take effect (#8611)
    
    Fixes #8599
    Fixes #4777
    
    ### Motivation
    
    Pulsar currently supports topic-level and subscription-level dispatch rate 
limiting by using `DispatchRateLimiter`.  When consumers connect to the broker 
and start reading entries, the broker checks whether the rate limit is exceeded 
before reading, and increases the acquired permits after the read finishes by 
calling tryAcquire().  When multiple consumers share one `DispatchRateLimiter`, 
these consumers can start reading at the same time and may increase 
`acquiredPermits` far beyond `permits` after rea [...]
    
    ### Modifications
    
    This PR changes the behaviour of `DispatchRateLimiter`: it subtracts 
`permits` from `acquiredPermits` every second instead of resetting 
`acquiredPermits` to 0, so reading stops for a while until `acquiredPermits` 
returns to a value less than `permits`.
    
    ### Verifying this change
    RateLimiterTest.testDispatchRate()
---
 .../service/persistent/DispatchRateLimiter.java    |  4 +--
 .../SubscriptionMessageDispatchThrottlingTest.java |  6 ++---
 .../org/apache/pulsar/common/util/RateLimiter.java | 30 ++++++++++++++++------
 .../apache/pulsar/common/util/RateLimiterTest.java | 21 +++++++++++++++
 4 files changed, 48 insertions(+), 13 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index 6addacf..a3333cd 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -342,7 +342,7 @@ public class DispatchRateLimiter {
         if (msgRate > 0) {
             if (this.dispatchRateLimiterOnMessage == null) {
                 this.dispatchRateLimiterOnMessage = new 
RateLimiter(brokerService.pulsar().getExecutor(), msgRate,
-                        ratePeriod, TimeUnit.SECONDS, permitUpdaterMsg);
+                        ratePeriod, TimeUnit.SECONDS, permitUpdaterMsg, true);
             } else {
                 this.dispatchRateLimiterOnMessage.setRate(msgRate, 
dispatchRate.ratePeriodInSecond,
                         TimeUnit.SECONDS, permitUpdaterMsg);
@@ -362,7 +362,7 @@ public class DispatchRateLimiter {
         if (byteRate > 0) {
             if (this.dispatchRateLimiterOnByte == null) {
                 this.dispatchRateLimiterOnByte = new 
RateLimiter(brokerService.pulsar().getExecutor(), byteRate,
-                        ratePeriod, TimeUnit.SECONDS, permitUpdaterByte);
+                        ratePeriod, TimeUnit.SECONDS, permitUpdaterByte, true);
             } else {
                 this.dispatchRateLimiterOnByte.setRate(byteRate, 
dispatchRate.ratePeriodInSecond,
                         TimeUnit.SECONDS, permitUpdaterByte);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
index f6c563d..8a7e067 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
@@ -211,8 +211,8 @@ public class SubscriptionMessageDispatchThrottlingTest 
extends MessageDispatchTh
      * verify rate-limiting should throttle message-dispatching based on 
byte-rate
      *
      * <pre>
-     *  1. dispatch-byte-rate = 100 bytes/sec
-     *  2. send 30 msgs : each with 10 byte
+     *  1. dispatch-byte-rate = 1000 bytes/sec
+     *  2. send 30 msgs : each with 100 byte
      *  3. it should take up to 2 second to receive all messages
      * </pre>
      *
@@ -227,7 +227,7 @@ public class SubscriptionMessageDispatchThrottlingTest 
extends MessageDispatchTh
         final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ namespace + "/throttlingAll");
         final String subName = "my-subscriber-name-" + subscription;
 
-        final int byteRate = 100;
+        final int byteRate = 1000;
         DispatchRate dispatchRate = new DispatchRate(-1, byteRate, 1);
         admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
         admin.namespaces().setSubscriptionDispatchRate(namespace, 
dispatchRate);
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
index 6a8afd2..bd31853 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
@@ -60,6 +60,7 @@ public class RateLimiter implements AutoCloseable{
     // permitUpdate helps to update permit-rate at runtime
     private Supplier<Long> permitUpdater;
     private RateLimitFunction rateLimitFunction;
+    private boolean isDispatchRateLimiter;
 
     public RateLimiter(final long permits, final long rateTime, final TimeUnit 
timeUnit) {
         this(null, permits, rateTime, timeUnit, null);
@@ -72,7 +73,12 @@ public class RateLimiter implements AutoCloseable{
     }
 
     public RateLimiter(final ScheduledExecutorService service, final long 
permits, final long rateTime,
-            final TimeUnit timeUnit, Supplier<Long> permitUpdater) {
+                       final TimeUnit timeUnit, Supplier<Long> permitUpdater) {
+        this(service, permits, rateTime, timeUnit, permitUpdater, false);
+    }
+
+    public RateLimiter(final ScheduledExecutorService service, final long 
permits, final long rateTime,
+            final TimeUnit timeUnit, Supplier<Long> permitUpdater, boolean 
isDispatchRateLimiter) {
         checkArgument(permits > 0, "rate must be > 0");
         checkArgument(rateTime > 0, "Renew permit time must be > 0");
 
@@ -80,6 +86,7 @@ public class RateLimiter implements AutoCloseable{
         this.timeUnit = timeUnit;
         this.permits = permits;
         this.permitUpdater = permitUpdater;
+        this.isDispatchRateLimiter = isDispatchRateLimiter;
 
         if (service != null) {
             this.executorService = service;
@@ -172,15 +179,22 @@ public class RateLimiter implements AutoCloseable{
             renewTask = createTask();
         }
 
-        // acquired-permits can't be larger than the rate
-        if (acquirePermit > this.permits) {
-            acquiredPermits = this.permits;
-            return false;
-        }
         boolean canAcquire = acquirePermit < 0 || acquiredPermits < 
this.permits;
-        if (canAcquire) {
+        if (isDispatchRateLimiter) {
+            // for dispatch rate limiter just add acquirePermit
             acquiredPermits += acquirePermit;
+        } else {
+            // acquired-permits can't be larger than the rate
+            if (acquirePermit > this.permits) {
+                acquiredPermits = this.permits;
+                return false;
+            }
+
+            if (canAcquire) {
+                acquiredPermits += acquirePermit;
+            }
         }
+
         return canAcquire;
     }
 
@@ -243,7 +257,7 @@ public class RateLimiter implements AutoCloseable{
     }
 
     synchronized void renew() {
-        acquiredPermits = 0;
+        acquiredPermits = isDispatchRateLimiter ? Math.max(0, acquiredPermits 
- permits) : 0;
         if (permitUpdater != null) {
             long newPermitRate = permitUpdater.get();
             if (newPermitRate > 0) {
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java
index 891609d..61336f4 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java
@@ -165,6 +165,27 @@ public class RateLimiterTest {
     }
 
     @Test
+    public void testDispatchRate() throws Exception {
+        final long rateTimeMSec = 1000;
+        final int permits = 100;
+        RateLimiter rate = new RateLimiter(null, permits, rateTimeMSec, 
TimeUnit.MILLISECONDS, null, true);
+        rate.tryAcquire(100);
+        rate.tryAcquire(100);
+        rate.tryAcquire(100);
+        assertEquals(rate.getAvailablePermits(), 0);
+
+        Thread.sleep(rateTimeMSec * 2);
+        // check after two rate-time: acquiredPermits is 100
+        assertEquals(rate.getAvailablePermits(), 0);
+
+        Thread.sleep(rateTimeMSec);
+        // check after three rate-time: acquiredPermits is 0
+        assertEquals(rate.getAvailablePermits() > 0, true);
+
+        rate.close();
+    }
+
+    @Test
     public void testRateLimiterWithPermitUpdater() throws Exception {
         long permits = 10;
         long rateTime = 1;

Reply via email to