This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0ab2325 Issue #2584: unacked message is not redelivered on time
(#2590)
0ab2325 is described below
commit 0ab2325fa33231f1c69782e081483012467dfcab
Author: Jia Zhai <[email protected]>
AuthorDate: Thu Sep 20 15:44:04 2018 +0800
Issue #2584: unacked message is not redelivered on time (#2590)
### Motivation
An unacked message is not redelivered when the configured ackTimeout elapses;
it is actually redelivered only after 2 * ackTimeout.
The root cause is in UnAckedMessageTracker:
```
public void start(PulsarClientImpl client, ConsumerBase<?>
consumerBase, long ackTimeoutMillis) {
this.stop();
timeout = client.timer().newTimeout(new TimerTask() {
@Override
public void run(Timeout t) throws Exception {
if (isAckTimeout()) { < === first timeout, it is false,
because oldOpenSet is empty.
log.warn("[{}] {} messages have timed-out",
consumerBase, oldOpenSet.size());
Set<MessageId> messageIds = new HashSet<>();
oldOpenSet.forEach(messageIds::add);
oldOpenSet.clear();
consumerBase.redeliverUnacknowledgedMessages(messageIds);
}
toggle(); < === toggle after timeout
timeout = client.timer().newTimeout(this, ackTimeoutMillis,
TimeUnit.MILLISECONDS);
}
}, ackTimeoutMillis, TimeUnit.MILLISECONDS);
}
```
Before the first timeout, all message IDs are added to the current set, not to
`oldOpenSet`, so `isAckTimeout()` returns false and `redeliverUnacknowledgedMessages`
is not called at the first timeout.
Related issue: #2584
### Modifications
- Move `toggle()` from after the `if` clause to before it.
- Add a unit test.
### Result
Unit tests pass.
---
.../client/api/SimpleProducerConsumerStatTest.java | 1 -
.../client/api/SimpleProducerConsumerTest.java | 47 ++++++++++++++++++
.../impl/UnAcknowledgedMessagesTimeoutTest.java | 58 +++++++---------------
.../pulsar/client/impl/UnAckedMessageTracker.java | 2 +-
4 files changed, 65 insertions(+), 43 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
index 2c94966..dfeb4af 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
@@ -217,7 +217,6 @@ public class SimpleProducerConsumerStatTest extends
ProducerConsumerBase {
Set<String> messageSet = Sets.newHashSet();
for (int i = 0; i < numMessages; i++) {
future_msg = consumer.receiveAsync();
- Thread.sleep(10);
msg = future_msg.get();
String receivedMessage = new String(msg.getData());
log.info("Received message: [{}]", receivedMessage);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 61bdad0..df383b5 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -2662,4 +2663,50 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
assertEquals(latch.getCount(), 1);
consumer.close();
}
+
+ /**
+ * Ack timeout message is redelivered on time.
+ * Related github issue #2584
+ */
+ @Test
+ public void testAckTimeoutRedeliver() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ // create consumer and producer
+ ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>)
pulsarClient.newConsumer()
+ .topic("persistent://my-property/my-ns/ack-timeout-topic")
+ .subscriptionName("subscriber-1")
+ .ackTimeout(1, TimeUnit.SECONDS)
+ .subscriptionType(SubscriptionType.Shared)
+ .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
+ .subscribe();
+
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic("persistent://my-property/my-ns/ack-timeout-topic")
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+
+ // (1) Produced one Message
+ String content = "my-message-will-ack-timeout";
+ producer.send(content.getBytes());
+
+ // (2) consumer to receive messages, and not ack
+ Message<byte[]> message = consumer.receive();
+
+ // (3) should be re-delivered once ack-timeout.
+ Thread.sleep(1000);
+ message = consumer.receive(200, TimeUnit.MILLISECONDS);
+ assertNotNull(message);
+
+ Thread.sleep(1000);
+ message = consumer.receive(200, TimeUnit.MILLISECONDS);
+ assertNotNull(message);
+
+ assertEquals(content, new String(message.getData()));
+
+ producer.close();
+ consumer.close();
+ log.info("-- Exiting {} test --", methodName);
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
index da53760..e178feb 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
@@ -235,16 +235,17 @@ public class UnAcknowledgedMessagesTimeoutTest extends
BrokerTestBase {
private static int receiveAllMessage(Consumer<?> consumer, boolean
ackMessages) throws Exception {
int messagesReceived = 0;
- Message<?> msg = consumer.receive(1, TimeUnit.SECONDS);
+ Message<?> msg = consumer.receive(200, TimeUnit.MILLISECONDS);
while (msg != null) {
++messagesReceived;
- log.info("Consumer received {}", new String(msg.getData()));
+ log.info("Consumer {} received {}", consumer.getConsumerName(),
new String(msg.getData()));
if (ackMessages) {
consumer.acknowledge(msg);
+ log.info("Consumer {} acknowledged {}",
consumer.getConsumerName(), new String(msg.getData()));
}
- msg = consumer.receive(1, TimeUnit.SECONDS);
+ msg = consumer.receive(200, TimeUnit.MILLISECONDS);
}
return messagesReceived;
@@ -283,56 +284,31 @@ public class UnAcknowledgedMessagesTimeoutTest extends
BrokerTestBase {
}
// 4. Receive messages
- Message<byte[]> message1 = consumer1.receive();
- Message<byte[]> message2 = consumer2.receive();
int messageCount1 = 0;
int messageCount2 = 0;
- int ackCount1 = 0;
- int ackCount2 = 0;
- do {
- if (message1 != null) {
- log.info("Consumer1 received " + new
String(message1.getData()));
- messageCount1 += 1;
- }
- if (message2 != null) {
- log.info("Consumer2 received " + new
String(message2.getData()));
- messageCount2 += 1;
- consumer2.acknowledge(message2);
- ackCount2 += 1;
- }
- message1 = consumer1.receive(500, TimeUnit.MILLISECONDS);
- message2 = consumer2.receive(500, TimeUnit.MILLISECONDS);
- } while (message1 != null || message2 != null);
+
+ messageCount1 += receiveAllMessage(consumer1, false);
+ messageCount2 += receiveAllMessage(consumer2, true);
+
log.info(key + " messageCount1 = " + messageCount1);
log.info(key + " messageCount2 = " + messageCount2);
- log.info(key + " ackCount1 = " + ackCount1);
- log.info(key + " ackCount2 = " + ackCount2);
+
assertEquals(messageCount1 + messageCount2, totalMessages);
+ Thread.sleep((int) (ackTimeOutMillis * 1.1));
+
// 5. Check if Messages redelivered again
// Since receive is a blocking call hoping that timeout will kick in
log.info(key + " Timeout should be triggered now");
- message1 = consumer1.receive();
messageCount1 = 0;
- do {
- if (message1 != null) {
- log.info("Consumer1 received " + new
String(message1.getData()));
- messageCount1 += 1;
- consumer1.acknowledge(message1);
- ackCount1 += 1;
- }
- if (message2 != null) {
- log.info("Consumer2 received " + new
String(message2.getData()));
- messageCount2 += 1;
- }
- message1 = consumer1.receive(500, TimeUnit.MILLISECONDS);
- message2 = consumer2.receive(500, TimeUnit.MILLISECONDS);
- } while (message1 != null || message2 != null);
+
+ messageCount1 += receiveAllMessage(consumer1, true);
+ messageCount2 += receiveAllMessage(consumer2, false);
+
log.info(key + " messageCount1 = " + messageCount1);
log.info(key + " messageCount2 = " + messageCount2);
- log.info(key + " ackCount1 = " + ackCount1);
- log.info(key + " ackCount2 = " + ackCount2);
- assertEquals(ackCount1 + messageCount2, totalMessages);
+
+ assertEquals(messageCount1 + messageCount2, totalMessages);
}
@Test
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
index 266eb3b..504de14 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
@@ -87,6 +87,7 @@ public class UnAckedMessageTracker implements Closeable {
timeout = client.timer().newTimeout(new TimerTask() {
@Override
public void run(Timeout t) throws Exception {
+ toggle();
if (isAckTimeout()) {
log.warn("[{}] {} messages have timed-out", consumerBase,
oldOpenSet.size());
Set<MessageId> messageIds = new HashSet<>();
@@ -94,7 +95,6 @@ public class UnAckedMessageTracker implements Closeable {
oldOpenSet.clear();
consumerBase.redeliverUnacknowledgedMessages(messageIds);
}
- toggle();
timeout = client.timer().newTimeout(this, ackTimeoutMillis,
TimeUnit.MILLISECONDS);
}
}, ackTimeoutMillis, TimeUnit.MILLISECONDS);