poorbarcode commented on code in PR #21048:
URL: https://github.com/apache/pulsar/pull/21048#discussion_r1310025536
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -695,6 +695,12 @@ private void serializeAndSendMessage(MessageImpl<?> msg,
op = OpSendMsg.create(msg, null, sequenceId, callback);
final MessageMetadata finalMsgMetadata = msgMetadata;
op.rePopulate = () -> {
+ if (msgMetadata.hasChunkId()) {
+ // The message metadata is shared between all chunks
in a large message
+ // We need to reset the chunk id for each call of this
method
+ // It's safe to do that because there is only 1 thread
to manipulate this message metadata
+ finalMsgMetadata.setChunkId(chunkId);
Review Comment:
- Since multiple chunks use the same message meta, the chunk ID must be
reset when resending messages, right?<sup>[1]</sup>
- So it's solving a different problem, right?<sup>[2]</sup>
##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java:
##########
@@ -46,6 +47,7 @@
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "flaky")
Review Comment:
Since `DeadLetterTopicTest` was marked `flaky`, suggested creating a new
test to cover the test this PR is adding, to avoid this test being ignored in
the future.
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java:
##########
@@ -138,8 +138,11 @@ public void run(Timeout t) throws Exception {
if (!headPartition.isEmpty()) {
log.info("[{}] {} messages will be re-delivered",
consumerBase, headPartition.size());
headPartition.forEach(messageId -> {
-
addChunkedMessageIdsAndRemoveFromSequenceMap(messageId, messageIds,
consumerBase);
- messageIds.add(messageId);
+ if (messageId instanceof ChunkMessageIdImpl) {
+
addChunkedMessageIdsAndRemoveFromSequenceMap(messageId, messageIds,
consumerBase);
+ } else {
+ messageIds.add(messageId);
Review Comment:
Because `messageIds` is a `Set`, the change here is an improvement to save
once `set.add()`. The original code does not have bugs, right?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]