liangyepianzhou commented on code in PR #20948:
URL: https://github.com/apache/pulsar/pull/20948#discussion_r1292125960
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java:
##########
@@ -323,19 +332,34 @@ public MessageDupStatus isDuplicate(PublishContext publishContext, ByteBuf heade
String producerName = publishContext.getProducerName();
long sequenceId = publishContext.getSequenceId();
+ // The process of the Producer sending chunk messages is continuous, and all chunks of a message use the same
+ // message metadata and sequence ID. Therefore, it is only necessary to check if the sequence ID of the first
+ // chunk is duplicated.
+ // When we receive the initial message of a non-duplicated chunk message, we place it in the
+ // chunkMessageOngoing. Upon completion of sending this chunk message, if we receive other messages
+ // sent by this Producer, we will remove it from the chunkMessageOngoing.
+ headersAndPayload.markReaderIndex();
+ MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
+ headersAndPayload.resetReaderIndex();
+ if (chunkMessageOngoing.containsKey(producerName)) {
+ //Deduplication requires the dependency on the ordered nature of messages to work accurately.
+ if (publishContext.isChunked() && chunkMessageOngoing.get(producerName) < msgMetadata.getChunkId()) {
+ chunkMessageOngoing.put(producerName, msgMetadata.getChunkId());
+ return MessageDupStatus.NotDup;
Review Comment:
I agree. It's better to check seq_id too.
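A minimal sketch of what checking the sequence ID as well could look like. It is not Pulsar's `MessageDeduplication`: the class `ChunkDedupSketch`, its `check` method, the `Status` enum and the `OngoingChunk` holder are hypothetical names, plain `HashMap`s stand in for the broker's state, and a non-chunked message is assumed to be modelled as `chunkId = 0, numChunks = 1`. It keeps the diff's idea of one in-flight entry per producer, but stores the sequence ID next to the last accepted chunk ID, so a later chunk passes only if its sequence ID matches and its chunk ID advances.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not Pulsar's MessageDeduplication: per-producer dedup that lets
// later chunks of one message through only when they carry the same sequence ID as the
// first chunk and a strictly larger chunk ID.
public class ChunkDedupSketch {

    public enum Status { DUP, NOT_DUP }

    // Sequence ID and last accepted chunk ID of a producer's in-flight chunked message.
    private static final class OngoingChunk {
        final long sequenceId;
        final int lastChunkId;

        OngoingChunk(long sequenceId, int lastChunkId) {
            this.sequenceId = sequenceId;
            this.lastChunkId = lastChunkId;
        }
    }

    private final Map<String, OngoingChunk> ongoing = new HashMap<>();
    private final Map<String, Long> highestSequenceId = new HashMap<>();

    // Assumption: a non-chunked message is modelled as chunkId = 0, numChunks = 1.
    public Status check(String producer, long sequenceId, int chunkId, int numChunks) {
        OngoingChunk cur = ongoing.get(producer);
        if (cur != null) {
            if (sequenceId == cur.sequenceId) {
                // Same message: only a strictly later chunk is new.
                if (chunkId > cur.lastChunkId) {
                    recordChunk(producer, sequenceId, chunkId, numChunks);
                    return Status.NOT_DUP;
                }
                return Status.DUP; // a chunk that was already accepted
            }
            if (sequenceId < cur.sequenceId) {
                // Older message replayed; keep tracking the in-flight one.
                return Status.DUP;
            }
            // A newer sequence ID means the in-flight chunked message is over.
            ongoing.remove(producer);
        }
        // Ordinary per-producer check against the highest sequence ID seen so far.
        Long highest = highestSequenceId.get(producer);
        if (highest != null && sequenceId <= highest) {
            return Status.DUP;
        }
        highestSequenceId.put(producer, sequenceId);
        if (numChunks > 1) {
            recordChunk(producer, sequenceId, chunkId, numChunks);
        }
        return Status.NOT_DUP;
    }

    // Track the chunk just accepted, or stop tracking once the last chunk arrives.
    private void recordChunk(String producer, long sequenceId, int chunkId, int numChunks) {
        if (chunkId >= numChunks - 1) {
            ongoing.remove(producer);
        } else {
            ongoing.put(producer, new OngoingChunk(sequenceId, chunkId));
        }
    }

    public static void main(String[] args) {
        ChunkDedupSketch d = new ChunkDedupSketch();
        System.out.println(d.check("p", 1, 0, 3)); // NOT_DUP: first chunk
        System.out.println(d.check("p", 1, 1, 3)); // NOT_DUP: next chunk, same sequence ID
        System.out.println(d.check("p", 1, 1, 3)); // DUP: chunk 1 resent
        System.out.println(d.check("p", 0, 2, 3)); // DUP: older sequence ID, although 2 > 1
        System.out.println(d.check("p", 1, 2, 3)); // NOT_DUP: last chunk
        System.out.println(d.check("p", 1, 0, 3)); // DUP: whole message resent
    }
}
```

The fourth call in `main` is the case the review is about: a replayed chunk with chunk ID 2 would pass a comparison of chunk IDs alone, because 2 is greater than the stored 1, but it is rejected here because its sequence ID is older than that of the in-flight message.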
--
This is an automated message from the Apache Git Service.