This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 58dfa1cc815 MINOR - KAFKA-15550: Validation for negative target times 
in offsetsForTimes (#14503)
58dfa1cc815 is described below

commit 58dfa1cc815e3bd36e67407b190c60025338f355
Author: Lianet Magrans <[email protected]>
AuthorDate: Fri Oct 13 03:59:57 2023 -0400

    MINOR - KAFKA-15550: Validation for negative target times in 
offsetsForTimes (#14503)
    
    The current KafkaConsumer offsetsForTimes fails with an
    IllegalArgumentException if negative target timestamps are provided as
    arguments. This change adds the same validation, with tests, to the new
    consumer implementation, and improves the comments in
    updateFetchPositionsIfNeeded.
    
    Reviewer: Lucas Brutschy <[email protected]>
---
 checkstyle/suppressions.xml                        |  2 +-
 .../consumer/internals/PrototypeAsyncConsumer.java | 28 +++++++++++++++-------
 .../internals/PrototypeAsyncConsumerTest.java      | 20 ++++++++++++++++
 3 files changed, 41 insertions(+), 9 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 678765aa6bf..a554fcc56b6 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -48,7 +48,7 @@
     <suppress id="dontUseSystemExit"
               files="Exit.java"/>
     <suppress checks="ClassFanOutComplexity"
-              
files="(AbstractFetch|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManager|TransactionManagerTest|KafkaAdminClient|NetworkClient|Admin|KafkaRaftClient|KafkaRaftClientTest|RaftClientTestContext).java"/>
+              
files="(AbstractFetch|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|PrototypeAsyncConsumer|KafkaProducer|Utils|TransactionManager|TransactionManagerTest|KafkaAdminClient|NetworkClient|Admin|KafkaRaftClient|KafkaRaftClientTest|RaftClientTestContext).java"/>
     <suppress checks="ClassFanOutComplexity"
               files="(SaslServerAuthenticator|SaslAuthenticatorTest).java"/>
     <suppress checks="NPath"
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
index 4fe8fe80c44..2cc8ef2edae 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
@@ -235,15 +235,18 @@ public class PrototypeAsyncConsumer<K, V> implements 
Consumer<K, V> {
      *                                                                defined
      */
     private boolean updateFetchPositionsIfNeeded(final Timer timer) {
-        // If any partitions have been truncated due to a leader change, we 
need to validate the offsets
+        // Validate positions using the partition leader end offsets, to 
detect if any partition
+        // has been truncated due to a leader change. This will trigger an 
OffsetForLeaderEpoch
+        // request, retrieve the partition end offsets, and validate the 
current position against it.
         ValidatePositionsApplicationEvent validatePositionsEvent = new 
ValidatePositionsApplicationEvent();
         eventHandler.add(validatePositionsEvent);
 
-        // If there are any partitions which do not have a valid position and 
are not
-        // awaiting reset, then we need to fetch committed offsets. We will 
only do a
-        // coordinator lookup if there are partitions which have missing 
positions, so
-        // a consumer with manually assigned partitions can avoid a 
coordinator dependence
-        // by always ensuring that assigned partitions have an initial 
position.
+        // Reset positions using committed offsets retrieved from the group 
coordinator, for any
+        // partitions which do not have a valid position and are not awaiting 
reset. This will
+        // trigger an OffsetFetch request and update positions with the 
offsets retrieved. This
+        // will only do a coordinator lookup if there are partitions which 
have missing
+        // positions, so a consumer with manually assigned partitions can 
avoid a coordinator
+        // dependence by always ensuring that assigned partitions have an 
initial position.
         if (isCommittedOffsetsManagementEnabled() && 
!refreshCommittedOffsetsIfNeeded(timer))
             return false;
 
@@ -252,8 +255,10 @@ public class PrototypeAsyncConsumer<K, V> implements 
Consumer<K, V> {
         // are partitions with a missing position, then we will raise a 
NoOffsetForPartitionException exception.
         subscriptions.resetInitializingPositions();
 
-        // Finally send an asynchronous request to look up and update the 
positions of any
-        // partitions which are awaiting reset.
+        // Reset positions using partition offsets retrieved from the leader, 
for any partitions
+        // which are awaiting reset. This will trigger a ListOffset request, 
retrieve the
+        // partition offsets according to the strategy (ex. earliest, latest), 
and update the
+        // positions.
         ResetPositionsApplicationEvent resetPositionsEvent = new 
ResetPositionsApplicationEvent();
         eventHandler.add(resetPositionsEvent);
         return true;
@@ -443,6 +448,13 @@ public class PrototypeAsyncConsumer<K, V> implements 
Consumer<K, V> {
         // Keeping same argument validation error thrown by the current 
consumer implementation
         // to avoid API level changes.
         requireNonNull(timestampsToSearch, "Timestamps to search cannot be 
null");
+        for (Map.Entry<TopicPartition, Long> entry : 
timestampsToSearch.entrySet()) {
+            // Exclude the earliest and latest offset here so the timestamp in 
the returned
+            // OffsetAndTimestamp is always positive.
+            if (entry.getValue() < 0)
+                throw new IllegalArgumentException("The target time for 
partition " + entry.getKey() + " is " +
+                        entry.getValue() + ". The target time cannot be 
negative.");
+        }
 
         if (timestampsToSearch.isEmpty()) {
             return Collections.emptyMap();
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java
index 46892215bfa..e5f9c27a28f 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.LogContext;
@@ -322,6 +323,25 @@ public class PrototypeAsyncConsumerTest {
                 Duration.ofMillis(1)));
     }
 
+    @Test
+    public void testOffsetsForTimesFailsOnNegativeTargetTimes() {
+        PrototypeAsyncConsumer<?, ?> consumer = newConsumer(time, new 
StringDeserializer(), new StringDeserializer());
+        assertThrows(IllegalArgumentException.class,
+                () -> consumer.offsetsForTimes(Collections.singletonMap(new 
TopicPartition(
+                                "topic1", 1), 
ListOffsetsRequest.EARLIEST_TIMESTAMP),
+                        Duration.ofMillis(1)));
+
+        assertThrows(IllegalArgumentException.class,
+                () -> consumer.offsetsForTimes(Collections.singletonMap(new 
TopicPartition(
+                                "topic1", 1), 
ListOffsetsRequest.LATEST_TIMESTAMP),
+                        Duration.ofMillis(1)));
+
+        assertThrows(IllegalArgumentException.class,
+                () -> consumer.offsetsForTimes(Collections.singletonMap(new 
TopicPartition(
+                                "topic1", 1), 
ListOffsetsRequest.MAX_TIMESTAMP),
+                        Duration.ofMillis(1)));
+    }
+
     @Test
     public void testOffsetsForTimes() {
         PrototypeAsyncConsumer<?, ?> consumer = newConsumer(time, new 
StringDeserializer(), new StringDeserializer());

Reply via email to