This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 06c6adf6faa83c4fbaef95ff792299c8c0744817 Author: Daniel Sinai <[email protected]> AuthorDate: Wed Jul 28 10:32:03 2021 +0300 [issue #13351] Solving precise rate limiting does not takes effect (#11446)  Before this PR precise publish rate limiting wasn't taking effect at all ### Modifications In order to solve the current problems, there are 2 modifications 1. Using IsDispatchRateLimiting in precise publish rate limiter as well (in order to starve the producer) 2. Checking if there are available permits before resetting the read from the connection again ### Verifying this change Already covered by current tests. (cherry picked from commit 00ad07d7fdad5dadc378235a2f5e7edd354d8ff7) --- .../broker/service/PrecisPublishLimiter.java | 4 +- .../broker/service/PublishRateLimiterDisable.java | 2 +- .../broker/service/PublishRateLimiterTest.java | 65 +++++++++++++++++++--- .../org/apache/pulsar/common/util/RateLimiter.java | 29 +++++++--- 4 files changed, 83 insertions(+), 17 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java index 4db6bf2..e981518 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java @@ -79,10 +79,10 @@ public class PrecisPublishLimiter implements PublishRateLimiter { this.publishMaxByteRate = Math.max(maxPublishRate.publishThrottlingRateInByte, 0); if (this.publishMaxMessageRate > 0) { topicPublishRateLimiterOnMessage = - new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS, rateLimitFunction); + new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS, rateLimitFunction, true); } if (this.publishMaxByteRate > 0) { - topicPublishRateLimiterOnByte = new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS); + topicPublishRateLimiterOnByte = new 
RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS, true); } } else { this.publishMaxMessageRate = 0; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java index 0ff3866..c72f6ba 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java @@ -59,7 +59,7 @@ public class PublishRateLimiterDisable implements PublishRateLimiter { @Override public boolean tryAcquire(int numbers, long bytes) { // No-op - return false; + return true; } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java index b820c60..9131d51 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java @@ -20,10 +20,19 @@ package org.apache.pulsar.broker.service; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublishRate; +import org.apache.pulsar.common.stats.Rate; +import org.apache.pulsar.common.util.RateLimitFunction; +import org.apache.pulsar.common.util.RateLimiter; +import org.apache.pulsar.utils.StatsOutputStream; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.util.HashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; @@ -38,13 +47,12 @@ public class PublishRateLimiterTest { private 
PrecisPublishLimiter precisPublishLimiter; private PublishRateLimiterImpl publishRateLimiter; - @BeforeMethod public void setup() throws Exception { policies.publishMaxMessageRate = new HashMap<>(); policies.publishMaxMessageRate.put(CLUSTER_NAME, publishRate); - precisPublishLimiter = new PrecisPublishLimiter(policies, CLUSTER_NAME, - () -> System.out.print("Refresh permit")); + + precisPublishLimiter = new PrecisPublishLimiter(policies, CLUSTER_NAME, () -> System.out.print("Refresh permit")); publishRateLimiter = new PublishRateLimiterImpl(policies, CLUSTER_NAME); } @@ -94,19 +102,62 @@ public class PublishRateLimiterTest { @Test public void testPrecisePublishRateLimiterAcquire() throws Exception { + Class precisPublishLimiterClass = Class.forName("org.apache.pulsar.broker.service.PrecisPublishLimiter"); + Field topicPublishRateLimiterOnMessageField = precisPublishLimiterClass.getDeclaredField("topicPublishRateLimiterOnMessage"); + Field topicPublishRateLimiterOnByteField = precisPublishLimiterClass.getDeclaredField("topicPublishRateLimiterOnByte"); + topicPublishRateLimiterOnMessageField.setAccessible(true); + topicPublishRateLimiterOnByteField.setAccessible(true); + + RateLimiter topicPublishRateLimiterOnMessage = (RateLimiter)topicPublishRateLimiterOnMessageField.get(precisPublishLimiter); + RateLimiter topicPublishRateLimiterOnByte = (RateLimiter)topicPublishRateLimiterOnByteField.get(precisPublishLimiter); + + Method renewTopicPublishRateLimiterOnMessageMethod = topicPublishRateLimiterOnMessage.getClass().getDeclaredMethod("renew", null); + Method renewTopicPublishRateLimiterOnByteMethod = topicPublishRateLimiterOnByte.getClass().getDeclaredMethod("renew", null); + renewTopicPublishRateLimiterOnMessageMethod.setAccessible(true); + renewTopicPublishRateLimiterOnByteMethod.setAccessible(true); + + // running tryAcquire in order to lazyInit the renewTask + precisPublishLimiter.tryAcquire(1, 10); + + Field onMessageRenewTaskField = 
topicPublishRateLimiterOnMessage.getClass().getDeclaredField("renewTask"); + Field onByteRenewTaskField = topicPublishRateLimiterOnByte.getClass().getDeclaredField("renewTask"); + onMessageRenewTaskField.setAccessible(true); + onByteRenewTaskField.setAccessible(true); + ScheduledFuture<?> onMessageRenewTask = (ScheduledFuture<?>) onMessageRenewTaskField.get(topicPublishRateLimiterOnMessage); + ScheduledFuture<?> onByteRenewTask = (ScheduledFuture<?>) onByteRenewTaskField.get(topicPublishRateLimiterOnByte); + + onMessageRenewTask.cancel(false); + onByteRenewTask.cancel(false); + + // renewing the permits from previous tests + renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage); + renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte); + // tryAcquire not exceeded assertTrue(precisPublishLimiter.tryAcquire(1, 10)); - Thread.sleep(1100); + renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage); + renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte); // tryAcquire numOfMessages exceeded assertFalse(precisPublishLimiter.tryAcquire(11, 100)); - Thread.sleep(1100); + renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage); + renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte); // tryAcquire msgSizeInBytes exceeded assertFalse(precisPublishLimiter.tryAcquire(10, 101)); - Thread.sleep(1100); + renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage); + renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte); + renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage); + renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte); + + // tryAcquire exceeded exactly + assertFalse(precisPublishLimiter.tryAcquire(10, 100)); + renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage); + 
renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte); + renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage); + renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte); // tryAcquire not exceeded - assertTrue(precisPublishLimiter.tryAcquire(10, 100)); + assertTrue(precisPublishLimiter.tryAcquire(9, 99)); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java index bd31853..edaef17 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java @@ -60,15 +60,25 @@ public class RateLimiter implements AutoCloseable{ // permitUpdate helps to update permit-rate at runtime private Supplier<Long> permitUpdater; private RateLimitFunction rateLimitFunction; - private boolean isDispatchRateLimiter; + private boolean isDispatchOrPrecisePublishRateLimiter; public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit) { this(null, permits, rateTime, timeUnit, null); } + public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit, boolean isDispatchOrPrecisePublishRateLimiter) { + this(null, permits, rateTime, timeUnit, null, isDispatchOrPrecisePublishRateLimiter); + } + public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit, RateLimitFunction autoReadResetFunction) { - this(null, permits, rateTime, timeUnit, null); + this(null, permits, rateTime, timeUnit, null, false); + this.rateLimitFunction = autoReadResetFunction; + } + + public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit, + RateLimitFunction autoReadResetFunction, boolean isDispatchOrPrecisePublishRateLimiter) { + this(null, permits, rateTime, timeUnit, null, isDispatchOrPrecisePublishRateLimiter); this.rateLimitFunction 
= autoReadResetFunction; } @@ -78,7 +88,7 @@ public class RateLimiter implements AutoCloseable{ } public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime, - final TimeUnit timeUnit, Supplier<Long> permitUpdater, boolean isDispatchRateLimiter) { + final TimeUnit timeUnit, Supplier<Long> permitUpdater, boolean isDispatchOrPrecisePublishRateLimiter) { checkArgument(permits > 0, "rate must be > 0"); checkArgument(rateTime > 0, "Renew permit time must be > 0"); @@ -86,7 +96,7 @@ public class RateLimiter implements AutoCloseable{ this.timeUnit = timeUnit; this.permits = permits; this.permitUpdater = permitUpdater; - this.isDispatchRateLimiter = isDispatchRateLimiter; + this.isDispatchOrPrecisePublishRateLimiter = isDispatchOrPrecisePublishRateLimiter; if (service != null) { this.executorService = service; @@ -180,9 +190,13 @@ public class RateLimiter implements AutoCloseable{ } boolean canAcquire = acquirePermit < 0 || acquiredPermits < this.permits; - if (isDispatchRateLimiter) { + if (isDispatchOrPrecisePublishRateLimiter) { // for dispatch rate limiter just add acquirePermit acquiredPermits += acquirePermit; + + // we want to back-pressure from the current state of the rateLimiter therefore we should check if there + // are any available premits again + canAcquire = acquirePermit < 0 || acquiredPermits < this.permits; } else { // acquired-permits can't be larger than the rate if (acquirePermit > this.permits) { @@ -257,14 +271,15 @@ public class RateLimiter implements AutoCloseable{ } synchronized void renew() { - acquiredPermits = isDispatchRateLimiter ? Math.max(0, acquiredPermits - permits) : 0; + acquiredPermits = isDispatchOrPrecisePublishRateLimiter ? 
Math.max(0, acquiredPermits - permits) : 0; if (permitUpdater != null) { long newPermitRate = permitUpdater.get(); if (newPermitRate > 0) { setRate(newPermitRate); } } - if (rateLimitFunction != null) { + // release the back-pressure by applying the rateLimitFunction only when there are available permits + if (rateLimitFunction != null && this.getAvailablePermits() > 0) { rateLimitFunction.apply(); } notifyAll();
