sijie closed pull request #2590: Issue #2584: unacked message is not redelivered on time
URL: https://github.com/apache/incubator-pulsar/pull/2590
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign (fork) pull request
once it is merged, the diff is reproduced below for the sake of provenance:

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
index 2c94966aa3..dfeb4afcc2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
@@ -217,7 +217,6 @@ public void testAsyncProducerAndReceiveAsyncAndAsyncAck(int batchMessageDelayMs,
         Set<String> messageSet = Sets.newHashSet();
         for (int i = 0; i < numMessages; i++) {
             future_msg = consumer.receiveAsync();
-            Thread.sleep(10);
             msg = future_msg.get();
             String receivedMessage = new String(msg.getData());
             log.info("Received message: [{}]", receivedMessage);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 61bdad034a..df383b532f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -25,6 +25,7 @@
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
@@ -2662,4 +2663,50 @@ public void received(Consumer consumer, Message message)
         assertEquals(latch.getCount(), 1);
         consumer.close();
     }
+
+    /**
+     * Ack timeout message is redelivered on time.
+     * Related github issue #2584
+     */
+    @Test
+    public void testAckTimeoutRedeliver() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        // create consumer and producer
+        ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
+            .topic("persistent://my-property/my-ns/ack-timeout-topic")
+            .subscriptionName("subscriber-1")
+            .ackTimeout(1, TimeUnit.SECONDS)
+            .subscriptionType(SubscriptionType.Shared)
+            .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
+            .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic("persistent://my-property/my-ns/ack-timeout-topic")
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
+
+        // (1) Produced one Message
+        String content = "my-message-will-ack-timeout";
+        producer.send(content.getBytes());
+
+        // (2) consumer to receive messages, and not ack
+        Message<byte[]> message = consumer.receive();
+
+        // (3) should be re-delivered once ack-timeout.
+        Thread.sleep(1000);
+        message = consumer.receive(200, TimeUnit.MILLISECONDS);
+        assertNotNull(message);
+
+        Thread.sleep(1000);
+        message = consumer.receive(200, TimeUnit.MILLISECONDS);
+        assertNotNull(message);
+
+        assertEquals(content, new String(message.getData()));
+
+        producer.close();
+        consumer.close();
+        log.info("-- Exiting {} test --", methodName);
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
index da53760d2b..e178febdcb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
@@ -235,16 +235,17 @@ public void testSharedSingleAckedPartitionedTopic() throws Exception {
 
     private static int receiveAllMessage(Consumer<?> consumer, boolean ackMessages) throws Exception {
         int messagesReceived = 0;
-        Message<?> msg = consumer.receive(1, TimeUnit.SECONDS);
+        Message<?> msg = consumer.receive(200, TimeUnit.MILLISECONDS);
         while (msg != null) {
             ++messagesReceived;
-            log.info("Consumer received {}", new String(msg.getData()));
+            log.info("Consumer {} received {}", consumer.getConsumerName(), new String(msg.getData()));
 
             if (ackMessages) {
                 consumer.acknowledge(msg);
+                log.info("Consumer {} acknowledged {}", consumer.getConsumerName(), new String(msg.getData()));
             }
 
-            msg = consumer.receive(1, TimeUnit.SECONDS);
+            msg = consumer.receive(200, TimeUnit.MILLISECONDS);
         }
 
         return messagesReceived;
@@ -283,56 +284,31 @@ public void testFailoverSingleAckedPartitionedTopic() throws Exception {
         }
 
         // 4. Receive messages
-        Message<byte[]> message1 = consumer1.receive();
-        Message<byte[]> message2 = consumer2.receive();
         int messageCount1 = 0;
         int messageCount2 = 0;
-        int ackCount1 = 0;
-        int ackCount2 = 0;
-        do {
-            if (message1 != null) {
-                log.info("Consumer1 received " + new String(message1.getData()));
-                messageCount1 += 1;
-            }
-            if (message2 != null) {
-                log.info("Consumer2 received " + new String(message2.getData()));
-                messageCount2 += 1;
-                consumer2.acknowledge(message2);
-                ackCount2 += 1;
-            }
-            message1 = consumer1.receive(500, TimeUnit.MILLISECONDS);
-            message2 = consumer2.receive(500, TimeUnit.MILLISECONDS);
-        } while (message1 != null || message2 != null);
+
+        messageCount1 += receiveAllMessage(consumer1, false);
+        messageCount2 += receiveAllMessage(consumer2, true);
+
         log.info(key + " messageCount1 = " + messageCount1);
         log.info(key + " messageCount2 = " + messageCount2);
-        log.info(key + " ackCount1 = " + ackCount1);
-        log.info(key + " ackCount2 = " + ackCount2);
+
         assertEquals(messageCount1 + messageCount2, totalMessages);
 
+        Thread.sleep((int) (ackTimeOutMillis * 1.1));
+
         // 5. Check if Messages redelivered again
         // Since receive is a blocking call hoping that timeout will kick in
         log.info(key + " Timeout should be triggered now");
-        message1 = consumer1.receive();
         messageCount1 = 0;
-        do {
-            if (message1 != null) {
-                log.info("Consumer1 received " + new String(message1.getData()));
-                messageCount1 += 1;
-                consumer1.acknowledge(message1);
-                ackCount1 += 1;
-            }
-            if (message2 != null) {
-                log.info("Consumer2 received " + new String(message2.getData()));
-                messageCount2 += 1;
-            }
-            message1 = consumer1.receive(500, TimeUnit.MILLISECONDS);
-            message2 = consumer2.receive(500, TimeUnit.MILLISECONDS);
-        } while (message1 != null || message2 != null);
+
+        messageCount1 += receiveAllMessage(consumer1, true);
+        messageCount2 += receiveAllMessage(consumer2, false);
+
         log.info(key + " messageCount1 = " + messageCount1);
         log.info(key + " messageCount2 = " + messageCount2);
-        log.info(key + " ackCount1 = " + ackCount1);
-        log.info(key + " ackCount2 = " + ackCount2);
-        assertEquals(ackCount1 + messageCount2, totalMessages);
+
+        assertEquals(messageCount1 + messageCount2, totalMessages);
     }
 
     @Test
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
index 266eb3b9db..504de14fcf 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
@@ -87,6 +87,7 @@ public void start(PulsarClientImpl client, ConsumerBase<?> consumerBase, long ac
         timeout = client.timer().newTimeout(new TimerTask() {
             @Override
             public void run(Timeout t) throws Exception {
+                toggle();
                 if (isAckTimeout()) {
                     log.warn("[{}] {} messages have timed-out", consumerBase, oldOpenSet.size());
                     Set<MessageId> messageIds = new HashSet<>();
@@ -94,7 +95,6 @@ public void run(Timeout t) throws Exception {
                     oldOpenSet.clear();
                     consumerBase.redeliverUnacknowledgedMessages(messageIds);
                 }
-                toggle();
                 timeout = client.timer().newTimeout(this, ackTimeoutMillis, TimeUnit.MILLISECONDS);
             }
         }, ackTimeoutMillis, TimeUnit.MILLISECONDS);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to