thetumbled commented on code in PR #23601:
URL: https://github.com/apache/pulsar/pull/23601#discussion_r1843605276


##########
pip/pip-393.md:
##########
@@ -0,0 +1,138 @@
+
+# PIP-393: Improve performance of Negative Acknowledgement
+
+# Background knowledge
+
+Negative Acknowledgement is a feature in Pulsar that allows consumers to 
trigger the redelivery 
+of a message after some time when they fail to process it. The redelivery 
delay is determined by
+the `negativeAckRedeliveryDelay` configuration.
+
+When user calls `negativeAcknowledge` method, `NegativeAcksTracker` in 
`ConsumerImpl` will add an entry
+into the map `NegativeAcksTracker.nackedMessages`, mapping the message ID to 
the redelivery time.
+When the redelivery time comes, `NegativeAcksTracker` will send a redelivery 
request to the broker to
+redeliver the message.
+
+# Motivation
+
+There are many issues with the current implementation of Negative 
Acknowledgement in Pulsar:
+- the memory occupation is high.
+- the code execution efficiency is low.
+- the redelivery time is not accurate.
+- multiple negative ack for messages in the same entry(batch) will interfere 
with each other.
+All of these problem is severe and need to be solved.
+
+## Memory occupation is high
+After the improvement of https://github.com/apache/pulsar/pull/23582, we have 
reduce half more memory occupation
+of `NegativeAcksTracker` by replacing `HashMap` with 
`ConcurrentLongLongPairHashMap`. With 100w entry, the memory
+occupation decrease from 178Mb to 64Mb. With 1kw entry, the memory occupation 
decrease from 1132Mb to 512Mb.
+The average memory occupation of each entry decrease from 
1132MB/10000000=118byte to 512MB/10000000=53byte.
+
+But it is not enough. Assuming that we negative ack message 1w/s, assigning 1h 
redelivery delay for each message,
+the memory occupation of `NegativeAcksTracker` will be 
`3600*10000*53/1024/1024/1024=1.77GB`, if the delay is 5h,
+the required memory is `3600*10000*53/1024/1024/1024*5=8.88GB`, which increase 
too fast.
+
+## Code execution efficiency is low
+Currently, each time the timer task is triggered, it will iterate all the 
entries in `NegativeAcksTracker.nackedMessages`, 
+which is unnecessary. We can sort entries by timestamp and only iterate the 
entries that need to be redelivered.
+
+## Redelivery time is not accurate
+Currently, the redelivery time is controlled by the `timerIntervalNanos`, 
which is 1/3 of the `negativeAckRedeliveryDelay`.
+That means, if the `negativeAckRedeliveryDelay` is 1h, the redelivery time 
will be 20min, which is unacceptable.
+
+## Multiple negative ack for messages in the same entry(batch) will interfere 
with each other
+Currently, `NegativeAcksTracker#nackedMessages` map `(ledgerId, entryId)` to 
`timestamp`, which means multiple nacks from messages 
+in the same batch share single one timestamp. 
+If we let msg1 redelivered 10s later, then let msg2 redelivered 20s later, 
these two messages are delivered 20s later together.
+msg1 will not be redelivered 10s later as the timestamp recorded in 
`NegativeAcksTracker#nackedMessages` is overrode by the second
+nack call.
+
+
+# Goals
+
+Refactor the `NegativeAcksTracker` to solve the above problems.
+
+To avoid interation of all entries in `NegativeAcksTracker.nackedMessages`, we 
use a sorted map to store the entries.

Review Comment:
   >  I'd try to avoid using a map completely.
   
   I don't know why should we avoid using map? The test result is good, i think 
it is good enough.
   
   >> multiple negative ack for messages in the same entry(batch) will 
interfere with each other.
   
   > The only reason why a nack would need to be updated seems to be related to 
batch messages?
   Since a full batch will always be redelivered, I wonder what the correct and 
expected behavior even is from the user's point of view?
   There's a possibility to de-duplicate the nacks for batch messages with a 
Long2ObjectMap<Roaring64Bitmap> to keep state of current positions that are 
already scheduled to be delivered. If another redelivery is requested for the 
same position, it could be simply ignored. Would that be sufficient?
   If that's not sufficient, I think it would be necessary to introduce a 
feature in NegativeAcksTracker to send nacks for batch message only after all 
batch message indices have been acknowledged or nacked. It seems that it would 
make more sense.
   For implementing this efficiently in the client, I think that 
BatchMessageIdImpl could be improved to avoid the need to use 
org.apache.pulsar.client.impl.MessageIdAdvUtils#discardBatch. Instead of 
creating new instances for the entry's message id, each BatchMessageIdImpl 
could hold a reference to a single BatchEntryMessageIdImpl (or whatever it's 
called) that would also hold the BitSet that is currently shared in 
BatchMessageIdImpl instances. When there's a shared instance for the message id 
for the entry of the batch messages, it could be used directly as a key in a 
map without causing more memory use. This change would reduce memory usage 
since the .discardBatch is used to create instances that are used in other 
trackers in the client at the moment.
   When a message id could be directly used as a key in a map, there would be 
any need for the current hacks that we just do to get better space efficiency.
   
   No, user actually don't need to know different messages come from the same 
batch. They need to be able assign different redelivery time for the different 
messages in the same batch.
   
   For example, one batch contains msg1, msg2, msg3, msg4. Consumer can nack 
msg1 with delay 10s, nack msg2 with 20s, nack msg3 with 30s, nack msg4 with 
20s. At such situation, we need to update the entry correponding to the 
timestamp for 20s, which is not the head of the queue, or the tail of the 
queue. 
   
   And we don't need to  de-duplicate the nacks from messages in the same 
batch, as they are not-related. Consumer don't know different messages are from 
the same batch, batch is a  way to improve the performance.
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to