This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 40b3e4e44ba [improve][broker] Reduce the re-schedule message read 
operation for PersistentDispatcherMultipleConsumers (#16241)
40b3e4e44ba is described below

commit 40b3e4e44ba3d799dc3d78b7d1be41b71aa5cbb2
Author: lipenghui <[email protected]>
AuthorDate: Tue Jun 28 11:29:57 2022 +0800

    [improve][broker] Reduce the re-schedule message read operation for 
PersistentDispatcherMultipleConsumers (#16241)
    
    Fix the high CPU consumption when there are many consumers (> 100k) and the 
dispatch rate limit is enabled.
    
    
![image](https://user-images.githubusercontent.com/12592133/175940861-7be13d62-042d-46b9-923d-3b1e8354d331.png)
    
    
[broker_perf.html.txt](https://github.com/apache/pulsar/files/8991916/broker_perf.html.txt)
    
    - Added `isRescheduleReadInProgress` to ensure the dispatcher only has one 
pending re-schedule read task at a time.
    - Added DEBUG log for the re-schedule read operation
    
    (cherry picked from commit eec46ddcba4d2b4f956e1b4d63154cc43087f507)
---
 .../service/persistent/PersistentDispatcherMultipleConsumers.java      | 3 +++
 1 file changed, 3 insertions(+)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 02fc8050763..f28c6c8367f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -29,6 +29,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.Entry;
@@ -106,6 +107,8 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
                     "blockedDispatcherOnUnackedMsgs");
     protected Optional<DispatchRateLimiter> dispatchRateLimiter = 
Optional.empty();
 
+    private AtomicBoolean isRescheduleReadInProgress = new 
AtomicBoolean(false);
+
     protected enum ReadType {
         Normal, Replay
     }

Reply via email to