This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 718068d5bed [fix][client] Fix reach redeliverCount client can't send
batch messages to DLQ (#17317)
718068d5bed is described below
commit 718068d5bedd7cf07d2d8e06efbc67f8d581e82a
Author: congbo <[email protected]>
AuthorDate: Sun Aug 28 20:10:57 2022 +0800
[fix][client] Fix reach redeliverCount client can't send batch messages to
DLQ (#17317)
(cherry picked from commit 0909853873fc61395df7a68de13335d9f770383a)
---
.../client/impl/TransactionEndToEndTest.java | 66 ++++++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerImpl.java | 11 +++-
2 files changed, 75 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index 1df364f18b2..6868f49fa14 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -1116,6 +1116,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
@Cleanup
ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>)
pulsarClient.newProducer()
.topic(topic)
+ .enableBatching(false)
.sendTimeout(1, TimeUnit.SECONDS)
.create();
@@ -1160,4 +1161,69 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
assertEquals(value, new String(deadLetterConsumer.receive(3,
TimeUnit.SECONDS).getValue()));
}
+
+ @Test
+ public void testSendTxnAckBatchMessageToDLQ() throws Exception {
+ String topic = NAMESPACE1 + "/testSendTxnAckBatchMessageToDLQ";
+ String subName = "test";
+ String value1 = "test1";
+ String value2 = "test2";
+ @Cleanup
+ ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>)
pulsarClient.newProducer()
+ .topic(topic)
+ .sendTimeout(1, TimeUnit.SECONDS)
+ .create();
+
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionType(SubscriptionType.Shared)
+ // consumer can't receive the same message three times
+
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(1).build())
+ .subscriptionName(subName)
+ .subscribe();
+
+ @Cleanup
+ Consumer<byte[]> deadLetterConsumer = pulsarClient.newConsumer()
+ .topic(String.format("%s-%s" +
RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX,
+ topic, subName))
+ .subscriptionType(SubscriptionType.Shared)
+
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(1).build())
+ .subscriptionName("test")
+ .subscribe();
+
+ producer.sendAsync(value1.getBytes());
+ producer.sendAsync(value2.getBytes());
+ Transaction transaction =
pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES)
+ .build().get();
+
+ Message<byte[]> message = consumer.receive();
+ assertEquals(value1, new String(message.getValue()));
+ // consumer receives message one of the batch for the first time,
redeliverCount = 0
+ consumer.acknowledgeAsync(message.getMessageId(), transaction).get();
+
+ transaction.abort().get();
+
+ // consumer will receive message two of the batch, and then receive
+ // message one and message two again, now with redeliverCount = 1
+ for (int i = 0; i < 3; i ++) {
+ message = consumer.receive();
+ }
+
+ transaction = pulsarClient.newTransaction().withTransactionTimeout(5,
TimeUnit.MINUTES)
+ .build().get();
+
+ assertEquals(value2, new String(message.getValue()));
+ // consumer receives message two of the batch for the second time,
redeliverCount = 1, so it can still be received
+ consumer.acknowledgeAsync(message.getMessageId(), transaction).get();
+
+ transaction.abort().get();
+
+ // the batch would be delivered a third time, redeliverCount =
2,
+ // so the messages are sent to the DLQ and the consumer can't receive them
+ assertNull(consumer.receive(3, TimeUnit.SECONDS));
+
+ assertEquals(value1, new String(deadLetterConsumer.receive(3,
TimeUnit.SECONDS).getValue()));
+ assertEquals(value2, new String(deadLetterConsumer.receive(3,
TimeUnit.SECONDS).getValue()));
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 8494e4804bb..f48ae2ed280 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1534,8 +1534,15 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
discardCorruptedMessage(messageId, cnx,
ValidationError.BatchDeSerializeError);
}
- if (possibleToDeadLetter != null &&
possibleSendToDeadLetterTopicMessages != null) {
- possibleSendToDeadLetterTopicMessages.put(batchMessage,
possibleToDeadLetter);
+ if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages
!= null) {
+ if (redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
+ possibleSendToDeadLetterTopicMessages.put(batchMessage,
+ possibleToDeadLetter);
+ if (redeliveryCount > deadLetterPolicy.getMaxRedeliverCount())
{
+
redeliverUnacknowledgedMessages(Collections.singleton(batchMessage));
+ return;
+ }
+ }
}
if (log.isDebugEnabled()) {