This is an automated email from the ASF dual-hosted git repository.
jianghaiting pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new a29feca574d [fix][broker]Fix topic-level replicator rate limiter not
init (#15825)
a29feca574d is described below
commit a29feca574d581e413ca53c703a86b4e4b60e7fb
Author: Xiaoyu Hou <[email protected]>
AuthorDate: Mon Jun 13 10:06:46 2022 +0800
[fix][broker]Fix topic-level replicator rate limiter not init (#15825)
* Fix bug: The replicator rate limiter will not be initialized and updated
if only topic-level policy is enabled, because `replicator.getRateLimiter()`
is empty for L3067:
https://github.com/apache/pulsar/blob/a43981109a9322d94082ae0d87d0de53b8f237e8/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L3063-L3068
* Add the method
`org.apache.pulsar.broker.service.Replicator#updateRateLimiter` to initialize
or update the replicator rate limiter.
* Use this method to initialize or update all level replicator rate limiter
(cherry picked from commit 9f40cc1d1104900c450a599676ca446b1f096a00)
---
.../pulsar/broker/service/BrokerService.java | 2 +-
.../apache/pulsar/broker/service/Replicator.java | 3 +
.../service/persistent/PersistentReplicator.java | 15 ++-
.../broker/service/persistent/PersistentTopic.java | 11 +-
.../broker/service/ReplicatorRateLimiterTest.java | 128 +++++++++++++++++++++
5 files changed, 146 insertions(+), 13 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index cda3d90fd98..3aa0c73ae8e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2318,7 +2318,7 @@ public class BrokerService implements Closeable {
((AbstractTopic)
topic).updateBrokerReplicatorDispatchRate();
}
topic.getReplicators().forEach((name,
persistentReplicator) ->
-
persistentReplicator.getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate));
+ persistentReplicator.updateRateLimiter());
}
);
});
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
index 2cd6ec62327..eea90efb883 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
@@ -41,6 +41,9 @@ public interface Replicator {
//No-op
}
+ default void updateRateLimiter() {
+ }
+
default Optional<DispatchRateLimiter> getRateLimiter() {
return Optional.empty();
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index cc5410dbbeb..953300e823f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -71,6 +71,7 @@ public class PersistentReplicator extends AbstractReplicator
protected final ManagedCursor cursor;
private Optional<DispatchRateLimiter> dispatchRateLimiter =
Optional.empty();
+ private final Object dispatchRateLimiterLock = new Object();
private int readBatchSize;
private final int readMaxSizeBytes;
@@ -705,12 +706,20 @@ public class PersistentReplicator extends
AbstractReplicator
@Override
public void initializeDispatchRateLimiterIfNeeded() {
- if (!dispatchRateLimiter.isPresent()
- &&
DispatchRateLimiter.isDispatchRateEnabled(topic.getReplicatorDispatchRate())) {
- this.dispatchRateLimiter = Optional.of(new
DispatchRateLimiter(topic, Type.REPLICATOR));
+ synchronized (dispatchRateLimiterLock) {
+ if (!dispatchRateLimiter.isPresent()
+ &&
DispatchRateLimiter.isDispatchRateEnabled(topic.getReplicatorDispatchRate())) {
+ this.dispatchRateLimiter = Optional.of(new
DispatchRateLimiter(topic, Type.REPLICATOR));
+ }
}
}
+ @Override
+ public void updateRateLimiter() {
+ initializeDispatchRateLimiterIfNeeded();
+ dispatchRateLimiter.ifPresent(DispatchRateLimiter::updateDispatchRate);
+ }
+
private void checkReplicatedSubscriptionMarker(Position position,
MessageImpl<?> msg, ByteBuf payload) {
if (!msg.getMessageBuilder().hasMarkerType()) {
// No marker is defined
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 27bb77fbc5d..a82bdb8d286 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -392,10 +392,6 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
dispatcher.initializeDispatchRateLimiterIfNeeded();
}
});
-
- // dispatch rate limiter for each replicator
- replicators.forEach((name, replicator) ->
- replicator.initializeDispatchRateLimiterIfNeeded());
}
}
@@ -2416,9 +2412,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
});
return
FutureUtil.waitForAll(subscriptionCheckFutures).thenCompose((___) -> {
- replicators.forEach((name, replicator) ->
-
replicator.getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate)
- );
+ replicators.forEach((name, replicator) ->
replicator.updateRateLimiter());
checkMessageExpiry();
CompletableFuture<Void> replicationFuture =
checkReplicationAndRetryOnFailure();
CompletableFuture<Void> dedupFuture =
checkDeduplicationStatus();
@@ -3075,8 +3069,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
subscribeRateLimiter.ifPresent(subscribeRateLimiter ->
subscribeRateLimiter.onSubscribeRateUpdate(policies.getSubscribeRate()));
}
- replicators.forEach((name, replicator) ->
replicator.getRateLimiter()
- .ifPresent(DispatchRateLimiter::updateDispatchRate));
+ replicators.forEach((name, replicator) ->
replicator.updateRateLimiter());
if (policies.getReplicationClusters() != null) {
checkReplicationAndRetryOnFailure();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
index 65e4bb0a785..fdf27adc718 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
+import static org.testng.AssertJUnit.assertFalse;
import com.google.common.collect.Sets;
import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;
@@ -76,6 +77,133 @@ public class ReplicatorRateLimiterTest extends
ReplicatorTestBase {
return new Object[][] { { DispatchRateType.messageRate }, {
DispatchRateType.byteRate } };
}
+ @Test
+ public void testReplicatorRateLimiterWithOnlyTopicLevel() throws Exception
{
+ cleanup();
+ config1.setDispatchThrottlingRatePerReplicatorInMsg(0); // disable
broker level
+ config1.setDispatchThrottlingRatePerReplicatorInByte(0L);
+ setup();
+
+ final String namespace = "pulsar/replicatorchange-" +
System.currentTimeMillis();
+ final String topicName = "persistent://" + namespace +
"/testReplicatorRateLimiterWithOnlyTopicLevel";
+
+ admin1.namespaces().createNamespace(namespace);
+ // set 2 clusters, there will be 1 replicator in each topic
+ admin1.namespaces().setNamespaceReplicationClusters(namespace,
Sets.newHashSet("r1", "r2"));
+ @Cleanup
+ PulsarClient client1 =
PulsarClient.builder().serviceUrl(url1.toString())
+ .statsInterval(0, TimeUnit.SECONDS).build();
+ client1.newProducer().topic(topicName).create().close();
+ PersistentTopic topic = (PersistentTopic)
pulsar1.getBrokerService().getOrCreateTopic(topicName).get();
+
+ // rate limiter disable by default
+
assertFalse(topic.getReplicators().values().get(0).getRateLimiter().isPresent());
+
+ //set topic-level policy, which should take effect
+ DispatchRate topicRate = DispatchRate.builder()
+ .dispatchThrottlingRateInMsg(10)
+ .dispatchThrottlingRateInByte(20)
+ .ratePeriodInSecond(30)
+ .build();
+ admin1.topics().setReplicatorDispatchRate(topicName, topicRate);
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(admin1.topics().getReplicatorDispatchRate(topicName),
topicRate));
+
assertTrue(topic.getReplicators().values().get(0).getRateLimiter().isPresent());
+
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(),
10);
+
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(),
20L);
+
+ //remove topic-level policy
+ admin1.topics().removeReplicatorDispatchRate(topicName);
+ Awaitility.await().untilAsserted(() ->
+ assertNull(admin1.topics().getReplicatorDispatchRate(topicName)));
+
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(),
-1);
+
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(),
+ -1L);
+ }
+
+ @Test
+ public void testReplicatorRateLimiterWithOnlyNamespaceLevel() throws
Exception {
+ cleanup();
+ config1.setDispatchThrottlingRatePerReplicatorInMsg(0); // disable
broker level
+ config1.setDispatchThrottlingRatePerReplicatorInByte(0L);
+ setup();
+
+ final String namespace = "pulsar/replicatorchange-" +
System.currentTimeMillis();
+ final String topicName = "persistent://" + namespace +
"/testReplicatorRateLimiterWithOnlyNamespaceLevel";
+
+ admin1.namespaces().createNamespace(namespace);
+ // set 2 clusters, there will be 1 replicator in each topic
+ admin1.namespaces().setNamespaceReplicationClusters(namespace,
Sets.newHashSet("r1", "r2"));
+ @Cleanup
+ PulsarClient client1 =
PulsarClient.builder().serviceUrl(url1.toString())
+ .statsInterval(0, TimeUnit.SECONDS).build();
+ client1.newProducer().topic(topicName).create().close();
+ PersistentTopic topic = (PersistentTopic)
pulsar1.getBrokerService().getOrCreateTopic(topicName).get();
+
+ // rate limiter disable by default
+
assertFalse(topic.getReplicators().values().get(0).getRateLimiter().isPresent());
+
+ //set namespace-level policy, which should take effect
+ DispatchRate topicRate = DispatchRate.builder()
+ .dispatchThrottlingRateInMsg(10)
+ .dispatchThrottlingRateInByte(20)
+ .ratePeriodInSecond(30)
+ .build();
+ admin1.namespaces().setReplicatorDispatchRate(namespace, topicRate);
+ Awaitility.await().untilAsserted(() ->
+
assertEquals(admin1.namespaces().getReplicatorDispatchRate(namespace),
topicRate));
+
assertTrue(topic.getReplicators().values().get(0).getRateLimiter().isPresent());
+
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(),
10);
+
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(),
20L);
+
+ //remove topic-level policy
+ admin1.namespaces().removeReplicatorDispatchRate(namespace);
+ Awaitility.await().untilAsserted(() ->
+
assertNull(admin1.namespaces().getReplicatorDispatchRate(namespace)));
+
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(),
-1);
+
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(),
+ -1L);
+ }
+
+ @Test
+ public void testReplicatorRateLimiterWithOnlyBrokerLevel() throws
Exception {
+ cleanup();
+ config1.setDispatchThrottlingRatePerReplicatorInMsg(0); // disable
broker level when init
+ config1.setDispatchThrottlingRatePerReplicatorInByte(0L);
+ setup();
+
+ final String namespace = "pulsar/replicatorchange-" +
System.currentTimeMillis();
+ final String topicName = "persistent://" + namespace +
"/testReplicatorRateLimiterWithOnlyBrokerLevel";
+
+ admin1.namespaces().createNamespace(namespace);
+ // set 2 clusters, there will be 1 replicator in each topic
+ admin1.namespaces().setNamespaceReplicationClusters(namespace,
Sets.newHashSet("r1", "r2"));
+ @Cleanup
+ PulsarClient client1 =
PulsarClient.builder().serviceUrl(url1.toString())
+ .statsInterval(0, TimeUnit.SECONDS).build();
+ client1.newProducer().topic(topicName).create().close();
+ PersistentTopic topic = (PersistentTopic)
pulsar1.getBrokerService().getOrCreateTopic(topicName).get();
+
+ // rate limiter disable by default
+
assertFalse(topic.getReplicators().values().get(0).getRateLimiter().isPresent());
+
+ //set broker-level policy, which should take effect
+
admin1.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerReplicatorInMsg",
"10");
+
admin1.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerReplicatorInByte",
"20");
+ Awaitility.await().untilAsserted(() -> {
+ assertTrue(admin1.brokers()
+
.getAllDynamicConfigurations().containsKey("dispatchThrottlingRatePerReplicatorInByte"));
+ assertEquals(admin1.brokers()
+
.getAllDynamicConfigurations().get("dispatchThrottlingRatePerReplicatorInMsg"),
"10");
+ assertEquals(admin1.brokers()
+
.getAllDynamicConfigurations().get("dispatchThrottlingRatePerReplicatorInByte"),
"20");
+ });
+
+
assertTrue(topic.getReplicators().values().get(0).getRateLimiter().isPresent());
+
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(),
10);
+
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(),
20L);
+ }
+
@Test
public void testReplicatorRatePriority() throws Exception {
cleanup();