sijie closed pull request #3110: Issue #3059: Behavior of redelivery and 
ackTimeout in Java client error
URL: https://github.com/apache/pulsar/pull/3110
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not show its
diff once it has been merged, so the diff is reproduced below:

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
index dfeb4afcc2..2c94966aa3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
@@ -217,6 +217,7 @@ public void testAsyncProducerAndReceiveAsyncAndAsyncAck(int 
batchMessageDelayMs,
         Set<String> messageSet = Sets.newHashSet();
         for (int i = 0; i < numMessages; i++) {
             future_msg = consumer.receiveAsync();
+            Thread.sleep(10);
             msg = future_msg.get();
             String receivedMessage = new String(msg.getData());
             log.info("Received message: [{}]", receivedMessage);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index fb47c5052d..d38ca35f01 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -25,7 +25,6 @@
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
-import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
@@ -2843,50 +2842,4 @@ public void received(Consumer consumer, Message message)
         assertEquals(latch.getCount(), 1);
         consumer.close();
     }
-
-    /**
-     * Ack timeout message is redelivered on time.
-     * Related github issue #2584
-     */
-    @Test
-    public void testAckTimeoutRedeliver() throws Exception {
-        log.info("-- Starting {} test --", methodName);
-
-        // create consumer and producer
-        ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) 
pulsarClient.newConsumer()
-            .topic("persistent://my-property/my-ns/ack-timeout-topic")
-            .subscriptionName("subscriber-1")
-            .ackTimeout(1, TimeUnit.SECONDS)
-            .subscriptionType(SubscriptionType.Shared)
-            .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
-            .subscribe();
-
-        Producer<byte[]> producer = pulsarClient.newProducer()
-            .topic("persistent://my-property/my-ns/ack-timeout-topic")
-            .enableBatching(false)
-            .messageRoutingMode(MessageRoutingMode.SinglePartition)
-            .create();
-
-        // (1) Produced one Message
-        String content = "my-message-will-ack-timeout";
-        producer.send(content.getBytes());
-
-        // (2) consumer to receive messages, and not ack
-        Message<byte[]> message = consumer.receive();
-
-        // (3) should be re-delivered once ack-timeout.
-        Thread.sleep(1000);
-        message = consumer.receive(200, TimeUnit.MILLISECONDS);
-        assertNotNull(message);
-
-        Thread.sleep(1000);
-        message = consumer.receive(200, TimeUnit.MILLISECONDS);
-        assertNotNull(message);
-
-        assertEquals(content, new String(message.getData()));
-
-        producer.close();
-        consumer.close();
-        log.info("-- Exiting {} test --", methodName);
-    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
index e178febdcb..da53760d2b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
@@ -235,17 +235,16 @@ public void testSharedSingleAckedPartitionedTopic() 
throws Exception {
 
     private static int receiveAllMessage(Consumer<?> consumer, boolean 
ackMessages) throws Exception {
         int messagesReceived = 0;
-        Message<?> msg = consumer.receive(200, TimeUnit.MILLISECONDS);
+        Message<?> msg = consumer.receive(1, TimeUnit.SECONDS);
         while (msg != null) {
             ++messagesReceived;
-            log.info("Consumer {} received {}", consumer.getConsumerName(), 
new String(msg.getData()));
+            log.info("Consumer received {}", new String(msg.getData()));
 
             if (ackMessages) {
                 consumer.acknowledge(msg);
-                log.info("Consumer {} acknowledged {}", 
consumer.getConsumerName(), new String(msg.getData()));
             }
 
-            msg = consumer.receive(200, TimeUnit.MILLISECONDS);
+            msg = consumer.receive(1, TimeUnit.SECONDS);
         }
 
         return messagesReceived;
@@ -284,31 +283,56 @@ public void testFailoverSingleAckedPartitionedTopic() 
throws Exception {
         }
 
         // 4. Receive messages
+        Message<byte[]> message1 = consumer1.receive();
+        Message<byte[]> message2 = consumer2.receive();
         int messageCount1 = 0;
         int messageCount2 = 0;
-
-        messageCount1 += receiveAllMessage(consumer1, false);
-        messageCount2 += receiveAllMessage(consumer2, true);
-
+        int ackCount1 = 0;
+        int ackCount2 = 0;
+        do {
+            if (message1 != null) {
+                log.info("Consumer1 received " + new 
String(message1.getData()));
+                messageCount1 += 1;
+            }
+            if (message2 != null) {
+                log.info("Consumer2 received " + new 
String(message2.getData()));
+                messageCount2 += 1;
+                consumer2.acknowledge(message2);
+                ackCount2 += 1;
+            }
+            message1 = consumer1.receive(500, TimeUnit.MILLISECONDS);
+            message2 = consumer2.receive(500, TimeUnit.MILLISECONDS);
+        } while (message1 != null || message2 != null);
         log.info(key + " messageCount1 = " + messageCount1);
         log.info(key + " messageCount2 = " + messageCount2);
-
+        log.info(key + " ackCount1 = " + ackCount1);
+        log.info(key + " ackCount2 = " + ackCount2);
         assertEquals(messageCount1 + messageCount2, totalMessages);
 
-        Thread.sleep((int) (ackTimeOutMillis * 1.1));
-
         // 5. Check if Messages redelivered again
         // Since receive is a blocking call hoping that timeout will kick in
         log.info(key + " Timeout should be triggered now");
+        message1 = consumer1.receive();
         messageCount1 = 0;
-
-        messageCount1 += receiveAllMessage(consumer1, true);
-        messageCount2 += receiveAllMessage(consumer2, false);
-
+        do {
+            if (message1 != null) {
+                log.info("Consumer1 received " + new 
String(message1.getData()));
+                messageCount1 += 1;
+                consumer1.acknowledge(message1);
+                ackCount1 += 1;
+            }
+            if (message2 != null) {
+                log.info("Consumer2 received " + new 
String(message2.getData()));
+                messageCount2 += 1;
+            }
+            message1 = consumer1.receive(500, TimeUnit.MILLISECONDS);
+            message2 = consumer2.receive(500, TimeUnit.MILLISECONDS);
+        } while (message1 != null || message2 != null);
         log.info(key + " messageCount1 = " + messageCount1);
         log.info(key + " messageCount2 = " + messageCount2);
-
-        assertEquals(messageCount1 + messageCount2, totalMessages);
+        log.info(key + " ackCount1 = " + ackCount1);
+        log.info(key + " ackCount2 = " + ackCount2);
+        assertEquals(ackCount1 + messageCount2, totalMessages);
     }
 
     @Test
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
index 504de14fcf..266eb3b9db 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
@@ -87,7 +87,6 @@ public void start(PulsarClientImpl client, ConsumerBase<?> 
consumerBase, long ac
         timeout = client.timer().newTimeout(new TimerTask() {
             @Override
             public void run(Timeout t) throws Exception {
-                toggle();
                 if (isAckTimeout()) {
                     log.warn("[{}] {} messages have timed-out", consumerBase, 
oldOpenSet.size());
                     Set<MessageId> messageIds = new HashSet<>();
@@ -95,6 +94,7 @@ public void run(Timeout t) throws Exception {
                     oldOpenSet.clear();
                     consumerBase.redeliverUnacknowledgedMessages(messageIds);
                 }
+                toggle();
                 timeout = client.timer().newTimeout(this, ackTimeoutMillis, 
TimeUnit.MILLISECONDS);
             }
         }, ackTimeoutMillis, TimeUnit.MILLISECONDS);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to