This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new a07a025124c [fix][broker]fix the publish latency spike issue with large number of producers (#20607)
a07a025124c is described below

commit a07a025124c75106df62998df92e3fc79e15dc9a
Author: Penghui Li <[email protected]>
AuthorDate: Tue Jun 20 10:51:50 2023 +0800

    [fix][broker]fix the publish latency spike issue with large number of producers (#20607)
---
 .../pulsar/broker/service/AbstractTopic.java       | 22 +++++++++++++---------
 1 file changed, 13 insertions(+), 9 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index fa27f87f99d..13f5f8fa4b2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -36,6 +36,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -126,7 +127,11 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
 
     private static final AtomicLongFieldUpdater<AbstractTopic> RATE_LIMITED_UPDATER =
             AtomicLongFieldUpdater.newUpdater(AbstractTopic.class, "publishRateLimitedTimes");
-    protected volatile long publishRateLimitedTimes = 0;
+    protected volatile long publishRateLimitedTimes = 0L;
+
+    private static final AtomicIntegerFieldUpdater<AbstractTopic> USER_CREATED_PRODUCER_COUNTER_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(AbstractTopic.class, "userCreatedProducerCount");
+    private volatile int userCreatedProducerCount = 0;
 
     protected volatile Optional<Long> topicEpoch = Optional.empty();
     private volatile boolean hasExclusiveProducer;
@@ -426,14 +431,8 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
             return false;
         }
         Integer maxProducers = topicPolicies.getMaxProducersPerTopic().get();
-        if (maxProducers != null && maxProducers > 0 && maxProducers <= getUserCreatedProducersSize()) {
-            return true;
-        }
-        return false;
-    }
-
-    private long getUserCreatedProducersSize() {
-        return producers.values().stream().filter(p -> !p.isRemote()).count();
+        return maxProducers != null && maxProducers > 0
+                && maxProducers <= USER_CREATED_PRODUCER_COUNTER_UPDATER.get(this);
     }
 
     protected void registerTopicPolicyListener() {
@@ -974,6 +973,8 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
         Producer existProducer = producers.putIfAbsent(producer.getProducerName(), producer);
         if (existProducer != null) {
             tryOverwriteOldProducer(existProducer, producer);
+        } else if (!producer.isRemote()) {
+            USER_CREATED_PRODUCER_COUNTER_UPDATER.incrementAndGet(this);
         }
     }
 
@@ -1006,6 +1007,9 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
         checkArgument(producer.getTopic() == this);
 
         if (producers.remove(producer.getProducerName(), producer)) {
+            if (!producer.isRemote()) {
+                USER_CREATED_PRODUCER_COUNTER_UPDATER.decrementAndGet(this);
+            }
             handleProducerRemoved(producer);
         }
     }

Reply via email to