heesung-sn commented on code in PR #20948:
URL: https://github.com/apache/pulsar/pull/20948#discussion_r1291950076
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java:
##########
@@ -323,19 +332,34 @@ public MessageDupStatus isDuplicate(PublishContext publishContext, ByteBuf heade
String producerName = publishContext.getProducerName();
long sequenceId = publishContext.getSequenceId();
+ // The process of the Producer sending chunk messages is continuous, and all chunks of a message use the same
Review Comment:
General question: how does the Pulsar broker guarantee that messages are received in order?
In other words, how do we guarantee that this `isDuplicate` is called in consecutive order, e.g. m1-c1, m1-c2, m2-c1, m2-c2?
Do we have any documentation about it? I assume we rely on the Netty TCP connection layer for this `Channel.write`, but I would like to confirm this.
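To make the ordering dependency concrete, here is a small, self-contained model of the check in this PR. It is a sketch under assumptions: `ChunkDedupModel` and its plain `HashMap`s (standing in for `ConcurrentOpenHashMap`) are hypothetical, and the fall-through to a simple "highest sequence ID seen so far" check is assumed, because the rest of `isDuplicate` is not shown in this hunk. Feeding it the same chunks in consecutive vs. interleaved order shows that interleaved arrival makes a legitimate chunk look like a duplicate.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the chunk-aware dedup check in this PR.
// Not the broker's code: the fall-through below ASSUMES a plain
// "highest sequence ID seen so far" check for non-continuation messages.
class ChunkDedupModel {
    private final Map<String, Long> highestSeqPushed = new HashMap<>();
    private final Map<String, Integer> chunkMessageOngoing = new HashMap<>();

    /** Returns true if the message would be treated as a duplicate. */
    boolean isDuplicate(String producer, long seqId, boolean chunked, int chunkId) {
        Integer lastChunkId = chunkMessageOngoing.get(producer);
        if (lastChunkId != null) {
            // Continuation of the ongoing chunked message: only the chunk ID is compared.
            if (chunked && lastChunkId < chunkId) {
                chunkMessageOngoing.put(producer, chunkId);
                return false;
            }
            chunkMessageOngoing.remove(producer);
        }
        // Assumed fall-through: regular sequence-ID based deduplication.
        Long last = highestSeqPushed.get(producer);
        if (last != null && seqId <= last) {
            return true;
        }
        highestSeqPushed.put(producer, seqId);
        if (chunked) {
            chunkMessageOngoing.put(producer, chunkId); // first chunk of a new message
        }
        return false;
    }

    // Feeds {seqId, chunkId} pairs (all chunked, one producer) and returns the dup flags.
    static boolean[] run(long[][] msgs) {
        ChunkDedupModel model = new ChunkDedupModel();
        boolean[] dup = new boolean[msgs.length];
        for (int i = 0; i < msgs.length; i++) {
            dup[i] = model.isDuplicate("p1", msgs[i][0], true, (int) msgs[i][1]);
        }
        return dup;
    }

    public static void main(String[] args) {
        // Consecutive: m1-c0, m1-c1, m2-c0, m2-c1
        boolean[] inOrder = run(new long[][] {{1, 0}, {1, 1}, {2, 0}, {2, 1}});
        // Interleaved: m1-c0, m2-c0, m1-c1, m2-c1
        boolean[] interleaved = run(new long[][] {{1, 0}, {2, 0}, {1, 1}, {2, 1}});
        System.out.println(java.util.Arrays.toString(inOrder));     // [false, false, false, false]
        System.out.println(java.util.Arrays.toString(interleaved)); // [false, false, false, true]
    }
}
```

In the interleaved run, m2-c1 is flagged because m1-c1 overwrote the ongoing chunk ID, so the check falls through to the sequence-ID comparison, where seqid=2 has already been seen.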
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java:
##########
@@ -323,19 +332,34 @@ public MessageDupStatus isDuplicate(PublishContext publishContext, ByteBuf heade
String producerName = publishContext.getProducerName();
long sequenceId = publishContext.getSequenceId();
+ // The process of the Producer sending chunk messages is continuous, and all chunks of a message use the same
+ // message metadata and sequence ID. Therefore, it is only necessary to check if the sequence ID of the first
+ // chunk is duplicated.
+ // When we receive the initial message of a non-duplicated chunk message, we place it in the
+ // chunkMessageOngoing. Upon completion of sending this chunk message, if we receive other messages
+ // sent by this Producer, we will remove it from the chunkMessageOngoing.
+ headersAndPayload.markReaderIndex();
+ MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
+ headersAndPayload.resetReaderIndex();
+ if (chunkMessageOngoing.containsKey(producerName)) {
+ //Deduplication requires the dependency on the ordered nature of messages to work accurately.
+ if (publishContext.isChunked() && chunkMessageOngoing.get(producerName) < msgMetadata.getChunkId()) {
+ chunkMessageOngoing.put(producerName, msgMetadata.getChunkId());
+ return MessageDupStatus.NotDup;
+ } else {
+ chunkMessageOngoing.remove(producerName);
Review Comment:
Maybe optimize this if-else a little so that we only call `remove` when the map actually contains the key?
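One possible reading of this suggestion, sketched under assumptions: fetch the value once and branch on `null`, so `containsKey` followed by `get` becomes a single lookup and `remove` only runs on the branch where the entry is known to exist. `ChunkBranchSketch`, `Decision`, `onMessage`, and `startChunkedMessage` are hypothetical names, and `java.util.concurrent.ConcurrentHashMap` stands in for `ConcurrentOpenHashMap`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: one get() instead of containsKey() + get(),
// and remove() only on the path where the entry is known to be present.
class ChunkBranchSketch {
    enum Decision { CONTINUE_CHUNK, FALL_THROUGH }

    private final Map<String, Integer> chunkMessageOngoing = new ConcurrentHashMap<>();

    Decision onMessage(String producerName, boolean chunked, int chunkId) {
        Integer lastChunkId = chunkMessageOngoing.get(producerName); // null if absent
        if (lastChunkId == null) {
            return Decision.FALL_THROUGH; // nothing ongoing, nothing to remove
        }
        if (chunked && lastChunkId < chunkId) {
            chunkMessageOngoing.put(producerName, chunkId);
            return Decision.CONTINUE_CHUNK;
        }
        chunkMessageOngoing.remove(producerName); // entry exists, so removal is meaningful
        return Decision.FALL_THROUGH;
    }

    void startChunkedMessage(String producerName, int firstChunkId) {
        chunkMessageOngoing.put(producerName, firstChunkId);
    }

    boolean isOngoing(String producerName) {
        return chunkMessageOngoing.containsKey(producerName);
    }
}
```

Like the original branch, this is still check-then-act, so it relies on calls for the same producer never running concurrently.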
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java:
##########
@@ -323,19 +332,34 @@ public MessageDupStatus isDuplicate(PublishContext publishContext, ByteBuf heade
String producerName = publishContext.getProducerName();
long sequenceId = publishContext.getSequenceId();
+ // The process of the Producer sending chunk messages is continuous, and all chunks of a message use the same
+ // message metadata and sequence ID. Therefore, it is only necessary to check if the sequence ID of the first
+ // chunk is duplicated.
+ // When we receive the initial message of a non-duplicated chunk message, we place it in the
+ // chunkMessageOngoing. Upon completion of sending this chunk message, if we receive other messages
+ // sent by this Producer, we will remove it from the chunkMessageOngoing.
+ headersAndPayload.markReaderIndex();
+ MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
+ headersAndPayload.resetReaderIndex();
+ if (chunkMessageOngoing.containsKey(producerName)) {
+ //Deduplication requires the dependency on the ordered nature of messages to work accurately.
Review Comment:
`the consecutively ordered nature`
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java:
##########
@@ -55,6 +55,8 @@ public class MessageDeduplication {
private final ManagedLedger managedLedger;
private ManagedCursor managedCursor;
+ private final ConcurrentOpenHashMap<String, Integer> chunkMessageOngoing;
Review Comment:
What happens if persisting the chunk message to BookKeeper fails after `isDuplicate` has already returned?
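A minimal sketch of one way to handle this, assuming the broker gets a failure callback for the write: clear the producer's `chunkMessageOngoing` entry and roll the "pushed" sequence ID back to the last "persisted" one, so a retransmitted chunked message is checked from its first chunk again. `PersistFailureSketch`, `onPushed`, `onPersisted`, `onPersistFailed`, and the two sequence-ID maps are hypothetical; whether the broker actually expects a resend or instead closes the producer on a BookKeeper failure is not shown in this diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: what to undo when a chunk that passed isDuplicate()
// then fails to persist. Not the broker's actual failure path.
class PersistFailureSketch {
    final Map<String, Long> highestSeqPushed = new HashMap<>();
    final Map<String, Long> highestSeqPersisted = new HashMap<>();
    final Map<String, Integer> chunkMessageOngoing = new HashMap<>();

    // A message passed the dedup check and was handed to the ledger.
    void onPushed(String producer, long seqId, boolean chunked, int chunkId) {
        highestSeqPushed.merge(producer, seqId, Long::max);
        if (chunked) {
            chunkMessageOngoing.put(producer, chunkId);
        }
    }

    // The write was acknowledged by the storage layer.
    void onPersisted(String producer, long seqId) {
        highestSeqPersisted.merge(producer, seqId, Long::max);
    }

    // The write failed: forget the half-sent chunked message and roll
    // "pushed" back to "persisted" so a resend is not flagged as a duplicate.
    void onPersistFailed(String producer) {
        chunkMessageOngoing.remove(producer);
        Long persisted = highestSeqPersisted.get(producer);
        if (persisted == null) {
            highestSeqPushed.remove(producer);
        } else {
            highestSeqPushed.put(producer, persisted);
        }
    }

    boolean wouldBeDuplicate(String producer, long seqId) {
        Long last = highestSeqPushed.get(producer);
        return last != null && seqId <= last;
    }
}
```

This only covers the in-memory bookkeeping. Because every chunk of a message carries the same sequence ID, the case where some chunks were already persisted before the failure would likely need separate handling.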
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java:
##########
@@ -323,19 +332,34 @@ public MessageDupStatus isDuplicate(PublishContext publishContext, ByteBuf heade
String producerName = publishContext.getProducerName();
long sequenceId = publishContext.getSequenceId();
+ // The process of the Producer sending chunk messages is continuous, and all chunks of a message use the same
+ // message metadata and sequence ID. Therefore, it is only necessary to check if the sequence ID of the first
+ // chunk is duplicated.
+ // When we receive the initial message of a non-duplicated chunk message, we place it in the
+ // chunkMessageOngoing. Upon completion of sending this chunk message, if we receive other messages
+ // sent by this Producer, we will remove it from the chunkMessageOngoing.
+ headersAndPayload.markReaderIndex();
+ MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
+ headersAndPayload.resetReaderIndex();
+ if (chunkMessageOngoing.containsKey(producerName)) {
+ //Deduplication requires the dependency on the ordered nature of messages to work accurately.
+ if (publishContext.isChunked() && chunkMessageOngoing.get(producerName) < msgMetadata.getChunkId()) {
+ chunkMessageOngoing.put(producerName, msgMetadata.getChunkId());
+ return MessageDupStatus.NotDup;
Review Comment:
It appears that we don't need to check the sequence ID here because of the consecutive ordering, but would it be safer to also check it? For example:
m1-chunk-1, seqid=1
m1-chunk-2, seqid=1
m2-chunk-1, seqid=2
m2-chunk-2, seqid=1 //are we sure this will never happen?
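If the extra check is cheap, one option is to store the sequence ID alongside the last chunk ID, so a chunk only counts as a continuation when both match the ongoing message. A hypothetical sketch (`SeqAwareChunkCheck`, `OngoingChunk`, `start`, and `isContinuation` are illustrative names, not code from this PR): with it, the `m2-chunk-2, seqid=1` case is not accepted as a continuation and would instead fall through to the regular sequence-ID check.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical: remember which message (sequence ID) the ongoing chunks belong to.
class SeqAwareChunkCheck {
    static final class OngoingChunk {
        final long sequenceId;
        final int lastChunkId;

        OngoingChunk(long sequenceId, int lastChunkId) {
            this.sequenceId = sequenceId;
            this.lastChunkId = lastChunkId;
        }
    }

    private final Map<String, OngoingChunk> chunkMessageOngoing = new HashMap<>();

    // Called once the first chunk of a new message passed the regular dedup check.
    void start(String producer, long seqId, int chunkId) {
        chunkMessageOngoing.put(producer, new OngoingChunk(seqId, chunkId));
    }

    /** True only if this chunk continues the ongoing message with the same sequence ID. */
    boolean isContinuation(String producer, long seqId, boolean chunked, int chunkId) {
        OngoingChunk ongoing = chunkMessageOngoing.get(producer);
        if (ongoing == null) {
            return false;
        }
        if (chunked && ongoing.sequenceId == seqId && ongoing.lastChunkId < chunkId) {
            chunkMessageOngoing.put(producer, new OngoingChunk(seqId, chunkId));
            return true;
        }
        // Different sequence ID, non-chunked, or non-increasing chunk ID:
        // drop the state and let the caller run the regular sequence-ID check.
        chunkMessageOngoing.remove(producer);
        return false;
    }
}
```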
--
This is an automated message from the Apache Git Service.