This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a2fb5622a11 [improve][broker] System topic writer/reader connection
not counted (#18603)
a2fb5622a11 is described below
commit a2fb5622a11a82bc867083d9b3411567dacf369b
Author: Ruguo Yu <[email protected]>
AuthorDate: Mon Nov 28 09:44:20 2022 +0800
[improve][broker] System topic writer/reader connection not counted (#18603)
This PR is a supplement to #18369.
- `AbstractTopic.isSameAddressProducersExceeded()`
- `AbstractBaseDispatcher.isConsumersExceededOnSubscription()`
---
.../org/apache/pulsar/broker/service/AbstractBaseDispatcher.java | 6 +++++-
.../main/java/org/apache/pulsar/broker/service/AbstractTopic.java | 3 +++
.../apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java | 4 +++-
3 files changed, 11 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 02400f6cdee..a7a39e2ff0d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -283,8 +283,12 @@ public abstract class AbstractBaseDispatcher extends
EntryFilterSupport implemen
protected abstract boolean isConsumersExceededOnSubscription();
protected boolean isConsumersExceededOnSubscription(AbstractTopic topic,
int consumerSize) {
+ if (topic.isSystemTopic()) {
+ return false;
+ }
Integer maxConsumersPerSubscription =
topic.getHierarchyTopicPolicies().getMaxConsumersPerSubscription().get();
- return maxConsumersPerSubscription > 0 && maxConsumersPerSubscription
<= consumerSize;
+ return maxConsumersPerSubscription != null &&
maxConsumersPerSubscription > 0
+ && maxConsumersPerSubscription <= consumerSize;
}
private void processReplicatedSubscriptionSnapshot(PositionImpl pos,
ByteBuf headersAndPayload) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 75b15c15df2..02e9adcca37 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -470,6 +470,9 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
}
protected boolean isSameAddressProducersExceeded(Producer producer) {
+ if (isSystemTopic() || producer.isRemote()) {
+ return false;
+ }
final int maxSameAddressProducers =
brokerService.pulsar().getConfiguration()
.getMaxSameAddressProducersPerTopic();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index 5f1471e0a63..f82badac51e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -272,6 +272,7 @@ public class PartitionedSystemTopicTest extends
BrokerTestBase {
admin.namespaces().createNamespace(ns, 2);
admin.topics().createPartitionedTopic(String.format("persistent://%s",
topic), 1);
+ conf.setMaxSameAddressConsumersPerTopic(1);
admin.namespaces().setMaxConsumersPerTopic(ns, 1);
admin.topicPolicies().setMaxConsumers(topic, 1);
NamespaceEventsSystemTopicFactory systemTopicFactory = new
NamespaceEventsSystemTopicFactory(pulsarClient);
@@ -280,8 +281,9 @@ public class PartitionedSystemTopicTest extends
BrokerTestBase {
SystemTopicClient.Reader reader1 =
systemTopicClientForNamespace.newReader();
SystemTopicClient.Reader reader2 =
systemTopicClientForNamespace.newReader();
+ conf.setMaxSameAddressProducersPerTopic(1);
+ admin.namespaces().setMaxProducersPerTopic(ns, 1);
admin.topicPolicies().setMaxProducers(topic, 1);
-
CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writer1 =
systemTopicClientForNamespace.newWriterAsync();
CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writer2 =
systemTopicClientForNamespace.newWriterAsync();
CompletableFuture<Void> f1 =
admin.topicPolicies().setCompactionThresholdAsync(topic, 1L);