This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ab2325  Issue #2584: unacked message is not redelivered on time 
(#2590)
0ab2325 is described below

commit 0ab2325fa33231f1c69782e081483012467dfcab
Author: Jia Zhai <[email protected]>
AuthorDate: Thu Sep 20 15:44:04 2018 +0800

    Issue #2584: unacked message is not redelivered on time (#2590)
    
    ### Motivation
    
    unacked message is not redelivered after setting ackTimeout, but it is 
actually redelivered after 2*acktimeout.
    
    The main reason is in UnAckedMessageTracker.
    ```
        public void start(PulsarClientImpl client, ConsumerBase<?> 
consumerBase, long ackTimeoutMillis) {
            this.stop();
            timeout = client.timer().newTimeout(new TimerTask() {
                @Override
                public void run(Timeout t) throws Exception {
                    if (isAckTimeout()) {   < === first timeout, it is false, 
because oldOpenSet is empty.
                        log.warn("[{}] {} messages have timed-out", 
consumerBase, oldOpenSet.size());
                        Set<MessageId> messageIds = new HashSet<>();
                        oldOpenSet.forEach(messageIds::add);
                        oldOpenSet.clear();
                        
consumerBase.redeliverUnacknowledgedMessages(messageIds);
                    }
                    toggle();    < === toggle after timeout
                    timeout = client.timer().newTimeout(this, ackTimeoutMillis, 
TimeUnit.MILLISECONDS);
                }
            }, ackTimeoutMillis, TimeUnit.MILLISECONDS);
        }
    ```
    before first timeout, all messageId was added in CurrentSet, not in 
OldOpenSet, so isAckTimeout() is false, and `redeliverUnacknowledgedMessages` 
was not called at first timeout.
    
    Related issue: #2584
    
    ### Modifications
    
    - move `toggle()` from behind if clause to before if clause.
    - add ut
    
    ### Result
    
    ut passed
---
 .../client/api/SimpleProducerConsumerStatTest.java |  1 -
 .../client/api/SimpleProducerConsumerTest.java     | 47 ++++++++++++++++++
 .../impl/UnAcknowledgedMessagesTimeoutTest.java    | 58 +++++++---------------
 .../pulsar/client/impl/UnAckedMessageTracker.java  |  2 +-
 4 files changed, 65 insertions(+), 43 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
index 2c94966..dfeb4af 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
@@ -217,7 +217,6 @@ public class SimpleProducerConsumerStatTest extends 
ProducerConsumerBase {
         Set<String> messageSet = Sets.newHashSet();
         for (int i = 0; i < numMessages; i++) {
             future_msg = consumer.receiveAsync();
-            Thread.sleep(10);
             msg = future_msg.get();
             String receivedMessage = new String(msg.getData());
             log.info("Received message: [{}]", receivedMessage);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 61bdad0..df383b5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
@@ -2662,4 +2663,50 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
         assertEquals(latch.getCount(), 1);
         consumer.close();
     }
+
+    /**
+     * Ack timeout message is redelivered on time.
+     * Related github issue #2584
+     */
+    @Test
+    public void testAckTimeoutRedeliver() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        // create consumer and producer
+        ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) 
pulsarClient.newConsumer()
+            .topic("persistent://my-property/my-ns/ack-timeout-topic")
+            .subscriptionName("subscriber-1")
+            .ackTimeout(1, TimeUnit.SECONDS)
+            .subscriptionType(SubscriptionType.Shared)
+            .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
+            .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic("persistent://my-property/my-ns/ack-timeout-topic")
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
+
+        // (1) Produced one Message
+        String content = "my-message-will-ack-timeout";
+        producer.send(content.getBytes());
+
+        // (2) consumer to receive messages, and not ack
+        Message<byte[]> message = consumer.receive();
+
+        // (3) should be re-delivered once ack-timeout.
+        Thread.sleep(1000);
+        message = consumer.receive(200, TimeUnit.MILLISECONDS);
+        assertNotNull(message);
+
+        Thread.sleep(1000);
+        message = consumer.receive(200, TimeUnit.MILLISECONDS);
+        assertNotNull(message);
+
+        assertEquals(content, new String(message.getData()));
+
+        producer.close();
+        consumer.close();
+        log.info("-- Exiting {} test --", methodName);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
index da53760..e178feb 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
@@ -235,16 +235,17 @@ public class UnAcknowledgedMessagesTimeoutTest extends 
BrokerTestBase {
 
     private static int receiveAllMessage(Consumer<?> consumer, boolean 
ackMessages) throws Exception {
         int messagesReceived = 0;
-        Message<?> msg = consumer.receive(1, TimeUnit.SECONDS);
+        Message<?> msg = consumer.receive(200, TimeUnit.MILLISECONDS);
         while (msg != null) {
             ++messagesReceived;
-            log.info("Consumer received {}", new String(msg.getData()));
+            log.info("Consumer {} received {}", consumer.getConsumerName(), 
new String(msg.getData()));
 
             if (ackMessages) {
                 consumer.acknowledge(msg);
+                log.info("Consumer {} acknowledged {}", 
consumer.getConsumerName(), new String(msg.getData()));
             }
 
-            msg = consumer.receive(1, TimeUnit.SECONDS);
+            msg = consumer.receive(200, TimeUnit.MILLISECONDS);
         }
 
         return messagesReceived;
@@ -283,56 +284,31 @@ public class UnAcknowledgedMessagesTimeoutTest extends 
BrokerTestBase {
         }
 
         // 4. Receive messages
-        Message<byte[]> message1 = consumer1.receive();
-        Message<byte[]> message2 = consumer2.receive();
         int messageCount1 = 0;
         int messageCount2 = 0;
-        int ackCount1 = 0;
-        int ackCount2 = 0;
-        do {
-            if (message1 != null) {
-                log.info("Consumer1 received " + new 
String(message1.getData()));
-                messageCount1 += 1;
-            }
-            if (message2 != null) {
-                log.info("Consumer2 received " + new 
String(message2.getData()));
-                messageCount2 += 1;
-                consumer2.acknowledge(message2);
-                ackCount2 += 1;
-            }
-            message1 = consumer1.receive(500, TimeUnit.MILLISECONDS);
-            message2 = consumer2.receive(500, TimeUnit.MILLISECONDS);
-        } while (message1 != null || message2 != null);
+
+        messageCount1 += receiveAllMessage(consumer1, false);
+        messageCount2 += receiveAllMessage(consumer2, true);
+
         log.info(key + " messageCount1 = " + messageCount1);
         log.info(key + " messageCount2 = " + messageCount2);
-        log.info(key + " ackCount1 = " + ackCount1);
-        log.info(key + " ackCount2 = " + ackCount2);
+
         assertEquals(messageCount1 + messageCount2, totalMessages);
 
+        Thread.sleep((int) (ackTimeOutMillis * 1.1));
+
         // 5. Check if Messages redelivered again
         // Since receive is a blocking call hoping that timeout will kick in
         log.info(key + " Timeout should be triggered now");
-        message1 = consumer1.receive();
         messageCount1 = 0;
-        do {
-            if (message1 != null) {
-                log.info("Consumer1 received " + new 
String(message1.getData()));
-                messageCount1 += 1;
-                consumer1.acknowledge(message1);
-                ackCount1 += 1;
-            }
-            if (message2 != null) {
-                log.info("Consumer2 received " + new 
String(message2.getData()));
-                messageCount2 += 1;
-            }
-            message1 = consumer1.receive(500, TimeUnit.MILLISECONDS);
-            message2 = consumer2.receive(500, TimeUnit.MILLISECONDS);
-        } while (message1 != null || message2 != null);
+
+        messageCount1 += receiveAllMessage(consumer1, true);
+        messageCount2 += receiveAllMessage(consumer2, false);
+
         log.info(key + " messageCount1 = " + messageCount1);
         log.info(key + " messageCount2 = " + messageCount2);
-        log.info(key + " ackCount1 = " + ackCount1);
-        log.info(key + " ackCount2 = " + ackCount2);
-        assertEquals(ackCount1 + messageCount2, totalMessages);
+
+        assertEquals(messageCount1 + messageCount2, totalMessages);
     }
 
     @Test
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
index 266eb3b..504de14 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
@@ -87,6 +87,7 @@ public class UnAckedMessageTracker implements Closeable {
         timeout = client.timer().newTimeout(new TimerTask() {
             @Override
             public void run(Timeout t) throws Exception {
+                toggle();
                 if (isAckTimeout()) {
                     log.warn("[{}] {} messages have timed-out", consumerBase, 
oldOpenSet.size());
                     Set<MessageId> messageIds = new HashSet<>();
@@ -94,7 +95,6 @@ public class UnAckedMessageTracker implements Closeable {
                     oldOpenSet.clear();
                     consumerBase.redeliverUnacknowledgedMessages(messageIds);
                 }
-                toggle();
                 timeout = client.timer().newTimeout(this, ackTimeoutMillis, 
TimeUnit.MILLISECONDS);
             }
         }, ackTimeoutMillis, TimeUnit.MILLISECONDS);

Reply via email to