This is an automated email from the ASF dual-hosted git repository.
houxiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 63e452b6f7b [improve][broker] Pin executor to dispatcher and StreamingEntryReader to cache the string hashing (#18566)
63e452b6f7b is described below
commit 63e452b6f7be954883625426623af7d19e7d8d89
Author: houxiaoyu <[email protected]>
AuthorDate: Mon Nov 28 10:07:12 2022 +0800
[improve][broker] Pin executor to dispatcher and StreamingEntryReader to cache the string hashing (#18566)
---
.../PersistentDispatcherSingleActiveConsumer.java | 18 ++++++++------
...istentStreamingDispatcherMultipleConsumers.java | 5 +++-
...entStreamingDispatcherSingleActiveConsumer.java | 4 ++--
.../streamingdispatch/StreamingEntryReader.java | 28 ++++++++++++++--------
4 files changed, 35 insertions(+), 20 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 0421076ddf3..97cb5c36809 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -61,6 +62,8 @@ public class PersistentDispatcherSingleActiveConsumer extends
AbstractDispatcher
private final AtomicBoolean isRescheduleReadInProgress = new
AtomicBoolean(false);
protected final PersistentTopic topic;
+ protected final Executor topicExecutor;
+ protected final Executor dispatcherExecutor;
protected final String name;
private Optional<DispatchRateLimiter> dispatchRateLimiter =
Optional.empty();
@@ -77,8 +80,10 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
super(subscriptionType, partitionIndex, topic.getName(), subscription,
topic.getBrokerService().pulsar().getConfiguration(), cursor);
this.topic = topic;
+ this.topicExecutor =
topic.getBrokerService().getTopicOrderedExecutor().chooseThread(topicName);
this.name = topic.getName() + " / " + (cursor.getName() != null ?
Codec.decode(cursor.getName())
: ""/* NonDurableCursor doesn't have name */);
+ this.dispatcherExecutor =
topic.getBrokerService().getTopicOrderedExecutor().chooseThread(name);
this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
this.readFailureBackoff = new
Backoff(serviceConfig.getDispatcherReadFailureBackoffInitialTimeInMs(),
TimeUnit.MILLISECONDS,
serviceConfig.getDispatcherReadFailureBackoffMaxTimeInMs(),
@@ -145,7 +150,7 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
@Override
public void readEntriesComplete(final List<Entry> entries, Object obj) {
-
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName,
SafeRun.safeRun(() -> {
+ topicExecutor.execute(SafeRun.safeRun(() -> {
internalReadEntriesComplete(entries, obj);
}));
}
@@ -225,8 +230,7 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes());
// Schedule a new read batch operation only after the
previous batch has been written to the socket.
-
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName,
- SafeRun.safeRun(() -> {
+ topicExecutor.execute(SafeRun.safeRun(() -> {
synchronized
(PersistentDispatcherSingleActiveConsumer.this) {
Consumer newConsumer = getActiveConsumer();
readMoreEntries(newConsumer);
@@ -238,7 +242,7 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
@Override
public void consumerFlow(Consumer consumer, int
additionalNumberOfMessages) {
-
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName,
SafeRun.safeRun(() -> {
+ topicExecutor.execute(SafeRun.safeRun(() -> {
internalConsumerFlow(consumer);
}));
}
@@ -269,7 +273,7 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
@Override
public void redeliverUnacknowledgedMessages(Consumer consumer, long
consumerEpoch) {
-
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName,
SafeRun.safeRun(() -> {
+ topicExecutor.execute(SafeRun.safeRun(() -> {
internalRedeliverUnacknowledgedMessages(consumer, consumerEpoch);
}));
}
@@ -463,7 +467,7 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
@Override
public void readEntriesFailed(ManagedLedgerException exception, Object
ctx) {
-
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName,
SafeRun.safeRun(() -> {
+ topicExecutor.execute(SafeRun.safeRun(() -> {
internalReadEntriesFailed(exception, ctx);
}));
}
@@ -513,7 +517,7 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
topic.getBrokerService().executor().schedule(() -> {
// Jump again into dispatcher dedicated thread
-
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName,
SafeRun.safeRun(() -> {
+ topicExecutor.execute(SafeRun.safeRun(() -> {
synchronized (PersistentDispatcherSingleActiveConsumer.this) {
Consumer currentConsumer =
ACTIVE_CONSUMER_UPDATER.get(this);
// we should retry the read if we have an active consumer
and there is no pending read
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
index dbb58e0540a..5df1fc2c6db 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service.persistent;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import com.google.common.collect.Lists;
import java.util.Set;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
@@ -47,10 +48,12 @@ public class PersistentStreamingDispatcherMultipleConsumers
extends PersistentDi
private int sendingTaskCounter = 0;
private final StreamingEntryReader streamingEntryReader = new
StreamingEntryReader((ManagedCursorImpl) cursor,
this, topic);
+ private final Executor topicExecutor;
public PersistentStreamingDispatcherMultipleConsumers(PersistentTopic
topic, ManagedCursor cursor,
Subscription
subscription) {
super(topic, cursor, subscription);
+ this.topicExecutor =
topic.getBrokerService().getTopicOrderedExecutor().chooseThread(topic.getName());
}
/**
@@ -126,7 +129,7 @@ public class PersistentStreamingDispatcherMultipleConsumers
extends PersistentDi
public void canReadMoreEntries(boolean withBackoff) {
havePendingRead = false;
topic.getBrokerService().executor().schedule(() -> {
-
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topic.getName(),
SafeRun.safeRun(() -> {
+ topicExecutor.execute(SafeRun.safeRun(() -> {
synchronized
(PersistentStreamingDispatcherMultipleConsumers.this) {
if (!havePendingRead) {
log.info("[{}] Scheduling read operation", name);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
index 6a2ea7564d5..5658771140e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
@@ -64,7 +64,7 @@ public class
PersistentStreamingDispatcherSingleActiveConsumer extends Persisten
public void canReadMoreEntries(boolean withBackoff) {
havePendingRead = false;
topic.getBrokerService().executor().schedule(() -> {
-
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName,
SafeRun.safeRun(() -> {
+ topicExecutor.execute(SafeRun.safeRun(() -> {
synchronized
(PersistentStreamingDispatcherSingleActiveConsumer.this) {
Consumer currentConsumer =
ACTIVE_CONSUMER_UPDATER.get(this);
if (currentConsumer != null && !havePendingRead) {
@@ -111,7 +111,7 @@ public class
PersistentStreamingDispatcherSingleActiveConsumer extends Persisten
*/
@Override
public void readEntryComplete(Entry entry, PendingReadEntryRequest ctx) {
-
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(name,
safeRun(() -> {
+ dispatcherExecutor.execute(safeRun(() -> {
internalReadEntryComplete(entry, ctx);
}));
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
index ca23eb4c482..0f75538ee88 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
@@ -22,10 +22,10 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
@@ -44,7 +44,6 @@ import org.apache.pulsar.client.impl.Backoff;
* Entry reader that fulfill read request by streamline the read instead of
reading with micro batch.
*/
@Slf4j
-@RequiredArgsConstructor
public class StreamingEntryReader implements AsyncCallbacks.ReadEntryCallback,
WaitingEntryCallBack {
private final int maxRetry = 3;
@@ -61,6 +60,10 @@ public class StreamingEntryReader implements
AsyncCallbacks.ReadEntryCallback, W
private final PersistentTopic topic;
+ private final Executor topicExecutor;
+
+ private final Executor dispatcherExecutor;
+
private AtomicInteger currentReadSizeByte = new AtomicInteger(0);
private volatile State state;
@@ -73,6 +76,14 @@ public class StreamingEntryReader implements
AsyncCallbacks.ReadEntryCallback, W
private final Backoff readFailureBackoff = new Backoff(10,
TimeUnit.MILLISECONDS,
1, TimeUnit.SECONDS, 0, TimeUnit.MILLISECONDS);
+ public StreamingEntryReader(ManagedCursorImpl cursor, StreamingDispatcher
dispatcher, PersistentTopic topic) {
+ this.cursor = cursor;
+ this.dispatcher = dispatcher;
+ this.topic = topic;
+ this.topicExecutor =
topic.getBrokerService().getTopicOrderedExecutor().chooseThread(topic.getName());
+ this.dispatcherExecutor =
topic.getBrokerService().getTopicOrderedExecutor().chooseThread(dispatcher.getName());
+ }
+
/**
* Read entries in streaming way, that said instead of reading with micro
batch and send entries to consumer after
* all entries in the batch are read from ledger, this method will fire
numEntriesToRead requests to managedLedger
@@ -146,7 +157,7 @@ public class StreamingEntryReader implements
AsyncCallbacks.ReadEntryCallback, W
@Override
public void readEntryComplete(Entry entry, Object ctx) {
// Don't block caller thread, complete read entry with dispatcher
dedicated thread.
-
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(dispatcher.getName(),
SafeRun.safeRun(() -> {
+ dispatcherExecutor.execute(SafeRun.safeRun(() -> {
internalReadEntryComplete(entry, ctx);
}));
}
@@ -187,7 +198,7 @@ public class StreamingEntryReader implements
AsyncCallbacks.ReadEntryCallback, W
@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
// Don't block caller thread, complete read entry fail with dispatcher
dedicated thread.
-
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(dispatcher.getName(),
SafeRun.safeRun(() -> {
+ dispatcherExecutor.execute(SafeRun.safeRun(() -> {
internalReadEntryFailed(exception, ctx);
}));
}
@@ -246,7 +257,7 @@ public class StreamingEntryReader implements
AsyncCallbacks.ReadEntryCallback, W
public boolean cancelReadRequests() {
if (STATE_UPDATER.compareAndSet(this, State.Issued, State.Canceling)) {
// Don't block caller thread, complete cancel read with dispatcher
dedicated thread.
-
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topic.getName(),
SafeRun.safeRun(() -> {
+ topicExecutor.execute(SafeRun.safeRun(() -> {
synchronized (StreamingEntryReader.this) {
if (STATE_UPDATER.compareAndSet(this, State.Canceling,
State.Canceled)) {
internalCancelReadRequests();
@@ -271,8 +282,7 @@ public class StreamingEntryReader implements
AsyncCallbacks.ReadEntryCallback, W
private void retryReadRequest(PendingReadEntryRequest
pendingReadEntryRequest, long delay) {
topic.getBrokerService().executor().schedule(() -> {
// Jump again into dispatcher dedicated thread
-
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(dispatcher.getName(),
- SafeRun.safeRun(() -> {
+ dispatcherExecutor.execute(SafeRun.safeRun(() -> {
ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)
cursor.getManagedLedger();
managedLedger.asyncReadEntry(pendingReadEntryRequest.position,
this, pendingReadEntryRequest);
}));
@@ -281,9 +291,7 @@ public class StreamingEntryReader implements
AsyncCallbacks.ReadEntryCallback, W
@Override
public void entriesAvailable() {
-
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(dispatcher.getName(),
SafeRun.safeRun(() -> {
- internalEntriesAvailable();
- }));
+
dispatcherExecutor.execute(SafeRun.safeRun(this::internalEntriesAvailable));
}
private synchronized void internalEntriesAvailable() {