This is an automated email from the ASF dual-hosted git repository.

wave pushed a commit to branch dave2wave-fix-performance-of-client
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5d65f5617e9daa4dc0764f26246fa160d83e61b2
Author: Dave Fisher <[email protected]>
AuthorDate: Thu Mar 31 13:51:33 2022 -0700

    Revert "Revert "Fix Regression in Consumer Performance""
    
    This reverts commit 907b8a7c55cddbe4557931190bac328e220ea44d.
---
 .../apache/pulsar/client/impl/ConsumerBase.java    | 48 +++++++++++-----------
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  3 +-
 2 files changed, 25 insertions(+), 26 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index dd21666..b0b4879 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -908,35 +908,33 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
 
     protected void triggerListener() {
         // Use internalPinnedExecutor to maintain message ordering
-        internalPinnedExecutor.execute(() -> {
-            try {
-                // Listener should only have one pending/running executable to process a message
-                // See https://github.com/apache/pulsar/issues/11008 for context.
-                if (!isListenerHandlingMessage) {
-                    final Message<T> msg = internalReceive(0, TimeUnit.MILLISECONDS);
-                    if (msg != null) {
-                        isListenerHandlingMessage = true;
-                        // Trigger the notification on the message listener in a separate thread to avoid blocking the
-                        // internal pinned executor thread while the message processing happens
-                        if (SubscriptionType.Key_Shared == conf.getSubscriptionType()) {
-                            executorProvider.getExecutor(peekMessageKey(msg)).execute(() ->
-                                    callMessageListener(msg));
-                        } else {
-                            getExternalExecutor(msg).execute(() -> {
-                                callMessageListener(msg);
-                            });
-                        }
+        try {
+            // Listener should only have one pending/running executable to process a message
+            // See https://github.com/apache/pulsar/issues/11008 for context.
+            if (!isListenerHandlingMessage) {
+                final Message<T> msg = internalReceive(0, TimeUnit.MILLISECONDS);
+                if (msg != null) {
+                    isListenerHandlingMessage = true;
+                    // Trigger the notification on the message listener in a separate thread to avoid blocking the
+                    // internal pinned executor thread while the message processing happens
+                    if (SubscriptionType.Key_Shared == conf.getSubscriptionType()) {
+                        executorProvider.getExecutor(peekMessageKey(msg)).execute(() ->
+                            callMessageListener(msg));
+                    } else {
+                        getExternalExecutor(msg).execute(() -> {
+                            callMessageListener(msg);
+                        });
                     }
                 }
-            } catch (PulsarClientException e) {
-                log.warn("[{}] [{}] Failed to dequeue the message for listener", topic, subscription, e);
-                return;
             }
+        } catch (PulsarClientException e) {
+            log.warn("[{}] [{}] Failed to dequeue the message for listener", topic, subscription, e);
+            return;
+        }
 
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] [{}] Message has been cleared from the queue", topic, subscription);
-            }
-        });
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] [{}] Message has been cleared from the queue", topic, subscription);
+        }
     }
 
     protected void callMessageListener(Message<T> msg) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index ebf3583..cd111f0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1085,7 +1085,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
             uncompressedPayload.release();
         }
-        tryTriggerListener();
+        internalPinnedExecutor.execute(()
+                -> tryTriggerListener());
 
     }
 

Reply via email to