JavaXiaoJun opened a new issue, #15500:
URL: https://github.com/apache/pulsar/issues/15500

   # Motivation
   
   ### Business scenarios
   Delayed messages are common in messaging-based business systems. For example, group-opening reminders in group-buying campaigns, automatic deductions when a continuous monthly subscription renews, and coupon expiration reminders can all be implemented with delayed messages.
   
   ### Current Mode
   Pulsar currently implements arbitrary delayed messages with an in-memory time wheel. A producer can send a mix of ordinary and delayed messages to a topic, and they are persisted to BookKeeper. Delayed messages are currently supported only for Shared subscriptions. Each subscription identifies delayed messages and adds them to the time wheel, whose default time granularity is 1 second. For each message, the time wheel stores index information: the delivery timestamp, the ledgerId, and the entryId. When the delivery time arrives, the message is read by its ledgerId and entryId and dispatched to the consumer.
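   To make the index concrete, here is a minimal, self-contained sketch of an in-memory delay index in which each entry holds the three longs mentioned above (delivery timestamp, ledgerId, entryId) and due entries are returned in timestamp order. This is an illustration only: a `PriorityQueue` ordered by delivery time stands in for the time wheel, and the names `DelayIndex`, `add`, and `pollDue` are hypothetical, not Pulsar's actual tracker.

   ```java
   import java.util.ArrayList;
   import java.util.Comparator;
   import java.util.List;
   import java.util.PriorityQueue;

   // Hypothetical sketch of an in-memory delay index.
   // A priority queue ordered by delivery time stands in for the time wheel.
   final class DelayIndex {

       // One index entry: three longs (24 bytes of payload per message).
       static final class Entry {
           final long deliverAtMillis;
           final long ledgerId;
           final long entryId;

           Entry(long deliverAtMillis, long ledgerId, long entryId) {
               this.deliverAtMillis = deliverAtMillis;
               this.ledgerId = ledgerId;
               this.entryId = entryId;
           }
       }

       private final PriorityQueue<Entry> queue =
               new PriorityQueue<>(Comparator.comparingLong((Entry e) -> e.deliverAtMillis));

       // Record the position of a delayed message when the subscription reads it.
       void add(long deliverAtMillis, long ledgerId, long entryId) {
           queue.add(new Entry(deliverAtMillis, ledgerId, entryId));
       }

       // Return the positions whose delivery time has arrived; a dispatcher would
       // then read these (ledgerId, entryId) entries and send them to consumers.
       List<Entry> pollDue(long nowMillis) {
           List<Entry> due = new ArrayList<>();
           while (!queue.isEmpty() && queue.peek().deliverAtMillis <= nowMillis) {
               due.add(queue.poll());
           }
           return due;
       }

       int size() {
           return queue.size();
       }
   }
   ```

   Every pending delayed message keeps one entry in memory until it is due, which is the root of the memory and recovery problems listed next.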
   
   ### Current Problems
   1. Memory is limited. Each message needs 24 bytes to store three long fields, so 10 million delayed messages on one broker require about 228 MiB of off-heap memory.
   2. Ledgers are retained longer. Delayed and ordinary messages are stored in the same ledger. If a ledger contains a delayed message with a long delay (for example, one month), the unconsumed message prevents the ledger from being deleted, so the ledger is retained for the whole delay period.
   3. Recovery of delayed messages is complicated. If a topic has a large number of delayed messages, they all need to be re-indexed when the topic is transferred to another broker or the broker goes down.
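   The memory figure in problem 1 is simple arithmetic: 10,000,000 × 24 bytes = 240,000,000 bytes, which is about 228.9 MiB. The hypothetical helper below reproduces that estimate; it counts only the three long fields per message and ignores any data-structure overhead, so real usage would be somewhat higher.

   ```java
   // Rough estimate: only the three long fields (24 bytes) per delayed message are
   // counted; any data-structure overhead is ignored.
   final class DelayIndexMemory {
       static final int BYTES_PER_ENTRY = 3 * Long.BYTES;

       static double estimateMiB(long delayedMessages) {
           return delayedMessages * (double) BYTES_PER_ENTRY / (1024 * 1024);
       }
   }
   ```

   For example, `DelayIndexMemory.estimateMiB(10_000_000L)` returns 228.8818359375.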
   
   
   # Goal
   
   ### Optimization ideas
   Reduce the number of delayed messages that the time wheel has to hold. Delayed messages can be divided into time ranges by delay level, so that only the most imminent delayed messages need to be tracked. For example, messages with a long delay are first stored in a separate topic (the delay level topic), and a mechanism moves messages that are about to expire into the time wheel as time passes. This greatly reduces the number of delayed messages on the time wheel and thereby mitigates the problems described above.
   
   
![image](https://user-images.githubusercontent.com/31603070/167359401-59e6108d-b68d-44e2-9592-97f028037e05.png)
   
   ### Implementation details
   1. When the producer sends a message, it calculates from the delay time which partition of the delay level topic the message should be sent to. The delay level topic is an internal partitioned topic. For example, if a business needs delays of up to 1 year, you can create a delay level topic with 364 partitions: partition-0 stores messages with a delay >= 1 day and < 2 days, and partition-363 stores messages with a delay >= 364 days and < 365 days. Messages with a delay of less than 1 day are sent directly to the business topic, and the time wheel handles them as usual. The partitioning rule of the delay level topic can be derived from the partition strategy and the maximum delay time passed in by the producer.
   2. A set of scheduled tasks is required to move messages from the delay level topic to the business topic. This synchronization is effectively a full consume-and-produce cycle, so the messages are persisted again in the process. The Broker could normally take on this task, but to avoid overloading the Broker, Pulsar Functions can be used to synchronize delayed messages instead.
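   Step 1 can be illustrated with a minimal routing function for the day-granularity example: a delay under one span stays on the business topic, and a delay of d whole spans (1 <= d <= partitions) goes to partition d - 1. The class `DelayLevelRouter`, the `BUSINESS_TOPIC` sentinel, and rejecting delays beyond the configured maximum are assumptions for illustration, not part of the proposal.

   ```java
   // Hypothetical sketch of step 1: pick the delay level partition for a message.
   final class DelayLevelRouter {

       // Sentinel meaning "send directly to the business topic".
       static final int BUSINESS_TOPIC = -1;

       private final long levelSpanMillis;
       private final int partitions;

       // levelSpanMillis: time covered by one partition (one day in the example);
       // partitions: number of delay level partitions (364 for a one-year span).
       DelayLevelRouter(long levelSpanMillis, int partitions) {
           this.levelSpanMillis = levelSpanMillis;
           this.partitions = partitions;
       }

       int route(long delayMillis) {
           if (delayMillis < 0) {
               throw new IllegalArgumentException("delay must be non-negative");
           }
           long level = delayMillis / levelSpanMillis; // whole spans, e.g. whole days
           if (level == 0) {
               return BUSINESS_TOPIC; // under one span: the time wheel handles it
           }
           if (level > partitions) {
               // Assumption: delays beyond the configured maximum are rejected.
               throw new IllegalArgumentException("delay exceeds the configured maximum");
           }
           return (int) (level - 1); // [1 day, 2 days) -> partition-0, and so on
       }
   }
   ```

   For example, `new DelayLevelRouter(TimeUnit.DAYS.toMillis(1), 364).route(TimeUnit.HOURS.toMillis(36))` returns 0, and a 23-hour delay returns `BUSINESS_TOPIC`.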
   
   ### Questions
   1. The delay level topic is created automatically by the client, so its data retention policy needs to be considered.
   2. Delay level topic lifecycle management: when the business topic is deleted, should the delay level topic be deleted with it? How should delayed messages that have not yet been delivered be handled in that case?
   3. If a user specifies an unreasonable partition strategy or maximum delay time on the client side, the delay level topic may end up with too many partitions.
   4. Is there a scenario that requires dynamically expanding the partitions (delay levels) of the delay level topic?
   
   # Proposed Changes
   
   ### Client API
   
   - add fields `#delayLevelTopicEnabled`
   `#delayLevelTopicName` `#partitionStrategy` in 
`ProducerConfigurationData.java`
   
   ````java
   
       // Delay level topic settings
       private boolean delayLevelTopicEnabled = false;
   
       private String delayLevelTopicName = null;
   
       private DelayLevelTopicPartitionStrategy partitionStrategy = new DelayLevelTopicPartitionStrategy();
   
   ````
   
   - add class `DelayLevelTopicPartitionStrategy.java`
   
   ````java
   
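   public class DelayLevelTopicPartitionStrategy {
   
       // Maximum delay, in seconds, that producers may request (default: 30 days).
       private long messageMaximumDelayTimeSeconds = 60 * 60 * 24 * 30;
   
       // Time span covered by each partition of the delay level topic.
       private PartitionBy partitionBy = PartitionBy.DAY;
   
       public enum PartitionBy {
           YEAR, MONTH, DAY, HOUR
       }
   
       public DelayLevelTopicPartitionStrategy() {
       }
   
       public DelayLevelTopicPartitionStrategy(long messageMaximumDelayTimeSeconds, PartitionBy partitionBy) {
           this.messageMaximumDelayTimeSeconds = messageMaximumDelayTimeSeconds;
           this.partitionBy = partitionBy;
       }
   }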
   
   ````
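   The number of partitions implied by a strategy can be sketched as follows, consistent with the one-year example above: the maximum delay is split into spans of the `partitionBy` unit, and the first span (delays under one unit) is left to the business topic, giving ceil(maxDelay / span) - 1 partitions. The class is hypothetical and mirrors the proposed enum; because months and years vary in length, it assumes fixed 30-day months and 365-day years, which the proposal does not specify.

   ```java
   // Hypothetical sketch: how many delay level partitions a strategy implies.
   final class DelayLevelPartitionCount {

       // Mirrors DelayLevelTopicPartitionStrategy.PartitionBy; the span lengths are
       // assumptions (fixed 30-day months and 365-day years).
       enum PartitionBy {
           YEAR(365L * 24 * 3600),
           MONTH(30L * 24 * 3600),
           DAY(24L * 3600),
           HOUR(3600L);

           final long spanSeconds;

           PartitionBy(long spanSeconds) {
               this.spanSeconds = spanSeconds;
           }
       }

       // Delays below one span go to the business topic, so the levels needed are
       // ceil(maxDelay / span) - 1, never negative.
       static int partitions(long messageMaximumDelayTimeSeconds, PartitionBy partitionBy) {
           long span = partitionBy.spanSeconds;
           long levels = (messageMaximumDelayTimeSeconds + span - 1) / span; // ceiling division
           return (int) Math.max(0, levels - 1);
       }
   }
   ```

   With `DAY` and a one-year maximum delay this gives 364 partitions, and the default 30 days gives 29. With `HOUR` and a one-year maximum it gives 8759, which illustrates question 3 above about too many partitions.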
   
   - add methods `#enableDelayLevelTopic(boolean delayLevelTopicEnabled)` 
`#delayLevelTopic(String delayLevelTopicName)` 
`#delayLevelTopicPartitionStrategy(DelayLevelTopicPartitionStrategy strategy)` 
in `ProducerBuilder.java`
   
   ````java
   
       /**
        * Whether to route delayed messages through a delay level topic.
        * Enable this option to get better performance in scenarios with a
        * large number of delayed messages and long delays.
        * Disabled by default.
        *
        * @param delayLevelTopicEnabled whether to enable the delay level topic
        * @return the producer builder instance
        */
       ProducerBuilder<T> enableDelayLevelTopic(boolean delayLevelTopicEnabled);
   
       /**
        * The name of the delay level topic.
        * If not specified, it is generated based on the current topic name.
        *
        * @param delayLevelTopicName the name of the delay level topic
        * @return the producer builder instance
        */
       ProducerBuilder<T> delayLevelTopic(String delayLevelTopicName);
   
       /**
        * Partition strategy for the delay level topic.
        * The number of partitions of this topic is determined by
        * messageMaximumDelayTimeSeconds and partitionBy.
        *
        * @param strategy the partition strategy of the delay level topic
        * @return the producer builder instance
        */
       ProducerBuilder<T> delayLevelTopicPartitionStrategy(DelayLevelTopicPartitionStrategy strategy);
   
   ````
   - Specify these configurations when creating the Producer
   
   ````java
       // 30-day maximum delay, one delay level partition per day.
       ProducerBuilder<byte[]> builder = pulsarClient.newProducer()
               .enableDelayLevelTopic(true)
               .delayLevelTopic("persistent://public/default/delay-level-topic-test")
               .delayLevelTopicPartitionStrategy(new DelayLevelTopicPartitionStrategy(
                       60 * 60 * 24 * 30, DelayLevelTopicPartitionStrategy.PartitionBy.DAY));
   ````
   ### Broker
   TODO 
   
   ### Functions
   TODO: periodically synchronize messages from the delay level topic to the business topic
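   Since this part is still open, here is only a rough sketch of what one synchronization pass might do: each run forwards the messages from the delay level partitions whose delivery time falls within a look-ahead window, and the business topic's time wheel takes over the remaining short delay. In-memory queues stand in for the topics, and the names `DelayLevelSync`, `DelayedMessage`, `syncOnce`, and the look-ahead window are hypothetical; a real implementation would consume and produce through Pulsar (for example, inside a Pulsar Function) and would acknowledge a message only after it has been republished.

   ```java
   import java.util.ArrayDeque;
   import java.util.ArrayList;
   import java.util.Deque;
   import java.util.List;

   // Hypothetical sketch: one pass of the delay level sync task.
   // In-memory queues stand in for the delay level partitions and the business topic.
   final class DelayLevelSync {

       static final class DelayedMessage {
           final long deliverAtMillis;
           final String payload;

           DelayedMessage(long deliverAtMillis, String payload) {
               this.deliverAtMillis = deliverAtMillis;
               this.payload = payload;
           }
       }

       private final List<Deque<DelayedMessage>> partitions = new ArrayList<>();
       private final List<DelayedMessage> businessTopic = new ArrayList<>();
       private final long lookAheadMillis;

       DelayLevelSync(int partitionCount, long lookAheadMillis) {
           for (int i = 0; i < partitionCount; i++) {
               partitions.add(new ArrayDeque<>());
           }
           this.lookAheadMillis = lookAheadMillis;
       }

       // Producer side: append to a delay level partition in publish order.
       void publish(int partition, DelayedMessage message) {
           partitions.get(partition).addLast(message);
       }

       // One scheduled run. Messages in one partition are in publish order and their
       // delays differ by less than one span, so with a window of at least one span
       // (plus the run interval), stopping at the first message that is not yet due
       // still forwards every message before its delivery time.
       int syncOnce(long nowMillis) {
           int moved = 0;
           for (Deque<DelayedMessage> partition : partitions) {
               while (!partition.isEmpty()
                       && partition.peekFirst().deliverAtMillis <= nowMillis + lookAheadMillis) {
                   // Consume from the delay level topic, then produce to the business topic.
                   businessTopic.add(partition.pollFirst());
                   moved++;
               }
           }
           return moved;
       }

       List<DelayedMessage> businessTopic() {
           return businessTopic;
       }
   }
   ```

   The window trades earlier arrival on the business topic (and thus more entries on its time wheel) for tolerance to ordering within a partition and to scheduling jitter.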

