junrao commented on code in PR #12570:
URL: https://github.com/apache/kafka/pull/12570#discussion_r970071437
##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -1137,23 +1137,26 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
assertEquals(partition1, partition.get());
assertEquals(2, mockRandom.get());
- // Produce large record, we should switch to next partition.
+ // Produce large record, we switched to next partition by previous produce, but
+ // for this produce the switch would be disabled because of incomplete batch.
accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS,
    callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster);
assertEquals(partition2, partition.get());
- assertEquals(3, mockRandom.get());
+ assertEquals(2, mockRandom.get());
Review Comment:
Thanks, Artem. Sorry, I still don't fully understand.
After step 3, it seems that we switched to partition 2 after the append
since mockRandom is 2, right? That part makes sense to me.
In step 4, we append to a new batch in partition 2. After the append(), it
seems that enableSwitch should be true, since `last.isFull()` should be true.
Then, in `topicInfo.builtInPartitioner.updatePartitionInfo`, the condition
`producedBytes >= stickyBatchSize && enableSwitch` should be true, which would
trigger partition switching. What am I missing here?
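A minimal sketch of the condition discussed above may help pin down the question. It is a simplified, hypothetical model, not Kafka's actual `BuiltInPartitioner`: the class name `StickySwitchSketch`, the round-robin partition choice, and the `switchCount` counter are assumptions made for illustration. Only the check `producedBytes >= stickyBatchSize && enableSwitch` is taken from the comment.

```java
// Hypothetical, simplified model of the switch condition quoted in the comment.
// This is NOT Kafka's BuiltInPartitioner; names mirror the comment for readability only.
class StickySwitchSketch {
    private final int stickyBatchSize; // byte threshold before a switch is considered
    private final int numPartitions;
    private int currentPartition = 0;
    private int producedBytes = 0;
    private int switchCount = 0;       // assumed counter: how many switches have happened

    StickySwitchSketch(int stickyBatchSize, int numPartitions) {
        this.stickyBatchSize = stickyBatchSize;
        this.numPartitions = numPartitions;
    }

    // Assumed shape: accumulate the appended bytes, then switch only if
    // the byte threshold is reached AND the caller allows switching.
    void updatePartitionInfo(int appendedBytes, boolean enableSwitch) {
        producedBytes += appendedBytes;
        if (producedBytes >= stickyBatchSize && enableSwitch) {
            // Round-robin is an assumption made to keep the sketch deterministic.
            currentPartition = (currentPartition + 1) % numPartitions;
            producedBytes = 0;
            switchCount++;
        }
    }

    int currentPartition() { return currentPartition; }
    int producedBytes() { return producedBytes; }
    int switchCount() { return switchCount; }
}
```

In this model, `updatePartitionInfo(2048, false)` with a threshold of 1024 leaves the partition unchanged even though the byte threshold is exceeded, while a later call with `enableSwitch` set to true switches. Which case step 4 hits depends on what `enableSwitch` evaluates to in the real append path, which is the open question here.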