codelipenghui commented on a change in pull request #12566:
URL: https://github.com/apache/pulsar/pull/12566#discussion_r741569181
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
##########
@@ -95,6 +105,40 @@ public synchronized void add(MessageId messageId) {
}
}
+    public synchronized void add(Message<?> message) {
+        if (negativeAckRedeliveryBackoff == null) {
+            add(message.getMessageId());
+            return;
+        }
+        add(message.getMessageId(), message.getRedeliveryCount());
+    }
+
+    private synchronized void add(MessageId messageId, int redeliveryCount) {
+        if (messageId instanceof TopicMessageIdImpl) {
Review comment:
Can a `TopicMessageIdImpl` actually reach this point? The `NegativeAcksTracker`
is maintained by the internal consumers, so this branch looks unreachable.
##########
File path:
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
##########
@@ -749,4 +749,10 @@
      * Default: null
      */
     ConsumerBuilder<T> messagePayloadProcessor(MessagePayloadProcessor payloadProcessor);
+
+    /**
+     * Notice: the negativeAckRedeliveryBackoff will not work with `consumer.negativeAcknowledge(MessageId messageId)`
+     * because we are not able to get the redelivery count from the message ID.
+     */
Review comment:
It would be better for this Javadoc to include an example of how to enable
the redelivery backoff, for instance with the built-in exponential backoff.
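
To illustrate the behavior such an example would document, here is a minimal, self-contained sketch of an exponential redelivery backoff. The class `ExponentialRedeliveryBackoffSketch`, its `next` method, and the `minDelayMs`/`maxDelayMs` parameters are hypothetical names chosen for illustration; they are not the API added by this PR. The sketch only shows why the delay has to be derived from the redelivery count, which a bare `MessageId` does not carry.

```java
// Hypothetical sketch: NOT the Pulsar API, only the idea behind an
// exponential redelivery backoff keyed on the redelivery count.
final class ExponentialRedeliveryBackoffSketch {
    private final long minDelayMs;
    private final long maxDelayMs;

    ExponentialRedeliveryBackoffSketch(long minDelayMs, long maxDelayMs) {
        if (minDelayMs <= 0 || maxDelayMs < minDelayMs) {
            throw new IllegalArgumentException("require 0 < minDelayMs <= maxDelayMs");
        }
        this.minDelayMs = minDelayMs;
        this.maxDelayMs = maxDelayMs;
    }

    // Returns the delay before the next redelivery: minDelayMs doubled once
    // per previous redelivery, capped at maxDelayMs.
    long next(int redeliveryCount) {
        long delay = minDelayMs;
        for (int i = 0; i < redeliveryCount && delay < maxDelayMs; i++) {
            // Cap before doubling so the multiplication cannot overflow.
            delay = (delay > maxDelayMs / 2) ? maxDelayMs : delay * 2;
        }
        return delay;
    }

    public static void main(String[] args) {
        ExponentialRedeliveryBackoffSketch backoff =
                new ExponentialRedeliveryBackoffSketch(1_000L, 60_000L);
        for (int count = 0; count <= 7; count++) {
            System.out.println("redeliveryCount=" + count
                    + " -> delayMs=" + backoff.next(count));
        }
    }
}
```

With a minimum of 1000 ms and a maximum of 60000 ms, the delay doubles from 1000 ms up to 32000 ms on the fifth redelivery and stays at 60000 ms from the sixth onward.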