thetumbled commented on code in PR #23601:
URL: https://github.com/apache/pulsar/pull/23601#discussion_r1844717605


##########
pip/pip-393.md:
##########
@@ -0,0 +1,138 @@
+
+# PIP-393: Improve performance of Negative Acknowledgement
+
+# Background knowledge
+
+Negative Acknowledgement is a feature in Pulsar that allows consumers to trigger the redelivery
+of a message after some time when they fail to process it. The redelivery delay is determined by
+the `negativeAckRedeliveryDelay` configuration.
+
+When a user calls the `negativeAcknowledge` method, `NegativeAcksTracker` in `ConsumerImpl` adds an entry
+to the map `NegativeAcksTracker.nackedMessages`, mapping the message ID to the redelivery time.
+When the redelivery time comes, `NegativeAcksTracker` sends a redelivery request to the broker to
+redeliver the message.
+
+# Motivation
+
+There are many issues with the current implementation of Negative Acknowledgement in Pulsar:
+- the memory occupation is high.
+- the code execution efficiency is low.
+- the redelivery time is not accurate.
+- multiple negative acks for messages in the same entry (batch) interfere with each other.
+
+All of these problems are severe and need to be solved.
+
+## Memory occupation is high
+After the improvement in https://github.com/apache/pulsar/pull/23582, we have reduced the memory occupation
+of `NegativeAcksTracker` by more than half by replacing `HashMap` with `ConcurrentLongLongPairHashMap`. With 1 million entries, the memory
+occupation decreases from 178MB to 64MB. With 10 million entries, the memory occupation decreases from 1132MB to 512MB.
+The average memory occupation of each entry decreases from 1132MB/10000000=118 bytes to 512MB/10000000=53 bytes.
+
+But this is not enough. Assuming that we negatively acknowledge 10,000 messages per second, with a 1h redelivery delay for each message,
+the memory occupation of `NegativeAcksTracker` will be `3600*10000*53/1024/1024/1024=1.77GB`. If the delay is 5h,
+the required memory is `3600*10000*53/1024/1024/1024*5=8.88GB`, which grows too fast.
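
The arithmetic above can be reproduced with a small helper. This is only a sketch: the class and method names are made up for illustration, and the 53 bytes per entry is the measured figure quoted above, not a constant from the Pulsar code base.

```java
public class NackMemoryEstimate {
    // Bytes per tracked entry, taken from the measurement quoted above
    // (512MB / 10,000,000 entries). This is an assumption, not a Pulsar constant.
    static final long BYTES_PER_ENTRY = 53;

    /** Estimated tracker memory (in units of 1024^3 bytes) for a steady nack rate and a fixed delay. */
    static double estimateGiB(long nacksPerSecond, long delaySeconds) {
        // At steady state the tracker holds rate * delay entries.
        long entries = nacksPerSecond * delaySeconds;
        return entries * BYTES_PER_ENTRY / (1024.0 * 1024.0 * 1024.0);
    }

    public static void main(String[] args) {
        System.out.printf("1h delay: %.2f%n", estimateGiB(10_000, 3_600));
        System.out.printf("5h delay: %.2f%n", estimateGiB(10_000, 5 * 3_600));
    }
}
```

The estimate grows linearly with both the nack rate and the redelivery delay, which is why long delays become expensive.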
+
+## Code execution efficiency is low
+Currently, each time the timer task is triggered, it iterates over all the entries in `NegativeAcksTracker.nackedMessages`,
+which is unnecessary. We can sort entries by timestamp and only iterate over the entries that need to be redelivered.
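
One way to picture the sorted approach is a `TreeMap` keyed by redelivery time, where only the expired prefix is visited on each tick. The sketch below is illustrative only: `SortedNackSketch`, its methods, and the use of `String` message keys are assumptions made for the example, not the actual `NegativeAcksTracker` design, and it is not thread-safe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SortedNackSketch {
    // Hypothetical structure: redelivery timestamp (ms) -> message keys due at that time.
    private final TreeMap<Long, List<String>> byDueTime = new TreeMap<>();

    void add(String messageKey, long dueTimeMs) {
        byDueTime.computeIfAbsent(dueTimeMs, t -> new ArrayList<>()).add(messageKey);
    }

    /** Removes and returns the keys whose due time is <= nowMs, without visiting later entries. */
    List<String> pollDue(long nowMs) {
        List<String> due = new ArrayList<>();
        // headMap(nowMs, true) is a live view of the entries with key <= nowMs.
        Map<Long, List<String>> expired = byDueTime.headMap(nowMs, true);
        for (List<String> keys : expired.values()) {
            due.addAll(keys);
        }
        // Clearing the view removes those entries from the backing map.
        expired.clear();
        return due;
    }

    int pending() {
        return byDueTime.size();
    }
}
```

With this layout, the work done per tick is proportional to the number of expired entries rather than to the total number of tracked entries.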
+
+## Redelivery time is not accurate
+Currently, the redelivery time is controlled by `timerIntervalNanos`, which is 1/3 of the `negativeAckRedeliveryDelay`.
+That means, if the `negativeAckRedeliveryDelay` is 1h, the check runs only every 20min, so the redelivery time can be off by up to 20min, which is unacceptable.
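
To make the timing error concrete, here is a small sketch that assumes the timer ticks at exact multiples of `delay / 3` starting from time 0 and that a message is only redelivered on the first tick at or after its due time. The class and method names are hypothetical, and the real timer scheduling in the client may differ.

```java
public class TimerDeviationSketch {
    /** First tick time (a multiple of intervalMs) at or after dueMs. */
    static long firstTickAtOrAfter(long dueMs, long intervalMs) {
        return ((dueMs + intervalMs - 1) / intervalMs) * intervalMs;
    }

    /** How late a message is redelivered when due times are only checked on timer ticks. */
    static long latenessMs(long nackTimeMs, long delayMs) {
        long intervalMs = delayMs / 3; // assumption: the timer interval is 1/3 of the delay
        long dueMs = nackTimeMs + delayMs;
        return firstTickAtOrAfter(dueMs, intervalMs) - dueMs;
    }

    public static void main(String[] args) {
        long oneHourMs = 3_600_000L;
        // A nack issued 1 ms after a tick just misses the check at its due time.
        System.out.println(latenessMs(1, oneHourMs) + " ms late");
    }
}
```

Under these assumptions, the lateness ranges from zero up to just under one timer interval, depending on when the nack happens relative to the ticks.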
+
+## Multiple negative acks for messages in the same entry (batch) will interfere with each other
+Currently, `NegativeAcksTracker#nackedMessages` maps `(ledgerId, entryId)` to `timestamp`, which means multiple nacks for messages
+in the same batch share a single timestamp.
+If we ask for msg1 to be redelivered 10s later and then for msg2 to be redelivered 20s later, both messages are redelivered together 20s later.
+msg1 will not be redelivered 10s later because the timestamp recorded in `NegativeAcksTracker#nackedMessages` is overwritten by the second
+nack call.
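
The overwrite can be shown with a simplified stand-in for the tracker. This sketch uses a plain `HashMap` with a `"ledgerId:entryId"` string key purely for illustration; the class, its methods, and the key format are assumptions, not the real `NegativeAcksTracker` code.

```java
import java.util.HashMap;
import java.util.Map;

public class BatchNackOverwriteSketch {
    // Simplified stand-in: key is "ledgerId:entryId", value is the redelivery time in ms.
    private final Map<String, Long> nacked = new HashMap<>();

    void nack(long ledgerId, long entryId, int batchIndex, long redeliverAtMs) {
        // batchIndex is intentionally ignored, mirroring a map keyed only by (ledgerId, entryId).
        nacked.put(ledgerId + ":" + entryId, redeliverAtMs);
    }

    Long redeliverAt(long ledgerId, long entryId) {
        return nacked.get(ledgerId + ":" + entryId);
    }

    public static void main(String[] args) {
        BatchNackOverwriteSketch tracker = new BatchNackOverwriteSketch();
        tracker.nack(1, 5, 0, 10_000); // msg1: redeliver after 10s
        tracker.nack(1, 5, 1, 20_000); // msg2 in the same batch: redeliver after 20s
        // Only one timestamp survives for the whole entry.
        System.out.println(tracker.redeliverAt(1, 5));
    }
}
```

Because both messages resolve to the same key, the second `put` replaces the first timestamp and the 10s request is lost.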
+
+
+# Goals
+
+Refactor the `NegativeAcksTracker` to solve the above problems.
+
+To avoid iterating over all entries in `NegativeAcksTracker.nackedMessages`, we use a sorted map to store the entries.

Review Comment:
   > I agree that the user doesn't need to know this, but there are technical 
limitations. The complete entry with all messages in the batch will get 
redelivered.
   
   It is fine that the complete entry, with all messages in the batch, gets redelivered. We should enable batch index ack when using negative ack to filter out duplicate messages.
   It is unacceptable that all messages of one batch are restricted to being redelivered at the same time, since a consumer can negatively acknowledge different messages with different redelivery delays. **Batching should be transparent to the user.**
   
   > I disagree. It doesn't make sense since each time the complete batch will 
get redelivered. The Pulsar client will filter out the already acknowledged 
messages from being processed again, however this filtering most likely has 
race conditions at least with message listeners.
   
   The guarantee is at-least-once semantics, not exactly-once semantics. We can enable batch index ack, which has become the default configuration, to de-duplicate messages.
   
   > btw. If the nacks don't need to be de-duplicated, there wouldn't be any 
need to have a map data structure. You could simply add entries to a priority 
queue (or an optimized hashed wheel type of datastructure) each time a message 
is nacked.
   
   We can't just store the entries in client memory within a priority queue, because there will be a great number of entries. This is the reason why we need negative ack: we retain only a message ID of tens of bytes.
   
   


