lucasbru commented on code in PR #14503:
URL: https://github.com/apache/kafka/pull/14503#discussion_r1356610621


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -252,8 +255,10 @@ private boolean updateFetchPositionsIfNeeded(final Timer timer) {
         // are partitions with a missing position, then we will raise a NoOffsetForPartitionException exception.
         subscriptions.resetInitializingPositions();
 
-        // Finally send an asynchronous request to look up and update the positions of any
-        // partitions which are awaiting reset.
+        // Reset positions using partitions offsets retrieved from the leader, for any partitions

Review Comment:
   ```suggestion
           // Reset positions using partition offsets retrieved from the leader, for any partitions
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -235,15 +235,18 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
      *                                                                defined
      */
     private boolean updateFetchPositionsIfNeeded(final Timer timer) {
-        // If any partitions have been truncated due to a leader change, we need to validate the offsets
+        // Validate positions using the partition leader end offsets, to detect if any partition
+        // has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch
+        // request, retrieve the partition end offsets, and validate the current position against it.
         ValidatePositionsApplicationEvent validatePositionsEvent = new ValidatePositionsApplicationEvent();
         eventHandler.add(validatePositionsEvent);
 
-        // If there are any partitions which do not have a valid position and are not
-        // awaiting reset, then we need to fetch committed offsets. We will only do a
-        // coordinator lookup if there are partitions which have missing positions, so
-        // a consumer with manually assigned partitions can avoid a coordinator dependence
-        // by always ensuring that assigned partitions have an initial position.
+        // Reset positions using committed offsets retrieved from the group coordinator, for any
+        // partitions which do  not have a valid position and are not awaiting reset. We
+        // will only do a coordinator lookup if there are partitions which have missing
+        // positions, so a consumer with manually assigned partitions can avoid a coordinator
+        // dependence by always ensuring that assigned partitions have an initial position. This
+        // will trigger an OffsetFetch request and update positions with the offsets retrieved.

Review Comment:
   Could you swap sentences 2 and 3 in this paragraph? In the current order, it
reads as if manual partition assignment with an initial position will trigger
an `OffsetFetch` request.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -235,15 +235,18 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
      *                                                                defined
      */
     private boolean updateFetchPositionsIfNeeded(final Timer timer) {
-        // If any partitions have been truncated due to a leader change, we need to validate the offsets
+        // Validate positions using the partition leader end offsets, to detect if any partition
+        // has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch
+        // request, retrieve the partition end offsets, and validate the current position against it.
         ValidatePositionsApplicationEvent validatePositionsEvent = new ValidatePositionsApplicationEvent();
         eventHandler.add(validatePositionsEvent);
 
-        // If there are any partitions which do not have a valid position and are not
-        // awaiting reset, then we need to fetch committed offsets. We will only do a
-        // coordinator lookup if there are partitions which have missing positions, so
-        // a consumer with manually assigned partitions can avoid a coordinator dependence
-        // by always ensuring that assigned partitions have an initial position.
+        // Reset positions using committed offsets retrieved from the group coordinator, for any
+        // partitions which do  not have a valid position and are not awaiting reset. We

Review Comment:
   ```suggestion
           // partitions which do not have a valid position and are not awaiting reset. We
   ```



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java:
##########
@@ -322,6 +323,25 @@ public void testOffsetsForTimesOnNullPartitions() {
                 Duration.ofMillis(1)));
     }
 
+    @Test
+    public void testOffsetsForTimesFailsOnNegativeTargetTimes() {
+        PrototypeAsyncConsumer<?, ?> consumer = newConsumer(time, new StringDeserializer(), new StringDeserializer());
+        assertThrows(IllegalArgumentException.class,
+                () -> consumer.offsetsForTimes(Collections.singletonMap(new TopicPartition(
+                                "topic1", 1), ListOffsetsRequest.EARLIEST_TIMESTAMP),
+                        Duration.ofMillis(1)));
+
+        assertThrows(IllegalArgumentException.class,
+                () -> consumer.offsetsForTimes(Collections.singletonMap(new TopicPartition(

Review Comment:
   Why do we test this for these three constants and not for 
`EARLIEST_LOCAL_TIMESTAMP`?
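
A minimal sketch of what covering every sentinel could look like, written without the Kafka classes so it runs on its own. The class and method names (`NegativeTargetTimeSketch`, `validateTargetTime`, `isRejected`) are hypothetical, and the numeric values are assumed to mirror the `ListOffsetsRequest` constants; they should be checked against the Kafka version in use. The check itself is a stand-in for the negative-target-time validation that the test above expects from `offsetsForTimes`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class NegativeTargetTimeSketch {
    // Assumed values of the ListOffsetsRequest sentinel timestamps (not taken from this PR).
    static final long LATEST_TIMESTAMP = -1L;
    static final long EARLIEST_TIMESTAMP = -2L;
    static final long MAX_TIMESTAMP = -3L;
    static final long EARLIEST_LOCAL_TIMESTAMP = -4L;

    // Hypothetical stand-in for the argument check: any negative target time is rejected.
    static void validateTargetTime(long targetTime) {
        if (targetTime < 0) {
            throw new IllegalArgumentException("The target time cannot be negative: " + targetTime);
        }
    }

    // Returns true when the check throws IllegalArgumentException for the given target time.
    static boolean isRejected(long targetTime) {
        try {
            validateTargetTime(targetTime);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Iterate over all four sentinels so that none is silently left out of the test.
        Map<String, Long> sentinels = new LinkedHashMap<>();
        sentinels.put("LATEST_TIMESTAMP", LATEST_TIMESTAMP);
        sentinels.put("EARLIEST_TIMESTAMP", EARLIEST_TIMESTAMP);
        sentinels.put("MAX_TIMESTAMP", MAX_TIMESTAMP);
        sentinels.put("EARLIEST_LOCAL_TIMESTAMP", EARLIEST_LOCAL_TIMESTAMP);

        for (Map.Entry<String, Long> entry : sentinels.entrySet()) {
            System.out.println(entry.getKey() + " rejected: " + isRejected(entry.getValue()));
        }
    }
}
```

In the real test, the same loop could wrap the existing `assertThrows` call, with one iteration per constant, instead of repeating the assertion block.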


