This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ae3c5dec99b KAFKA-18013: Add support for duration based offset reset
strategy to Kafka Consumer (#17972)
ae3c5dec99b is described below
commit ae3c5dec99b6f20d860c945101d792c436301dc7
Author: Manikumar Reddy <[email protected]>
AuthorDate: Fri Nov 29 22:38:57 2024 +0530
KAFKA-18013: Add support for duration based offset reset strategy to Kafka
Consumer (#17972)
Update AutoOffsetResetStrategy.java to support duration based reset strategy
Update OffsetFetcher related classes and unit tests
Reviewers: Andrew Schofield <[email protected]>, Lianet Magrans
<[email protected]>
---
.../kafka/clients/consumer/ConsumerConfig.java | 2 +
.../kafka/clients/consumer/MockConsumer.java | 10 ++
.../internals/AutoOffsetResetStrategy.java | 106 ++++++++++++++++-----
.../clients/consumer/internals/OffsetFetcher.java | 13 ++-
.../consumer/internals/OffsetFetcherUtils.java | 39 +++-----
.../consumer/internals/OffsetsRequestManager.java | 18 ++--
.../consumer/internals/SubscriptionState.java | 2 +-
.../kafka/clients/consumer/KafkaConsumerTest.java | 17 +++-
.../kafka/clients/consumer/MockConsumerTest.java | 22 +++++
.../internals/AutoOffsetResetStrategyTest.java | 83 ++++++++++++----
.../consumer/internals/OffsetFetcherTest.java | 23 +++++
11 files changed, 252 insertions(+), 83 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 779a9933fd7..fb4c05062bb 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -173,6 +173,8 @@ public class ConsumerConfig extends AbstractConfig {
"(e.g. because that data has been deleted): " +
"<ul><li>earliest: automatically reset the offset to the earliest
offset" +
"<li>latest: automatically reset the offset to the latest
offset</li>" +
+ "<li>by_duration:<duration>: automatically reset the offset to a
configured <duration> from the current timestamp. <duration> must be specified
in ISO8601 format (PnDTnHnMn.nS). " +
+ "Negative duration is not allowed.</li>" +
"<li>none: throw exception to the consumer if no previous offset
is found for the consumer's group</li>" +
"<li>anything else: throw exception to the consumer.</li></ul>" +
"<p>Note that altering partition numbers while setting this config
to latest may cause message delivery loss since " +
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index a900fac95aa..817e8a63b20 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -62,6 +62,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
private final SubscriptionState subscriptions;
private final Map<TopicPartition, Long> beginningOffsets;
private final Map<TopicPartition, Long> endOffsets;
+ private final Map<TopicPartition, Long> durationResetOffsets;
private final Map<TopicPartition, OffsetAndMetadata> committed;
private final Queue<Runnable> pollTasks;
private final Set<TopicPartition> paused;
@@ -104,6 +105,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
this.closed = false;
this.beginningOffsets = new HashMap<>();
this.endOffsets = new HashMap<>();
+ this.durationResetOffsets = new HashMap<>();
this.pollTasks = new LinkedList<>();
this.pollException = null;
this.wakeup = new AtomicBoolean(false);
@@ -433,6 +435,10 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
endOffsets.putAll(newOffsets);
}
+ public synchronized void updateDurationOffsets(final Map<TopicPartition,
Long> newOffsets) {
+ durationResetOffsets.putAll(newOffsets);
+ }
+
public void disableTelemetry() {
telemetryDisabled = true;
}
@@ -610,6 +616,10 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
offset = endOffsets.get(tp);
if (offset == null)
throw new IllegalStateException("MockConsumer didn't have end
offset specified, but tried to seek to end");
+ } else if (strategy.type() ==
AutoOffsetResetStrategy.StrategyType.BY_DURATION) {
+ offset = durationResetOffsets.get(tp);
+ if (offset == null)
+ throw new IllegalStateException("MockConsumer didn't have
duration offset specified, but tried to seek to timestamp");
} else {
throw new NoOffsetForPartitionException(tp);
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategy.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategy.java
index a94eb4585a3..6eecc4e09d8 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategy.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategy.java
@@ -18,15 +18,19 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.utils.Utils;
+import java.time.Duration;
+import java.time.Instant;
import java.util.Arrays;
import java.util.Locale;
import java.util.Objects;
+import java.util.Optional;
public class AutoOffsetResetStrategy {
- private enum StrategyType {
- LATEST, EARLIEST, NONE;
+ public enum StrategyType {
+ LATEST, EARLIEST, NONE, BY_DURATION;
@Override
public String toString() {
@@ -39,30 +43,65 @@ public class AutoOffsetResetStrategy {
public static final AutoOffsetResetStrategy NONE = new
AutoOffsetResetStrategy(StrategyType.NONE);
private final StrategyType type;
+ private final Optional<Duration> duration;
private AutoOffsetResetStrategy(StrategyType type) {
this.type = type;
+ this.duration = Optional.empty();
}
- public static boolean isValid(String offsetStrategy) {
- return
Arrays.asList(Utils.enumOptions(StrategyType.class)).contains(offsetStrategy);
+ private AutoOffsetResetStrategy(Duration duration) {
+ this.type = StrategyType.BY_DURATION;
+ this.duration = Optional.of(duration);
}
+ /**
+ * Returns the AutoOffsetResetStrategy from the given string.
+ */
public static AutoOffsetResetStrategy fromString(String offsetStrategy) {
- if (offsetStrategy == null || !isValid(offsetStrategy)) {
- throw new IllegalArgumentException("Unknown auto offset reset
strategy: " + offsetStrategy);
+ if (offsetStrategy == null) {
+ throw new IllegalArgumentException("Auto offset reset strategy is
null");
}
- StrategyType type =
StrategyType.valueOf(offsetStrategy.toUpperCase(Locale.ROOT));
- switch (type) {
- case EARLIEST:
- return EARLIEST;
- case LATEST:
- return LATEST;
- case NONE:
- return NONE;
- default:
- throw new IllegalArgumentException("Unknown auto offset reset
strategy: " + offsetStrategy);
+
+ if (StrategyType.BY_DURATION.toString().equals(offsetStrategy)) {
+ throw new IllegalArgumentException("<:duration> part is missing in
by_duration auto offset reset strategy.");
+ }
+
+ if
(Arrays.asList(Utils.enumOptions(StrategyType.class)).contains(offsetStrategy))
{
+ StrategyType type =
StrategyType.valueOf(offsetStrategy.toUpperCase(Locale.ROOT));
+ switch (type) {
+ case EARLIEST:
+ return EARLIEST;
+ case LATEST:
+ return LATEST;
+ case NONE:
+ return NONE;
+ default:
+ throw new IllegalArgumentException("Unknown auto offset
reset strategy: " + offsetStrategy);
+ }
+ }
+
+ if (offsetStrategy.startsWith(StrategyType.BY_DURATION + ":")) {
+ String isoDuration =
offsetStrategy.substring(StrategyType.BY_DURATION.toString().length() + 1);
+ try {
+ Duration duration = Duration.parse(isoDuration);
+ if (duration.isNegative()) {
+ throw new IllegalArgumentException("Negative duration is
not supported in by_duration offset reset strategy.");
+ }
+ return new AutoOffsetResetStrategy(duration);
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Unable to parse duration
string in by_duration offset reset strategy.", e);
+ }
}
+
+ throw new IllegalArgumentException("Unknown auto offset reset
strategy: " + offsetStrategy);
+ }
+
+ /**
+ * Returns the offset reset strategy type.
+ */
+ public StrategyType type() {
+ return type;
}
/**
@@ -72,33 +111,54 @@ public class AutoOffsetResetStrategy {
return type.toString();
}
+ /**
+ * Return the timestamp to be used for the ListOffsetsRequest.
+ * @return the timestamp for the OffsetResetStrategy,
+ * if the strategy is EARLIEST or LATEST or duration is provided
+ * else return Optional.empty()
+ */
+ public Optional<Long> timestamp() {
+ if (type == StrategyType.EARLIEST)
+ return Optional.of(ListOffsetsRequest.EARLIEST_TIMESTAMP);
+ else if (type == StrategyType.LATEST)
+ return Optional.of(ListOffsetsRequest.LATEST_TIMESTAMP);
+ else if (type == StrategyType.BY_DURATION && duration.isPresent()) {
+ Instant now = Instant.now();
+ return Optional.of(now.minus(duration.get()).toEpochMilli());
+ } else
+ return Optional.empty();
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AutoOffsetResetStrategy that = (AutoOffsetResetStrategy) o;
- return Objects.equals(type, that.type);
+ return type == that.type && Objects.equals(duration, that.duration);
}
@Override
public int hashCode() {
- return Objects.hashCode(type);
+ return Objects.hash(type, duration);
}
@Override
public String toString() {
return "AutoOffsetResetStrategy{" +
- "type='" + type + '\'' +
+ "type=" + type +
+ (duration.map(value -> ", duration=" + value).orElse("")) +
'}';
}
public static class Validator implements ConfigDef.Validator {
@Override
public void ensureValid(String name, Object value) {
- String strategy = (String) value;
- if (!AutoOffsetResetStrategy.isValid(strategy)) {
- throw new ConfigException(name, value, "Invalid value " +
strategy + " for configuration " +
- name + ": the value must be either 'earliest',
'latest', or 'none'.");
+ String offsetStrategy = (String) value;
+ try {
+ fromString(offsetStrategy);
+ } catch (Exception e) {
+ throw new ConfigException(name, value, "Invalid value `" +
offsetStrategy + "` for configuration " +
+ name + ". The value must be either 'earliest',
'latest', 'none' or of the format 'by_duration:<PnDTnHnMn.nS.>'.");
}
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
index f7646bff9ed..bb01510e906 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
@@ -101,12 +101,13 @@ public class OffsetFetcher {
*
and one or more partitions aren't awaiting a seekToBeginning() or seekToEnd().
*/
public void resetPositionsIfNeeded() {
- Map<TopicPartition, Long> offsetResetTimestamps =
offsetFetcherUtils.getOffsetResetTimestamp();
+ Map<TopicPartition, AutoOffsetResetStrategy>
partitionAutoOffsetResetStrategyMap =
+ offsetFetcherUtils.getOffsetResetStrategyForPartitions();
- if (offsetResetTimestamps.isEmpty())
+ if (partitionAutoOffsetResetStrategyMap.isEmpty())
return;
- resetPositionsAsync(offsetResetTimestamps);
+ resetPositionsAsync(partitionAutoOffsetResetStrategyMap);
}
/**
@@ -209,7 +210,9 @@ public class OffsetFetcher {
}
}
- private void resetPositionsAsync(Map<TopicPartition, Long>
partitionResetTimestamps) {
+ private void resetPositionsAsync(Map<TopicPartition,
AutoOffsetResetStrategy> partitionAutoOffsetResetStrategyMap) {
+ Map<TopicPartition, Long> partitionResetTimestamps =
partitionAutoOffsetResetStrategyMap.entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e ->
e.getValue().timestamp().get()));
Map<Node, Map<TopicPartition, ListOffsetsPartition>>
timestampsToSearchByNode =
groupListOffsetRequests(partitionResetTimestamps, new
HashSet<>());
for (Map.Entry<Node, Map<TopicPartition, ListOffsetsPartition>> entry
: timestampsToSearchByNode.entrySet()) {
@@ -221,7 +224,7 @@ public class OffsetFetcher {
future.addListener(new RequestFutureListener<>() {
@Override
public void onSuccess(ListOffsetResult result) {
-
offsetFetcherUtils.onSuccessfulResponseForResettingPositions(resetTimestamps,
result);
+
offsetFetcherUtils.onSuccessfulResponseForResettingPositions(result,
partitionAutoOffsetResetStrategyMap);
}
@Override
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
index 2e9d48ebda5..98afb02d6b9 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
@@ -32,7 +32,6 @@ import org.apache.kafka.common.message.ListOffsetsRequestData;
import org.apache.kafka.common.message.ListOffsetsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.utils.LogContext;
@@ -224,19 +223,22 @@ class OffsetFetcherUtils {
}
}
- Map<TopicPartition, Long> getOffsetResetTimestamp() {
+ /**
+ * get OffsetResetStrategy for all assigned partitions
+ */
+ Map<TopicPartition, AutoOffsetResetStrategy>
getOffsetResetStrategyForPartitions() {
// Raise exception from previous offset fetch if there is one
RuntimeException exception =
cachedResetPositionsException.getAndSet(null);
if (exception != null)
throw exception;
Set<TopicPartition> partitions =
subscriptionState.partitionsNeedingReset(time.milliseconds());
- final Map<TopicPartition, Long> offsetResetTimestamps = new
HashMap<>();
+ final Map<TopicPartition, AutoOffsetResetStrategy>
partitionAutoOffsetResetStrategyMap = new HashMap<>();
for (final TopicPartition partition : partitions) {
- offsetResetTimestamps.put(partition,
offsetResetStrategyTimestamp(partition));
+ partitionAutoOffsetResetStrategyMap.put(partition,
offsetResetStrategyWithValidTimestamp(partition));
}
- return offsetResetTimestamps;
+ return partitionAutoOffsetResetStrategyMap;
}
static Map<TopicPartition, OffsetAndTimestamp> buildListOffsetsResult(
@@ -283,14 +285,13 @@ class OffsetFetcherUtils {
return offsetsResults;
}
- private long offsetResetStrategyTimestamp(final TopicPartition partition) {
+ private AutoOffsetResetStrategy
offsetResetStrategyWithValidTimestamp(final TopicPartition partition) {
AutoOffsetResetStrategy strategy =
subscriptionState.resetStrategy(partition);
- if (strategy == AutoOffsetResetStrategy.EARLIEST)
- return ListOffsetsRequest.EARLIEST_TIMESTAMP;
- else if (strategy == AutoOffsetResetStrategy.LATEST)
- return ListOffsetsRequest.LATEST_TIMESTAMP;
- else
+ if (strategy.timestamp().isPresent()) {
+ return strategy;
+ } else {
throw new NoOffsetForPartitionException(partition);
+ }
}
static Set<String> topicsForPartitions(Collection<TopicPartition>
partitions) {
@@ -319,18 +320,9 @@ class OffsetFetcherUtils {
}
}
- static AutoOffsetResetStrategy timestampToOffsetResetStrategy(long
timestamp) {
- if (timestamp == ListOffsetsRequest.EARLIEST_TIMESTAMP)
- return AutoOffsetResetStrategy.EARLIEST;
- else if (timestamp == ListOffsetsRequest.LATEST_TIMESTAMP)
- return AutoOffsetResetStrategy.LATEST;
- else
- return null;
- }
-
void onSuccessfulResponseForResettingPositions(
- final Map<TopicPartition,
ListOffsetsRequestData.ListOffsetsPartition> resetTimestamps,
- final ListOffsetResult result) {
+ final ListOffsetResult result,
+ final Map<TopicPartition, AutoOffsetResetStrategy>
partitionAutoOffsetResetStrategyMap) {
if (!result.partitionsToRetry.isEmpty()) {
subscriptionState.requestFailed(result.partitionsToRetry,
time.milliseconds() + retryBackoffMs);
metadata.requestUpdate(false);
@@ -339,10 +331,9 @@ class OffsetFetcherUtils {
for (Map.Entry<TopicPartition, ListOffsetData> fetchedOffset :
result.fetchedOffsets.entrySet()) {
TopicPartition partition = fetchedOffset.getKey();
ListOffsetData offsetData = fetchedOffset.getValue();
- ListOffsetsRequestData.ListOffsetsPartition requestedReset =
resetTimestamps.get(partition);
resetPositionIfNeeded(
partition,
- timestampToOffsetResetStrategy(requestedReset.timestamp()),
+ partitionAutoOffsetResetStrategyMap.get(partition),
offsetData);
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
index db847eae831..7870caec1ba 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
@@ -472,20 +472,20 @@ public final class OffsetsRequestManager implements
RequestManager, ClusterResou
* this function (ex. {@link
org.apache.kafka.common.errors.TopicAuthorizationException})
*/
CompletableFuture<Void> resetPositionsIfNeeded() {
- Map<TopicPartition, Long> offsetResetTimestamps;
+ Map<TopicPartition, AutoOffsetResetStrategy>
partitionAutoOffsetResetStrategyMap;
try {
- offsetResetTimestamps =
offsetFetcherUtils.getOffsetResetTimestamp();
+ partitionAutoOffsetResetStrategyMap =
offsetFetcherUtils.getOffsetResetStrategyForPartitions();
} catch (Exception e) {
CompletableFuture<Void> result = new CompletableFuture<>();
result.completeExceptionally(e);
return result;
}
- if (offsetResetTimestamps.isEmpty())
+ if (partitionAutoOffsetResetStrategyMap.isEmpty())
return CompletableFuture.completedFuture(null);
- return sendListOffsetsRequestsAndResetPositions(offsetResetTimestamps);
+ return
sendListOffsetsRequestsAndResetPositions(partitionAutoOffsetResetStrategyMap);
}
/**
@@ -652,12 +652,14 @@ public final class OffsetsRequestManager implements
RequestManager, ClusterResou
* partitions. Use the retrieved offsets to reset positions in the
subscription state.
* This also adds the request to the list of unsentRequests.
*
- * @param timestampsToSearch the mapping between partitions and target time
+ * @param partitionAutoOffsetResetStrategyMap the mapping between
partitions and AutoOffsetResetStrategy
* @return A {@link CompletableFuture} which completes when the requests
are
* complete.
*/
private CompletableFuture<Void> sendListOffsetsRequestsAndResetPositions(
- final Map<TopicPartition, Long> timestampsToSearch) {
+ final Map<TopicPartition, AutoOffsetResetStrategy>
partitionAutoOffsetResetStrategyMap) {
+ Map<TopicPartition, Long> timestampsToSearch =
partitionAutoOffsetResetStrategyMap.entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e ->
e.getValue().timestamp().get()));
Map<Node, Map<TopicPartition,
ListOffsetsRequestData.ListOffsetsPartition>> timestampsToSearchByNode =
groupListOffsetRequests(timestampsToSearch, Optional.empty());
@@ -677,8 +679,8 @@ public final class OffsetsRequestManager implements
RequestManager, ClusterResou
partialResult.whenComplete((result, error) -> {
if (error == null) {
-
offsetFetcherUtils.onSuccessfulResponseForResettingPositions(resetTimestamps,
- result);
+
offsetFetcherUtils.onSuccessfulResponseForResettingPositions(result,
+ partitionAutoOffsetResetStrategyMap);
} else {
RuntimeException e;
if (error instanceof RuntimeException) {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 0df99301e9e..f700b8706ca 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -441,7 +441,7 @@ public class SubscriptionState {
log.debug("Skipping reset of partition {} since it is no longer
assigned", tp);
} else if (!state.awaitingReset()) {
log.debug("Skipping reset of partition {} since reset is no longer
needed", tp);
- } else if (requestedResetStrategy != state.resetStrategy) {
+ } else if (requestedResetStrategy != null &&
!requestedResetStrategy.equals(state.resetStrategy)) {
log.debug("Skipping reset of partition {} since an alternative
reset has been requested", tp);
} else {
log.info("Resetting offset for partition {} to position {}.", tp,
position);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index e8a084ebd54..36d15c0fe94 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1082,7 +1082,20 @@ public class KafkaConsumerTest {
@ParameterizedTest
@EnumSource(GroupProtocol.class)
public void testResetUsingAutoResetPolicy(GroupProtocol groupProtocol) {
- SubscriptionState subscription = new SubscriptionState(new
LogContext(), AutoOffsetResetStrategy.LATEST);
+ setUpConsumerWithAutoResetPolicy(groupProtocol,
AutoOffsetResetStrategy.LATEST);
+ assertEquals(50L, consumer.position(tp0));
+ }
+
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testResetUsingDurationBasedAutoResetPolicy(GroupProtocol
groupProtocol) {
+ AutoOffsetResetStrategy durationStrategy =
AutoOffsetResetStrategy.fromString("by_duration:PT1H");
+ setUpConsumerWithAutoResetPolicy(groupProtocol, durationStrategy);
+ assertEquals(50L, consumer.position(tp0));
+ }
+
+ private void setUpConsumerWithAutoResetPolicy(GroupProtocol groupProtocol,
AutoOffsetResetStrategy strategy) {
+ SubscriptionState subscription = new SubscriptionState(new
LogContext(), strategy);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
@@ -1100,8 +1113,6 @@ public class KafkaConsumerTest {
client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 50L)));
consumer.poll(Duration.ZERO);
-
- assertEquals(50L, consumer.position(tp0));
}
@ParameterizedTest
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
index 41c9f199d15..21cee3183bc 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -117,6 +117,28 @@ public class MockConsumerTest {
assertEquals(11L, (long)
consumer.endOffsets(Collections.singleton(partition)).get(partition));
}
+ @Test
+ public void testDurationBasedOffsetReset() {
+ MockConsumer<String, String> consumer = new
MockConsumer<>("by_duration:PT1H");
+ consumer.subscribe(Collections.singleton("test"));
+ consumer.rebalance(Arrays.asList(new TopicPartition("test", 0), new
TopicPartition("test", 1)));
+ HashMap<TopicPartition, Long> durationBasedOffsets = new HashMap<>();
+ durationBasedOffsets.put(new TopicPartition("test", 0), 10L);
+ durationBasedOffsets.put(new TopicPartition("test", 1), 11L);
+ consumer.updateDurationOffsets(durationBasedOffsets);
+ ConsumerRecord<String, String> rec1 = new ConsumerRecord<>("test", 0,
10L, 0L, TimestampType.CREATE_TIME,
+ 0, 0, "key1", "value1", new RecordHeaders(), Optional.empty());
+ ConsumerRecord<String, String> rec2 = new ConsumerRecord<>("test", 0,
11L, 0L, TimestampType.CREATE_TIME,
+ 0, 0, "key2", "value2", new RecordHeaders(), Optional.empty());
+ consumer.addRecord(rec1);
+ consumer.addRecord(rec2);
+ ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1));
+ Iterator<ConsumerRecord<String, String>> iter = records.iterator();
+ assertEquals(rec1, iter.next());
+ assertEquals(rec2, iter.next());
+ assertFalse(iter.hasNext());
+ }
+
@Test
public void testRebalanceListener() {
final List<TopicPartition> revoked = new ArrayList<>();
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategyTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategyTest.java
index 780319d610f..25ff9073747 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategyTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategyTest.java
@@ -17,9 +17,14 @@
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.junit.jupiter.api.Test;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Optional;
+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -29,26 +34,24 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class AutoOffsetResetStrategyTest {
- @Test
- public void testIsValid() {
- assertTrue(AutoOffsetResetStrategy.isValid("earliest"));
- assertTrue(AutoOffsetResetStrategy.isValid("latest"));
- assertTrue(AutoOffsetResetStrategy.isValid("none"));
- assertFalse(AutoOffsetResetStrategy.isValid("invalid"));
- assertFalse(AutoOffsetResetStrategy.isValid("LATEST"));
- assertFalse(AutoOffsetResetStrategy.isValid(""));
- assertFalse(AutoOffsetResetStrategy.isValid(null));
- }
-
@Test
public void testFromString() {
assertEquals(AutoOffsetResetStrategy.EARLIEST,
AutoOffsetResetStrategy.fromString("earliest"));
assertEquals(AutoOffsetResetStrategy.LATEST,
AutoOffsetResetStrategy.fromString("latest"));
assertEquals(AutoOffsetResetStrategy.NONE,
AutoOffsetResetStrategy.fromString("none"));
assertThrows(IllegalArgumentException.class, () ->
AutoOffsetResetStrategy.fromString("invalid"));
+ assertThrows(IllegalArgumentException.class, () ->
AutoOffsetResetStrategy.fromString("by_duration:invalid"));
+ assertThrows(IllegalArgumentException.class, () ->
AutoOffsetResetStrategy.fromString("by_duration:-PT1H"));
+ assertThrows(IllegalArgumentException.class, () ->
AutoOffsetResetStrategy.fromString("by_duration:"));
+ assertThrows(IllegalArgumentException.class, () ->
AutoOffsetResetStrategy.fromString("by_duration"));
assertThrows(IllegalArgumentException.class, () ->
AutoOffsetResetStrategy.fromString("LATEST"));
+ assertThrows(IllegalArgumentException.class, () ->
AutoOffsetResetStrategy.fromString("EARLIEST"));
+ assertThrows(IllegalArgumentException.class, () ->
AutoOffsetResetStrategy.fromString("NONE"));
assertThrows(IllegalArgumentException.class, () ->
AutoOffsetResetStrategy.fromString(""));
assertThrows(IllegalArgumentException.class, () ->
AutoOffsetResetStrategy.fromString(null));
+
+ AutoOffsetResetStrategy strategy =
AutoOffsetResetStrategy.fromString("by_duration:PT1H");
+ assertEquals("by_duration", strategy.name());
}
@Test
@@ -57,21 +60,63 @@ public class AutoOffsetResetStrategyTest {
assertDoesNotThrow(() -> validator.ensureValid("test", "earliest"));
assertDoesNotThrow(() -> validator.ensureValid("test", "latest"));
assertDoesNotThrow(() -> validator.ensureValid("test", "none"));
+ assertDoesNotThrow(() -> validator.ensureValid("test",
"by_duration:PT1H"));
assertThrows(ConfigException.class, () ->
validator.ensureValid("test", "invalid"));
+ assertThrows(ConfigException.class, () ->
validator.ensureValid("test", "by_duration:invalid"));
+ assertThrows(ConfigException.class, () ->
validator.ensureValid("test", "by_duration:-PT1H"));
+ assertThrows(ConfigException.class, () ->
validator.ensureValid("test", "by_duration:"));
+ assertThrows(ConfigException.class, () ->
validator.ensureValid("test", "by_duration"));
assertThrows(ConfigException.class, () ->
validator.ensureValid("test", "LATEST"));
+ assertThrows(ConfigException.class, () ->
validator.ensureValid("test", "EARLIEST"));
+ assertThrows(ConfigException.class, () ->
validator.ensureValid("test", "NONE"));
assertThrows(ConfigException.class, () ->
validator.ensureValid("test", ""));
assertThrows(ConfigException.class, () ->
validator.ensureValid("test", null));
}
@Test
public void testEqualsAndHashCode() {
- AutoOffsetResetStrategy strategy1 =
AutoOffsetResetStrategy.fromString("earliest");
- AutoOffsetResetStrategy strategy2 =
AutoOffsetResetStrategy.fromString("earliest");
- AutoOffsetResetStrategy strategy3 =
AutoOffsetResetStrategy.fromString("latest");
-
- assertEquals(strategy1, strategy2);
- assertNotEquals(strategy1, strategy3);
- assertEquals(strategy1.hashCode(), strategy2.hashCode());
- assertNotEquals(strategy1.hashCode(), strategy3.hashCode());
+ AutoOffsetResetStrategy earliest1 =
AutoOffsetResetStrategy.fromString("earliest");
+ AutoOffsetResetStrategy earliest2 =
AutoOffsetResetStrategy.fromString("earliest");
+ AutoOffsetResetStrategy latest1 =
AutoOffsetResetStrategy.fromString("latest");
+
+ AutoOffsetResetStrategy duration1 =
AutoOffsetResetStrategy.fromString("by_duration:P2D");
+ AutoOffsetResetStrategy duration2 =
AutoOffsetResetStrategy.fromString("by_duration:P2D");
+
+ assertEquals(earliest1, earliest2);
+ assertNotEquals(earliest1, latest1);
+ assertEquals(earliest1.hashCode(), earliest2.hashCode());
+ assertNotEquals(earliest1.hashCode(), latest1.hashCode());
+
+ assertNotEquals(latest1, duration2);
+ assertEquals(duration1, duration2);
+ }
+
+ @Test
+ public void testTimestamp() {
+ AutoOffsetResetStrategy earliest1 =
AutoOffsetResetStrategy.fromString("earliest");
+ AutoOffsetResetStrategy earliest2 =
AutoOffsetResetStrategy.fromString("earliest");
+ assertEquals(Optional.of(ListOffsetsRequest.EARLIEST_TIMESTAMP),
earliest1.timestamp());
+ assertEquals(earliest1, earliest2);
+
+ AutoOffsetResetStrategy latest1 =
AutoOffsetResetStrategy.fromString("latest");
+ AutoOffsetResetStrategy latest2 =
AutoOffsetResetStrategy.fromString("latest");
+ assertEquals(Optional.of(ListOffsetsRequest.LATEST_TIMESTAMP),
latest1.timestamp());
+ assertEquals(latest1, latest2);
+
+ AutoOffsetResetStrategy none1 =
AutoOffsetResetStrategy.fromString("none");
+ AutoOffsetResetStrategy none2 =
AutoOffsetResetStrategy.fromString("none");
+ assertFalse(none1.timestamp().isPresent());
+ assertEquals(none1, none2);
+
+ AutoOffsetResetStrategy byDuration1 =
AutoOffsetResetStrategy.fromString("by_duration:PT1H");
+ Optional<Long> timestamp = byDuration1.timestamp();
+ assertTrue(timestamp.isPresent());
+ assertTrue(timestamp.get() <= Instant.now().toEpochMilli() -
Duration.ofHours(1).toMillis());
+
+ AutoOffsetResetStrategy byDuration2 =
AutoOffsetResetStrategy.fromString("by_duration:PT1H");
+ AutoOffsetResetStrategy byDuration3 =
AutoOffsetResetStrategy.fromString("by_duration:PT2H");
+
+ assertEquals(byDuration1, byDuration2);
+ assertNotEquals(byDuration1, byDuration3);
}
}
\ No newline at end of file
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
index d5ff67359c4..9a6c4d081bf 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
@@ -65,6 +65,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.time.Duration;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -93,6 +94,8 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class OffsetFetcherTest {
@@ -186,6 +189,26 @@ public class OffsetFetcherTest {
assertEquals(5, subscriptions.position(tp0).offset);
}
+ @Test
+ public void testUpdateFetchPositionResetToDurationOffset() {
+ long timestamp = Instant.now().toEpochMilli();
+ AutoOffsetResetStrategy durationStrategy =
mock(AutoOffsetResetStrategy.class);
+ when(durationStrategy.timestamp()).thenReturn(Optional.of(timestamp));
+ buildFetcher(durationStrategy);
+ assignFromUser(singleton(tp0));
+ subscriptions.requestOffsetReset(tp0, durationStrategy);
+
+ client.updateMetadata(initialUpdateResponse);
+
+ client.prepareResponse(listOffsetRequestMatcher(timestamp),
+ listOffsetResponse(Errors.NONE, 1L, 5L));
+ offsetFetcher.resetPositionsIfNeeded();
+ consumerClient.pollNoWakeup();
+ assertFalse(subscriptions.isOffsetResetNeeded(tp0));
+ assertTrue(subscriptions.isFetchable(tp0));
+ assertEquals(5, subscriptions.position(tp0).offset);
+ }
+
/**
* Make sure the client behaves appropriately when receiving an exception
for unavailable offsets
*/