This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push: new d9493455529 [fix][client] Fix NPE of MultiTopicsConsumerImpl due to race condition (#18287) d9493455529 is described below commit d94934555297535c5e2f0f2f045c73d28ed99933 Author: Penghui Li <peng...@apache.org> AuthorDate: Wed Nov 2 18:19:10 2022 +0800 [fix][client] Fix NPE of MultiTopicsConsumerImpl due to race condition (#18287) --- .../apache/pulsar/client/impl/MultiTopicsConsumerImpl.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 6f9f51ff253..741716919e4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -256,6 +256,10 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { log.debug("[{}] [{}] Receive message from sub consumer:{}", topic, subscription, consumer.getTopic()); } + // Stop to process the remaining message after the consumer is closed. + if (getState() == State.Closed) { + return; + } // Process the message, add to the queue and trigger listener or async callback messages.forEach(msg -> messageReceived(consumer, msg)); @@ -602,14 +606,14 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { .map(ConsumerImpl::unsubscribeAsync).collect(Collectors.toList()); FutureUtil.waitForAll(futureList) - .thenCompose((r) -> { + .thenComposeAsync((r) -> { setState(State.Closed); cleanupMultiConsumer(); log.info("[{}] [{}] [{}] Unsubscribed Topics Consumer", topic, subscription, consumerName); // fail all pending-receive futures to notify application return failPendingReceive(); - }) + }, internalPinnedExecutor) .whenComplete((r, ex) -> { if (ex == null) { unsubscribeFuture.complete(null); @@ -644,13 +648,13 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { .map(ConsumerImpl::closeAsync).collect(Collectors.toList()); FutureUtil.waitForAll(futureList) - .thenCompose((r) -> { + .thenComposeAsync((r) -> { setState(State.Closed); cleanupMultiConsumer(); log.info("[{}] [{}] Closed Topics Consumer", topic, subscription); // fail all pending-receive futures to notify application return failPendingReceive(); - }) + }, internalPinnedExecutor) .whenComplete((r, ex) -> { if (ex == null) { closeFuture.complete(null);