poorbarcode commented on code in PR #21048:
URL: https://github.com/apache/pulsar/pull/21048#discussion_r1310025536


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -695,6 +695,12 @@ private void serializeAndSendMessage(MessageImpl<?> msg,
                 op = OpSendMsg.create(msg, null, sequenceId, callback);
                 final MessageMetadata finalMsgMetadata = msgMetadata;
                 op.rePopulate = () -> {
+                    if (msgMetadata.hasChunkId()) {
+                        // The message metadata is shared between all chunks in a large message
+                        // We need to reset the chunk id for each call of this method
+                        // It's safe to do that because there is only 1 thread to manipulate this message metadata
+                        finalMsgMetadata.setChunkId(chunkId);

Review Comment:
   - Since multiple chunks share the same message metadata, the chunk ID must be reset when resending messages, right?<sup>[1]</sup>
   - So it's solving a different problem, right?<sup>[2]</sup>
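
   To make the first point concrete, here is a minimal sketch of why shared mutable metadata needs the chunk ID restored on resend. It is not the `ProducerImpl` code: `Metadata`, `PendingChunk`, and `sendThenResend` are hypothetical stand-ins, and the only assumption carried over from the diff is that every chunk holds a reference to the same metadata object.

```java
import java.util.ArrayList;
import java.util.List;

public class SharedMetadataSketch {
    // Hypothetical stand-in for the message metadata shared by all chunks.
    static final class Metadata {
        int chunkId = -1;
    }

    // Hypothetical stand-in for one pending send operation (one chunk).
    static final class PendingChunk {
        final Metadata metadata;      // same instance for every chunk
        final int chunkId;            // the ID this chunk was created with
        final boolean resetOnResend;  // mimics the check added to rePopulate

        PendingChunk(Metadata metadata, int chunkId, boolean resetOnResend) {
            this.metadata = metadata;
            this.chunkId = chunkId;
            this.resetOnResend = resetOnResend;
        }

        // Returns the chunk ID that would be serialized on a resend.
        int resend() {
            if (resetOnResend) {
                metadata.chunkId = chunkId;
            }
            return metadata.chunkId;
        }
    }

    static List<Integer> sendThenResend(int totalChunks, boolean resetOnResend) {
        Metadata shared = new Metadata();
        List<PendingChunk> pending = new ArrayList<>();
        for (int i = 0; i < totalChunks; i++) {
            shared.chunkId = i; // first send: each chunk overwrites the shared field
            pending.add(new PendingChunk(shared, i, resetOnResend));
        }
        List<Integer> resent = new ArrayList<>();
        for (PendingChunk p : pending) {
            resent.add(p.resend());
        }
        return resent;
    }

    public static void main(String[] args) {
        System.out.println(sendThenResend(3, false)); // [2, 2, 2]
        System.out.println(sendThenResend(3, true));  // [0, 1, 2]
    }
}
```

   Without the reset, every resent chunk in this sketch carries the ID of the last chunk written to the shared object. With it, each chunk restores its own ID before it is serialized again.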



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java:
##########
@@ -46,6 +47,7 @@
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Test(groups = "flaky")

Review Comment:
   Since `DeadLetterTopicTest` is marked `flaky`, I suggest creating a new test class for the test this PR adds, so that it is not ignored in the future.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java:
##########
@@ -138,8 +138,11 @@ public void run(Timeout t) throws Exception {
                         if (!headPartition.isEmpty()) {
                             log.info("[{}] {} messages will be re-delivered", consumerBase, headPartition.size());
                             headPartition.forEach(messageId -> {
-                                addChunkedMessageIdsAndRemoveFromSequenceMap(messageId, messageIds, consumerBase);
-                                messageIds.add(messageId);
+                                if (messageId instanceof ChunkMessageIdImpl) {
+                                    addChunkedMessageIdsAndRemoveFromSequenceMap(messageId, messageIds, consumerBase);
+                                } else {
+                                    messageIds.add(messageId);

Review Comment:
   Because `messageIds` is a `Set`, the change here is only an optimization that saves one `set.add()` call. The original code has no bug, right?
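
   The premise that a repeated add is harmless can be checked with a small sketch. It only illustrates the `java.util.Set` contract. Whether `addChunkedMessageIdsAndRemoveFromSequenceMap` already adds the same `messageId` is an assumption of the question, not something this sketch verifies, and the class and method names are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

public class SetAddSketch {
    // Adds the same id twice and reports whether the second add changed the set.
    static boolean secondAddChangesSet(Set<String> ids, String id) {
        ids.add(id);
        return ids.add(id); // Set.add returns false when the element is already present
    }

    public static void main(String[] args) {
        Set<String> messageIds = new HashSet<>();
        boolean changed = secondAddChangesSet(messageIds, "ledger:1:entry:7");
        System.out.println(changed);           // false
        System.out.println(messageIds.size()); // 1
    }
}
```

   If the helper does add the same ID, the extra call in the old code costs one lookup and changes nothing. If it adds different IDs (for example, only the individual chunk IDs), the two versions would produce different sets, which is worth confirming before treating the change as a pure optimization.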
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

