This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 679a3d49eef [improve] [broker] Check max producers/consumers limitation first before other ops to save resources (#23074)
679a3d49eef is described below

commit 679a3d49eefc2a82bbeba085c258b1f2b751f28a
Author: fengyubiao <[email protected]>
AuthorDate: Mon Jul 29 18:54:12 2024 +0800

    [improve] [broker] Check max producers/consumers limitation first before other ops to save resources (#23074)
---
 .../pulsar/broker/service/AbstractTopic.java       | 20 +++--
 .../apache/pulsar/broker/service/ServerCnx.java    | 19 +++++
 .../apache/pulsar/broker/admin/AdminApi2Test.java  | 77 +++++++++++++++----
 .../pulsar/broker/service/PersistentTopicTest.java | 45 -----------
 .../apache/pulsar/client/api/MaxProducerTest.java  | 88 ++++++++++++++++++++++
 5 files changed, 181 insertions(+), 68 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index fbf11f1d0ad..f25dfef966b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -486,8 +486,18 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
         return new PublishRate(config.getMaxPublishRatePerTopicInMessages(), 
config.getMaxPublishRatePerTopicInBytes());
     }
 
+    public boolean isProducersExceeded(String producerName) {
+        String replicatorPrefix = 
brokerService.getPulsar().getConfig().getReplicatorPrefix() + ".";
+        boolean isRemote = producerName.startsWith(replicatorPrefix);
+        return isProducersExceeded(isRemote);
+    }
+
     protected boolean isProducersExceeded(Producer producer) {
-        if (isSystemTopic() || producer.isRemote()) {
+        return isProducersExceeded(producer.isRemote());
+    }
+
+    protected boolean isProducersExceeded(boolean isRemote) {
+        if (isSystemTopic() || isRemote) {
             return false;
         }
         Integer maxProducers = topicPolicies.getMaxProducersPerTopic().get();
@@ -536,7 +546,7 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
         return count;
     }
 
-    protected boolean isConsumersExceededOnTopic() {
+    public boolean isConsumersExceededOnTopic() {
         if (isSystemTopic()) {
             return false;
         }
@@ -973,12 +983,6 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
     }
 
     protected CompletableFuture<Void> internalAddProducer(Producer producer) {
-        if (isProducersExceeded(producer)) {
-            log.warn("[{}] Attempting to add producer to topic which reached 
max producers limit", topic);
-            return CompletableFuture.failedFuture(new 
BrokerServiceException.ProducerBusyException(
-                    "Topic '" + topic + "' reached max producers limit"));
-        }
-
         if (isSameAddressProducersExceeded(producer)) {
             log.warn("[{}] Attempting to add producer to topic which reached 
max same address producers limit", topic);
             return CompletableFuture.failedFuture(new 
BrokerServiceException.ProducerBusyException(
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 6690ab4af5f..5df276e8f3d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1307,6 +1307,16 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                                                 "Topic " + topicName + " does 
not exist"));
                             }
                             final Topic topic = optTopic.get();
+                            // Check the max consumer limitation first to avoid 
wasting resources on unnecessary ops. For example:
+                            // if the new consumer exceeds the max consumer 
limitation but Pulsar did the schema check first,
+                            // it would waste CPU.
+                            if (((AbstractTopic) 
topic).isConsumersExceededOnTopic()) {
+                                log.warn("[{}] Attempting to add consumer to 
topic which reached max"
+                                        + " consumers limit", topic);
+                                Throwable t =
+                                        new ConsumerBusyException("Topic 
reached max consumers limit");
+                                return FutureUtil.failedFuture(t);
+                            }
                             return 
service.isAllowAutoSubscriptionCreationAsync(topicName)
                                     
.thenCompose(isAllowedAutoSubscriptionCreation -> {
                                         boolean 
rejectSubscriptionIfDoesNotExist = isDurable
@@ -1545,6 +1555,15 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             }
 
             service.getOrCreateTopic(topicName.toString()).thenCompose((Topic 
topic) -> {
+                // Check max producer limitation to avoid unnecessary ops 
wasting resources. For example: the new
+                // producer reached max producer limitation, but pulsar did 
schema check first, it would waste CPU
+                if (((AbstractTopic) topic).isProducersExceeded(producerName)) 
{
+                    log.warn("[{}] Attempting to add producer to topic which 
reached max producers limit", topic);
+                    String errorMsg = "Topic '" + topicName.toString() + "' 
reached max producers limit";
+                    Throwable t = new 
BrokerServiceException.ProducerBusyException(errorMsg);
+                    return CompletableFuture.failedFuture(t);
+                }
+
                 // Before creating producer, check if backlog quota exceeded
                 // on topic for size based limit and time based limit
                 CompletableFuture<Void> backlogQuotaCheckFuture = 
CompletableFuture.allOf(
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 249dd3c4607..40e2ca8cce9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -22,6 +22,8 @@ import static java.util.concurrent.TimeUnit.MINUTES;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
 import static 
org.apache.pulsar.broker.resources.LoadBalanceResources.BUNDLE_DATA_BASE_PATH;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -52,6 +54,7 @@ import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.ws.rs.NotAcceptableException;
 import javax.ws.rs.core.Response.Status;
 import lombok.AllArgsConstructor;
@@ -70,6 +73,7 @@ import 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
 import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
 import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
+import org.apache.pulsar.broker.service.AbstractTopic;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -127,7 +131,13 @@ import 
org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.policies.data.TopicType;
 import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
+import org.apache.pulsar.common.protocol.schema.SchemaData;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
@@ -2870,34 +2880,40 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
         final String myNamespace = newUniqueName(defaultTenant + "/ns");
         admin.namespaces().createNamespace(myNamespace, Set.of("test"));
         final String topic = "persistent://" + myNamespace + 
"/testMaxProducersPerTopicUnlimited";
+        admin.topics().createNonPartitionedTopic(topic);
+        AtomicInteger schemaOpsCounter = 
injectSchemaCheckCounterForTopic(topic);
         //the policy is set to 0, so there will be no restrictions
         admin.namespaces().setMaxProducersPerTopic(myNamespace, 0);
         Awaitility.await().until(()
                 -> admin.namespaces().getMaxProducersPerTopic(myNamespace) == 
0);
-        List<Producer<byte[]>> producers = new ArrayList<>();
+        List<Producer<String>> producers = new ArrayList<>();
         for (int i = 0; i < maxProducersPerTopic + 1; i++) {
-            Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).create();
+            Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topic).create();
             producers.add(producer);
         }
+        assertEquals(schemaOpsCounter.get(), maxProducersPerTopic + 1);
 
         admin.namespaces().removeMaxProducersPerTopic(myNamespace);
         Awaitility.await().until(()
                 -> admin.namespaces().getMaxProducersPerTopic(myNamespace) == 
null);
+
         try {
             @Cleanup
-            Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).create();
+            Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topic).create();
             fail("should fail");
         } catch (PulsarClientException e) {
             String expectMsg = "Topic '" + topic + "' reached max producers 
limit";
             assertTrue(e.getMessage().contains(expectMsg));
+            assertEquals(schemaOpsCounter.get(), maxProducersPerTopic + 1);
         }
         //set the limit to 3
         admin.namespaces().setMaxProducersPerTopic(myNamespace, 3);
         Awaitility.await().until(()
                 -> admin.namespaces().getMaxProducersPerTopic(myNamespace) == 
3);
         // should success
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).create();
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topic).create();
         producers.add(producer);
+        assertEquals(schemaOpsCounter.get(), maxProducersPerTopic + 2);
         try {
             @Cleanup
             Producer<byte[]> producer1 = 
pulsarClient.newProducer().topic(topic).create();
@@ -2905,14 +2921,39 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
         } catch (PulsarClientException e) {
             String expectMsg = "Topic '" + topic + "' reached max producers 
limit";
             assertTrue(e.getMessage().contains(expectMsg));
+            assertEquals(schemaOpsCounter.get(), maxProducersPerTopic + 2);
         }
 
         //clean up
-        for (Producer<byte[]> tempProducer : producers) {
+        for (Producer<String> tempProducer : producers) {
             tempProducer.close();
         }
     }
 
+    private AtomicInteger injectSchemaCheckCounterForTopic(String topicName) {
+        ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> 
topics =
+                WhiteboxImpl.getInternalState(pulsar.getBrokerService(), 
"topics");
+        AbstractTopic topic = (AbstractTopic) 
topics.get(topicName).join().get();
+        AbstractTopic spyTopic = Mockito.spy(topic);
+        AtomicInteger counter = new AtomicInteger();
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable 
{
+                counter.incrementAndGet();
+                return invocation.callRealMethod();
+            }
+        }).when(spyTopic).addSchema(any(SchemaData.class));
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable 
{
+                counter.incrementAndGet();
+                return invocation.callRealMethod();
+            }
+        
}).when(spyTopic).addSchemaIfIdleOrCheckCompatible(any(SchemaData.class));
+        topics.put(topicName, 
CompletableFuture.completedFuture(Optional.of(spyTopic)));
+        return counter;
+    }
+
     @Test
     public void testMaxConsumersPerTopicUnlimited() throws Exception {
         restartClusterAfterTest();
@@ -2924,49 +2965,55 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
         final String myNamespace = newUniqueName(defaultTenant + "/ns");
         admin.namespaces().createNamespace(myNamespace, Set.of("test"));
         final String topic = "persistent://" + myNamespace + 
"/testMaxConsumersPerTopicUnlimited";
+        admin.topics().createNonPartitionedTopic(topic);
+        AtomicInteger schemaOpsCounter = 
injectSchemaCheckCounterForTopic(topic);
 
         assertNull(admin.namespaces().getMaxConsumersPerTopic(myNamespace));
         //the policy is set to 0, so there will be no restrictions
         admin.namespaces().setMaxConsumersPerTopic(myNamespace, 0);
         Awaitility.await().until(()
                 -> admin.namespaces().getMaxConsumersPerTopic(myNamespace) == 
0);
-        List<Consumer<byte[]>> consumers = new ArrayList<>();
+        List<Consumer<String>> consumers = new ArrayList<>();
         for (int i = 0; i < maxConsumersPerTopic + 1; i++) {
-            Consumer<byte[]> consumer =
-                    
pulsarClient.newConsumer().subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
+            Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                    
.subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
             consumers.add(consumer);
         }
+        assertEquals(schemaOpsCounter.get(), maxConsumersPerTopic + 2);
 
         admin.namespaces().removeMaxConsumersPerTopic(myNamespace);
         Awaitility.await().until(()
                 -> admin.namespaces().getMaxConsumersPerTopic(myNamespace) == 
null);
         try {
             @Cleanup
-            Consumer<byte[]> subscribe =
-                    
pulsarClient.newConsumer().subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
+            Consumer<String> subscribe = 
pulsarClient.newConsumer(Schema.STRING)
+                    
.subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
             fail("should fail");
         } catch (PulsarClientException e) {
             assertTrue(e.getMessage().contains("Topic reached max consumers 
limit"));
+            assertEquals(schemaOpsCounter.get(), maxConsumersPerTopic + 2);
         }
         //set the limit to 3
         admin.namespaces().setMaxConsumersPerTopic(myNamespace, 3);
         Awaitility.await().until(()
                 -> admin.namespaces().getMaxConsumersPerTopic(myNamespace) == 
3);
         // should success
-        Consumer<byte[]> consumer =
-                
pulsarClient.newConsumer().subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                
.subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
         consumers.add(consumer);
+        assertEquals(schemaOpsCounter.get(), maxConsumersPerTopic + 3);
         try {
             @Cleanup
-            Consumer<byte[]> subscribe =
-                    
pulsarClient.newConsumer().subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
+            Consumer<String> subscribe = 
pulsarClient.newConsumer(Schema.STRING)
+                    
.subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
             fail("should fail");
         } catch (PulsarClientException e) {
             assertTrue(e.getMessage().contains("Topic reached max consumers 
limit"));
+            assertEquals(schemaOpsCounter.get(), maxConsumersPerTopic + 3);
         }
 
         //clean up
-        for (Consumer<byte[]> subConsumer : consumers) {
+        for (Consumer<String> subConsumer : consumers) {
             subConsumer.close();
         }
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 76f871a6c60..8c21301c15b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -509,51 +509,6 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         topic.getProducers().values().forEach(producer -> 
Assert.assertEquals(producer.getEpoch(), 3));
     }
 
-    private void testMaxProducers() {
-        PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
-        topic.initialize().join();
-        String role = "appid1";
-        // 1. add producer1
-        Producer producer = new Producer(topic, serverCnx, 1 /* producer id 
*/, "prod-name1", role,
-                false, null, SchemaVersion.Latest, 0, false, 
ProducerAccessMode.Shared, Optional.empty(), true);
-        topic.addProducer(producer, new CompletableFuture<>());
-        assertEquals(topic.getProducers().size(), 1);
-
-        // 2. add producer2
-        Producer producer2 = new Producer(topic, serverCnx, 2 /* producer id 
*/, "prod-name2", role,
-                false, null, SchemaVersion.Latest, 0, false, 
ProducerAccessMode.Shared, Optional.empty(), true);
-        topic.addProducer(producer2, new CompletableFuture<>());
-        assertEquals(topic.getProducers().size(), 2);
-
-        // 3. add producer3 but reached maxProducersPerTopic
-        try {
-            Producer producer3 = new Producer(topic, serverCnx, 3 /* producer 
id */, "prod-name3", role,
-                    false, null, SchemaVersion.Latest, 0, false, 
ProducerAccessMode.Shared, Optional.empty(), true);
-            topic.addProducer(producer3, new CompletableFuture<>()).join();
-            fail("should have failed");
-        } catch (Exception e) {
-            assertEquals(e.getCause().getClass(), 
BrokerServiceException.ProducerBusyException.class);
-        }
-    }
-
-    @Test
-    public void testMaxProducersForBroker() {
-        // set max clients
-        pulsarTestContext.getConfig().setMaxProducersPerTopic(2);
-        testMaxProducers();
-    }
-
-    @Test
-    public void testMaxProducersForNamespace() throws Exception {
-        // set max clients
-        Policies policies = new Policies();
-        policies.max_producers_per_topic = 2;
-        pulsarTestContext.getPulsarResources().getNamespaceResources()
-                
.createPolicies(TopicName.get(successTopicName).getNamespaceObject(),
-                        policies);
-        testMaxProducers();
-    }
-
     private Producer getMockedProducerWithSpecificAddress(Topic topic, long 
producerId, InetAddress address) {
         final String producerNameBase = "producer";
         final String role = "appid1";
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MaxProducerTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MaxProducerTest.java
new file mode 100644
index 00000000000..a34b05280c4
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MaxProducerTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-api")
+public class MaxProducerTest extends ProducerConsumerBase {
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        conf.setMaxProducersPerTopic(2);
+    }
+
+    @Test
+    public void testMaxProducersForBroker() throws Exception {
+        testMaxProducers(2);
+    }
+
+    @Test
+    public void testMaxProducersForNamespace() throws Exception {
+        // set max clients
+        admin.namespaces().setMaxProducersPerTopic("public/default", 3);
+        testMaxProducers(3);
+    }
+
+    private void testMaxProducers(int maxProducerExpected) throws Exception {
+        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        admin.topics().createNonPartitionedTopic(topicName);
+
+        List<org.apache.pulsar.client.api.Producer<byte[]>> producers = new 
ArrayList<>();
+        for (int i = 0; i < maxProducerExpected; i++) {
+            
producers.add(pulsarClient.newProducer().topic(topicName).create());
+        }
+
+        try {
+            pulsarClient.newProducer().topic(topicName).create();
+            fail("should have failed");
+        } catch (Exception e) {
+            assertTrue(e instanceof 
PulsarClientException.ProducerBusyException);
+        }
+
+        // cleanup.
+        for (org.apache.pulsar.client.api.Producer p : producers) {
+            p.close();
+        }
+        admin.topics().delete(topicName, false);
+    }
+}

Reply via email to