This is an automated email from the ASF dual-hosted git repository. guangning pushed a commit to branch branch-2.5 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit eebefbd3dd8bedf60627442df6c5aae1e3736018 Author: Masahiro Sakamoto <[email protected]> AuthorDate: Mon Feb 10 10:40:17 2020 +0900 Restore clusterDispatchRate policy for compatibility (#6176) Co-authored-by: Sijie Guo <[email protected]> --- .../pulsar/broker/admin/impl/NamespacesBase.java | 6 ++ .../service/persistent/DispatchRateLimiter.java | 8 +++ .../client/api/MessageDispatchThrottlingTest.java | 83 ++++++++++++++++++++-- .../pulsar/common/policies/data/Policies.java | 6 +- 4 files changed, 98 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index adc7bd4..1f32077 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -854,6 +854,7 @@ public abstract class NamespacesBase extends AdminResource { } } + @SuppressWarnings("deprecation") protected void internalSetTopicDispatchRate(DispatchRate dispatchRate) { log.info("[{}] Set namespace dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate); validateSuperUserAccess(); @@ -866,6 +867,7 @@ public abstract class NamespacesBase extends AdminResource { policiesNode = policiesCache().getWithStat(path).orElseThrow( () -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist")); policiesNode.getKey().topicDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate); + policiesNode.getKey().clusterDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate); // Write back the new policies into zookeeper globalZk().setData(path, jsonMapper().writeValueAsBytes(policiesNode.getKey()), @@ -891,11 +893,15 @@ public abstract class NamespacesBase extends AdminResource { } } + @SuppressWarnings("deprecation") protected DispatchRate internalGetTopicDispatchRate() { validateAdminAccessForTenant(namespaceName.getTenant()); Policies policies = getNamespacePolicies(namespaceName); DispatchRate dispatchRate = policies.topicDispatchRate.get(pulsar().getConfiguration().getClusterName()); + if (dispatchRate == null) { + dispatchRate = policies.clusterDispatchRate.get(pulsar().getConfiguration().getClusterName()); + } if (dispatchRate != null) { return dispatchRate; } else { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java index 3f437dd..29eed73 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java @@ -186,6 +186,7 @@ public class DispatchRateLimiter { return true; } + @SuppressWarnings("deprecation") public void onPoliciesUpdate(Policies data) { String cluster = brokerService.pulsar().getConfiguration().getClusterName(); @@ -194,6 +195,9 @@ public class DispatchRateLimiter { switch (type) { case TOPIC: dispatchRate = data.topicDispatchRate.get(cluster); + if (dispatchRate == null) { + dispatchRate = data.clusterDispatchRate.get(cluster); + } break; case SUBSCRIPTION: dispatchRate = data.subscriptionDispatchRate.get(cluster); @@ -219,6 +223,7 @@ public class DispatchRateLimiter { } } + @SuppressWarnings("deprecation") public static DispatchRate getPoliciesDispatchRate(final String cluster, Optional<Policies> policies, Type type) { // return policy-dispatch rate only if it's enabled in policies return policies.map(p -> { @@ -226,6 +231,9 @@ public class DispatchRateLimiter { switch (type) { case TOPIC: dispatchRate = p.topicDispatchRate.get(cluster); + if (dispatchRate == null) { + dispatchRate = p.clusterDispatchRate.get(cluster); + } break; case SUBSCRIPTION: dispatchRate = p.subscriptionDispatchRate.get(cluster); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java index b85e088..e6f1b19 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java @@ -18,14 +18,17 @@ */ package org.apache.pulsar.client.api; -import com.google.common.collect.Sets; - import static org.testng.Assert.assertNotNull; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + import java.lang.reflect.Field; import java.util.Arrays; import java.util.LinkedList; import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -37,6 +40,7 @@ import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.DispatchRate; +import org.apache.pulsar.common.policies.data.Policies; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -101,6 +105,7 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase { * * @throws Exception */ + @SuppressWarnings("deprecation") @Test public void testMessageRateDynamicallyChange() throws Exception { @@ -116,7 +121,7 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase { // (1) verify message-rate is -1 initially Assert.assertFalse(topic.getDispatchRateLimiter().isPresent()); - // (1) change to 100 + // (2) change to 100 int messageRate = 100; DispatchRate dispatchRate = new DispatchRate(messageRate, -1, 360); admin.namespaces().setDispatchRate(namespace, dispatchRate); @@ -134,8 +139,13 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase { } Assert.assertTrue(isDispatchRateUpdate); Assert.assertEquals(admin.namespaces().getDispatchRate(namespace), dispatchRate); + Policies policies = admin.namespaces().getPolicies(namespace); + Map<String, DispatchRate> dispatchRateMap = Maps.newHashMap(); + dispatchRateMap.put("test", dispatchRate); + Assert.assertEquals(policies.clusterDispatchRate, dispatchRateMap); + Assert.assertEquals(policies.topicDispatchRate, dispatchRateMap); - // (1) change to 500 + // (3) change to 500 messageRate = 500; dispatchRate = new DispatchRate(-1, messageRate, 360); admin.namespaces().setDispatchRate(namespace, dispatchRate); @@ -152,6 +162,10 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase { } Assert.assertTrue(isDispatchRateUpdate); Assert.assertEquals(admin.namespaces().getDispatchRate(namespace), dispatchRate); + policies = admin.namespaces().getPolicies(namespace); + dispatchRateMap.put("test", dispatchRate); + Assert.assertEquals(policies.clusterDispatchRate, dispatchRateMap); + Assert.assertEquals(policies.topicDispatchRate, dispatchRateMap); producer.close(); } @@ -896,6 +910,67 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase { log.info("-- Exiting {} test --", methodName); } + @SuppressWarnings("deprecation") + @Test + public void testDispatchRateCompatibility1() throws Exception { + final String cluster = "test"; + + Optional<Policies> policies = Optional.of(new Policies()); + DispatchRate clusterDispatchRate = new DispatchRate(100, 512, 1); + DispatchRate topicDispatchRate = new DispatchRate(200, 1024, 1); + + // (1) If both clusterDispatchRate and topicDispatchRate are empty, dispatch throttling is disabled + DispatchRate dispatchRate = DispatchRateLimiter.getPoliciesDispatchRate(cluster, policies, + DispatchRateLimiter.Type.TOPIC); + Assert.assertNull(dispatchRate); + + // (2) If topicDispatchRate is empty, clusterDispatchRate is effective + policies.get().clusterDispatchRate.put(cluster, clusterDispatchRate); + dispatchRate = DispatchRateLimiter.getPoliciesDispatchRate(cluster, policies, DispatchRateLimiter.Type.TOPIC); + Assert.assertEquals(dispatchRate, clusterDispatchRate); + + // (3) If topicDispatchRate is not empty, topicDispatchRate is effective + policies.get().topicDispatchRate.put(cluster, topicDispatchRate); + dispatchRate = DispatchRateLimiter.getPoliciesDispatchRate(cluster, policies, DispatchRateLimiter.Type.TOPIC); + Assert.assertEquals(dispatchRate, topicDispatchRate); + } + + @SuppressWarnings("deprecation") + @Test + public void testDispatchRateCompatibility2() throws Exception { + final String namespace = "my-property/dispatch-rate-compatibility"; + final String topicName = "persistent://" + namespace + "/t1"; + final String cluster = "test"; + admin.namespaces().createNamespace(namespace, Sets.newHashSet(cluster)); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get(); + DispatchRateLimiter dispatchRateLimiter = new DispatchRateLimiter(topic, DispatchRateLimiter.Type.TOPIC); + + Policies policies = new Policies(); + DispatchRate clusterDispatchRate = new DispatchRate(100, 512, 1); + DispatchRate topicDispatchRate = new DispatchRate(200, 1024, 1); + + // (1) If both clusterDispatchRate and topicDispatchRate are empty, dispatch throttling is disabled + dispatchRateLimiter.onPoliciesUpdate(policies); + Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), -1); + Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), -1); + + // (2) If topicDispatchRate is empty, clusterDispatchRate is effective + policies.clusterDispatchRate.put(cluster, clusterDispatchRate); + dispatchRateLimiter.onPoliciesUpdate(policies); + Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), 100); + Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), 512); + + // (3) If topicDispatchRate is not empty, topicDispatchRate is effective + policies.topicDispatchRate.put(cluster, topicDispatchRate); + dispatchRateLimiter.onPoliciesUpdate(policies); + Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), 200); + Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), 1024); + + producer.close(); + topic.close().get(); + } + protected void deactiveCursors(ManagedLedgerImpl ledger) throws Exception { Field statsUpdaterField = BrokerService.class.getDeclaredField("statsUpdater"); statsUpdaterField.setAccessible(true); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java index 8e55280..12e5594 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java @@ -39,6 +39,8 @@ public class Policies { public BundlesData bundles; @SuppressWarnings("checkstyle:MemberName") public Map<BacklogQuota.BacklogQuotaType, BacklogQuota> backlog_quota_map = Maps.newHashMap(); + @Deprecated + public Map<String, DispatchRate> clusterDispatchRate = Maps.newHashMap(); public Map<String, DispatchRate> topicDispatchRate = Maps.newHashMap(); public Map<String, DispatchRate> subscriptionDispatchRate = Maps.newHashMap(); public Map<String, DispatchRate> replicatorDispatchRate = Maps.newHashMap(); @@ -97,7 +99,7 @@ public class Policies { @Override public int hashCode() { return Objects.hash(auth_policies, replication_clusters, - backlog_quota_map, publishMaxMessageRate, + backlog_quota_map, publishMaxMessageRate, clusterDispatchRate, topicDispatchRate, subscriptionDispatchRate, replicatorDispatchRate, clusterSubscribeRate, deduplicationEnabled, persistence, bundles, latency_stats_sample_rate, @@ -120,6 +122,7 @@ public class Policies { return Objects.equals(auth_policies, other.auth_policies) && Objects.equals(replication_clusters, other.replication_clusters) && Objects.equals(backlog_quota_map, other.backlog_quota_map) + && Objects.equals(clusterDispatchRate, other.clusterDispatchRate) && Objects.equals(topicDispatchRate, other.topicDispatchRate) && Objects.equals(subscriptionDispatchRate, other.subscriptionDispatchRate) && Objects.equals(replicatorDispatchRate, other.replicatorDispatchRate) @@ -171,6 +174,7 @@ public class Policies { .add("replication_clusters", replication_clusters).add("bundles", bundles) .add("backlog_quota_map", backlog_quota_map).add("persistence", persistence) .add("deduplicationEnabled", deduplicationEnabled) + .add("clusterDispatchRate", clusterDispatchRate) .add("topicDispatchRate", topicDispatchRate) .add("subscriptionDispatchRate", subscriptionDispatchRate) .add("replicatorDispatchRate", replicatorDispatchRate)
