This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 81beccdbf0bac4e8b6197f8dc897d46b78c13a37
Author: Yunze Xu <[email protected]>
AuthorDate: Thu Apr 7 14:55:16 2022 +0800

    [Client] Add test to ensure the message order in listener callbacks (#15049)
    
    (cherry picked from commit c2c05c49aff1ebc7b2b7a1d5bd547c33211e4479)
    
    Resolve the conflicts by removing unused `isTxnMessage` and
    `tryTriggerListener` methods from `ConsumerImpl`.
---
 .../client/api/SimpleProducerConsumerTest.java     | 30 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerBase.java    |  9 +++++--
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 10 --------
 .../client/impl/MultiTopicsConsumerImpl.java       |  4 +--
 .../pulsar/client/impl/ZeroQueueConsumerImpl.java  |  2 +-
 5 files changed, 39 insertions(+), 16 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 86ee045b258..d2b67575324 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -59,6 +59,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutionException;
@@ -4250,4 +4251,33 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
 
         log.info("-- Exiting {} test --", methodName);
     }
+
+    @Test(invocationCount = 5)
+    public void testListenerOrdering() throws Exception {
+        final String topic = 
"persistent://my-property/my-ns/test-listener-ordering-" + 
System.currentTimeMillis();
+        final int numMessages = 1000;
+        final CountDownLatch latch = new CountDownLatch(numMessages);
+        final List<String> values = new CopyOnWriteArrayList<>();
+        final Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("sub")
+                .messageListener((MessageListener<String>) (consumer1, msg) -> 
{
+                    values.add(msg.getValue());
+                    latch.countDown();
+                })
+                .subscribe();
+        final Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+        for (int i = 0; i < numMessages; i++) {
+            producer.send("msg-" + i);
+        }
+        latch.await(3, TimeUnit.SECONDS);
+        producer.close();
+        consumer.close();
+        assertEquals(values.size(), numMessages);
+        for (int i = 0; i < numMessages; i++) {
+            assertEquals(values.get(i), "msg-" + i);
+        }
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index dd21666f5da..7bc63d89f98 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -906,8 +906,13 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
         }
     }
 
-    protected void triggerListener() {
-        // Use internalPinnedExecutor to maintain message ordering
+    protected void tryTriggerListener() {
+        if (listener != null) {
+            triggerListener();
+        }
+    }
+
+    private void triggerListener() {
         internalPinnedExecutor.execute(() -> {
             try {
                 // Listener should only have one pending/running executable to 
process a message
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 8104123e4c3..f680d1e4f88 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1090,16 +1090,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
     }
 
-    private void tryTriggerListener() {
-        if (listener != null) {
-            triggerListener();
-        }
-    }
-
-    private boolean isTxnMessage(MessageMetadata messageMetadata) {
-        return messageMetadata.hasTxnidMostBits() && 
messageMetadata.hasTxnidLeastBits();
-    }
-
     private ByteBuf processMessageChunk(ByteBuf compressedPayload, 
MessageMetadata msgMetadata, MessageIdImpl msgId,
             MessageIdData messageId, ClientCnx cnx) {
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index fd98bb6fb4b..bcccd09d7c7 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -301,9 +301,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
             notifyPendingBatchReceivedCallBack();
         }
 
-        if (listener != null) {
-            triggerListener();
-        }
+        tryTriggerListener();
     }
 
     @Override
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
index f6302e3779c..fca9cffcfdc 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
@@ -173,7 +173,7 @@ public class ZeroQueueConsumerImpl<T> extends 
ConsumerImpl<T> {
     }
 
     @Override
-    protected void triggerListener() {
+    protected void tryTriggerListener() {
         // Ignore since it was already triggered in the 
triggerZeroQueueSizeListener() call
     }
 

Reply via email to