This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit e26afd99c9f91887eb63d9d05463f8e41c9a2548 Author: Michael Marshall <[email protected]> AuthorDate: Tue Dec 21 15:17:21 2021 -0600 [Java Client] Improve consumer listener logic (#13273) * [Java Client] Improve consumer listener logic * Move isListenerHandlingMessage update to before submitting to executor (cherry picked from commit 9f46c4af5e0dad7f9223add57842591822c3dea3) --- .../org/apache/pulsar/client/impl/ConsumerBase.java | 17 +++++++++-------- .../org/apache/pulsar/client/impl/ConsumerImpl.java | 3 +-- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 0f47208..20baf47 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -34,7 +34,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -85,7 +84,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T protected volatile long incomingMessagesSize = 0; protected volatile Timeout batchReceiveTimeout = null; protected final Lock reentrantLock = new ReentrantLock(); - private final AtomicInteger executorQueueSize = new AtomicInteger(0); + private volatile boolean isListenerHandlingMessage = false; protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf, int receiverQueueSize, ExecutorProvider executorProvider, @@ -915,15 +914,17 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T } protected void triggerListener() { - // Trigger the notification on the message listener in a separate thread to avoid blocking the networking - // thread while the message processing happens + // Use internalPinnedExecutor to maintain message ordering internalPinnedExecutor.execute(() -> { try { - // Control executor to call MessageListener one by one. - if (executorQueueSize.get() < 1) { + // Listener should only have one pending/running executable to process a message + // See https://github.com/apache/pulsar/issues/11008 for context. + if (!isListenerHandlingMessage) { final Message<T> msg = internalReceive(0, TimeUnit.MILLISECONDS); if (msg != null) { - executorQueueSize.incrementAndGet(); + isListenerHandlingMessage = true; + // Trigger the notification on the message listener in a separate thread to avoid blocking the + // internal pinned executor thread while the message processing happens if (SubscriptionType.Key_Shared == conf.getSubscriptionType()) { executorProvider.getExecutor(peekMessageKey(msg)).execute(() -> callMessageListener(msg)); @@ -956,7 +957,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T log.error("[{}][{}] Message listener error in processing message: {}", topic, subscription, msg.getMessageId(), t); } finally { - executorQueueSize.decrementAndGet(); + isListenerHandlingMessage = false; triggerListener(); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index ff0b826..ac04651 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1127,8 +1127,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle increaseAvailablePermits(cnx(), skippedMessages.get()); } - internalPinnedExecutor.execute(() - -> tryTriggerListener()); + tryTriggerListener(); } void messageReceived(MessageIdData messageId, int redeliveryCount, List<Long> ackSet, ByteBuf headersAndPayload, ClientCnx cnx) {
