This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b219ccac334 [improve][broker] System topic writer/reader connection not counted. (#18369)
b219ccac334 is described below

commit b219ccac334c4b57df48d07e78ea381f54db1a7d
Author: Jiwei Guo <[email protected]>
AuthorDate: Wed Nov 23 14:23:57 2022 +0800

    [improve][broker] System topic writer/reader connection not counted. (#18369)
---
 .../pulsar/broker/service/AbstractTopic.java       | 24 ++++++++++++---
 .../broker/service/persistent/PersistentTopic.java |  3 ++
 .../pulsar/broker/service/ReplicatorTest.java      | 33 ++++++++++++++++++++
 .../systopic/PartitionedSystemTopicTest.java       | 36 +++++++++++++++++++++-
 4 files changed, 90 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 864fe7f5d65..75b15c15df2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -438,14 +438,21 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
         return new PublishRate(config.getMaxPublishRatePerTopicInMessages(), config.getMaxPublishRatePerTopicInBytes());
     }
 
-    protected boolean isProducersExceeded() {
+    protected boolean isProducersExceeded(Producer producer) {
+        if (isSystemTopic() || producer.isRemote()) {
+            return false;
+        }
         Integer maxProducers = topicPolicies.getMaxProducersPerTopic().get();
-        if (maxProducers > 0 && maxProducers <= producers.size()) {
+        if (maxProducers != null && maxProducers > 0 && maxProducers <= getUserCreatedProducersSize()) {
             return true;
         }
         return false;
     }
 
+    private long getUserCreatedProducersSize() {
+        return producers.values().stream().filter(p -> !p.isRemote()).count();
+    }
+
     protected void registerTopicPolicyListener() {
         if (brokerService.pulsar().getConfig().isSystemTopicEnabled()
                 && brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
@@ -487,14 +494,21 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
     }
 
     protected boolean isConsumersExceededOnTopic() {
-        int maxConsumersPerTopic = topicPolicies.getMaxConsumerPerTopic().get();
-        if (maxConsumersPerTopic > 0 && maxConsumersPerTopic <= getNumberOfConsumers()) {
+        if (isSystemTopic()) {
+            return false;
+        }
+        Integer maxConsumersPerTopic = topicPolicies.getMaxConsumerPerTopic().get();
+        if (maxConsumersPerTopic != null && maxConsumersPerTopic > 0
+                && maxConsumersPerTopic <= getNumberOfConsumers()) {
             return true;
         }
         return false;
     }
 
     protected boolean isSameAddressConsumersExceededOnTopic(Consumer consumer) {
+        if (isSystemTopic()) {
+            return false;
+        }
         final int maxSameAddressConsumers = brokerService.pulsar().getConfiguration()
                 .getMaxSameAddressConsumersPerTopic();
 
@@ -951,7 +965,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
     }
 
     protected void internalAddProducer(Producer producer) throws BrokerServiceException {
-        if (isProducersExceeded()) {
+        if (isProducersExceeded(producer)) {
             log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic);
             throw new BrokerServiceException.ProducerBusyException("Topic reached max producers limit");
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index e4bcb92b58d..ea20d413484 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3321,6 +3321,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     }
 
     private boolean checkMaxSubscriptionsPerTopicExceed(String subscriptionName) {
+        if (isSystemTopic()) {
+            return false;
+        }
         //Existing subscriptions are not affected
         if (StringUtils.isNotEmpty(subscriptionName) && getSubscription(subscriptionName) != null) {
             return false;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index b8f8abc9a62..7f31ce39c96 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -75,6 +75,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.RawMessage;
 import org.apache.pulsar.client.api.RawReader;
 import org.apache.pulsar.client.api.Schema;
@@ -1505,4 +1506,36 @@ public class ReplicatorTest extends ReplicatorTestBase {
             assertTrue(topic.getReplicators().isEmpty());
         });
     }
+
+    @Test
+    public void testReplicatorProducerNotExceed() throws Exception {
+        log.info("--- testReplicatorProducerNotExceed ---");
+        String namespace1 = "pulsar/ns11";
+        admin1.namespaces().createNamespace(namespace1);
+        admin1.namespaces().setNamespaceReplicationClusters(namespace1, Sets.newHashSet("r1", "r2"));
+        final TopicName dest1 = TopicName.get(
+                BrokerTestUtil.newUniqueName("persistent://" + namespace1 + "/testReplicatorProducerNotExceed1"));
+        String namespace2 = "pulsar/ns22";
+        admin2.namespaces().createNamespace(namespace2);
+        admin2.namespaces().setNamespaceReplicationClusters(namespace2, Sets.newHashSet("r1", "r2"));
+        final TopicName dest2 = TopicName.get(
+                BrokerTestUtil.newUniqueName("persistent://" + namespace1 + "/testReplicatorProducerNotExceed2"));
+        admin1.topics().createPartitionedTopic(dest1.toString(), 1);
+        admin1.topicPolicies().setMaxProducers(dest1.toString(), 1);
+        admin2.topics().createPartitionedTopic(dest2.toString(), 1);
+        admin2.topicPolicies().setMaxProducers(dest2.toString(), 1);
+        @Cleanup
+        MessageProducer producer1 = new MessageProducer(url1, dest1);
+        log.info("--- Starting producer1 --- " + url1);
+
+        producer1.produce(1);
+
+        @Cleanup
+        MessageProducer producer2 = new MessageProducer(url2, dest2);
+        log.info("--- Starting producer2 --- " + url2);
+
+        producer2.produce(1);
+
+        Assert.assertThrows(PulsarClientException.ProducerBusyException.class, () -> new MessageProducer(url2, dest2));
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index 9ab32d5ffa7..e79197bb1b6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -47,6 +47,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.events.PulsarEvent;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
@@ -202,7 +203,7 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
     }
 
     @Test
-    private void testSetBacklogCausedCreatingProducerFailure() throws Exception {
+    public void testSetBacklogCausedCreatingProducerFailure() throws Exception {
         final String ns = "prop/ns-test";
         final String topic = ns + "/topic-1";
 
@@ -260,4 +261,37 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
             Assert.fail("failed to create producer");
         }
     }
+
+    @Test
+    public void testSystemTopicNotCheckExceed() throws Exception {
+        final String ns = "prop/ns-test";
+        final String topic = ns + "/topic-1";
+
+        admin.namespaces().createNamespace(ns, 2);
+        admin.topics().createPartitionedTopic(String.format("persistent://%s", topic), 1);
+
+        admin.namespaces().setMaxConsumersPerTopic(ns, 1);
+        admin.topicPolicies().setMaxConsumers(topic, 1);
+        NamespaceEventsSystemTopicFactory systemTopicFactory = new NamespaceEventsSystemTopicFactory(pulsarClient);
+        TopicPoliciesSystemTopicClient systemTopicClientForNamespace = systemTopicFactory
+                .createTopicPoliciesSystemTopicClient(NamespaceName.get(ns));
+        SystemTopicClient.Reader reader1 = systemTopicClientForNamespace.newReader();
+        SystemTopicClient.Reader reader2 = systemTopicClientForNamespace.newReader();
+
+        admin.topicPolicies().setMaxProducers(topic, 1);
+
+        CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writer1 = systemTopicClientForNamespace.newWriterAsync();
+        CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writer2 = systemTopicClientForNamespace.newWriterAsync();
+        CompletableFuture<Void> f1 = admin.topicPolicies().setCompactionThresholdAsync(topic, 1L);
+
+        FutureUtil.waitForAll(List.of(writer1, writer2, f1)).join();
+        Assert.assertTrue(reader1.hasMoreEvents());
+        Assert.assertNotNull(reader1.readNext());
+        Assert.assertTrue(reader2.hasMoreEvents());
+        Assert.assertNotNull(reader2.readNext());
+        reader1.close();
+        reader2.close();
+        writer1.get().close();
+        writer2.get().close();
+    }
 }

Reply via email to