This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 39dbeb2d08d Cleanup DispatchRateLimiter method isDispatchRateNeeded 
(#15953)
39dbeb2d08d is described below

commit 39dbeb2d08d0f8e6495ef4f97e1c28fc72b6de87
Author: Xiaoyu Hou <[email protected]>
AuthorDate: Tue Jun 14 10:29:31 2022 +0800

    Cleanup DispatchRateLimiter method isDispatchRateNeeded (#15953)
---
 .../service/persistent/DispatchRateLimiter.java    | 75 ----------------------
 ...ntStickyKeyDispatcherMultipleConsumersTest.java | 19 +-----
 ...ntStickyKeyDispatcherMultipleConsumersTest.java | 73 ++++++++-------------
 3 files changed, 29 insertions(+), 138 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index 481a97ee15b..5f788c9ac14 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -24,12 +24,10 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.Policies;
-import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
 import org.apache.pulsar.common.util.RateLimiter;
 import org.slf4j.Logger;
@@ -203,79 +201,6 @@ public class DispatchRateLimiter {
         updateDispatchRate(dispatchRate);
     }
 
-    public static boolean isDispatchRateNeeded(BrokerService brokerService, 
Optional<Policies> policies,
-            String topicName, Type type) {
-        final ServiceConfiguration serviceConfig = 
brokerService.pulsar().getConfiguration();
-        if (type == Type.BROKER) {
-            return 
brokerService.getBrokerDispatchRateLimiter().isDispatchRateLimitingEnabled();
-        }
-
-        Optional<DispatchRate> dispatchRate = 
getTopicPolicyDispatchRate(brokerService, topicName, type);
-        if (dispatchRate.isPresent()) {
-            return true;
-        }
-
-        policies = policies.isPresent() ? policies : 
getPolicies(brokerService, topicName);
-        return isDispatchRateNeeded(serviceConfig, policies, topicName, type);
-    }
-
-    public static Optional<DispatchRate> 
getTopicPolicyDispatchRate(BrokerService brokerService,
-                                                                    String 
topicName, Type type) {
-        Optional<DispatchRate> dispatchRate = Optional.empty();
-        final ServiceConfiguration serviceConfiguration = 
brokerService.pulsar().getConfiguration();
-        if (serviceConfiguration.isSystemTopicEnabled() && 
serviceConfiguration.isTopicLevelPoliciesEnabled()) {
-            try {
-                switch (type) {
-                    case TOPIC:
-                        dispatchRate = 
Optional.ofNullable(brokerService.pulsar().getTopicPoliciesService()
-                                .getTopicPolicies(TopicName.get(topicName)))
-                                .map(TopicPolicies::getDispatchRate);
-                        break;
-                    case SUBSCRIPTION:
-                        dispatchRate = 
Optional.ofNullable(brokerService.pulsar().getTopicPoliciesService()
-                                .getTopicPolicies(TopicName.get(topicName)))
-                                
.map(TopicPolicies::getSubscriptionDispatchRate);
-                        break;
-                    case REPLICATOR:
-                        dispatchRate = 
Optional.ofNullable(brokerService.pulsar().getTopicPoliciesService()
-                                .getTopicPolicies(TopicName.get(topicName)))
-                                .map(TopicPolicies::getReplicatorDispatchRate);
-                        break;
-                    default:
-                        break;
-                }
-            } catch (BrokerServiceException.TopicPoliciesCacheNotInitException 
e) {
-                log.debug("Topic {} policies have not been initialized yet.", 
topicName);
-            } catch (Exception e) {
-                log.debug("[{}] Failed to get topic dispatch rate. ", 
topicName, e);
-            }
-        }
-
-        return dispatchRate;
-    }
-
-    public static boolean isDispatchRateNeeded(final ServiceConfiguration 
serviceConfig,
-            final Optional<Policies> policies, final String topicName, final 
Type type) {
-        DispatchRate dispatchRate = 
getPoliciesDispatchRate(serviceConfig.getClusterName(), policies, type);
-        if (dispatchRate == null) {
-            switch (type) {
-                case TOPIC:
-                    return 
serviceConfig.getDispatchThrottlingRatePerTopicInMsg() > 0
-                        || 
serviceConfig.getDispatchThrottlingRatePerTopicInByte() > 0;
-                case SUBSCRIPTION:
-                    return 
serviceConfig.getDispatchThrottlingRatePerSubscriptionInMsg() > 0
-                        || 
serviceConfig.getDispatchThrottlingRatePerSubscriptionInByte() > 0;
-                case REPLICATOR:
-                    return 
serviceConfig.getDispatchThrottlingRatePerReplicatorInMsg() > 0
-                        || 
serviceConfig.getDispatchThrottlingRatePerReplicatorInByte() > 0;
-                default:
-                    log.error("error DispatchRateLimiter type: {} ", type);
-                    return false;
-            }
-        }
-        return true;
-    }
-
     @SuppressWarnings("deprecation")
     public static DispatchRateImpl getPoliciesDispatchRate(final String 
cluster,
                                                            Optional<Policies> 
policies,
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
index f319b7ce4a7..932c446669d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -23,12 +23,10 @@ import static 
org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPay
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -39,7 +37,6 @@ import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelPromise;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Optional;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.impl.EntryImpl;
 import org.apache.pulsar.broker.PulsarService;
@@ -50,11 +47,9 @@ import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.EntryBatchSizes;
 import 
org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.RedeliveryTracker;
-import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
 import org.apache.pulsar.common.protocol.Commands;
-import org.mockito.MockedStatic;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -94,17 +89,9 @@ public class 
NonPersistentStickyKeyDispatcherMultipleConsumersTest {
 
         subscriptionMock = mock(NonPersistentSubscription.class);
 
-        try (MockedStatic<DispatchRateLimiter> rateLimiterMockedStatic = 
mockStatic(DispatchRateLimiter.class);) {
-            rateLimiterMockedStatic.when(() -> 
DispatchRateLimiter.isDispatchRateNeeded(
-                            any(BrokerService.class),
-                            any(Optional.class),
-                            anyString(),
-                            any(DispatchRateLimiter.Type.class)))
-                    .thenReturn(false);
-            nonpersistentDispatcher = new 
NonPersistentStickyKeyDispatcherMultipleConsumers(
-                    topicMock, subscriptionMock,
-                    new HashRangeAutoSplitStickyKeyConsumerSelector());
-        }
+        nonpersistentDispatcher = new 
NonPersistentStickyKeyDispatcherMultipleConsumers(
+            topicMock, subscriptionMock,
+            new HashRangeAutoSplitStickyKeyConsumerSelector());
     }
 
     @Test(timeOut = 10000)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index 99a66f44ac4..6a44e3dfcf7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -26,13 +26,11 @@ import static org.mockito.Mockito.anyInt;
 import static org.mockito.Mockito.anyList;
 import static org.mockito.Mockito.anyLong;
 import static org.mockito.Mockito.anySet;
-import static org.mockito.Mockito.anyString;
 import static org.mockito.Mockito.argThat;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -45,7 +43,6 @@ import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -72,7 +69,6 @@ import 
org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Markers;
 import org.mockito.ArgumentCaptor;
-import org.mockito.MockedStatic;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -143,17 +139,9 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersTest {
         );
 
         subscriptionMock = mock(PersistentSubscription.class);
-        try (MockedStatic<DispatchRateLimiter> rateLimiterMockedStatic = 
mockStatic(DispatchRateLimiter.class);) {
-            rateLimiterMockedStatic.when(() -> 
DispatchRateLimiter.isDispatchRateNeeded(
-                            any(BrokerService.class),
-                            any(Optional.class),
-                            anyString(),
-                            any(DispatchRateLimiter.Type.class)))
-                    .thenReturn(false);
-            persistentDispatcher = new 
PersistentStickyKeyDispatcherMultipleConsumers(
-                    topicMock, cursorMock, subscriptionMock, configMock,
-                    new 
KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT));
-        }
+        persistentDispatcher = new 
PersistentStickyKeyDispatcherMultipleConsumers(
+            topicMock, cursorMock, subscriptionMock, configMock,
+            new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT));
     }
 
     @Test
@@ -197,40 +185,31 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersTest {
 
     @Test(timeOut = 10000)
     public void testSendMessage() {
-        try (MockedStatic<DispatchRateLimiter> rateLimiterMockedStatic = 
mockStatic(DispatchRateLimiter.class);) {
-            rateLimiterMockedStatic.when(() -> 
DispatchRateLimiter.isDispatchRateNeeded(
-                            any(BrokerService.class),
-                            any(Optional.class),
-                            anyString(),
-                            any(DispatchRateLimiter.Type.class)))
-                    .thenReturn(false);
-            KeySharedMeta keySharedMeta = new 
KeySharedMeta().setKeySharedMode(KeySharedMode.STICKY);
-            DispatchRateLimiter.isDispatchRateNeeded(brokerMock, 
Optional.empty(), "hello", DispatchRateLimiter.Type.SUBSCRIPTION);
-            PersistentStickyKeyDispatcherMultipleConsumers 
persistentDispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
-                    topicMock, cursorMock, subscriptionMock, configMock, 
keySharedMeta);
-            try {
-                keySharedMeta.addHashRange()
-                        .setStart(0)
-                        .setEnd(9);
-
-                Consumer consumerMock = mock(Consumer.class);
-                doReturn(keySharedMeta).when(consumerMock).getKeySharedMeta();
-                persistentDispatcher.addConsumer(consumerMock);
-                persistentDispatcher.consumerFlow(consumerMock, 1000);
-            } catch (Exception e) {
-                fail("Failed to add mock consumer", e);
-            }
+        KeySharedMeta keySharedMeta = new 
KeySharedMeta().setKeySharedMode(KeySharedMode.STICKY);
+        PersistentStickyKeyDispatcherMultipleConsumers persistentDispatcher = 
new PersistentStickyKeyDispatcherMultipleConsumers(
+            topicMock, cursorMock, subscriptionMock, configMock, 
keySharedMeta);
+        try {
+            keySharedMeta.addHashRange()
+                .setStart(0)
+                .setEnd(9);
 
-            List<Entry> entries = new ArrayList<>();
-            entries.add(EntryImpl.create(1, 1, createMessage("message1", 1)));
-            entries.add(EntryImpl.create(1, 2, createMessage("message2", 2)));
+            Consumer consumerMock = mock(Consumer.class);
+            doReturn(keySharedMeta).when(consumerMock).getKeySharedMeta();
+            persistentDispatcher.addConsumer(consumerMock);
+            persistentDispatcher.consumerFlow(consumerMock, 1000);
+        } catch (Exception e) {
+            fail("Failed to add mock consumer", e);
+        }
 
-            try {
-                //Should success,see issue #8960
-                persistentDispatcher.readEntriesComplete(entries, 
PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal);
-            } catch (Exception e) {
-                fail("Failed to readEntriesComplete.", e);
-            }
+        List<Entry> entries = new ArrayList<>();
+        entries.add(EntryImpl.create(1, 1, createMessage("message1", 1)));
+        entries.add(EntryImpl.create(1, 2, createMessage("message2", 2)));
+
+        try {
+            //Should success,see issue #8960
+            persistentDispatcher.readEntriesComplete(entries, 
PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal);
+        } catch (Exception e) {
+            fail("Failed to readEntriesComplete.", e);
         }
     }
 

Reply via email to