lianetm commented on code in PR #14346:
URL: https://github.com/apache/kafka/pull/14346#discussion_r1322041495


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -154,6 +170,52 @@ public CompletableFuture<Map<TopicPartition, 
OffsetAndTimestamp>> fetchOffsets(
                 
OffsetFetcherUtils.buildOffsetsForTimesResult(timestampsToSearch, 
result.fetchedOffsets));
     }
 
+    /**
+     * Reset offsets for all assigned partitions that require it. Offsets will 
be reset

Review Comment:
   We do have the logic, but the case is that if the reset policy is not set, 
an exception is thrown at the moment of marking the partition as needing reset 
on the `resetInitializingPositions` (not when the actual reset is called on the 
partitions marked as needing the reset). 
   
   So it is 
[here](https://github.com/apache/kafka/blob/4a9e1bc6edddfa0e537bc1dffa7abc61c85f19bb/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java#L237),
 right before the reset, when the `NoOffsetForPartitionException` would be 
thrown. This is also the behaviour of the current consumer implementation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to