This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new b33bff98d0d [improve][broker] System topic writer/reader connection
not counted (#18603)
b33bff98d0d is described below
commit b33bff98d0da37cd7c194aa2264a13ebceb802bc
Author: Ruguo Yu <[email protected]>
AuthorDate: Mon Nov 28 09:44:20 2022 +0800
[improve][broker] System topic writer/reader connection not counted (#18603)
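This PR is a supplement to #18369. It makes the following limit checks skip system topics, so that system topic writer/reader connections are not counted: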
- `AbstractTopic.isSameAddressProducersExceeded()`
- `AbstractBaseDispatcher.isConsumersExceededOnSubscription()`
---
.../org/apache/pulsar/broker/service/AbstractBaseDispatcher.java | 6 +++++-
.../main/java/org/apache/pulsar/broker/service/AbstractTopic.java | 3 +++
.../apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java | 4 +++-
3 files changed, 11 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 3d3f3db970c..c9c0300da05 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -288,8 +288,12 @@ public abstract class AbstractBaseDispatcher implements
Dispatcher {
protected abstract boolean isConsumersExceededOnSubscription();
protected boolean isConsumersExceededOnSubscription(AbstractTopic topic,
int consumerSize) {
+ if (topic.isSystemTopic()) {
+ return false;
+ }
Integer maxConsumersPerSubscription =
topic.getHierarchyTopicPolicies().getMaxConsumersPerSubscription().get();
- return maxConsumersPerSubscription > 0 && maxConsumersPerSubscription
<= consumerSize;
+ return maxConsumersPerSubscription != null &&
maxConsumersPerSubscription > 0
+ && maxConsumersPerSubscription <= consumerSize;
}
private void processReplicatedSubscriptionSnapshot(PositionImpl pos,
ByteBuf headersAndPayload) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index d0451d5a5fe..8b85615efc9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -452,6 +452,9 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
}
protected boolean isSameAddressProducersExceeded(Producer producer) {
+ if (isSystemTopic() || producer.isRemote()) {
+ return false;
+ }
final int maxSameAddressProducers =
brokerService.pulsar().getConfiguration()
.getMaxSameAddressProducersPerTopic();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index 89834d300f6..e936b974e76 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -244,6 +244,7 @@ public class PartitionedSystemTopicTest extends
BrokerTestBase {
admin.namespaces().createNamespace(ns, 2);
admin.topics().createPartitionedTopic(String.format("persistent://%s",
topic), 1);
+ conf.setMaxSameAddressConsumersPerTopic(1);
admin.namespaces().setMaxConsumersPerTopic(ns, 1);
admin.topicPolicies().setMaxConsumers(topic, 1);
NamespaceEventsSystemTopicFactory systemTopicFactory = new
NamespaceEventsSystemTopicFactory(pulsarClient);
@@ -252,8 +253,9 @@ public class PartitionedSystemTopicTest extends
BrokerTestBase {
SystemTopicClient.Reader reader1 =
systemTopicClientForNamespace.newReader();
SystemTopicClient.Reader reader2 =
systemTopicClientForNamespace.newReader();
+ conf.setMaxSameAddressProducersPerTopic(1);
+ admin.namespaces().setMaxProducersPerTopic(ns, 1);
admin.topicPolicies().setMaxProducers(topic, 1);
-
CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writer1 =
systemTopicClientForNamespace.newWriterAsync();
CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writer2 =
systemTopicClientForNamespace.newWriterAsync();
CompletableFuture<Void> f1 =
admin.topicPolicies().setCompactionThresholdAsync(topic, 1L);