This is an automated email from the ASF dual-hosted git repository.

houxiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 63e452b6f7b [improve][broker] Pin executor to dispatcher and StreamingEntryReader to cache the string hashing (#18566)
63e452b6f7b is described below

commit 63e452b6f7be954883625426623af7d19e7d8d89
Author: houxiaoyu <[email protected]>
AuthorDate: Mon Nov 28 10:07:12 2022 +0800

    [improve][broker] Pin executor to dispatcher and StreamingEntryReader to cache the string hashing (#18566)
---
 .../PersistentDispatcherSingleActiveConsumer.java  | 18 ++++++++------
 ...istentStreamingDispatcherMultipleConsumers.java |  5 +++-
 ...entStreamingDispatcherSingleActiveConsumer.java |  4 ++--
 .../streamingdispatch/StreamingEntryReader.java    | 28 ++++++++++++++--------
 4 files changed, 35 insertions(+), 20 deletions(-)
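
For readers skimming the diff below: the change replaces per-call executeOrdered(name, task) submissions, which select a worker thread by hashing the topic or dispatcher name on every callback, with executor threads chosen once via chooseThread(name) in the constructor and cached in fields. A minimal sketch of the resulting pattern follows; the ExecutorPinningExample class and method names are hypothetical, only OrderedExecutor and chooseThread are the BookKeeper APIs already used by the broker in this diff.

    // Minimal sketch (hypothetical class) of the pattern this commit applies.
    import java.util.concurrent.Executor;
    import org.apache.bookkeeper.common.util.OrderedExecutor;

    class ExecutorPinningExample {

        private final Executor topicExecutor;

        ExecutorPinningExample(OrderedExecutor orderedExecutor, String topicName) {
            // Resolve the ordering key to its worker thread once and keep the
            // per-key executor; all tasks submitted to it run on that same thread,
            // so the ordering guarantee is unchanged.
            this.topicExecutor = orderedExecutor.chooseThread(topicName);
        }

        void onReadEntriesComplete(Runnable callback) {
            // Before this commit: orderedExecutor.executeOrdered(topicName, callback),
            // which repeats the key-to-thread selection on every callback.
            // After: reuse the pinned executor.
            topicExecutor.execute(callback);
        }
    }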

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 0421076ddf3..97cb5c36809 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -61,6 +62,8 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
 
     private final AtomicBoolean isRescheduleReadInProgress = new AtomicBoolean(false);
     protected final PersistentTopic topic;
+    protected final Executor topicExecutor;
+    protected final Executor dispatcherExecutor;
     protected final String name;
     private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
 
@@ -77,8 +80,10 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
         super(subscriptionType, partitionIndex, topic.getName(), subscription,
                 topic.getBrokerService().pulsar().getConfiguration(), cursor);
         this.topic = topic;
+        this.topicExecutor = topic.getBrokerService().getTopicOrderedExecutor().chooseThread(topicName);
         this.name = topic.getName() + " / " + (cursor.getName() != null ? Codec.decode(cursor.getName())
                 : ""/* NonDurableCursor doesn't have name */);
+        this.dispatcherExecutor = topic.getBrokerService().getTopicOrderedExecutor().chooseThread(name);
         this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
         this.readFailureBackoff = new Backoff(serviceConfig.getDispatcherReadFailureBackoffInitialTimeInMs(),
             TimeUnit.MILLISECONDS, serviceConfig.getDispatcherReadFailureBackoffMaxTimeInMs(),
@@ -145,7 +150,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
 
     @Override
     public void readEntriesComplete(final List<Entry> entries, Object obj) {
-        topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName, SafeRun.safeRun(() -> {
+        topicExecutor.execute(SafeRun.safeRun(() -> {
             internalReadEntriesComplete(entries, obj);
         }));
     }
@@ -225,8 +230,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
                             sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes());
 
                     // Schedule a new read batch operation only after the previous batch has been written to the socket.
-                    topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName,
-                        SafeRun.safeRun(() -> {
+                    topicExecutor.execute(SafeRun.safeRun(() -> {
                             synchronized (PersistentDispatcherSingleActiveConsumer.this) {
                                 Consumer newConsumer = getActiveConsumer();
                                 readMoreEntries(newConsumer);
@@ -238,7 +242,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
 
     @Override
     public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
-        topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName, SafeRun.safeRun(() -> {
+        topicExecutor.execute(SafeRun.safeRun(() -> {
             internalConsumerFlow(consumer);
         }));
     }
@@ -269,7 +273,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
 
     @Override
     public void redeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoch) {
-        topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName, SafeRun.safeRun(() -> {
+        topicExecutor.execute(SafeRun.safeRun(() -> {
             internalRedeliverUnacknowledgedMessages(consumer, consumerEpoch);
         }));
     }
@@ -463,7 +467,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
 
     @Override
     public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
-        topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName, SafeRun.safeRun(() -> {
+        topicExecutor.execute(SafeRun.safeRun(() -> {
             internalReadEntriesFailed(exception, ctx);
         }));
     }
@@ -513,7 +517,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
         topic.getBrokerService().executor().schedule(() -> {
 
             // Jump again into dispatcher dedicated thread
-            topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName, SafeRun.safeRun(() -> {
+            topicExecutor.execute(SafeRun.safeRun(() -> {
                 synchronized (PersistentDispatcherSingleActiveConsumer.this) {
                     Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
                     // we should retry the read if we have an active consumer and there is no pending read
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
index dbb58e0540a..5df1fc2c6db 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service.persistent;
 import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 import com.google.common.collect.Lists;
 import java.util.Set;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Entry;
@@ -47,10 +48,12 @@ public class PersistentStreamingDispatcherMultipleConsumers extends PersistentDi
     private int sendingTaskCounter = 0;
     private final StreamingEntryReader streamingEntryReader = new StreamingEntryReader((ManagedCursorImpl) cursor,
             this, topic);
+    private final Executor topicExecutor;
 
     public PersistentStreamingDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor,
                                                           Subscription subscription) {
         super(topic, cursor, subscription);
+        this.topicExecutor = topic.getBrokerService().getTopicOrderedExecutor().chooseThread(topic.getName());
     }
 
     /**
@@ -126,7 +129,7 @@ public class PersistentStreamingDispatcherMultipleConsumers extends PersistentDi
     public void canReadMoreEntries(boolean withBackoff) {
         havePendingRead = false;
         topic.getBrokerService().executor().schedule(() -> {
-        topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topic.getName(), SafeRun.safeRun(() -> {
+            topicExecutor.execute(SafeRun.safeRun(() -> {
                 synchronized (PersistentStreamingDispatcherMultipleConsumers.this) {
                     if (!havePendingRead) {
                         log.info("[{}] Scheduling read operation", name);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
index 6a2ea7564d5..5658771140e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
@@ -64,7 +64,7 @@ public class PersistentStreamingDispatcherSingleActiveConsumer extends Persisten
     public void canReadMoreEntries(boolean withBackoff) {
         havePendingRead = false;
         topic.getBrokerService().executor().schedule(() -> {
-            topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName, SafeRun.safeRun(() -> {
+             topicExecutor.execute(SafeRun.safeRun(() -> {
                 synchronized (PersistentStreamingDispatcherSingleActiveConsumer.this) {
                     Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
                     if (currentConsumer != null && !havePendingRead) {
@@ -111,7 +111,7 @@ public class PersistentStreamingDispatcherSingleActiveConsumer extends Persisten
      */
     @Override
     public void readEntryComplete(Entry entry, PendingReadEntryRequest ctx) {
-        topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(name, safeRun(() -> {
+        dispatcherExecutor.execute(safeRun(() -> {
             internalReadEntryComplete(entry, ctx);
         }));
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
index ca23eb4c482..0f75538ee88 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
@@ -22,10 +22,10 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
@@ -44,7 +44,6 @@ import org.apache.pulsar.client.impl.Backoff;
  * Entry reader that fulfill read request by streamline the read instead of reading with micro batch.
  */
 @Slf4j
-@RequiredArgsConstructor
 public class StreamingEntryReader implements AsyncCallbacks.ReadEntryCallback, WaitingEntryCallBack {
 
     private final int maxRetry = 3;
@@ -61,6 +60,10 @@ public class StreamingEntryReader implements AsyncCallbacks.ReadEntryCallback, W
 
     private final PersistentTopic topic;
 
+    private final Executor topicExecutor;
+
+    private final Executor dispatcherExecutor;
+
     private AtomicInteger currentReadSizeByte = new AtomicInteger(0);
 
     private volatile State state;
@@ -73,6 +76,14 @@ public class StreamingEntryReader implements AsyncCallbacks.ReadEntryCallback, W
     private final Backoff readFailureBackoff = new Backoff(10, TimeUnit.MILLISECONDS,
             1, TimeUnit.SECONDS, 0, TimeUnit.MILLISECONDS);
 
+    public StreamingEntryReader(ManagedCursorImpl cursor, StreamingDispatcher dispatcher, PersistentTopic topic) {
+        this.cursor = cursor;
+        this.dispatcher = dispatcher;
+        this.topic = topic;
+        this.topicExecutor = topic.getBrokerService().getTopicOrderedExecutor().chooseThread(topic.getName());
+        this.dispatcherExecutor = topic.getBrokerService().getTopicOrderedExecutor().chooseThread(dispatcher.getName());
+    }
+
     /**
      * Read entries in streaming way, that said instead of reading with micro batch and send entries to consumer after
      * all entries in the batch are read from ledger, this method will fire numEntriesToRead requests to managedLedger
@@ -146,7 +157,7 @@ public class StreamingEntryReader implements AsyncCallbacks.ReadEntryCallback, W
     @Override
     public void readEntryComplete(Entry entry, Object ctx) {
        // Don't block caller thread, complete read entry with dispatcher dedicated thread.
-        topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(dispatcher.getName(), SafeRun.safeRun(() -> {
+        dispatcherExecutor.execute(SafeRun.safeRun(() -> {
             internalReadEntryComplete(entry, ctx);
         }));
     }
@@ -187,7 +198,7 @@ public class StreamingEntryReader implements AsyncCallbacks.ReadEntryCallback, W
     @Override
     public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
        // Don't block caller thread, complete read entry fail with dispatcher dedicated thread.
-        topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(dispatcher.getName(), SafeRun.safeRun(() -> {
+        dispatcherExecutor.execute(SafeRun.safeRun(() -> {
             internalReadEntryFailed(exception, ctx);
         }));
     }
@@ -246,7 +257,7 @@ public class StreamingEntryReader implements AsyncCallbacks.ReadEntryCallback, W
     public boolean cancelReadRequests() {
         if (STATE_UPDATER.compareAndSet(this, State.Issued, State.Canceling)) {
            // Don't block caller thread, complete cancel read with dispatcher dedicated thread.
-            topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topic.getName(), SafeRun.safeRun(() -> {
+             topicExecutor.execute(SafeRun.safeRun(() -> {
                 synchronized (StreamingEntryReader.this) {
                     if (STATE_UPDATER.compareAndSet(this, State.Canceling, State.Canceled)) {
                         internalCancelReadRequests();
@@ -271,8 +282,7 @@ public class StreamingEntryReader implements AsyncCallbacks.ReadEntryCallback, W
     private void retryReadRequest(PendingReadEntryRequest pendingReadEntryRequest, long delay) {
         topic.getBrokerService().executor().schedule(() -> {
             // Jump again into dispatcher dedicated thread
-            topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(dispatcher.getName(),
-                    SafeRun.safeRun(() -> {
+            dispatcherExecutor.execute(SafeRun.safeRun(() -> {
                 ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) cursor.getManagedLedger();
                 managedLedger.asyncReadEntry(pendingReadEntryRequest.position, this, pendingReadEntryRequest);
             }));
@@ -281,9 +291,7 @@ public class StreamingEntryReader implements AsyncCallbacks.ReadEntryCallback, W
 
     @Override
     public void entriesAvailable() {
-        topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(dispatcher.getName(), SafeRun.safeRun(() -> {
-            internalEntriesAvailable();
-        }));
+        dispatcherExecutor.execute(SafeRun.safeRun(this::internalEntriesAvailable));
     }
 
     private synchronized void internalEntriesAvailable() {
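
As background on why the pinning above is safe and what it saves: an ordered executor routes each task to a worker thread derived from the ordering key, so going through executeOrdered(key, task) on every callback repeats that key-to-thread selection each time, while chooseThread(key) does it once and lets the caller reuse the result with the same single-thread ordering guarantee. A simplified, hypothetical illustration of that idea (not BookKeeper's actual implementation):

    // Hypothetical, simplified ordered executor for illustration only.
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    final class SimpleOrderedExecutor {

        private final ExecutorService[] threads;

        SimpleOrderedExecutor(int numThreads) {
            threads = new ExecutorService[numThreads];
            for (int i = 0; i < numThreads; i++) {
                threads[i] = Executors.newSingleThreadExecutor();
            }
        }

        // Recomputes the key-to-thread mapping on every submission.
        void executeOrdered(String key, Runnable task) {
            chooseThread(key).execute(task);
        }

        // Map the key once; callers can cache the returned executor (as the commit
        // does) and still get all of that key's tasks on one thread, in order.
        ExecutorService chooseThread(String key) {
            int idx = (key.hashCode() & Integer.MAX_VALUE) % threads.length;
            return threads[idx];
        }
    }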
