lianetm commented on code in PR #14346: URL: https://github.com/apache/kafka/pull/14346#discussion_r1322041495
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java: ########## @@ -154,6 +170,52 @@ public CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> fetchOffsets( OffsetFetcherUtils.buildOffsetsForTimesResult(timestampsToSearch, result.fetchedOffsets)); } + /** + * Reset offsets for all assigned partitions that require it. Offsets will be reset Review Comment: We do have the logic, but the case is that if the reset policy is not set, an exception is thrown at the moment of marking the partition as needing reset on the `resetInitializingPositions` (not when the actual reset is called on the partitions marked as needing the reset). So it is [here](https://github.com/apache/kafka/blob/4a9e1bc6edddfa0e537bc1dffa7abc61c85f19bb/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java#L237), right before the reset, when the `NoOffsetForPartitionException` would be thrown. This is also the behaviour of the current consumer implementation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org