shibd commented on code in PR #139:
URL: https://github.com/apache/pulsar-client-cpp/pull/139#discussion_r1063260550
##########
lib/ConsumerImpl.cc:
##########
@@ -527,6 +544,10 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
<< startMessageId.value());
return;
}
+ if (redeliveryCount >= deadLetterPolicy_.getMaxRedeliverCount()) {
+ possibleSendToDeadLetterTopicMessages_.emplace(m.getMessageId(), std::vector<Message>{m});
+ increaseAvailablePermits(cnx);
Review Comment:
> And should we call redeliverUnacknowledgedMessages here as well?
Yes, we need to call `redeliverUnacknowledgedMessages(const
std::set<MessageId>& messageIds)` here. I misunderstood this before.
The DLQ design works like this:
When a received message has `redeliveryCount ==
deadLetterPolicy_.getMaxRedeliverCount()`, we add it to
`possibleSendToDeadLetterTopicMessages_` and wait for the user to call a nack
method, or for the ack timeout to fire. Either one triggers sending it to the DLQ:
https://github.com/apache/pulsar-client-cpp/blob/e818ead391dc52ef08bbd59c748a00492dcf0cb5/lib/NegativeAcksTracker.cc#L82-L84
https://github.com/apache/pulsar-client-cpp/blob/1721e0005975bcc9cbd49566d6047760e6621a3b/lib/UnAckedMessageTrackerEnabled.cc#L68-L73
Handling it on the client this way saves one redelivery of the message from the server to the client.
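The flow described above can be sketched roughly as follows. This is a simplified, self-contained model and not the client's actual code: `Msg`, `SketchConsumer`, `onReceive`, `redeliver`, and the string message ids are hypothetical stand-ins, and the candidate map holds a single message per id where the real `possibleSendToDeadLetterTopicMessages_` holds a `std::vector<Message>`.

```cpp
#include <map>
#include <set>
#include <string>
#include <vector>

// Hypothetical stand-in for pulsar::Message: only what the sketch needs.
struct Msg {
    std::string id;
    int redeliveryCount;
};

// Hypothetical model of the consumer-side bookkeeping, not ConsumerImpl itself.
class SketchConsumer {
   public:
    explicit SketchConsumer(int maxRedeliverCount) : maxRedeliverCount_(maxRedeliverCount) {}

    // Receive path: a message that has just reached the limit is remembered
    // as a DLQ candidate. It is still handed to the application.
    void onReceive(const Msg& m) {
        if (m.redeliveryCount == maxRedeliverCount_) {
            possibleDlq_.emplace(m.id, m);
        }
        delivered_.push_back(m);
    }

    // Redelivery path, reached from a nack or an ack timeout in this model.
    // Candidates go to the DLQ locally; the returned ids are the ones that
    // would still have to be redelivered by the server.
    std::set<std::string> redeliver(const std::set<std::string>& ids) {
        std::set<std::string> askServer;
        for (const auto& id : ids) {
            auto it = possibleDlq_.find(id);
            if (it == possibleDlq_.end()) {
                askServer.insert(id);
            } else {
                deadLetters_.push_back(it->second);
                possibleDlq_.erase(it);
            }
        }
        return askServer;
    }

    const std::vector<Msg>& deadLetters() const { return deadLetters_; }
    const std::vector<Msg>& delivered() const { return delivered_; }

   private:
    int maxRedeliverCount_;
    std::map<std::string, Msg> possibleDlq_;  // DLQ candidates keyed by message id
    std::vector<Msg> delivered_;              // messages handed to the application
    std::vector<Msg> deadLetters_;            // messages "sent" to the DLQ
};
```

In this model, a message that is already a candidate never goes back to the server on nack, which is the saved round trip mentioned above.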
However, there are unexpected scenarios, such as `transaction.abort`
(apache/pulsar/pull/17287). That path does not go through the
`redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds)`
method, so the receive path needs to check again whether the count exceeds
the limit: `if (redeliveryCount > deadLetterPolicy_.getMaxRedeliverCount())`
I originally thought that, because the C++ client has no transactions, it
did not need to handle this scenario.
But I found that the user can call this method:
https://github.com/apache/pulsar-client-cpp/blob/2980bbe9d3ef47eef4a1d468c3898b5a989b4df0/include/pulsar/Consumer.h#L359
It also makes the server redeliver messages directly, without going
through the logic that sends them to the DLQ.
I have restored this check and added a test to cover it:
`DeadLetterQueueTest.testSendDLQTriggerByRedeliverUnacknowledgedMessages`
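As a rough illustration of the receive-side check, the three cases can be written as a small classifier. This is only a sketch: `ReceiveAction` and `classify` are hypothetical names, and it assumes (as the `increaseAvailablePermits(cnx)` call in the diff hunk suggests) that a message past the limit is not handed to the application again.

```cpp
// Hypothetical outcome of the receive-side check; names are illustrative only.
enum class ReceiveAction {
    Deliver,          // below the limit: hand the message to the application
    DeliverAndTrack,  // at the limit: hand it over and remember it as a DLQ candidate
    RouteToDlq        // past the limit: the nack/ack-timeout path was bypassed
};

// Decide what the receive path does with a message, given its redelivery
// count and the policy's maximum redeliver count.
ReceiveAction classify(int redeliveryCount, int maxRedeliverCount) {
    if (redeliveryCount > maxRedeliverCount) {
        // Only reachable when the server redelivered without the client-side
        // DLQ logic running first, for example after a direct redelivery request.
        return ReceiveAction::RouteToDlq;
    }
    if (redeliveryCount == maxRedeliverCount) {
        return ReceiveAction::DeliverAndTrack;
    }
    return ReceiveAction::Deliver;
}
```

With a limit of 3, a count of 3 is tracked as a candidate, while a count of 4 can only mean the client-side DLQ step was skipped, so it is routed to the DLQ immediately.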