This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 096b725edc9824e5599fd4b0f6bf99fa6d5e3403
Author: Kai Wang <[email protected]>
AuthorDate: Thu Jan 13 00:46:45 2022 +0800

    Fix the wrong multi-topic has message available behavior (#13634)
    
    Fixes #13605
    
    ### Motivation
    
    Currently, the multiTopicReader `hasMessageAvailable` might get the wrong 
result, we must check `numMessagesInQueue() > 0` again after finish all 
consumer `hasMessageAvaliableAsync` future, bacause some message might already 
in `MultiTopicsConsumerImpl#incomingMessages`.
    
    ### Modifications
    
    * Fix the wrong multi-topic has message available behavior.
    * Use `reader.readNextAsync()` instead of block method `reader.readNext()`.
    * Reduce the units test running time by changing `MultiTopicsReaderTest` to 
use `@BeforeClass`, `@AfterClass`.
    
    (cherry picked from commit e57dc8faa2979b6d03410fa2cef5f1dd84e46785)
---
 .../pulsar/client/impl/MultiTopicsReaderTest.java  | 26 +++++++++++-----------
 .../client/impl/MultiTopicsConsumerImpl.java       |  2 +-
 2 files changed, 14 insertions(+), 14 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
index f6230e2..6b6bf95 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
@@ -61,17 +61,17 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.Murmur3_32Hash;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-@Test(groups = "flaky")
 @Slf4j
+@Test(groups = "flaky")
 public class MultiTopicsReaderTest extends MockedPulsarServiceBaseTest {
 
     private static final String subscription = "reader-multi-topics-sub";
 
-    @BeforeMethod(alwaysRun = true)
+    @BeforeClass(alwaysRun = true)
     @Override
     protected void setup() throws Exception {
         super.internalSetup();
@@ -87,7 +87,7 @@ public class MultiTopicsReaderTest extends 
MockedPulsarServiceBaseTest {
         admin.namespaces().createNamespace("my-property/my-ns", policies);
     }
 
-    @AfterMethod(alwaysRun = true)
+    @AfterClass(alwaysRun = true)
     @Override
     protected void cleanup() throws Exception {
         super.internalCleanup();
@@ -169,15 +169,15 @@ public class MultiTopicsReaderTest extends 
MockedPulsarServiceBaseTest {
     private static <T> void readMessageUseAsync(Reader<T> reader, 
List<Message<T>> msgs, CountDownLatch latch) {
         reader.hasMessageAvailableAsync().thenAccept(hasMessageAvailable -> {
             if (hasMessageAvailable) {
-                try {
-                    Message<T> msg = reader.readNext();
+                reader.readNextAsync().whenComplete((msg, ex) -> {
+                    if (ex != null) {
+                        log.error("Read message failed.", ex);
+                        latch.countDown();
+                        return;
+                    }
                     msgs.add(msg);
-                } catch (PulsarClientException e) {
-                    log.error("Read message failed.", e);
-                    latch.countDown();
-                    return;
-                }
-                readMessageUseAsync(reader, msgs, latch);
+                    readMessageUseAsync(reader, msgs, latch);
+                });
             } else {
                 latch.countDown();
             }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 177cc9f..fd98bb6 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -771,7 +771,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
             if (exception != null) {
                 completableFuture.completeExceptionally(exception);
             } else {
-                completableFuture.complete(hasMessageAvailable.get());
+                completableFuture.complete(hasMessageAvailable.get() || 
numMessagesInQueue() > 0);
             }
         });
         return completableFuture;

Reply via email to