This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4537fc8bab2f62a9fe26859e30ba292390e2f46c Author: Lari Hotari <[email protected]> AuthorDate: Mon Nov 24 14:35:31 2025 +0200 [fix][client] Skip processing messages in the listener when the consumer has been closed (#25006) (cherry picked from commit 43f75dfa86fdaa44adf58ba0d78f959279f7cd29) --- .../pulsar/client/api/MultiTopicsConsumerTest.java | 82 +++++++++++++++++++ .../org/apache/pulsar/utils/TestLogAppender.java | 92 ++++++++++++++++++++++ .../apache/pulsar/client/impl/ConsumerBase.java | 26 +++++- .../client/impl/MultiTopicsConsumerImpl.java | 1 - 4 files changed, 199 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java index ea8eb6e8cc0..18e1accea9e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java @@ -35,15 +35,21 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.IntStream; import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.logging.log4j.Level; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.impl.ClientBuilderImpl; @@ -54,6 +60,7 @@ import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.utils.TestLogAppender; import org.awaitility.Awaitility; import org.mockito.AdditionalAnswers; import org.mockito.Mockito; @@ -63,6 +70,7 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +@Slf4j @Test(groups = "broker") public class MultiTopicsConsumerTest extends ProducerConsumerBase { private ScheduledExecutorService internalExecutorServiceDelegate; @@ -432,4 +440,78 @@ public class MultiTopicsConsumerTest extends ProducerConsumerBase { pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true); } + + @Test(timeOut = 30000) + public void testMessageListenerStopsProcessingAfterClosing() throws Exception { + int numMessages = 100; + String topic1 = newTopicName(); + String topic2 = newTopicName(); + final CountDownLatch consumerClosedLatch = new CountDownLatch(1); + final CountDownLatch messageProcessedLatch = new CountDownLatch(1); + AtomicInteger messageProcessedCount = new AtomicInteger(0); + AtomicInteger messagesQueuedForExecutor = new AtomicInteger(0); + AtomicInteger messagesCurrentlyInExecutor = new AtomicInteger(0); + + @Cleanup("shutdownNow") + ExecutorService executor = Executors.newSingleThreadExecutor(); + @Cleanup + Consumer<byte[]> consumer = pulsarClient.newConsumer() + .topics(List.of(topic1, topic2)) + .subscriptionName("my-subscriber-name") + .messageListenerExecutor(new MessageListenerExecutor() { + @Override + public void execute(Message<?> message, Runnable runnable) { + messagesQueuedForExecutor.incrementAndGet(); + messagesCurrentlyInExecutor.incrementAndGet(); + executor.execute(() -> { + try { + runnable.run(); + } finally { + messagesCurrentlyInExecutor.decrementAndGet(); + } + }); + } + }) + .messageListener((c1, msg) -> { + messageProcessedCount.incrementAndGet(); + c1.acknowledgeAsync(msg); + messageProcessedLatch.countDown(); + try { + consumerClosedLatch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + }).subscribe(); + + @Cleanup + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topic1) + .enableBatching(false) + .create(); + + for (int i = 0; i < numMessages; i++) { + final String message = "my-message-" + i; + producer.send(message.getBytes()); + } + + assertTrue(messageProcessedLatch.await(5, TimeUnit.SECONDS)); + // wait until all messages have been queued in the listener + Awaitility.await().untilAsserted(() -> assertEquals(messagesQueuedForExecutor.get(), numMessages)); + @Cleanup + TestLogAppender testLogAppender = TestLogAppender.create(Optional.empty()); + consumer.close(); + consumerClosedLatch.countDown(); + // only a single message should be processed + assertEquals(messageProcessedCount.get(), 1); + // wait until all messages have been drained from the executor + Awaitility.await().untilAsserted(() -> assertEquals(messagesCurrentlyInExecutor.get(), 0)); + testLogAppender.getEvents().forEach(logEvent -> { + if (logEvent.getLevel() == Level.ERROR) { + org.apache.logging.log4j.message.Message logEventMessage = logEvent.getMessage(); + fail("No error should be logged when closing a consumer. Got: " + logEventMessage + .getFormattedMessage() + " throwable:" + logEventMessage.getThrowable()); + } + }); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/utils/TestLogAppender.java b/pulsar-broker/src/test/java/org/apache/pulsar/utils/TestLogAppender.java new file mode 100644 index 00000000000..cfb07913b53 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/utils/TestLogAppender.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.utils; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.LoggerContext; +import org.apache.logging.log4j.core.appender.AbstractAppender; +import org.apache.logging.log4j.core.config.Configuration; +import org.apache.logging.log4j.core.config.LoggerConfig; +import org.apache.logging.log4j.core.layout.PatternLayout; + +/** + * Log4J appender that captures all log events for a specified logger. + */ +public class TestLogAppender extends AbstractAppender implements AutoCloseable { + private final List<LogEvent> events = Collections.synchronizedList(new ArrayList<>()); + private static AtomicInteger idGenerator = new AtomicInteger(0); + private final LoggerConfig loggerConfig; + private final Runnable onConfigurationChange; + + /** + * Create a new TestLogAppender. Use the {@link #close()} method to stop it and unregister it from Log4J. + * @param loggerName the logger name to register to. Pass Optional.empty() to register to the root logger. + * @return return the new TestLogAppender instance. + */ + public static TestLogAppender create(Optional<String> loggerName) { + LoggerContext context = (LoggerContext) LogManager.getContext(false); + Configuration config = context.getConfiguration(); + LoggerConfig loggerConfig = loggerName.map(config::getLoggerConfig).orElseGet(config::getRootLogger); + TestLogAppender testAppender = new TestLogAppender(loggerConfig, context::updateLoggers); + testAppender.start(); + loggerConfig.addAppender(testAppender, Level.ALL, null); + context.updateLoggers(); + return testAppender; + } + + TestLogAppender(LoggerConfig loggerConfig, Runnable onConfigurationChange) { + super("TestAppender" + idGenerator.incrementAndGet(), null, PatternLayout.createDefaultLayout(), false, null); + this.loggerConfig = loggerConfig; + this.onConfigurationChange = onConfigurationChange; + } + + @Override + public void append(LogEvent event) { + events.add(event.toImmutable()); + } + + public List<LogEvent> getEvents() { + return new ArrayList<>(events); + } + + public void clearEvents() { + events.clear(); + } + + @Override + public void close() throws Exception { + stop(1, TimeUnit.SECONDS); + } + + @Override + protected boolean stop(long timeout, TimeUnit timeUnit, boolean changeLifeCycleState) { + boolean stopped = super.stop(timeout, timeUnit, changeLifeCycleState); + loggerConfig.removeAppender(getName()); + onConfigurationChange.run(); + return stopped; + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 7cf6f079f71..ffe9b319338 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -87,7 +87,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T protected final MessageListenerExecutor messageListenerExecutor; protected final ExecutorService externalPinnedExecutor; protected final ExecutorService internalPinnedExecutor; - protected UnAckedMessageTracker unAckedMessageTracker; + protected final UnAckedMessageTracker unAckedMessageTracker; final GrowableArrayBlockingQueue<Message<T>> incomingMessages; protected ConcurrentOpenHashMap<MessageIdAdv, MessageIdImpl[]> unAckedChunkedMessageIdSequenceMap; protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives; @@ -1155,12 +1155,36 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T protected void callMessageListener(Message<T> msg) { try { + State state = getState(); + if (state == State.Closing || state == State.Closed) { + if (log.isDebugEnabled()) { + log.debug("[{}][{}] Consumer has been closed. Skipping message {}.", topic, subscription, + msg.getMessageId()); + } + msg.release(); + return; + } + if (log.isDebugEnabled()) { log.debug("[{}][{}] Calling message listener for message {}", topic, subscription, msg.getMessageId()); } ConsumerImpl receivedConsumer = (msg instanceof TopicMessageImpl) ? ((TopicMessageImpl<T>) msg).receivedByconsumer : (ConsumerImpl) this; + + // check the internal consumer state + if (receivedConsumer != this) { + State receivedByConsumerState = receivedConsumer.getState(); + if (receivedByConsumerState == State.Closing || receivedByConsumerState == State.Closed) { + if (log.isDebugEnabled()) { + log.debug("[{}][{}] Consumer that received the message has been closed. Skipping message {}.", + topic, subscription, msg.getMessageId()); + } + msg.release(); + return; + } + } + // Increase the permits here since we will not increase permits while receive messages from consumer // after enabled message listener. receivedConsumer.increaseAvailablePermits((MessageImpl<?>) (msg instanceof TopicMessageImpl diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 359eb1d2c96..720b4d00451 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -677,7 +677,6 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { private void cleanupMultiConsumer() { if (unAckedMessageTracker != null) { unAckedMessageTracker.close(); - unAckedMessageTracker = null; } if (partitionsAutoUpdateTimeout != null) { partitionsAutoUpdateTimeout.cancel();
