This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 0aad9a12339 [fix][java-client] Fix performance regression with message
listener (#15162)
0aad9a12339 is described below
commit 0aad9a12339de434ce149cd57d524a3b45b1acc8
Author: lipenghui <[email protected]>
AuthorDate: Tue Apr 19 22:18:52 2022 +0800
[fix][java-client] Fix performance regression with message listener (#15162)
https://github.com/apache/pulsar/pull/13023 has introduced a performance
regression.
For each message, we are switching from external thread pool -> internal
thread poll -> external thread pool.
Previously we want to control the outstanding messages of a consumer using
a listener, so after #11455,
the message will not move from the receiver queue to the external executor.
And #13023 changed the listener trigger
in the internal thread pool to fix the ordering issue, so this is the root
cause of the performance regression.
Here is the frame graph to show the thread frame of the internal thread and
external thread.
[framegraph.html.txt](https://github.com/apache/pulsar/files/8483765/framegraph.html.txt)
And also fix the performance issue for multiple topic consumers and
key-shared subscriptions which enabled message listeners. Before this change,
the messages are processed serially. After this change, We can improve
parallelism on the premise of ensuring order.
- Remove the isListenerHandlingMessage control
- Move the messages from the receiver queue to the queue of external
executor but not increase permits
- Increase permits before call message listener
(cherry picked from commit 83cd7911ec43085be223b18e9d3d58a4638fd36f)
---
.../apache/pulsar/client/impl/RawReaderImpl.java | 1 +
.../apache/pulsar/client/impl/ConsumerBase.java | 37 ++++++++++++----------
.../apache/pulsar/client/impl/ConsumerImpl.java | 25 ++++++++++++---
.../client/impl/MultiTopicsConsumerImpl.java | 6 ++--
.../org/apache/pulsar/client/impl/ReaderImpl.java | 2 +-
.../pulsar/client/impl/ZeroQueueConsumerImpl.java | 2 +-
6 files changed, 46 insertions(+), 27 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 6b032370898..7c75f099773 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -116,6 +116,7 @@ public class RawReaderImpl implements RawReader {
client.externalExecutorProvider(),
TopicName.getPartitionIndex(conf.getSingleTopic()),
false,
+ false,
consumerFuture,
MessageId.earliest,
0 /* startMessageRollbackDurationInSec */,
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index b50a5700093..062b277165b 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -84,7 +84,6 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
protected volatile long incomingMessagesSize = 0;
protected volatile Timeout batchReceiveTimeout = null;
protected final Lock reentrantLock = new ReentrantLock();
- private volatile boolean isListenerHandlingMessage = false;
protected ConsumerBase(PulsarClientImpl client, String topic,
ConsumerConfigurationData<T> conf,
int receiverQueueSize, ExecutorProvider
executorProvider,
@@ -920,33 +919,34 @@ public abstract class ConsumerBase<T> extends
HandlerState implements Consumer<T
}
private void triggerListener() {
+ // The messages are added into the receiver queue by the internal
pinned executor,
+ // so need to use internal pinned executor to avoid race condition
which message
+ // might be added into the receiver queue but not able to read here.
internalPinnedExecutor.execute(() -> {
try {
- // Listener should only have one pending/running executable to
process a message
- // See https://github.com/apache/pulsar/issues/11008 for
context.
- if (!isListenerHandlingMessage) {
- final Message<T> msg = internalReceive(0,
TimeUnit.MILLISECONDS);
+ Message<T> msg;
+ do {
+ msg = internalReceive(0, TimeUnit.MILLISECONDS);
if (msg != null) {
- isListenerHandlingMessage = true;
// Trigger the notification on the message listener in
a separate thread to avoid blocking the
// internal pinned executor thread while the message
processing happens
+ final Message<T> finalMsg = msg;
if (SubscriptionType.Key_Shared ==
conf.getSubscriptionType()) {
executorProvider.getExecutor(peekMessageKey(msg)).execute(() ->
- callMessageListener(msg));
+ callMessageListener(finalMsg));
} else {
getExternalExecutor(msg).execute(() -> {
- callMessageListener(msg);
+ callMessageListener(finalMsg);
});
}
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] [{}] Message has been cleared from
the queue", topic, subscription);
+ }
}
- }
+ } while (msg != null);
} catch (PulsarClientException e) {
log.warn("[{}] [{}] Failed to dequeue the message for
listener", topic, subscription, e);
- return;
- }
-
- if (log.isDebugEnabled()) {
- log.debug("[{}] [{}] Message has been cleared from the queue",
topic, subscription);
}
});
}
@@ -957,13 +957,16 @@ public abstract class ConsumerBase<T> extends
HandlerState implements Consumer<T
log.debug("[{}][{}] Calling message listener for message {}",
topic, subscription,
msg.getMessageId());
}
+ ConsumerImpl receivedConsumer = (msg instanceof TopicMessageImpl)
+ ? ((TopicMessageImpl<T>) msg).receivedByconsumer :
(ConsumerImpl) this;
+ // Increase the permits here since we will not increase permits
while receive messages from consumer
+ // after enabled message listener.
+ receivedConsumer.increaseAvailablePermits((MessageImpl<?>) (msg
instanceof TopicMessageImpl
+ ? ((TopicMessageImpl<T>) msg).getMessage() :
msg));
listener.received(ConsumerBase.this, msg);
} catch (Throwable t) {
log.error("[{}][{}] Message listener error in processing message:
{}", topic, subscription,
msg.getMessageId(), t);
- } finally {
- isListenerHandlingMessage = false;
- triggerListener();
}
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 21a3151d04a..85f7e05c460 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -137,6 +137,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
private final int partitionIndex;
private final boolean hasParentConsumer;
+ private final boolean parentConsumerHasListener;
private final int receiverQueueRefillThreshold;
@@ -210,8 +211,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
Schema<T> schema,
ConsumerInterceptors<T>
interceptors,
boolean
createTopicIfDoesNotExist) {
- return newConsumerImpl(client, topic, conf, executorProvider,
partitionIndex, hasParentConsumer, subscribeFuture,
- startMessageId, schema, interceptors,
createTopicIfDoesNotExist, 0);
+ return newConsumerImpl(client, topic, conf, executorProvider,
partitionIndex, hasParentConsumer, false,
+ subscribeFuture, startMessageId, schema, interceptors,
createTopicIfDoesNotExist, 0);
}
static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
@@ -220,6 +221,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
ExecutorProvider
executorProvider,
int partitionIndex,
boolean hasParentConsumer,
+ boolean
parentConsumerHasListener,
CompletableFuture<Consumer<T>>
subscribeFuture,
MessageId startMessageId,
Schema<T> schema,
@@ -233,14 +235,16 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
createTopicIfDoesNotExist);
} else {
return new ConsumerImpl<>(client, topic, conf, executorProvider,
partitionIndex, hasParentConsumer,
- subscribeFuture, startMessageId,
startMessageRollbackDurationInSec /* rollback time in sec to start msgId */,
+ parentConsumerHasListener,
+ subscribeFuture, startMessageId,
+ startMessageRollbackDurationInSec /* rollback time in sec
to start msgId */,
schema, interceptors, createTopicIfDoesNotExist);
}
}
protected ConsumerImpl(PulsarClientImpl client, String topic,
ConsumerConfigurationData<T> conf,
ExecutorProvider executorProvider, int partitionIndex, boolean
hasParentConsumer,
- CompletableFuture<Consumer<T>> subscribeFuture, MessageId
startMessageId,
+ boolean parentConsumerHasListener, CompletableFuture<Consumer<T>>
subscribeFuture, MessageId startMessageId,
long startMessageRollbackDurationInSec, Schema<T> schema,
ConsumerInterceptors<T> interceptors,
boolean createTopicIfDoesNotExist) {
super(client, topic, conf, conf.getReceiverQueueSize(),
executorProvider, subscribeFuture, schema, interceptors);
@@ -254,6 +258,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
this.partitionIndex = partitionIndex;
this.hasParentConsumer = hasParentConsumer;
this.receiverQueueRefillThreshold = conf.getReceiverQueueSize() / 2;
+ this.parentConsumerHasListener = parentConsumerHasListener;
this.priorityLevel = conf.getPriorityLevel();
this.readCompacted = conf.isReadCompacted();
this.subscriptionInitialPosition =
conf.getSubscriptionInitialPosition();
@@ -1450,7 +1455,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
if (msgCnx != currentCnx) {
// The processed message did belong to the old queue that was
cleared after reconnection.
} else {
- increaseAvailablePermits(currentCnx);
+ if (listener == null && !parentConsumerHasListener) {
+ increaseAvailablePermits(currentCnx);
+ }
stats.updateNumMsgsReceived(msg);
trackMessage(msg);
@@ -1481,6 +1488,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
}
+ void increaseAvailablePermits(MessageImpl<?> msg) {
+ ClientCnx currentCnx = cnx();
+ ClientCnx msgCnx = msg.getCnx();
+ if (msgCnx == currentCnx) {
+ increaseAvailablePermits(currentCnx);
+ }
+ }
+
void increaseAvailablePermits(ClientCnx currentCnx) {
increaseAvailablePermits(currentCnx, 1);
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 1d479976950..6bfbb71c72c 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -975,7 +975,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
CompletableFuture<Consumer<T>> subFuture = new
CompletableFuture<>();
ConsumerImpl<T> newConsumer =
ConsumerImpl.newConsumerImpl(client, partitionName,
configurationData,
client.externalExecutorProvider(),
- partitionIndex, true, subFuture,
+ partitionIndex, true, listener != null,
subFuture,
startMessageId, schema, interceptors,
createIfDoesNotExist,
startMessageRollbackDurationInSec);
synchronized (pauseMutex) {
@@ -1002,7 +1002,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
} else {
ConsumerImpl<T> newConsumer =
ConsumerImpl.newConsumerImpl(client, topicName, internalConfig,
client.externalExecutorProvider(), -1,
- true, subFuture, startMessageId, schema,
interceptors,
+ true, listener != null, subFuture, startMessageId,
schema, interceptors,
createIfDoesNotExist,
startMessageRollbackDurationInSec);
synchronized (pauseMutex) {
@@ -1298,7 +1298,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
ConsumerImpl<T> newConsumer =
ConsumerImpl.newConsumerImpl(
client, partitionName, configurationData,
client.externalExecutorProvider(),
- partitionIndex, true, subFuture,
startMessageId, schema, interceptors,
+ partitionIndex, true, listener != null,
subFuture, startMessageId, schema, interceptors,
true /* createTopicIfDoesNotExist */,
startMessageRollbackDurationInSec);
synchronized (pauseMutex) {
if (paused) {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index 37d346ef932..72c74e8875d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -116,7 +116,7 @@ public class ReaderImpl<T> implements Reader<T> {
final int partitionIdx =
TopicName.getPartitionIndex(readerConfiguration.getTopicName());
consumer = new ConsumerImpl<>(client,
readerConfiguration.getTopicName(), consumerConfiguration,
- executorProvider, partitionIdx, false, consumerFuture,
+ executorProvider, partitionIdx, false, false, consumerFuture,
readerConfiguration.getStartMessageId(),
readerConfiguration.getStartMessageFromRollbackDurationInSec(),
schema, null, true /* createTopicIfDoesNotExist */);
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
index 5324c33d011..6375e37a89c 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
@@ -54,7 +54,7 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T>
{
CompletableFuture<Consumer<T>> subscribeFuture, MessageId
startMessageId, Schema<T> schema,
ConsumerInterceptors<T> interceptors,
boolean createTopicIfDoesNotExist) {
- super(client, topic, conf, executorProvider, partitionIndex,
hasParentConsumer, subscribeFuture,
+ super(client, topic, conf, executorProvider, partitionIndex,
hasParentConsumer, false, subscribeFuture,
startMessageId, 0 /* startMessageRollbackDurationInSec */,
schema, interceptors,
createTopicIfDoesNotExist);
}