sudeshwasnik commented on code in PR #12462:
URL: https://github.com/apache/kafka/pull/12462#discussion_r951338206
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -273,26 +273,29 @@ public RecordAppendResult append(String topic,
// check if we have an in-progress batch
Deque<ProducerBatch> dq = topicInfo.batches.computeIfAbsent(effectivePartition, k -> new ArrayDeque<>());
+ RecordAppendResult appendResult;
synchronized (dq) {
// After taking the lock, validate that the partition hasn't changed and retry.
if (topicInfo.builtInPartitioner.isPartitionChanged(partitionInfo)) {
log.trace("Partition {} for topic {} switched by a concurrent append, retrying",
partitionInfo.partition(), topic);
continue;
}
- RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);
- if (appendResult != null) {
+ appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);
+ if (appendResult != null && !appendResult.newBatchCreated) {
topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster);
return appendResult;
}
}
- // we don't have an in-progress record batch try to allocate a new batch
- if (abortOnNewBatch) {
- // Return a result that will cause another call to append.
+ // either 1. current topicPartition producerBatch is full - return and prepare for another batch/partition.
+ // 2. no producerBatch existed for this topicPartition, create a new producerBatch.
+ if (appendResult == null && abortOnNewBatch) {
Review Comment:
Quick question: if there are no batches in the current partition, are there cases where we would still want to switch partitions? Technically, the partition is completely empty at this point, so we can create and store new batches in it. I'll revisit the code and get back to you ASAP if there is a case where we would not want to fill a partition's empty deque in `batches` and would instead want to switch.
Apart from that, here is some context on the intended change, based on my limited knowledge of the partitioning logic:
@artemlivshits IIUC, `onNewBatch` should only be called [when the existing batch is full](https://github.com/apache/kafka/blob/b392cf212f7ed4a82b79c3690b488619c027dba9/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L79).
The difference this PR is trying to make with this condition [here](https://github.com/apache/kafka/blob/a0e9020f87b0f5014506a32f41bc431c1cbfbc88/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L293-L296) is:
- If `appendResult` is null, the existing batch for the current partition is full. In this case, return and call `onNewBatch`.
- If `appendResult` is not null, then at this point `appendResult.newBatchCreated` is also [true](https://github.com/apache/kafka/blob/a0e9020f87b0f5014506a32f41bc431c1cbfbc88/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L285-L288). The current partition has no completed `ProducerBatch`, so `onNewBatch` should not be called. Instead, we should stick to this partition, create a `ProducerBatch`, and fill it. We shouldn't switch partitions here.
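
Here is a minimal, self-contained sketch of the outcomes after the `synchronized (dq)` block, as I read the diff. `AppendDecisionSketch`, `SimpleAppendResult`, `AppendOutcome`, and `decide` are hypothetical names for illustration, not Kafka APIs; `SimpleAppendResult` keeps only the `newBatchCreated` flag of `RecordAppendResult`, and the real `append` also handles locking, buffer allocation, and retries.

```java
public class AppendDecisionSketch {

    // Hypothetical stand-in for RecordAppendResult: keeps only the flag the new condition reads.
    static final class SimpleAppendResult {
        final boolean newBatchCreated;

        SimpleAppendResult(boolean newBatchCreated) {
            this.newBatchCreated = newBatchCreated;
        }
    }

    // Hypothetical labels for what append() does next.
    enum AppendOutcome {
        RETURN_APPENDED,       // appended to an in-progress batch; returned inside synchronized (dq)
        ABORT_FOR_NEW_BATCH,   // return early so the caller calls onNewBatch and appends again
        ALLOCATE_IN_PARTITION  // stay on this partition and allocate a new ProducerBatch
    }

    // Mirrors the two conditions in the diff: the check inside synchronized (dq),
    // then `appendResult == null && abortOnNewBatch` right after it.
    static AppendOutcome decide(SimpleAppendResult appendResult, boolean abortOnNewBatch) {
        if (appendResult != null && !appendResult.newBatchCreated) {
            return AppendOutcome.RETURN_APPENDED;
        }
        if (appendResult == null && abortOnNewBatch) {
            return AppendOutcome.ABORT_FOR_NEW_BATCH;
        }
        // Remaining cases: a non-null result with newBatchCreated == true,
        // or a null result when abortOnNewBatch is false.
        return AppendOutcome.ALLOCATE_IN_PARTITION;
    }

    public static void main(String[] args) {
        System.out.println(decide(null, true));                           // ABORT_FOR_NEW_BATCH
        System.out.println(decide(null, false));                          // ALLOCATE_IN_PARTITION
        System.out.println(decide(new SimpleAppendResult(false), true));  // RETURN_APPENDED
        System.out.println(decide(new SimpleAppendResult(true), true));   // ALLOCATE_IN_PARTITION
    }
}
```

The last case is the one this comment argues for: a non-null result with `newBatchCreated` set keeps the record on the current partition instead of triggering `onNewBatch`.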
WDYT?
--
This is an automated message from the Apache Git Service.