This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5d06b1e Change un-ack messages start tracking behavior (#3079)
5d06b1e is described below
commit 5d06b1e128b6747952a855d02c8a2936d2a35cce
Author: penghui <[email protected]>
AuthorDate: Tue Dec 4 03:47:33 2018 +0800
Change un-ack messages start tracking behavior (#3079)
### Motivation
#### Expected behavior
User process the same message many times but failed, if user set a dead
letter policy, when message process times exceed the max redelivery count in
dead letter policy, message will send to the dead letter topic.
#### Actual behavior
When a consumer subscribe a topic, but wait a while then start receive
messages, but messages already send to dead letter topic.
#### Steps to reproduce
Here is the code to reproduce
```java
public class RedeliveryIssue {
public static void main(String[] args) throws PulsarClientException,
InterruptedException {
final String topic = "my-topic";
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Consumer<byte[]> consumer = client.newConsumer()
.topic(topic)
.subscriptionType(SubscriptionType.Shared)
.subscriptionName(UUID.randomUUID().toString())
.ackTimeout(3, TimeUnit.SECONDS)
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(2).build())
.subscribe();
Producer<byte[]> producer = client.newProducer()
.topic(topic)
.create();
producer.send(("a message").getBytes());
// wait a while, message will send to dead letter topic
Thread.sleep(10000L);
do {
// can't receive message
Message<byte[]> msg = consumer.receive();
System.out.println(new String(msg.getValue()));
} while (true);
}
}
```
#### System configuration
**Pulsar version**: 2.2.0
### Modifications
Remove un-ack message tracking on message received.
Add un-ack message tracking on consumer call receive
### Result
UT passed
---
.../pulsar/client/api/DeadLetterTopicTest.java | 29 ++++++++++++++++++++++
.../impl/UnAcknowledgedMessagesTimeoutTest.java | 6 +++--
.../apache/pulsar/client/impl/ConsumerImpl.java | 20 +++++++++++++--
3 files changed, 51 insertions(+), 4 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
index 2ff500b..82df18c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -26,6 +26,7 @@ import org.testng.annotations.Test;
import java.util.concurrent.TimeUnit;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
public class DeadLetterTopicTest extends ProducerConsumerBase {
@@ -256,4 +257,32 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
assertNull(checkMessage);
checkConsumer.close();
}
+
+ /**
+ * issue https://github.com/apache/pulsar/issues/3077
+ */
+ @Test(timeOut = 200000)
+ public void testDeadLetterWithoutConsumerReceiveImmediately() throws
PulsarClientException, InterruptedException {
+ final String topic =
"persistent://my-property/my-ns/dead-letter-topic-without-consumer-receive-immediately";
+
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionName("my-subscription")
+ .ackTimeout(1, TimeUnit.SECONDS)
+
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(1).build())
+ .subscribe();
+
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .create();
+
+ producer.send(("a message").getBytes());
+
+ // Wait a while, message should not be send to DLQ
+ Thread.sleep(5000L);
+
+ Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
+ assertNotNull(msg);
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
index da53760..794ce31 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
@@ -373,9 +373,11 @@ public class UnAcknowledgedMessagesTimeoutTest extends
BrokerTestBase {
Thread.sleep((long) (ackTimeOutMillis * 1.1));
- for (int i = 0; i < totalMessages - 1; i++) {
+ for (int i = 0; i < totalMessages; i++) {
Message<byte[]> msg = consumer.receive();
- consumer.acknowledge(msg);
+ if (i != totalMessages - 1) {
+ consumer.acknowledge(msg);
+ }
}
assertEquals(consumer.getUnAckedMessageTracker().size(), 1);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 6ae925d..3e2d4d6 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -294,6 +294,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
Message<T> message;
try {
message = incomingMessages.take();
+ trackMessage(message);
Message<T> interceptMsg = beforeConsume(message);
messageProcessed(interceptMsg);
return interceptMsg;
@@ -325,6 +326,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
if (message == null && conf.getReceiverQueueSize() == 0) {
sendFlowPermitsToBroker(cnx(), 1);
} else if (message != null) {
+ trackMessage(message);
Message<T> interceptMsg = beforeConsume(message);
messageProcessed(interceptMsg);
result.complete(interceptMsg);
@@ -385,6 +387,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
Message<T> message;
try {
message = incomingMessages.poll(timeout, unit);
+ trackMessage(message);
Message<T> interceptMsg = beforeConsume(message);
if (interceptMsg != null) {
messageProcessed(interceptMsg);
@@ -829,11 +832,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
// Enqueue the message so that it can be retrieved when
application calls receive()
// if the conf.getReceiverQueueSize() is 0 then discard
message if no one is waiting for it.
// if asyncReceive is waiting then notify callback without
adding to incomingMessages queue
- unAckedMessageTracker.add((MessageIdImpl)
message.getMessageId());
if (deadLetterPolicy != null &&
possibleSendToDeadLetterTopicMessages != null && redeliveryCount >=
deadLetterPolicy.getMaxRedeliverCount()) {
possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(),
Collections.singletonList(message));
}
if (!pendingReceives.isEmpty()) {
+ trackMessage(message);
notifyPendingReceivedCallback(message, null);
} else if (conf.getReceiverQueueSize() != 0 ||
waitingOnReceiveForZeroQueueSize) {
incomingMessages.add(message);
@@ -957,7 +960,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
MessageIdImpl batchMessage = new
MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(),
getPartitionIndex());
BatchMessageAcker acker = BatchMessageAcker.newAcker(batchSize);
- unAckedMessageTracker.add(batchMessage);
List<MessageImpl<T>> possibleToDeadLetter = null;
if (deadLetterPolicy != null && redeliveryCount >=
deadLetterPolicy.getMaxRedeliverCount()) {
possibleToDeadLetter = new ArrayList<>();
@@ -1068,6 +1070,20 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
}
+ protected void trackMessage(Message<?> msg) {
+ if (msg != null) {
+ MessageId messageId = msg.getMessageId();
+ if (conf.getAckTimeoutMillis() > 0 && messageId instanceof
MessageIdImpl) {
+ MessageIdImpl id = (MessageIdImpl)messageId;
+ if (id instanceof BatchMessageIdImpl) {
+ // do not add each item in batch message into tracker
+ id = new MessageIdImpl(id.getLedgerId(), id.getEntryId(),
getPartitionIndex());
+ }
+ unAckedMessageTracker.add(id);
+ }
+ }
+ }
+
void increaseAvailablePermits(ClientCnx currentCnx) {
increaseAvailablePermits(currentCnx, 1);
}