This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 81beccdbf0bac4e8b6197f8dc897d46b78c13a37 Author: Yunze Xu <[email protected]> AuthorDate: Thu Apr 7 14:55:16 2022 +0800 [Client] Add test to ensure the message order in listener callbacks (#15049) (cherry picked from commit c2c05c49aff1ebc7b2b7a1d5bd547c33211e4479) Resolve the conflicts by removing unused `isTxnMessage` and `tryTriggerListener` methods from `ConsumerImpl`. --- .../client/api/SimpleProducerConsumerTest.java | 30 ++++++++++++++++++++++ .../apache/pulsar/client/impl/ConsumerBase.java | 9 +++++-- .../apache/pulsar/client/impl/ConsumerImpl.java | 10 -------- .../client/impl/MultiTopicsConsumerImpl.java | 4 +-- .../pulsar/client/impl/ZeroQueueConsumerImpl.java | 2 +- 5 files changed, 39 insertions(+), 16 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 86ee045b258..d2b67575324 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -59,6 +59,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; @@ -4250,4 +4251,33 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { log.info("-- Exiting {} test --", methodName); } + + @Test(invocationCount = 5) + public void testListenerOrdering() throws Exception { + final String topic = "persistent://my-property/my-ns/test-listener-ordering-" + System.currentTimeMillis(); + final int numMessages = 1000; + final CountDownLatch latch = new CountDownLatch(numMessages); + final List<String> values = new CopyOnWriteArrayList<>(); + final Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("sub") + .messageListener((MessageListener<String>) (consumer1, msg) -> { + values.add(msg.getValue()); + latch.countDown(); + }) + .subscribe(); + final Producer<String> producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .create(); + for (int i = 0; i < numMessages; i++) { + producer.send("msg-" + i); + } + latch.await(3, TimeUnit.SECONDS); + producer.close(); + consumer.close(); + assertEquals(values.size(), numMessages); + for (int i = 0; i < numMessages; i++) { + assertEquals(values.get(i), "msg-" + i); + } + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index dd21666f5da..7bc63d89f98 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -906,8 +906,13 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T } } - protected void triggerListener() { - // Use internalPinnedExecutor to maintain message ordering + protected void tryTriggerListener() { + if (listener != null) { + triggerListener(); + } + } + + private void triggerListener() { internalPinnedExecutor.execute(() -> { try { // Listener should only have one pending/running executable to process a message diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 8104123e4c3..f680d1e4f88 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1090,16 +1090,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle } - private void tryTriggerListener() { - if (listener != null) { - triggerListener(); - } - } - - private boolean isTxnMessage(MessageMetadata messageMetadata) { - return messageMetadata.hasTxnidMostBits() && messageMetadata.hasTxnidLeastBits(); - } - private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata msgMetadata, MessageIdImpl msgId, MessageIdData messageId, ClientCnx cnx) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index fd98bb6fb4b..bcccd09d7c7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -301,9 +301,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { notifyPendingBatchReceivedCallBack(); } - if (listener != null) { - triggerListener(); - } + tryTriggerListener(); } @Override diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java index f6302e3779c..fca9cffcfdc 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java @@ -173,7 +173,7 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> { } @Override - protected void triggerListener() { + protected void tryTriggerListener() { // Ignore since it was already triggered in the triggerZeroQueueSizeListener() call }
