This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 679a3d49eef [improve] [broker] Check max producers/consumers
limitation first before other ops to save resources (#23074)
679a3d49eef is described below
commit 679a3d49eefc2a82bbeba085c258b1f2b751f28a
Author: fengyubiao <[email protected]>
AuthorDate: Mon Jul 29 18:54:12 2024 +0800
[improve] [broker] Check max producers/consumers limitation first before
other ops to save resources (#23074)
---
.../pulsar/broker/service/AbstractTopic.java | 20 +++--
.../apache/pulsar/broker/service/ServerCnx.java | 19 +++++
.../apache/pulsar/broker/admin/AdminApi2Test.java | 77 +++++++++++++++----
.../pulsar/broker/service/PersistentTopicTest.java | 45 -----------
.../apache/pulsar/client/api/MaxProducerTest.java | 88 ++++++++++++++++++++++
5 files changed, 181 insertions(+), 68 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index fbf11f1d0ad..f25dfef966b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -486,8 +486,18 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
return new PublishRate(config.getMaxPublishRatePerTopicInMessages(),
config.getMaxPublishRatePerTopicInBytes());
}
+ public boolean isProducersExceeded(String producerName) {
+ String replicatorPrefix =
brokerService.getPulsar().getConfig().getReplicatorPrefix() + ".";
+ boolean isRemote = producerName.startsWith(replicatorPrefix);
+ return isProducersExceeded(isRemote);
+ }
+
protected boolean isProducersExceeded(Producer producer) {
- if (isSystemTopic() || producer.isRemote()) {
+ return isProducersExceeded(producer.isRemote());
+ }
+
+ protected boolean isProducersExceeded(boolean isRemote) {
+ if (isSystemTopic() || isRemote) {
return false;
}
Integer maxProducers = topicPolicies.getMaxProducersPerTopic().get();
@@ -536,7 +546,7 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
return count;
}
- protected boolean isConsumersExceededOnTopic() {
+ public boolean isConsumersExceededOnTopic() {
if (isSystemTopic()) {
return false;
}
@@ -973,12 +983,6 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
}
protected CompletableFuture<Void> internalAddProducer(Producer producer) {
- if (isProducersExceeded(producer)) {
- log.warn("[{}] Attempting to add producer to topic which reached
max producers limit", topic);
- return CompletableFuture.failedFuture(new
BrokerServiceException.ProducerBusyException(
- "Topic '" + topic + "' reached max producers limit"));
- }
-
if (isSameAddressProducersExceeded(producer)) {
log.warn("[{}] Attempting to add producer to topic which reached
max same address producers limit", topic);
return CompletableFuture.failedFuture(new
BrokerServiceException.ProducerBusyException(
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 6690ab4af5f..5df276e8f3d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1307,6 +1307,16 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
"Topic " + topicName + " does
not exist"));
}
final Topic topic = optTopic.get();
+ // Check max consumer limitation to avoid
unnecessary ops wasting resources. For example:
+ // the new consumer reached max producer
limitation, but pulsar did schema check first,
+ // it would waste CPU.
+ if (((AbstractTopic)
topic).isConsumersExceededOnTopic()) {
+ log.warn("[{}] Attempting to add consumer to
topic which reached max"
+ + " consumers limit", topic);
+ Throwable t =
+ new ConsumerBusyException("Topic
reached max consumers limit");
+ return FutureUtil.failedFuture(t);
+ }
return
service.isAllowAutoSubscriptionCreationAsync(topicName)
.thenCompose(isAllowedAutoSubscriptionCreation -> {
boolean
rejectSubscriptionIfDoesNotExist = isDurable
@@ -1545,6 +1555,15 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
}
service.getOrCreateTopic(topicName.toString()).thenCompose((Topic
topic) -> {
+ // Check max producer limitation to avoid unnecessary ops
wasting resources. For example: the new
+ // producer reached max producer limitation, but pulsar did
schema check first, it would waste CPU
+ if (((AbstractTopic) topic).isProducersExceeded(producerName))
{
+ log.warn("[{}] Attempting to add producer to topic which
reached max producers limit", topic);
+ String errorMsg = "Topic '" + topicName.toString() + "'
reached max producers limit";
+ Throwable t = new
BrokerServiceException.ProducerBusyException(errorMsg);
+ return CompletableFuture.failedFuture(t);
+ }
+
// Before creating producer, check if backlog quota exceeded
// on topic for size based limit and time based limit
CompletableFuture<Void> backlogQuotaCheckFuture =
CompletableFuture.allOf(
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 249dd3c4607..40e2ca8cce9 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -22,6 +22,8 @@ import static java.util.concurrent.TimeUnit.MINUTES;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
import static
org.apache.pulsar.broker.resources.LoadBalanceResources.BUNDLE_DATA_BASE_PATH;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -52,6 +54,7 @@ import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.ws.rs.NotAcceptableException;
import javax.ws.rs.core.Response.Status;
import lombok.AllArgsConstructor;
@@ -70,6 +73,7 @@ import
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
+import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -127,7 +131,13 @@ import
org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
+import org.apache.pulsar.common.protocol.schema.SchemaData;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
@@ -2870,34 +2880,40 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
final String myNamespace = newUniqueName(defaultTenant + "/ns");
admin.namespaces().createNamespace(myNamespace, Set.of("test"));
final String topic = "persistent://" + myNamespace +
"/testMaxProducersPerTopicUnlimited";
+ admin.topics().createNonPartitionedTopic(topic);
+ AtomicInteger schemaOpsCounter =
injectSchemaCheckCounterForTopic(topic);
//the policy is set to 0, so there will be no restrictions
admin.namespaces().setMaxProducersPerTopic(myNamespace, 0);
Awaitility.await().until(()
-> admin.namespaces().getMaxProducersPerTopic(myNamespace) ==
0);
- List<Producer<byte[]>> producers = new ArrayList<>();
+ List<Producer<String>> producers = new ArrayList<>();
for (int i = 0; i < maxProducersPerTopic + 1; i++) {
- Producer<byte[]> producer =
pulsarClient.newProducer().topic(topic).create();
+ Producer<String> producer =
pulsarClient.newProducer(Schema.STRING).topic(topic).create();
producers.add(producer);
}
+ assertEquals(schemaOpsCounter.get(), maxProducersPerTopic + 1);
admin.namespaces().removeMaxProducersPerTopic(myNamespace);
Awaitility.await().until(()
-> admin.namespaces().getMaxProducersPerTopic(myNamespace) ==
null);
+
try {
@Cleanup
- Producer<byte[]> producer =
pulsarClient.newProducer().topic(topic).create();
+ Producer<String> producer =
pulsarClient.newProducer(Schema.STRING).topic(topic).create();
fail("should fail");
} catch (PulsarClientException e) {
String expectMsg = "Topic '" + topic + "' reached max producers
limit";
assertTrue(e.getMessage().contains(expectMsg));
+ assertEquals(schemaOpsCounter.get(), maxProducersPerTopic + 1);
}
//set the limit to 3
admin.namespaces().setMaxProducersPerTopic(myNamespace, 3);
Awaitility.await().until(()
-> admin.namespaces().getMaxProducersPerTopic(myNamespace) ==
3);
// should success
- Producer<byte[]> producer =
pulsarClient.newProducer().topic(topic).create();
+ Producer<String> producer =
pulsarClient.newProducer(Schema.STRING).topic(topic).create();
producers.add(producer);
+ assertEquals(schemaOpsCounter.get(), maxProducersPerTopic + 2);
try {
@Cleanup
Producer<byte[]> producer1 =
pulsarClient.newProducer().topic(topic).create();
@@ -2905,14 +2921,39 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
} catch (PulsarClientException e) {
String expectMsg = "Topic '" + topic + "' reached max producers
limit";
assertTrue(e.getMessage().contains(expectMsg));
+ assertEquals(schemaOpsCounter.get(), maxProducersPerTopic + 2);
}
//clean up
- for (Producer<byte[]> tempProducer : producers) {
+ for (Producer<String> tempProducer : producers) {
tempProducer.close();
}
}
+ private AtomicInteger injectSchemaCheckCounterForTopic(String topicName) {
+ ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>>
topics =
+ WhiteboxImpl.getInternalState(pulsar.getBrokerService(),
"topics");
+ AbstractTopic topic = (AbstractTopic)
topics.get(topicName).join().get();
+ AbstractTopic spyTopic = Mockito.spy(topic);
+ AtomicInteger counter = new AtomicInteger();
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable
{
+ counter.incrementAndGet();
+ return invocation.callRealMethod();
+ }
+ }).when(spyTopic).addSchema(any(SchemaData.class));
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable
{
+ counter.incrementAndGet();
+ return invocation.callRealMethod();
+ }
+
}).when(spyTopic).addSchemaIfIdleOrCheckCompatible(any(SchemaData.class));
+ topics.put(topicName,
CompletableFuture.completedFuture(Optional.of(spyTopic)));
+ return counter;
+ }
+
@Test
public void testMaxConsumersPerTopicUnlimited() throws Exception {
restartClusterAfterTest();
@@ -2924,49 +2965,55 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
final String myNamespace = newUniqueName(defaultTenant + "/ns");
admin.namespaces().createNamespace(myNamespace, Set.of("test"));
final String topic = "persistent://" + myNamespace +
"/testMaxConsumersPerTopicUnlimited";
+ admin.topics().createNonPartitionedTopic(topic);
+ AtomicInteger schemaOpsCounter =
injectSchemaCheckCounterForTopic(topic);
assertNull(admin.namespaces().getMaxConsumersPerTopic(myNamespace));
//the policy is set to 0, so there will be no restrictions
admin.namespaces().setMaxConsumersPerTopic(myNamespace, 0);
Awaitility.await().until(()
-> admin.namespaces().getMaxConsumersPerTopic(myNamespace) ==
0);
- List<Consumer<byte[]>> consumers = new ArrayList<>();
+ List<Consumer<String>> consumers = new ArrayList<>();
for (int i = 0; i < maxConsumersPerTopic + 1; i++) {
- Consumer<byte[]> consumer =
-
pulsarClient.newConsumer().subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+
.subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
consumers.add(consumer);
}
+ assertEquals(schemaOpsCounter.get(), maxConsumersPerTopic + 2);
admin.namespaces().removeMaxConsumersPerTopic(myNamespace);
Awaitility.await().until(()
-> admin.namespaces().getMaxConsumersPerTopic(myNamespace) ==
null);
try {
@Cleanup
- Consumer<byte[]> subscribe =
-
pulsarClient.newConsumer().subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
+ Consumer<String> subscribe =
pulsarClient.newConsumer(Schema.STRING)
+
.subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
fail("should fail");
} catch (PulsarClientException e) {
assertTrue(e.getMessage().contains("Topic reached max consumers
limit"));
+ assertEquals(schemaOpsCounter.get(), maxConsumersPerTopic + 2);
}
//set the limit to 3
admin.namespaces().setMaxConsumersPerTopic(myNamespace, 3);
Awaitility.await().until(()
-> admin.namespaces().getMaxConsumersPerTopic(myNamespace) ==
3);
// should success
- Consumer<byte[]> consumer =
-
pulsarClient.newConsumer().subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+
.subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
consumers.add(consumer);
+ assertEquals(schemaOpsCounter.get(), maxConsumersPerTopic + 3);
try {
@Cleanup
- Consumer<byte[]> subscribe =
-
pulsarClient.newConsumer().subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
+ Consumer<String> subscribe =
pulsarClient.newConsumer(Schema.STRING)
+
.subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
fail("should fail");
} catch (PulsarClientException e) {
assertTrue(e.getMessage().contains("Topic reached max consumers
limit"));
+ assertEquals(schemaOpsCounter.get(), maxConsumersPerTopic + 3);
}
//clean up
- for (Consumer<byte[]> subConsumer : consumers) {
+ for (Consumer<String> subConsumer : consumers) {
subConsumer.close();
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 76f871a6c60..8c21301c15b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -509,51 +509,6 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
topic.getProducers().values().forEach(producer ->
Assert.assertEquals(producer.getEpoch(), 3));
}
- private void testMaxProducers() {
- PersistentTopic topic = new PersistentTopic(successTopicName,
ledgerMock, brokerService);
- topic.initialize().join();
- String role = "appid1";
- // 1. add producer1
- Producer producer = new Producer(topic, serverCnx, 1 /* producer id
*/, "prod-name1", role,
- false, null, SchemaVersion.Latest, 0, false,
ProducerAccessMode.Shared, Optional.empty(), true);
- topic.addProducer(producer, new CompletableFuture<>());
- assertEquals(topic.getProducers().size(), 1);
-
- // 2. add producer2
- Producer producer2 = new Producer(topic, serverCnx, 2 /* producer id
*/, "prod-name2", role,
- false, null, SchemaVersion.Latest, 0, false,
ProducerAccessMode.Shared, Optional.empty(), true);
- topic.addProducer(producer2, new CompletableFuture<>());
- assertEquals(topic.getProducers().size(), 2);
-
- // 3. add producer3 but reached maxProducersPerTopic
- try {
- Producer producer3 = new Producer(topic, serverCnx, 3 /* producer
id */, "prod-name3", role,
- false, null, SchemaVersion.Latest, 0, false,
ProducerAccessMode.Shared, Optional.empty(), true);
- topic.addProducer(producer3, new CompletableFuture<>()).join();
- fail("should have failed");
- } catch (Exception e) {
- assertEquals(e.getCause().getClass(),
BrokerServiceException.ProducerBusyException.class);
- }
- }
-
- @Test
- public void testMaxProducersForBroker() {
- // set max clients
- pulsarTestContext.getConfig().setMaxProducersPerTopic(2);
- testMaxProducers();
- }
-
- @Test
- public void testMaxProducersForNamespace() throws Exception {
- // set max clients
- Policies policies = new Policies();
- policies.max_producers_per_topic = 2;
- pulsarTestContext.getPulsarResources().getNamespaceResources()
-
.createPolicies(TopicName.get(successTopicName).getNamespaceObject(),
- policies);
- testMaxProducers();
- }
-
private Producer getMockedProducerWithSpecificAddress(Topic topic, long
producerId, InetAddress address) {
final String producerNameBase = "producer";
final String role = "appid1";
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MaxProducerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MaxProducerTest.java
new file mode 100644
index 00000000000..a34b05280c4
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MaxProducerTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-api")
+public class MaxProducerTest extends ProducerConsumerBase {
+
+ @BeforeClass(alwaysRun = true)
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ super.producerBaseSetup();
+ }
+
+ @AfterClass(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Override
+ protected void doInitConf() throws Exception {
+ super.doInitConf();
+ conf.setMaxProducersPerTopic(2);
+ }
+
+ @Test
+ public void testMaxProducersForBroker() throws Exception {
+ testMaxProducers(2);
+ }
+
+ @Test
+ public void testMaxProducersForNamespace() throws Exception {
+ // set max clients
+ admin.namespaces().setMaxProducersPerTopic("public/default", 3);
+ testMaxProducers(3);
+ }
+
+ private void testMaxProducers(int maxProducerExpected) throws Exception {
+ final String topicName =
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+ admin.topics().createNonPartitionedTopic(topicName);
+
+ List<org.apache.pulsar.client.api.Producer<byte[]>> producers = new
ArrayList<>();
+ for (int i = 0; i < maxProducerExpected; i++) {
+
producers.add(pulsarClient.newProducer().topic(topicName).create());
+ }
+
+ try {
+ pulsarClient.newProducer().topic(topicName).create();
+ fail("should have failed");
+ } catch (Exception e) {
+ assertTrue(e instanceof
PulsarClientException.ProducerBusyException);
+ }
+
+ // cleanup.
+ for (org.apache.pulsar.client.api.Producer p : producers) {
+ p.close();
+ }
+ admin.topics().delete(topicName, false);
+ }
+}