This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e26afd99c9f91887eb63d9d05463f8e41c9a2548
Author: Michael Marshall <[email protected]>
AuthorDate: Tue Dec 21 15:17:21 2021 -0600

    [Java Client] Improve consumer listener logic (#13273)
    
    * [Java Client] Improve consumer listener logic
    
    * Move isListenerHandlingMessage update to before submitting to executor
    
    (cherry picked from commit 9f46c4af5e0dad7f9223add57842591822c3dea3)
---
 .../org/apache/pulsar/client/impl/ConsumerBase.java     | 17 +++++++++--------
 .../org/apache/pulsar/client/impl/ConsumerImpl.java     |  3 +--
 2 files changed, 10 insertions(+), 10 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 0f47208..20baf47 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -34,7 +34,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -85,7 +84,7 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
     protected volatile long incomingMessagesSize = 0;
     protected volatile Timeout batchReceiveTimeout = null;
     protected final Lock reentrantLock = new ReentrantLock();
-    private final AtomicInteger executorQueueSize = new AtomicInteger(0);
+    private volatile boolean isListenerHandlingMessage = false;
 
     protected ConsumerBase(PulsarClientImpl client, String topic, 
ConsumerConfigurationData<T> conf,
                            int receiverQueueSize, ExecutorProvider 
executorProvider,
@@ -915,15 +914,17 @@ public abstract class ConsumerBase<T> extends 
HandlerState implements Consumer<T
     }
 
     protected void triggerListener() {
-        // Trigger the notification on the message listener in a separate 
thread to avoid blocking the networking
-        // thread while the message processing happens
+        // Use internalPinnedExecutor to maintain message ordering
         internalPinnedExecutor.execute(() -> {
             try {
-                // Control executor to call MessageListener one by one.
-                if (executorQueueSize.get() < 1) {
+                // Listener should only have one pending/running executable to 
process a message
+                // See https://github.com/apache/pulsar/issues/11008 for 
context.
+                if (!isListenerHandlingMessage) {
                     final Message<T> msg = internalReceive(0, 
TimeUnit.MILLISECONDS);
                     if (msg != null) {
-                        executorQueueSize.incrementAndGet();
+                        isListenerHandlingMessage = true;
+                        // Trigger the notification on the message listener in 
a separate thread to avoid blocking the
+                        // internal pinned executor thread while the message 
processing happens
                         if (SubscriptionType.Key_Shared == 
conf.getSubscriptionType()) {
                             
executorProvider.getExecutor(peekMessageKey(msg)).execute(() ->
                                     callMessageListener(msg));
@@ -956,7 +957,7 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
             log.error("[{}][{}] Message listener error in processing message: 
{}", topic, subscription,
                     msg.getMessageId(), t);
         } finally {
-            executorQueueSize.decrementAndGet();
+            isListenerHandlingMessage = false;
             triggerListener();
         }
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index ff0b826..ac04651 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1127,8 +1127,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             increaseAvailablePermits(cnx(), skippedMessages.get());
         }
 
-        internalPinnedExecutor.execute(()
-                -> tryTriggerListener());
+        tryTriggerListener();
     }
 
     void messageReceived(MessageIdData messageId, int redeliveryCount, 
List<Long> ackSet, ByteBuf headersAndPayload, ClientCnx cnx) {

Reply via email to