This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 3fbb41d4c3c [fix][client] Fix reach redeliverCount client can't send
messages to DLQ (#17287)
3fbb41d4c3c is described below
commit 3fbb41d4c3c4f7b9f6ab0e8015a6176b17c3ee75
Author: congbo <[email protected]>
AuthorDate: Sat Aug 27 12:38:08 2022 +0800
[fix][client] Fix reach redeliverCount client can't send messages to DLQ
(#17287)
(cherry picked from commit 4a28c087fe1308ea4eabc104b3d4889b47316afe)
---
.../client/impl/TransactionEndToEndTest.java | 55 ++++++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerImpl.java | 13 +++--
2 files changed, 64 insertions(+), 4 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index 5ac5608f117..1df364f18b2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -53,6 +53,7 @@ import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
@@ -69,6 +70,7 @@ import
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientExce
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.client.internal.DefaultImplementation;
+import org.apache.pulsar.client.util.RetryMessageUtil;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
@@ -1105,4 +1107,57 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
assertTrue(ex instanceof PulsarClientException.TimeoutException);
}
}
+
+ @Test
+ public void testSendTxnAckMessageToDLQ() throws Exception {
+ String topic = NAMESPACE1 + "/testSendTxnAckMessageToDLQ";
+ String subName = "test";
+ String value = "test";
+ @Cleanup
+ ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>)
pulsarClient.newProducer()
+ .topic(topic)
+ .sendTimeout(1, TimeUnit.SECONDS)
+ .create();
+
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionType(SubscriptionType.Shared)
+ // with maxRedeliverCount(1) the consumer receives the same message at most twice; the third delivery goes to the DLQ
+
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(1).build())
+ .subscriptionName(subName)
+ .subscribe();
+
+ @Cleanup
+ Consumer<byte[]> deadLetterConsumer = pulsarClient.newConsumer()
+ .topic(String.format("%s-%s" +
RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX,
+ topic, subName))
+ .subscriptionType(SubscriptionType.Shared)
+
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(1).build())
+ .subscriptionName("test")
+ .subscribe();
+
+ producer.send(value.getBytes());
+ Transaction transaction =
pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES)
+ .build().get();
+
+ // consumer receives the message for the first time, redeliverCount = 0
+ consumer.acknowledgeAsync(consumer.receive().getMessageId(),
transaction).get();
+
+ transaction.abort().get();
+
+ transaction = pulsarClient.newTransaction().withTransactionTimeout(5,
TimeUnit.MINUTES)
+ .build().get();
+
+ // consumer receives the message for the second time, redeliverCount = 1; it can still be received
+ consumer.acknowledgeAsync(consumer.receive().getMessageId(),
transaction).get();
+
+ transaction.abort().get();
+
+ // on the third delivery redeliverCount = 2, which exceeds maxRedeliverCount,
+ // so the message is sent to the DLQ and the consumer cannot receive it
+ assertNull(consumer.receive(3, TimeUnit.SECONDS));
+
+ assertEquals(value, new String(deadLetterConsumer.receive(3,
TimeUnit.SECONDS).getValue()));
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 2be6c450ccd..8494e4804bb 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1350,10 +1350,15 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
schema, redeliveryCount, consumerEpoch);
uncompressedPayload.release();
- if (deadLetterPolicy != null &&
possibleSendToDeadLetterTopicMessages != null
- && redeliveryCount >=
deadLetterPolicy.getMaxRedeliverCount()) {
- possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)
message.getMessageId(),
- Collections.singletonList(message));
+ if (deadLetterPolicy != null &&
possibleSendToDeadLetterTopicMessages != null) {
+ if (redeliveryCount >=
deadLetterPolicy.getMaxRedeliverCount()) {
+ possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)
message.getMessageId(),
+ Collections.singletonList(message));
+ if (redeliveryCount >
deadLetterPolicy.getMaxRedeliverCount()) {
+
redeliverUnacknowledgedMessages(Collections.singleton(message.getMessageId()));
+ return;
+ }
+ }
}
executeNotifyCallback(message);
} else {