This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit cfd04798789a3f2d5b9bcce17513fc84c4cb3449 Author: Ruguo Yu <[email protected]> AuthorDate: Mon Nov 28 09:44:20 2022 +0800 [improve][broker] System topic writer/reader connection not counted (#18603) This PR is a supplement to #18369. - `AbstractTopic.isSameAddressProducersExceeded()` - `AbstractBaseDispatcher.isConsumersExceededOnSubscription()` --- .../org/apache/pulsar/broker/service/AbstractBaseDispatcher.java | 6 +++++- .../main/java/org/apache/pulsar/broker/service/AbstractTopic.java | 3 +++ .../apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java | 4 +++- 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index d6f441f02bf..677b3a84a4c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -255,8 +255,12 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen protected abstract boolean isConsumersExceededOnSubscription(); protected boolean isConsumersExceededOnSubscription(AbstractTopic topic, int consumerSize) { + if (topic.isSystemTopic()) { + return false; + } Integer maxConsumersPerSubscription = topic.getHierarchyTopicPolicies().getMaxConsumersPerSubscription().get(); - return maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumerSize; + return maxConsumersPerSubscription != null && maxConsumersPerSubscription > 0 + && maxConsumersPerSubscription <= consumerSize; } private void processReplicatedSubscriptionSnapshot(PositionImpl pos, ByteBuf headersAndPayload) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index f93e0b7e5cf..c1fcee4a059 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -453,6 +453,9 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP } protected boolean isSameAddressProducersExceeded(Producer producer) { + if (isSystemTopic() || producer.isRemote()) { + return false; + } final int maxSameAddressProducers = brokerService.pulsar().getConfiguration() .getMaxSameAddressProducersPerTopic(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java index 5af983ed3af..3cd25d3b00c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java @@ -267,6 +267,7 @@ public class PartitionedSystemTopicTest extends BrokerTestBase { admin.namespaces().createNamespace(ns, 2); admin.topics().createPartitionedTopic(String.format("persistent://%s", topic), 1); + conf.setMaxSameAddressConsumersPerTopic(1); admin.namespaces().setMaxConsumersPerTopic(ns, 1); admin.topicPolicies().setMaxConsumers(topic, 1); NamespaceEventsSystemTopicFactory systemTopicFactory = new NamespaceEventsSystemTopicFactory(pulsarClient); @@ -275,8 +276,9 @@ public class PartitionedSystemTopicTest extends BrokerTestBase { SystemTopicClient.Reader reader1 = systemTopicClientForNamespace.newReader(); SystemTopicClient.Reader reader2 = systemTopicClientForNamespace.newReader(); + conf.setMaxSameAddressProducersPerTopic(1); + admin.namespaces().setMaxProducersPerTopic(ns, 1); admin.topicPolicies().setMaxProducers(topic, 1); - CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writer1 = systemTopicClientForNamespace.newWriterAsync(); CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writer2 = systemTopicClientForNamespace.newWriterAsync(); CompletableFuture<Void> f1 = admin.topicPolicies().setCompactionThresholdAsync(topic, 1L);
