This is an automated email from the ASF dual-hosted git repository.

houxiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 51c2bb4dfbd [improve][broker] Choose random thread for consumerFlow in 
PersistentDispatcherSingleActiveConsumer (#20522)
51c2bb4dfbd is described below

commit 51c2bb4dfbd088810a76963ba86afeec2b193776
Author: houxiaoyu <[email protected]>
AuthorDate: Mon Jun 12 18:42:49 2023 +0800

    [improve][broker] Choose random thread for consumerFlow in 
PersistentDispatcherSingleActiveConsumer (#20522)
    
    ### Motivation
    
    Currently, all subscriptions of one topic perform the `consumerFlow` action in 
a single thread, which is chosen by topicName:
    ```
    this.topicExecutor = 
topic.getBrokerService().getTopicOrderedExecutor().chooseThread(topicName);
    ```
    
    If a topic has a large number of subscriptions, all the work will be 
concentrated on one thread (the chosen thread), which reduces consume 
performance. So in this patch, I'd like to choose a random thread for 
`consumerFlow` in `PersistentDispatcherSingleActiveConsumer` to improve 
consume performance.
    
    ### Modifications
    
    *  
`topic.getBrokerService().getTopicOrderedExecutor().chooseThread(topicName);` 
-> `topic.getBrokerService().getTopicOrderedExecutor().chooseThread();`
    *  `this.topicExecutor` -> `this.executor`
---
 .../PersistentDispatcherSingleActiveConsumer.java        | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 7cbf7bd2c78..d9d0f6adc87 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -61,7 +61,7 @@ public class PersistentDispatcherSingleActiveConsumer extends 
AbstractDispatcher
 
     private final AtomicBoolean isRescheduleReadInProgress = new 
AtomicBoolean(false);
     protected final PersistentTopic topic;
-    protected final Executor topicExecutor;
+    protected final Executor executor;
 
     protected final String name;
     private Optional<DispatchRateLimiter> dispatchRateLimiter = 
Optional.empty();
@@ -79,7 +79,7 @@ public class PersistentDispatcherSingleActiveConsumer extends 
AbstractDispatcher
         super(subscriptionType, partitionIndex, topic.getName(), subscription,
                 topic.getBrokerService().pulsar().getConfiguration(), cursor);
         this.topic = topic;
-        this.topicExecutor = 
topic.getBrokerService().getTopicOrderedExecutor().chooseThread(topicName);
+        this.executor = 
topic.getBrokerService().getTopicOrderedExecutor().chooseThread();
         this.name = topic.getName() + " / " + (cursor.getName() != null ? 
Codec.decode(cursor.getName())
                 : ""/* NonDurableCursor doesn't have name */);
         this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
@@ -148,7 +148,7 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
 
     @Override
     public void readEntriesComplete(final List<Entry> entries, Object obj) {
-        topicExecutor.execute(() -> internalReadEntriesComplete(entries, obj));
+        executor.execute(() -> internalReadEntriesComplete(entries, obj));
     }
 
     public synchronized void internalReadEntriesComplete(final List<Entry> 
entries, Object obj) {
@@ -226,7 +226,7 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
                             sendMessageInfo.getTotalMessages(), 
sendMessageInfo.getTotalBytes());
 
                     // Schedule a new read batch operation only after the 
previous batch has been written to the socket.
-                    topicExecutor.execute(() -> {
+                    executor.execute(() -> {
                             synchronized 
(PersistentDispatcherSingleActiveConsumer.this) {
                                 Consumer newConsumer = getActiveConsumer();
                                 readMoreEntries(newConsumer);
@@ -238,7 +238,7 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
 
     @Override
     public void consumerFlow(Consumer consumer, int 
additionalNumberOfMessages) {
-        topicExecutor.execute(() -> internalConsumerFlow(consumer));
+        executor.execute(() -> internalConsumerFlow(consumer));
     }
 
     private synchronized void internalConsumerFlow(Consumer consumer) {
@@ -267,7 +267,7 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
 
     @Override
     public void redeliverUnacknowledgedMessages(Consumer consumer, long 
consumerEpoch) {
-        topicExecutor.execute(() -> 
internalRedeliverUnacknowledgedMessages(consumer, consumerEpoch));
+        executor.execute(() -> 
internalRedeliverUnacknowledgedMessages(consumer, consumerEpoch));
     }
 
     private synchronized void internalRedeliverUnacknowledgedMessages(Consumer 
consumer, long consumerEpoch) {
@@ -459,7 +459,7 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
 
     @Override
     public void readEntriesFailed(ManagedLedgerException exception, Object 
ctx) {
-        topicExecutor.execute(() -> internalReadEntriesFailed(exception, ctx));
+        executor.execute(() -> internalReadEntriesFailed(exception, ctx));
     }
 
     private synchronized void internalReadEntriesFailed(ManagedLedgerException 
exception, Object ctx) {
@@ -507,7 +507,7 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
         topic.getBrokerService().executor().schedule(() -> {
 
             // Jump again into dispatcher dedicated thread
-            topicExecutor.execute(() -> {
+            executor.execute(() -> {
                 synchronized (PersistentDispatcherSingleActiveConsumer.this) {
                     Consumer currentConsumer = 
ACTIVE_CONSUMER_UPDATER.get(this);
                     // we should retry the read if we have an active consumer 
and there is no pending read

Reply via email to