This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 04cec0fca79 [improve][pip] PIP-393: Improve performance of Negative Acknowledgement (#23601)
04cec0fca79 is described below
commit 04cec0fca7930f0f800fef4119d7bd0de6097da6
Author: Wenzhi Feng <[email protected]>
AuthorDate: Thu Dec 5 16:55:38 2024 +0800
[improve][pip] PIP-393: Improve performance of Negative Acknowledgement (#23601)
---
pip/pip-393.md | 226 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 226 insertions(+)
diff --git a/pip/pip-393.md b/pip/pip-393.md
new file mode 100644
index 00000000000..646c2beb5fe
--- /dev/null
+++ b/pip/pip-393.md
@@ -0,0 +1,226 @@
+
+# PIP-393: Improve performance of Negative Acknowledgement
+
+# Background knowledge
+
+Negative Acknowledgement is a feature in Pulsar that allows consumers to trigger the redelivery
+of a message after some time when they fail to process it. When the user calls the `negativeAcknowledge` method,
+the `NegativeAcksTracker` in `ConsumerImpl` adds an entry to the map `NegativeAcksTracker.nackedMessages`,
+mapping the message ID to its redelivery time. When the redelivery time arrives, `NegativeAcksTracker`
+sends a redelivery request to the broker to redeliver the message.
+
+# Motivation
+
+There are many issues with the current implementation of Negative Acknowledgement in Pulsar:
+- the memory occupation is high.
+- the code execution efficiency is low.
+- the redelivery time is not accurate.
+- multiple negative acks for messages in the same entry (batch) interfere with each other.
+
+All of these problems are severe and need to be solved.
+
+## Memory occupation is high
+After the improvement in https://github.com/apache/pulsar/pull/23582, which replaced `HashMap` with `ConcurrentLongLongPairHashMap`,
+the memory occupation of `NegativeAcksTracker` was reduced by more than half. With 1 million entries, the memory
+occupation decreased from 178MB to 64MB. With 10 million entries, it decreased from 1132MB to 512MB.
+The average memory occupation of each entry decreased from 1132MB/10000000≈118 bytes to 512MB/10000000≈53 bytes.
+
+But this is not enough. Assuming that we negatively acknowledge 10k messages per second and assign
+a 1h redelivery delay to each message, the memory occupation of `NegativeAcksTracker` will be
+`3600*10000*53/1024/1024/1024=1.77GB`. If the delay is 5h,
+the required memory is `3600*10000*53/1024/1024/1024*5=8.88GB`, which grows too fast.
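The projection above can be reproduced with a few lines of Java. This is only a sketch: it assumes the ~53 bytes per entry measured for `ConcurrentLongLongPairHashMap` and a constant nack rate, and `NackMemoryEstimate` is a hypothetical helper, not Pulsar code.

```java
/** Back-of-the-envelope estimate of the memory held by the pre-PIP tracker (illustrative only). */
final class NackMemoryEstimate {
    /** Approximate per-entry cost measured with ConcurrentLongLongPairHashMap (taken from the text). */
    static final long BYTES_PER_ENTRY = 53;

    /**
     * Memory, in GiB, held by entries waiting for redelivery, assuming a steady nack rate and the
     * same delay for every message, so nacksPerSecond * delaySeconds entries are pending at once.
     */
    static double trackedGiB(long nacksPerSecond, long delaySeconds) {
        long pendingEntries = nacksPerSecond * delaySeconds;
        return pendingEntries * (double) BYTES_PER_ENTRY / (1024.0 * 1024 * 1024);
    }
}
```

Under these assumptions `trackedGiB(10_000, 3_600)` is about 1.78 and `trackedGiB(10_000, 5 * 3_600)` about 8.88, so the footprint grows linearly with the delay.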
+
+## Code execution efficiency is low
+Currently, each time the timer task is triggered, it iterates over all the entries in `NegativeAcksTracker.nackedMessages`,
+which is unnecessary. We can sort entries by timestamp and only iterate over the entries that need to be redelivered.
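The difference can be sketched with `java.util.TreeMap` standing in for the sorted map proposed below (the PIP itself uses fastutil types). `RedeliveryIndex` and its methods are hypothetical, and messages are identified by plain strings for brevity. The point is that `headMap(nowMs, true)` reaches only the timestamps that are already due instead of scanning every tracked entry.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Illustrative index from redelivery time (ms) to the messages due at that time. */
final class RedeliveryIndex {
    private final NavigableMap<Long, List<String>> byTimestamp = new TreeMap<>();

    void add(long redeliveryTimeMs, String messageId) {
        byTimestamp.computeIfAbsent(redeliveryTimeMs, t -> new ArrayList<>()).add(messageId);
    }

    /** Removes and returns the messages due at or before nowMs, earliest first. */
    List<String> pollDue(long nowMs) {
        // headMap is a view of only the due prefix; later timestamps are never visited.
        NavigableMap<Long, List<String>> due = byTimestamp.headMap(nowMs, true);
        List<String> result = new ArrayList<>();
        for (List<String> messages : due.values()) {
            result.addAll(messages);
        }
        due.clear(); // clearing the view removes the due entries from the backing map
        return result;
    }

    int size() {
        return byTimestamp.size();
    }
}
```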
+
+## Redelivery time is not accurate
+Currently, the redelivery check interval is controlled by `timerIntervalNanos`, which is 1/3 of `negativeAckRedeliveryDelay`.
+That means, if `negativeAckRedeliveryDelay` is 1h, the check task runs every 20min, so the redelivery
+time can deviate by up to 20min, which is unacceptable.
+
+## Multiple negative acks for messages in the same entry (batch) will interfere with each other
+Currently, `NegativeAcksTracker#nackedMessages` maps `(ledgerId, entryId)` to `timestamp`, which means multiple nacks for messages
+in the same batch share a single timestamp.
+If we nack msg1 to be redelivered 10s later and then nack msg2 to be redelivered 20s later, both messages are redelivered together 20s later.
+msg1 will not be redelivered after 10s, because the timestamp recorded in `NegativeAcksTracker#nackedMessages` is overridden by the second
+nack call.
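A minimal model of the current behavior, built only from the description above: because the key is `(ledgerId, entryId)`, a second nack for another message of the same batch replaces the first deadline. `EntryKey` and `OldStyleTracker` are hypothetical names, a plain `HashMap` stands in for `ConcurrentLongLongPairHashMap`, and the sketch uses a Java 16+ `record`.

```java
import java.util.HashMap;
import java.util.Map;

/** Key of the pre-PIP tracker: ledger and entry only, so the batch index is lost. */
record EntryKey(long ledgerId, long entryId) {}

/** Minimal model of the pre-PIP behaviour (HashMap instead of ConcurrentLongLongPairHashMap). */
final class OldStyleTracker {
    private final Map<EntryKey, Long> nackedMessages = new HashMap<>();

    /** batchIndex is accepted but, like the current key, not stored: a later nack overwrites the deadline. */
    void nack(long ledgerId, long entryId, int batchIndex, long redeliveryTimeMs) {
        nackedMessages.put(new EntryKey(ledgerId, entryId), redeliveryTimeMs);
    }

    Long deadline(long ledgerId, long entryId) {
        return nackedMessages.get(new EntryKey(ledgerId, entryId));
    }

    int size() {
        return nackedMessages.size();
    }
}
```

Calling `nack(1, 10, 0, 10_000)` and then `nack(1, 10, 1, 20_000)` leaves a single entry whose deadline is 20_000, which is exactly the interference described above.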
+
+
+# Goals
+
+Refactor the `NegativeAcksTracker` to solve the above problems.
+
+To avoid iterating over all entries in `NegativeAcksTracker.nackedMessages`, we use a sorted map to store the entries.
+To reduce memory occupation, we use utility classes provided by fastutil (https://fastutil.di.unimi.it/docs/) and design
+a new algorithm to store the entries, reducing the memory occupation to even less than 1% of the current implementation
+(the actual effect depends on the configuration and the throughput).
+
+# Detailed Design
+
+## Design & Implementation Details
+
+### New Data Structure
+Use the following data structure to store the entries:
+```java
+Long2ObjectSortedMap<Long2ObjectMap<Roaring64Bitmap>> nackedMessages = new Long2ObjectAVLTreeMap<>();
+```
+mapping `timestamp -> ledgerId -> entryId`.
+We need to sort timestamps in ascending order, so we use a sorted map to map each timestamp to a `ledgerId -> entryId` map.
+As there will be many entries in the map, we use `Long2ObjectAVLTreeMap` instead of `Long2ObjectRBTreeMap`.
+For the inner map, we use `Long2ObjectMap` to map `ledgerId` to `entryId` because we don't need to keep `ledgerId` ordered,
+so `Long2ObjectOpenHashMap` is sufficient.
+All entry ids for the same ledger id are stored in a bitmap (`Roaring64Bitmap`), as we only care about the existence of each entry id.
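The nesting can be sketched with JDK collections, assuming `TreeMap`, `HashMap` and `BitSet` as stand-ins for `Long2ObjectAVLTreeMap`, `Long2ObjectOpenHashMap` and `Roaring64Bitmap` (the real types avoid boxing and compress sparse bitmaps). `NackIndex`, `add` and `pollDue` are hypothetical names, and `BitSet` limits entry ids to `int`, which is acceptable for illustration only.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Illustrative timestamp -> ledgerId -> entryIds index built from JDK collections. */
final class NackIndex {
    // Sorted by redelivery timestamp so the earliest bucket is always first.
    private final NavigableMap<Long, Map<Long, BitSet>> nackedMessages = new TreeMap<>();

    /** Marks (ledgerId, entryId) for redelivery at timestampMs, creating inner maps on demand. */
    void add(long timestampMs, long ledgerId, int entryId) {
        nackedMessages
                .computeIfAbsent(timestampMs, t -> new HashMap<>())
                .computeIfAbsent(ledgerId, l -> new BitSet())
                .set(entryId);
    }

    /** Removes every bucket due at or before nowMs and returns its entries as "ledgerId:entryId". */
    List<String> pollDue(long nowMs) {
        List<String> due = new ArrayList<>();
        while (!nackedMessages.isEmpty() && nackedMessages.firstKey() <= nowMs) {
            Map<Long, BitSet> ledgers = nackedMessages.pollFirstEntry().getValue();
            for (Map.Entry<Long, BitSet> ledger : ledgers.entrySet()) {
                BitSet entries = ledger.getValue();
                // Walk the set bits in ascending entry-id order.
                for (int id = entries.nextSetBit(0); id >= 0; id = entries.nextSetBit(id + 1)) {
                    due.add(ledger.getKey() + ":" + id);
                }
            }
        }
        return due;
    }

    int bucketCount() {
        return nackedMessages.size();
    }
}
```

Nacking the same entry with two different delays leaves it in two separate buckets, so the earlier deadline is not overwritten by the later one.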
+
+
+### Timestamp Bucket
+The timestamp in ms is used as the key of the map. As most use cases don't require the delay to be precise to 1ms,
+we can bucket the timestamps, that is, trim the lower bits of the timestamp to map it to a bucket.
+For example, if we trim the lower 1 bit of the timestamp, the timestamps 0b1000 and 0b1001 will be mapped to the same bucket 0b1000.
+All messages in the same bucket will then be redelivered at the same time.
+If the user can accept a 1024ms deviation of the redelivery time, we can trim the lower 10 bits of the timestamp, which groups many
+entries into the same bucket and reduces the memory occupation.
+
+The following code snippet helps to explain the design:
+```java
+static long trimLowerBit(long timestamp, int bits) {
+    // Clear the lowest `bits` bits so every timestamp in the same 2^bits ms window maps to one bucket.
+    return timestamp & (-1L << bits);
+}
+```
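A short usage sketch of the bucketing: the hypothetical `countPerBucket` helper groups redelivery timestamps by their trimmed value, so with 10 trimmed bits every timestamp in the same 1024ms window shares one key.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative bucketing of redelivery timestamps; not Pulsar code. */
final class TimestampBuckets {
    /** Same bit trick as trimLowerBit above: clear the lowest {@code bits} bits. */
    static long trimLowerBit(long timestamp, int bits) {
        return timestamp & (-1L << bits);
    }

    /** Counts how many timestamps (ms) fall into each bucket, keyed by bucket start. */
    static Map<Long, Integer> countPerBucket(List<Long> timestamps, int bits) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : timestamps) {
            counts.merge(trimLowerBit(ts, bits), 1, Integer::sum);
        }
        return counts;
    }
}
```

For example, `countPerBucket(List.of(1000L, 1010L, 1023L, 1024L, 2047L), 10)` yields `{0=3, 1024=2}`: five nacks need only two keys.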
+
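+```java
+import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap;
+import org.roaringbitmap.longlong.Roaring64Bitmap;
+// Store one entry: timestamp -> (ledgerId -> {entryId})
+Long2ObjectSortedMap<Long2ObjectMap<Roaring64Bitmap>> map = new Long2ObjectAVLTreeMap<>();
+Long2ObjectMap<Roaring64Bitmap> ledgerMap = new Long2ObjectOpenHashMap<>();
+Roaring64Bitmap entrySet = new Roaring64Bitmap();
+entrySet.addLong(entryId);
+ledgerMap.put(ledgerId, entrySet);
+map.put(timestamp, ledgerMap);
+```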
+
+### Configuration
+
+Add a new configuration `negativeAckPrecisionBitCnt` to control the precision of the redelivery time.
+```java
+@ApiModelProperty(
+        name = "negativeAckPrecisionBitCnt",
+        value = "The redelivery time precision bit count. The lower bits of the redelivery time will be\n" +
+                "trimmed to reduce the memory occupation. The default value is 8, which means the redelivery time\n" +
+                "will be bucketed by 256ms. In the worst case, the redelivery time will be less than 256ms earlier\n" +
+                "(never later) than the expected time. If the value is 0, the redelivery time will be accurate to ms."
+)
+private long negativeAckPrecisionBitCnt = 8;
+```
+The higher the value, the more entries are grouped into the same bucket, so the memory occupation is lower and the redelivery time is less accurate.
+The default value is 8, which means the redelivery time will be bucketed by 256ms. In the worst case, the redelivery time will be less than 256ms earlier (never later)
+than the expected time.
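The precision trade-off follows from the bucket width: with `k` trimmed bits a bucket spans `2^k` ms, and trimming can only move a redelivery time earlier, by at most `2^k - 1` ms. The hypothetical `PrecisionBound` helper below states that bound and checks it on random timestamps. It illustrates the arithmetic and is not part of the Pulsar API.

```java
import java.util.Random;

/** Bucket width and worst-case earliness for a given precision bit count (illustrative only). */
final class PrecisionBound {
    /** Width of one bucket, in ms, when {@code bits} low bits are trimmed. */
    static long bucketWidthMs(int bits) {
        return 1L << bits;
    }

    /** Largest possible gap between the expected and the bucketed redelivery time. */
    static long maxEarlinessMs(int bits) {
        return bucketWidthMs(bits) - 1;
    }

    /** Returns true if trimming never moves a sampled timestamp later, nor earlier than the bound. */
    static boolean boundHolds(int bits, int samples, long seed) {
        Random random = new Random(seed);
        for (int i = 0; i < samples; i++) {
            long expected = random.nextLong() & Long.MAX_VALUE; // non-negative timestamp
            long bucketed = expected & (-1L << bits);
            long earliness = expected - bucketed;
            if (earliness < 0 || earliness > maxEarlinessMs(bits)) {
                return false;
            }
        }
        return true;
    }
}
```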
+
+
+## Space complexity analysis
+### Space complexity of `ConcurrentLongLongPairHashMap`
+Before analyzing the new data structure, we need to know how much space the tracker takes before this PIP.
+
+We need to store 4 long fields `(ledgerId, entryId, partitionIndex, timestamp)` for each entry, which takes `4*8=32 bytes`.
+As `ConcurrentLongLongPairHashMap` uses open addressing with linear probing to handle hash conflicts, some redundant
+space is kept to avoid a high conflict rate. Two configurations control how much redundant space is reserved:
+`fill factor` and `idle factor`. When the space utilization rate rises to `fill factor`, the size of the backing array
+is doubled; when the space utilization rate drops to `idle factor`, the size of the backing array is halved.
+
+The default value of `fill factor` is 0.66 and that of `idle factor` is 0.15, which means the minimum space occupation of
+`ConcurrentLongLongPairHashMap` is `32N/0.66 ≈ 48N bytes` and the maximum space occupation is `32N/0.15 ≈ 213N bytes`,
+where N is the number of entries.
+
+In the experiment shown in the PR, there are 1 million entries in the map, taking up `32*1000000 bytes ≈ 30MB`,
+so the space utilization rate is 30/64≈0.47, within the range `[0.15, 0.66]`.
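These bounds are just the raw 32 bytes per entry divided by the load factor. A hypothetical helper that reproduces them with the default factors quoted above:

```java
/** Space per entry of an open-addressing table storing four longs per slot (illustrative only). */
final class OpenHashSpace {
    /** ledgerId, entryId, partitionIndex and timestamp: four longs per entry. */
    static final int RAW_BYTES_PER_ENTRY = 4 * Long.BYTES;

    /** Bytes per stored entry when the backing array is filled to the given utilization. */
    static double bytesPerEntry(double utilization) {
        return RAW_BYTES_PER_ENTRY / utilization;
    }

    /** Fraction of the backing array actually holding data. */
    static double utilization(long entries, long backingArrayBytes) {
        return (double) entries * RAW_BYTES_PER_ENTRY / backingArrayBytes;
    }
}
```

`bytesPerEntry(0.66)` and `bytesPerEntry(0.15)` give the ~48 and ~213 bytes per entry above, and `utilization(1_000_000, 64L * 1024 * 1024)` falls between the idle and fill factors, as in the experiment.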
+
+
+### Space complexity of the new data structure
+The space used by the new data structure depends on several factors: `message rate`, `the time deviation the user accepts`,
+and `the max entries written into one ledger`.
+- The Pulsar configuration `managedLedgerMaxEntriesPerLedger=50000` determines the max number of entries that can be written into one ledger;
+we use the default value in this analysis.
+- `the time deviation the user accepts`: when the user accepts a 1024ms redelivery time deviation, we can trim the lower 10 bits
+of the timestamp in ms, which puts 1024 consecutive timestamps into one bucket.
+
+Below, we analyze the space used by one bucket and calculate the average space used by one entry.
+
+Assuming that the message rate is `x msg/ms` and we trim `y` bits of the timestamp, one bucket will contain `2**y` ms and
+`M=2**y*x` msgs.
+- For a single bucket, we only need to store one timestamp, which takes `8 bytes`.
+- Then, we need to store the ledger ids: when M is greater than 50000 (`managedLedgerMaxEntriesPerLedger`), the ledger will switch.
+There are `L=ceil(M/50000)` ledgers, which take `8*L` bytes.
+- Further, we analyze how much space the entry ids take. As there are `L=ceil(M/50000)` ledgers, there will be `L` bitmaps to store,
+which take `L*size(bitmap)`. The total space consumed by the new data structure is `8 bytes + 8L bytes + L*size(bitmap)`.
+
+As `size(bitmap)` is far greater than `8 bytes`, we can ignore the first two terms. Then we get the formula for the space
+consumed by **one bucket**: `D=L*size(bitmap)=ceil(M/50000)*size(bitmap)`.
+
+Entry ids are stored in a `Roaring64Bitmap`. For simplicity, we can replace it with `RoaringBitmap`, as the max entry id is 49999,
+which is far smaller than `4294967295` (`2^32 - 1`, the max value that can be stored in a `RoaringBitmap`). The space consumed
+by a `RoaringBitmap` depends on how many elements it contains: when the bitmap holds fewer than 4096 elements, the space is `4N bytes`;
+when it holds more than 4096 elements, the consumed space is a fixed `8KB`.
+
+Then we get the final result:
+- when M>50000, `D = ceil(M/50000)*size(bitmap) ≈ M/50000 * 8KB = M/50000 * 8 * 1024 bytes ≈ 0.163M bytes`,
+so each entry takes `0.163 bytes` on average.
+- when 4096<M<50000, `D = ceil(M/50000)*size(bitmap) = 1 * 8KB = 8KB`, so each entry takes `8*1024/M=8192/M bytes` on average.
+- when M<4096, `D = ceil(M/50000)*size(bitmap) = 1 * 4M bytes = 4M bytes`, so each entry takes `4 bytes` on average.
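The piecewise result can be written as a small model. This is a sketch of the approximations above: the timestamp and ledger-id terms are dropped, and the `4 bytes` per sparse entry id is the estimate used in this analysis. It is not a measurement, and `BucketSpaceModel` is a hypothetical name.

```java
/** Rough per-bucket size model following the analysis above (an approximation, not a measurement). */
final class BucketSpaceModel {
    static final long MAX_ENTRIES_PER_LEDGER = 50_000; // managedLedgerMaxEntriesPerLedger default
    static final long ARRAY_LIMIT = 4096;              // above this, a container is a fixed-size bitmap
    static final long BITMAP_BYTES = 8 * 1024;         // size of a full bitmap container
    static final long BYTES_PER_SPARSE_ID = 4;         // per-id cost assumed in the text for small sets

    /** Messages per bucket for a rate of x msg/ms and y trimmed bits: M = 2^y * x. */
    static double messagesPerBucket(double msgPerMs, int trimmedBits) {
        return Math.pow(2, trimmedBits) * msgPerMs;
    }

    /** Approximate bytes D used by one bucket holding m messages. */
    static double bucketBytes(double m) {
        if (m > MAX_ENTRIES_PER_LEDGER) {
            return m / MAX_ENTRIES_PER_LEDGER * BITMAP_BYTES; // ~ceil(M/50000) full bitmaps
        }
        if (m > ARRAY_LIMIT) {
            return BITMAP_BYTES;                               // one ledger, one 8KB bitmap
        }
        return BYTES_PER_SPARSE_ID * m;                        // one ledger, sparse entry ids
    }

    static double bytesPerMessage(double m) {
        return bucketBytes(m) / m;
    }
}
```

For example, 10k msg/s (`x = 10`) with 10 trimmed bits gives `M = 10240` messages per bucket. That is the middle case, which costs about 0.8 byte per message under this model.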
+
+### Conclusion
+Let N be the number of entries and M the number of messages in one bucket.
+- `ConcurrentLongLongPairHashMap`: `48N bytes` in the best case, `213N bytes` in the worst case.
+- New data structure:
+  - when M>50000, `0.163N bytes`.
+  - when 4096<M<50000, `8192/M * N bytes`.
+  - when M<4096, `4N bytes`.
+
+Some experiment results are shown in the PR. The configuration can be fine-tuned to get the best performance.
+
+## Effect
+
+### Memory occupation is high
+With this design, we can reduce the memory occupation of `NegativeAcksTracker` to less than 1% of the current implementation, depending on the configuration and throughput.
+
+### Code execution efficiency is low
+With the new design, we avoid iterating over all entries in `NegativeAcksTracker.nackedMessages` and only iterate over the entries
+that need to be redelivered.
+
+### Redelivery time is not accurate
+With the new design, we avoid the fixed interval of the redelivery check. We can control the precision of the redelivery time
+by trimming the lower bits of the timestamp. If the user can accept a 1024ms deviation of the redelivery time, we can trim the lower
+10 bits of the timestamp, which groups many entries into the same bucket.
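One way to drop the fixed interval, sketched under the assumption that the tracker re-arms its timer after every check, is to wait exactly until the earliest key of the sorted map. `NextCheck` is a hypothetical helper, the PIP does not prescribe this scheduling code, and `fixedIntervalMs` only simplifies the current `timerIntervalNanos` rule described above.

```java
import java.util.NavigableMap;

/** One possible way to schedule redelivery checks from a sorted map (illustrative only). */
final class NextCheck {
    /** Simplified pre-PIP behaviour: a fixed check interval of one third of the redelivery delay. */
    static long fixedIntervalMs(long negativeAckRedeliveryDelayMs) {
        return negativeAckRedeliveryDelayMs / 3;
    }

    /**
     * Delay until the earliest pending bucket, 0 if it is already due,
     * or -1 if nothing is pending (no timer needs to be armed).
     */
    static long delayUntilNextCheckMs(NavigableMap<Long, ?> buckets, long nowMs) {
        if (buckets.isEmpty()) {
            return -1;
        }
        return Math.max(0, buckets.firstKey() - nowMs);
    }
}
```

With a 1h delay, `fixedIntervalMs` yields the 20min interval criticized above. `delayUntilNextCheckMs` instead wakes up when the next bucket is due, so the remaining deviation comes only from the bucketing.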
+
+### Multiple negative acks for messages in the same entry (batch) will interfere with each other
+With the new design, if we nack msg1 to be redelivered 10s later and then nack msg2 to be redelivered 20s later, these two nacks will not interfere
+with each other, as they are stored in different buckets.
+
+
+## High-Level Design
+As this PIP introduces a new dependency, `fastutil`, into the client, which is very large (23MB) while only a few of its classes are used, we need to
+reduce the size of the dependency.
+
+There is an alternative dependency, `fastutil-core`, which is smaller (6MB), but it is still
+relatively large. Using `fastutil-core` would also introduce another problem on the broker side, since the broker already ships the `fastutil` jar,
+which also includes the `fastutil-core` classes.
+
+The optimal solution would be to include in the shaded pulsar-client and pulsar-client-all only the fastutil classes
+that are really used and needed. This could be achieved in many ways. One possible solution is to introduce an intermediate
+module for the shaded pulsar-client and pulsar-client-all that isn't published to Maven Central at all.
+It would be used to minimize fastutil and include only the classes required by the pulsar-client shading.
+
+
+
+# Backward & Forward Compatibility
+
+## Upgrade
+
+Users can upgrade to the new version without any compatibility issues.
+
+## Downgrade / Rollback
+
+Users can downgrade to the old version without any compatibility issues.
+
+# Links
+
+<!--
+Updated afterwards
+-->
+* Mailing List discussion thread: https://lists.apache.org/thread/yojl7ylk7cyjxktq3cn8849hvmyv0fg8
+* Mailing List voting thread: https://lists.apache.org/thread/hyc1r2s9chowdhck53lq07tznopt50dy