This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new d9493455529 [fix][client] Fix NPE of MultiTopicsConsumerImpl due to 
race condition (#18287)
d9493455529 is described below

commit d94934555297535c5e2f0f2f045c73d28ed99933
Author: Penghui Li <peng...@apache.org>
AuthorDate: Wed Nov 2 18:19:10 2022 +0800

    [fix][client] Fix NPE of MultiTopicsConsumerImpl due to race condition 
(#18287)
---
 .../apache/pulsar/client/impl/MultiTopicsConsumerImpl.java   | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 6f9f51ff253..741716919e4 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -256,6 +256,10 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                 log.debug("[{}] [{}] Receive message from sub consumer:{}",
                     topic, subscription, consumer.getTopic());
             }
+            // Stop to process the remaining message after the consumer is 
closed.
+            if (getState() == State.Closed) {
+                return;
+            }
             // Process the message, add to the queue and trigger listener or 
async callback
             messages.forEach(msg -> messageReceived(consumer, msg));
 
@@ -602,14 +606,14 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
             .map(ConsumerImpl::unsubscribeAsync).collect(Collectors.toList());
 
         FutureUtil.waitForAll(futureList)
-            .thenCompose((r) -> {
+            .thenComposeAsync((r) -> {
                 setState(State.Closed);
                 cleanupMultiConsumer();
                 log.info("[{}] [{}] [{}] Unsubscribed Topics Consumer",
                         topic, subscription, consumerName);
                 // fail all pending-receive futures to notify application
                 return failPendingReceive();
-            })
+            }, internalPinnedExecutor)
             .whenComplete((r, ex) -> {
                 if (ex == null) {
                     unsubscribeFuture.complete(null);
@@ -644,13 +648,13 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
             .map(ConsumerImpl::closeAsync).collect(Collectors.toList());
 
         FutureUtil.waitForAll(futureList)
-            .thenCompose((r) -> {
+            .thenComposeAsync((r) -> {
                 setState(State.Closed);
                 cleanupMultiConsumer();
                 log.info("[{}] [{}] Closed Topics Consumer", topic, 
subscription);
                 // fail all pending-receive futures to notify application
                 return failPendingReceive();
-            })
+            }, internalPinnedExecutor)
             .whenComplete((r, ex) -> {
                 if (ex == null) {
                     closeFuture.complete(null);

Reply via email to