This is an automated email from the ASF dual-hosted git repository.

jianghaiting pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new a29feca574d [fix][broker]Fix topic-level replicator rate limiter not 
init (#15825)
a29feca574d is described below

commit a29feca574d581e413ca53c703a86b4e4b60e7fb
Author: Xiaoyu Hou <[email protected]>
AuthorDate: Mon Jun 13 10:06:46 2022 +0800

    [fix][broker]Fix topic-level replicator rate limiter not init (#15825)
    
    * Fix bug: The replicator rate limiter will not be initialized and updated 
if only topic-level policy is enabled, because  `replicator.getRateLimiter()` 
is empty for L3067:
    
https://github.com/apache/pulsar/blob/a43981109a9322d94082ae0d87d0de53b8f237e8/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L3063-L3068
    
    *  Add the method 
`org.apache.pulsar.broker.service.Replicator#updateRateLimiter` to initialize 
or update  the replicator rate limiter.
    *  Use this method to initialize or update all level replicator rate limiter
    
    (cherry picked from commit 9f40cc1d1104900c450a599676ca446b1f096a00)
---
 .../pulsar/broker/service/BrokerService.java       |   2 +-
 .../apache/pulsar/broker/service/Replicator.java   |   3 +
 .../service/persistent/PersistentReplicator.java   |  15 ++-
 .../broker/service/persistent/PersistentTopic.java |  11 +-
 .../broker/service/ReplicatorRateLimiterTest.java  | 128 +++++++++++++++++++++
 5 files changed, 146 insertions(+), 13 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index cda3d90fd98..3aa0c73ae8e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2318,7 +2318,7 @@ public class BrokerService implements Closeable {
                         ((AbstractTopic) 
topic).updateBrokerReplicatorDispatchRate();
                     }
                     topic.getReplicators().forEach((name, 
persistentReplicator) ->
-                        
persistentReplicator.getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate));
+                        persistentReplicator.updateRateLimiter());
                 }
             );
         });
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
index 2cd6ec62327..eea90efb883 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
@@ -41,6 +41,9 @@ public interface Replicator {
         //No-op
     }
 
+    default void updateRateLimiter() {
+    }
+
     default Optional<DispatchRateLimiter> getRateLimiter() {
         return Optional.empty();
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index cc5410dbbeb..953300e823f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -71,6 +71,7 @@ public class PersistentReplicator extends AbstractReplicator
     protected final ManagedCursor cursor;
 
     private Optional<DispatchRateLimiter> dispatchRateLimiter = 
Optional.empty();
+    private final Object dispatchRateLimiterLock = new Object();
 
     private int readBatchSize;
     private final int readMaxSizeBytes;
@@ -705,12 +706,20 @@ public class PersistentReplicator extends 
AbstractReplicator
 
     @Override
     public void initializeDispatchRateLimiterIfNeeded() {
-        if (!dispatchRateLimiter.isPresent()
-            && 
DispatchRateLimiter.isDispatchRateEnabled(topic.getReplicatorDispatchRate())) {
-            this.dispatchRateLimiter = Optional.of(new 
DispatchRateLimiter(topic, Type.REPLICATOR));
+        synchronized (dispatchRateLimiterLock) {
+            if (!dispatchRateLimiter.isPresent()
+                && 
DispatchRateLimiter.isDispatchRateEnabled(topic.getReplicatorDispatchRate())) {
+                this.dispatchRateLimiter = Optional.of(new 
DispatchRateLimiter(topic, Type.REPLICATOR));
+            }
         }
     }
 
+    @Override
+    public void updateRateLimiter() {
+        initializeDispatchRateLimiterIfNeeded();
+        dispatchRateLimiter.ifPresent(DispatchRateLimiter::updateDispatchRate);
+    }
+
     private void checkReplicatedSubscriptionMarker(Position position, 
MessageImpl<?> msg, ByteBuf payload) {
         if (!msg.getMessageBuilder().hasMarkerType()) {
             // No marker is defined
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 27bb77fbc5d..a82bdb8d286 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -392,10 +392,6 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                     dispatcher.initializeDispatchRateLimiterIfNeeded();
                 }
             });
-
-            // dispatch rate limiter for each replicator
-            replicators.forEach((name, replicator) ->
-                replicator.initializeDispatchRateLimiterIfNeeded());
         }
     }
 
@@ -2416,9 +2412,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             });
 
             return 
FutureUtil.waitForAll(subscriptionCheckFutures).thenCompose((___) -> {
-                replicators.forEach((name, replicator) ->
-                        
replicator.getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate)
-                );
+                replicators.forEach((name, replicator) -> 
replicator.updateRateLimiter());
                 checkMessageExpiry();
                 CompletableFuture<Void> replicationFuture = 
checkReplicationAndRetryOnFailure();
                 CompletableFuture<Void> dedupFuture = 
checkDeduplicationStatus();
@@ -3075,8 +3069,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                 subscribeRateLimiter.ifPresent(subscribeRateLimiter ->
                         
subscribeRateLimiter.onSubscribeRateUpdate(policies.getSubscribeRate()));
             }
-            replicators.forEach((name, replicator) -> 
replicator.getRateLimiter()
-                    .ifPresent(DispatchRateLimiter::updateDispatchRate));
+            replicators.forEach((name, replicator) -> 
replicator.updateRateLimiter());
 
             if (policies.getReplicationClusters() != null) {
                 checkReplicationAndRetryOnFailure();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
index 65e4bb0a785..fdf27adc718 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
+import static org.testng.AssertJUnit.assertFalse;
 import com.google.common.collect.Sets;
 import java.lang.reflect.Method;
 import java.util.concurrent.TimeUnit;
@@ -76,6 +77,133 @@ public class ReplicatorRateLimiterTest extends 
ReplicatorTestBase {
         return new Object[][] { { DispatchRateType.messageRate }, { 
DispatchRateType.byteRate } };
     }
 
+    @Test
+    public void testReplicatorRateLimiterWithOnlyTopicLevel() throws Exception 
{
+        cleanup();
+        config1.setDispatchThrottlingRatePerReplicatorInMsg(0); // disable 
broker level
+        config1.setDispatchThrottlingRatePerReplicatorInByte(0L);
+        setup();
+
+        final String namespace = "pulsar/replicatorchange-" + 
System.currentTimeMillis();
+        final String topicName = "persistent://" + namespace + 
"/testReplicatorRateLimiterWithOnlyTopicLevel";
+
+        admin1.namespaces().createNamespace(namespace);
+        // set 2 clusters, there will be 1 replicator in each topic
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, 
Sets.newHashSet("r1", "r2"));
+        @Cleanup
+        PulsarClient client1 = 
PulsarClient.builder().serviceUrl(url1.toString())
+            .statsInterval(0, TimeUnit.SECONDS).build();
+        client1.newProducer().topic(topicName).create().close();
+        PersistentTopic topic = (PersistentTopic) 
pulsar1.getBrokerService().getOrCreateTopic(topicName).get();
+
+        // rate limiter disable by default
+        
assertFalse(topic.getReplicators().values().get(0).getRateLimiter().isPresent());
+
+        //set topic-level policy, which should take effect
+        DispatchRate topicRate = DispatchRate.builder()
+            .dispatchThrottlingRateInMsg(10)
+            .dispatchThrottlingRateInByte(20)
+            .ratePeriodInSecond(30)
+            .build();
+        admin1.topics().setReplicatorDispatchRate(topicName, topicRate);
+        Awaitility.await().untilAsserted(() ->
+            assertEquals(admin1.topics().getReplicatorDispatchRate(topicName), 
topicRate));
+        
assertTrue(topic.getReplicators().values().get(0).getRateLimiter().isPresent());
+        
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(),
 10);
+        
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(),
 20L);
+
+        //remove topic-level policy
+        admin1.topics().removeReplicatorDispatchRate(topicName);
+        Awaitility.await().untilAsserted(() ->
+            assertNull(admin1.topics().getReplicatorDispatchRate(topicName)));
+        
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(),
 -1);
+        
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(),
+            -1L);
+    }
+
+    @Test
+    public void testReplicatorRateLimiterWithOnlyNamespaceLevel() throws 
Exception {
+        cleanup();
+        config1.setDispatchThrottlingRatePerReplicatorInMsg(0); // disable 
broker level
+        config1.setDispatchThrottlingRatePerReplicatorInByte(0L);
+        setup();
+
+        final String namespace = "pulsar/replicatorchange-" + 
System.currentTimeMillis();
+        final String topicName = "persistent://" + namespace + 
"/testReplicatorRateLimiterWithOnlyNamespaceLevel";
+
+        admin1.namespaces().createNamespace(namespace);
+        // set 2 clusters, there will be 1 replicator in each topic
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, 
Sets.newHashSet("r1", "r2"));
+        @Cleanup
+        PulsarClient client1 = 
PulsarClient.builder().serviceUrl(url1.toString())
+            .statsInterval(0, TimeUnit.SECONDS).build();
+        client1.newProducer().topic(topicName).create().close();
+        PersistentTopic topic = (PersistentTopic) 
pulsar1.getBrokerService().getOrCreateTopic(topicName).get();
+
+        // rate limiter disable by default
+        
assertFalse(topic.getReplicators().values().get(0).getRateLimiter().isPresent());
+
+        //set namespace-level policy, which should take effect
+        DispatchRate topicRate = DispatchRate.builder()
+            .dispatchThrottlingRateInMsg(10)
+            .dispatchThrottlingRateInByte(20)
+            .ratePeriodInSecond(30)
+            .build();
+        admin1.namespaces().setReplicatorDispatchRate(namespace, topicRate);
+        Awaitility.await().untilAsserted(() ->
+            
assertEquals(admin1.namespaces().getReplicatorDispatchRate(namespace), 
topicRate));
+        
assertTrue(topic.getReplicators().values().get(0).getRateLimiter().isPresent());
+        
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(),
 10);
+        
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(),
 20L);
+
+        //remove topic-level policy
+        admin1.namespaces().removeReplicatorDispatchRate(namespace);
+        Awaitility.await().untilAsserted(() ->
+            
assertNull(admin1.namespaces().getReplicatorDispatchRate(namespace)));
+        
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(),
 -1);
+        
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(),
+            -1L);
+    }
+
+    @Test
+    public void testReplicatorRateLimiterWithOnlyBrokerLevel() throws 
Exception {
+        cleanup();
+        config1.setDispatchThrottlingRatePerReplicatorInMsg(0); // disable 
broker level when init
+        config1.setDispatchThrottlingRatePerReplicatorInByte(0L);
+        setup();
+
+        final String namespace = "pulsar/replicatorchange-" + 
System.currentTimeMillis();
+        final String topicName = "persistent://" + namespace + 
"/testReplicatorRateLimiterWithOnlyBrokerLevel";
+
+        admin1.namespaces().createNamespace(namespace);
+        // set 2 clusters, there will be 1 replicator in each topic
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, 
Sets.newHashSet("r1", "r2"));
+        @Cleanup
+        PulsarClient client1 = 
PulsarClient.builder().serviceUrl(url1.toString())
+            .statsInterval(0, TimeUnit.SECONDS).build();
+        client1.newProducer().topic(topicName).create().close();
+        PersistentTopic topic = (PersistentTopic) 
pulsar1.getBrokerService().getOrCreateTopic(topicName).get();
+
+        // rate limiter disable by default
+        
assertFalse(topic.getReplicators().values().get(0).getRateLimiter().isPresent());
+
+        //set broker-level policy, which should take effect
+        
admin1.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerReplicatorInMsg",
 "10");
+        
admin1.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerReplicatorInByte",
 "20");
+        Awaitility.await().untilAsserted(() -> {
+            assertTrue(admin1.brokers()
+                
.getAllDynamicConfigurations().containsKey("dispatchThrottlingRatePerReplicatorInByte"));
+            assertEquals(admin1.brokers()
+                
.getAllDynamicConfigurations().get("dispatchThrottlingRatePerReplicatorInMsg"), 
"10");
+            assertEquals(admin1.brokers()
+                
.getAllDynamicConfigurations().get("dispatchThrottlingRatePerReplicatorInByte"),
 "20");
+        });
+
+        
assertTrue(topic.getReplicators().values().get(0).getRateLimiter().isPresent());
+        
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(),
 10);
+        
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(),
 20L);
+    }
+
     @Test
     public void testReplicatorRatePriority() throws Exception {
         cleanup();

Reply via email to