This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5a66d6cc7bb9920a3025b47e71bf4faeb57529e3 Author: Kim, Joo Hyuk <[email protected]> AuthorDate: Mon May 15 16:29:50 2023 +0900 [fix][broker] Fix class name typo `PrecisPublishLimiter` to "Precise" (#20310) --- .../pulsar/broker/service/AbstractTopic.java | 2 +- ...lishLimiter.java => PrecisePublishLimiter.java} | 10 +++---- .../PrecisTopicPublishRateThrottleTest.java | 2 +- ...terTest.java => PrecisePublishLimiterTest.java} | 30 +++++++++---------- .../broker/service/PublishRateLimiterTest.java | 34 ++++++++++++---------- 5 files changed, 40 insertions(+), 38 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 8a76f615189..fa27f87f99d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -1238,7 +1238,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP || this.topicPublishRateLimiter == PublishRateLimiter.DISABLED_RATE_LIMITER) { // create new rateLimiter if rate-limiter is disabled if (preciseTopicPublishRateLimitingEnable) { - this.topicPublishRateLimiter = new PrecisPublishLimiter(publishRate, + this.topicPublishRateLimiter = new PrecisePublishLimiter(publishRate, () -> this.enableCnxAutoRead(), brokerService.pulsar().getExecutor()); } else { this.topicPublishRateLimiter = new PublishRateLimiterImpl(publishRate); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisePublishLimiter.java similarity index 93% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisePublishLimiter.java index 67cc46d95fa..55e24ab158e 100644 --- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisePublishLimiter.java @@ -24,7 +24,7 @@ import org.apache.pulsar.common.policies.data.PublishRate; import org.apache.pulsar.common.util.RateLimitFunction; import org.apache.pulsar.common.util.RateLimiter; -public class PrecisPublishLimiter implements PublishRateLimiter { +public class PrecisePublishLimiter implements PublishRateLimiter { protected volatile int publishMaxMessageRate = 0; protected volatile long publishMaxByteRate = 0; // precise mode for publish rate limiter @@ -33,18 +33,18 @@ public class PrecisPublishLimiter implements PublishRateLimiter { private final RateLimitFunction rateLimitFunction; private final ScheduledExecutorService scheduledExecutorService; - public PrecisPublishLimiter(Policies policies, String clusterName, RateLimitFunction rateLimitFunction) { + public PrecisePublishLimiter(Policies policies, String clusterName, RateLimitFunction rateLimitFunction) { this.rateLimitFunction = rateLimitFunction; update(policies, clusterName); this.scheduledExecutorService = null; } - public PrecisPublishLimiter(PublishRate publishRate, RateLimitFunction rateLimitFunction) { + public PrecisePublishLimiter(PublishRate publishRate, RateLimitFunction rateLimitFunction) { this(publishRate, rateLimitFunction, null); } - public PrecisPublishLimiter(PublishRate publishRate, RateLimitFunction rateLimitFunction, - ScheduledExecutorService scheduledExecutorService) { + public PrecisePublishLimiter(PublishRate publishRate, RateLimitFunction rateLimitFunction, + ScheduledExecutorService scheduledExecutorService) { this.rateLimitFunction = rateLimitFunction; update(publishRate); this.scheduledExecutorService = scheduledExecutorService; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisTopicPublishRateThrottleTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisTopicPublishRateThrottleTest.java index 7a7cc091b3f..06c32a58b8c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisTopicPublishRateThrottleTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisTopicPublishRateThrottleTest.java @@ -164,7 +164,7 @@ public class PrecisTopicPublishRateThrottleTest extends BrokerTestBase{ "" + rateInMsg)); Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get(); Assert.assertNotNull(topicRef); - PrecisPublishLimiter limiter = ((PrecisPublishLimiter) ((AbstractTopic) topicRef).topicPublishRateLimiter); + PrecisePublishLimiter limiter = ((PrecisePublishLimiter) ((AbstractTopic) topicRef).topicPublishRateLimiter); Awaitility.await().untilAsserted(() -> Assert.assertEquals(limiter.publishMaxMessageRate, rateInMsg)); Assert.assertEquals(limiter.publishMaxByteRate, 0); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisPublishLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisePublishLimiterTest.java similarity index 57% rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisPublishLimiterTest.java rename to pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisePublishLimiterTest.java index 61804e7255d..2becd5d7e10 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisPublishLimiterTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisePublishLimiterTest.java @@ -23,35 +23,35 @@ import static org.testng.Assert.assertTrue; import org.apache.pulsar.common.policies.data.PublishRate; import org.testng.annotations.Test; -public class PrecisPublishLimiterTest { +public class PrecisePublishLimiterTest { @Test void shouldResetMsgLimitAfterUpdate() { - PrecisPublishLimiter precisPublishLimiter = new PrecisPublishLimiter(new PublishRate(), () -> { + 
PrecisePublishLimiter precisePublishLimiter = new PrecisePublishLimiter(new PublishRate(), () -> { }); - precisPublishLimiter.update(new PublishRate(1, 1)); - assertFalse(precisPublishLimiter.tryAcquire(99, 99)); - precisPublishLimiter.update(new PublishRate(-1, 100)); - assertTrue(precisPublishLimiter.tryAcquire(99, 99)); + precisePublishLimiter.update(new PublishRate(1, 1)); + assertFalse(precisePublishLimiter.tryAcquire(99, 99)); + precisePublishLimiter.update(new PublishRate(-1, 100)); + assertTrue(precisePublishLimiter.tryAcquire(99, 99)); } @Test void shouldResetBytesLimitAfterUpdate() { - PrecisPublishLimiter precisPublishLimiter = new PrecisPublishLimiter(new PublishRate(), () -> { + PrecisePublishLimiter precisePublishLimiter = new PrecisePublishLimiter(new PublishRate(), () -> { }); - precisPublishLimiter.update(new PublishRate(1, 1)); - assertFalse(precisPublishLimiter.tryAcquire(99, 99)); - precisPublishLimiter.update(new PublishRate(100, -1)); - assertTrue(precisPublishLimiter.tryAcquire(99, 99)); + precisePublishLimiter.update(new PublishRate(1, 1)); + assertFalse(precisePublishLimiter.tryAcquire(99, 99)); + precisePublishLimiter.update(new PublishRate(100, -1)); + assertTrue(precisePublishLimiter.tryAcquire(99, 99)); } @Test void shouldCloseResources() throws Exception { for (int i = 0; i < 20000; i++) { - PrecisPublishLimiter precisPublishLimiter = new PrecisPublishLimiter(new PublishRate(100, 100), () -> { + PrecisePublishLimiter precisePublishLimiter = new PrecisePublishLimiter(new PublishRate(100, 100), () -> { }); - precisPublishLimiter.tryAcquire(99, 99); - precisPublishLimiter.close(); + precisePublishLimiter.tryAcquire(99, 99); + precisePublishLimiter.close(); } } -} \ No newline at end of file +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java index ae65ee71a18..ea68382b49f 100644 --- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java @@ -39,7 +39,7 @@ public class PublishRateLimiterTest { private final PublishRate publishRate = new PublishRate(10, 100); private final PublishRate newPublishRate = new PublishRate(20, 200); - private PrecisPublishLimiter precisPublishLimiter; + private PrecisePublishLimiter precisePublishLimiter; private PublishRateLimiterImpl publishRateLimiter; @BeforeMethod @@ -47,7 +47,7 @@ public class PublishRateLimiterTest { policies.publishMaxMessageRate = new HashMap<>(); policies.publishMaxMessageRate.put(CLUSTER_NAME, publishRate); - precisPublishLimiter = new PrecisPublishLimiter(policies, CLUSTER_NAME, () -> System.out.print("Refresh permit")); + precisePublishLimiter = new PrecisePublishLimiter(policies, CLUSTER_NAME, () -> System.out.print("Refresh permit")); publishRateLimiter = new PublishRateLimiterImpl(policies, CLUSTER_NAME); } @@ -88,23 +88,25 @@ public class PublishRateLimiterTest { @Test public void testPrecisePublishRateLimiterUpdate() { - assertFalse(precisPublishLimiter.tryAcquire(15, 150)); + assertFalse(precisePublishLimiter.tryAcquire(15, 150)); //update - precisPublishLimiter.update(newPublishRate); - assertTrue(precisPublishLimiter.tryAcquire(15, 150)); + precisePublishLimiter.update(newPublishRate); + assertTrue(precisePublishLimiter.tryAcquire(15, 150)); } @Test public void testPrecisePublishRateLimiterAcquire() throws Exception { - Class precisPublishLimiterClass = Class.forName("org.apache.pulsar.broker.service.PrecisPublishLimiter"); - Field topicPublishRateLimiterOnMessageField = precisPublishLimiterClass.getDeclaredField("topicPublishRateLimiterOnMessage"); - Field topicPublishRateLimiterOnByteField = precisPublishLimiterClass.getDeclaredField("topicPublishRateLimiterOnByte"); + Class precisePublishLimiterClass = 
Class.forName("org.apache.pulsar.broker.service.PrecisePublishLimiter"); + Field topicPublishRateLimiterOnMessageField = precisePublishLimiterClass.getDeclaredField("topicPublishRateLimiterOnMessage"); + Field topicPublishRateLimiterOnByteField = precisePublishLimiterClass.getDeclaredField("topicPublishRateLimiterOnByte"); topicPublishRateLimiterOnMessageField.setAccessible(true); topicPublishRateLimiterOnByteField.setAccessible(true); - RateLimiter topicPublishRateLimiterOnMessage = (RateLimiter)topicPublishRateLimiterOnMessageField.get(precisPublishLimiter); - RateLimiter topicPublishRateLimiterOnByte = (RateLimiter)topicPublishRateLimiterOnByteField.get(precisPublishLimiter); + RateLimiter topicPublishRateLimiterOnMessage = (RateLimiter)topicPublishRateLimiterOnMessageField.get( + precisePublishLimiter); + RateLimiter topicPublishRateLimiterOnByte = (RateLimiter)topicPublishRateLimiterOnByteField.get( + precisePublishLimiter); Method renewTopicPublishRateLimiterOnMessageMethod = topicPublishRateLimiterOnMessage.getClass().getDeclaredMethod("renew", null); Method renewTopicPublishRateLimiterOnByteMethod = topicPublishRateLimiterOnByte.getClass().getDeclaredMethod("renew", null); @@ -112,7 +114,7 @@ public class PublishRateLimiterTest { renewTopicPublishRateLimiterOnByteMethod.setAccessible(true); // running tryAcquire in order to lazyInit the renewTask - precisPublishLimiter.tryAcquire(1, 10); + precisePublishLimiter.tryAcquire(1, 10); Field onMessageRenewTaskField = topicPublishRateLimiterOnMessage.getClass().getDeclaredField("renewTask"); Field onByteRenewTaskField = topicPublishRateLimiterOnByte.getClass().getDeclaredField("renewTask"); @@ -129,30 +131,30 @@ public class PublishRateLimiterTest { renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte); // tryAcquire not exceeded - assertTrue(precisPublishLimiter.tryAcquire(1, 10)); + assertTrue(precisePublishLimiter.tryAcquire(1, 10)); 
renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage); renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte); // tryAcquire numOfMessages exceeded - assertFalse(precisPublishLimiter.tryAcquire(11, 100)); + assertFalse(precisePublishLimiter.tryAcquire(11, 100)); renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage); renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte); // tryAcquire msgSizeInBytes exceeded - assertFalse(precisPublishLimiter.tryAcquire(10, 101)); + assertFalse(precisePublishLimiter.tryAcquire(10, 101)); renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage); renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte); renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage); renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte); // tryAcquire exceeded exactly - assertFalse(precisPublishLimiter.tryAcquire(10, 100)); + assertFalse(precisePublishLimiter.tryAcquire(10, 100)); renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage); renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte); renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage); renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte); // tryAcquire not exceeded - assertTrue(precisPublishLimiter.tryAcquire(9, 99)); + assertTrue(precisePublishLimiter.tryAcquire(9, 99)); } }
