This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 04455e30c0201b75e2a4cb723cffa4bde269a2d1 Author: Kai Wang <[email protected]> AuthorDate: Thu Dec 23 16:36:27 2021 +0800 [pulsar-client] Fix multi topic reader has message available behavior (#13332) ### Motivation When we use a multi-topic reader, the `hasMessageAvailable` method might behave incorrectly: because the multi-topics consumer receives all messages from the single-topic consumers, a single-topic consumer's `hasMessageAvailable` might always be `false` (its lastDequeuedMessageId has reached the end of its queue, and all messages have been enqueued into the multi-topics consumer's `incomingMessages` queue). We should check whether the multi-topics consumer's `incomingMessages` size is > 0 when calling `hasMessageAvailable`. ### Modifications 1. Add a check of `incomingMessages` size > 0 2. Add unit test `testHasMessageAvailableAsync` to verify the behavior. (cherry picked from commit 6c7dcc0cf877cfcb8bcea18cde7662ebacb01d4c) --- .../pulsar/client/impl/MultiTopicsReaderTest.java | 67 ++++++++++++++++++++++ .../client/impl/MultiTopicsConsumerImpl.java | 3 + .../pulsar/client/impl/MultiTopicsReaderImpl.java | 2 +- 3 files changed, 71 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java index a8a6ced..f6230e2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java @@ -26,15 +26,19 @@ import static org.testng.Assert.fail; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import 
java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.service.StickyKeyConsumerSelector; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -43,6 +47,7 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Range; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.Schema; @@ -61,6 +66,7 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @Test(groups = "flaky") +@Slf4j public class MultiTopicsReaderTest extends MockedPulsarServiceBaseTest { private static final String subscription = "reader-multi-topics-sub"; @@ -122,6 +128,67 @@ public class MultiTopicsReaderTest extends MockedPulsarServiceBaseTest { } @Test(timeOut = 10000) + public void testHasMessageAvailableAsync() throws Exception { + String topic = "persistent://my-property/my-ns/testHasMessageAvailableAsync"; + String content = "my-message-"; + int msgNum = 10; + admin.topics().createPartitionedTopic(topic, 2); + // stop retention from cleaning up + pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close(); + + try (Reader<byte[]> reader = pulsarClient.newReader().topic(topic).readCompacted(true) + .startMessageId(MessageId.earliest).create()) { + Assert.assertFalse(reader.hasMessageAvailable()); + Assert.assertFalse(reader.hasMessageAvailableAsync().get(10, TimeUnit.SECONDS)); + } + + try (Reader<byte[]> reader = pulsarClient.newReader() + .topic(topic).startMessageId(MessageId.earliest).create()) { + try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create()) { + for 
(int i = 0; i < msgNum; i++) { + producer.newMessage().key(content + i) + .value((content + i).getBytes(StandardCharsets.UTF_8)).send(); + } + } + // Should have message available + Assert.assertTrue(reader.hasMessageAvailableAsync().get()); + try { + // Should have message available too + Assert.assertTrue(reader.hasMessageAvailable()); + } catch (PulsarClientException e) { + fail("Expect success but failed.", e); + } + List<Message<byte[]>> msgs = Collections.synchronizedList(new ArrayList<>()); + CountDownLatch latch = new CountDownLatch(1); + readMessageUseAsync(reader, msgs, latch); + latch.await(); + Assert.assertEquals(msgs.size(), msgNum); + } + } + + private static <T> void readMessageUseAsync(Reader<T> reader, List<Message<T>> msgs, CountDownLatch latch) { + reader.hasMessageAvailableAsync().thenAccept(hasMessageAvailable -> { + if (hasMessageAvailable) { + try { + Message<T> msg = reader.readNext(); + msgs.add(msg); + } catch (PulsarClientException e) { + log.error("Read message failed.", e); + latch.countDown(); + return; + } + readMessageUseAsync(reader, msgs, latch); + } else { + latch.countDown(); + } + }).exceptionally(throwable -> { + log.error("Read message failed.", throwable); + latch.countDown(); + return null; + }); + } + + @Test(timeOut = 10000) public void testReadMessageWithBatchingWithMessageInclusive() throws Exception { String topic = "persistent://my-property/my-ns/my-reader-topic-with-batching-inclusive" + UUID.randomUUID(); int topicNum = 3; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 09dc5643..d2646a8 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -754,6 +754,9 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { } public 
CompletableFuture<Boolean> hasMessageAvailableAsync() { + if (numMessagesInQueue() > 0) { + return CompletableFuture.completedFuture(true); + } List<CompletableFuture<Void>> futureList = new ArrayList<>(); final AtomicBoolean hasMessageAvailable = new AtomicBoolean(false); for (ConsumerImpl<T> consumer : consumers.values()) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java index fab61b2..b656c00 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java @@ -144,7 +144,7 @@ public class MultiTopicsReaderImpl<T> implements Reader<T> { @Override public boolean hasMessageAvailable() throws PulsarClientException { - return multiTopicsConsumer.hasMessageAvailable() || multiTopicsConsumer.numMessagesInQueue() > 0; + return multiTopicsConsumer.hasMessageAvailable(); } @Override
