This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 43f75dfa86f [fix][client] Skip processing messages in the listener
when the consumer has been closed (#25006)
43f75dfa86f is described below
commit 43f75dfa86fdaa44adf58ba0d78f959279f7cd29
Author: Lari Hotari <[email protected]>
AuthorDate: Mon Nov 24 14:35:31 2025 +0200
[fix][client] Skip processing messages in the listener when the consumer
has been closed (#25006)
---
.../pulsar/client/api/MultiTopicsConsumerTest.java | 82 +++++++++++++++++++
.../org/apache/pulsar/utils/TestLogAppender.java | 92 ++++++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerBase.java | 26 +++++-
.../client/impl/MultiTopicsConsumerImpl.java | 1 -
4 files changed, 199 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
index ec33d204f25..9d984411dc9 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
@@ -34,15 +34,21 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.logging.log4j.Level;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
@@ -53,6 +59,7 @@ import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.utils.TestLogAppender;
import org.awaitility.Awaitility;
import org.mockito.AdditionalAnswers;
import org.mockito.Mockito;
@@ -62,6 +69,7 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+@Slf4j
@Test(groups = "broker")
public class MultiTopicsConsumerTest extends ProducerConsumerBase {
private ScheduledExecutorService internalExecutorServiceDelegate;
@@ -431,4 +439,78 @@ public class MultiTopicsConsumerTest extends
ProducerConsumerBase {
pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true);
}
+
+ @Test(timeOut = 30000)
+ public void testMessageListenerStopsProcessingAfterClosing() throws
Exception {
+ int numMessages = 100;
+ String topic1 = newTopicName();
+ String topic2 = newTopicName();
+ final CountDownLatch consumerClosedLatch = new CountDownLatch(1);
+ final CountDownLatch messageProcessedLatch = new CountDownLatch(1);
+ AtomicInteger messageProcessedCount = new AtomicInteger(0);
+ AtomicInteger messagesQueuedForExecutor = new AtomicInteger(0);
+ AtomicInteger messagesCurrentlyInExecutor = new AtomicInteger(0);
+
+ @Cleanup("shutdownNow")
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topics(List.of(topic1, topic2))
+ .subscriptionName("my-subscriber-name")
+ .messageListenerExecutor(new MessageListenerExecutor() {
+ @Override
+ public void execute(Message<?> message, Runnable runnable)
{
+ messagesQueuedForExecutor.incrementAndGet();
+ messagesCurrentlyInExecutor.incrementAndGet();
+ executor.execute(() -> {
+ try {
+ runnable.run();
+ } finally {
+ messagesCurrentlyInExecutor.decrementAndGet();
+ }
+ });
+ }
+ })
+ .messageListener((c1, msg) -> {
+ messageProcessedCount.incrementAndGet();
+ c1.acknowledgeAsync(msg);
+ messageProcessedLatch.countDown();
+ try {
+ consumerClosedLatch.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }).subscribe();
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic1)
+ .enableBatching(false)
+ .create();
+
+ for (int i = 0; i < numMessages; i++) {
+ final String message = "my-message-" + i;
+ producer.send(message.getBytes());
+ }
+
+ assertTrue(messageProcessedLatch.await(5, TimeUnit.SECONDS));
+ // wait until all messages have been queued in the listener
+ Awaitility.await().untilAsserted(() ->
assertEquals(messagesQueuedForExecutor.get(), numMessages));
+ @Cleanup
+ TestLogAppender testLogAppender =
TestLogAppender.create(Optional.empty());
+ consumer.close();
+ consumerClosedLatch.countDown();
+ // only a single message should be processed
+ assertEquals(messageProcessedCount.get(), 1);
+ // wait until all messages have been drained from the executor
+ Awaitility.await().untilAsserted(() ->
assertEquals(messagesCurrentlyInExecutor.get(), 0));
+ testLogAppender.getEvents().forEach(logEvent -> {
+ if (logEvent.getLevel() == Level.ERROR) {
+ org.apache.logging.log4j.message.Message logEventMessage =
logEvent.getMessage();
+ fail("No error should be logged when closing a consumer. Got:
" + logEventMessage
+ .getFormattedMessage() + " throwable:" +
logEventMessage.getThrowable());
+ }
+ });
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/utils/TestLogAppender.java
b/pulsar-broker/src/test/java/org/apache/pulsar/utils/TestLogAppender.java
new file mode 100644
index 00000000000..cfb07913b53
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/utils/TestLogAppender.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.utils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.appender.AbstractAppender;
+import org.apache.logging.log4j.core.config.Configuration;
+import org.apache.logging.log4j.core.config.LoggerConfig;
+import org.apache.logging.log4j.core.layout.PatternLayout;
+
+/**
+ * Log4J appender that captures all log events for a specified logger.
+ */
+public class TestLogAppender extends AbstractAppender implements AutoCloseable
{
+ private final List<LogEvent> events = Collections.synchronizedList(new
ArrayList<>());
+ private static AtomicInteger idGenerator = new AtomicInteger(0);
+ private final LoggerConfig loggerConfig;
+ private final Runnable onConfigurationChange;
+
+ /**
+ * Create a new TestLogAppender. Use the {@link #close()} method to stop
it and unregister it from Log4J.
+ * @param loggerName the logger name to register to. Pass Optional.empty()
to register to the root logger.
+ * @return return the new TestLogAppender instance.
+ */
+ public static TestLogAppender create(Optional<String> loggerName) {
+ LoggerContext context = (LoggerContext) LogManager.getContext(false);
+ Configuration config = context.getConfiguration();
+ LoggerConfig loggerConfig =
loggerName.map(config::getLoggerConfig).orElseGet(config::getRootLogger);
+ TestLogAppender testAppender = new TestLogAppender(loggerConfig,
context::updateLoggers);
+ testAppender.start();
+ loggerConfig.addAppender(testAppender, Level.ALL, null);
+ context.updateLoggers();
+ return testAppender;
+ }
+
+ TestLogAppender(LoggerConfig loggerConfig, Runnable onConfigurationChange)
{
+ super("TestAppender" + idGenerator.incrementAndGet(), null,
PatternLayout.createDefaultLayout(), false, null);
+ this.loggerConfig = loggerConfig;
+ this.onConfigurationChange = onConfigurationChange;
+ }
+
+ @Override
+ public void append(LogEvent event) {
+ events.add(event.toImmutable());
+ }
+
+ public List<LogEvent> getEvents() {
+ return new ArrayList<>(events);
+ }
+
+ public void clearEvents() {
+ events.clear();
+ }
+
+ @Override
+ public void close() throws Exception {
+ stop(1, TimeUnit.SECONDS);
+ }
+
+ @Override
+ protected boolean stop(long timeout, TimeUnit timeUnit, boolean
changeLifeCycleState) {
+ boolean stopped = super.stop(timeout, timeUnit, changeLifeCycleState);
+ loggerConfig.removeAppender(getName());
+ onConfigurationChange.run();
+ return stopped;
+ }
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index ec95f2c2f65..67306e16c09 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -90,7 +90,7 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
protected final MessageListenerExecutor messageListenerExecutor;
protected final ExecutorService externalPinnedExecutor;
protected final ExecutorService internalPinnedExecutor;
- protected UnAckedMessageTracker unAckedMessageTracker;
+ protected final UnAckedMessageTracker unAckedMessageTracker;
final GrowableArrayBlockingQueue<Message<T>> incomingMessages;
protected Map<MessageIdAdv, MessageIdImpl[]>
unAckedChunkedMessageIdSequenceMap = new ConcurrentHashMap<>();
protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>>
pendingReceives;
@@ -1176,12 +1176,36 @@ public abstract class ConsumerBase<T> extends
HandlerState implements Consumer<T
protected void callMessageListener(Message<T> msg) {
try {
+ State state = getState();
+ if (state == State.Closing || state == State.Closed) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}][{}] Consumer has been closed. Skipping
message {}.", topic, subscription,
+ msg.getMessageId());
+ }
+ msg.release();
+ return;
+ }
+
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Calling message listener for message {}",
topic, subscription,
msg.getMessageId());
}
ConsumerImpl receivedConsumer = (msg instanceof TopicMessageImpl)
? ((TopicMessageImpl<T>) msg).receivedByconsumer :
(ConsumerImpl) this;
+
+ // check the internal consumer state
+ if (receivedConsumer != this) {
+ State receivedByConsumerState = receivedConsumer.getState();
+ if (receivedByConsumerState == State.Closing ||
receivedByConsumerState == State.Closed) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}][{}] Consumer that received the message
has been closed. Skipping message {}.",
+ topic, subscription, msg.getMessageId());
+ }
+ msg.release();
+ return;
+ }
+ }
+
// Increase the permits here since we will not increase permits
while receive messages from consumer
// after enabled message listener.
receivedConsumer.increaseAvailablePermits((MessageImpl<?>) (msg
instanceof TopicMessageImpl
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 998dc52951a..9e8e12818c6 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -683,7 +683,6 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
private void cleanupMultiConsumer() {
if (unAckedMessageTracker != null) {
unAckedMessageTracker.close();
- unAckedMessageTracker = null;
}
if (partitionsAutoUpdateTimeout != null) {
partitionsAutoUpdateTimeout.cancel();