This is an automated email from the ASF dual-hosted git repository.

guangning pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 18d8adb4c127a8fcf875d072294af811fbc0642b
Author: lipenghui <[email protected]>
AuthorDate: Mon Jan 20 13:52:34 2020 +0800

    Fix message redelivery for zero queue consumer while using async api to receive messages (#6090)
    
    Fix message redelivery for zero queue consumer while using async api to receive messages
---
 .../pulsar/client/impl/ZeroQueueSizeTest.java      | 39 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  1 +
 2 files changed, 40 insertions(+)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java
index e5465b2..80174b7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java
@@ -20,10 +20,13 @@ package org.apache.pulsar.client.impl;
 
 import static org.testng.Assert.assertEquals;
 
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Lists;
@@ -376,4 +379,40 @@ public class ZeroQueueSizeTest extends BrokerTestBase {
         consumer.close();
         producer.close();
     }
+
+    @Test
+    public void testZeroQueueSizeMessageRedeliveryForAsyncReceive() throws PulsarClientException, ExecutionException, InterruptedException {
+        final String topic = "persistent://prop/ns-abc/testZeroQueueSizeMessageRedeliveryForAsyncReceive";
+        Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
+            .topic(topic)
+            .receiverQueueSize(0)
+            .subscriptionName("sub")
+            .subscriptionType(SubscriptionType.Shared)
+            .ackTimeout(1, TimeUnit.SECONDS)
+            .subscribe();
+
+        final int messages = 10;
+        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+            .topic(topic)
+            .enableBatching(false)
+            .create();
+
+        for (int i = 0; i < messages; i++) {
+            producer.send(i);
+        }
+
+        Set<Integer> receivedMessages = new HashSet<>();
+        List<CompletableFuture<Message<Integer>>> futures = new ArrayList<>(20);
+        for (int i = 0; i < messages * 2; i++) {
+            futures.add(consumer.receiveAsync());
+        }
+        for (CompletableFuture<Message<Integer>> future : futures) {
+            receivedMessages.add(future.get().getValue());
+        }
+
+        Assert.assertEquals(receivedMessages.size(), messages);
+
+        consumer.close();
+        producer.close();
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index fce4cc0..3ebadf1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -977,6 +977,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
         if (conf.getReceiverQueueSize() == 0) {
             // call interceptor and complete received callback
+            trackMessage(message);
             interceptAndComplete(message, receivedFuture);
             return;
         }

Reply via email to