lianetm commented on code in PR #14503: URL: https://github.com/apache/kafka/pull/14503#discussion_r1356848021
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -235,15 +235,18 @@ public ConsumerRecords<K, V> poll(final Duration timeout) { * defined */ private boolean updateFetchPositionsIfNeeded(final Timer timer) { - // If any partitions have been truncated due to a leader change, we need to validate the offsets + // Validate positions using the partition leader end offsets, to detect if any partition + // has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch + // request, retrieve the partition end offsets, and validate the current position against it. ValidatePositionsApplicationEvent validatePositionsEvent = new ValidatePositionsApplicationEvent(); eventHandler.add(validatePositionsEvent); - // If there are any partitions which do not have a valid position and are not - // awaiting reset, then we need to fetch committed offsets. We will only do a - // coordinator lookup if there are partitions which have missing positions, so - // a consumer with manually assigned partitions can avoid a coordinator dependence - // by always ensuring that assigned partitions have an initial position. + // Reset positions using committed offsets retrieved from the group coordinator, for any + // partitions which do not have a valid position and are not awaiting reset. We + // will only do a coordinator lookup if there are partitions which have missing + // positions, so a consumer with manually assigned partitions can avoid a coordinator + // dependence by always ensuring that assigned partitions have an initial position. This + // will trigger an OffsetFetch request and update positions with the offsets retrieved. Review Comment: Agree. Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org