This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 04cec0fca79 [improve][pip] PIP-393: Improve performance of Negative 
Acknowledgement (#23601)
04cec0fca79 is described below

commit 04cec0fca7930f0f800fef4119d7bd0de6097da6
Author: Wenzhi Feng <[email protected]>
AuthorDate: Thu Dec 5 16:55:38 2024 +0800

    [improve][pip] PIP-393: Improve performance of Negative Acknowledgement 
(#23601)
---
 pip/pip-393.md | 226 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 226 insertions(+)

diff --git a/pip/pip-393.md b/pip/pip-393.md
new file mode 100644
index 00000000000..646c2beb5fe
--- /dev/null
+++ b/pip/pip-393.md
@@ -0,0 +1,226 @@
+
+# PIP-393: Improve performance of Negative Acknowledgement
+
+# Background knowledge
+
+Negative Acknowledgement is a feature in Pulsar that allows consumers to 
trigger the redelivery 
+of a message after some time when they fail to process it. When user calls 
`negativeAcknowledge` method,
+`NegativeAcksTracker` in `ConsumerImpl` will add an entry into the map 
`NegativeAcksTracker.nackedMessages`,
+mapping the message ID to the redelivery time. When the redelivery time comes, 
`NegativeAcksTracker` will 
+send a redelivery request to the broker to redeliver the message.
+
+# Motivation
+
+There are many issues with the current implementation of Negative 
Acknowledgement in Pulsar:
+- the memory occupation is high.
+- the code execution efficiency is low.
+- the redelivery time is not accurate.
+- multiple negative ack for messages in the same entry(batch) will interfere 
with each other.
+All of these problem is severe and need to be solved.
+
+## Memory occupation is high
+After the improvement of https://github.com/apache/pulsar/pull/23582, we have 
reduce half more memory occupation
+of `NegativeAcksTracker` by replacing `HashMap` with 
`ConcurrentLongLongPairHashMap`. With 1 million entry, the memory
+occupation decrease from 178MB to 64MB. With 10 million entry, the memory 
occupation decrease from 1132MB to 512MB.
+The average memory occupation of each entry decrease from 
1132MB/10000000=118byte to 512MB/10000000=53byte.
+
+But it is not enough. Assuming that we negative ack message 10k/s, assigning 
1h redelivery delay for each message,
+the memory occupation of `NegativeAcksTracker` will be 
`3600*10000*53/1024/1024/1024=1.77GB`, if the delay is 5h,
+the required memory is `3600*10000*53/1024/1024/1024*5=8.88GB`, which increase 
too fast.
+
+## Code execution efficiency is low
+Currently, each time the timer task is triggered, it will iterate all the 
entries in `NegativeAcksTracker.nackedMessages`, 
+which is unnecessary. We can sort entries by timestamp and only iterate the 
entries that need to be redelivered.
+
+## Redelivery time is not accurate
+Currently, the redelivery check time is controlled by the 
`timerIntervalNanos`, which is 1/3 of the `negativeAckRedeliveryDelay`.
+That means, if the `negativeAckRedeliveryDelay` is 1h, check task will be 
started every 20min, the deviation of the redelivery 
+time is 20min, which is unacceptable.
+
+## Multiple negative ack for messages in the same entry(batch) will interfere 
with each other
+Currently, `NegativeAcksTracker#nackedMessages` map `(ledgerId, entryId)` to 
`timestamp`, which means multiple nacks from messages 
+in the same batch share single one timestamp. 
+If we let msg1 redelivered 10s later, then let msg2 redelivered 20s later, 
these two messages are delivered 20s later together.
+msg1 will not be redelivered 10s later as the timestamp recorded in 
`NegativeAcksTracker#nackedMessages` is overrode by the second
+nack call.
+
+
+# Goals
+
+Refactor the `NegativeAcksTracker` to solve the above problems.
+
+To avoid interation of all entries in `NegativeAcksTracker.nackedMessages`, we 
use a sorted map to store the entries.
+To reduce memory occupation, we use util class provided by 
fastutil(https://fastutil.di.unimi.it/docs/), and design 
+a new algorithm to store the entries, reduce the memory occupation to even 1% 
less than the current implementation.
+(the actual effect rely on the configuration and the throughput).
+
+# Detailed Design
+
+## Design & Implementation Details
+
+### New Data Structure
+Use following data structure to store the entries:
+```java
+Long2ObjectSortedMap<Long2ObjectMap<Roaring64Bitmap>> nackedMessages = new 
Long2ObjectAVLTreeMap<>();
+```
+mapping `timestamp -> ledgerId -> entryId`.
+We need to sort timestamp in ascending order, so we use a sorted map to map 
timestamp to `ledgerId -> entryId` map.
+As there will many entries in the map, we use `Long2ObjectAVLTreeMap` instead 
of `Long2ObjectRBTreeMap`.
+As for the inner map, we use `Long2ObjectMap` to map `ledgerId` to `entryId` 
because we don't need to keep the order of `ledgerId`.
+`Long2ObjectOpenHashMap` will be satisfied.
+All entry id for the same ledger id will be stored in a bit set, as we only 
care about the existence of the entry id.
+
+
+### TimeStamp Bucket
+Timestamp in ms is used as the key of the map. As most of the use cases don't 
require that the precision of the delay time is 1ms,
+we can make the timestamp bucketed, that is, we can trim the lower bit of the 
timestamp to map the timestamp to a bucket.
+For example, if we trim the lower 1 bit of the timestamp, the timestamp 0b1000 
and 0b1001 will be mapped to the same bucket 0b1000.
+Then all messages in the same bucket will be redelivered at the same time.
+If user can accept 1024ms deviation of the redelivery time, we can trim the 
lower 10 bits of the timestamp, which can group a lot
+entries into the same bucket and reduce the memory occupation.
+
+following code snippet will be helpful to understand the design:
+```java
+    static long trimLowerBit(long timestamp, int bits) {
+        return timestamp & (-1L << bits);
+    }
+```
+
+```java
+Long2ObjectSortedMap<Long2ObjectMap<Roaring64Bitmap>> map = new 
Long2ObjectAVLTreeMap<>();
+Long2ObjectMap<LongSet> ledgerMap = new Long2ObjectOpenHashMap<>();
+LongSet entrySet = new LongOpenHashSet();
+entrySet.add(entryId);
+ledgerMap.put(ledgerId, entrySet);
+map.put(timestamp, ledgerMap);
+```
+
+### Configuration
+
+Add a new configuration `negativeAckPrecisionBitCnt` to control the precision 
of the redelivery time.
+```
+@ApiModelProperty(
+            name = "negativeAckPrecisionBitCnt",
+            value = "The redelivery time precision bit count. The lower bits 
of the redelivery time will be\n" + 
+                "trimmed to reduce the memory occupation. The default value is 
8, which means the redelivery time\n" +
+                "will be bucketed by 256ms. In worst cases, the redelivery 
time will be 512ms earlier(no later)\n" +
+                "than the expected time. If the value is 0, the redelivery 
time will be accurate to ms.".
+    )
+    private long negativeAckPrecisionBitCnt = 8;
+```
+The higher the value, the more entries will be grouped into the same bucket, 
the less memory occupation, the less accurate the redelivery time.
+Default value is 8, which means the redelivery time will be bucketed by 256ms. 
In worst cases, the redelivery time will be 512ms earlier(no later)
+than the expected time.
+
+
+## Space complexity analysis
+### Space complexity of `ConcurrentLongLongPairHashMap`
+Before analyzing the new data structure, we need to know how much space it 
take before this pip.
+
+We need to store 4 long field for `(ledgerId, entryId, partitionIndex, 
timestamp)` for each entry, which takes `4*8=32byte`.
+As `ConcurrentLongLongPairHashMap` use open hash addressing and linear probe 
to handle hash conflict, there are some 
+redundant spaces to avoid high conflict rate. There are two configurations 
that control how much redundant space to reserver: 
+`fill factor` and `idle factor`. When the space utility rate soar high to 
`fill factor`, the size of backing array will
+be double, when the space utility rate reduce to `idle factor`,  the size of 
backing array will reduce by half.
+
+The default value of `fill factor` is 0.66, `idle factor` is 0.15, which means 
the min space occupation of
+`ConcurrentLongLongPairHashMap` is `32/0.66N byte = 48N byte`, the max space 
occupation is `32/0.15N byte=213N byte`, 
+where N is the number of entries.
+
+In the experiment showed in the PR, there are 1 million entries in the map, 
taking up `32*1000000/1024/1024byte=30MB`,
+the space utility rate is 30/64=0.46, in the range of `[0.15, 0.66]`.
+
+
+### Space complexity of the new data structure
+The space used by new data structure is related to several factors: `message 
rate`, `the time deviation user accepted`,
+`the max entries written in one ledger`.
+- Pulsar conf `managedLedgerMaxEntriesPerLedger=50000` determine the max 
entries can be written into one ledger,
+we use the default value to analyze.
+- `the time deviation user accepted`: when user accept 1024ms delivery time 
deviation, we can trim the lower 10 bit
+of the timestamp in ms, which can bucket 1024 timestamp.
+
+Following we will analyze the space used by one bucket, and calculate the 
average space used by one entry.
+
+Assuming that the message rate is `x msg/ms`, and we trim `y bit` of the 
timestamp, one bucket will contains `2**x` ms, and
+`M=2**x*y` msgs.
+- For one single bucket, we only need to store one timestamp, which takes 
`8byte`.
+- Then, we need to store the ledgerId, when M is greater than 
5w(`managedLedgerMaxEntriesPerLedger`), the ledger will switch.
+There are `L=ceil(M/50000)` ledgers, which take `8*L` byte.
+- Further, we analyze how much space the entry id takes. As there are 
`L=ceil(M/50000)` ledgers, there will be `L` bitmap to store,
+which take `L*size(bitmap)`. The total space consumed by new data structure is 
`8byte + 8L byte + L*size(bitmap)`.
+
+As the `size(bitmap)` is far more greater than `8byte`, we can ignore the 
first two items. Then we get the formular of space 
+consumed **one bucket**: `D=L*size(bitmap)=ceil(M/50000)*size(bitmap)`.
+
+Entry id is stored in a `Roaring64Bitmap`, for simplicity we can replace it 
with `RoaringBitmap`, as the max entry id is 49999,
+which is smaller than `4294967296 (2 * Integer.MAX_VALUE)`(the max value can 
be stored in `RoaringBitmap`). The space consume 
+by `RoaringBitmap` depends on how many elements it contains, when the size of 
bitmap < 4096, the space is `4N byte`,
+when the size of bitmap > 4096, the consumed space is a fixed value `8KB`.
+
+Then we get the final result:
+- when M>50000, `D = ceil(M/50000)*size(bitmap) ~= M/50000 * 8KB = M/50000 * 8 
* 1024 byte = 0.163M byte`, 
+each entry takes `0.163byte` by average.
+- when 4096<M<50000, `D = ceil(M/50000)*size(bitmap) = 1 * 8KB = 8KB`, each 
entry takes `8*1024/M=8192/M byte` by average.
+- when M<4096, `D = ceil(M/50000)*size(bitmap) = 1 * 4M byte = 4M byte`, each 
entry take `4 byte` by average.
+
+### Conclusion
+Assuming N is the number of entries, M is the number of messages in one bucket.
+- `ConcurrentLongLongPairHashMap`: `48N` byte in best case, `213N byte` in 
worst case.
+- New data structure:
+    - when M>50000, `0.163N byte`.
+    - when 4096<M<50000, `8192/M * N byte` .
+    - when M<4096, `4N byte`.
+
+Some experiment results are showed in the PR, we can fine tune the 
configuration to get the best performance.
+
+## Effect
+
+### Memory occupation is high
+With such kind of design, we can reduce the memory occupation of 
`NegativeAcksTracker` to 1% less than the current implementation.
+
+### Code execution efficiency is low
+With the new design, we can avoid the iteration of all entries in 
`NegativeAcksTracker.nackedMessages`, and only iterate the entries
+that need to be redelivered.
+
+### Redelivery time is not accurate
+With the new design, we avoid the fixed interval of the redelivery check time. 
We can control the precision of the redelivery time 
+by trimming the lower bits of the timestamp. If user can accept 1024ms 
deviation of the redelivery time, we can trim the lower
+10 bits of the timestamp, which can group a lot
+
+### Multiple negative ack for messages in the same entry(batch) will interfere 
with each other
+With the new design, if we let msg1 redelivered 10s later, then let msg2 
redelivered 20s later, these two nacks will not interfere
+with each other, as they are stored in different buckets.
+
+
+## High-Level Design
+As this pip introduce new dependency `fastutil` into client, which is very 
large(23MB), while few classes are used, we need to
+reduce the size of the dependency. 
+
+Though there is alternative dependency `fastutil-core`, which is smaller(6MB), 
but it is also
+relatively large and using `fastutil-core` will introduce another problem on 
the broker side since there's already `fastutil` jar 
+which also includes `fastutil-core` jar classes.
+
+The optimal solution would be to include only the classes from fastutil into 
the shaded pulsar-client and pulsar-client-all 
+which are really used and needed. This could be achieved in many ways. One 
possible solution is to introduce an intermediate
+module for shaded pulsar-client and pulsar-client-all that isn't published to 
maven central at all. 
+It would be used to minimize and include only the classes from fastutil which 
are required by pulsar-client shading.
+
+
+
+# Backward & Forward Compatibility
+
+## Upgrade
+
+User can upgrade to the new version without any compatibility issue.
+
+## Downgrade / Rollback
+
+User can downgrade to the old version without any compatibility issue.
+
+# Links
+
+<!--
+Updated afterwards
+-->
+* Mailing List discussion thread: 
https://lists.apache.org/thread/yojl7ylk7cyjxktq3cn8849hvmyv0fg8
+* Mailing List voting thread: 
https://lists.apache.org/thread/hyc1r2s9chowdhck53lq07tznopt50dy

Reply via email to