poorbarcode opened a new issue, #19966: URL: https://github.com/apache/pulsar/issues/19966
### Motivation

Unlike Kafka, Pulsar supports [non-sequential acknowledgment](https://pulsar.apache.org/docs/2.11.x/concepts-messaging/#acknowledgment) (e.g. acking `{pos-1, pos-3, pos-5}`). So instead of a single pointer (acknowledged on the left, unacknowledged on the right), Pulsar needs to persist the acknowledgment state of each message; we call these records `Individual_Deleted_Messages`.

Since frequent persistence of Individual_Deleted_Messages would magnify the amount of data written to BK and increase the latency of ack responses, the broker does not persist it immediately on receiving a consumer's acknowledgment, but persists it periodically. This can cause repeated consumption if the broker crashes and loses the unpersisted Individual_Deleted_Messages in memory (which is intended behavior).

The current persistence mechanism of Individual_Deleted_Messages works like this:

1. Write an entry (the data of Individual_Deleted_Messages) to BK; by default, the maximum size of the entry is 5MB.
2. Write the data of Individual_Deleted_Messages to ZK if the BK write fails; it is recommended that a ZK node's data be less than 10MB.

Therefore, the data of Individual_Deleted_Messages is recommended to be less than 5MB. If a subscription's Individual_Deleted_Messages data is too large to persist, then as the program runs for a long time, more and more data stays unpersisted. Eventually, there will be an unacceptable amount of repeated consumption of messages when the broker restarts.

Is 5MB enough? Individual_Deleted_Messages consists of `Position_Range`s (each `Position_Range` occupies 32 bytes; the implementation will not be explained in this proposal). This means the broker can persist `5MB / 32 bytes` `Position_Range`s per subscription, and there is an additional compression mechanism at work, so it is sufficient for almost all scenarios except the following:
1. Clients receive many messages and ack only some of them; the rest remain unacknowledged due to errors or other reasons. As time goes on, more and more records stay there. We call this scenario Client-Scenario.
2. The number of consumers is large and each has some discrete ack records, which all add up to a large number. We classify this as Client-Scenario too.
3. Long-delayed and short-delayed messages are mixed together, with only the short-delayed messages successfully consumed and the long-delayed messages not yet delivered. As time goes on, more and more records stay there. We call this scenario Delay-Message-Scenario.
4. Due to heavy traffic, a large number of discrete ack records exist in memory, but these records will become continuous in a short time. For example, there are many ack records like `[1,3,5.....1000001]`, and the records `[2,4,6.....1000000]` will be acked soon. We call this scenario Heavy-Traffic-Scenario.

### Goal

This proposal is designed to resolve the problem of repeated consumption of large numbers of messages after the broker restarts in Client-Scenario and Heavy-Traffic-Scenario. As for Delay-Message-Scenario, it will be solved by the design of Delay-Message, which is not within the scope of this proposal.

#### For Client-Scenario

Provide a mechanism that makes the Dispatcher block dispatching once the unpersisted ack records reach a maximum limit; we call this mechanism Max Un-Persist Ack Limitation. Also provide a config to set the maximum byte size of unpersisted ack records; we call this config `maxUnPersistAckRecordsBytes`. It will work like [maxUnackedMessagesPerSubscription](https://github.com/apache/pulsar/pull/399).

#### For Heavy-Traffic-Scenario

Provide a mechanism for selective persistence of ack records: persist only the ack records that have stayed in memory for a long time, and do not persist a record while it is still new.
This sounds like the JVM dividing objects into the young generation and the old generation; we call this mechanism Generational Ack Persistent. For example:

- There are ack records `[1,3,5]` and `[101,103,105.....100001]`.
- The records `[1,3,5]` are old, and we do not know when `[2,4]` will be acknowledged.
- The records `[101,103,105.....100001]` are young, and the records `[102,104,106.....100000]` will be acknowledged soon.
- The first persistence: persist only the records `[1,3,5]`.
- The second persistence: by the time `[101,103,105.....100001]` are labeled as old data, the records `[102,104,106.....100000]` have already been acknowledged, so all of them can be compressed into the single range `[101-100001]`, which saves a lot of space.

We will provide a config to set how many seconds it takes for an ack record to become old data; we call this config `becomeOldAckRecordSeconds`. Both mechanisms can be turned on at the same time for better results.

### API Changes

##### broker.conf

```java
/**
 * Max byte size of unpersisted ack records; the Dispatcher will block dispatching if it hits this limit. Default value is 5M.
 * If the value is less than 1, the feature is turned off. This config is also bounded: if it is not less than 1, it should be larger than or equal to 1K.
 */
long maxUnPersistAckRecordsBytes;
```

##### Policies of topic

Since not all topics have huge traffic, we do not make the Generational Ack Persistent mechanism a global feature; we only apply it to specific topics, so we just add this config to the Topic Policy.

```java
/**
 * How many seconds does it take for an ack record to become old data?
 * If this value is larger than 0, subscriptions will persist only old ack records.
 */
int becomeOldAckRecordSeconds;
```

### Implementation

#### For Client-Scenario

We don't calculate the size of the ack records in real time; we just record the value when the ack data is persisted, so the size of the ack data is an estimate.
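The Max Un-Persist Ack Limitation described above could be sketched roughly as follows. This is a minimal illustration, not Pulsar's actual implementation; the class and method names are hypothetical, and only the config name `maxUnPersistAckRecordsBytes` comes from the proposal.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the Max Un-Persist Ack Limitation: the dispatcher
// stops delivering once the estimated size of unpersisted ack records
// reaches maxUnPersistAckRecordsBytes.
public class UnPersistAckLimiter {
    private final long maxUnPersistAckRecordsBytes; // < 1 disables the feature
    private final AtomicLong unPersistedBytes = new AtomicLong();

    public UnPersistAckLimiter(long maxUnPersistAckRecordsBytes) {
        this.maxUnPersistAckRecordsBytes = maxUnPersistAckRecordsBytes;
    }

    // Called when the broker buffers a new ack record in memory.
    // The size passed in is an estimate, as described above.
    public void onAckBuffered(long estimatedBytes) {
        unPersistedBytes.addAndGet(estimatedBytes);
    }

    // Called after Individual_Deleted_Messages is persisted to BK/ZK.
    public void onPersisted() {
        unPersistedBytes.set(0);
    }

    // The Dispatcher consults this before delivering more messages.
    public boolean shouldBlockDispatching() {
        return maxUnPersistAckRecordsBytes > 0
                && unPersistedBytes.get() >= maxUnPersistAckRecordsBytes;
    }
}
```

The counter is reset on each successful persistence, which matches the idea that only the data buffered since the last persistence can be lost in a crash.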
#### For Heavy-Traffic-Scenario

We do not record the timestamp of every ack record; instead we record the delivery time of entries, and not even of all entries, using the positions of those recorded entries as a ruler. For example:

- Set the config `becomeOldAckRecordSeconds` to `3s`.
- `2023-03-30 12:00:00` deliver entry `3:1` to the client.
- `2023-03-30 12:00:02` deliver entry `4:1` to the client.
- `2023-03-30 12:00:04` deliver entry `5:1` to the client.
- `2023-03-30 12:00:05` deliver entry `6:1` to the client.
- `2023-03-30 12:00:05` receive an ack request for `5:20`; we mark this record as young because it was sent out only 1 second ago.
- `2023-03-30 12:00:06` receive an ack request for `1:1`; we mark this record as old because it has been at least 6s since it was sent.

### Security Considerations

-

### Alternatives

_No response_

### Anything else?

_No response_
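The ruler idea above could be sketched as follows: keep delivery-time markers for a subset of positions in a sorted map, then date any ack by the nearest marker at or below it. This is an illustrative sketch, not the proposed implementation; positions are simplified to longs here (e.g. `3:1` encoded as `301`), whereas a real Pulsar position is a ledgerId/entryId pair.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the delivery-time "ruler" used to classify ack
// records as young or old without timestamping every record.
public class DeliveryRuler {
    // position -> time the entry was delivered to a client (millis)
    private final TreeMap<Long, Long> deliveryTimeByPosition = new TreeMap<>();

    // Record a marker when an entry is delivered.
    public void onDelivered(long position, long deliveredAtMillis) {
        deliveryTimeByPosition.put(position, deliveredAtMillis);
    }

    // A record becomes "old" once at least becomeOldAckRecordSeconds have
    // passed since the message was (at the latest) delivered.
    public boolean isOld(long ackedPosition, long nowMillis, int becomeOldAckRecordSeconds) {
        Map.Entry<Long, Long> marker = deliveryTimeByPosition.floorEntry(ackedPosition);
        // No marker at or below this position: the message was delivered
        // before the earliest marker, so that marker's time is an upper bound.
        long deliveredAt = marker != null
                ? marker.getValue()
                : deliveryTimeByPosition.firstEntry().getValue();
        return nowMillis - deliveredAt >= becomeOldAckRecordSeconds * 1000L;
    }
}
```

With the timeline from the example, an ack for `5:20` at `12:00:05` falls back to the `5:1` marker (delivered 1s earlier) and is young, while an ack for `1:1` at `12:00:06` has no marker at or below it, so the earliest marker (`3:1`, 6s earlier) bounds its age and it is old.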
