This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4537fc8bab2f62a9fe26859e30ba292390e2f46c
Author: Lari Hotari <[email protected]>
AuthorDate: Mon Nov 24 14:35:31 2025 +0200

    [fix][client] Skip processing messages in the listener when the consumer 
has been closed (#25006)
    
    (cherry picked from commit 43f75dfa86fdaa44adf58ba0d78f959279f7cd29)
---
 .../pulsar/client/api/MultiTopicsConsumerTest.java | 82 +++++++++++++++++++
 .../org/apache/pulsar/utils/TestLogAppender.java   | 92 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerBase.java    | 26 +++++-
 .../client/impl/MultiTopicsConsumerImpl.java       |  1 -
 4 files changed, 199 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
index ea8eb6e8cc0..18e1accea9e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
@@ -35,15 +35,21 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.logging.log4j.Level;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
@@ -54,6 +60,7 @@ import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.utils.TestLogAppender;
 import org.awaitility.Awaitility;
 import org.mockito.AdditionalAnswers;
 import org.mockito.Mockito;
@@ -63,6 +70,7 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+@Slf4j
 @Test(groups = "broker")
 public class MultiTopicsConsumerTest extends ProducerConsumerBase {
     private ScheduledExecutorService internalExecutorServiceDelegate;
@@ -432,4 +440,78 @@ public class MultiTopicsConsumerTest extends 
ProducerConsumerBase {
 
         pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true);
     }
+
+    @Test(timeOut = 30000)
+    public void testMessageListenerStopsProcessingAfterClosing() throws 
Exception {
+        int numMessages = 100;
+        String topic1 = newTopicName();
+        String topic2 = newTopicName();
+        final CountDownLatch consumerClosedLatch = new CountDownLatch(1);
+        final CountDownLatch messageProcessedLatch = new CountDownLatch(1);
+        AtomicInteger messageProcessedCount = new AtomicInteger(0);
+        AtomicInteger messagesQueuedForExecutor = new AtomicInteger(0);
+        AtomicInteger messagesCurrentlyInExecutor = new AtomicInteger(0);
+
+        @Cleanup("shutdownNow")
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topics(List.of(topic1, topic2))
+                .subscriptionName("my-subscriber-name")
+                .messageListenerExecutor(new MessageListenerExecutor() {
+                    @Override
+                    public void execute(Message<?> message, Runnable runnable) 
{
+                        messagesQueuedForExecutor.incrementAndGet();
+                        messagesCurrentlyInExecutor.incrementAndGet();
+                        executor.execute(() -> {
+                            try {
+                                runnable.run();
+                            } finally {
+                                messagesCurrentlyInExecutor.decrementAndGet();
+                            }
+                        });
+                    }
+                })
+                .messageListener((c1, msg) -> {
+                    messageProcessedCount.incrementAndGet();
+                    c1.acknowledgeAsync(msg);
+                    messageProcessedLatch.countDown();
+                    try {
+                        consumerClosedLatch.await();
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        throw new RuntimeException(e);
+                    }
+                }).subscribe();
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic1)
+                .enableBatching(false)
+                .create();
+
+        for (int i = 0; i < numMessages; i++) {
+            final String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        assertTrue(messageProcessedLatch.await(5, TimeUnit.SECONDS));
+        // wait until all messages have been queued in the listener
+        Awaitility.await().untilAsserted(() -> 
assertEquals(messagesQueuedForExecutor.get(), numMessages));
+        @Cleanup
+        TestLogAppender testLogAppender = 
TestLogAppender.create(Optional.empty());
+        consumer.close();
+        consumerClosedLatch.countDown();
+        // only a single message should be processed
+        assertEquals(messageProcessedCount.get(), 1);
+        // wait until all messages have been drained from the executor
+        Awaitility.await().untilAsserted(() -> 
assertEquals(messagesCurrentlyInExecutor.get(), 0));
+        testLogAppender.getEvents().forEach(logEvent -> {
+            if (logEvent.getLevel() == Level.ERROR) {
+                org.apache.logging.log4j.message.Message logEventMessage = 
logEvent.getMessage();
+                fail("No error should be logged when closing a consumer. Got: 
" + logEventMessage
+                        .getFormattedMessage() + " throwable:" + 
logEventMessage.getThrowable());
+            }
+        });
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/utils/TestLogAppender.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/utils/TestLogAppender.java
new file mode 100644
index 00000000000..cfb07913b53
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/utils/TestLogAppender.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.utils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.appender.AbstractAppender;
+import org.apache.logging.log4j.core.config.Configuration;
+import org.apache.logging.log4j.core.config.LoggerConfig;
+import org.apache.logging.log4j.core.layout.PatternLayout;
+
+/**
+ * Log4J appender that captures all log events for a specified logger.
+ */
+public class TestLogAppender extends AbstractAppender implements AutoCloseable 
{
+    private final List<LogEvent> events = Collections.synchronizedList(new 
ArrayList<>());
+    private static AtomicInteger idGenerator = new AtomicInteger(0);
+    private final LoggerConfig loggerConfig;
+    private final Runnable onConfigurationChange;
+
+    /**
+     * Create a new TestLogAppender. Use the {@link #close()} method to stop 
it and unregister it from Log4J.
+     * @param loggerName the logger name to register to. Pass Optional.empty() 
to register to the root logger.
+     * @return return the new TestLogAppender instance.
+     */
+    public static TestLogAppender create(Optional<String> loggerName) {
+        LoggerContext context = (LoggerContext) LogManager.getContext(false);
+        Configuration config = context.getConfiguration();
+        LoggerConfig loggerConfig = 
loggerName.map(config::getLoggerConfig).orElseGet(config::getRootLogger);
+        TestLogAppender testAppender = new TestLogAppender(loggerConfig, 
context::updateLoggers);
+        testAppender.start();
+        loggerConfig.addAppender(testAppender, Level.ALL, null);
+        context.updateLoggers();
+        return testAppender;
+    }
+
+    TestLogAppender(LoggerConfig loggerConfig, Runnable onConfigurationChange) 
{
+        super("TestAppender" + idGenerator.incrementAndGet(), null, 
PatternLayout.createDefaultLayout(), false, null);
+        this.loggerConfig = loggerConfig;
+        this.onConfigurationChange = onConfigurationChange;
+    }
+
+    @Override
+    public void append(LogEvent event) {
+        events.add(event.toImmutable());
+    }
+
+    public List<LogEvent> getEvents() {
+        return new ArrayList<>(events);
+    }
+
+    public void clearEvents() {
+        events.clear();
+    }
+
+    @Override
+    public void close() throws Exception {
+        stop(1, TimeUnit.SECONDS);
+    }
+
+    @Override
+    protected boolean stop(long timeout, TimeUnit timeUnit, boolean 
changeLifeCycleState) {
+        boolean stopped = super.stop(timeout, timeUnit, changeLifeCycleState);
+        loggerConfig.removeAppender(getName());
+        onConfigurationChange.run();
+        return stopped;
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 7cf6f079f71..ffe9b319338 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -87,7 +87,7 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
     protected final MessageListenerExecutor messageListenerExecutor;
     protected final ExecutorService externalPinnedExecutor;
     protected final ExecutorService internalPinnedExecutor;
-    protected UnAckedMessageTracker unAckedMessageTracker;
+    protected final UnAckedMessageTracker unAckedMessageTracker;
     final GrowableArrayBlockingQueue<Message<T>> incomingMessages;
     protected ConcurrentOpenHashMap<MessageIdAdv, MessageIdImpl[]> 
unAckedChunkedMessageIdSequenceMap;
     protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>> 
pendingReceives;
@@ -1155,12 +1155,36 @@ public abstract class ConsumerBase<T> extends 
HandlerState implements Consumer<T
 
     protected void callMessageListener(Message<T> msg) {
         try {
+            State state = getState();
+            if (state == State.Closing || state == State.Closed) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}][{}] Consumer has been closed. Skipping 
message {}.", topic, subscription,
+                            msg.getMessageId());
+                }
+                msg.release();
+                return;
+            }
+
             if (log.isDebugEnabled()) {
                 log.debug("[{}][{}] Calling message listener for message {}", 
topic, subscription,
                         msg.getMessageId());
             }
             ConsumerImpl receivedConsumer = (msg instanceof TopicMessageImpl)
                     ? ((TopicMessageImpl<T>) msg).receivedByconsumer : 
(ConsumerImpl) this;
+
+            // check the internal consumer state
+            if (receivedConsumer != this) {
+                State receivedByConsumerState = receivedConsumer.getState();
+                if (receivedByConsumerState == State.Closing || 
receivedByConsumerState == State.Closed) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}][{}] Consumer that received the message 
has been closed. Skipping message {}.",
+                                topic, subscription, msg.getMessageId());
+                    }
+                    msg.release();
+                    return;
+                }
+            }
+
             // Increase the permits here since we will not increase permits 
while receive messages from consumer
             // after enabled message listener.
             receivedConsumer.increaseAvailablePermits((MessageImpl<?>) (msg 
instanceof TopicMessageImpl
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 359eb1d2c96..720b4d00451 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -677,7 +677,6 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
     private void cleanupMultiConsumer() {
         if (unAckedMessageTracker != null) {
             unAckedMessageTracker.close();
-            unAckedMessageTracker = null;
         }
         if (partitionsAutoUpdateTimeout != null) {
             partitionsAutoUpdateTimeout.cancel();

Reply via email to