This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a070c33  Move duplicate code to abstract parent class (#10061)
a070c33 is described below

commit a070c3363d007198f5dee4d3d70d075ba6a1c8df
Author: feynmanlin <[email protected]>
AuthorDate: Mon Mar 29 10:55:37 2021 +0800

    Move duplicate code to abstract parent class (#10061)
    
    * Move duplicate code to abstract parent class
    
    * code style
---
 .../broker/service/AbstractBaseDispatcher.java     | 39 ++++++++++++++++++++++
 .../AbstractDispatcherSingleActiveConsumer.java    |  2 --
 .../NonPersistentDispatcherMultipleConsumers.java  |  3 +-
 ...onPersistentDispatcherSingleActiveConsumer.java | 38 ++-------------------
 .../PersistentDispatcherMultipleConsumers.java     | 38 ++-------------------
 .../PersistentDispatcherSingleActiveConsumer.java  | 35 ++-----------------
 6 files changed, 48 insertions(+), 107 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 02305e4..d57fc7b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -19,20 +19,26 @@
 
 package org.apache.pulsar.broker.service;
 
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import io.netty.buffer.ByteBuf;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Markers;
 
@@ -148,6 +154,39 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
         sendMessageInfo.setTotalChunkedMessages(totalChunkedMessages);
     }
 
+    /**
+     * Determine whether the number of consumers on the subscription reaches the threshold.
+     * @return
+     */
+    protected abstract boolean isConsumersExceededOnSubscription();
+
+    protected boolean isConsumersExceededOnSubscription(BrokerService brokerService,
+                                                        String topic, int consumerSize) {
+        Policies policies = null;
+        Integer maxConsumersPerSubscription = null;
+        try {
+            maxConsumersPerSubscription = Optional.ofNullable(brokerService
+                    .getTopicPolicies(TopicName.get(topic)))
+                    .map(TopicPolicies::getMaxConsumersPerSubscription)
+                    .orElse(null);
+            if (maxConsumersPerSubscription == null) {
+                // Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks in addConsumer
+                policies = brokerService.pulsar().getConfigurationCache().policiesCache()
+                        .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()));
+            }
+        } catch (Exception e) {
+            log.debug("Get topic or namespace policies fail", e);
+        }
+
+        if (maxConsumersPerSubscription == null) {
+            maxConsumersPerSubscription = policies != null && policies.max_consumers_per_subscription > 0
+                    ? policies.max_consumers_per_subscription :
+                    brokerService.pulsar().getConfiguration().getMaxConsumersPerSubscription();
+        }
+
+        return maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumerSize;
+    }
+
     private void processReplicatedSubscriptionSnapshot(PositionImpl pos, ByteBuf headersAndPayload) {
         // Remove the protobuf headers
         Commands.skipMessageMetadata(headersAndPayload);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 9eb5a2e..b142c51 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -70,8 +70,6 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
 
     protected abstract void cancelPendingRead();
 
-    protected abstract boolean isConsumersExceededOnSubscription();
-
     protected void notifyActiveConsumerChanged(Consumer activeConsumer) {
         if (null != activeConsumer && subscriptionType == SubType.Failover) {
             consumers.forEach(consumer ->
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index 00584cf..b6f4ca7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -86,7 +86,8 @@ public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcher
         consumerSet.add(consumer);
     }
 
-    private boolean isConsumersExceededOnSubscription() {
+    @Override
+    protected boolean isConsumersExceededOnSubscription() {
         final int maxConsumersPerSubscription = serviceConfig.getMaxConsumersPerSubscription();
         if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumerList.size()) {
             return true;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index cecdbaf..a720767 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -18,13 +18,10 @@
  */
 package org.apache.pulsar.broker.service.nonpersistent;
 
-import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import java.util.List;
-import java.util.Optional;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.EntryBatchSizes;
@@ -33,9 +30,6 @@ import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled;
 import org.apache.pulsar.broker.service.SendMessageInfo;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.Policies;
-import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.stats.Rate;
 
@@ -79,37 +73,9 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD
         }
     }
 
+    @Override
     protected boolean isConsumersExceededOnSubscription() {
-        Policies policies = null;
-        Integer maxConsumersPerSubscription = null;
-        try {
-            maxConsumersPerSubscription = Optional.ofNullable(topic.getBrokerService()
-                    .getTopicPolicies(TopicName.get(topicName)))
-                    .map(TopicPolicies::getMaxConsumersPerSubscription)
-                    .orElse(null);
-            if (maxConsumersPerSubscription == null) {
-                // Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks in addConsumer
-                policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
-                        .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic.getName()).getNamespace()));
-
-                if (policies == null) {
-                    policies = new Policies();
-                }
-            }
-        } catch (Exception e) {
-            policies = new Policies();
-        }
-
-        if (maxConsumersPerSubscription == null) {
-            maxConsumersPerSubscription = policies.max_consumers_per_subscription > 0
-                    ? policies.max_consumers_per_subscription :
-                    serviceConfig.getMaxConsumersPerSubscription();
-        }
-
-        if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumers.size()) {
-            return true;
-        }
-        return false;
+        return isConsumersExceededOnSubscription(topic.getBrokerService(), topic.getName(), consumers.size());
     }
 
     @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index eec0f93..2c05644 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
-import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
 import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.Lists;
@@ -40,7 +39,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsExcep
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
 import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -59,10 +57,8 @@ import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSeal
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.Policies;
-import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet;
 import org.apache.pulsar.common.util.collections.LongPairSet;
@@ -158,37 +154,9 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         consumerSet.add(consumer);
     }
 
-    private boolean isConsumersExceededOnSubscription() {
-        Policies policies = null;
-        Integer maxConsumersPerSubscription = null;
-        try {
-            maxConsumersPerSubscription = Optional.ofNullable(topic.getBrokerService()
-                    .getTopicPolicies(TopicName.get(topic.getName())))
-                    .map(TopicPolicies::getMaxConsumersPerSubscription)
-                    .orElse(null);
-
-            if (maxConsumersPerSubscription == null) {
-                // Use getDataIfPresent from zk cache to make the call non-blocking and
-                // prevent deadlocks in addConsumer
-                policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
-                        .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic.getName()).getNamespace()));
-                if (policies == null) {
-                    policies = new Policies();
-                }
-            }
-        } catch (Exception e) {
-            policies = new Policies();
-        }
-
-        if (maxConsumersPerSubscription == null) {
-            maxConsumersPerSubscription = policies.max_consumers_per_subscription > 0
-                    ? policies.max_consumers_per_subscription : serviceConfig.getMaxConsumersPerSubscription();
-        }
-
-        if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumerList.size()) {
-            return true;
-        }
-        return false;
+    @Override
+    protected boolean isConsumersExceededOnSubscription() {
+        return isConsumersExceededOnSubscription(topic.getBrokerService(), topic.getName(), consumerList.size());
     }
 
     @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 6e28a3c..93c2d25 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.broker.service.persistent;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
 import java.util.Iterator;
 import java.util.List;
@@ -36,7 +35,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsExcep
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.mledger.util.SafeRun;
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Dispatcher;
@@ -50,10 +48,8 @@ import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
 import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.Policies;
-import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.util.Codec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -132,36 +128,9 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
         }, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), TimeUnit.MILLISECONDS);
     }
 
+    @Override
     protected boolean isConsumersExceededOnSubscription() {
-        Policies policies = null;
-        Integer maxConsumersPerSubscription = null;
-        try {
-            maxConsumersPerSubscription = Optional.ofNullable(topic.getBrokerService()
-                    .getTopicPolicies(TopicName.get(topicName)))
-                    .map(TopicPolicies::getMaxConsumersPerSubscription)
-                    .orElse(null);
-            if (maxConsumersPerSubscription == null) {
-                // Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks in addConsumer
-                policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
-                        .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic.getName()).getNamespace()));
-
-                if (policies == null) {
-                    policies = new Policies();
-                }
-            }
-        } catch (Exception e) {
-            policies = new Policies();
-        }
-
-        if (maxConsumersPerSubscription == null) {
-            maxConsumersPerSubscription = policies.max_consumers_per_subscription > 0
-                    ? policies.max_consumers_per_subscription : serviceConfig.getMaxConsumersPerSubscription();
-        }
-
-        if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumers.size()) {
-            return true;
-        }
-        return false;
+        return isConsumersExceededOnSubscription(topic.getBrokerService(), topic.getName(), consumers.size());
     }
 
     @Override

Reply via email to