This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7e54e9b0463 [fix][broker] Fix class name typo `PrecisPublishLimiter`
to "Precise" (#20310)
7e54e9b0463 is described below
commit 7e54e9b0463f5c69846dc44892d88281d5508466
Author: Kim, Joo Hyuk <[email protected]>
AuthorDate: Mon May 15 16:29:50 2023 +0900
[fix][broker] Fix class name typo `PrecisPublishLimiter` to "Precise"
(#20310)
---
.../pulsar/broker/service/AbstractTopic.java | 2 +-
...lishLimiter.java => PrecisePublishLimiter.java} | 10 +++----
.../PrecisTopicPublishRateThrottleTest.java | 2 +-
...terTest.java => PrecisePublishLimiterTest.java} | 30 +++++++++----------
.../broker/service/PublishRateLimiterTest.java | 34 ++++++++++++----------
5 files changed, 40 insertions(+), 38 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 8269dc0c3d1..4614b846c8e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -1274,7 +1274,7 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
|| this.topicPublishRateLimiter ==
PublishRateLimiter.DISABLED_RATE_LIMITER) {
// create new rateLimiter if rate-limiter is disabled
if (preciseTopicPublishRateLimitingEnable) {
- this.topicPublishRateLimiter = new
PrecisPublishLimiter(publishRate,
+ this.topicPublishRateLimiter = new
PrecisePublishLimiter(publishRate,
() -> this.enableCnxAutoRead(),
brokerService.pulsar().getExecutor());
} else {
this.topicPublishRateLimiter = new
PublishRateLimiterImpl(publishRate);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisePublishLimiter.java
similarity index 93%
rename from
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java
rename to
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisePublishLimiter.java
index 6e215b3b495..ce14f6d7dd7 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisePublishLimiter.java
@@ -24,7 +24,7 @@ import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.util.RateLimitFunction;
import org.apache.pulsar.common.util.RateLimiter;
-public class PrecisPublishLimiter implements PublishRateLimiter {
+public class PrecisePublishLimiter implements PublishRateLimiter {
protected volatile int publishMaxMessageRate = 0;
protected volatile long publishMaxByteRate = 0;
// precise mode for publish rate limiter
@@ -33,18 +33,18 @@ public class PrecisPublishLimiter implements
PublishRateLimiter {
private final RateLimitFunction rateLimitFunction;
private final ScheduledExecutorService scheduledExecutorService;
- public PrecisPublishLimiter(Policies policies, String clusterName,
RateLimitFunction rateLimitFunction) {
+ public PrecisePublishLimiter(Policies policies, String clusterName,
RateLimitFunction rateLimitFunction) {
this.rateLimitFunction = rateLimitFunction;
update(policies, clusterName);
this.scheduledExecutorService = null;
}
- public PrecisPublishLimiter(PublishRate publishRate, RateLimitFunction
rateLimitFunction) {
+ public PrecisePublishLimiter(PublishRate publishRate, RateLimitFunction
rateLimitFunction) {
this(publishRate, rateLimitFunction, null);
}
- public PrecisPublishLimiter(PublishRate publishRate, RateLimitFunction
rateLimitFunction,
- ScheduledExecutorService
scheduledExecutorService) {
+ public PrecisePublishLimiter(PublishRate publishRate, RateLimitFunction
rateLimitFunction,
+ ScheduledExecutorService
scheduledExecutorService) {
this.rateLimitFunction = rateLimitFunction;
update(publishRate);
this.scheduledExecutorService = scheduledExecutorService;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisTopicPublishRateThrottleTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisTopicPublishRateThrottleTest.java
index 07632814378..c22ed41fc15 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisTopicPublishRateThrottleTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisTopicPublishRateThrottleTest.java
@@ -164,7 +164,7 @@ public class PrecisTopicPublishRateThrottleTest extends
BrokerTestBase{
"" + rateInMsg));
Topic topicRef =
pulsar.getBrokerService().getTopicReference(topic).get();
Assert.assertNotNull(topicRef);
- PrecisPublishLimiter limiter = ((PrecisPublishLimiter)
((AbstractTopic) topicRef).topicPublishRateLimiter);
+ PrecisePublishLimiter limiter = ((PrecisePublishLimiter)
((AbstractTopic) topicRef).topicPublishRateLimiter);
Awaitility.await().untilAsserted(() ->
Assert.assertEquals(limiter.publishMaxMessageRate, rateInMsg));
Assert.assertEquals(limiter.publishMaxByteRate, 0);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisPublishLimiterTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisePublishLimiterTest.java
similarity index 57%
rename from
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisPublishLimiterTest.java
rename to
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisePublishLimiterTest.java
index a0d0df52d24..73cb43d52b1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisPublishLimiterTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisePublishLimiterTest.java
@@ -23,35 +23,35 @@ import static org.testng.Assert.assertTrue;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.testng.annotations.Test;
-public class PrecisPublishLimiterTest {
+public class PrecisePublishLimiterTest {
@Test
void shouldResetMsgLimitAfterUpdate() {
- PrecisPublishLimiter precisPublishLimiter = new
PrecisPublishLimiter(new PublishRate(), () -> {
+ PrecisePublishLimiter precisePublishLimiter = new
PrecisePublishLimiter(new PublishRate(), () -> {
});
- precisPublishLimiter.update(new PublishRate(1, 1));
- assertFalse(precisPublishLimiter.tryAcquire(99, 99));
- precisPublishLimiter.update(new PublishRate(-1, 100));
- assertTrue(precisPublishLimiter.tryAcquire(99, 99));
+ precisePublishLimiter.update(new PublishRate(1, 1));
+ assertFalse(precisePublishLimiter.tryAcquire(99, 99));
+ precisePublishLimiter.update(new PublishRate(-1, 100));
+ assertTrue(precisePublishLimiter.tryAcquire(99, 99));
}
@Test
void shouldResetBytesLimitAfterUpdate() {
- PrecisPublishLimiter precisPublishLimiter = new
PrecisPublishLimiter(new PublishRate(), () -> {
+ PrecisePublishLimiter precisePublishLimiter = new
PrecisePublishLimiter(new PublishRate(), () -> {
});
- precisPublishLimiter.update(new PublishRate(1, 1));
- assertFalse(precisPublishLimiter.tryAcquire(99, 99));
- precisPublishLimiter.update(new PublishRate(100, -1));
- assertTrue(precisPublishLimiter.tryAcquire(99, 99));
+ precisePublishLimiter.update(new PublishRate(1, 1));
+ assertFalse(precisePublishLimiter.tryAcquire(99, 99));
+ precisePublishLimiter.update(new PublishRate(100, -1));
+ assertTrue(precisePublishLimiter.tryAcquire(99, 99));
}
@Test
void shouldCloseResources() throws Exception {
for (int i = 0; i < 20000; i++) {
- PrecisPublishLimiter precisPublishLimiter = new
PrecisPublishLimiter(new PublishRate(100, 100), () -> {
+ PrecisePublishLimiter precisePublishLimiter = new
PrecisePublishLimiter(new PublishRate(100, 100), () -> {
});
- precisPublishLimiter.tryAcquire(99, 99);
- precisPublishLimiter.close();
+ precisePublishLimiter.tryAcquire(99, 99);
+ precisePublishLimiter.close();
}
}
-}
\ No newline at end of file
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java
index 9d2bd49ba04..b934ced08c5 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java
@@ -39,7 +39,7 @@ public class PublishRateLimiterTest {
private final PublishRate publishRate = new PublishRate(10, 100);
private final PublishRate newPublishRate = new PublishRate(20, 200);
- private PrecisPublishLimiter precisPublishLimiter;
+ private PrecisePublishLimiter precisePublishLimiter;
private PublishRateLimiterImpl publishRateLimiter;
@BeforeMethod
@@ -47,7 +47,7 @@ public class PublishRateLimiterTest {
policies.publishMaxMessageRate = new HashMap<>();
policies.publishMaxMessageRate.put(CLUSTER_NAME, publishRate);
- precisPublishLimiter = new PrecisPublishLimiter(policies,
CLUSTER_NAME, () -> System.out.print("Refresh permit"));
+ precisePublishLimiter = new PrecisePublishLimiter(policies,
CLUSTER_NAME, () -> System.out.print("Refresh permit"));
publishRateLimiter = new PublishRateLimiterImpl(policies,
CLUSTER_NAME);
}
@@ -88,23 +88,25 @@ public class PublishRateLimiterTest {
@Test
public void testPrecisePublishRateLimiterUpdate() {
- assertFalse(precisPublishLimiter.tryAcquire(15, 150));
+ assertFalse(precisePublishLimiter.tryAcquire(15, 150));
//update
- precisPublishLimiter.update(newPublishRate);
- assertTrue(precisPublishLimiter.tryAcquire(15, 150));
+ precisePublishLimiter.update(newPublishRate);
+ assertTrue(precisePublishLimiter.tryAcquire(15, 150));
}
@Test
public void testPrecisePublishRateLimiterAcquire() throws Exception {
- Class precisPublishLimiterClass =
Class.forName("org.apache.pulsar.broker.service.PrecisPublishLimiter");
- Field topicPublishRateLimiterOnMessageField =
precisPublishLimiterClass.getDeclaredField("topicPublishRateLimiterOnMessage");
- Field topicPublishRateLimiterOnByteField =
precisPublishLimiterClass.getDeclaredField("topicPublishRateLimiterOnByte");
+ Class precisePublishLimiterClass =
Class.forName("org.apache.pulsar.broker.service.PrecisePublishLimiter");
+ Field topicPublishRateLimiterOnMessageField =
precisePublishLimiterClass.getDeclaredField("topicPublishRateLimiterOnMessage");
+ Field topicPublishRateLimiterOnByteField =
precisePublishLimiterClass.getDeclaredField("topicPublishRateLimiterOnByte");
topicPublishRateLimiterOnMessageField.setAccessible(true);
topicPublishRateLimiterOnByteField.setAccessible(true);
- RateLimiter topicPublishRateLimiterOnMessage =
(RateLimiter)topicPublishRateLimiterOnMessageField.get(precisPublishLimiter);
- RateLimiter topicPublishRateLimiterOnByte =
(RateLimiter)topicPublishRateLimiterOnByteField.get(precisPublishLimiter);
+ RateLimiter topicPublishRateLimiterOnMessage =
(RateLimiter)topicPublishRateLimiterOnMessageField.get(
+ precisePublishLimiter);
+ RateLimiter topicPublishRateLimiterOnByte =
(RateLimiter)topicPublishRateLimiterOnByteField.get(
+ precisePublishLimiter);
Method renewTopicPublishRateLimiterOnMessageMethod =
topicPublishRateLimiterOnMessage.getClass().getDeclaredMethod("renew", null);
Method renewTopicPublishRateLimiterOnByteMethod =
topicPublishRateLimiterOnByte.getClass().getDeclaredMethod("renew", null);
@@ -112,7 +114,7 @@ public class PublishRateLimiterTest {
renewTopicPublishRateLimiterOnByteMethod.setAccessible(true);
// running tryAcquire in order to lazyInit the renewTask
- precisPublishLimiter.tryAcquire(1, 10);
+ precisePublishLimiter.tryAcquire(1, 10);
Field onMessageRenewTaskField =
topicPublishRateLimiterOnMessage.getClass().getDeclaredField("renewTask");
Field onByteRenewTaskField =
topicPublishRateLimiterOnByte.getClass().getDeclaredField("renewTask");
@@ -129,30 +131,30 @@ public class PublishRateLimiterTest {
renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte);
// tryAcquire not exceeded
- assertTrue(precisPublishLimiter.tryAcquire(1, 10));
+ assertTrue(precisePublishLimiter.tryAcquire(1, 10));
renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage);
renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte);
// tryAcquire numOfMessages exceeded
- assertFalse(precisPublishLimiter.tryAcquire(11, 100));
+ assertFalse(precisePublishLimiter.tryAcquire(11, 100));
renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage);
renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte);
// tryAcquire msgSizeInBytes exceeded
- assertFalse(precisPublishLimiter.tryAcquire(10, 101));
+ assertFalse(precisePublishLimiter.tryAcquire(10, 101));
renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage);
renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte);
renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage);
renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte);
// tryAcquire exceeded exactly
- assertFalse(precisPublishLimiter.tryAcquire(10, 100));
+ assertFalse(precisePublishLimiter.tryAcquire(10, 100));
renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage);
renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte);
renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage);
renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte);
// tryAcquire not exceeded
- assertTrue(precisPublishLimiter.tryAcquire(9, 99));
+ assertTrue(precisePublishLimiter.tryAcquire(9, 99));
}
}