This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit e83c26efcfce5d0d10464a281240feb3e30ab8a2 Author: lipenghui <[email protected]> AuthorDate: Tue Jun 28 11:29:57 2022 +0800 [improve][broker] Reduce the re-schedule message read operation for PersistentDispatcherMultipleConsumers (#16241) ### Motivation Fix the high CPU consumption that occurs when there are many consumers (> 100k) and the dispatch rate limit is enabled.  [broker_perf.html.txt](https://github.com/apache/pulsar/files/8991916/broker_perf.html.txt) ### Modification - Added `isRescheduleReadInProgress` to ensure the dispatcher only has one pending re-schedule read task at a time. - Added DEBUG log for the re-schedule read operation (cherry picked from commit eec46ddcba4d2b4f956e1b4d63154cc43087f507) --- .../PersistentDispatcherMultipleConsumers.java | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 0a6cf8e02a9..f77a55338f5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -29,6 +29,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.Entry; @@ -105,6 +106,8 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul "blockedDispatcherOnUnackedMsgs"); protected Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty(); + private AtomicBoolean 
isRescheduleReadInProgress = new AtomicBoolean(false); + protected enum ReadType { Normal, Replay } @@ -290,8 +293,17 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul @Override protected void reScheduleRead() { - topic.getBrokerService().executor().schedule(() -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS, - TimeUnit.MILLISECONDS); + if (isRescheduleReadInProgress.compareAndSet(false, true)) { + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Reschedule message read in {} ms", topic.getName(), name, MESSAGE_RATE_BACKOFF_MS); + } + topic.getBrokerService().executor().schedule( + () -> { + isRescheduleReadInProgress.set(false); + readMoreEntries(); + }, + MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS); + } } // left pair is messagesToRead, right pair is bytesToRead
