This is an automated email from the ASF dual-hosted git repository.
houxiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 51c2bb4dfbd [improve][broker] Choose random thread for consumerFlow in
PersistentDispatcherSingleActiveConsumer (#20522)
51c2bb4dfbd is described below
commit 51c2bb4dfbd088810a76963ba86afeec2b193776
Author: houxiaoyu <[email protected]>
AuthorDate: Mon Jun 12 18:42:49 2023 +0800
[improve][broker] Choose random thread for consumerFlow in
PersistentDispatcherSingleActiveConsumer (#20522)
### Motivation
Currently, all subscriptions of one topic will do `consuemrFlow` action in
a single thread, which is chosen by topicName:
```
this.topicExecutor =
topic.getBrokerService().getTopicOrderedExecutor().chooseThread(topicName);
```
If there is a large number of subscriptions in a topic, all the work will
focus on one thread ---- the chosen thread, which will reduce the consume
performance. So this this patch , I'd like to choose a ramdom thread for
`consumerFlow` in `PersistentDispatcherSingleActiveConsumer` to improve the
consume performance.
### Modifications
*
`topic.getBrokerService().getTopicOrderedExecutor().chooseThread(topicName);`
-> `topic.getBrokerService().getTopicOrderedExecutor().chooseThread();`
* `this.topicExecutor ` -> `this.executor`
---
.../PersistentDispatcherSingleActiveConsumer.java | 16 ++++++++--------
1 file changed, 8 insertions(+), 8 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 7cbf7bd2c78..d9d0f6adc87 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -61,7 +61,7 @@ public class PersistentDispatcherSingleActiveConsumer extends
AbstractDispatcher
private final AtomicBoolean isRescheduleReadInProgress = new
AtomicBoolean(false);
protected final PersistentTopic topic;
- protected final Executor topicExecutor;
+ protected final Executor executor;
protected final String name;
private Optional<DispatchRateLimiter> dispatchRateLimiter =
Optional.empty();
@@ -79,7 +79,7 @@ public class PersistentDispatcherSingleActiveConsumer extends
AbstractDispatcher
super(subscriptionType, partitionIndex, topic.getName(), subscription,
topic.getBrokerService().pulsar().getConfiguration(), cursor);
this.topic = topic;
- this.topicExecutor =
topic.getBrokerService().getTopicOrderedExecutor().chooseThread(topicName);
+ this.executor =
topic.getBrokerService().getTopicOrderedExecutor().chooseThread();
this.name = topic.getName() + " / " + (cursor.getName() != null ?
Codec.decode(cursor.getName())
: ""/* NonDurableCursor doesn't have name */);
this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
@@ -148,7 +148,7 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
@Override
public void readEntriesComplete(final List<Entry> entries, Object obj) {
- topicExecutor.execute(() -> internalReadEntriesComplete(entries, obj));
+ executor.execute(() -> internalReadEntriesComplete(entries, obj));
}
public synchronized void internalReadEntriesComplete(final List<Entry>
entries, Object obj) {
@@ -226,7 +226,7 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes());
// Schedule a new read batch operation only after the
previous batch has been written to the socket.
- topicExecutor.execute(() -> {
+ executor.execute(() -> {
synchronized
(PersistentDispatcherSingleActiveConsumer.this) {
Consumer newConsumer = getActiveConsumer();
readMoreEntries(newConsumer);
@@ -238,7 +238,7 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
@Override
public void consumerFlow(Consumer consumer, int
additionalNumberOfMessages) {
- topicExecutor.execute(() -> internalConsumerFlow(consumer));
+ executor.execute(() -> internalConsumerFlow(consumer));
}
private synchronized void internalConsumerFlow(Consumer consumer) {
@@ -267,7 +267,7 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
@Override
public void redeliverUnacknowledgedMessages(Consumer consumer, long
consumerEpoch) {
- topicExecutor.execute(() ->
internalRedeliverUnacknowledgedMessages(consumer, consumerEpoch));
+ executor.execute(() ->
internalRedeliverUnacknowledgedMessages(consumer, consumerEpoch));
}
private synchronized void internalRedeliverUnacknowledgedMessages(Consumer
consumer, long consumerEpoch) {
@@ -459,7 +459,7 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
@Override
public void readEntriesFailed(ManagedLedgerException exception, Object
ctx) {
- topicExecutor.execute(() -> internalReadEntriesFailed(exception, ctx));
+ executor.execute(() -> internalReadEntriesFailed(exception, ctx));
}
private synchronized void internalReadEntriesFailed(ManagedLedgerException
exception, Object ctx) {
@@ -507,7 +507,7 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
topic.getBrokerService().executor().schedule(() -> {
// Jump again into dispatcher dedicated thread
- topicExecutor.execute(() -> {
+ executor.execute(() -> {
synchronized (PersistentDispatcherSingleActiveConsumer.this) {
Consumer currentConsumer =
ACTIVE_CONSUMER_UPDATER.get(this);
// we should retry the read if we have an active consumer
and there is no pending read