sijie closed pull request #2590: Issue #2584: unacked message is not
redelivered on time
URL: https://github.com/apache/incubator-pulsar/pull/2590
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
from a fork) once it is merged, the diff is reproduced below for the
sake of provenance:
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
index 2c94966aa3..dfeb4afcc2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
@@ -217,7 +217,6 @@ public void testAsyncProducerAndReceiveAsyncAndAsyncAck(int
batchMessageDelayMs,
Set<String> messageSet = Sets.newHashSet();
for (int i = 0; i < numMessages; i++) {
future_msg = consumer.receiveAsync();
- Thread.sleep(10);
msg = future_msg.get();
String receivedMessage = new String(msg.getData());
log.info("Received message: [{}]", receivedMessage);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 61bdad034a..df383b532f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -25,6 +25,7 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -2662,4 +2663,50 @@ public void received(Consumer consumer, Message message)
assertEquals(latch.getCount(), 1);
consumer.close();
}
+
+ /**
+ * Ack timeout message is redelivered on time.
+ * Related github issue #2584
+ */
+ @Test
+ public void testAckTimeoutRedeliver() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ // create consumer and producer
+ ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>)
pulsarClient.newConsumer()
+ .topic("persistent://my-property/my-ns/ack-timeout-topic")
+ .subscriptionName("subscriber-1")
+ .ackTimeout(1, TimeUnit.SECONDS)
+ .subscriptionType(SubscriptionType.Shared)
+ .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
+ .subscribe();
+
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic("persistent://my-property/my-ns/ack-timeout-topic")
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+
+ // (1) Produced one Message
+ String content = "my-message-will-ack-timeout";
+ producer.send(content.getBytes());
+
+ // (2) consumer to receive messages, and not ack
+ Message<byte[]> message = consumer.receive();
+
+ // (3) should be re-delivered once ack-timeout.
+ Thread.sleep(1000);
+ message = consumer.receive(200, TimeUnit.MILLISECONDS);
+ assertNotNull(message);
+
+ Thread.sleep(1000);
+ message = consumer.receive(200, TimeUnit.MILLISECONDS);
+ assertNotNull(message);
+
+ assertEquals(content, new String(message.getData()));
+
+ producer.close();
+ consumer.close();
+ log.info("-- Exiting {} test --", methodName);
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
index da53760d2b..e178febdcb 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
@@ -235,16 +235,17 @@ public void testSharedSingleAckedPartitionedTopic()
throws Exception {
private static int receiveAllMessage(Consumer<?> consumer, boolean
ackMessages) throws Exception {
int messagesReceived = 0;
- Message<?> msg = consumer.receive(1, TimeUnit.SECONDS);
+ Message<?> msg = consumer.receive(200, TimeUnit.MILLISECONDS);
while (msg != null) {
++messagesReceived;
- log.info("Consumer received {}", new String(msg.getData()));
+ log.info("Consumer {} received {}", consumer.getConsumerName(),
new String(msg.getData()));
if (ackMessages) {
consumer.acknowledge(msg);
+ log.info("Consumer {} acknowledged {}",
consumer.getConsumerName(), new String(msg.getData()));
}
- msg = consumer.receive(1, TimeUnit.SECONDS);
+ msg = consumer.receive(200, TimeUnit.MILLISECONDS);
}
return messagesReceived;
@@ -283,56 +284,31 @@ public void testFailoverSingleAckedPartitionedTopic()
throws Exception {
}
// 4. Receive messages
- Message<byte[]> message1 = consumer1.receive();
- Message<byte[]> message2 = consumer2.receive();
int messageCount1 = 0;
int messageCount2 = 0;
- int ackCount1 = 0;
- int ackCount2 = 0;
- do {
- if (message1 != null) {
- log.info("Consumer1 received " + new
String(message1.getData()));
- messageCount1 += 1;
- }
- if (message2 != null) {
- log.info("Consumer2 received " + new
String(message2.getData()));
- messageCount2 += 1;
- consumer2.acknowledge(message2);
- ackCount2 += 1;
- }
- message1 = consumer1.receive(500, TimeUnit.MILLISECONDS);
- message2 = consumer2.receive(500, TimeUnit.MILLISECONDS);
- } while (message1 != null || message2 != null);
+
+ messageCount1 += receiveAllMessage(consumer1, false);
+ messageCount2 += receiveAllMessage(consumer2, true);
+
log.info(key + " messageCount1 = " + messageCount1);
log.info(key + " messageCount2 = " + messageCount2);
- log.info(key + " ackCount1 = " + ackCount1);
- log.info(key + " ackCount2 = " + ackCount2);
+
assertEquals(messageCount1 + messageCount2, totalMessages);
+ Thread.sleep((int) (ackTimeOutMillis * 1.1));
+
// 5. Check if Messages redelivered again
// Since receive is a blocking call hoping that timeout will kick in
log.info(key + " Timeout should be triggered now");
- message1 = consumer1.receive();
messageCount1 = 0;
- do {
- if (message1 != null) {
- log.info("Consumer1 received " + new
String(message1.getData()));
- messageCount1 += 1;
- consumer1.acknowledge(message1);
- ackCount1 += 1;
- }
- if (message2 != null) {
- log.info("Consumer2 received " + new
String(message2.getData()));
- messageCount2 += 1;
- }
- message1 = consumer1.receive(500, TimeUnit.MILLISECONDS);
- message2 = consumer2.receive(500, TimeUnit.MILLISECONDS);
- } while (message1 != null || message2 != null);
+
+ messageCount1 += receiveAllMessage(consumer1, true);
+ messageCount2 += receiveAllMessage(consumer2, false);
+
log.info(key + " messageCount1 = " + messageCount1);
log.info(key + " messageCount2 = " + messageCount2);
- log.info(key + " ackCount1 = " + ackCount1);
- log.info(key + " ackCount2 = " + ackCount2);
- assertEquals(ackCount1 + messageCount2, totalMessages);
+
+ assertEquals(messageCount1 + messageCount2, totalMessages);
}
@Test
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
index 266eb3b9db..504de14fcf 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
@@ -87,6 +87,7 @@ public void start(PulsarClientImpl client, ConsumerBase<?>
consumerBase, long ac
timeout = client.timer().newTimeout(new TimerTask() {
@Override
public void run(Timeout t) throws Exception {
+ toggle();
if (isAckTimeout()) {
log.warn("[{}] {} messages have timed-out", consumerBase,
oldOpenSet.size());
Set<MessageId> messageIds = new HashSet<>();
@@ -94,7 +95,6 @@ public void run(Timeout t) throws Exception {
oldOpenSet.clear();
consumerBase.redeliverUnacknowledgedMessages(messageIds);
}
- toggle();
timeout = client.timer().newTimeout(this, ackTimeoutMillis,
TimeUnit.MILLISECONDS);
}
}, ackTimeoutMillis, TimeUnit.MILLISECONDS);
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services