heesung-sn commented on code in PR #20948:
URL: https://github.com/apache/pulsar/pull/20948#discussion_r1291950076


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java:
##########
@@ -323,19 +332,34 @@ public MessageDupStatus isDuplicate(PublishContext publishContext, ByteBuf heade
 
         String producerName = publishContext.getProducerName();
         long sequenceId = publishContext.getSequenceId();
+        // The process of the Producer sending chunk messages is continuous, and all chunks of a message use the same

Review Comment:
   General question: how does the Pulsar broker guarantee that messages are received in order?
   
   In other words, how do we guarantee that `isDuplicate` is called in consecutive order, e.g. m1-c1, m1-c2, m2-c1, m2-c2?
   
   Do we have any documentation about this? I assume we rely on the Netty TCP connection layer (`Channel.write` on a single connection) for this, but I would like to confirm.
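A minimal sketch of the assumption behind this question, not Pulsar code: TCP delivers the bytes of one connection in order, and if every frame read from one producer connection is handled by the single thread that owns that connection (as with a Netty `Channel`, which stays bound to one `EventLoop` thread), the handler sees frames in wire order. Here a single-thread `ExecutorService` stands in for the event loop; `OrderedDispatchSketch` and `dispatch` are hypothetical names. Whether the broker preserves this order after handing a publish off to other threads is the part that still needs confirming.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical model: one thread owns one connection, like a Netty EventLoop.
class OrderedDispatchSketch {

    // Returns the frames in the order the per-connection handler processed them.
    static List<String> dispatch(List<String> framesInWireOrder) {
        List<String> handled = Collections.synchronizedList(new ArrayList<>());
        ExecutorService connectionThread = Executors.newSingleThreadExecutor();
        for (String frame : framesInWireOrder) {
            // Stand-in for the isDuplicate() call made from the read path.
            connectionThread.execute(() -> handled.add(frame));
        }
        connectionThread.shutdown();
        try {
            // A single-thread executor runs tasks sequentially in submission order.
            connectionThread.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while draining", e);
        }
        return new ArrayList<>(handled);
    }

    public static void main(String[] args) {
        List<String> wire = List.of("m1-c1", "m1-c2", "m2-c1", "m2-c2");
        System.out.println(dispatch(wire)); // prints [m1-c1, m1-c2, m2-c1, m2-c2]
    }
}
```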
   
   



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java:
##########
@@ -323,19 +332,34 @@ public MessageDupStatus isDuplicate(PublishContext publishContext, ByteBuf heade
 
         String producerName = publishContext.getProducerName();
         long sequenceId = publishContext.getSequenceId();
+        // The process of the Producer sending chunk messages is continuous, and all chunks of a message use the same
+        // message metadata and sequence ID. Therefore, it is only necessary to check if the sequence ID of the first
+        // chunk is duplicated.
+        // When we receive the initial message of a non-duplicated chunk message, we place it in the
+        // chunkMessageOngoing. Upon completion of sending this chunk message, if we receive other messages
+        // sent by this Producer, we will remove it from the chunkMessageOngoing.
+        headersAndPayload.markReaderIndex();
+        MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
+        headersAndPayload.resetReaderIndex();
+        if (chunkMessageOngoing.containsKey(producerName)) {
+            //Deduplication requires the dependency on the ordered nature of messages to work accurately.
+            if (publishContext.isChunked() && chunkMessageOngoing.get(producerName) < msgMetadata.getChunkId()) {
+                chunkMessageOngoing.put(producerName, msgMetadata.getChunkId());
+                return MessageDupStatus.NotDup;
+            } else {
+                chunkMessageOngoing.remove(producerName);
Review Comment:
   Could we restructure this if-else a little so that `remove` is only called when the map actually contains the producer?
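One possible reading of this suggestion, as a sketch only: fold the `containsKey` plus `get` pair into a single `get`, so the `remove` branch runs only when an entry exists. `ChunkTrackerSketch`, `startChunkedMessage`, `isContinuationChunk` and `isTracking` are hypothetical names, `java.util.concurrent.ConcurrentHashMap` stands in for Pulsar's `ConcurrentOpenHashMap`, zero-based chunk IDs are assumed, and the fall-back to the sequence-ID check is left to the caller because the rest of the method is not shown in the diff.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical standalone version of the chunk-tracking branch in the diff.
class ChunkTrackerSketch {
    private final ConcurrentHashMap<String, Integer> chunkMessageOngoing = new ConcurrentHashMap<>();

    // Records the first chunk (assumed chunk ID 0) of a non-duplicate chunked message.
    void startChunkedMessage(String producerName) {
        chunkMessageOngoing.put(producerName, 0);
    }

    // True when the message is the next chunk of an ongoing chunked message
    // (the diff returns NotDup in that case); otherwise the caller falls back
    // to the regular sequence-ID check.
    boolean isContinuationChunk(String producerName, boolean isChunked, int chunkId) {
        // One lookup replaces containsKey() followed by get().
        Integer lastChunkId = chunkMessageOngoing.get(producerName);
        if (lastChunkId == null) {
            return false; // nothing tracked, so nothing to remove
        }
        if (isChunked && lastChunkId < chunkId) {
            chunkMessageOngoing.put(producerName, chunkId);
            return true;
        }
        // Only reached when an entry exists.
        chunkMessageOngoing.remove(producerName);
        return false;
    }

    boolean isTracking(String producerName) {
        return chunkMessageOngoing.containsKey(producerName);
    }
}
```

A single `get` also removes the window in which the entry could disappear between `containsKey` and `get`, where unboxing a `null` for the `<` comparison would throw; that only matters if `isDuplicate` can run concurrently for the same producer.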



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java:
##########
@@ -323,19 +332,34 @@ public MessageDupStatus isDuplicate(PublishContext publishContext, ByteBuf heade
 
         String producerName = publishContext.getProducerName();
         long sequenceId = publishContext.getSequenceId();
+        // The process of the Producer sending chunk messages is continuous, and all chunks of a message use the same
+        // message metadata and sequence ID. Therefore, it is only necessary to check if the sequence ID of the first
+        // chunk is duplicated.
+        // When we receive the initial message of a non-duplicated chunk message, we place it in the
+        // chunkMessageOngoing. Upon completion of sending this chunk message, if we receive other messages
+        // sent by this Producer, we will remove it from the chunkMessageOngoing.
+        headersAndPayload.markReaderIndex();
+        MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
+        headersAndPayload.resetReaderIndex();
+        if (chunkMessageOngoing.containsKey(producerName)) {
+            //Deduplication requires the dependency on the ordered nature of messages to work accurately.

Review Comment:
   Suggested wording: `the consecutively ordered nature`



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java:
##########
@@ -55,6 +55,8 @@ public class MessageDeduplication {
     private final ManagedLedger managedLedger;
     private ManagedCursor managedCursor;
 
+    private final ConcurrentOpenHashMap<String, Integer> chunkMessageOngoing;

Review Comment:
   What happens if persisting the chunk message to BookKeeper fails after `isDuplicate` has returned?
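A minimal sketch of the failure case raised here, under two assumptions not verified against the broker or client: the tracker is advanced inside `isDuplicate` before the entry is written to BookKeeper, and the client later re-sends the failed chunk with the same chunk ID. Without a rollback, the re-sent chunk no longer looks like the next chunk. `ChunkPersistFailureSketch`, `acceptNextChunk` and `onPersistFailed` are hypothetical names; `ConcurrentHashMap` stands in for `ConcurrentOpenHashMap`.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model: the tracker advances before the write is durable.
class ChunkPersistFailureSketch {
    private final ConcurrentHashMap<String, Integer> chunkMessageOngoing = new ConcurrentHashMap<>();

    void startChunkedMessage(String producerName) {
        chunkMessageOngoing.put(producerName, 0); // first chunk, ID 0 assumed
    }

    // Mirrors the diff: a higher chunk ID is accepted and recorded up front.
    boolean acceptNextChunk(String producerName, int chunkId) {
        Integer last = chunkMessageOngoing.get(producerName);
        if (last != null && last < chunkId) {
            chunkMessageOngoing.put(producerName, chunkId);
            return true;
        }
        return false;
    }

    // Hypothetical hook for a failed write: step the tracker back one chunk.
    // The conditional replace() leaves the entry untouched if it no longer
    // holds the failed chunk ID (e.g. a later chunk already moved it).
    void onPersistFailed(String producerName, int failedChunkId) {
        chunkMessageOngoing.replace(producerName, failedChunkId, failedChunkId - 1);
    }
}
```

Clearing the producer's entry on failure would be a simpler alternative; which approach is right depends on how the broker and client actually recover from a failed write.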



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java:
##########
@@ -323,19 +332,34 @@ public MessageDupStatus isDuplicate(PublishContext publishContext, ByteBuf heade
 
         String producerName = publishContext.getProducerName();
         long sequenceId = publishContext.getSequenceId();
+        // The process of the Producer sending chunk messages is continuous, and all chunks of a message use the same
+        // message metadata and sequence ID. Therefore, it is only necessary to check if the sequence ID of the first
+        // chunk is duplicated.
+        // When we receive the initial message of a non-duplicated chunk message, we place it in the
+        // chunkMessageOngoing. Upon completion of sending this chunk message, if we receive other messages
+        // sent by this Producer, we will remove it from the chunkMessageOngoing.
+        headersAndPayload.markReaderIndex();
+        MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
+        headersAndPayload.resetReaderIndex();
+        if (chunkMessageOngoing.containsKey(producerName)) {
+            //Deduplication requires the dependency on the ordered nature of messages to work accurately.
+            if (publishContext.isChunked() && chunkMessageOngoing.get(producerName) < msgMetadata.getChunkId()) {
+                chunkMessageOngoing.put(producerName, msgMetadata.getChunkId());
+                return MessageDupStatus.NotDup;

Review Comment:
   It appears that we don't need to check the seq_id here because of the consecutive ordering, but would it be safer to check it anyway?
   
   m1-chunk-1, seqid=1
   m1-chunk-2, seqid=1
   m2-chunk-1, seqid=2
   m2-chunk-2, seqid=1 //are we sure this will never happen?
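A sketch of the extra check this comment asks about, assuming zero-based chunk IDs: store the sequence ID together with the last chunk ID, and treat a chunk as a continuation only when both match. The `main` method replays the example above, labeling m1-chunk-1 as chunk 0. `SeqAwareChunkTracker` and its methods are hypothetical names, not Pulsar API, and `ConcurrentHashMap` stands in for `ConcurrentOpenHashMap`.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical tracker: a chunk continues an ongoing message only if its
// sequence ID matches and its chunk ID is higher than the last one seen.
class SeqAwareChunkTracker {
    // Immutable snapshot of the chunked message currently in flight.
    private static final class Ongoing {
        final long sequenceId;
        final int lastChunkId;

        Ongoing(long sequenceId, int lastChunkId) {
            this.sequenceId = sequenceId;
            this.lastChunkId = lastChunkId;
        }
    }

    private final ConcurrentHashMap<String, Ongoing> ongoing = new ConcurrentHashMap<>();

    // Called for the first chunk (ID 0 assumed) of a non-duplicate chunked message.
    void startChunkedMessage(String producerName, long sequenceId) {
        ongoing.put(producerName, new Ongoing(sequenceId, 0));
    }

    boolean isContinuation(String producerName, long sequenceId, int chunkId) {
        Ongoing current = ongoing.get(producerName);
        if (current != null && current.sequenceId == sequenceId && current.lastChunkId < chunkId) {
            ongoing.put(producerName, new Ongoing(sequenceId, chunkId));
            return true;
        }
        // Mismatched sequence ID, out-of-order chunk, or nothing in flight:
        // drop the entry and let the regular sequence-ID check decide.
        ongoing.remove(producerName);
        return false;
    }

    public static void main(String[] args) {
        SeqAwareChunkTracker t = new SeqAwareChunkTracker();
        t.startChunkedMessage("p1", 1);                   // m1-chunk-1, seqid=1
        System.out.println(t.isContinuation("p1", 1, 1)); // m1-chunk-2, seqid=1: true
        t.startChunkedMessage("p1", 2);                   // m2-chunk-1, seqid=2
        System.out.println(t.isContinuation("p1", 1, 1)); // m2-chunk-2, seqid=1: false
    }
}
```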



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
