This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new ec74355 Issue #2574: Timeout message not get redeliver in TopicsConsumer when use message listener ec74355 is described below commit ec7435565520ee6ac3ed2ce4a0cd13e8de7dbca3 Author: Jia Zhai <jiaz...@users.noreply.github.com> AuthorDate: Sat Sep 15 00:15:13 2018 +0800 Issue #2574: Timeout message not get redeliver in TopicsConsumer when use message listener ### Motivation fix issue #2574 . Timeout message not get redeliver in TopicsConsumer when use message listener. This is caused by message listener wrongly set in individual sub-ConsumerImpl. ### Modifications set message listener to null for individual sub-ConsumerImpl. Add a UT ### Result UT passed. --- .../pulsar/client/impl/TopicsConsumerImplTest.java | 58 ++++++++++++++++++++++ .../client/impl/MultiTopicsConsumerImpl.java | 1 + 2 files changed, 59 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java index d28d1d5..55be9ec 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.impl; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -615,4 +616,61 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase { } } + /** + * Test Listener for github issue #2547 + */ + @Test(timeOut = 30000) + public void testMultiTopicsMessageListener() throws Exception { + String key = "MultiTopicsMessageListenerTest"; + final String subscriptionName = "my-ex-subscription-" + key; + final String messagePredicate = "my-message-" + key + "-"; + final int totalMessages = 6; + + // set latch larger than totalMessages, so timeout message get resend + CountDownLatch latch = new CountDownLatch(totalMessages * 3); + + final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key; + List<String> topicNames = Lists.newArrayList(topicName1); + + admin.tenants().createTenant("prop", new TenantInfo()); + admin.topics().createPartitionedTopic(topicName1, 2); + + // 1. producer connect + Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + + // 2. Create consumer, set not ack in message listener, so time-out message will resend + Consumer<byte[]> consumer = pulsarClient.newConsumer() + .topics(topicNames) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(1000, TimeUnit.MILLISECONDS) + .receiverQueueSize(100) + .messageListener((c1, msg) -> { + assertNotNull(msg, "Message cannot be null"); + String receivedMessage = new String(msg.getData()); + latch.countDown(); + + log.info("Received message [{}] in the listener, latch: {}", + receivedMessage, latch.getCount()); + // since not acked, it should retry another time + //c1.acknowledgeAsync(msg); + }) + .subscribe(); + assertTrue(consumer instanceof MultiTopicsConsumerImpl); + + MultiTopicsConsumerImpl topicsConsumer = (MultiTopicsConsumerImpl) consumer; + + // 3. producer publish messages + for (int i = 0; i < totalMessages; i++) { + producer1.send((messagePredicate + "producer1-" + i).getBytes()); + } + + // verify should not time out, because of message redelivered several times. + latch.await(); + + consumer.close(); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 75fdac6..8b8556a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -485,6 +485,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { ConsumerConfigurationData<T> internalConsumerConfig = conf.clone(); internalConsumerConfig.setSubscriptionName(subscription); internalConsumerConfig.setConsumerName(consumerName); + internalConsumerConfig.setMessageListener(null); return internalConsumerConfig; }