This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ae3c5dec99b KAFKA-18013: Add support for duration based offset reset 
strategy to Kafka Consumer (#17972)
ae3c5dec99b is described below

commit ae3c5dec99b6f20d860c945101d792c436301dc7
Author: Manikumar Reddy <[email protected]>
AuthorDate: Fri Nov 29 22:38:57 2024 +0530

    KAFKA-18013: Add support for duration based offset reset strategy to Kafka 
Consumer (#17972)
    
    Update AutoOffsetResetStrategy.java to support duration based reset strategy
    Update OffsetFetcher related classes and unit tests
    
    Reviewers: Andrew Schofield <[email protected]>, Lianet Magrans 
<[email protected]>
---
 .../kafka/clients/consumer/ConsumerConfig.java     |   2 +
 .../kafka/clients/consumer/MockConsumer.java       |  10 ++
 .../internals/AutoOffsetResetStrategy.java         | 106 ++++++++++++++++-----
 .../clients/consumer/internals/OffsetFetcher.java  |  13 ++-
 .../consumer/internals/OffsetFetcherUtils.java     |  39 +++-----
 .../consumer/internals/OffsetsRequestManager.java  |  18 ++--
 .../consumer/internals/SubscriptionState.java      |   2 +-
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  17 +++-
 .../kafka/clients/consumer/MockConsumerTest.java   |  22 +++++
 .../internals/AutoOffsetResetStrategyTest.java     |  83 ++++++++++++----
 .../consumer/internals/OffsetFetcherTest.java      |  23 +++++
 11 files changed, 252 insertions(+), 83 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 779a9933fd7..fb4c05062bb 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -173,6 +173,8 @@ public class ConsumerConfig extends AbstractConfig {
             "(e.g. because that data has been deleted): " +
             "<ul><li>earliest: automatically reset the offset to the earliest 
offset" +
             "<li>latest: automatically reset the offset to the latest 
offset</li>" +
+            "<li>by_duration:<duration>: automatically reset the offset to a 
configured <duration> from the current timestamp. <duration> must be specified 
in ISO8601 format (PnDTnHnMn.nS). " +
+            "Negative duration is not allowed.</li>" +
             "<li>none: throw exception to the consumer if no previous offset 
is found for the consumer's group</li>" +
             "<li>anything else: throw exception to the consumer.</li></ul>" +
             "<p>Note that altering partition numbers while setting this config 
to latest may cause message delivery loss since " +
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index a900fac95aa..817e8a63b20 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -62,6 +62,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     private final SubscriptionState subscriptions;
     private final Map<TopicPartition, Long> beginningOffsets;
     private final Map<TopicPartition, Long> endOffsets;
+    private final Map<TopicPartition, Long> durationResetOffsets;
     private final Map<TopicPartition, OffsetAndMetadata> committed;
     private final Queue<Runnable> pollTasks;
     private final Set<TopicPartition> paused;
@@ -104,6 +105,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
         this.closed = false;
         this.beginningOffsets = new HashMap<>();
         this.endOffsets = new HashMap<>();
+        this.durationResetOffsets = new HashMap<>();
         this.pollTasks = new LinkedList<>();
         this.pollException = null;
         this.wakeup = new AtomicBoolean(false);
@@ -433,6 +435,10 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
         endOffsets.putAll(newOffsets);
     }
 
+    public synchronized void updateDurationOffsets(final Map<TopicPartition, 
Long> newOffsets) {
+        durationResetOffsets.putAll(newOffsets);
+    }
+
     public void disableTelemetry() {
         telemetryDisabled = true;
     }
@@ -610,6 +616,10 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
             offset = endOffsets.get(tp);
             if (offset == null)
                 throw new IllegalStateException("MockConsumer didn't have end 
offset specified, but tried to seek to end");
+        } else if (strategy.type() == 
AutoOffsetResetStrategy.StrategyType.BY_DURATION) {
+            offset = durationResetOffsets.get(tp);
+            if (offset == null)
+                throw new IllegalStateException("MockConsumer didn't have 
duration offset specified, but tried to seek to timestamp");
         } else {
             throw new NoOffsetForPartitionException(tp);
         }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategy.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategy.java
index a94eb4585a3..6eecc4e09d8 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategy.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategy.java
@@ -18,15 +18,19 @@ package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
 import org.apache.kafka.common.utils.Utils;
 
+import java.time.Duration;
+import java.time.Instant;
 import java.util.Arrays;
 import java.util.Locale;
 import java.util.Objects;
+import java.util.Optional;
 
 public class AutoOffsetResetStrategy {
-    private enum StrategyType {
-        LATEST, EARLIEST, NONE;
+    public enum StrategyType {
+        LATEST, EARLIEST, NONE, BY_DURATION;
 
         @Override
         public String toString() {
@@ -39,30 +43,65 @@ public class AutoOffsetResetStrategy {
     public static final AutoOffsetResetStrategy NONE = new 
AutoOffsetResetStrategy(StrategyType.NONE);
 
     private final StrategyType type;
+    private final Optional<Duration> duration;
 
     private AutoOffsetResetStrategy(StrategyType type) {
         this.type = type;
+        this.duration = Optional.empty();
     }
 
-    public static boolean isValid(String offsetStrategy) {
-        return 
Arrays.asList(Utils.enumOptions(StrategyType.class)).contains(offsetStrategy);
+    private AutoOffsetResetStrategy(Duration duration) {
+        this.type = StrategyType.BY_DURATION;
+        this.duration = Optional.of(duration);
     }
 
+    /**
+     *  Returns the AutoOffsetResetStrategy from the given string.
+     */
     public static AutoOffsetResetStrategy fromString(String offsetStrategy) {
-        if (offsetStrategy == null || !isValid(offsetStrategy)) {
-            throw new IllegalArgumentException("Unknown auto offset reset 
strategy: " + offsetStrategy);
+        if (offsetStrategy == null) {
+            throw new IllegalArgumentException("Auto offset reset strategy is 
null");
         }
-        StrategyType type = 
StrategyType.valueOf(offsetStrategy.toUpperCase(Locale.ROOT));
-        switch (type) {
-            case EARLIEST:
-                return EARLIEST;
-            case LATEST:
-                return LATEST;
-            case NONE:
-                return NONE;
-            default:
-                throw new IllegalArgumentException("Unknown auto offset reset 
strategy: " + offsetStrategy);
+
+        if (StrategyType.BY_DURATION.toString().equals(offsetStrategy)) {
+            throw new IllegalArgumentException("<:duration> part is missing in 
by_duration auto offset reset strategy.");
+        }
+
+        if 
(Arrays.asList(Utils.enumOptions(StrategyType.class)).contains(offsetStrategy)) 
{
+            StrategyType type = 
StrategyType.valueOf(offsetStrategy.toUpperCase(Locale.ROOT));
+            switch (type) {
+                case EARLIEST:
+                    return EARLIEST;
+                case LATEST:
+                    return LATEST;
+                case NONE:
+                    return NONE;
+                default:
+                    throw new IllegalArgumentException("Unknown auto offset 
reset strategy: " + offsetStrategy);
+            }
+        }
+
+        if (offsetStrategy.startsWith(StrategyType.BY_DURATION + ":")) {
+            String isoDuration = 
offsetStrategy.substring(StrategyType.BY_DURATION.toString().length() + 1);
+            try {
+                Duration duration = Duration.parse(isoDuration);
+                if (duration.isNegative()) {
+                    throw new IllegalArgumentException("Negative duration is 
not supported in by_duration offset reset strategy.");
+                }
+                return new AutoOffsetResetStrategy(duration);
+            } catch (Exception e) {
+                throw new IllegalArgumentException("Unable to parse duration 
string in by_duration offset reset strategy.", e);
+            }
         }
+
+        throw new IllegalArgumentException("Unknown auto offset reset 
strategy: " + offsetStrategy);
+    }
+
+    /**
+     * Returns the offset reset strategy type.
+     */
+    public StrategyType type() {
+        return type;
     }
 
     /**
@@ -72,33 +111,54 @@ public class AutoOffsetResetStrategy {
         return type.toString();
     }
 
+    /**
+     * Return the timestamp to be used for the ListOffsetsRequest.
+     * @return ListOffsetsRequest.EARLIEST_TIMESTAMP or LATEST_TIMESTAMP for
+     * those strategies, now minus the duration (epoch millis) for BY_DURATION,
+     * or Optional.empty() otherwise
+     */
+    public Optional<Long> timestamp() {
+        if (type == StrategyType.EARLIEST)
+            return Optional.of(ListOffsetsRequest.EARLIEST_TIMESTAMP);
+        else if (type == StrategyType.LATEST)
+            return Optional.of(ListOffsetsRequest.LATEST_TIMESTAMP);
+        else if (type == StrategyType.BY_DURATION && duration.isPresent()) {
+            Instant now = Instant.now();
+            return Optional.of(now.minus(duration.get()).toEpochMilli());
+        } else
+            return Optional.empty();
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         AutoOffsetResetStrategy that = (AutoOffsetResetStrategy) o;
-        return Objects.equals(type, that.type);
+        return type == that.type && Objects.equals(duration, that.duration);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hashCode(type);
+        return Objects.hash(type, duration);
     }
 
     @Override
     public String toString() {
         return "AutoOffsetResetStrategy{" +
-                "type='" + type + '\'' +
+                "type=" + type +
+                (duration.map(value -> ", duration=" + value).orElse("")) +
                 '}';
     }
 
     public static class Validator implements ConfigDef.Validator {
         @Override
         public void ensureValid(String name, Object value) {
-            String strategy = (String) value;
-            if (!AutoOffsetResetStrategy.isValid(strategy)) {
-                throw new ConfigException(name, value, "Invalid value " + 
strategy + " for configuration " +
-                        name + ": the value must be either 'earliest', 
'latest', or 'none'.");
+            String offsetStrategy = (String) value;
+            try {
+                fromString(offsetStrategy);
+            } catch (Exception e) {
+                throw new ConfigException(name, value, "Invalid value `" + 
offsetStrategy + "` for configuration " +
+                        name + ". The value must be either 'earliest', 
'latest', 'none' or of the format 'by_duration:<PnDTnHnMn.nS.>'.");
             }
         }
     }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
index f7646bff9ed..bb01510e906 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
@@ -101,12 +101,13 @@ public class OffsetFetcher {
      *                                                                         
and one or more partitions aren't awaiting a seekToBeginning() or seekToEnd().
      */
     public void resetPositionsIfNeeded() {
-        Map<TopicPartition, Long> offsetResetTimestamps = 
offsetFetcherUtils.getOffsetResetTimestamp();
+        Map<TopicPartition, AutoOffsetResetStrategy> 
partitionAutoOffsetResetStrategyMap =
+                offsetFetcherUtils.getOffsetResetStrategyForPartitions();
 
-        if (offsetResetTimestamps.isEmpty())
+        if (partitionAutoOffsetResetStrategyMap.isEmpty())
             return;
 
-        resetPositionsAsync(offsetResetTimestamps);
+        resetPositionsAsync(partitionAutoOffsetResetStrategyMap);
     }
 
     /**
@@ -209,7 +210,9 @@ public class OffsetFetcher {
         }
     }
 
-    private void resetPositionsAsync(Map<TopicPartition, Long> 
partitionResetTimestamps) {
+    private void resetPositionsAsync(Map<TopicPartition, 
AutoOffsetResetStrategy> partitionAutoOffsetResetStrategyMap) {
+        Map<TopicPartition, Long> partitionResetTimestamps = 
partitionAutoOffsetResetStrategyMap.entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> 
e.getValue().timestamp().get()));
         Map<Node, Map<TopicPartition, ListOffsetsPartition>> 
timestampsToSearchByNode =
                 groupListOffsetRequests(partitionResetTimestamps, new 
HashSet<>());
         for (Map.Entry<Node, Map<TopicPartition, ListOffsetsPartition>> entry 
: timestampsToSearchByNode.entrySet()) {
@@ -221,7 +224,7 @@ public class OffsetFetcher {
             future.addListener(new RequestFutureListener<>() {
                 @Override
                 public void onSuccess(ListOffsetResult result) {
-                    
offsetFetcherUtils.onSuccessfulResponseForResettingPositions(resetTimestamps, 
result);
+                    
offsetFetcherUtils.onSuccessfulResponseForResettingPositions(result, 
partitionAutoOffsetResetStrategyMap);
                 }
 
                 @Override
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
index 2e9d48ebda5..98afb02d6b9 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
@@ -32,7 +32,6 @@ import org.apache.kafka.common.message.ListOffsetsRequestData;
 import org.apache.kafka.common.message.ListOffsetsResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.ListOffsetsRequest;
 import org.apache.kafka.common.requests.ListOffsetsResponse;
 import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
 import org.apache.kafka.common.utils.LogContext;
@@ -224,19 +223,22 @@ class OffsetFetcherUtils {
         }
     }
 
-    Map<TopicPartition, Long> getOffsetResetTimestamp() {
+    /**
+     * Get the reset strategy for every partition that needs an offset reset
+     */
+    Map<TopicPartition, AutoOffsetResetStrategy> 
getOffsetResetStrategyForPartitions() {
         // Raise exception from previous offset fetch if there is one
         RuntimeException exception = 
cachedResetPositionsException.getAndSet(null);
         if (exception != null)
             throw exception;
 
         Set<TopicPartition> partitions = 
subscriptionState.partitionsNeedingReset(time.milliseconds());
-        final Map<TopicPartition, Long> offsetResetTimestamps = new 
HashMap<>();
+        final Map<TopicPartition, AutoOffsetResetStrategy> 
partitionAutoOffsetResetStrategyMap = new HashMap<>();
         for (final TopicPartition partition : partitions) {
-            offsetResetTimestamps.put(partition, 
offsetResetStrategyTimestamp(partition));
+            partitionAutoOffsetResetStrategyMap.put(partition, 
offsetResetStrategyWithValidTimestamp(partition));
         }
 
-        return offsetResetTimestamps;
+        return partitionAutoOffsetResetStrategyMap;
     }
 
     static Map<TopicPartition, OffsetAndTimestamp> buildListOffsetsResult(
@@ -283,14 +285,13 @@ class OffsetFetcherUtils {
         return offsetsResults;
     }
 
-    private long offsetResetStrategyTimestamp(final TopicPartition partition) {
+    private AutoOffsetResetStrategy 
offsetResetStrategyWithValidTimestamp(final TopicPartition partition) {
         AutoOffsetResetStrategy strategy = 
subscriptionState.resetStrategy(partition);
-        if (strategy == AutoOffsetResetStrategy.EARLIEST)
-            return ListOffsetsRequest.EARLIEST_TIMESTAMP;
-        else if (strategy == AutoOffsetResetStrategy.LATEST)
-            return ListOffsetsRequest.LATEST_TIMESTAMP;
-        else
+        if (strategy.timestamp().isPresent()) {
+            return strategy;
+        } else {
             throw new NoOffsetForPartitionException(partition);
+        }
     }
 
     static Set<String> topicsForPartitions(Collection<TopicPartition> 
partitions) {
@@ -319,18 +320,9 @@ class OffsetFetcherUtils {
         }
     }
 
-    static AutoOffsetResetStrategy timestampToOffsetResetStrategy(long 
timestamp) {
-        if (timestamp == ListOffsetsRequest.EARLIEST_TIMESTAMP)
-            return AutoOffsetResetStrategy.EARLIEST;
-        else if (timestamp == ListOffsetsRequest.LATEST_TIMESTAMP)
-            return AutoOffsetResetStrategy.LATEST;
-        else
-            return null;
-    }
-
     void onSuccessfulResponseForResettingPositions(
-            final Map<TopicPartition, 
ListOffsetsRequestData.ListOffsetsPartition> resetTimestamps,
-            final ListOffsetResult result) {
+            final ListOffsetResult result,
+            final Map<TopicPartition, AutoOffsetResetStrategy> 
partitionAutoOffsetResetStrategyMap) {
         if (!result.partitionsToRetry.isEmpty()) {
             subscriptionState.requestFailed(result.partitionsToRetry, 
time.milliseconds() + retryBackoffMs);
             metadata.requestUpdate(false);
@@ -339,10 +331,9 @@ class OffsetFetcherUtils {
         for (Map.Entry<TopicPartition, ListOffsetData> fetchedOffset : 
result.fetchedOffsets.entrySet()) {
             TopicPartition partition = fetchedOffset.getKey();
             ListOffsetData offsetData = fetchedOffset.getValue();
-            ListOffsetsRequestData.ListOffsetsPartition requestedReset = 
resetTimestamps.get(partition);
             resetPositionIfNeeded(
                     partition,
-                    timestampToOffsetResetStrategy(requestedReset.timestamp()),
+                    partitionAutoOffsetResetStrategyMap.get(partition),
                     offsetData);
         }
     }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
index db847eae831..7870caec1ba 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
@@ -472,20 +472,20 @@ public final class OffsetsRequestManager implements 
RequestManager, ClusterResou
      * this function (ex. {@link 
org.apache.kafka.common.errors.TopicAuthorizationException})
      */
     CompletableFuture<Void> resetPositionsIfNeeded() {
-        Map<TopicPartition, Long> offsetResetTimestamps;
+        Map<TopicPartition, AutoOffsetResetStrategy> 
partitionAutoOffsetResetStrategyMap;
 
         try {
-            offsetResetTimestamps = 
offsetFetcherUtils.getOffsetResetTimestamp();
+            partitionAutoOffsetResetStrategyMap = 
offsetFetcherUtils.getOffsetResetStrategyForPartitions();
         } catch (Exception e) {
             CompletableFuture<Void> result = new CompletableFuture<>();
             result.completeExceptionally(e);
             return result;
         }
 
-        if (offsetResetTimestamps.isEmpty())
+        if (partitionAutoOffsetResetStrategyMap.isEmpty())
             return CompletableFuture.completedFuture(null);
 
-        return sendListOffsetsRequestsAndResetPositions(offsetResetTimestamps);
+        return 
sendListOffsetsRequestsAndResetPositions(partitionAutoOffsetResetStrategyMap);
     }
 
     /**
@@ -652,12 +652,14 @@ public final class OffsetsRequestManager implements 
RequestManager, ClusterResou
      * partitions. Use the retrieved offsets to reset positions in the 
subscription state.
      * This also adds the request to the list of unsentRequests.
      *
-     * @param timestampsToSearch the mapping between partitions and target time
+     * @param partitionAutoOffsetResetStrategyMap the mapping between 
partitions and AutoOffsetResetStrategy
      * @return A {@link CompletableFuture} which completes when the requests 
are
      * complete.
      */
     private CompletableFuture<Void> sendListOffsetsRequestsAndResetPositions(
-            final Map<TopicPartition, Long> timestampsToSearch) {
+            final Map<TopicPartition, AutoOffsetResetStrategy> 
partitionAutoOffsetResetStrategyMap) {
+        Map<TopicPartition, Long> timestampsToSearch = 
partitionAutoOffsetResetStrategyMap.entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> 
e.getValue().timestamp().get()));
         Map<Node, Map<TopicPartition, 
ListOffsetsRequestData.ListOffsetsPartition>> timestampsToSearchByNode =
                 groupListOffsetRequests(timestampsToSearch, Optional.empty());
 
@@ -677,8 +679,8 @@ public final class OffsetsRequestManager implements 
RequestManager, ClusterResou
 
             partialResult.whenComplete((result, error) -> {
                 if (error == null) {
-                    
offsetFetcherUtils.onSuccessfulResponseForResettingPositions(resetTimestamps,
-                            result);
+                    
offsetFetcherUtils.onSuccessfulResponseForResettingPositions(result,
+                            partitionAutoOffsetResetStrategyMap);
                 } else {
                     RuntimeException e;
                     if (error instanceof RuntimeException) {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 0df99301e9e..f700b8706ca 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -441,7 +441,7 @@ public class SubscriptionState {
             log.debug("Skipping reset of partition {} since it is no longer 
assigned", tp);
         } else if (!state.awaitingReset()) {
             log.debug("Skipping reset of partition {} since reset is no longer 
needed", tp);
-        } else if (requestedResetStrategy != state.resetStrategy) {
+        } else if (requestedResetStrategy != null && 
!requestedResetStrategy.equals(state.resetStrategy)) {
             log.debug("Skipping reset of partition {} since an alternative 
reset has been requested", tp);
         } else {
             log.info("Resetting offset for partition {} to position {}.", tp, 
position);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index e8a084ebd54..36d15c0fe94 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1082,7 +1082,20 @@ public class KafkaConsumerTest {
     @ParameterizedTest
     @EnumSource(GroupProtocol.class)
     public void testResetUsingAutoResetPolicy(GroupProtocol groupProtocol) {
-        SubscriptionState subscription = new SubscriptionState(new 
LogContext(), AutoOffsetResetStrategy.LATEST);
+        setUpConsumerWithAutoResetPolicy(groupProtocol, 
AutoOffsetResetStrategy.LATEST);
+        assertEquals(50L, consumer.position(tp0));
+    }
+
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void testResetUsingDurationBasedAutoResetPolicy(GroupProtocol 
groupProtocol) {
+        AutoOffsetResetStrategy durationStrategy = 
AutoOffsetResetStrategy.fromString("by_duration:PT1H");
+        setUpConsumerWithAutoResetPolicy(groupProtocol, durationStrategy);
+        assertEquals(50L, consumer.position(tp0));
+    }
+
+    private void setUpConsumerWithAutoResetPolicy(GroupProtocol groupProtocol, 
AutoOffsetResetStrategy strategy) {
+        SubscriptionState subscription = new SubscriptionState(new 
LogContext(), strategy);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
@@ -1100,8 +1113,6 @@ public class KafkaConsumerTest {
         
client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 50L)));
 
         consumer.poll(Duration.ZERO);
-
-        assertEquals(50L, consumer.position(tp0));
     }
 
     @ParameterizedTest
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
index 41c9f199d15..21cee3183bc 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -117,6 +117,28 @@ public class MockConsumerTest {
         assertEquals(11L, (long) 
consumer.endOffsets(Collections.singleton(partition)).get(partition));
     }
 
+    @Test
+    public void testDurationBasedOffsetReset() {
+        MockConsumer<String, String> consumer = new 
MockConsumer<>("by_duration:PT1H");
+        consumer.subscribe(Collections.singleton("test"));
+        consumer.rebalance(Arrays.asList(new TopicPartition("test", 0), new 
TopicPartition("test", 1)));
+        HashMap<TopicPartition, Long> durationBasedOffsets = new HashMap<>();
+        durationBasedOffsets.put(new TopicPartition("test", 0), 10L);
+        durationBasedOffsets.put(new TopicPartition("test", 1), 11L);
+        consumer.updateDurationOffsets(durationBasedOffsets);
+        ConsumerRecord<String, String> rec1 = new ConsumerRecord<>("test", 0, 
10L, 0L, TimestampType.CREATE_TIME,
+                0, 0, "key1", "value1", new RecordHeaders(), Optional.empty());
+        ConsumerRecord<String, String> rec2 = new ConsumerRecord<>("test", 0, 
11L, 0L, TimestampType.CREATE_TIME,
+                0, 0, "key2", "value2", new RecordHeaders(), Optional.empty());
+        consumer.addRecord(rec1);
+        consumer.addRecord(rec2);
+        ConsumerRecords<String, String> records = 
consumer.poll(Duration.ofMillis(1));
+        Iterator<ConsumerRecord<String, String>> iter = records.iterator();
+        assertEquals(rec1, iter.next());
+        assertEquals(rec2, iter.next());
+        assertFalse(iter.hasNext());
+    }
+
     @Test
     public void testRebalanceListener() {
         final List<TopicPartition> revoked = new ArrayList<>();
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategyTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategyTest.java
index 780319d610f..25ff9073747 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategyTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategyTest.java
@@ -17,9 +17,14 @@
 package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
 
 import org.junit.jupiter.api.Test;
 
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Optional;
+
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -29,26 +34,24 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class AutoOffsetResetStrategyTest {
 
-    @Test
-    public void testIsValid() {
-        assertTrue(AutoOffsetResetStrategy.isValid("earliest"));
-        assertTrue(AutoOffsetResetStrategy.isValid("latest"));
-        assertTrue(AutoOffsetResetStrategy.isValid("none"));
-        assertFalse(AutoOffsetResetStrategy.isValid("invalid"));
-        assertFalse(AutoOffsetResetStrategy.isValid("LATEST"));
-        assertFalse(AutoOffsetResetStrategy.isValid(""));
-        assertFalse(AutoOffsetResetStrategy.isValid(null));
-    }
-
     @Test
     public void testFromString() {
         assertEquals(AutoOffsetResetStrategy.EARLIEST, 
AutoOffsetResetStrategy.fromString("earliest"));
         assertEquals(AutoOffsetResetStrategy.LATEST, 
AutoOffsetResetStrategy.fromString("latest"));
         assertEquals(AutoOffsetResetStrategy.NONE, 
AutoOffsetResetStrategy.fromString("none"));
         assertThrows(IllegalArgumentException.class, () -> 
AutoOffsetResetStrategy.fromString("invalid"));
+        assertThrows(IllegalArgumentException.class, () -> 
AutoOffsetResetStrategy.fromString("by_duration:invalid"));
+        assertThrows(IllegalArgumentException.class, () -> 
AutoOffsetResetStrategy.fromString("by_duration:-PT1H"));
+        assertThrows(IllegalArgumentException.class, () -> 
AutoOffsetResetStrategy.fromString("by_duration:"));
+        assertThrows(IllegalArgumentException.class, () -> 
AutoOffsetResetStrategy.fromString("by_duration"));
         assertThrows(IllegalArgumentException.class, () -> 
AutoOffsetResetStrategy.fromString("LATEST"));
+        assertThrows(IllegalArgumentException.class, () -> 
AutoOffsetResetStrategy.fromString("EARLIEST"));
+        assertThrows(IllegalArgumentException.class, () -> 
AutoOffsetResetStrategy.fromString("NONE"));
         assertThrows(IllegalArgumentException.class, () -> 
AutoOffsetResetStrategy.fromString(""));
         assertThrows(IllegalArgumentException.class, () -> 
AutoOffsetResetStrategy.fromString(null));
+
+        AutoOffsetResetStrategy strategy = 
AutoOffsetResetStrategy.fromString("by_duration:PT1H");
+        assertEquals("by_duration", strategy.name());
     }
 
     @Test
@@ -57,21 +60,63 @@ public class AutoOffsetResetStrategyTest {
         assertDoesNotThrow(() -> validator.ensureValid("test", "earliest"));
         assertDoesNotThrow(() -> validator.ensureValid("test", "latest"));
         assertDoesNotThrow(() -> validator.ensureValid("test", "none"));
+        assertDoesNotThrow(() -> validator.ensureValid("test", "by_duration:PT1H"));
         assertThrows(ConfigException.class, () -> validator.ensureValid("test", "invalid"));
+        assertThrows(ConfigException.class, () -> validator.ensureValid("test", "by_duration:invalid"));
+        assertThrows(ConfigException.class, () -> validator.ensureValid("test", "by_duration:-PT1H"));
+        assertThrows(ConfigException.class, () -> validator.ensureValid("test", "by_duration:"));
+        assertThrows(ConfigException.class, () -> validator.ensureValid("test", "by_duration"));
         assertThrows(ConfigException.class, () -> validator.ensureValid("test", "LATEST"));
+        assertThrows(ConfigException.class, () -> validator.ensureValid("test", "EARLIEST"));
+        assertThrows(ConfigException.class, () -> validator.ensureValid("test", "NONE"));
         assertThrows(ConfigException.class, () -> validator.ensureValid("test", ""));
         assertThrows(ConfigException.class, () -> validator.ensureValid("test", null));
     }
 
     @Test
     public void testEqualsAndHashCode() {
-        AutoOffsetResetStrategy strategy1 = AutoOffsetResetStrategy.fromString("earliest");
-        AutoOffsetResetStrategy strategy2 = AutoOffsetResetStrategy.fromString("earliest");
-        AutoOffsetResetStrategy strategy3 = AutoOffsetResetStrategy.fromString("latest");
-
-        assertEquals(strategy1, strategy2);
-        assertNotEquals(strategy1, strategy3);
-        assertEquals(strategy1.hashCode(), strategy2.hashCode());
-        assertNotEquals(strategy1.hashCode(), strategy3.hashCode());
+        AutoOffsetResetStrategy earliest1 = AutoOffsetResetStrategy.fromString("earliest");
+        AutoOffsetResetStrategy earliest2 = AutoOffsetResetStrategy.fromString("earliest");
+        AutoOffsetResetStrategy latest1 = AutoOffsetResetStrategy.fromString("latest");
+
+        AutoOffsetResetStrategy duration1 = AutoOffsetResetStrategy.fromString("by_duration:P2D");
+        AutoOffsetResetStrategy duration2 = AutoOffsetResetStrategy.fromString("by_duration:P2D");
+
+        assertEquals(earliest1, earliest2);
+        assertNotEquals(earliest1, latest1);
+        assertEquals(earliest1.hashCode(), earliest2.hashCode());
+        assertNotEquals(earliest1.hashCode(), latest1.hashCode());
+
+        assertNotEquals(latest1, duration2);
+        assertEquals(duration1, duration2);
+    }
+
+    @Test
+    public void testTimestamp() {
+        AutoOffsetResetStrategy earliest1 = AutoOffsetResetStrategy.fromString("earliest");
+        AutoOffsetResetStrategy earliest2 = AutoOffsetResetStrategy.fromString("earliest");
+        assertEquals(Optional.of(ListOffsetsRequest.EARLIEST_TIMESTAMP), earliest1.timestamp());
+        assertEquals(earliest1, earliest2);
+
+        AutoOffsetResetStrategy latest1 = AutoOffsetResetStrategy.fromString("latest");
+        AutoOffsetResetStrategy latest2 = AutoOffsetResetStrategy.fromString("latest");
+        assertEquals(Optional.of(ListOffsetsRequest.LATEST_TIMESTAMP), latest1.timestamp());
+        assertEquals(latest1, latest2);
+
+        AutoOffsetResetStrategy none1 = AutoOffsetResetStrategy.fromString("none");
+        AutoOffsetResetStrategy none2 = AutoOffsetResetStrategy.fromString("none");
+        assertFalse(none1.timestamp().isPresent());
+        assertEquals(none1, none2);
+
+        AutoOffsetResetStrategy byDuration1 = AutoOffsetResetStrategy.fromString("by_duration:PT1H");
+        Optional<Long> timestamp = byDuration1.timestamp();
+        assertTrue(timestamp.isPresent());
+        assertTrue(timestamp.get() <= Instant.now().toEpochMilli() - Duration.ofHours(1).toMillis());
+
+        AutoOffsetResetStrategy byDuration2 = AutoOffsetResetStrategy.fromString("by_duration:PT1H");
+        AutoOffsetResetStrategy byDuration3 = AutoOffsetResetStrategy.fromString("by_duration:PT2H");
+
+        assertEquals(byDuration1, byDuration2);
+        assertNotEquals(byDuration1, byDuration3);
     }
 }
\ No newline at end of file
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
index d5ff67359c4..9a6c4d081bf 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
@@ -65,6 +65,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -93,6 +94,8 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class OffsetFetcherTest {
 
@@ -186,6 +189,26 @@ public class OffsetFetcherTest {
         assertEquals(5, subscriptions.position(tp0).offset);
     }
 
+    @Test
+    public void testUpdateFetchPositionResetToDurationOffset() {
+        long timestamp = Instant.now().toEpochMilli();
+        AutoOffsetResetStrategy durationStrategy = mock(AutoOffsetResetStrategy.class);
+        when(durationStrategy.timestamp()).thenReturn(Optional.of(timestamp));
+        buildFetcher(durationStrategy);
+        assignFromUser(singleton(tp0));
+        subscriptions.requestOffsetReset(tp0, durationStrategy);
+
+        client.updateMetadata(initialUpdateResponse);
+
+        client.prepareResponse(listOffsetRequestMatcher(timestamp),
+                listOffsetResponse(Errors.NONE, 1L, 5L));
+        offsetFetcher.resetPositionsIfNeeded();
+        consumerClient.pollNoWakeup();
+        assertFalse(subscriptions.isOffsetResetNeeded(tp0));
+        assertTrue(subscriptions.isFetchable(tp0));
+        assertEquals(5, subscriptions.position(tp0).offset);
+    }
+
     /**
      * Make sure the client behaves appropriately when receiving an exception for unavailable offsets
      */


Reply via email to