This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 58dfa1cc815 MINOR - KAFKA-15550: Validation for negative target times
in offsetsForTimes (#14503)
58dfa1cc815 is described below
commit 58dfa1cc815e3bd36e67407b190c60025338f355
Author: Lianet Magrans <[email protected]>
AuthorDate: Fri Oct 13 03:59:57 2023 -0400
MINOR - KAFKA-15550: Validation for negative target times in
offsetsForTimes (#14503)
The current KafkaConsumer offsetsForTimes fails with
IllegalArgumentException if negative target timestamps are provided as
arguments. This change includes the same validation and tests for the new
consumer implementation (and some improved comments for the
updateFetchPositions)
Reviewers: Lucas Brutschy <[email protected]>
---
checkstyle/suppressions.xml | 2 +-
.../consumer/internals/PrototypeAsyncConsumer.java | 28 +++++++++++++++-------
.../internals/PrototypeAsyncConsumerTest.java | 20 ++++++++++++++++
3 files changed, 41 insertions(+), 9 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 678765aa6bf..a554fcc56b6 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -48,7 +48,7 @@
<suppress id="dontUseSystemExit"
files="Exit.java"/>
<suppress checks="ClassFanOutComplexity"
-
files="(AbstractFetch|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManager|TransactionManagerTest|KafkaAdminClient|NetworkClient|Admin|KafkaRaftClient|KafkaRaftClientTest|RaftClientTestContext).java"/>
+
files="(AbstractFetch|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|PrototypeAsyncConsumer|KafkaProducer|Utils|TransactionManager|TransactionManagerTest|KafkaAdminClient|NetworkClient|Admin|KafkaRaftClient|KafkaRaftClientTest|RaftClientTestContext).java"/>
<suppress checks="ClassFanOutComplexity"
files="(SaslServerAuthenticator|SaslAuthenticatorTest).java"/>
<suppress checks="NPath"
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
index 4fe8fe80c44..2cc8ef2edae 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
@@ -235,15 +235,18 @@ public class PrototypeAsyncConsumer<K, V> implements
Consumer<K, V> {
* defined
*/
private boolean updateFetchPositionsIfNeeded(final Timer timer) {
- // If any partitions have been truncated due to a leader change, we
need to validate the offsets
+ // Validate positions using the partition leader end offsets, to
detect if any partition
+ // has been truncated due to a leader change. This will trigger an
OffsetForLeaderEpoch
+ // request, retrieve the partition end offsets, and validate the
current position against it.
ValidatePositionsApplicationEvent validatePositionsEvent = new
ValidatePositionsApplicationEvent();
eventHandler.add(validatePositionsEvent);
- // If there are any partitions which do not have a valid position and
are not
- // awaiting reset, then we need to fetch committed offsets. We will
only do a
- // coordinator lookup if there are partitions which have missing
positions, so
- // a consumer with manually assigned partitions can avoid a
coordinator dependence
- // by always ensuring that assigned partitions have an initial
position.
+ // Reset positions using committed offsets retrieved from the group
coordinator, for any
+ // partitions which do not have a valid position and are not awaiting
reset. This will
+ // trigger an OffsetFetch request and update positions with the
offsets retrieved. This
+ // will only do a coordinator lookup if there are partitions which
have missing
+ // positions, so a consumer with manually assigned partitions can
avoid a coordinator
+ // dependence by always ensuring that assigned partitions have an
initial position.
if (isCommittedOffsetsManagementEnabled() &&
!refreshCommittedOffsetsIfNeeded(timer))
return false;
@@ -252,8 +255,10 @@ public class PrototypeAsyncConsumer<K, V> implements
Consumer<K, V> {
// are partitions with a missing position, then we will raise a
NoOffsetForPartitionException exception.
subscriptions.resetInitializingPositions();
- // Finally send an asynchronous request to look up and update the
positions of any
- // partitions which are awaiting reset.
+ // Reset positions using partition offsets retrieved from the leader,
for any partitions
+ // which are awaiting reset. This will trigger a ListOffset request,
retrieve the
+ // partition offsets according to the strategy (ex. earliest, latest),
and update the
+ // positions.
ResetPositionsApplicationEvent resetPositionsEvent = new
ResetPositionsApplicationEvent();
eventHandler.add(resetPositionsEvent);
return true;
@@ -443,6 +448,13 @@ public class PrototypeAsyncConsumer<K, V> implements
Consumer<K, V> {
// Keeping same argument validation error thrown by the current
consumer implementation
// to avoid API level changes.
requireNonNull(timestampsToSearch, "Timestamps to search cannot be
null");
+ for (Map.Entry<TopicPartition, Long> entry :
timestampsToSearch.entrySet()) {
+ // Exclude the earliest and latest offset here so the timestamp in
the returned
+ // OffsetAndTimestamp is always positive.
+ if (entry.getValue() < 0)
+ throw new IllegalArgumentException("The target time for
partition " + entry.getKey() + " is " +
+ entry.getValue() + ". The target time cannot be
negative.");
+ }
if (timestampsToSearch.isEmpty()) {
return Collections.emptyMap();
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java
index 46892215bfa..e5f9c27a28f 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.LogContext;
@@ -322,6 +323,25 @@ public class PrototypeAsyncConsumerTest {
Duration.ofMillis(1)));
}
+ @Test
+ public void testOffsetsForTimesFailsOnNegativeTargetTimes() {
+ PrototypeAsyncConsumer<?, ?> consumer = newConsumer(time, new
StringDeserializer(), new StringDeserializer());
+ assertThrows(IllegalArgumentException.class,
+ () -> consumer.offsetsForTimes(Collections.singletonMap(new
TopicPartition(
+ "topic1", 1),
ListOffsetsRequest.EARLIEST_TIMESTAMP),
+ Duration.ofMillis(1)));
+
+ assertThrows(IllegalArgumentException.class,
+ () -> consumer.offsetsForTimes(Collections.singletonMap(new
TopicPartition(
+ "topic1", 1),
ListOffsetsRequest.LATEST_TIMESTAMP),
+ Duration.ofMillis(1)));
+
+ assertThrows(IllegalArgumentException.class,
+ () -> consumer.offsetsForTimes(Collections.singletonMap(new
TopicPartition(
+ "topic1", 1),
ListOffsetsRequest.MAX_TIMESTAMP),
+ Duration.ofMillis(1)));
+ }
+
@Test
public void testOffsetsForTimes() {
PrototypeAsyncConsumer<?, ?> consumer = newConsumer(time, new
StringDeserializer(), new StringDeserializer());