This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 220c5785210 KAFKA-18014: Add duration based offset reset option for
ShareConsumer (#18096)
220c5785210 is described below
commit 220c57852100e895b460e56d1132a3c9239aaa59
Author: Peter Lee <[email protected]>
AuthorDate: Mon Dec 16 16:25:37 2024 +0800
KAFKA-18014: Add duration based offset reset option for ShareConsumer
(#18096)
The Kafka consumer supports the auto.offset.reset config option, which is used
when there is no initial offset in Kafka or if the current offset no longer
exists on the server. This config currently supports the earliest/latest/none
options. Currently, consumer resets might force applications to reprocess large
amounts of data from earlier offsets. With infinite storage, it's beneficial to
have a duration-based offset reset strategy. This will allow applications to
consume/initialise from [...]
As part of KIP-932, we are adding support for share consumer groups. Share
consumer groups support the dynamic group configuration property
share.auto.offset.reset. This is used to set the initial Share-Partition Start
Offset (SPSO) based on the configured value. Currently,
share.auto.offset.reset supports the earliest and latest options to
automatically reset the offset.
Similar to the Kafka Consumer, we will add support for the
by_duration:<duration> config value for share.auto.offset.reset.
Reviewers: Andrew Schofield <[email protected]>, Apoorv Mittal
<[email protected]>
---
.../java/kafka/server/share/ShareFetchUtils.java | 14 ++
.../java/kafka/server/share/SharePartition.java | 13 +-
.../kafka/server/share/SharePartitionTest.java | 118 ++++++++++++++-
.../scala/unit/kafka/server/KafkaApisTest.scala | 2 +-
.../kafka/coordinator/group/GroupConfig.java | 32 ++---
.../group/ShareGroupAutoOffsetResetStrategy.java | 159 +++++++++++++++++++++
.../kafka/coordinator/group/GroupConfigTest.java | 16 +++
.../ShareGroupAutoOffsetResetStrategyTest.java | 112 +++++++++++++++
8 files changed, 440 insertions(+), 26 deletions(-)
diff --git a/core/src/main/java/kafka/server/share/ShareFetchUtils.java
b/core/src/main/java/kafka/server/share/ShareFetchUtils.java
index 8ed1623e7ef..0353b079e80 100644
--- a/core/src/main/java/kafka/server/share/ShareFetchUtils.java
+++ b/core/src/main/java/kafka/server/share/ShareFetchUtils.java
@@ -161,6 +161,20 @@ public class ShareFetchUtils {
return timestampAndOffset.get().offset;
}
+ /**
+ * Looks up the offset for the given timestamp on the topic-partition by calling
+ * replicaManager.fetchOffsetForTimestamp with READ_UNCOMMITTED isolation and the
+ * supplied leader epoch.
+ *
+ * @param topicIdPartition the topic-partition to search
+ * @param replicaManager the replica manager used for the lookup
+ * @param timestampToSearch the timestamp to search for
+ * @param leaderEpoch the leader epoch passed to the lookup
+ * @return The offset for the given timestamp.
+ * @throws OffsetNotAvailableException if the lookup yields no timestamp/offset pair
+ */
+ static long offsetForTimestamp(TopicIdPartition topicIdPartition,
ReplicaManager replicaManager, long timestampToSearch, int leaderEpoch) {
+ Option<FileRecords.TimestampAndOffset> timestampAndOffset =
replicaManager.fetchOffsetForTimestamp(
+ topicIdPartition.topicPartition(), timestampToSearch, new
Some<>(IsolationLevel.READ_UNCOMMITTED), Optional.of(leaderEpoch),
true).timestampAndOffsetOpt();
+ if (timestampAndOffset.isEmpty()) {
+ throw new OffsetNotAvailableException("Offset for timestamp " +
timestampToSearch + " not found for topic partition: " + topicIdPartition);
+ }
+ return timestampAndOffset.get().offset;
+ }
+
static int leaderEpoch(ReplicaManager replicaManager, TopicPartition tp) {
return partition(replicaManager, tp).getLeaderEpoch();
}
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java
b/core/src/main/java/kafka/server/share/SharePartition.java
index 3dee1e35f3e..1746cbc9e3e 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -37,6 +37,7 @@ import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupConfig;
import org.apache.kafka.coordinator.group.GroupConfigManager;
+import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchKey;
@@ -76,6 +77,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import static kafka.server.share.ShareFetchUtils.offsetForEarliestTimestamp;
import static kafka.server.share.ShareFetchUtils.offsetForLatestTimestamp;
+import static kafka.server.share.ShareFetchUtils.offsetForTimestamp;
/**
* The SharePartition is used to track the state of a partition that is shared
between multiple
@@ -2093,16 +2095,21 @@ public class SharePartition {
if (partitionDataStartOffset !=
PartitionFactory.UNINITIALIZED_START_OFFSET) {
return partitionDataStartOffset;
}
- GroupConfig.ShareGroupAutoOffsetReset offsetResetStrategy;
+ ShareGroupAutoOffsetResetStrategy offsetResetStrategy;
if (groupConfigManager.groupConfig(groupId).isPresent()) {
offsetResetStrategy =
groupConfigManager.groupConfig(groupId).get().shareAutoOffsetReset();
} else {
offsetResetStrategy = GroupConfig.defaultShareAutoOffsetReset();
}
- if (offsetResetStrategy ==
GroupConfig.ShareGroupAutoOffsetReset.EARLIEST)
+ if (offsetResetStrategy.type() ==
ShareGroupAutoOffsetResetStrategy.StrategyType.LATEST) {
+ return offsetForLatestTimestamp(topicIdPartition, replicaManager,
leaderEpoch);
+ } else if (offsetResetStrategy.type() ==
ShareGroupAutoOffsetResetStrategy.StrategyType.EARLIEST) {
return offsetForEarliestTimestamp(topicIdPartition,
replicaManager, leaderEpoch);
- return offsetForLatestTimestamp(topicIdPartition, replicaManager,
leaderEpoch);
+ } else {
+ // offsetResetStrategy type is BY_DURATION
+ return offsetForTimestamp(topicIdPartition, replicaManager,
offsetResetStrategy.timestamp(), leaderEpoch);
+ }
}
// Visible for testing. Should only be used for testing purposes.
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 8d1bbc28232..383ffe0b45c 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -46,6 +46,7 @@ import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupConfig;
import org.apache.kafka.coordinator.group.GroupConfigManager;
+import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
import org.apache.kafka.server.share.persister.NoOpShareStatePersister;
@@ -215,7 +216,7 @@ public class SharePartitionTest {
GroupConfigManager groupConfigManager =
Mockito.mock(GroupConfigManager.class);
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
-
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(GroupConfig.ShareGroupAutoOffsetReset.EARLIEST);
+
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(ShareGroupAutoOffsetResetStrategy.EARLIEST);
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
@@ -265,7 +266,7 @@ public class SharePartitionTest {
GroupConfigManager groupConfigManager =
Mockito.mock(GroupConfigManager.class);
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
-
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(GroupConfig.ShareGroupAutoOffsetReset.LATEST);
+
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(ShareGroupAutoOffsetResetStrategy.LATEST);
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
@@ -298,6 +299,64 @@ public class SharePartitionTest {
assertEquals(PartitionFactory.DEFAULT_STATE_EPOCH,
sharePartition.stateEpoch());
}
+ @Test
+ public void
testMaybeInitializeDefaultStartEpochGroupConfigReturnsByDuration() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(),
Collections.singletonList(
+ PartitionFactory.newPartitionAllData(
+ 0, PartitionFactory.DEFAULT_STATE_EPOCH,
+ PartitionFactory.UNINITIALIZED_START_OFFSET,
+ PartitionFactory.DEFAULT_ERROR_CODE,
+ PartitionFactory.DEFAULT_ERR_MESSAGE,
+ Collections.emptyList())))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+ GroupConfigManager groupConfigManager =
Mockito.mock(GroupConfigManager.class);
+ GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
+
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
+
+ // Since the timestamp() of duration based strategy is not
deterministic, we need to mock the ShareGroupAutoOffsetResetStrategy.
+ // mock: final ShareGroupAutoOffsetResetStrategy resetStrategy =
ShareGroupAutoOffsetResetStrategy.fromString("by_duration:PT1H");
+ final ShareGroupAutoOffsetResetStrategy resetStrategy =
Mockito.mock(ShareGroupAutoOffsetResetStrategy.class);
+ final long expectedTimestamp = MOCK_TIME.milliseconds() -
TimeUnit.HOURS.toMillis(1);
+
Mockito.when(resetStrategy.type()).thenReturn(ShareGroupAutoOffsetResetStrategy.StrategyType.BY_DURATION);
+ Mockito.when(resetStrategy.timestamp()).thenReturn(expectedTimestamp);
+
+
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(resetStrategy);
+
+ ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
+
+ FileRecords.TimestampAndOffset timestampAndOffset = new
FileRecords.TimestampAndOffset(MOCK_TIME.milliseconds() -
TimeUnit.HOURS.toMillis(1), 15L, Optional.empty());
+ Mockito.doReturn(new
OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())).
+
when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class),
Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
+
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withPersister(persister)
+ .withGroupConfigManager(groupConfigManager)
+ .withReplicaManager(replicaManager)
+ .build();
+
+ CompletableFuture<Void> result = sharePartition.maybeInitialize();
+ assertTrue(result.isDone());
+ assertFalse(result.isCompletedExceptionally());
+
+ // replicaManager.fetchOffsetForTimestamp should be called with the
(current time - 1 hour)
+ Mockito.verify(replicaManager).fetchOffsetForTimestamp(
+ Mockito.any(TopicPartition.class),
+ Mockito.eq(expectedTimestamp),
+ Mockito.any(),
+ Mockito.any(),
+ Mockito.anyBoolean()
+ );
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertEquals(15, sharePartition.startOffset());
+ assertEquals(15, sharePartition.endOffset());
+ assertEquals(PartitionFactory.DEFAULT_STATE_EPOCH,
sharePartition.stateEpoch());
+ }
+
@Test
public void testMaybeInitializeDefaultStartEpochGroupConfigNotPresent() {
Persister persister = Mockito.mock(Persister.class);
@@ -407,7 +466,7 @@ public class SharePartitionTest {
GroupConfigManager groupConfigManager =
Mockito.mock(GroupConfigManager.class);
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
-
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(GroupConfig.ShareGroupAutoOffsetReset.EARLIEST);
+
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(ShareGroupAutoOffsetResetStrategy.EARLIEST);
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
@@ -436,6 +495,59 @@ public class SharePartitionTest {
assertEquals(SharePartitionState.FAILED,
sharePartition.partitionState());
}
+ @Test
+ public void testMaybeInitializeFetchOffsetForByDurationThrowsError() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(),
Collections.singletonList(
+ PartitionFactory.newPartitionAllData(
+ 0, PartitionFactory.DEFAULT_STATE_EPOCH,
+ PartitionFactory.UNINITIALIZED_START_OFFSET,
+ PartitionFactory.DEFAULT_ERROR_CODE,
+ PartitionFactory.DEFAULT_ERR_MESSAGE,
+ Collections.emptyList())))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+ GroupConfigManager groupConfigManager =
Mockito.mock(GroupConfigManager.class);
+ GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
+
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
+
+ // We need to mock the ShareGroupAutoOffsetResetStrategy as the
timestamp() of duration based strategy is not deterministic.
+ // final ShareGroupAutoOffsetResetStrategy resetStrategy =
ShareGroupAutoOffsetResetStrategy.fromString("by_duration:PT1H");
+ final ShareGroupAutoOffsetResetStrategy resetStrategy =
Mockito.mock(ShareGroupAutoOffsetResetStrategy.class);
+ final long expectedTimestamp = MOCK_TIME.milliseconds() -
TimeUnit.HOURS.toMillis(1);
+
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(resetStrategy);
+
+
Mockito.when(resetStrategy.type()).thenReturn(ShareGroupAutoOffsetResetStrategy.StrategyType.BY_DURATION);
+ Mockito.when(resetStrategy.timestamp()).thenReturn(expectedTimestamp);
+
+ ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
+
+
Mockito.when(replicaManager.fetchOffsetForTimestamp(Mockito.any(TopicPartition.class),
Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean()))
+ .thenThrow(new RuntimeException("fetch offsets exception"));
+
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withPersister(persister)
+ .withGroupConfigManager(groupConfigManager)
+ .withReplicaManager(replicaManager)
+ .build();
+
+ CompletableFuture<Void> result = sharePartition.maybeInitialize();
+ assertTrue(result.isDone());
+ assertTrue(result.isCompletedExceptionally());
+
+ Mockito.verify(replicaManager).fetchOffsetForTimestamp(
+ Mockito.any(TopicPartition.class),
+ Mockito.eq(expectedTimestamp),
+ Mockito.any(),
+ Mockito.any(),
+ Mockito.anyBoolean()
+ );
+
+ assertEquals(SharePartitionState.FAILED,
sharePartition.partitionState());
+ }
+
@Test
public void testMaybeInitializeSharePartitionAgain() {
Persister persister = Mockito.mock(Persister.class);
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index a2ca922226c..d2f08c880eb 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -585,7 +585,7 @@ class KafkaApisTest extends Logging {
cgConfigs.put(SHARE_SESSION_TIMEOUT_MS_CONFIG,
GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString)
cgConfigs.put(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
cgConfigs.put(SHARE_RECORD_LOCK_DURATION_MS_CONFIG,
ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT.toString)
- cgConfigs.put(SHARE_AUTO_OFFSET_RESET_CONFIG,
GroupConfig.defaultShareAutoOffsetReset.toString)
+ cgConfigs.put(SHARE_AUTO_OFFSET_RESET_CONFIG,
GroupConfig.SHARE_AUTO_OFFSET_RESET_DEFAULT)
when(configRepository.groupConfig(consumerGroupId)).thenReturn(cgConfigs)
val describeConfigsRequest = new DescribeConfigsRequest.Builder(new
DescribeConfigsRequestData()
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
index 934055d9d5b..03f0af738d2 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
@@ -21,10 +21,8 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.errors.InvalidConfigurationException;
-import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
-import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
@@ -34,7 +32,6 @@ import static
org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
-import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
/**
* Group configuration related parameters and supporting methods like
validation, etc. are
@@ -53,8 +50,14 @@ public final class GroupConfig extends AbstractConfig {
public static final String SHARE_RECORD_LOCK_DURATION_MS_CONFIG =
"share.record.lock.duration.ms";
public static final String SHARE_AUTO_OFFSET_RESET_CONFIG =
"share.auto.offset.reset";
- public static final String SHARE_AUTO_OFFSET_RESET_DEFAULT =
ShareGroupAutoOffsetReset.LATEST.toString();
- public static final String SHARE_AUTO_OFFSET_RESET_DOC = "The strategy to
initialize the share-partition start offset.";
+ public static final String SHARE_AUTO_OFFSET_RESET_DEFAULT =
ShareGroupAutoOffsetResetStrategy.LATEST.name();
+ public static final String SHARE_AUTO_OFFSET_RESET_DOC = "The strategy to
initialize the share-partition start offset. " +
+ "<ul><li>earliest: automatically reset the offset to the earliest
offset</li>" +
+ "<li>latest: automatically reset the offset to the latest offset</li>"
+
+ "<li>by_duration:<duration>: automatically reset the offset to a
configured duration from the current timestamp. " +
+ "<duration> must be specified in ISO8601 format (PnDTnHnMn.nS).
" +
+ "Negative duration is not allowed.</li>" +
+ "<li>anything else: throw exception to the share consumer.</li></ul>";
public final int consumerSessionTimeoutMs;
@@ -102,7 +105,7 @@ public final class GroupConfig extends AbstractConfig {
.define(SHARE_AUTO_OFFSET_RESET_CONFIG,
STRING,
SHARE_AUTO_OFFSET_RESET_DEFAULT,
- in(Utils.enumOptions(ShareGroupAutoOffsetReset.class)),
+ new ShareGroupAutoOffsetResetStrategy.Validator(),
MEDIUM,
SHARE_AUTO_OFFSET_RESET_DOC);
@@ -223,8 +226,8 @@ public final class GroupConfig extends AbstractConfig {
/**
* The default share group auto offset reset strategy.
*/
- public static ShareGroupAutoOffsetReset defaultShareAutoOffsetReset() {
- return
ShareGroupAutoOffsetReset.valueOf(SHARE_AUTO_OFFSET_RESET_DEFAULT.toUpperCase(Locale.ROOT));
+ public static ShareGroupAutoOffsetResetStrategy
defaultShareAutoOffsetReset() {
+ return
ShareGroupAutoOffsetResetStrategy.fromString(SHARE_AUTO_OFFSET_RESET_DEFAULT);
}
/**
@@ -265,16 +268,7 @@ public final class GroupConfig extends AbstractConfig {
/**
* The share group auto offset reset strategy.
*/
- public ShareGroupAutoOffsetReset shareAutoOffsetReset() {
- return
ShareGroupAutoOffsetReset.valueOf(shareAutoOffsetReset.toUpperCase(Locale.ROOT));
- }
-
- public enum ShareGroupAutoOffsetReset {
- LATEST, EARLIEST;
-
- @Override
- public String toString() {
- return super.toString().toLowerCase(Locale.ROOT);
- }
+ public ShareGroupAutoOffsetResetStrategy shareAutoOffsetReset() {
+ return
ShareGroupAutoOffsetResetStrategy.fromString(shareAutoOffsetReset);
}
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ShareGroupAutoOffsetResetStrategy.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ShareGroupAutoOffsetResetStrategy.java
new file mode 100644
index 00000000000..fc2912300d1
--- /dev/null
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ShareGroupAutoOffsetResetStrategy.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+
+import java.util.Locale;
+import java.util.Objects;
+
+/**
+ * Represents the strategy for resetting offsets in share consumer groups when no previous offset is found
+ * for a partition or when an offset is out of range.
+ *
+ * Each instance wraps an {@link AutoOffsetResetStrategy} delegate together with a share-group
+ * {@link StrategyType}.
+ *
+ * Supports three strategies:
+ * <ul>
+ * <li>{@code EARLIEST} - Reset the offset to the earliest available offset
+ * <li>{@code LATEST} - Reset the offset to the latest available offset
+ * <li>{@code BY_DURATION} - Reset the offset to a timestamp that is the specified duration before the current time
+ * </ul>
+ */
+public class ShareGroupAutoOffsetResetStrategy {
+
+ public static final ShareGroupAutoOffsetResetStrategy EARLIEST = new
ShareGroupAutoOffsetResetStrategy(AutoOffsetResetStrategy.EARLIEST,
StrategyType.EARLIEST);
+ public static final ShareGroupAutoOffsetResetStrategy LATEST = new
ShareGroupAutoOffsetResetStrategy(AutoOffsetResetStrategy.LATEST,
StrategyType.LATEST);
+
+ public enum StrategyType {
+ LATEST, EARLIEST, BY_DURATION;
+
+ @Override
+ public String toString() {
+ return super.toString().toLowerCase(Locale.ROOT);
+ }
+ }
+
+ private final AutoOffsetResetStrategy delegate;
+ private final StrategyType type;
+
+ private ShareGroupAutoOffsetResetStrategy(AutoOffsetResetStrategy
delegate, StrategyType type) {
+ this.delegate = delegate;
+ this.type = type;
+ }
+
+ /**
+ * Factory method to create a ShareGroupAutoOffsetResetStrategy from a string representation.
+ * Parsing is delegated to AutoOffsetResetStrategy.fromString; the resulting base type is then
+ * mapped onto the share-group StrategyType.
+ *
+ * @param offsetStrategy the string form of the strategy
+ * @return a new strategy wrapping the parsed base strategy
+ * @throws IllegalArgumentException if the parsed base type is not EARLIEST, LATEST or BY_DURATION
+ */
+ public static ShareGroupAutoOffsetResetStrategy fromString(String
offsetStrategy) {
+ AutoOffsetResetStrategy baseStrategy =
AutoOffsetResetStrategy.fromString(offsetStrategy);
+ AutoOffsetResetStrategy.StrategyType baseType = baseStrategy.type();
+
+ StrategyType shareGroupType;
+ switch (baseType) {
+ case EARLIEST:
+ shareGroupType = StrategyType.EARLIEST;
+ break;
+ case LATEST:
+ shareGroupType = StrategyType.LATEST;
+ break;
+ case BY_DURATION:
+ shareGroupType = StrategyType.BY_DURATION;
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported strategy for
ShareGroup: " + baseType);
+ }
+
+ return new ShareGroupAutoOffsetResetStrategy(baseStrategy,
shareGroupType);
+ }
+
+ /**
+ * Returns the share group strategy type.
+ */
+ public StrategyType type() {
+ return type;
+ }
+
+ /**
+ * Returns the name of the share group offset reset strategy, i.e. the lower-case
+ * string produced by StrategyType.toString().
+ */
+ public String name() {
+ return type.toString();
+ }
+
+ /**
+ * Delegates the timestamp calculation to the base strategy and unwraps the result.
+ * @return the value obtained by calling get() on delegate.timestamp(); this method
+ * returns a Long, never an Optional.
+ * NOTE(review): get() is called without a presence check - presumably the delegate
+ * always supplies a timestamp for EARLIEST, LATEST and BY_DURATION; confirm.
+ */
+ public Long timestamp() {
+ return delegate.timestamp().get();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ShareGroupAutoOffsetResetStrategy that =
(ShareGroupAutoOffsetResetStrategy) o;
+ return type == that.type && Objects.equals(delegate, that.delegate);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(delegate, type);
+ }
+
+ @Override
+ public String toString() {
+ return "ShareGroupAutoOffsetResetStrategy{" +
+ "type=" + type +
+ ", delegate=" + delegate +
+ '}';
+ }
+
+ /**
+ * Factory method for creating EARLIEST strategy.
+ * Builds a new instance on every call (it does not return the EARLIEST constant);
+ * the result compares equal to that constant via equals().
+ */
+ public static ShareGroupAutoOffsetResetStrategy earliest() {
+ return new
ShareGroupAutoOffsetResetStrategy(AutoOffsetResetStrategy.EARLIEST,
StrategyType.EARLIEST);
+ }
+
+ /**
+ * Factory method for creating LATEST strategy.
+ * Builds a new instance on every call (it does not return the LATEST constant);
+ * the result compares equal to that constant via equals().
+ */
+ public static ShareGroupAutoOffsetResetStrategy latest() {
+ return new
ShareGroupAutoOffsetResetStrategy(AutoOffsetResetStrategy.LATEST,
StrategyType.LATEST);
+ }
+
+ public static class Validator implements ConfigDef.Validator {
+ @Override
+ public void ensureValid(String name, Object value) {
+ String offsetStrategy = (String) value;
+ try {
+ fromString(offsetStrategy);
+ } catch (Exception e) {
+ throw new ConfigException(name, value, "Invalid value `" +
offsetStrategy + "` for configuration " +
+ name + ". The value must be either 'earliest',
'latest' or of the format 'by_duration:<PnDTnHnMn.nS.>'.");
+ }
+ }
+
+ public String toString() {
+ return "[earliest, latest, by_duration:PnDTnHnMn.nS]";
+ }
+ }
+}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
index fe11f50d2ff..b774d29bb6a 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
@@ -88,6 +88,10 @@ public class GroupConfigTest {
// Check for value "earliest"
props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "earliest");
doTestValidProps(props);
+
+ // Check for value "by_duration"
+ props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG,
"by_duration:PT10S");
+ doTestValidProps(props);
}
@Test
@@ -148,6 +152,18 @@ public class GroupConfigTest {
// Check for invalid shareAutoOffsetReset
props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "hello");
doTestInvalidProps(props, ConfigException.class);
+
+ // Check for invalid shareAutoOffsetReset, by_duration without duration
+ props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "by_duration");
+ doTestInvalidProps(props, ConfigException.class);
+
+ // Check for invalid shareAutoOffsetReset, by_duration with negative
duration
+ props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG,
"by_duration:-PT10S");
+ doTestInvalidProps(props, ConfigException.class);
+
+ // Check for invalid shareAutoOffsetReset, by_duration with invalid
duration
+ props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG,
"by_duration:invalid");
+ doTestInvalidProps(props, ConfigException.class);
}
private void doTestInvalidProps(Properties props, Class<? extends
Exception> exceptionClassName) {
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/ShareGroupAutoOffsetResetStrategyTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/ShareGroupAutoOffsetResetStrategyTest.java
new file mode 100644
index 00000000000..b0523a5fb9f
--- /dev/null
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/ShareGroupAutoOffsetResetStrategyTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ShareGroupAutoOffsetResetStrategyTest {
+
+ @Test
+ public void testFromString() {
+ assertEquals(ShareGroupAutoOffsetResetStrategy.EARLIEST,
ShareGroupAutoOffsetResetStrategy.fromString("earliest"));
+ assertEquals(ShareGroupAutoOffsetResetStrategy.LATEST,
ShareGroupAutoOffsetResetStrategy.fromString("latest"));
+ assertThrows(IllegalArgumentException.class, () ->
ShareGroupAutoOffsetResetStrategy.fromString("invalid"));
+ assertThrows(IllegalArgumentException.class, () ->
ShareGroupAutoOffsetResetStrategy.fromString("by_duration:invalid"));
+ assertThrows(IllegalArgumentException.class, () ->
ShareGroupAutoOffsetResetStrategy.fromString("by_duration:-PT1H"));
+ assertThrows(IllegalArgumentException.class, () ->
ShareGroupAutoOffsetResetStrategy.fromString("by_duration:"));
+ assertThrows(IllegalArgumentException.class, () ->
ShareGroupAutoOffsetResetStrategy.fromString("by_duration"));
+ assertThrows(IllegalArgumentException.class, () ->
ShareGroupAutoOffsetResetStrategy.fromString("LATEST"));
+ assertThrows(IllegalArgumentException.class, () ->
ShareGroupAutoOffsetResetStrategy.fromString("EARLIEST"));
+ assertThrows(IllegalArgumentException.class, () ->
ShareGroupAutoOffsetResetStrategy.fromString("NONE"));
+ assertThrows(IllegalArgumentException.class, () ->
ShareGroupAutoOffsetResetStrategy.fromString(""));
+ assertThrows(IllegalArgumentException.class, () ->
ShareGroupAutoOffsetResetStrategy.fromString(null));
+
+ ShareGroupAutoOffsetResetStrategy strategy =
ShareGroupAutoOffsetResetStrategy.fromString("by_duration:PT1H");
+ assertEquals("by_duration", strategy.name());
+ }
+
+ @Test
+ public void testValidator() {
+ ShareGroupAutoOffsetResetStrategy.Validator validator = new
ShareGroupAutoOffsetResetStrategy.Validator();
+ assertDoesNotThrow(() -> validator.ensureValid("test", "earliest"));
+ assertDoesNotThrow(() -> validator.ensureValid("test", "latest"));
+ assertDoesNotThrow(() -> validator.ensureValid("test",
"by_duration:PT1H"));
+ assertThrows(ConfigException.class, () ->
validator.ensureValid("test", "invalid"));
+ assertThrows(ConfigException.class, () ->
validator.ensureValid("test", "by_duration:invalid"));
+ assertThrows(ConfigException.class, () ->
validator.ensureValid("test", "by_duration:-PT1H"));
+ assertThrows(ConfigException.class, () ->
validator.ensureValid("test", "by_duration:"));
+ assertThrows(ConfigException.class, () ->
validator.ensureValid("test", "by_duration"));
+ assertThrows(ConfigException.class, () ->
validator.ensureValid("test", "LATEST"));
+ assertThrows(ConfigException.class, () ->
validator.ensureValid("test", "EARLIEST"));
+ assertThrows(ConfigException.class, () ->
validator.ensureValid("test", "NONE"));
+ assertThrows(ConfigException.class, () ->
validator.ensureValid("test", ""));
+ assertThrows(ConfigException.class, () ->
validator.ensureValid("test", null));
+ }
+
+ @Test
+ public void testEqualsAndHashCode() {
+ ShareGroupAutoOffsetResetStrategy earliest1 =
ShareGroupAutoOffsetResetStrategy.fromString("earliest");
+ ShareGroupAutoOffsetResetStrategy earliest2 =
ShareGroupAutoOffsetResetStrategy.fromString("earliest");
+ ShareGroupAutoOffsetResetStrategy latest1 =
ShareGroupAutoOffsetResetStrategy.fromString("latest");
+
+ ShareGroupAutoOffsetResetStrategy duration1 =
ShareGroupAutoOffsetResetStrategy.fromString("by_duration:P2D");
+ ShareGroupAutoOffsetResetStrategy duration2 =
ShareGroupAutoOffsetResetStrategy.fromString("by_duration:P2D");
+
+ assertEquals(earliest1, earliest2);
+ assertNotEquals(earliest1, latest1);
+ assertEquals(earliest1.hashCode(), earliest2.hashCode());
+ assertNotEquals(earliest1.hashCode(), latest1.hashCode());
+
+ assertNotEquals(latest1, duration2);
+ assertEquals(duration1, duration2);
+ }
+
+ @Test
+ public void testTimestamp() {
+ ShareGroupAutoOffsetResetStrategy earliest1 =
ShareGroupAutoOffsetResetStrategy.fromString("earliest");
+ ShareGroupAutoOffsetResetStrategy earliest2 =
ShareGroupAutoOffsetResetStrategy.fromString("earliest");
+ assertEquals(ListOffsetsRequest.EARLIEST_TIMESTAMP,
earliest1.timestamp());
+ assertEquals(earliest1, earliest2);
+
+ ShareGroupAutoOffsetResetStrategy latest1 =
ShareGroupAutoOffsetResetStrategy.fromString("latest");
+ ShareGroupAutoOffsetResetStrategy latest2 =
ShareGroupAutoOffsetResetStrategy.fromString("latest");
+ assertEquals(ListOffsetsRequest.LATEST_TIMESTAMP, latest1.timestamp());
+ assertEquals(latest1, latest2);
+
+ ShareGroupAutoOffsetResetStrategy byDuration1 =
ShareGroupAutoOffsetResetStrategy.fromString("by_duration:PT1H");
+ Long timestamp = byDuration1.timestamp();
+ assertTrue(timestamp <= Instant.now().toEpochMilli() -
Duration.ofHours(1).toMillis());
+
+ ShareGroupAutoOffsetResetStrategy byDuration2 =
ShareGroupAutoOffsetResetStrategy.fromString("by_duration:PT1H");
+ ShareGroupAutoOffsetResetStrategy byDuration3 =
ShareGroupAutoOffsetResetStrategy.fromString("by_duration:PT2H");
+
+ assertEquals(byDuration1, byDuration2);
+ assertNotEquals(byDuration1, byDuration3);
+ }
+}