This is an automated email from the ASF dual-hosted git repository. wave pushed a commit to branch dave2wave-fix-performance-of-client in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5d65f5617e9daa4dc0764f26246fa160d83e61b2 Author: Dave Fisher <[email protected]> AuthorDate: Thu Mar 31 13:51:33 2022 -0700 Revert "Revert "Fix Regression in Consumer Performance"" This reverts commit 907b8a7c55cddbe4557931190bac328e220ea44d. --- .../apache/pulsar/client/impl/ConsumerBase.java | 48 +++++++++++----------- .../apache/pulsar/client/impl/ConsumerImpl.java | 3 +- 2 files changed, 25 insertions(+), 26 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index dd21666..b0b4879 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -908,35 +908,33 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T protected void triggerListener() { // Use internalPinnedExecutor to maintain message ordering - internalPinnedExecutor.execute(() -> { - try { - // Listener should only have one pending/running executable to process a message - // See https://github.com/apache/pulsar/issues/11008 for context. - if (!isListenerHandlingMessage) { - final Message<T> msg = internalReceive(0, TimeUnit.MILLISECONDS); - if (msg != null) { - isListenerHandlingMessage = true; - // Trigger the notification on the message listener in a separate thread to avoid blocking the - // internal pinned executor thread while the message processing happens - if (SubscriptionType.Key_Shared == conf.getSubscriptionType()) { - executorProvider.getExecutor(peekMessageKey(msg)).execute(() -> - callMessageListener(msg)); - } else { - getExternalExecutor(msg).execute(() -> { - callMessageListener(msg); - }); - } + try { + // Listener should only have one pending/running executable to process a message + // See https://github.com/apache/pulsar/issues/11008 for context. + if (!isListenerHandlingMessage) { + final Message<T> msg = internalReceive(0, TimeUnit.MILLISECONDS); + if (msg != null) { + isListenerHandlingMessage = true; + // Trigger the notification on the message listener in a separate thread to avoid blocking the + // internal pinned executor thread while the message processing happens + if (SubscriptionType.Key_Shared == conf.getSubscriptionType()) { + executorProvider.getExecutor(peekMessageKey(msg)).execute(() -> + callMessageListener(msg)); + } else { + getExternalExecutor(msg).execute(() -> { + callMessageListener(msg); + }); } } - } catch (PulsarClientException e) { - log.warn("[{}] [{}] Failed to dequeue the message for listener", topic, subscription, e); - return; } + } catch (PulsarClientException e) { + log.warn("[{}] [{}] Failed to dequeue the message for listener", topic, subscription, e); + return; + } - if (log.isDebugEnabled()) { - log.debug("[{}] [{}] Message has been cleared from the queue", topic, subscription); - } - }); + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Message has been cleared from the queue", topic, subscription); + } } protected void callMessageListener(Message<T> msg) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index ebf3583..cd111f0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1085,7 +1085,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle uncompressedPayload.release(); } - tryTriggerListener(); + internalPinnedExecutor.execute(() + -> tryTriggerListener()); }
