poorbarcode opened a new issue, #19966:
URL: https://github.com/apache/pulsar/issues/19966

   ### Motivation
   
   Unlike Kafka, Pulsar supports [non-sequential acknowledgment](https://pulsar.apache.org/docs/2.11.x/concepts-messaging/#acknowledgment) (e.g. acking `{pos-1, pos-3, pos-5}`). So instead of a single pointer (acknowledged on the left, unacknowledged on the right), Pulsar needs to persist the acknowledgment state of each message; we call these records `Individual_Deleted_Messages`.
   
   Since frequent persistence of Individual_Deleted_Messages would amplify the amount of data written to BookKeeper and increase ack-response latency, the broker does not persist it immediately on receiving a consumer's acknowledgment, but persists it periodically. This can cause repeated consumption if the broker crashes and loses the unpersisted Individual_Deleted_Messages in memory (which is intended behavior).
   
   The current persistence mechanism of Individual_Deleted_Messages works like this:
   1. Write an Entry (the data of Individual_Deleted_Messages) to BK; by default, the maximum size of the Entry is 5MB.
   2. Write the data of Individual_Deleted_Messages to ZK if the BK write fails; keeping the data of a ZK node below 10MB is recommended.
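   
   The two-step fallback above can be sketched roughly as follows. This is a minimal sketch with hypothetical names (`CursorStateStore`, `Store`); the real logic lives in the broker's managed-cursor code.
   
   ```java
   /**
    * Sketch of the two-step persistence of Individual_Deleted_Messages:
    * try BookKeeper first, fall back to ZooKeeper on failure.
    * All names here are hypothetical, for illustration only.
    */
   public class CursorStateStore {
       public interface Store { boolean write(byte[] data); }
   
       private final Store bookkeeper; // primary: one Entry, max 5MB by default
       private final Store zookeeper;  // fallback: node data should stay below 10MB
   
       public CursorStateStore(Store bookkeeper, Store zookeeper) {
           this.bookkeeper = bookkeeper;
           this.zookeeper = zookeeper;
       }
   
       /** Returns which store accepted the data, or "failed" if neither did. */
       public String persist(byte[] individualDeletedMessages) {
           if (bookkeeper.write(individualDeletedMessages)) {
               return "bk";
           }
           if (zookeeper.write(individualDeletedMessages)) {
               return "zk";
           }
           return "failed"; // data stays only in memory, lost on broker crash
       }
   }
   ```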
   
   Therefore, the data of Individual_Deleted_Messages should stay below 5MB. If a subscription's Individual_Deleted_Messages data grows too large to persist, then as the program runs for a long time, more and more data will remain unpersisted. Eventually, there will be an unacceptable amount of repeated message consumption when the broker restarts.
   
   Is 5MB enough? Individual_Deleted_Messages consists of Position_Range records (each Position_Range occupies 32 bytes; the implementation is not explained in this proposal). This means the broker can persist `5MB / 32 bytes` Position_Range records for each subscription, and there is an additional compression mechanism at work, so it is sufficient for almost all scenarios except the following:
   1. Clients receive many messages and ack some of them; the rest still need to be acknowledged later due to errors or other reasons. As time goes on, more and more records accumulate. We call this scenario Client-Scenario.
   2. The number of consumers is large and each has some discrete ack records, which add up to a large number. We classify this as Client-Scenario too.
   3. Long-delayed and short-delayed messages are mixed together, with only the short-delayed messages successfully consumed and the long-delayed messages not yet delivered. As time goes on, more and more records accumulate. We call this scenario Delay-Message-Scenario.
   4. Due to heavy traffic, a large number of discrete ack records exist in memory, but these records will become continuous in a short time. For example, there are many ack records like `[1,3,5.....1000001]`, and the records `[2,4,6.....1000000]` will be acked soon. We call this scenario Heavy-Traffic-Scenario.
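   
   Assuming the default 5MB entry limit and 32 bytes per Position_Range (both from the figures above), the per-subscription capacity works out as:
   
   ```java
   /** Capacity estimate for Position_Range records per persisted entry. */
   public class AckCapacity {
       static final long MAX_ENTRY_BYTES = 5L * 1024 * 1024; // default 5MB BK entry limit
       static final long POSITION_RANGE_BYTES = 32;          // one Position_Range
   
       /** How many Position_Range records fit into a single persisted entry. */
       public static long maxRangesPerSubscription() {
           return MAX_ENTRY_BYTES / POSITION_RANGE_BYTES; // 5242880 / 32 = 163840
       }
   }
   ```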
   
   ### Goal
   
   This proposal is designed to resolve the problem of repeated consumption of large numbers of messages after a broker restart in the Client-Scenario and the Heavy-Traffic-Scenario. The Delay-Message-Scenario will be solved by the design of Delay-Message, which is not within the scope of this proposal.
   
   #### For Client-Scenario
   Provide a mechanism to make the Dispatcher block dispatching once unpersisted ack records reach the maximum limit; we call this mechanism Max Un-Persist Ack Limitation. We also provide a config to set the maximum byte size of unpersisted ack records, called `maxUnPersistAckRecordsBytes`; it will work like [maxUnackedMessagesPerSubscription](https://github.com/apache/pulsar/pull/399).
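   
   A minimal sketch of how such a limiter could gate the Dispatcher. Class and method names here are hypothetical; the real integration point would be the broker's dispatcher.
   
   ```java
   /**
    * Sketch of Max Un-Persist Ack Limitation: track the estimated bytes of
    * unpersisted ack records and tell the Dispatcher when to block.
    * All names are hypothetical, for illustration only.
    */
   public class UnPersistAckLimiter {
       private final long maxUnPersistAckRecordsBytes; // a value < 1 disables the feature
       private long unPersistedBytes;
   
       public UnPersistAckLimiter(long maxUnPersistAckRecordsBytes) {
           this.maxUnPersistAckRecordsBytes = maxUnPersistAckRecordsBytes;
       }
   
       /** Called on each ack with the estimated size of the new record. */
       public void onAck(long estimatedRecordBytes) {
           unPersistedBytes += estimatedRecordBytes;
       }
   
       /** Called after Individual_Deleted_Messages was persisted successfully. */
       public void onPersisted() {
           unPersistedBytes = 0; // all in-memory records flushed
       }
   
       /** The Dispatcher consults this before delivering more messages. */
       public boolean shouldBlockDispatching() {
           return maxUnPersistAckRecordsBytes >= 1
                   && unPersistedBytes >= maxUnPersistAckRecordsBytes;
       }
   }
   ```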
   
   #### For Heavy-Traffic-Scenario
   Provide a mechanism for selective persistence of ack records: only persist the ack records that have stayed in memory for a long time, and skip the new ones. This is similar to how the JVM divides objects into a young generation and an old generation. We call this mechanism Generational Ack Persistence.
   For example:
   - there are ack records `[1,3,5]` and `[101,103,105.....100001]`;
     - the records `[1,3,5]` are old, and we do not know when `[2,4]` will be acknowledged;
     - the records `[101,103,105.....100001]` are young, and the records `[102,104,106.....100000]` will be acknowledged soon.
   - The first persistence: only persist the records `[1,3,5]`.
   - The second persistence: by the time `[101,103,105.....100001]` are labeled as old data, the records `[102,104,106.....100000]` have already been acknowledged, so we can compress them to `[101-100001]`, which saves a lot of space.
   We will provide a config to set how many seconds it takes for an ack record to become old data; we call this config `becomeOldAckRecordSeconds`.
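   
   The compression step in the example above (collapsing a fully acked run into a single range) can be sketched like this, with positions simplified to plain longs. The class name is hypothetical.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   /** Sketch: merge a sorted list of acked positions into contiguous ranges. */
   public class AckRangeCompressor {
       /** Each returned long[2] is an inclusive {low, high} range. */
       public static List<long[]> compress(List<Long> sortedAcked) {
           List<long[]> ranges = new ArrayList<>();
           for (long p : sortedAcked) {
               if (!ranges.isEmpty() && ranges.get(ranges.size() - 1)[1] == p - 1) {
                   ranges.get(ranges.size() - 1)[1] = p; // extend the last range
               } else {
                   ranges.add(new long[]{p, p});         // start a new range
               }
           }
           return ranges;
       }
   }
   ```
   
   Discrete records like `[1,3,5]` stay as three ranges, while a run such as `[101,102,...,100001]` collapses to a single range, which is where the space saving comes from.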
   
   Both mechanisms can be enabled at the same time for better results.
   
   ### API Changes
   
   ##### broker.conf
   ```java
   /**
    * Maximum byte size of unpersisted ack records; the Dispatcher will block
    * dispatching if this limit is hit. The default value is 5MB.
    * If the value is less than 1, the feature is turned off. Otherwise the
    * value must be at least 1KB.
    */
   long maxUnPersistAckRecordsBytes;
   ```
   
   ##### Policies of topic
   Since not all topics have heavy traffic, we do not make the Generational Ack Persistence mechanism a global feature; it only applies to specific topics, so we add this config to the Topic Policy.
   
   ```java
   /**
    * How many seconds it takes for an ack record to become old data.
    * If this value is greater than 0, subscriptions will only persist old
    * ack records.
    */
   int becomeOldAckRecordSeconds;
   ```
   
   ### Implementation
   
   #### For Client-Scenario
   We do not calculate the size of the ack records in real time; we only mark the value when ack data is being persisted, so the size of the ack data is an estimate.
   
   #### For Heavy-Traffic-Scenario
   We do not record the timestamp of every ack record. Instead, we record the delivery time of some entries (not all of them) and use the positions of those entries as a ruler. For example:
   - set the config `becomeOldAckRecordSeconds` to `3s`.
   - `2023-03-30 12:00:00` deliver entry `3:1` to the client
   - `2023-03-30 12:00:02` deliver entry `4:1` to the client
   - `2023-03-30 12:00:04` deliver entry `5:1` to the client
   - `2023-03-30 12:00:05` deliver entry `6:1` to the client
   - `2023-03-30 12:00:05` receive an ack request for `5:20`; we mark this record as young because it was sent out only 1 second ago.
   - `2023-03-30 12:00:06` receive an ack request for `1:1`; we mark this record as old because it has been at least 6s since it was sent.
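   
   The ruler idea above can be sketched as follows. Positions are simplified to single longs here (real Pulsar positions are `ledgerId:entryId` pairs), and all names are hypothetical.
   
   ```java
   import java.util.TreeMap;
   
   /**
    * Sketch: classify acked positions as young or old using sampled
    * delivery times as a ruler, instead of timestamping every ack.
    */
   public class DeliveryRuler {
       private final long becomeOldAckRecordMillis;
       // Sampled deliveries: position -> wall-clock time it was dispatched.
       private final TreeMap<Long, Long> deliveryTimeByPosition = new TreeMap<>();
   
       public DeliveryRuler(long becomeOldAckRecordSeconds) {
           this.becomeOldAckRecordMillis = becomeOldAckRecordSeconds * 1000;
       }
   
       public void recordDelivery(long position, long deliveredAtMillis) {
           deliveryTimeByPosition.put(position, deliveredAtMillis);
       }
   
       /**
        * An ack is "old" if the nearest sampled position at or below it was
        * delivered more than becomeOldAckRecordSeconds ago; positions below
        * the earliest sample are treated as old.
        */
       public boolean isOld(long ackedPosition, long nowMillis) {
           var floor = deliveryTimeByPosition.floorEntry(ackedPosition);
           if (floor == null) {
               return true; // delivered before we started sampling
           }
           return nowMillis - floor.getValue() >= becomeOldAckRecordMillis;
       }
   }
   ```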
   
   
   
   
   
   ### Security Considerations
   
   -
   
   ### Alternatives
   
   _No response_
   
   ### Anything else?
   
   _No response_

