This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 746cfb5b78959546b111e1a7f148e07cfb3a2baf
Author: Yunze Xu <[email protected]>
AuthorDate: Thu Apr 7 14:55:16 2022 +0800

    [Client] Add test to ensure the message order in listener callbacks (#15049)

    (cherry picked from commit c2c05c49aff1ebc7b2b7a1d5bd547c33211e4479)
---
 .../client/api/SimpleProducerConsumerTest.java     | 30 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerBase.java    |  9 +++++--
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  6 -----
 .../client/impl/MultiTopicsConsumerImpl.java       |  4 +--
 .../pulsar/client/impl/ZeroQueueConsumerImpl.java  |  2 +-
 5 files changed, 39 insertions(+), 12 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 2815ce36bf0..f5677d36273 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -60,6 +60,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutionException;
@@ -4407,4 +4408,33 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
 
         log.info("-- Exiting {} test --", methodName);
     }
+
+    @Test(invocationCount = 5)
+    public void testListenerOrdering() throws Exception {
+        final String topic = "persistent://my-property/my-ns/test-listener-ordering-" + System.currentTimeMillis();
+        final int numMessages = 1000;
+        final CountDownLatch latch = new CountDownLatch(numMessages);
+        final List<String> values = new CopyOnWriteArrayList<>();
+        final Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("sub")
+                .messageListener((MessageListener<String>) (consumer1, msg) -> {
+                    values.add(msg.getValue());
+                    latch.countDown();
+                })
+                .subscribe();
+        final Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+        for (int i = 0; i < numMessages; i++) {
+            producer.send("msg-" + i);
+        }
+        latch.await(3, TimeUnit.SECONDS);
+        producer.close();
+        consumer.close();
+        assertEquals(values.size(), numMessages);
+        for (int i = 0; i < numMessages; i++) {
+            assertEquals(values.get(i), "msg-" + i);
+        }
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 20baf47d9de..b50a5700093 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -913,8 +913,13 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         }
     }
 
-    protected void triggerListener() {
-        // Use internalPinnedExecutor to maintain message ordering
+    protected void tryTriggerListener() {
+        if (listener != null) {
+            triggerListener();
+        }
+    }
+
+    private void triggerListener() {
         internalPinnedExecutor.execute(() -> {
             try {
                 // Listener should only have one pending/running executable to process a message
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index ff2612beee4..21a3151d04a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1236,12 +1236,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
     }
 
-    private void tryTriggerListener() {
-        if (listener != null) {
-            triggerListener();
-        }
-    }
-
     private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata msgMetadata, MessageIdImpl msgId,
             MessageIdData messageId, ClientCnx cnx) {
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 74898a2a62a..1d479976950 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -301,9 +301,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
             notifyPendingBatchReceivedCallBack();
         }
 
-        if (listener != null) {
-            triggerListener();
-        }
+        tryTriggerListener();
     }
 
     @Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
index 42361b67caf..5324c33d011 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
@@ -174,7 +174,7 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> {
     }
 
     @Override
-    protected void triggerListener() {
+    protected void tryTriggerListener() {
         // Ignore since it was already triggered in the triggerZeroQueueSizeListener() call
     }
 
