This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 9ca74fcc844 [improve][broker] System topic writer/reader connection
not counted (#18603)
9ca74fcc844 is described below
commit 9ca74fcc844aeb51be25d5b64a2d924fee0f6123
Author: Ruguo Yu <[email protected]>
AuthorDate: Mon Nov 28 09:44:20 2022 +0800
[improve][broker] System topic writer/reader connection not counted (#18603)
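This PR is a supplement to #18369. It makes the following limit checks
return false for system topics, so that system topic writer/reader
connections are not counted against the limits:
- `AbstractTopic.isSameAddressProducersExceeded()` (also returns false for
remote producers)
- `AbstractBaseDispatcher.isConsumersExceededOnSubscription()`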
(cherry picked from commit a2fb5622a11a82bc867083d9b3411567dacf369b)
---
.../org/apache/pulsar/broker/service/AbstractBaseDispatcher.java | 9 ++++++---
.../java/org/apache/pulsar/broker/service/AbstractTopic.java | 3 +++
.../nonpersistent/NonPersistentDispatcherMultipleConsumers.java | 2 +-
.../NonPersistentDispatcherSingleActiveConsumer.java | 2 +-
.../persistent/PersistentDispatcherMultipleConsumers.java | 2 +-
.../persistent/PersistentDispatcherSingleActiveConsumer.java | 2 +-
.../pulsar/broker/systopic/PartitionedSystemTopicTest.java | 4 +++-
7 files changed, 16 insertions(+), 8 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 3f5932a2d34..1e3b04da37a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -202,18 +202,21 @@ public abstract class AbstractBaseDispatcher implements
Dispatcher {
protected abstract boolean isConsumersExceededOnSubscription();
protected boolean isConsumersExceededOnSubscription(BrokerService
brokerService,
- String topic, int
consumerSize) {
+ AbstractTopic topic,
int consumerSize) {
+ if (topic.isSystemTopic()) {
+ return false;
+ }
Policies policies = null;
Integer maxConsumersPerSubscription = null;
try {
maxConsumersPerSubscription = brokerService
- .getTopicPolicies(TopicName.get(topic))
+ .getTopicPolicies(TopicName.get(topic.getName()))
.map(TopicPolicies::getMaxConsumersPerSubscription)
.orElse(null);
if (maxConsumersPerSubscription == null) {
// Use getDataIfPresent from zk cache to make the call
non-blocking and prevent deadlocks in addConsumer
policies =
brokerService.pulsar().getPulsarResources().getNamespaceResources()
-
.getPoliciesIfCached(TopicName.get(topic).getNamespaceObject()).orElse(null);
+
.getPoliciesIfCached(TopicName.get(topic.getName()).getNamespaceObject()).orElse(null);
}
} catch (Exception e) {
log.debug("Get topic or namespace policies fail", e);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index eb06a41d42f..00c7a494ed3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -189,6 +189,9 @@ public abstract class AbstractTopic implements Topic {
}
protected boolean isSameAddressProducersExceeded(Producer producer) {
+ if (isSystemTopic() || producer.isRemote()) {
+ return false;
+ }
final int maxSameAddressProducers =
brokerService.pulsar().getConfiguration()
.getMaxSameAddressProducersPerTopic();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index 56886833592..185912dda1e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -85,7 +85,7 @@ public class NonPersistentDispatcherMultipleConsumers extends
AbstractDispatcher
@Override
protected boolean isConsumersExceededOnSubscription() {
- return isConsumersExceededOnSubscription(topic.getBrokerService(),
topic.getName(), consumerList.size());
+ return isConsumersExceededOnSubscription(topic.getBrokerService(),
topic, consumerList.size());
}
@Override
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index 5cdbff17b81..5579e4dd154 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -73,7 +73,7 @@ public final class
NonPersistentDispatcherSingleActiveConsumer extends AbstractD
@Override
protected boolean isConsumersExceededOnSubscription() {
- return isConsumersExceededOnSubscription(topic.getBrokerService(),
topic.getName(), consumers.size());
+ return isConsumersExceededOnSubscription(topic.getBrokerService(),
topic, consumers.size());
}
@Override
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 02fc8050763..bcfab873669 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -168,7 +168,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
@Override
protected boolean isConsumersExceededOnSubscription() {
- return isConsumersExceededOnSubscription(topic.getBrokerService(),
topic.getName(), consumerList.size());
+ return isConsumersExceededOnSubscription(topic.getBrokerService(),
topic, consumerList.size());
}
@Override
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 727c4f09af9..9a3c9fa3485 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -131,7 +131,7 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
@Override
protected boolean isConsumersExceededOnSubscription() {
- return isConsumersExceededOnSubscription(topic.getBrokerService(),
topic.getName(), consumers.size());
+ return isConsumersExceededOnSubscription(topic.getBrokerService(),
topic, consumers.size());
}
@Override
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index ce4f44dd339..3a6529f8964 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -244,6 +244,7 @@ public class PartitionedSystemTopicTest extends
BrokerTestBase {
admin.namespaces().createNamespace(ns, 2);
admin.topics().createPartitionedTopic(String.format("persistent://%s",
topic), 1);
+ conf.setMaxSameAddressConsumersPerTopic(1);
admin.namespaces().setMaxConsumersPerTopic(ns, 1);
admin.topicPolicies().setMaxConsumers(topic, 1);
NamespaceEventsSystemTopicFactory systemTopicFactory = new
NamespaceEventsSystemTopicFactory(pulsarClient);
@@ -252,8 +253,9 @@ public class PartitionedSystemTopicTest extends
BrokerTestBase {
SystemTopicClient.Reader reader1 =
systemTopicClientForNamespace.newReader();
SystemTopicClient.Reader reader2 =
systemTopicClientForNamespace.newReader();
+ conf.setMaxSameAddressProducersPerTopic(1);
+ admin.namespaces().setMaxProducersPerTopic(ns, 1);
admin.topicPolicies().setMaxProducers(topic, 1);
-
CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writer1 =
systemTopicClientForNamespace.newWriterAsync();
CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writer2 =
systemTopicClientForNamespace.newWriterAsync();
CompletableFuture<Void> f1 =
admin.topicPolicies().setCompactionThresholdAsync(topic, 1L);