This is an automated email from the ASF dual-hosted git repository. guangning pushed a commit to branch branch-2.5 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 18d8adb4c127a8fcf875d072294af811fbc0642b Author: lipenghui <[email protected]> AuthorDate: Mon Jan 20 13:52:34 2020 +0800 Fix message redelivery for zero queue consumer while using async api to receive messages (#6090) Fix message redelivery for zero queue consumer while using async api to receive messages --- .../pulsar/client/impl/ZeroQueueSizeTest.java | 39 ++++++++++++++++++++++ .../apache/pulsar/client/impl/ConsumerImpl.java | 1 + 2 files changed, 40 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java index e5465b2..80174b7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java @@ -20,10 +20,13 @@ package org.apache.pulsar.client.impl; import static org.testng.Assert.assertEquals; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import com.google.common.collect.Lists; @@ -376,4 +379,40 @@ public class ZeroQueueSizeTest extends BrokerTestBase { consumer.close(); producer.close(); } + + @Test + public void testZeroQueueSizeMessageRedeliveryForAsyncReceive() throws PulsarClientException, ExecutionException, InterruptedException { + final String topic = "persistent://prop/ns-abc/testZeroQueueSizeMessageRedeliveryForAsyncReceive"; + Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .receiverQueueSize(0) + .subscriptionName("sub") + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(1, TimeUnit.SECONDS) + .subscribe(); + + final int messages = 10; + Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32) + .topic(topic) + .enableBatching(false) + .create(); + + for (int i = 0; i < messages; i++) { + producer.send(i); + } + + Set<Integer> receivedMessages = new HashSet<>(); + List<CompletableFuture<Message<Integer>>> futures = new ArrayList<>(20); + for (int i = 0; i < messages * 2; i++) { + futures.add(consumer.receiveAsync()); + } + for (CompletableFuture<Message<Integer>> future : futures) { + receivedMessages.add(future.get().getValue()); + } + + Assert.assertEquals(receivedMessages.size(), messages); + + consumer.close(); + producer.close(); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index fce4cc0..3ebadf1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -977,6 +977,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle if (conf.getReceiverQueueSize() == 0) { // call interceptor and complete received callback + trackMessage(message); interceptAndComplete(message, receivedFuture); return; }
