jolshan commented on code in PR #12462: URL: https://github.com/apache/kafka/pull/12462#discussion_r1063875994
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -287,12 +291,16 @@ public RecordAppendResult append(String topic,
                 }
             }
 
-            // we don't have an in-progress record batch try to allocate a new batch
-            if (abortOnNewBatch) {
-                // Return a result that will cause another call to append.
+            // noDqForPartition is true either when 1. partition was encountered for first time so no Deque existed previously.
+            // 2. DQ was removed due to - all batches were cleared due to expiration or sender cleared batches after draining.
+            // if so, abort and look to call partitioner -> onNewBatch and select other partition.
+            // This prevents a single partition getting re-selected after recent drain.
+            if (abortOnNewBatch && noDqForPartition) {

Review Comment:
Yeah -- it seems like abortOnNewBatch being hardcoded has always been an issue. Any partitioner that doesn't implement it (and essentially batch on partitions like the old sticky partitioner) doesn't really have a use for it as far as I can see.
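
For readers following the thread, here is a minimal sketch of the condition the diff introduces. It is not the actual RecordAccumulator code: the class `AppendDecisionSketch`, its methods, and the use of plain strings for batches are all hypothetical, and the behavior of removing a deque after a drain is assumed from the diff's own comment rather than checked against the PR.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for per-partition accumulator state; not Kafka's RecordAccumulator.
class AppendDecisionSketch {
    // partition -> queue of pending batches (batches are modeled as plain strings)
    private final Map<Integer, Deque<String>> batches = new HashMap<>();

    // Mirrors the condition in the diff: abort only when the caller requested it
    // AND the partition currently has no deque.
    boolean shouldAbortForNewBatch(int partition, boolean abortOnNewBatch) {
        boolean noDqForPartition = !batches.containsKey(partition);
        return abortOnNewBatch && noDqForPartition;
    }

    // Creates the deque on first use, then queues the batch.
    void addBatch(int partition, String batch) {
        batches.computeIfAbsent(partition, p -> new ArrayDeque<>()).addLast(batch);
    }

    // Assumption taken from the diff comment: the deque is removed once its
    // batches have been drained or expired.
    void drain(int partition) {
        batches.remove(partition);
    }

    public static void main(String[] args) {
        AppendDecisionSketch acc = new AppendDecisionSketch();
        System.out.println(acc.shouldAbortForNewBatch(0, true));  // true: first time this partition is seen
        acc.addBatch(0, "batch-1");
        System.out.println(acc.shouldAbortForNewBatch(0, true));  // false: a deque exists
        acc.drain(0);
        System.out.println(acc.shouldAbortForNewBatch(0, true));  // true: deque was removed after the drain
        System.out.println(acc.shouldAbortForNewBatch(0, false)); // false: caller did not request an abort
    }
}
```

In this sketch the abort path is taken only on a first encounter or right after a drain, which is the situation the diff comment describes as the point to call the partitioner's onNewBatch and pick another partition.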