This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 096b725edc9824e5599fd4b0f6bf99fa6d5e3403 Author: Kai Wang <[email protected]> AuthorDate: Thu Jan 13 00:46:45 2022 +0800 Fix the wrong multi-topic has message available behavior (#13634) Fixes #13605 ### Motivation Currently, the multiTopicReader `hasMessageAvailable` might get the wrong result, we must check `numMessagesInQueue() > 0` again after finish all consumer `hasMessageAvaliableAsync` future, bacause some message might already in `MultiTopicsConsumerImpl#incomingMessages`. ### Modifications * Fix the wrong multi-topic has message available behavior. * Use `reader.readNextAsync()` instead of block method `reader.readNext()`. * Reduce the units test running time by changing `MultiTopicsReaderTest` to use `@BeforeClass`, `@AfterClass`. (cherry picked from commit e57dc8faa2979b6d03410fa2cef5f1dd84e46785) --- .../pulsar/client/impl/MultiTopicsReaderTest.java | 26 +++++++++++----------- .../client/impl/MultiTopicsConsumerImpl.java | 2 +- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java index f6230e2..6b6bf95 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java @@ -61,17 +61,17 @@ import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.Murmur3_32Hash; import org.awaitility.Awaitility; import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -@Test(groups = "flaky") @Slf4j +@Test(groups = "flaky") public class MultiTopicsReaderTest extends MockedPulsarServiceBaseTest { private static final String subscription = "reader-multi-topics-sub"; - @BeforeMethod(alwaysRun = true) + @BeforeClass(alwaysRun = true) @Override protected void setup() throws Exception { super.internalSetup(); @@ -87,7 +87,7 @@ public class MultiTopicsReaderTest extends MockedPulsarServiceBaseTest { admin.namespaces().createNamespace("my-property/my-ns", policies); } - @AfterMethod(alwaysRun = true) + @AfterClass(alwaysRun = true) @Override protected void cleanup() throws Exception { super.internalCleanup(); @@ -169,15 +169,15 @@ public class MultiTopicsReaderTest extends MockedPulsarServiceBaseTest { private static <T> void readMessageUseAsync(Reader<T> reader, List<Message<T>> msgs, CountDownLatch latch) { reader.hasMessageAvailableAsync().thenAccept(hasMessageAvailable -> { if (hasMessageAvailable) { - try { - Message<T> msg = reader.readNext(); + reader.readNextAsync().whenComplete((msg, ex) -> { + if (ex != null) { + log.error("Read message failed.", ex); + latch.countDown(); + return; + } msgs.add(msg); - } catch (PulsarClientException e) { - log.error("Read message failed.", e); - latch.countDown(); - return; - } - readMessageUseAsync(reader, msgs, latch); + readMessageUseAsync(reader, msgs, latch); + }); } else { latch.countDown(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 177cc9f..fd98bb6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -771,7 +771,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { if (exception != null) { completableFuture.completeExceptionally(exception); } else { - completableFuture.complete(hasMessageAvailable.get()); + completableFuture.complete(hasMessageAvailable.get() || numMessagesInQueue() > 0); } }); return completableFuture;
