devinbost commented on issue #6273: Deduplication fails when the batch message contains duplicate message and valid message
URL: https://github.com/apache/pulsar/issues/6273#issuecomment-584565330

I'm adding my Slack discussion with @codelipenghui (with his permission) to create a better record of our discussion on this issue. He documented what happens in this issue:

"For example, we have four batch messages in the producer pending message queue. The messages look like this (I show the sequence IDs for a more straightforward understanding):

0 -> [0 - 5]
1 -> [6, 8]
2 -> [7 - 10]
3 -> [12 - 15]

Batch message-0 has six messages with sequence IDs 0 to 5, batch message-1 has two messages with sequence IDs 6 and 8, batch message-2 has four messages with sequence IDs 7 to 10, and the last batch message-3 has four messages with sequence IDs 12 to 15.

When message-0 is published to the broker, let's look at the state of the producer and the broker:

- Producer: the last pushed sequence ID is 5, and the last published sequence ID is 0 since the publish response has not yet returned.
- Broker: the last pushed sequence ID is 5, and the last persistent sequence ID is 0 since the message has not been written to the Bookie.

Then message-1 is published to the broker:

- Producer: the last pushed sequence ID is 8, and the last published sequence ID is 0 since the publish response has not yet returned.
- Broker: the last pushed sequence ID is 8, and the last persistent sequence ID is 0 since the message has not been written to the Bookie.

When the producer flushes message-2 to the broker, message-2 has a sequence ID that is lower than the last pushed sequence ID of the producer. So the producer should stop flushing message-3, because the producer should get the response of message-2 and re-batch the messages of message-2. After re-batching, the producer should retry the re-batched message. The state of the broker and producer is then:

- Producer: the last pushed sequence ID is 10, and the last published sequence ID is 10.
- Broker: the last pushed sequence ID is 10, and the last persistent sequence ID is 10.

After message-2 is published, the producer starts flushing message-3, so the retry of the re-batched message can be handled properly. But in extreme cases, this will introduce a lot of blocking."
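To make the failure mode concrete, here is a toy sketch (my own illustration, not actual Pulsar broker code) of a deduplication check that tracks only the last persisted sequence ID per producer and judges a whole batch by a single sequence ID. Batch message-2 mixes the duplicate sequence ID 8 with the valid 7, 9, and 10, so a single-ID check must either drop the valid messages or persist the duplicate:

```java
// Toy sketch of a single-sequence-ID dedup check; not actual Pulsar broker code.
import java.util.List;

public class BatchDedupSketch {
    private long lastPersistedSequenceId = -1L;

    /** Returns true if the batch is accepted, false if it is dropped as a duplicate. */
    boolean publishBatch(List<Long> sequenceIds) {
        long highest = sequenceIds.get(sequenceIds.size() - 1);
        if (highest <= lastPersistedSequenceId) {
            return false; // whole batch dropped, including any valid messages it contains
        }
        lastPersistedSequenceId = highest;
        return true;      // whole batch persisted, including any duplicates it contains
    }

    public static void main(String[] args) {
        BatchDedupSketch broker = new BatchDedupSketch();
        broker.publishBatch(List.of(6L, 8L)); // batch message-1 accepted, lastPersisted = 8
        // Batch message-2 mixes the duplicate 8 with the valid 7, 9, 10. With only one
        // tracked sequence ID the broker cannot split the batch: here it accepts
        // everything and re-persists 8; a check against the lowest ID (7 <= 8) would
        // instead drop the valid messages.
        boolean accepted = broker.publishBatch(List.of(7L, 8L, 9L, 10L));
        System.out.println("batch message-2 accepted as a whole: " + accepted); // true
    }
}
```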
--------

Regarding:

> When the producer flushes message-2 to the broker, message-2 has a sequence ID that is lower than the last pushed sequence ID of the producer. So the producer should stop flushing message-3, because the producer should get the response of message-2 and re-batch the messages of message-2.

I asked: "Would that require the producer to wait for the response of message-2 before it could flush message-3? (Moving from asynchronous to synchronous behavior?)"

He replied: "Yes, because if message-3 is published to the broker, message-2 will be ignored by the broker."

I said: "Hmm... what if the broker could notice that 'message-2 has a sequence ID that is lower than the last pushed sequence ID of the producer' and send back a response that requires the producer to re-submit message-2 with the correct sequence? I guess it depends on what we want the user experience to be. Should this situation be considered an error that we'd want the producer's developer to know about and need to fix?

The main broker limitation is that we're only tracking the last sequence ID, so we really have no way to know what was missing between those sequence IDs. (In the example of message-1, there's late data that doesn't appear until message-2.) Apache Flink provides a mechanism to allow re-ordering of messages (within a time buffer) and de-duplication, but that introduces additional latency that is acceptable for a data processing engine and probably wouldn't be acceptable for Pulsar."

He replied: "Yes, you are right; it wouldn't be acceptable for Pulsar. Maybe we need a PIP to describe the solution; it's a tricky problem, and we need to make some trade-offs. Maybe there are better solutions; we need time to think about how to properly handle message deduplication... The current message-order-dependent deduplication approach brings us many restrictions."

I replied: "I very much agree. Instead, we perhaps could implement a sliding-window dictionary of sequence IDs that would keep a fixed number of IDs in memory. Lookups would still be O(1). Perhaps a dynamically adjustable window could even be provided at some point for dynamic-length deduplication logic. We'd need to find out whether there are more reasons that only the last sequence ID is retained. If they could be worked around, then this could become a new feature of Pulsar." (A sketch of this sliding-window idea appears at the end of this comment.)

He added: "I thought of an easy way to deal with the current issue. There is no need to change the current mechanism, although the current mechanism does have room for optimization along the lines of your ideas. The problem is that the producer combines duplicate messages and non-duplicate messages into one batch message. So we can change the current logic: if a message is possibly a duplicate, we can flush it directly, which avoids combining it into a batch. I think this has a small impact on performance and requires no broker code changes."

I asked: "Would that mean the producer would need to retain state (like the window of sequence IDs we were talking about) to be able to identify whether duplicate messages are mixed with non-duplicate messages in a batch message?"

He replied: "Yes, there are two pieces of state named `lastSequenceIdPushed` and `lastSequenceIdPublished`. If the sequence ID of a new message is lower than `lastSequenceIdPushed` but greater than `lastSequenceIdPublished`, the message may be a duplicate, so we don't combine it into the current batch. If the sequence ID of a new message is lower than `lastSequenceIdPublished`, it is definitely a duplicate, and we can return that back to the user."

To better understand the current implementation, he suggested that I investigate these tests: `MessageDuplicationTest` and `ClientDeduplicationTest`.
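To illustrate that last point, here is a minimal sketch of the producer-side check, assuming hypothetical names (`ProducerDedupState`, `classify`, `DedupDecision`) that are not part of the Pulsar client API; it only shows how `lastSequenceIdPushed` and `lastSequenceIdPublished` could drive the decision of whether a message is safe to batch:

```java
// Hedged sketch of the producer-side idea described above. The class and method
// names are illustrative only; the sketch just shows how the two pieces of state
// could keep possible duplicates out of the current batch.
enum DedupDecision {
    BATCH_NORMALLY,    // brand-new message, safe to add to the current batch
    FLUSH_IMMEDIATELY, // possible duplicate: send it in its own batch
    ALREADY_PUBLISHED  // definite duplicate: acknowledge back to the caller
}

class ProducerDedupState {
    private long lastSequenceIdPushed = -1L;    // highest sequence ID sent to the broker
    private long lastSequenceIdPublished = -1L; // highest sequence ID acknowledged by the broker

    DedupDecision classify(long sequenceId) {
        if (sequenceId <= lastSequenceIdPublished) {
            // At or below the last acknowledged ID: the broker already has it.
            return DedupDecision.ALREADY_PUBLISHED;
        }
        if (sequenceId <= lastSequenceIdPushed) {
            // Pushed but not yet acknowledged: it may be a duplicate, so do not
            // combine it with non-duplicate messages in the current batch.
            return DedupDecision.FLUSH_IMMEDIATELY;
        }
        return DedupDecision.BATCH_NORMALLY;
    }

    void onPushed(long sequenceId) {
        lastSequenceIdPushed = Math.max(lastSequenceIdPushed, sequenceId);
    }

    void onPublished(long sequenceId) {
        lastSequenceIdPublished = Math.max(lastSequenceIdPublished, sequenceId);
    }
}
```

In the scenario above, sequence IDs 7 and 8 from batch message-2 would classify as FLUSH_IMMEDIATELY (at or below lastSequenceIdPushed = 8 but above the last published ID), while 9 and 10 could still be batched normally.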
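For completeness, here is a minimal sketch of the sliding-window idea floated earlier in the thread, assuming a hypothetical `SequenceIdWindow` class (nothing like this exists in Pulsar today): keep the most recent N persisted sequence IDs in a hash set so duplicate lookups stay O(1), evicting the oldest entry as new IDs arrive.

```java
// Hedged sketch of a fixed-size sliding window of sequence IDs; illustrative only.
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

class SequenceIdWindow {
    private final int capacity;
    private final Deque<Long> order = new ArrayDeque<>(); // insertion order for eviction
    private final Set<Long> seen = new HashSet<>();       // O(1) membership checks

    SequenceIdWindow(int capacity) {
        this.capacity = capacity;
    }

    /** Returns true if the sequence ID is a duplicate within the current window. */
    boolean isDuplicate(long sequenceId) {
        return seen.contains(sequenceId);
    }

    /** Records a newly persisted sequence ID, evicting the oldest if the window is full. */
    void record(long sequenceId) {
        if (seen.add(sequenceId)) {
            order.addLast(sequenceId);
            if (order.size() > capacity) {
                seen.remove(order.removeFirst());
            }
        }
    }
}
```

Whether such a window could replace the single last-sequence-ID check would depend on the other reasons that only the last sequence ID is retained, as noted above.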
