This is an automated email from the ASF dual-hosted git repository.

guangning pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit eebefbd3dd8bedf60627442df6c5aae1e3736018
Author: Masahiro Sakamoto <[email protected]>
AuthorDate: Mon Feb 10 10:40:17 2020 +0900

    Restore clusterDispatchRate policy for compatibility (#6176)
    
    Co-authored-by: Sijie Guo <[email protected]>
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  6 ++
 .../service/persistent/DispatchRateLimiter.java    |  8 +++
 .../client/api/MessageDispatchThrottlingTest.java  | 83 ++++++++++++++++++++--
 .../pulsar/common/policies/data/Policies.java      |  6 +-
 4 files changed, 98 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index adc7bd4..1f32077 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -854,6 +854,7 @@ public abstract class NamespacesBase extends AdminResource {
         }
     }
 
+    @SuppressWarnings("deprecation")
     protected void internalSetTopicDispatchRate(DispatchRate dispatchRate) {
         log.info("[{}] Set namespace dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate);
         validateSuperUserAccess();
@@ -866,6 +867,7 @@ public abstract class NamespacesBase extends AdminResource {
             policiesNode = policiesCache().getWithStat(path).orElseThrow(
                     () -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
             policiesNode.getKey().topicDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate);
+            policiesNode.getKey().clusterDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate);
 
             // Write back the new policies into zookeeper
             globalZk().setData(path, jsonMapper().writeValueAsBytes(policiesNode.getKey()),
@@ -891,11 +893,15 @@ public abstract class NamespacesBase extends AdminResource {
         }
     }
 
+    @SuppressWarnings("deprecation")
     protected DispatchRate internalGetTopicDispatchRate() {
         validateAdminAccessForTenant(namespaceName.getTenant());
 
         Policies policies = getNamespacePolicies(namespaceName);
         DispatchRate dispatchRate = policies.topicDispatchRate.get(pulsar().getConfiguration().getClusterName());
+        if (dispatchRate == null) {
+            dispatchRate = policies.clusterDispatchRate.get(pulsar().getConfiguration().getClusterName());
+        }
         if (dispatchRate != null) {
             return dispatchRate;
         } else {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index 3f437dd..29eed73 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -186,6 +186,7 @@ public class DispatchRateLimiter {
         return true;
     }
 
+    @SuppressWarnings("deprecation")
     public void onPoliciesUpdate(Policies data) {
         String cluster = brokerService.pulsar().getConfiguration().getClusterName();
 
@@ -194,6 +195,9 @@ public class DispatchRateLimiter {
         switch (type) {
             case TOPIC:
                 dispatchRate = data.topicDispatchRate.get(cluster);
+                if (dispatchRate == null) {
+                    dispatchRate = data.clusterDispatchRate.get(cluster);
+                }
                 break;
             case SUBSCRIPTION:
                 dispatchRate = data.subscriptionDispatchRate.get(cluster);
@@ -219,6 +223,7 @@ public class DispatchRateLimiter {
         }
     }
 
+    @SuppressWarnings("deprecation")
     public static DispatchRate getPoliciesDispatchRate(final String cluster, Optional<Policies> policies, Type type) {
         // return policy-dispatch rate only if it's enabled in policies
         return policies.map(p -> {
@@ -226,6 +231,9 @@ public class DispatchRateLimiter {
             switch (type) {
                 case TOPIC:
                     dispatchRate = p.topicDispatchRate.get(cluster);
+                    if (dispatchRate == null) {
+                        dispatchRate = p.clusterDispatchRate.get(cluster);
+                    }
                     break;
                 case SUBSCRIPTION:
                     dispatchRate = p.subscriptionDispatchRate.get(cluster);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
index b85e088..e6f1b19 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
@@ -18,14 +18,17 @@
  */
 package org.apache.pulsar.client.api;
 
-import com.google.common.collect.Sets;
-
 import static org.testng.Assert.assertNotNull;
 
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
 import java.lang.reflect.Field;
 import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -37,6 +40,7 @@ import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.Policies;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -101,6 +105,7 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
      *
      * @throws Exception
      */
+    @SuppressWarnings("deprecation")
     @Test
     public void testMessageRateDynamicallyChange() throws Exception {
 
@@ -116,7 +121,7 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
         // (1) verify message-rate is -1 initially
         Assert.assertFalse(topic.getDispatchRateLimiter().isPresent());
 
-        // (1) change to 100
+        // (2) change to 100
         int messageRate = 100;
         DispatchRate dispatchRate = new DispatchRate(messageRate, -1, 360);
         admin.namespaces().setDispatchRate(namespace, dispatchRate);
@@ -134,8 +139,13 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
         }
         Assert.assertTrue(isDispatchRateUpdate);
         Assert.assertEquals(admin.namespaces().getDispatchRate(namespace), dispatchRate);
+        Policies policies = admin.namespaces().getPolicies(namespace);
+        Map<String, DispatchRate> dispatchRateMap = Maps.newHashMap();
+        dispatchRateMap.put("test", dispatchRate);
+        Assert.assertEquals(policies.clusterDispatchRate, dispatchRateMap);
+        Assert.assertEquals(policies.topicDispatchRate, dispatchRateMap);
 
-        // (1) change to 500
+        // (3) change to 500
         messageRate = 500;
         dispatchRate = new DispatchRate(-1, messageRate, 360);
         admin.namespaces().setDispatchRate(namespace, dispatchRate);
@@ -152,6 +162,10 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
         }
         Assert.assertTrue(isDispatchRateUpdate);
         Assert.assertEquals(admin.namespaces().getDispatchRate(namespace), dispatchRate);
+        policies = admin.namespaces().getPolicies(namespace);
+        dispatchRateMap.put("test", dispatchRate);
+        Assert.assertEquals(policies.clusterDispatchRate, dispatchRateMap);
+        Assert.assertEquals(policies.topicDispatchRate, dispatchRateMap);
 
         producer.close();
     }
@@ -896,6 +910,67 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @SuppressWarnings("deprecation")
+    @Test
+    public void testDispatchRateCompatibility1() throws Exception {
+        final String cluster = "test";
+
+        Optional<Policies> policies = Optional.of(new Policies());
+        DispatchRate clusterDispatchRate = new DispatchRate(100, 512, 1);
+        DispatchRate topicDispatchRate = new DispatchRate(200, 1024, 1);
+
+        // (1) If both clusterDispatchRate and topicDispatchRate are empty, dispatch throttling is disabled
+        DispatchRate dispatchRate = DispatchRateLimiter.getPoliciesDispatchRate(cluster, policies,
+                DispatchRateLimiter.Type.TOPIC);
+        Assert.assertNull(dispatchRate);
+
+        // (2) If topicDispatchRate is empty, clusterDispatchRate is effective
+        policies.get().clusterDispatchRate.put(cluster, clusterDispatchRate);
+        dispatchRate = DispatchRateLimiter.getPoliciesDispatchRate(cluster, policies, DispatchRateLimiter.Type.TOPIC);
+        Assert.assertEquals(dispatchRate, clusterDispatchRate);
+
+        // (3) If topicDispatchRate is not empty, topicDispatchRate is effective
+        policies.get().topicDispatchRate.put(cluster, topicDispatchRate);
+        dispatchRate = DispatchRateLimiter.getPoliciesDispatchRate(cluster, policies, DispatchRateLimiter.Type.TOPIC);
+        Assert.assertEquals(dispatchRate, topicDispatchRate);
+    }
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void testDispatchRateCompatibility2() throws Exception {
+        final String namespace = "my-property/dispatch-rate-compatibility";
+        final String topicName = "persistent://" + namespace + "/t1";
+        final String cluster = "test";
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet(cluster));
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+        DispatchRateLimiter dispatchRateLimiter = new DispatchRateLimiter(topic, DispatchRateLimiter.Type.TOPIC);
+
+        Policies policies = new Policies();
+        DispatchRate clusterDispatchRate = new DispatchRate(100, 512, 1);
+        DispatchRate topicDispatchRate = new DispatchRate(200, 1024, 1);
+
+        // (1) If both clusterDispatchRate and topicDispatchRate are empty, dispatch throttling is disabled
+        dispatchRateLimiter.onPoliciesUpdate(policies);
+        Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), -1);
+        Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), -1);
+
+        // (2) If topicDispatchRate is empty, clusterDispatchRate is effective
+        policies.clusterDispatchRate.put(cluster, clusterDispatchRate);
+        dispatchRateLimiter.onPoliciesUpdate(policies);
+        Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), 100);
+        Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), 512);
+
+        // (3) If topicDispatchRate is not empty, topicDispatchRate is effective
+        policies.topicDispatchRate.put(cluster, topicDispatchRate);
+        dispatchRateLimiter.onPoliciesUpdate(policies);
+        Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), 200);
+        Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), 1024);
+
+        producer.close();
+        topic.close().get();
+    }
+
     protected void deactiveCursors(ManagedLedgerImpl ledger) throws Exception {
         Field statsUpdaterField = BrokerService.class.getDeclaredField("statsUpdater");
         statsUpdaterField.setAccessible(true);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index 8e55280..12e5594 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -39,6 +39,8 @@ public class Policies {
     public BundlesData bundles;
     @SuppressWarnings("checkstyle:MemberName")
     public Map<BacklogQuota.BacklogQuotaType, BacklogQuota> backlog_quota_map = Maps.newHashMap();
+    @Deprecated
+    public Map<String, DispatchRate> clusterDispatchRate = Maps.newHashMap();
     public Map<String, DispatchRate> topicDispatchRate = Maps.newHashMap();
     public Map<String, DispatchRate> subscriptionDispatchRate = Maps.newHashMap();
     public Map<String, DispatchRate> replicatorDispatchRate = Maps.newHashMap();
@@ -97,7 +99,7 @@ public class Policies {
     @Override
     public int hashCode() {
         return Objects.hash(auth_policies, replication_clusters,
-                backlog_quota_map, publishMaxMessageRate,
+                backlog_quota_map, publishMaxMessageRate, clusterDispatchRate,
                 topicDispatchRate, subscriptionDispatchRate, replicatorDispatchRate,
                 clusterSubscribeRate, deduplicationEnabled, persistence,
                 bundles, latency_stats_sample_rate,
@@ -120,6 +122,7 @@ public class Policies {
             return Objects.equals(auth_policies, other.auth_policies)
                     && Objects.equals(replication_clusters, other.replication_clusters)
                     && Objects.equals(backlog_quota_map, other.backlog_quota_map)
+                    && Objects.equals(clusterDispatchRate, other.clusterDispatchRate)
                     && Objects.equals(topicDispatchRate, other.topicDispatchRate)
                     && Objects.equals(subscriptionDispatchRate, other.subscriptionDispatchRate)
                     && Objects.equals(replicatorDispatchRate, other.replicatorDispatchRate)
@@ -171,6 +174,7 @@ public class Policies {
                 .add("replication_clusters", replication_clusters).add("bundles", bundles)
                 .add("backlog_quota_map", backlog_quota_map).add("persistence", persistence)
                 .add("deduplicationEnabled", deduplicationEnabled)
+                .add("clusterDispatchRate", clusterDispatchRate)
                 .add("topicDispatchRate", topicDispatchRate)
                 .add("subscriptionDispatchRate", subscriptionDispatchRate)
                 .add("replicatorDispatchRate", replicatorDispatchRate)

Reply via email to