This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 06c6adf6faa83c4fbaef95ff792299c8c0744817
Author: Daniel Sinai <[email protected]>
AuthorDate: Wed Jul 28 10:32:03 2021 +0300

    [issue #13351] Solving precise rate limiting does not takes effect (#11446)
    
    
![image](https://user-images.githubusercontent.com/51213812/126812923-91bb827c-246d-451d-8f25-343bb2c1dca0.png)
    
    befoe this PR precise publish rate limiting wasn't taking effect at all
    ### Modifications
    
    In order to solve the current problems, there are 2 modifications
    
    1. Using IsDispatchRateLimiting in precise publish rate limiter as well (in 
order to starve the producer)
    2. Checking if there are available permits before resetting the read from 
the connection again
    
    ### Verifying this change
    
    Already covered by current tests.
    
    (cherry picked from commit 00ad07d7fdad5dadc378235a2f5e7edd354d8ff7)
---
 .../broker/service/PrecisPublishLimiter.java       |  4 +-
 .../broker/service/PublishRateLimiterDisable.java  |  2 +-
 .../broker/service/PublishRateLimiterTest.java     | 65 +++++++++++++++++++---
 .../org/apache/pulsar/common/util/RateLimiter.java | 29 +++++++---
 4 files changed, 83 insertions(+), 17 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java
index 4db6bf2..e981518 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java
@@ -79,10 +79,10 @@ public class PrecisPublishLimiter implements 
PublishRateLimiter {
             this.publishMaxByteRate = 
Math.max(maxPublishRate.publishThrottlingRateInByte, 0);
             if (this.publishMaxMessageRate > 0) {
                 topicPublishRateLimiterOnMessage =
-                        new RateLimiter(publishMaxMessageRate, 1, 
TimeUnit.SECONDS, rateLimitFunction);
+                        new RateLimiter(publishMaxMessageRate, 1, 
TimeUnit.SECONDS, rateLimitFunction, true);
             }
             if (this.publishMaxByteRate > 0) {
-                topicPublishRateLimiterOnByte = new 
RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS);
+                topicPublishRateLimiterOnByte = new 
RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS, true);
             }
         } else {
             this.publishMaxMessageRate = 0;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java
index 0ff3866..c72f6ba 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java
@@ -59,7 +59,7 @@ public class PublishRateLimiterDisable implements 
PublishRateLimiter {
     @Override
     public boolean tryAcquire(int numbers, long bytes) {
         // No-op
-        return false;
+        return true;
     }
 
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java
index b820c60..9131d51 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java
@@ -20,10 +20,19 @@ package org.apache.pulsar.broker.service;
 
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.PublishRate;
+import org.apache.pulsar.common.stats.Rate;
+import org.apache.pulsar.common.util.RateLimitFunction;
+import org.apache.pulsar.common.util.RateLimiter;
+import org.apache.pulsar.utils.StatsOutputStream;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.util.HashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
@@ -38,13 +47,12 @@ public class PublishRateLimiterTest {
     private PrecisPublishLimiter precisPublishLimiter;
     private PublishRateLimiterImpl publishRateLimiter;
 
-
     @BeforeMethod
     public void setup() throws Exception {
         policies.publishMaxMessageRate = new HashMap<>();
         policies.publishMaxMessageRate.put(CLUSTER_NAME, publishRate);
-        precisPublishLimiter = new PrecisPublishLimiter(policies, CLUSTER_NAME,
-                () -> System.out.print("Refresh permit"));
+
+        precisPublishLimiter = new PrecisPublishLimiter(policies, 
CLUSTER_NAME, () -> System.out.print("Refresh permit"));
         publishRateLimiter = new PublishRateLimiterImpl(policies, 
CLUSTER_NAME);
     }
 
@@ -94,19 +102,62 @@ public class PublishRateLimiterTest {
 
     @Test
     public void testPrecisePublishRateLimiterAcquire() throws Exception {
+        Class precisPublishLimiterClass = 
Class.forName("org.apache.pulsar.broker.service.PrecisPublishLimiter");
+        Field topicPublishRateLimiterOnMessageField = 
precisPublishLimiterClass.getDeclaredField("topicPublishRateLimiterOnMessage");
+        Field topicPublishRateLimiterOnByteField = 
precisPublishLimiterClass.getDeclaredField("topicPublishRateLimiterOnByte");
+        topicPublishRateLimiterOnMessageField.setAccessible(true);
+        topicPublishRateLimiterOnByteField.setAccessible(true);
+
+        RateLimiter topicPublishRateLimiterOnMessage = 
(RateLimiter)topicPublishRateLimiterOnMessageField.get(precisPublishLimiter);
+        RateLimiter topicPublishRateLimiterOnByte = 
(RateLimiter)topicPublishRateLimiterOnByteField.get(precisPublishLimiter);
+
+        Method renewTopicPublishRateLimiterOnMessageMethod = 
topicPublishRateLimiterOnMessage.getClass().getDeclaredMethod("renew", null);
+        Method renewTopicPublishRateLimiterOnByteMethod = 
topicPublishRateLimiterOnByte.getClass().getDeclaredMethod("renew", null);
+        renewTopicPublishRateLimiterOnMessageMethod.setAccessible(true);
+        renewTopicPublishRateLimiterOnByteMethod.setAccessible(true);
+
+        // running tryAcquire in order to lazyInit the renewTask
+        precisPublishLimiter.tryAcquire(1, 10);
+
+        Field onMessageRenewTaskField = 
topicPublishRateLimiterOnMessage.getClass().getDeclaredField("renewTask");
+        Field onByteRenewTaskField = 
topicPublishRateLimiterOnByte.getClass().getDeclaredField("renewTask");
+        onMessageRenewTaskField.setAccessible(true);
+        onByteRenewTaskField.setAccessible(true);
+        ScheduledFuture<?> onMessageRenewTask = (ScheduledFuture<?>) 
onMessageRenewTaskField.get(topicPublishRateLimiterOnMessage);
+        ScheduledFuture<?> onByteRenewTask = (ScheduledFuture<?>) 
onByteRenewTaskField.get(topicPublishRateLimiterOnByte);
+
+        onMessageRenewTask.cancel(false);
+        onByteRenewTask.cancel(false);
+
+        // renewing the permits from previous tests
+        
renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage);
+        
renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte);
+
         // tryAcquire not exceeded
         assertTrue(precisPublishLimiter.tryAcquire(1, 10));
-        Thread.sleep(1100);
+        
renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage);
+        
renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte);
 
         // tryAcquire numOfMessages exceeded
         assertFalse(precisPublishLimiter.tryAcquire(11, 100));
-        Thread.sleep(1100);
+        
renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage);
+        
renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte);
 
         // tryAcquire msgSizeInBytes exceeded
         assertFalse(precisPublishLimiter.tryAcquire(10, 101));
-        Thread.sleep(1100);
+        
renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage);
+        
renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte);
+        
renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage);
+        
renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte);
+
+        // tryAcquire exceeded exactly
+        assertFalse(precisPublishLimiter.tryAcquire(10, 100));
+        
renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage);
+        
renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte);
+        
renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage);
+        
renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte);
 
         // tryAcquire not exceeded
-        assertTrue(precisPublishLimiter.tryAcquire(10, 100));
+        assertTrue(precisPublishLimiter.tryAcquire(9, 99));
     }
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
index bd31853..edaef17 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
@@ -60,15 +60,25 @@ public class RateLimiter implements AutoCloseable{
     // permitUpdate helps to update permit-rate at runtime
     private Supplier<Long> permitUpdater;
     private RateLimitFunction rateLimitFunction;
-    private boolean isDispatchRateLimiter;
+    private boolean isDispatchOrPrecisePublishRateLimiter;
 
     public RateLimiter(final long permits, final long rateTime, final TimeUnit 
timeUnit) {
         this(null, permits, rateTime, timeUnit, null);
     }
 
+    public RateLimiter(final long permits, final long rateTime, final TimeUnit 
timeUnit, boolean isDispatchOrPrecisePublishRateLimiter) {
+        this(null, permits, rateTime, timeUnit, null, 
isDispatchOrPrecisePublishRateLimiter);
+    }
+
     public RateLimiter(final long permits, final long rateTime, final TimeUnit 
timeUnit,
                        RateLimitFunction autoReadResetFunction) {
-        this(null, permits, rateTime, timeUnit, null);
+        this(null, permits, rateTime, timeUnit, null, false);
+        this.rateLimitFunction = autoReadResetFunction;
+    }
+
+    public RateLimiter(final long permits, final long rateTime, final TimeUnit 
timeUnit,
+                       RateLimitFunction autoReadResetFunction, boolean 
isDispatchOrPrecisePublishRateLimiter) {
+        this(null, permits, rateTime, timeUnit, null, 
isDispatchOrPrecisePublishRateLimiter);
         this.rateLimitFunction = autoReadResetFunction;
     }
 
@@ -78,7 +88,7 @@ public class RateLimiter implements AutoCloseable{
     }
 
     public RateLimiter(final ScheduledExecutorService service, final long 
permits, final long rateTime,
-            final TimeUnit timeUnit, Supplier<Long> permitUpdater, boolean 
isDispatchRateLimiter) {
+            final TimeUnit timeUnit, Supplier<Long> permitUpdater, boolean 
isDispatchOrPrecisePublishRateLimiter) {
         checkArgument(permits > 0, "rate must be > 0");
         checkArgument(rateTime > 0, "Renew permit time must be > 0");
 
@@ -86,7 +96,7 @@ public class RateLimiter implements AutoCloseable{
         this.timeUnit = timeUnit;
         this.permits = permits;
         this.permitUpdater = permitUpdater;
-        this.isDispatchRateLimiter = isDispatchRateLimiter;
+        this.isDispatchOrPrecisePublishRateLimiter = 
isDispatchOrPrecisePublishRateLimiter;
 
         if (service != null) {
             this.executorService = service;
@@ -180,9 +190,13 @@ public class RateLimiter implements AutoCloseable{
         }
 
         boolean canAcquire = acquirePermit < 0 || acquiredPermits < 
this.permits;
-        if (isDispatchRateLimiter) {
+        if (isDispatchOrPrecisePublishRateLimiter) {
             // for dispatch rate limiter just add acquirePermit
             acquiredPermits += acquirePermit;
+
+            // we want to back-pressure from the current state of the 
rateLimiter therefore we should check if there
+            // are any available premits again
+            canAcquire = acquirePermit < 0 || acquiredPermits < this.permits;
         } else {
             // acquired-permits can't be larger than the rate
             if (acquirePermit > this.permits) {
@@ -257,14 +271,15 @@ public class RateLimiter implements AutoCloseable{
     }
 
     synchronized void renew() {
-        acquiredPermits = isDispatchRateLimiter ? Math.max(0, acquiredPermits 
- permits) : 0;
+        acquiredPermits = isDispatchOrPrecisePublishRateLimiter ? Math.max(0, 
acquiredPermits - permits) : 0;
         if (permitUpdater != null) {
             long newPermitRate = permitUpdater.get();
             if (newPermitRate > 0) {
                 setRate(newPermitRate);
             }
         }
-        if (rateLimitFunction != null) {
+        // release the back-pressure by applying the rateLimitFunction only 
when there are available permits
+        if (rateLimitFunction != null && this.getAvailablePermits() > 0) {
             rateLimitFunction.apply();
         }
         notifyAll();

Reply via email to