devinbost commented on issue #6273: Deduplication fails when the batch message 
contains duplicate message and valid message
URL: https://github.com/apache/pulsar/issues/6273#issuecomment-584565330
 
 
   I'm adding my Slack discussion with @codelipenghui (with his permission) to create a better record of our discussion on this issue. 
   
   He documented what happens in this issue:
   
   "For example, we have four batch messages in the producer's pending message queue. The messages look like this (I show the sequence IDs for a more straightforward understanding):
   
   - 0 -> [0 - 5]
   - 1 -> [6, 8]
   - 2 -> [7 - 10]
   - 3 -> [12 - 15]
   
   Batch message-0 has six messages with sequence ID 0 to 5, and batch 
message-1 has two messages with sequence ID 6 and 8, and batch message-2 has 
four messages with sequence ID 7 to 10, and the last batch message-3 has four 
messages with sequence ID 12 to 15.
   When message-0 is published to the broker, let’s look at the state of the producer and the broker:
   
   - Producer: the last pushed sequence ID is 5, and the last published sequence ID is 0 since the publish response has not yet returned.
   - Broker: the last pushed sequence ID is 5, and the last persistent sequence ID is 0 since the message has not yet been written to the Bookie.
   
   Then message-1 is published to the broker:
   
   - Producer: the last pushed sequence ID is 8, and the last published sequence ID is 0 since the publish response has not yet returned.
   - Broker: the last pushed sequence ID is 8, and the last persistent sequence ID is 0 since the message has not yet been written to the Bookie.
   
   When the producer flushes message-2 to the broker, message-2 has a sequence ID that is lower than the last pushed sequence ID of the producer. So the producer should stop flushing message-3, because the producer should get the response of message-2 and re-batch the messages of message-2. After re-batching, the producer should retry the re-batched message. The current state of the broker and producer is:
   
   - Producer: the last pushed sequence ID is 10, and the last published sequence ID is 10 because the producer waited for the publish response.
   - Broker: the last pushed sequence ID is 10, and the last persistent sequence ID is 10 because the message has been written to the Bookie.
   
   After message-2 is published, the producer starts flushing message-3, so that the retry of the re-batched message is handled properly.
   But in extreme cases, this will introduce a lot of blocking."
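To make the failure concrete, here is a minimal Java model of the four batches in the example. It is a simplification, not Pulsar's broker code: it assumes the broker keeps only its last pushed sequence ID and drops any batch whose lowest sequence ID is not greater than that value. The class `NaiveBrokerDedup` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model; NOT Pulsar's actual deduplication code.
// Assumption: the broker tracks only the highest sequence ID it has pushed and
// treats a batch as a duplicate when the batch's lowest ID is <= that value.
class NaiveBrokerDedup {
    private long lastSequenceIdPushed = -1;

    /** Returns true if the batch is accepted, false if it is dropped as a duplicate. */
    boolean offer(long[] batch) {
        long lowest = batch[0];
        long highest = batch[0];
        for (long id : batch) {
            lowest = Math.min(lowest, id);
            highest = Math.max(highest, id);
        }
        if (lowest <= lastSequenceIdPushed) {
            return false; // the whole batch is dropped, including any new IDs in it
        }
        lastSequenceIdPushed = highest;
        return true;
    }

    /** Feeds the batches in order and returns every sequence ID that was accepted. */
    static List<Long> accepted(long[][] batches) {
        NaiveBrokerDedup broker = new NaiveBrokerDedup();
        List<Long> out = new ArrayList<>();
        for (long[] batch : batches) {
            if (broker.offer(batch)) {
                for (long id : batch) out.add(id);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[][] batches = {
            {0, 1, 2, 3, 4, 5}, // message-0
            {6, 8},             // message-1
            {7, 8, 9, 10},      // message-2: 8 repeats, 7, 9 and 10 do not
            {12, 13, 14, 15}    // message-3
        };
        System.out.println(accepted(batches));
    }
}
```

Running `main` prints `[0, 1, 2, 3, 4, 5, 6, 8, 12, 13, 14, 15]`: message-2 is rejected as a whole, so 9 and 10 are lost along with the repeated 8 and the out-of-order 7.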
   
   --------
   
   Regarding:
   
   >  When the producer flushes message-2 to the broker, message-2 has a sequence ID that is lower than the last pushed sequence ID of the producer. So the producer should stop flushing message-3, because the producer should get the response of message-2 and re-batch the messages of message-2.
   
   I asked:
   "Would that require the producer to wait for the response of message-2 
before it could flush message-3? (Moving from asynchronous to synchronous 
behavior?)"
   
   He replied:
   "Yes, because if message-3 is published to the broker, message-2 will be ignored by the broker."
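The blocking approach can be simulated synchronously under a few assumptions: flushing is modeled as an immediate call, and "waiting for the response" is modeled by letting `lastSequenceIdPublished` catch up to `lastSequenceIdPushed`. `BlockingProducer` and `blockingWaits` are hypothetical names; only the two state names are borrowed from this discussion.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of "wait for message-2's response before flushing message-3".
// Assumption: once the producer waits, every earlier batch is acknowledged, so
// lastSequenceIdPublished catches up to lastSequenceIdPushed.
class BlockingProducer {
    long lastSequenceIdPushed = -1;
    long lastSequenceIdPublished = -1;
    int blockingWaits = 0;
    final List<long[]> sent = new ArrayList<>();

    void flush(long[] batch) {
        long lowest = Arrays.stream(batch).min().getAsLong();
        if (lowest <= lastSequenceIdPushed) {
            // Out-of-order batch: block until all pending responses have returned.
            blockingWaits++;
            lastSequenceIdPublished = lastSequenceIdPushed;
            // Re-batch: keep only IDs above what the broker has acknowledged.
            // 7 is still dropped here: it arrives after 8, and a last-ID scheme
            // cannot tell it apart from a duplicate.
            long published = lastSequenceIdPublished;
            batch = Arrays.stream(batch).filter(id -> id > published).toArray();
            if (batch.length == 0) return; // nothing new left to send
        }
        sent.add(batch);
        lastSequenceIdPushed = Math.max(lastSequenceIdPushed, Arrays.stream(batch).max().getAsLong());
    }

    public static void main(String[] args) {
        BlockingProducer producer = new BlockingProducer();
        long[][] batches = {{0, 1, 2, 3, 4, 5}, {6, 8}, {7, 8, 9, 10}, {12, 13, 14, 15}};
        for (long[] batch : batches) producer.flush(batch);
        for (long[] batch : producer.sent) System.out.println(Arrays.toString(batch));
        System.out.println("blocking waits: " + producer.blockingWaits);
    }
}
```

It prints the sent batches `[0, 1, 2, 3, 4, 5]`, `[6, 8]`, `[9, 10]`, `[12, 13, 14, 15]` and one blocking wait. Each out-of-order batch costs a full round trip before the next flush, which is where the blocking in extreme cases comes from.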
   
   I said:
   "hmm… What if the broker could notice that “message-2 has a sequence ID that is lower than the last pushed sequence ID of the producer” and send back a response to the producer that requires the producer to re-submit message-2 with the correct sequence?
   I guess it depends on what we want the user experience to be. Should this 
situation be considered an error that we’d want the producer’s developer to 
know about and need to fix? The main broker limitation is that we’re only 
tracking the last sequence ID, so we really have no way to know what was 
missing between those sequence IDs. (In the example of message-1, there’s late 
data that doesn’t appear until message-2.)
   Apache Flink provides a mechanism that allows re-ordering of messages (within a time buffer) and de-duplication, but it introduces additional latency that is acceptable for a data processing engine but probably wouldn’t be acceptable for Pulsar."
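The broker-side idea above can be sketched as follows, assuming the broker still tracks only its last pushed sequence ID and sees each batch as a (lowest, highest) pair, but answers a batch that straddles that ID with a resubmit request instead of dropping it. `StraddleAwareBroker` and its `Verdict` values are hypothetical and illustrative only; they are not taken from Pulsar's protocol.

```java
// Hypothetical sketch; none of these names come from Pulsar.
// Assumption: the broker keeps only its last pushed sequence ID and sees each
// batch as a (lowest, highest) pair of sequence IDs.
class StraddleAwareBroker {
    enum Verdict { ACCEPT, DUPLICATE, RESUBMIT }

    private long lastSequenceIdPushed = -1;

    Verdict offer(long lowest, long highest) {
        if (highest <= lastSequenceIdPushed) {
            // Every ID is <= the last pushed ID, so the batch is treated as a duplicate.
            return Verdict.DUPLICATE;
        }
        if (lowest <= lastSequenceIdPushed) {
            // The batch straddles the last pushed ID: ask the producer to re-batch
            // instead of silently dropping the new IDs it contains.
            return Verdict.RESUBMIT;
        }
        lastSequenceIdPushed = highest;
        return Verdict.ACCEPT;
    }

    public static void main(String[] args) {
        StraddleAwareBroker broker = new StraddleAwareBroker();
        System.out.println(broker.offer(0, 5));   // message-0: ACCEPT
        System.out.println(broker.offer(6, 8));   // message-1: ACCEPT
        System.out.println(broker.offer(7, 10));  // message-2: RESUBMIT
        System.out.println(broker.offer(9, 10));  // re-batched message-2: ACCEPT
        System.out.println(broker.offer(12, 15)); // message-3: ACCEPT
    }
}
```

This keeps the common path asynchronous, but the producer still has to hold message-3 until the verdict for message-2 arrives: if `[12, 15]` were accepted first, the resubmitted `[9, 10]` would fall below 15 and be treated as a duplicate.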
   
   He replied: 
   "Yes, you are right, it wouldn’t be acceptable for Pulsar.
   Maybe we need a PIP to describe the solution; it’s a tricky problem, and we need to make some trade-offs. Maybe there are better solutions; we need time to think about how to properly handle message deduplication. . . The current deduplication approach, which depends on message order, brings us many restrictions."
   
   I replied: 
   "I very much agree.
   Instead, we could perhaps implement a sliding-window dictionary of sequence IDs that keeps a fixed number of IDs in memory. Lookups would still be O(1). Perhaps a dynamically adjustable window could even be provided at some point for dynamic-length deduplication logic.
   We’d need to find out whether there are other reasons why only the last sequence ID is retained.
   If they could be worked around, this could become a new Pulsar feature."
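A minimal sketch of the sliding-window idea: a fixed-capacity window of recently seen sequence IDs, backed by a `HashSet` for expected O(1) lookups and an `ArrayDeque` that remembers insertion order for eviction. The class name `SlidingWindowDedup` is hypothetical, and the sketch assumes the broker can see every sequence ID inside a batch rather than one ID per batch.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of a bounded deduplication window.
// Assumption: it holds the most recent `capacity` accepted IDs; older IDs are
// forgotten, so a duplicate that arrives after eviction is no longer caught.
class SlidingWindowDedup {
    private final int capacity;
    private final ArrayDeque<Long> order = new ArrayDeque<>(); // oldest ID at the head
    private final Set<Long> seen = new HashSet<>();             // O(1) expected lookups

    SlidingWindowDedup(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException("capacity must be positive");
        this.capacity = capacity;
    }

    /** Returns true if the ID is new (and records it), false if it is still in the window. */
    boolean accept(long sequenceId) {
        if (!seen.add(sequenceId)) {
            return false; // duplicate within the window
        }
        order.addLast(sequenceId);
        if (order.size() > capacity) {
            seen.remove(order.removeFirst()); // evict the oldest ID
        }
        return true;
    }

    public static void main(String[] args) {
        SlidingWindowDedup window = new SlidingWindowDedup(16);
        long[] arrivals = {0, 1, 2, 3, 4, 5, 6, 8, 7, 8, 9, 10, 12, 13, 14, 15};
        for (long id : arrivals) {
            if (!window.accept(id)) System.out.println("duplicate: " + id);
        }
    }
}
```

With the example's arrivals, only the second 8 is reported as a duplicate; 7, 9 and 10 are kept. The trade-off is that ordering is no longer enforced and duplicates older than the window slip through, so the window size bounds both memory and the deduplication horizon.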
   
   He added:
   "I thought of an easy way to deal with the current issue that requires no change to the current mechanism. Of course, the current mechanism still has room for optimization, such as your ideas.
   The problem is that the producer combines duplicate and non-duplicate messages into one batch message. So we can change the current logic: if a message is possibly a duplicate, we flush it directly, which avoids combining it into a batch. I think this has a small impact on performance and requires no broker code changes."
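The producer-side fix can be sketched as a batcher that never mixes a suspect message with new ones. This assumes "possibly a duplicate" means a sequence ID not greater than `lastSequenceIdPushed`, and, for simplicity, that this value advances as soon as a message enters a batch. `IsolatingBatcher` and `maxBatchSize` are hypothetical names, not the Pulsar client's batching API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: a possible duplicate is flushed on its own instead of
// being combined with new messages. Assumption: lastSequenceIdPushed advances
// when a message enters the current batch.
class IsolatingBatcher {
    private final int maxBatchSize;
    private long lastSequenceIdPushed = -1;
    private List<Long> current = new ArrayList<>();
    final List<List<Long>> flushed = new ArrayList<>();

    IsolatingBatcher(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    void add(long sequenceId) {
        if (sequenceId <= lastSequenceIdPushed) {
            flush();                          // send whatever is batched so far
            flushed.add(List.of(sequenceId)); // then send the suspect message alone
            return;
        }
        current.add(sequenceId);
        lastSequenceIdPushed = sequenceId;
        if (current.size() >= maxBatchSize) flush();
    }

    void flush() {
        if (!current.isEmpty()) {
            flushed.add(current);
            current = new ArrayList<>();
        }
    }

    public static void main(String[] args) {
        IsolatingBatcher batcher = new IsolatingBatcher(6);
        for (long id : new long[] {0, 1, 2, 3, 4, 5, 6, 8, 7, 8, 9, 10, 12, 13, 14, 15}) {
            batcher.add(id);
        }
        batcher.flush();
        batcher.flushed.forEach(System.out::println);
    }
}
```

The batches come out as `[0, 1, 2, 3, 4, 5]`, `[6, 8]`, `[7]`, `[8]`, `[9, 10, 12, 13, 14, 15]`. A broker that only tracks the last sequence ID would still drop `[7]` and `[8]`, but 9 and 10 no longer share a batch with them and survive.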
   
   I asked:
   "Would that mean the producer would need to retain state (like a window of 
Sequence IDs like we were talking about) to be able to identify if duplicate 
messages are mixed with non-duplicate messages into a batch message?"
   
   He replied: 
   "Yes, there are two state variables, `lastSequenceIdPushed` and `lastSequenceIdPublished`. If the sequence ID of a new message is lower than `lastSequenceIdPushed` but greater than `lastSequenceIdPublished`, the message may be a duplicate, so we don’t combine it into the current batch. If the sequence ID of the new message is lower than `lastSequenceIdPublished`, it is definitely a duplicate, and we can return it to the user directly."
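The rule above fits in a small classification function. The two parameter names follow the fields quoted in the discussion; the `Status` enum, the class name `SequenceClassifier`, and the treatment of boundary values (an ID equal to either field counts as already sent) are assumptions.

```java
// Hypothetical sketch of the three-way classification described above.
// Assumption: an ID equal to lastSequenceIdPublished is a definite duplicate,
// and an ID equal to lastSequenceIdPushed is a possible duplicate.
class SequenceClassifier {
    enum Status { NEW, POSSIBLE_DUPLICATE, DEFINITE_DUPLICATE }

    static Status classify(long sequenceId, long lastSequenceIdPushed, long lastSequenceIdPublished) {
        if (sequenceId <= lastSequenceIdPublished) {
            // Already acknowledged by the broker: report it back to the user.
            return Status.DEFINITE_DUPLICATE;
        }
        if (sequenceId <= lastSequenceIdPushed) {
            // Sent but not yet acknowledged: keep it out of the current batch.
            return Status.POSSIBLE_DUPLICATE;
        }
        return Status.NEW;
    }

    public static void main(String[] args) {
        // Hypothetical producer state: pushed up to 8, acknowledged up to 5.
        long pushed = 8;
        long published = 5;
        for (long id : new long[] {3, 7, 9}) {
            System.out.println(id + " -> " + classify(id, pushed, published));
        }
    }
}
```

With that hypothetical state, `main` prints `3 -> DEFINITE_DUPLICATE`, `7 -> POSSIBLE_DUPLICATE`, and `9 -> NEW`.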
   
   To better understand the current implementation, he suggested that I 
investigate these tests:
   `MessageDuplicationTest` and `ClientDeduplicationTest` 
