coderzc opened a new issue, #16763:
URL: https://github.com/apache/pulsar/issues/16763

   ## Motivation
   
   Scheduled and delayed message delivery is a widespread feature in messaging 
systems. Pulsar has supported delayed 
messages[[0]](https://pulsar.apache.org/docs/next/concepts-messaging#delayed-message-delivery)
 since 2.4.0, which uses an in-memory delayed message tracker to track all the 
delayed message indexes with a priority queue. The blog "Apache pulsar delay 
message delivery 
analysis"[[1]](https://developpaper.com/blog-recommendation-apache-pulsar-delay-message-delivery-analysis/)
 provided details of delayed messages and shared some challenges for the 
current implementation.
   
   ### The memory limitation of the priority queue
   A broker's memory is not infinite. For scenarios where users need to store 
many delayed messages, the in-memory priority queue might be a bottleneck for 
maintaining an extensive delayed index.
   
   To scale the delayed message capacity, you can add more partitions so that 
the delayed index is distributed across multiple brokers, but this does not 
change the fact that a lot of broker memory is used.
   
   A topic might have several subscriptions, and the in-memory delayed indexes 
can't be shared across subscriptions, so each subscription adds to the 
broker's memory overhead.
   
   ### Expensive delayed index rebuilding
   To rebuild the delayed index, the broker needs to read all delayed messages 
of a topic. If there are too many delayed messages on a topic, the index 
rebuilding might take a long time, from a few minutes to a few hours. While a 
subscription is rebuilding its delayed index, its consumers can't consume 
messages from the topic, which prolongs consumer unavailability.
   
   This proposal focuses on the two major problems of the delayed message 
implementation described above.
   
   ## Goal
   
   * Support delayed message index snapshot to avoid the high index rebuild 
costs
   * Reduce the memory usage for maintaining the delayed message index
   
   ## Approach
   The solution is to introduce a new delayed message tracker that splits the 
whole delayed message index into multiple buckets based on ledgers and keeps 
an immutable snapshot for each bucket. Each bucket provides the ability to get 
its scheduled messages. To get the scheduled messages of a topic, the tracker 
goes through all the buckets.
   
   ### Share the delayed message index across subscriptions
   A topic can have several subscriptions. The current implementation builds 
the delayed message indexes separately for each subscription, which increases 
broker memory overhead and means the log is replayed multiple times to build 
the indexes.
   
   ![Figure 1: share the delayed index across multiple 
subscriptions](https://user-images.githubusercontent.com/26179648/180347119-75d2f4e7-4092-481d-870b-90fe7cdbbd26.png)
   
   Instead, we can use a separate cursor to build the shared delayed message 
indexes, so that all the subscriptions under the topic can reuse the same 
delayed message indexes.
   
   Any subscription that triggers the delayed message checking will poll the 
message IDs from the delayed message tracker. But unlike the current 
implementation, the scheduled message IDs need to be added to the replay queue 
of all subscriptions. The dispatcher of each subscription will take care of the 
newly added message IDs and perform the message delivery.
   
   The subscriptions have different mark delete positions. If the scheduled 
messages are before the mark delete position, the cursor read operation will 
filter them out.
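
   The fan-out described above can be sketched as follows. This is a minimal 
illustration, not the proposed implementation: `SharedDelayedIndexSketch`, 
`Position`, `Subscription`, and all method names are hypothetical, and 
`readReplay` only stands in for the cursor read that drops positions at or 
before the mark delete position.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.TreeSet;

// Hypothetical sketch: one delayed index shared by all subscriptions of a topic.
final class SharedDelayedIndexSketch {

    static final class Position implements Comparable<Position> {
        final long ledgerId;
        final long entryId;

        Position(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public int compareTo(Position o) {
            int c = Long.compare(ledgerId, o.ledgerId);
            return c != 0 ? c : Long.compare(entryId, o.entryId);
        }
    }

    static final class Subscription {
        private final Position markDelete;
        private final TreeSet<Position> replayQueue = new TreeSet<>();

        Subscription(Position markDelete) {
            this.markDelete = markDelete;
        }

        void addToReplay(Position p) {
            replayQueue.add(p);
        }

        // Stand-in for the cursor read: positions at or before the
        // mark delete position are filtered out.
        List<Position> readReplay() {
            List<Position> out = new ArrayList<>(replayQueue.tailSet(markDelete, false));
            replayQueue.clear();
            return out;
        }
    }

    private static final class Delayed {
        final long deliverAt;
        final Position position;

        Delayed(long deliverAt, Position position) {
            this.deliverAt = deliverAt;
            this.position = position;
        }
    }

    private final PriorityQueue<Delayed> index =
            new PriorityQueue<>((a, b) -> Long.compare(a.deliverAt, b.deliverAt));
    private final List<Subscription> subscriptions = new ArrayList<>();

    void addSubscription(Subscription s) {
        subscriptions.add(s);
    }

    void addDelayed(long deliverAt, Position p) {
        index.add(new Delayed(deliverAt, p));
    }

    // May be triggered by any subscription; every due position is pushed
    // to the replay queue of all subscriptions. Returns the number moved.
    int deliverScheduled(long now) {
        int moved = 0;
        while (!index.isEmpty() && index.peek().deliverAt <= now) {
            Position p = index.poll().position;
            for (Subscription s : subscriptions) {
                s.addToReplay(p);
            }
            moved++;
        }
        return moved;
    }
}
```

   Note that the sketch removes positions from the shared index as soon as 
they are polled, which is exactly where the crash risk discussed next comes 
from.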
   
   A risk here is that the scheduled messages are removed from the delayed 
message tracker. If the broker crashes before delivering the scheduled messages 
to consumers, the messages will not be added back to the delayed tracker, so 
the broker will not redeliver them to consumers. This is related to the delayed 
message filtering, which is introduced in the following sections.
   
   ### Delayed message index bucket
   The delayed message index bucket contains the indexes of a couple of 
Ledgers. Each bucket mainly includes two parts: a BitSet for each Ledger, used 
to check whether a message ID is a delayed message (contained in the delayed 
message index), and a priority queue for getting the scheduled messages.
   
   ![Figure 2: Delayed message index 
bucket](https://user-images.githubusercontent.com/26179648/180347425-641db81a-a3e2-4d59-8f9f-95aa381527a5.png)
   
   A topic can have multiple delayed message index buckets, and the maximum 
number of buckets is configurable. The delayed message tracker will load the 
first segment of each bucket into a shared priority queue (segments are 
introduced later; one segment maps to one entry of the bucket snapshot). The 
topic’s scheduled messages are obtained by polling messages from the shared 
priority queue. After all the messages of a bucket segment have been processed, 
the tracker loads the next segment of this bucket.
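
   A minimal sketch of this segment loading, under stated assumptions: the 
class and method names below are hypothetical, each message is modeled as a 
`long[]` of `{deliverAt, ledgerId, entryId}`, and segments are kept in memory 
here, whereas the proposal reads them from the bucket snapshot.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch: only one segment per bucket sits in the shared queue.
final class BucketSegmentLoadingSketch {

    private static final class Bucket {
        final Deque<List<long[]>> remainingSegments = new ArrayDeque<>();
        int loadedNotPolled; // messages of the loaded segment still in the shared queue
    }

    private static final class Item {
        final long deliverAt;
        final long ledgerId;
        final long entryId;
        final Bucket owner;

        Item(long[] message, Bucket owner) {
            this.deliverAt = message[0];
            this.ledgerId = message[1];
            this.entryId = message[2];
            this.owner = owner;
        }
    }

    private final PriorityQueue<Item> sharedQueue =
            new PriorityQueue<>((a, b) -> Long.compare(a.deliverAt, b.deliverAt));

    // Registers a bucket and loads its first segment into the shared queue.
    void addBucket(List<List<long[]>> segments) {
        Bucket bucket = new Bucket();
        bucket.remainingSegments.addAll(segments);
        loadNextSegment(bucket);
    }

    private void loadNextSegment(Bucket bucket) {
        List<long[]> segment;
        while ((segment = bucket.remainingSegments.poll()) != null) {
            if (segment.isEmpty()) {
                continue; // skip empty segments
            }
            for (long[] message : segment) {
                sharedQueue.add(new Item(message, bucket));
            }
            bucket.loadedNotPolled = segment.size();
            return;
        }
    }

    // Returns {ledgerId, entryId} of every message whose time has come.
    List<long[]> pollScheduled(long now) {
        List<long[]> due = new ArrayList<>();
        while (!sharedQueue.isEmpty() && sharedQueue.peek().deliverAt <= now) {
            Item item = sharedQueue.poll();
            due.add(new long[] {item.ledgerId, item.entryId});
            if (--item.owner.loadedNotPolled == 0) {
                loadNextSegment(item.owner); // segment done, load the next one
            }
        }
        return due;
    }

    int inMemorySize() {
        return sharedQueue.size();
    }
}
```

   The point of the design is that memory usage is bounded by one segment per 
bucket rather than by the total number of delayed messages.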
   
   The delayed message tracker contains a special bucket (LastMutableBucket) 
that records the delayed message index of the current last ledger range by 
using an extra priority queue (the last mutable delayed message priority 
queue). In fact, that priority queue reuses the existing queue of 
`InMemoryDelayedDeliveryTracker`. When the tracker receives a message ID with 
`ledgerId > LastMutableBucket.endLedgerId`, the tracker will create an 
immutable bucket and clear `LastMutableDelayedMessagePriorityQueue`. The 
delayed message tracker will move scheduled messages from 
`LastMutableDelayedMessagePriorityQueue` to the shared delayed message queue 
when a regular task is triggered or when message IDs are polled from the 
delayed message tracker.
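
   The sealing step can be sketched as below. This is an illustration under 
assumptions, not the proposed code: all names are hypothetical, and the rule 
that a range spans `ledgerStep` consecutive ledger IDs starting from the first 
ledger seen is a guess at how the ledger range is chosen.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of sealing the last mutable bucket.
final class LastMutableBucketSketch {

    static final class ImmutableBucket {
        final long startLedgerId;
        final long endLedgerId;
        final List<long[]> messages; // {deliverAt, ledgerId, entryId}, sorted by deliverAt

        ImmutableBucket(long startLedgerId, long endLedgerId, List<long[]> messages) {
            this.startLedgerId = startLedgerId;
            this.endLedgerId = endLedgerId;
            this.messages = messages;
        }
    }

    private final long ledgerStep; // assumed: number of ledger IDs per bucket
    private long startLedgerId = -1;
    private long endLedgerId = -1;
    private final PriorityQueue<long[]> lastMutableQueue =
            new PriorityQueue<>((a, b) -> Long.compare(a[0], b[0]));
    private final List<ImmutableBucket> immutableBuckets = new ArrayList<>();

    LastMutableBucketSketch(long ledgerStep) {
        this.ledgerStep = ledgerStep;
    }

    void addMessage(long ledgerId, long entryId, long deliverAt) {
        if (startLedgerId < 0) {
            openRange(ledgerId);
        } else if (ledgerId > endLedgerId) {
            // The message is past the current range: seal, then start a new range.
            sealLastMutableBucket();
            openRange(ledgerId);
        }
        lastMutableQueue.add(new long[] {deliverAt, ledgerId, entryId});
    }

    private void openRange(long firstLedgerId) {
        startLedgerId = firstLedgerId;
        endLedgerId = firstLedgerId + ledgerStep - 1;
    }

    // Drains the mutable queue in scheduled order into an immutable bucket.
    private void sealLastMutableBucket() {
        List<long[]> sorted = new ArrayList<>(lastMutableQueue.size());
        while (!lastMutableQueue.isEmpty()) {
            sorted.add(lastMutableQueue.poll());
        }
        immutableBuckets.add(new ImmutableBucket(startLedgerId, endLedgerId, sorted));
    }

    List<ImmutableBucket> immutableBuckets() {
        return immutableBuckets;
    }

    int mutableSize() {
        return lastMutableQueue.size();
    }
}
```

   Draining the queue in priority order means the sealed bucket is already 
sorted by scheduled time, which is what makes it possible to write the 
snapshot as time-ordered segments.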
   
   The dispatcher of a subscription reads messages through the cursor to 
dispatch messages to consumers. The cursor needs to filter out delayed messages 
based on the delayed message index bucket. For example, if we have ten messages 
[0, 9] and messages [1, 8] are delayed, the cursor should only read messages 0 
and 9 from the bookies. Note that the current implementation reads all ten 
messages and filters [1, 8] out in the broker, which we need to improve.
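
   The ten-message example can be reproduced with a per-ledger `BitSet`. This 
is a minimal sketch: `DelayedReadFilterSketch`, `markDelayed`, and 
`entriesToRead` are hypothetical names, and entry IDs are assumed to fit in an 
`int` because `java.util.BitSet` is indexed by `int`.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: decide which entries to read before touching the bookies.
final class DelayedReadFilterSketch {

    private final Map<Long, BitSet> delayedByLedger = new HashMap<>();

    void markDelayed(long ledgerId, long entryId) {
        delayedByLedger.computeIfAbsent(ledgerId, k -> new BitSet())
                .set(Math.toIntExact(entryId));
    }

    // Same name as the method this proposal adds to DelayedDeliveryTracker.
    boolean existDelayedMessage(long ledgerId, long entryId) {
        BitSet bits = delayedByLedger.get(ledgerId);
        return bits != null && bits.get(Math.toIntExact(entryId));
    }

    // Returns the entry IDs in [firstEntry, lastEntry] that are not delayed.
    List<Long> entriesToRead(long ledgerId, long firstEntry, long lastEntry) {
        List<Long> toRead = new ArrayList<>();
        for (long entryId = firstEntry; entryId <= lastEntry; entryId++) {
            if (!existDelayedMessage(ledgerId, entryId)) {
                toRead.add(entryId);
            }
        }
        return toRead;
    }
}
```

   With entries 1 to 8 marked as delayed, `entriesToRead(ledgerId, 0, 9)` 
yields only entries 0 and 9, so the read is discontinuous.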
   
   So if the messages are not in the delayed message tracker and have reached 
the scheduled time, the broker can dispatch the messages to the consumer 
directly. If the messages are not in the delayed tracker but have not reached 
the scheduled time, the subscription just needs to skip them because they will 
be added back to the delayed message tracker.
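
   Put together, the per-message decision looks roughly like this. The enum 
and method are hypothetical and only restate the rules above; a message that 
is already in the tracker is assumed to be skipped because the tracker will 
replay it later.

```java
// Hypothetical sketch of the per-message dispatch decision.
final class DelayedDispatchDecisionSketch {

    enum Action { DISPATCH, SKIP }

    static Action decide(boolean inDelayedTracker, long deliverAt, long now) {
        if (inDelayedTracker) {
            return Action.SKIP; // the tracker replays it once it is due
        }
        // Not tracked: dispatch if due, otherwise skip and let the
        // tracker pick it up when it is added back.
        return deliverAt <= now ? Action.DISPATCH : Action.SKIP;
    }
}
```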
   
   ### Delayed message index bucket snapshot
   The bucket snapshot can reduce the cost of rebuilding the delayed message 
index (replaying all the original messages). We can use a Ledger to store the 
bucket snapshot data and maintain the bucket snapshot list in the properties of 
the cursor used to build the delayed message index. From these properties we 
know how many delayed index buckets the topic has and can read the snapshots 
from the persistent Ledger.
   
   ![Figure 3: Delayed message index bucket 
snapshot](https://user-images.githubusercontent.com/26179648/180347492-e77a9a2f-c022-4800-ad4f-d4ce45019be8.png)
   
   The delayed index bucket snapshot data is written to multiple entries 
according to the scheduled time. We only need to load the first valid entry 
into memory. After all its delayed messages are scheduled, we load the delayed 
messages from the next entry. We will not make any changes to the snapshot 
data. 
   
   The delayed index bucket snapshot data is stored starting from Entry1 
because Entry0 records the metadata for the snapshots. That metadata is 
described next.
   
   The `maxScheduleTimestamps` field is used to find the first snapshot entry 
that has messages which have not reached the scheduled time. When rebuilding 
the delayed message index, the bucket will skip a snapshot entry if all the 
messages in it have reached the scheduled time (because the broker can dispatch 
those messages to the consumer directly).
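
   A minimal sketch of that lookup, assuming the i-th value of 
`maxScheduleTimestamps` (0-based) belongs to snapshot entry i + 1, since 
Entry0 holds the metadata. The class and method names are hypothetical.

```java
import java.util.List;

// Hypothetical sketch: find the first snapshot entry that still has pending messages.
final class SnapshotEntryLookupSketch {

    // Returns the entry ID to load first, or -1 if every entry is already due.
    static long firstEntryToLoad(List<Long> maxScheduleTimestamps, long now) {
        for (int i = 0; i < maxScheduleTimestamps.size(); i++) {
            if (maxScheduleTimestamps.get(i) > now) {
                return i + 1L; // data entries start at Entry1
            }
        }
        return -1L;
    }
}
```

   Because the entries are ordered by scheduled time, a binary search would 
also work; the linear scan is kept for clarity.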
   
   The `delayedMessageMaps` field is used to check whether a message ID is 
present in the bucket. It records all BitSet pair keys of the delayed message 
indexes for each snapshot. When loading a snapshot into memory, the delayed 
message tracker will merge the BitSet pair from the current snapshot to the 
last snapshot.
   
   ### Merge message index buckets
   We can configure the maximum number of buckets of a topic. If the number of 
buckets reaches this limit, adding a new Ledger to the buckets will trigger 
bucket merging. The delayed message tracker will merge the two adjacent buckets 
with the fewest delayed messages.
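
   One way to read this rule is to pick the adjacent pair whose combined 
message count is smallest. The sketch below makes that assumption explicit; 
`BucketMergeSketch` and `pickMergeIndex` are hypothetical names, and `counts` 
holds the number of delayed messages per bucket in ledger order.

```java
// Hypothetical sketch: choose which two adjacent buckets to merge.
final class BucketMergeSketch {

    // Returns index i such that buckets i and i + 1 have the smallest combined count.
    static int pickMergeIndex(long[] counts) {
        if (counts.length < 2) {
            throw new IllegalArgumentException("need at least two buckets");
        }
        int best = 0;
        long bestSum = counts[0] + counts[1];
        for (int i = 1; i + 1 < counts.length; i++) {
            long sum = counts[i] + counts[i + 1];
            if (sum < bestSum) {
                best = i;
                bestSum = sum;
            }
        }
        return best;
    }
}
```

   For counts `{50, 10, 5, 40}` the pair sums are 60, 15, and 45, so buckets 1 
and 2 are merged. Only adjacent buckets are candidates so that the merged 
bucket still covers one contiguous ledger range.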
   
   ### Delete message index bucket
   After merging buckets, the delayed message tracker will delete the old 
buckets. 
   Also, when all the delayed messages of all snapshots in a bucket have been 
scheduled, the tracker will delete that bucket.
   
   
   ## Config Changes
   
   broker.conf
   ```
   # The ledgerId range step of each delayed message index bucket
   delayedDeliveryLedgerStepPerBucket=5
   
   # The time step (in seconds) of each delayed message index bucket snapshot
   delayedDeliveryTimeStepPerBucketSnapshotSeconds=300
   
   # The max number of delayed message index buckets
   delayedDeliveryMaxNumBuckets=20
   ```
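
   To make the time step concrete, the sketch below splits a bucket's sorted 
scheduled timestamps into snapshot segments. It is only one possible 
interpretation: it assumes each segment starts at its first message and covers 
`delayedDeliveryTimeStepPerBucketSnapshotSeconds` from there, and the class 
and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: split sorted scheduled timestamps into time-step segments.
final class SnapshotSegmentSplitSketch {

    // sortedDeliverAtMillis must be in ascending order.
    static List<List<Long>> split(List<Long> sortedDeliverAtMillis, long timeStepSeconds) {
        List<List<Long>> segments = new ArrayList<>();
        long stepMillis = timeStepSeconds * 1000L;
        long windowEnd = Long.MIN_VALUE;
        List<Long> current = null;
        for (long deliverAt : sortedDeliverAtMillis) {
            if (current == null || deliverAt >= windowEnd) {
                // Start a new segment whose window begins at this message.
                current = new ArrayList<>();
                segments.add(current);
                windowEnd = deliverAt + stepMillis;
            }
            current.add(deliverAt);
        }
        return segments;
    }
}
```

   With the default of 300 seconds, timestamps `0, 100000, 299999, 300000, 
700000` (milliseconds) fall into three segments of sizes 3, 1, and 1.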
   
   ## Implementation
   
   * Add a new Protobuf for bucket snapshot
   BucketSnapshotFormat.proto
   ```protobuf
   syntax = "proto2";
   
   // A single delayed message: its scheduled timestamp and message position.
   message BucketMessage {
      required uint64 timestamp = 1;
      required int64 ledger_id = 2;
      required int64 entry_id = 3;
   }
   
   // Bit maps used to check whether a message ID is present in the bucket.
   message DelayMessageBitMap {
      map<uint64, bytes> bit_map = 1;
   }
   
   message BucketSnapshot {
      repeated DelayMessageBitMap delay_message_bit_maps = 1;
      repeated uint64 max_schedule_timestamps = 2;
      repeated BucketMessage messages = 3;
   }
   ```
   
   * Add an interface `BucketSnapshotStorage` to store BucketSnapshot
   ```java
   public interface BucketSnapshotStorage {
       CompletableFuture<Long> createBucket();
   
       CompletableFuture<Void> saveBucketAllSnapshots(Long bucketId,
               List<BucketSnapshotFormat.BucketSnapshot> bucketSnapshots);
   
       CompletableFuture<BucketSnapshotFormat.BucketSnapshot> getBucketSnapshot(
               Long bucketId, Long snapshotEntryId);
   
       CompletableFuture<Void> deleteBucket(Long bucketId);
   
       void start() throws Exception;
   
       void close() throws Exception;
   }
   ```
   
   * Add a new delayed message tracker `BucketDelayedDeliveryTracker` that 
extends `InMemoryDelayedDeliveryTracker`.
   
   * Add a method `existDelayedMessage` in the `DelayedDeliveryTracker` 
interface to filter out the delayed messages.
   ```java
   public interface DelayedDeliveryTracker {
       // ......
       boolean existDelayedMessage(long ledgerId, long entryId);
   }
   ```
   * The cursor will filter out all delayed messages based on 
`existDelayedMessage` and skip them when reading messages from bookies. This 
change requires making the cursor and ManagedLedger support reading 
discontinuous entries (the change seems to be relatively large).
   
   * Use `existDelayedMessage` to avoid recording a duplicate message index in 
the tracker when adding a message to the delayed message tracker.
   
   * Use a separate cursor to build the delayed message tracker, and add the 
scheduled messages to the replay queue of all subscriptions when any 
subscription triggers the delayed message checking.
   
   ## References
   [0] [delayed message 
delivery](https://pulsar.apache.org/docs/next/concepts-messaging#delayed-message-delivery)
 
   [1] [Apache pulsar delay message delivery 
analysis](https://developpaper.com/blog-recommendation-apache-pulsar-delay-message-delivery-analysis/)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
