This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6cdb8c352a8 KAFKA-18015: add byDuration auto.offset.reset to Kafka 
Streams (#18115)
6cdb8c352a8 is described below

commit 6cdb8c352a8ae4e77e30f486390970aecb1d970a
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Dec 11 15:12:16 2024 -0800

    KAFKA-18015: add byDuration auto.offset.reset to Kafka Streams (#18115)
    
    Part of KIP-1106.
    
    Adds support for "by_duration" and "none" reset strategy
    to the Kafka Streams runtime.
    
    Reviewers: Bill Bejeck <[email protected]>, Anna Sophie Blee-Goldman 
<[email protected]>
---
 .../internals/AutoOffsetResetStrategy.java         |   4 +
 gradle/spotbugs-exclude.xml                        |   6 +
 .../FineGrainedAutoResetIntegrationTest.java       | 154 ++++++++++++++++++---
 .../org/apache/kafka/streams/StreamsBuilder.java   |   2 +-
 .../java/org/apache/kafka/streams/Topology.java    |  63 +++++----
 .../streams/internals/AutoOffsetResetInternal.java |   9 +-
 .../org/apache/kafka/streams/kstream/Consumed.java |   4 -
 .../kstream/internals/ConsumedInternal.java        |  20 +--
 .../kstream/internals/graph/StreamSourceNode.java  |   6 +-
 .../kstream/internals/graph/TableSourceNode.java   |   3 +-
 .../internals/InternalTopologyBuilder.java         |  79 ++++++++---
 .../streams/processor/internals/StreamThread.java  | 144 +++++++++++++++++--
 .../streams/processor/internals/TaskManager.java   |   2 +-
 .../processor/internals/TopologyMetadata.java      |   7 +-
 .../apache/kafka/streams/AutoOffsetResetTest.java  |  20 ++-
 .../internals/InternalStreamsBuilderTest.java      |  71 ++++++++--
 .../internals/InternalTopologyBuilderTest.java     |  36 +++--
 .../processor/internals/StreamThreadTest.java      |   5 +-
 .../internals/StreamsPartitionAssignorTest.java    |   5 +-
 19 files changed, 488 insertions(+), 152 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategy.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategy.java
index 6eecc4e09d8..a692bc45ea7 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategy.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategy.java
@@ -129,6 +129,10 @@ public class AutoOffsetResetStrategy {
             return Optional.empty();
     }
 
+    public Optional<Duration> duration() {
+        return duration;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index 8b6ca9f522c..3f0f9efd165 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -271,6 +271,12 @@ For a detailed description of spotbugs bug categories, see 
https://spotbugs.read
         <Bug pattern="MS_MUTABLE_ARRAY"/>
     </Match>
 
+    <Match>
+        <Class 
name="org.apache.kafka.streams.processor.internals.TopologyMetadata"/>
+        <Method name="offsetResetStrategy"/>
+        <Bug pattern="NP_OPTIONAL_RETURN_NULL"/>
+    </Match>
+
     <Match>
         <!-- Suppress warnings about ignoring the return value of await.
              This is done intentionally because we use other clues to determine
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
index 413686c2ea5..3ea17e1a098 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
@@ -25,11 +25,11 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.streams.AutoOffsetReset;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
@@ -48,6 +48,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -57,6 +58,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.regex.Pattern;
 
+import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -70,6 +72,9 @@ public class FineGrainedAutoResetIntegrationTest {
     private static final String OUTPUT_TOPIC_0 = "outputTopic_0";
     private static final String OUTPUT_TOPIC_1 = "outputTopic_1";
     private static final String OUTPUT_TOPIC_2 = "outputTopic_2";
+    private static final String OUTPUT_TOPIC_3 = "outputTopic_3";
+    private static final String OUTPUT_TOPIC_4 = "outputTopic_4";
+    private static final String OUTPUT_TOPIC_5 = "outputTopic_5";
 
     public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(NUM_BROKERS);
 
@@ -95,6 +100,9 @@ public class FineGrainedAutoResetIntegrationTest {
                 TOPIC_C_2,
                 TOPIC_Y_2,
                 TOPIC_Z_2,
+                TOPIC_DURATION_1,
                TOPIC_DURATION_2,
+                TOPIC_DURATION_3,
                 NOOP,
                 DEFAULT_OUTPUT_TOPIC,
                 OUTPUT_TOPIC_0,
@@ -127,10 +135,13 @@ public class FineGrainedAutoResetIntegrationTest {
     private static final String TOPIC_C_2 = "topic-C_2";
     private static final String TOPIC_Y_2 = "topic-Y_2";
     private static final String TOPIC_Z_2 = "topic-Z_2";
+    private static final String TOPIC_DURATION_1 = "durationTopic-1";
+    private static final String TOPIC_DURATION_2 = "durationTopic-2";
+    private static final String TOPIC_DURATION_3 = "durationTopic-3";
     private static final String NOOP = "noop";
     private final Serde<String> stringSerde = Serdes.String();
 
-    private static final String STRING_SERDE_CLASSNAME = 
Serdes.String().getClass().getName();
+    private static final String STRING_SERDE_CLASSNAME = 
Serdes.StringSerde.class.getName();
     private Properties streamsConfiguration;
 
     private final String topic1TestMessage = "topic-1 test";
@@ -141,7 +152,7 @@ public class FineGrainedAutoResetIntegrationTest {
     private final String topicZTestMessage = "topic-Z test";
 
     @BeforeEach
-    public void setUp() throws IOException {
+    public void setUp() throws Exception {
 
         final Properties props = new Properties();
         props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
@@ -196,8 +207,8 @@ public class FineGrainedAutoResetIntegrationTest {
         final StreamsBuilder builder = new StreamsBuilder();
 
 
-        final KStream<String, String> pattern1Stream = 
builder.stream(Pattern.compile("topic-\\d" + topicSuffix), 
Consumed.with(Topology.AutoOffsetReset.EARLIEST));
-        final KStream<String, String> pattern2Stream = 
builder.stream(Pattern.compile("topic-[A-D]" + topicSuffix), 
Consumed.with(Topology.AutoOffsetReset.LATEST));
+        final KStream<String, String> pattern1Stream = 
builder.stream(Pattern.compile("topic-\\d" + topicSuffix), 
Consumed.with(AutoOffsetReset.earliest()));
+        final KStream<String, String> pattern2Stream = 
builder.stream(Pattern.compile("topic-[A-D]" + topicSuffix), 
Consumed.with(AutoOffsetReset.latest()));
         final KStream<String, String> namedTopicsStream = 
builder.stream(Arrays.asList(topicY, topicZ));
 
         pattern1Stream.to(outputTopic, Produced.with(stringSerde, 
stringSerde));
@@ -215,17 +226,18 @@ public class FineGrainedAutoResetIntegrationTest {
 
         final Properties consumerConfig = 
TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, 
StringDeserializer.class);
 
-        final KafkaStreams streams = new KafkaStreams(builder.build(), 
streamsConfiguration);
-        streams.start();
-
-        final List<KeyValue<String, String>> receivedKeyValues = 
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, 
outputTopic, expectedReceivedValues.size());
         final List<String> actualValues = new 
ArrayList<>(expectedReceivedValues.size());
 
-        for (final KeyValue<String, String> receivedKeyValue : 
receivedKeyValues) {
-            actualValues.add(receivedKeyValue.value);
+        try (final KafkaStreams streams = new KafkaStreams(builder.build(), 
streamsConfiguration)) {
+            streams.start();
+
+            final List<KeyValue<String, String>> receivedKeyValues = 
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, 
outputTopic, expectedReceivedValues.size());
+
+            for (final KeyValue<String, String> receivedKeyValue : 
receivedKeyValues) {
+                actualValues.add(receivedKeyValue.value);
+            }
         }
 
-        streams.close();
         Collections.sort(actualValues);
         Collections.sort(expectedReceivedValues);
         assertThat(actualValues, equalTo(expectedReceivedValues));
@@ -251,14 +263,108 @@ public class FineGrainedAutoResetIntegrationTest {
         consumer.close();
     }
 
+    @Test
+    public void shouldFailForResetNone() throws Exception {
+        final Properties props = new Properties();
+        props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
+
+        final Properties localConfig = StreamsTestUtils.getStreamsConfig(
+            "testConfigAutoOffsetWithNone",
+            CLUSTER.bootstrapServers(),
+            STRING_SERDE_CLASSNAME,
+            STRING_SERDE_CLASSNAME,
+            props);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<String, String> exceptionStream = builder.stream(NOOP, 
Consumed.with(AutoOffsetReset.none()));
+
+        exceptionStream.to(DEFAULT_OUTPUT_TOPIC, Produced.with(stringSerde, 
stringSerde));
+
+        try (final KafkaStreams streams = new KafkaStreams(builder.build(), 
localConfig)) {
+            final TestingUncaughtExceptionHandler uncaughtExceptionHandler = 
new TestingUncaughtExceptionHandler();
+            streams.setUncaughtExceptionHandler(uncaughtExceptionHandler);
+
+            streams.start();
+
+            waitForCondition(
+                () -> uncaughtExceptionHandler.correctExceptionThrown,
+                "The expected NoOffsetForPartitionException was never thrown"
+            );
+        }
+    }
+
+    @Test
+    public void shouldResetByDuration() throws Exception {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        builder.<String, String>stream(TOPIC_DURATION_1, 
Consumed.with(AutoOffsetReset.byDuration(Duration.ofHours(6L).plus(Duration.ofMinutes(40L)))))
+            .to(OUTPUT_TOPIC_3, Produced.with(stringSerde, stringSerde));
+        builder.<String, String>stream(TOPIC_DURATION_2, 
Consumed.with(AutoOffsetReset.byDuration(Duration.ofMillis(mockTime.milliseconds()).minus(Duration.ofDays(1L)))))
+            .to(OUTPUT_TOPIC_4, Produced.with(stringSerde, stringSerde));
+        builder.<String, String>stream(TOPIC_DURATION_3, 
Consumed.with(AutoOffsetReset.byDuration(Duration.ZERO)))
+            .to(OUTPUT_TOPIC_5, Produced.with(stringSerde, stringSerde));
+
+        final Properties producerConfig = 
TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, 
StringSerializer.class);
+
+        for (int i = 0; i < 10; ++i) {
+            mockTime.sleep(Duration.ofHours(1L).toMillis());
+            IntegrationTestUtils.produceValuesSynchronously(TOPIC_DURATION_1, 
Collections.singletonList("" + i), producerConfig, mockTime);
+            IntegrationTestUtils.produceValuesSynchronously(TOPIC_DURATION_2, 
Collections.singletonList("" + i), producerConfig, mockTime);
+            IntegrationTestUtils.produceValuesSynchronously(TOPIC_DURATION_3, 
Collections.singletonList("" + i), producerConfig, mockTime);
+        }
+
+        final Properties consumerConfig = 
TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, 
StringDeserializer.class);
+
+        final List<String> expectedValues = Arrays.asList("3", "4", "5", "6", 
"7", "8", "9");
+        final List<String> allExpectedValues = Arrays.asList("0", "1", "2", 
"3", "4", "5", "6", "7", "8", "9");
+        final List<String> singleFinalExpectedValues = List.of("10");
+        final List<String> actualValuesOne = new 
ArrayList<>(expectedValues.size());
+        final List<String> actualValuesTwo = new 
ArrayList<>(allExpectedValues.size());
+        final List<String> actualValuesThree = new 
ArrayList<>(singleFinalExpectedValues.size());
+
+        final MockTime streamsMockTime = new MockTime(mockTime.milliseconds() 
+ Duration.ofMinutes(20).toMillis(), 0);
+        try (final KafkaStreams streams = new KafkaStreams(builder.build(), 
streamsConfiguration, streamsMockTime)) {
+            streams.start();
+
+            final List<KeyValue<String, String>> receivedKeyValuesOne =
+                
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_TOPIC_3, expectedValues.size());
+            for (final KeyValue<String, String> receivedKeyValue : 
receivedKeyValuesOne) {
+                actualValuesOne.add(receivedKeyValue.value);
+            }
+
+            final List<KeyValue<String, String>> receivedKeyValuesTwo =
+                
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_TOPIC_4, allExpectedValues.size());
+            for (final KeyValue<String, String> receivedKeyValue : 
receivedKeyValuesTwo) {
+                actualValuesTwo.add(receivedKeyValue.value);
+            }
+
+            IntegrationTestUtils.produceValuesSynchronously(TOPIC_DURATION_3, 
Collections.singletonList("10"), producerConfig, mockTime);
+            final List<KeyValue<String, String>> receivedKeyValuesThree =
+                
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_TOPIC_5, singleFinalExpectedValues.size());
+            for (final KeyValue<String, String> receivedKeyValue : 
receivedKeyValuesThree) {
+                actualValuesThree.add(receivedKeyValue.value);
+            }
+        }
+
+        Collections.sort(actualValuesOne);
+        Collections.sort(expectedValues);
+        assertThat(actualValuesOne, equalTo(expectedValues));
+
+        Collections.sort(actualValuesTwo);
+        Collections.sort(allExpectedValues);
+        assertThat(actualValuesTwo, equalTo(allExpectedValues));
+
+        assertThat(actualValuesThree, equalTo(singleFinalExpectedValues));
+    }
+
     @Test
     public void shouldThrowExceptionOverlappingPattern() {
         final StreamsBuilder builder = new StreamsBuilder();
         //NOTE this would realistically get caught when building topology, the 
test is for completeness
-        builder.stream(Pattern.compile("topic-[A-D]_1"), 
Consumed.with(Topology.AutoOffsetReset.EARLIEST));
+        builder.stream(Pattern.compile("topic-[A-D]_1"), 
Consumed.with(AutoOffsetReset.earliest()));
 
         try {
-            builder.stream(Pattern.compile("topic-[A-D]_1"), 
Consumed.with(Topology.AutoOffsetReset.LATEST));
+            builder.stream(Pattern.compile("topic-[A-D]_1"), 
Consumed.with(AutoOffsetReset.latest()));
             builder.build();
             fail("Should have thrown TopologyException");
         } catch (final TopologyException expected) {
@@ -270,9 +376,9 @@ public class FineGrainedAutoResetIntegrationTest {
     public void shouldThrowExceptionOverlappingTopic() {
         final StreamsBuilder builder = new StreamsBuilder();
         //NOTE this would realistically get caught when building topology, the 
test is for completeness
-        builder.stream(Pattern.compile("topic-[A-D]_1"), 
Consumed.with(Topology.AutoOffsetReset.EARLIEST));
+        builder.stream(Pattern.compile("topic-[A-D]_1"), 
Consumed.with(AutoOffsetReset.earliest()));
         try {
-            builder.stream(Arrays.asList(TOPIC_A_1, TOPIC_Z_1), 
Consumed.with(Topology.AutoOffsetReset.LATEST));
+            builder.stream(Arrays.asList(TOPIC_A_1, TOPIC_Z_1), 
Consumed.with(AutoOffsetReset.latest()));
             builder.build();
             fail("Should have thrown TopologyException");
         } catch (final TopologyException expected) {
@@ -300,15 +406,17 @@ public class FineGrainedAutoResetIntegrationTest {
 
         exceptionStream.to(DEFAULT_OUTPUT_TOPIC, Produced.with(stringSerde, 
stringSerde));
 
-        final KafkaStreams streams = new KafkaStreams(builder.build(), 
localConfig);
+        try (final KafkaStreams streams = new KafkaStreams(builder.build(), 
localConfig)) {
+            final TestingUncaughtExceptionHandler uncaughtExceptionHandler = 
new TestingUncaughtExceptionHandler();
+            streams.setUncaughtExceptionHandler(uncaughtExceptionHandler);
 
-        final TestingUncaughtExceptionHandler uncaughtExceptionHandler = new 
TestingUncaughtExceptionHandler();
+            streams.start();
 
-        streams.setUncaughtExceptionHandler(uncaughtExceptionHandler);
-        streams.start();
-        TestUtils.waitForCondition(() -> 
uncaughtExceptionHandler.correctExceptionThrown,
-                "The expected NoOffsetForPartitionException was never thrown");
-        streams.close();
+            waitForCondition(
+                () -> uncaughtExceptionHandler.correctExceptionThrown,
+                "The expected NoOffsetForPartitionException was never thrown"
+            );
+        }
     }
 
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index 7badb3016f8..d6016c327fd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -288,7 +288,7 @@ public class StreamsBuilder {
      * @return a {@link KTable} for the specified topic
      */
     public synchronized <K, V> KTable<K, V> table(final String topic) {
-        return table(topic, new ConsumedInternal<>());
+        return table(topic, Consumed.with(null, null));
     }
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java 
b/streams/src/main/java/org/apache/kafka/streams/Topology.java
index 320e0babf77..6e1ace7efa7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Topology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.internals.AutoOffsetResetInternal;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.processor.ConnectedStoreProvider;
@@ -81,6 +82,19 @@ public class Topology {
         EARLIEST, LATEST
     }
 
+    @Deprecated
+    private static AutoOffsetResetInternal convertOldToNew(final 
Topology.AutoOffsetReset resetPolicy) {
+        if (resetPolicy == null) {
+            return null;
+        }
+
+        return new AutoOffsetResetInternal(
+            resetPolicy == 
org.apache.kafka.streams.Topology.AutoOffsetReset.EARLIEST
+                ? org.apache.kafka.streams.AutoOffsetReset.earliest()
+                : org.apache.kafka.streams.AutoOffsetReset.latest()
+        );
+    }
+
     /**
      * Add a new source that consumes the named topics and forward the records 
to child processor and/or sink nodes.
      * The source will use the {@link 
StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
@@ -139,7 +153,7 @@ public class Topology {
     public synchronized Topology addSource(final AutoOffsetReset offsetReset,
                                            final String name,
                                            final String... topics) {
-        internalTopologyBuilder.addSource(offsetReset, name, null, null, null, 
topics);
+        internalTopologyBuilder.addSource(convertOldToNew(offsetReset), name, 
null, null, null, topics);
         return this;
     }
 
@@ -156,8 +170,7 @@ public class Topology {
     public synchronized Topology addSource(final 
org.apache.kafka.streams.AutoOffsetReset offsetReset,
                                            final String name,
                                            final String... topics) {
-        // TODO mjsax
-        //internalTopologyBuilder.addSource(offsetReset, name, null, null, 
null, topics);
+        internalTopologyBuilder.addSource(new 
AutoOffsetResetInternal(offsetReset), name, null, null, null, topics);
         return this;
     }
 
@@ -181,7 +194,7 @@ public class Topology {
     public synchronized Topology addSource(final AutoOffsetReset offsetReset,
                                            final String name,
                                            final Pattern topicPattern) {
-        internalTopologyBuilder.addSource(offsetReset, name, null, null, null, 
topicPattern);
+        internalTopologyBuilder.addSource(convertOldToNew(offsetReset), name, 
null, null, null, topicPattern);
         return this;
     }
 
@@ -203,8 +216,7 @@ public class Topology {
     public synchronized Topology addSource(final 
org.apache.kafka.streams.AutoOffsetReset offsetReset,
                                            final String name,
                                            final Pattern topicPattern) {
-        // TODO: mjsax
-        //internalTopologyBuilder.addSource(offsetReset, name, null, null, 
null, topicPattern);
+        internalTopologyBuilder.addSource(new 
AutoOffsetResetInternal(offsetReset), name, null, null, null, topicPattern);
         return this;
     }
 
@@ -273,7 +285,7 @@ public class Topology {
                                            final TimestampExtractor 
timestampExtractor,
                                            final String name,
                                            final String... topics) {
-        internalTopologyBuilder.addSource(offsetReset, name, 
timestampExtractor, null, null, topics);
+        internalTopologyBuilder.addSource(convertOldToNew(offsetReset), name, 
timestampExtractor, null, null, topics);
         return this;
     }
 
@@ -293,8 +305,7 @@ public class Topology {
                                            final TimestampExtractor 
timestampExtractor,
                                            final String name,
                                            final String... topics) {
-        // TODO mjsax
-        //internalTopologyBuilder.addSource(offsetReset, name, 
timestampExtractor, null, null, topics);
+        internalTopologyBuilder.addSource(new 
AutoOffsetResetInternal(offsetReset), name, timestampExtractor, null, null, 
topics);
         return this;
     }
 
@@ -321,7 +332,7 @@ public class Topology {
                                            final TimestampExtractor 
timestampExtractor,
                                            final String name,
                                            final Pattern topicPattern) {
-        internalTopologyBuilder.addSource(offsetReset, name, 
timestampExtractor, null, null, topicPattern);
+        internalTopologyBuilder.addSource(convertOldToNew(offsetReset), name, 
timestampExtractor, null, null, topicPattern);
         return this;
     }
 
@@ -341,8 +352,7 @@ public class Topology {
                                            final TimestampExtractor 
timestampExtractor,
                                            final String name,
                                            final Pattern topicPattern) {
-        // TODO
-        //internalTopologyBuilder.addSource(offsetReset, name, 
timestampExtractor, null, null, topicPattern);
+        internalTopologyBuilder.addSource(new 
AutoOffsetResetInternal(offsetReset), name, timestampExtractor, null, null, 
topicPattern);
         return this;
     }
 
@@ -421,7 +431,7 @@ public class Topology {
                                            final Deserializer<?> 
keyDeserializer,
                                            final Deserializer<?> 
valueDeserializer,
                                            final String... topics) {
-        internalTopologyBuilder.addSource(offsetReset, name, null, 
keyDeserializer, valueDeserializer, topics);
+        internalTopologyBuilder.addSource(convertOldToNew(offsetReset), name, 
null, keyDeserializer, valueDeserializer, topics);
         return this;
     }
 
@@ -448,8 +458,7 @@ public class Topology {
                                            final Deserializer<?> 
keyDeserializer,
                                            final Deserializer<?> 
valueDeserializer,
                                            final String... topics) {
-        // TODO mjsax
-        //internalTopologyBuilder.addSource(offsetReset, name, null, 
keyDeserializer, valueDeserializer, topics);
+        internalTopologyBuilder.addSource(new 
AutoOffsetResetInternal(offsetReset), name, null, keyDeserializer, 
valueDeserializer, topics);
         return this;
     }
 
@@ -479,7 +488,7 @@ public class Topology {
                                            final Deserializer<?> 
keyDeserializer,
                                            final Deserializer<?> 
valueDeserializer,
                                            final Pattern topicPattern) {
-        internalTopologyBuilder.addSource(offsetReset, name, null, 
keyDeserializer, valueDeserializer, topicPattern);
+        internalTopologyBuilder.addSource(convertOldToNew(offsetReset), name, 
null, keyDeserializer, valueDeserializer, topicPattern);
         return this;
     }
 
@@ -506,8 +515,7 @@ public class Topology {
                                            final Deserializer<?> 
keyDeserializer,
                                            final Deserializer<?> 
valueDeserializer,
                                            final Pattern topicPattern) {
-        // TODO mjsax
-        //internalTopologyBuilder.addSource(offsetReset, name, null, 
keyDeserializer, valueDeserializer, topicPattern);
+        internalTopologyBuilder.addSource(new 
AutoOffsetResetInternal(offsetReset), name, null, keyDeserializer, 
valueDeserializer, topicPattern);
         return this;
     }
 
@@ -537,7 +545,7 @@ public class Topology {
                                            final Deserializer<?> 
keyDeserializer,
                                            final Deserializer<?> 
valueDeserializer,
                                            final String... topics) {
-        internalTopologyBuilder.addSource(offsetReset, name, 
timestampExtractor, keyDeserializer, valueDeserializer, topics);
+        internalTopologyBuilder.addSource(convertOldToNew(offsetReset), name, 
timestampExtractor, keyDeserializer, valueDeserializer, topics);
         return this;
     }
 
@@ -564,8 +572,7 @@ public class Topology {
                                            final Deserializer<?> 
keyDeserializer,
                                            final Deserializer<?> 
valueDeserializer,
                                            final String... topics) {
-        // TODO mjsax
-        //internalTopologyBuilder.addSource(offsetReset, name, 
timestampExtractor, keyDeserializer, valueDeserializer, topics);
+        internalTopologyBuilder.addSource(new 
AutoOffsetResetInternal(offsetReset), name, timestampExtractor, 
keyDeserializer, valueDeserializer, topics);
         return this;
     }
 
@@ -598,7 +605,7 @@ public class Topology {
                                            final Deserializer<?> 
keyDeserializer,
                                            final Deserializer<?> 
valueDeserializer,
                                            final Pattern topicPattern) {
-        internalTopologyBuilder.addSource(offsetReset, name, 
timestampExtractor, keyDeserializer, valueDeserializer, topicPattern);
+        internalTopologyBuilder.addSource(convertOldToNew(offsetReset), name, 
timestampExtractor, keyDeserializer, valueDeserializer, topicPattern);
         return this;
     }
 
@@ -628,8 +635,7 @@ public class Topology {
                                            final Deserializer<?> 
keyDeserializer,
                                            final Deserializer<?> 
valueDeserializer,
                                            final Pattern topicPattern) {
-        // TODO mjsax
-        //internalTopologyBuilder.addSource(offsetReset, name, 
timestampExtractor, keyDeserializer, valueDeserializer, topicPattern);
+        internalTopologyBuilder.addSource(new 
AutoOffsetResetInternal(offsetReset), name, timestampExtractor, 
keyDeserializer, valueDeserializer, topicPattern);
         return this;
     }
 
@@ -987,7 +993,14 @@ public class Topology {
                                                                   final 
ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {
         storeBuilder.withLoggingDisabled();
 
-        internalTopologyBuilder.addSource(AutoOffsetReset.EARLIEST, 
sourceName, timestampExtractor, keyDeserializer, valueDeserializer, topic);
+        internalTopologyBuilder.addSource(
+            new 
AutoOffsetResetInternal(org.apache.kafka.streams.AutoOffsetReset.earliest()),
+            sourceName,
+            timestampExtractor,
+            keyDeserializer,
+            valueDeserializer,
+            topic
+        );
         internalTopologyBuilder.addProcessor(processorName, 
stateUpdateSupplier, sourceName);
         internalTopologyBuilder.addStateStore(storeBuilder, processorName);
         
internalTopologyBuilder.connectSourceStoreAndTopic(storeBuilder.name(), topic);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/internals/AutoOffsetResetInternal.java
 
b/streams/src/main/java/org/apache/kafka/streams/internals/AutoOffsetResetInternal.java
index 51054ee2cae..dc38dec304c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/internals/AutoOffsetResetInternal.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/internals/AutoOffsetResetInternal.java
@@ -20,7 +20,6 @@ import 
org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy.Strat
 import org.apache.kafka.streams.AutoOffsetReset;
 
 import java.time.Duration;
-import java.util.Optional;
 
 public class AutoOffsetResetInternal extends AutoOffsetReset {
 
@@ -31,7 +30,9 @@ public class AutoOffsetResetInternal extends AutoOffsetReset {
     public StrategyType offsetResetStrategy() {
         return offsetResetStrategy;
     }
-    public Optional<Duration> duration() {
-        return duration;
+
+    @SuppressWarnings("all")
+    public Duration duration() {
+        return duration.get();
     }
-}
\ No newline at end of file
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java
index 046ff336fcc..ade104d6118 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java
@@ -76,10 +76,6 @@ public class Consumed<K, V> implements 
NamedOperation<Consumed<K, V>> {
         this.processorName = processorName;
     }
 
-    /**
-     * Create an instance of {@link Consumed} from an existing instance.
-     * @param consumed  the instance of {@link Consumed} to copy
-     */
     protected Consumed(final Consumed<K, V> consumed) {
         this(
             consumed.keySerde,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java
index 3f5f63c7b77..9ad1721dea8 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.internals.AutoOffsetResetInternal;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.processor.TimestampExtractor;
@@ -29,17 +28,6 @@ public class ConsumedInternal<K, V> extends Consumed<K, V> {
         super(consumed);
     }
 
-    public ConsumedInternal(final Serde<K> keySerde,
-                            final Serde<V> valueSerde,
-                            final TimestampExtractor timestampExtractor,
-                            final AutoOffsetResetInternal offsetReset) {
-        this(Consumed.with(keySerde, valueSerde, timestampExtractor, 
offsetReset));
-    }
-
-    public ConsumedInternal() {
-        this(Consumed.with(null, null));
-    }
-
     public Serde<K> keySerde() {
         return keySerde;
     }
@@ -64,13 +52,7 @@ public class ConsumedInternal<K, V> extends Consumed<K, V> {
         return resetPolicy == null ? null : new 
AutoOffsetResetInternal(resetPolicy);
     }
 
-    @SuppressWarnings("deprecation")
-    // TODO mjsax remove
-    public Topology.AutoOffsetReset legacyOffsetResetPolicy() {
-        return legacyResetPolicy;
-    }
-
     public String name() {
         return processorName;
     }
-}
+}
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
index b13477c546e..d343bdb1bb5 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
@@ -73,16 +73,14 @@ public class StreamSourceNode<K, V> extends 
SourceGraphNode<K, V> {
     public void writeToTopology(final InternalTopologyBuilder topologyBuilder) 
{
 
         if (topicPattern().isPresent()) {
-            // TODO mjsax
-            
topologyBuilder.addSource(consumedInternal().legacyOffsetResetPolicy(),
+            topologyBuilder.addSource(consumedInternal().offsetResetPolicy(),
                                       nodeName(),
                                       consumedInternal().timestampExtractor(),
                                       consumedInternal().keyDeserializer(),
                                       consumedInternal().valueDeserializer(),
                                       topicPattern().get());
         } else {
-            // TODO mjsax
-            
topologyBuilder.addSource(consumedInternal().legacyOffsetResetPolicy(),
+            topologyBuilder.addSource(consumedInternal().offsetResetPolicy(),
                                       nodeName(),
                                       consumedInternal().timestampExtractor(),
                                       consumedInternal().keyDeserializer(),
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
index 81b44703a2d..5e776a5c733 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
@@ -96,8 +96,7 @@ public class TableSourceNode<K, V> extends SourceGraphNode<K, 
V> {
                 false
             );
         } else {
-            // TODO mjsax
-            
topologyBuilder.addSource(consumedInternal().legacyOffsetResetPolicy(),
+            topologyBuilder.addSource(consumedInternal().offsetResetPolicy(),
                                       sourceName,
                                       consumedInternal().timestampExtractor(),
                                       consumedInternal().keyDeserializer(),
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 23bdcfde6f0..67dfa061729 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -22,10 +22,10 @@ import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyConfig;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.internals.ApiUtils;
+import org.apache.kafka.streams.internals.AutoOffsetResetInternal;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TimestampExtractor;
@@ -43,6 +43,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -148,14 +149,22 @@ public class InternalTopologyBuilder {
     // all global topics
     private final Set<String> globalTopics = new HashSet<>();
 
+    private final Set<String> noneResetTopics = new HashSet<>();
+
     private final Set<String> earliestResetTopics = new HashSet<>();
 
     private final Set<String> latestResetTopics = new HashSet<>();
 
+    private final Map<String, Duration> durationResetTopics = new HashMap<>();
+
+    private final Set<Pattern> noneResetPatterns = new HashSet<>();
+
     private final Set<Pattern> earliestResetPatterns = new HashSet<>();
 
     private final Set<Pattern> latestResetPatterns = new HashSet<>();
 
+    private final Map<Pattern, Duration> durationResetPatterns = new 
HashMap<>();
+
     private final QuickUnion<String> nodeGrouper = new QuickUnion<>();
 
     // Used to capture subscribed topics via Patterns discovered during the 
partition assignment process.
@@ -333,7 +342,7 @@ public class InternalTopologyBuilder {
 
         @Override
         Source describe() {
-            return new Source(name, topics.size() == 0 ? null : new 
HashSet<>(topics), pattern);
+            return new Source(name, topics.isEmpty() ? null : new 
HashSet<>(topics), pattern);
         }
     }
 
@@ -438,8 +447,7 @@ public class InternalTopologyBuilder {
         return this;
     }
 
-    @SuppressWarnings("deprecation")
-    public final void addSource(final Topology.AutoOffsetReset offsetReset,
+    public final void addSource(final AutoOffsetResetInternal offsetReset,
                                 final String name,
                                 final TimestampExtractor timestampExtractor,
                                 final Deserializer<?> keyDeserializer,
@@ -456,7 +464,7 @@ public class InternalTopologyBuilder {
         for (final String topic : topics) {
             Objects.requireNonNull(topic, "topic names cannot be null");
             validateTopicNotAlreadyRegistered(topic);
-            maybeAddToResetList(earliestResetTopics, latestResetTopics, 
offsetReset, topic);
+            maybeAddToResetList(noneResetTopics, earliestResetTopics, 
latestResetTopics, durationResetTopics, offsetReset, topic);
             rawSourceTopicNames.add(topic);
         }
 
@@ -466,8 +474,7 @@ public class InternalTopologyBuilder {
         nodeGroups = null;
     }
 
-    @SuppressWarnings("deprecation")
-    public final void addSource(final Topology.AutoOffsetReset offsetReset,
+    public final void addSource(final AutoOffsetResetInternal offsetReset,
                                 final String name,
                                 final TimestampExtractor timestampExtractor,
                                 final Deserializer<?> keyDeserializer,
@@ -486,7 +493,7 @@ public class InternalTopologyBuilder {
             }
         }
 
-        maybeAddToResetList(earliestResetPatterns, latestResetPatterns, 
offsetReset, topicPattern);
+        maybeAddToResetList(noneResetPatterns, earliestResetPatterns, 
latestResetPatterns, durationResetPatterns, offsetReset, topicPattern);
 
         nodeFactories.put(name, new SourceNodeFactory<>(name, null, 
topicPattern, timestampExtractor, keyDeserializer, valDeserializer));
         nodeToSourcePatterns.put(name, topicPattern);
@@ -914,22 +921,28 @@ public class InternalTopologyBuilder {
                 Collections.unmodifiableSet(sourcePatterns)
             );
         }
-
     }
 
-    @SuppressWarnings("deprecation")
-    private <T> void maybeAddToResetList(final Collection<T> earliestResets,
+    private <T> void maybeAddToResetList(final Collection<T> noneResets,
+                                         final Collection<T> earliestResets,
                                          final Collection<T> latestResets,
-                                         final Topology.AutoOffsetReset 
offsetReset,
+                                         final Map<T, Duration> durationReset,
+                                         final AutoOffsetResetInternal 
offsetReset,
                                          final T item) {
         if (offsetReset != null) {
-            switch (offsetReset) {
+            switch (offsetReset.offsetResetStrategy()) {
+                case NONE:
+                    noneResets.add(item);
+                    break;
                 case EARLIEST:
                     earliestResets.add(item);
                     break;
                 case LATEST:
                     latestResets.add(item);
                     break;
+                case BY_DURATION:
+                    durationReset.put(item, offsetReset.duration());
+                    break;
                 default:
                     throw new TopologyException(String.format("Unrecognized 
reset format %s", offsetReset));
             }
@@ -1347,19 +1360,30 @@ public class InternalTopologyBuilder {
     }
 
     public boolean hasOffsetResetOverrides() {
-        return !(earliestResetTopics.isEmpty() && 
earliestResetPatterns.isEmpty()
-            && latestResetTopics.isEmpty() && latestResetPatterns.isEmpty());
+        return noneResetTopics.size() + noneResetPatterns.size()
+            + earliestResetTopics.size() + earliestResetPatterns.size()
+            + latestResetTopics.size() + latestResetPatterns.size()
+            + durationResetTopics.size() + durationResetPatterns.size() > 0;
     }
 
     public AutoOffsetResetStrategy offsetResetStrategy(final String topic) {
-        if 
(maybeDecorateInternalSourceTopics(earliestResetTopics).contains(topic) ||
+        final Optional<Duration> resetDuration;
+
+        if (maybeDecorateInternalSourceTopics(noneResetTopics).contains(topic) 
||
+            noneResetPatterns.stream().anyMatch(p -> 
p.matcher(topic).matches())) {
+            return AutoOffsetResetStrategy.NONE;
+        } else if 
(maybeDecorateInternalSourceTopics(earliestResetTopics).contains(topic) ||
             earliestResetPatterns.stream().anyMatch(p -> 
p.matcher(topic).matches())) {
             return AutoOffsetResetStrategy.EARLIEST;
         } else if 
(maybeDecorateInternalSourceTopics(latestResetTopics).contains(topic) ||
             latestResetPatterns.stream().anyMatch(p -> 
p.matcher(topic).matches())) {
             return AutoOffsetResetStrategy.LATEST;
+        } else if 
(maybeDecorateInternalSourceTopics(durationResetTopics.keySet()).contains(topic))
 {
+            return AutoOffsetResetStrategy.fromString("by_duration:" + 
durationResetTopics.get(topic).toString());
+        } else if ((resetDuration = findDuration(topic)).isPresent()) {
+            return AutoOffsetResetStrategy.fromString("by_duration:" + 
resetDuration.get());
         } else if (containsTopic(topic)) {
-            return AutoOffsetResetStrategy.NONE;
+            return null;
         } else {
             throw new IllegalStateException(String.format(
                 "Unable to lookup offset reset strategy for the following 
topic as it does not exist in the topology%s: %s",
@@ -1369,6 +1393,19 @@ public class InternalTopologyBuilder {
         }
     }
 
+    private Optional<Duration> findDuration(final String topic) {
+        final List<Duration> resetDuration = 
durationResetPatterns.entrySet().stream()
+            .filter(e -> e.getKey().matcher(topic).matches())
+            .map(Map.Entry::getValue)
+            .collect(Collectors.toList());
+
+        if (resetDuration.size() > 1) {
+            throw new IllegalStateException("Found more than one reset 
duration for topic: " + topic);
+        }
+
+        return resetDuration.isEmpty() ? Optional.empty() : 
resetDuration.stream().findAny();
+    }
+
     /**
      * @return  map from state store name to full names (including application 
id/topology name prefix)
      *          of all source topics whose processors are connected to it
@@ -1450,9 +1487,10 @@ public class InternalTopologyBuilder {
                                             + "applicationId hasn't been set. 
Call "
                                             + "setApplicationId first");
         }
-        final String prefix = topologyConfigs == null ?
-                                applicationId :
-                                
ProcessorContextUtils.topicNamePrefix(topologyConfigs.applicationConfigs.originals(),
 applicationId);
+
+        final String prefix = topologyConfigs == null
+            ? applicationId
+            : 
ProcessorContextUtils.topicNamePrefix(topologyConfigs.applicationConfigs.originals(),
 applicationId);
 
         if (hasNamedTopology()) {
             return prefix + "-" + topologyName + "-" + topic;
@@ -1461,7 +1499,6 @@ public class InternalTopologyBuilder {
         }
     }
 
-
     void initializeSubscription() {
         if (usesPatternSubscription()) {
             log.debug("Found pattern subscribed source topics, initializing 
consumer's subscription pattern.");
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index ceb2c403c3e..ab4ed4656b3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -23,6 +23,8 @@ import 
org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.InvalidOffsetException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
@@ -1291,29 +1293,77 @@ public class StreamThread extends Thread implements 
ProcessingThread {
         final Set<String> loggedTopics = new HashSet<>();
         final Set<TopicPartition> seekToBeginning = new HashSet<>();
         final Set<TopicPartition> seekToEnd = new HashSet<>();
+        final Map<TopicPartition, Duration> seekByDuration = new HashMap<>();
         final Set<TopicPartition> notReset = new HashSet<>();
 
         for (final TopicPartition partition : partitions) {
-            final AutoOffsetResetStrategy offsetResetStrategy = 
topologyMetadata.offsetResetStrategy(partition.topic());
+            final Optional<AutoOffsetResetStrategy> offsetResetStrategy = 
topologyMetadata.offsetResetStrategy(partition.topic());
 
             // This may be null if the task we are currently processing was 
a part of a named topology that was just removed.
             // TODO KAFKA-13713: keep the StreamThreads and TopologyMetadata 
view of named topologies in sync until final thread has acked
             if (offsetResetStrategy != null) {
-                if (offsetResetStrategy == AutoOffsetResetStrategy.EARLIEST) {
-                    addToResetList(partition, seekToBeginning, "Setting topic 
'{}' to consume from {} offset", "earliest", loggedTopics);
-                } else if (offsetResetStrategy == 
AutoOffsetResetStrategy.LATEST) {
-                    addToResetList(partition, seekToEnd, "Setting topic '{}' 
to consume from {} offset", "latest", loggedTopics);
-                } else if (offsetResetStrategy == 
AutoOffsetResetStrategy.NONE) {
-                    final AutoOffsetResetStrategy autoOffsetResetStrategy = 
AutoOffsetResetStrategy.fromString(originalReset);
-                    if (AutoOffsetResetStrategy.EARLIEST == 
autoOffsetResetStrategy) {
-                        addToResetList(partition, seekToBeginning, "No custom 
setting defined for topic '{}' using original config '{}' for offset reset", 
"earliest", loggedTopics);
-                    } else if (AutoOffsetResetStrategy.LATEST == 
autoOffsetResetStrategy) {
-                        addToResetList(partition, seekToEnd, "No custom 
setting defined for topic '{}' using original config '{}' for offset reset", 
"latest", loggedTopics);
-                    } else {
+                if (offsetResetStrategy.isPresent()) {
+                    final AutoOffsetResetStrategy resetPolicy = 
offsetResetStrategy.get();
+
+                    if (resetPolicy == AutoOffsetResetStrategy.NONE) {
                         notReset.add(partition);
+                    } else if (resetPolicy == 
AutoOffsetResetStrategy.EARLIEST) {
+                        addToResetList(
+                            partition,
+                            seekToBeginning,
+                            "Setting topic '{}' to consume from earliest 
offset",
+                            loggedTopics
+                        );
+                    } else if (resetPolicy == AutoOffsetResetStrategy.LATEST) {
+                        addToResetList(
+                            partition,
+                            seekToEnd,
+                            "Setting topic '{}' to consume from latest offset",
+                            loggedTopics
+                        );
+                    } else if (resetPolicy.type() == 
AutoOffsetResetStrategy.StrategyType.BY_DURATION) {
+                        addToResetList(
+                            partition,
+                            seekByDuration,
+                            resetPolicy.duration().get(),
+                            "Setting topic '{}' to consume from 
by_duration:{}",
+                            resetPolicy.duration().get().toString(),
+                            loggedTopics
+                        );
+                    } else {
+                        throw new IllegalStateException("Unknown reset policy 
" + resetPolicy);
                     }
                 } else {
-                    throw new IllegalStateException("Unable to locate topic " 
+ partition.topic() + " in the topology");
+                    final AutoOffsetResetStrategy resetPolicy = 
AutoOffsetResetStrategy.fromString(originalReset);
+
+                    if (resetPolicy == AutoOffsetResetStrategy.NONE) {
+                        notReset.add(partition);
+                    } else if (resetPolicy == 
AutoOffsetResetStrategy.EARLIEST) {
+                        addToResetList(
+                            partition,
+                            seekToBeginning,
+                            "No custom setting defined for topic '{}' using 
original config 'earliest' for offset reset",
+                            loggedTopics
+                        );
+                    } else if (resetPolicy == AutoOffsetResetStrategy.LATEST) {
+                        addToResetList(
+                            partition,
+                            seekToEnd,
+                            "No custom setting defined for topic '{}' using 
original config 'latest' for offset reset",
+                            loggedTopics
+                        );
+                    } else if (resetPolicy.type() == 
AutoOffsetResetStrategy.StrategyType.BY_DURATION) {
+                        addToResetList(
+                            partition,
+                            seekByDuration,
+                            resetPolicy.duration().get(),
+                            "No custom setting defined for topic '{}' using 
original config 'by_duration:{}' for offset reset",
+                            resetPolicy.duration().get().toString(),
+                            loggedTopics
+                        );
+                    } else {
+                        throw new IllegalStateException("Unknown reset policy 
" + resetPolicy);
+                    }
                 }
             }
         }
@@ -1326,6 +1376,50 @@ public class StreamThread extends Thread implements 
ProcessingThread {
             if (!seekToEnd.isEmpty()) {
                 mainConsumer.seekToEnd(seekToEnd);
             }
+
+            if (!seekByDuration.isEmpty()) {
+                final long nowMs = time.milliseconds();
+                final Map<TopicPartition, Long> seekToTimestamps = 
seekByDuration.entrySet().stream()
+                    .map(e -> {
+                        long seekMs = nowMs - e.getValue().toMillis();
+                        if (seekMs < 0L) {
+                            log.debug("Cannot reset offset to negative 
timestamp {} for partition {}. Seeking to timestamp 0 instead.", seekMs, 
e.getKey());
+                            seekMs = 0L;
+                        }
+                        return Map.entry(e.getKey(), seekMs);
+                    })
+                    .collect(HashMap::new, (m, e) -> m.put(e.getKey(), 
e.getValue()), Map::putAll);
+
+                try {
+                    for (final Map.Entry<TopicPartition, OffsetAndTimestamp> 
partitionAndOffset : mainConsumer.offsetsForTimes(seekToTimestamps).entrySet()) 
{
+                        final TopicPartition partition = 
partitionAndOffset.getKey();
+                        final OffsetAndTimestamp seekOffset = 
partitionAndOffset.getValue();
+                        if (seekOffset != null) {
+                            mainConsumer.seek(partition, new 
OffsetAndMetadata(seekOffset.offset()));
+                        } else {
+                            log.debug(
+                                "Cannot reset offset to non-existing timestamp 
{} (larger than timestamp of last record)" +
+                                    " for partition {}. Seeking to end 
instead.",
+                                seekToTimestamps.get(partition),
+                                partition
+                            );
+                            
mainConsumer.seekToEnd(Collections.singleton(partitionAndOffset.getKey()));
+                        }
+                    }
+                } catch (final TimeoutException timeoutException) {
+                    for (final TopicPartition partition : 
seekByDuration.keySet()) {
+                        final Task task = taskManager.getActiveTask(partition);
+                        task.maybeInitTaskTimeoutOrThrow(now, 
timeoutException);
+                        stateUpdater.add(task);
+                    }
+                    log.debug(
+                        String.format(
+                            "Could not reset offset for %s due to the 
following exception; will retry.",
+                            seekByDuration.keySet()),
+                        timeoutException
+                    );
+                }
+            }
         } else {
             final String notResetString =
                 notReset.stream()
@@ -1349,14 +1443,34 @@ public class StreamThread extends Thread implements 
ProcessingThread {
         }
     }
 
-    private void addToResetList(final TopicPartition partition, final 
Set<TopicPartition> partitions, final String logMessage, final String 
resetPolicy, final Set<String> loggedTopics) {
+    private void addToResetList(
+        final TopicPartition partition,
+        final Set<TopicPartition> partitions,
+        final String resetPolicy,
+        final Set<String> loggedTopics
+    ) {
         final String topic = partition.topic();
         if (loggedTopics.add(topic)) {
-            log.info(logMessage, topic, resetPolicy);
+            log.info("Setting topic '{}' to consume from {} offset", topic, 
resetPolicy);
         }
         partitions.add(partition);
     }
 
+    private void addToResetList(
+        final TopicPartition partition,
+        final Map<TopicPartition, Duration> durationForPartitions,
+        final Duration durationTime,
+        final String logMessage,
+        final String durationString,
+        final Set<String> loggedTopics
+    ) {
+        final String topic = partition.topic();
+        if (loggedTopics.add(topic)) {
+            log.info(logMessage, topic, durationString);
+        }
+        durationForPartitions.put(partition, durationTime);
+    }
+
     // This method is added for usage in tests where mocking the underlying 
native call is not possible.
     public boolean isThreadAlive() {
         return isAlive();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index f361d3dc320..6b90e354c27 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -1886,7 +1886,7 @@ public class TaskManager {
         }
     }
 
-    private Task getActiveTask(final TopicPartition partition) {
+    Task getActiveTask(final TopicPartition partition) {
         final Task activeTask = tasks.activeTasksForInputPartition(partition);
 
         if (activeTask == null) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
index 88e4ede59ad..fe04f2c4613 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
@@ -427,10 +427,10 @@ public class TopologyMetadata {
         return hasNamedTopologies() || 
evaluateConditionIsTrueForAnyBuilders(InternalTopologyBuilder::hasOffsetResetOverrides);
     }
 
-    public AutoOffsetResetStrategy offsetResetStrategy(final String topic) {
+    public Optional<AutoOffsetResetStrategy> offsetResetStrategy(final String 
topic) {
         for (final InternalTopologyBuilder builder : builders.values()) {
             if (builder.containsTopic(topic)) {
-                return builder.offsetResetStrategy(topic);
+                return Optional.ofNullable(builder.offsetResetStrategy(topic));
             }
         }
         log.warn("Unable to look up offset reset strategy for topic {} " +
@@ -439,6 +439,9 @@ public class TopologyMetadata {
                 "persist or appear frequently.",
             topic, namedTopologiesView()
         );
+        // returning `null` for an Optional return type triggers spotbugs
+        // we added an exception for NP_OPTIONAL_RETURN_NULL for this method
+        // when we remove NamedTopologies, we can remove this exception
         return null;
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/AutoOffsetResetTest.java 
b/streams/src/test/java/org/apache/kafka/streams/AutoOffsetResetTest.java
index 2dad17cd81f..e5679e99d8f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/AutoOffsetResetTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/AutoOffsetResetTest.java
@@ -21,31 +21,37 @@ import 
org.apache.kafka.streams.internals.AutoOffsetResetInternal;
 import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
+import java.util.NoSuchElementException;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 class AutoOffsetResetTest {
 
     @Test
-    void latestShouldReturnAnEmptyDuration() {
-        final AutoOffsetResetInternal latest = new 
AutoOffsetResetInternal(AutoOffsetReset.latest());
-        assertTrue(latest.duration().isEmpty(), "Latest should have an empty 
duration.");
+    void shouldThrowExceptionOnDurationForNoneReset() {
+        final AutoOffsetResetInternal none = new 
AutoOffsetResetInternal(AutoOffsetReset.none());
+        assertThrows(NoSuchElementException.class, none::duration, "None 
should not have a duration.");
     }
 
     @Test
-    void earliestShouldReturnAnEmptyDuration() {
+    void shouldThrowExceptionOnDurationForEarliestReset() {
         final AutoOffsetResetInternal earliest = new 
AutoOffsetResetInternal(AutoOffsetReset.earliest());
-        assertTrue(earliest.duration().isEmpty(), "Earliest should have an 
empty duration.");
+        assertThrows(NoSuchElementException.class, earliest::duration, 
"Earliest should not have a duration.");
+    }
+
+    @Test
+    void shouldThrowExceptionOnDurationForLatestReset() {
+        final AutoOffsetResetInternal latest = new 
AutoOffsetResetInternal(AutoOffsetReset.latest());
+        assertThrows(NoSuchElementException.class, latest::duration, "Latest 
should not have a duration.");
     }
 
     @Test
     void customDurationShouldMatchExpectedValue() {
         final Duration duration = Duration.ofSeconds(10L);
         final AutoOffsetResetInternal custom = new 
AutoOffsetResetInternal(AutoOffsetReset.byDuration(duration));
-        assertEquals(10L, custom.duration().get().toSeconds(), "Duration 
should match the specified value in milliseconds.");
+        assertEquals(10L, custom.duration().toSeconds(), "Duration should 
match the specified value in seconds.");
     }
 
     @Test
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
index 3df6840ff71..c761d1eda7c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
@@ -18,9 +18,11 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.AutoOffsetReset;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.internals.AutoOffsetResetInternal;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.JoinWindows;
@@ -64,7 +66,6 @@ import java.util.regex.Pattern;
 
 import static java.time.Duration.ofMillis;
 import static java.util.Arrays.asList;
-import static org.apache.kafka.streams.Topology.AutoOffsetReset;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
@@ -81,7 +82,7 @@ public class InternalStreamsBuilderTest {
     private static final String APP_ID = "app-id";
 
     private final InternalStreamsBuilder builder = new 
InternalStreamsBuilder(new InternalTopologyBuilder());
-    private final ConsumedInternal<String, String> consumed = new 
ConsumedInternal<>();
+    private final ConsumedInternal<String, String> consumed = new 
ConsumedInternal<>(Consumed.with(null, null));
     private final String storePrefix = "prefix-";
     private final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materialized = new 
MaterializedInternal<>(Materialized.as("test-store"), builder, storePrefix);
     private final Properties props = StreamsTestUtils.getStreamsConfig();
@@ -287,10 +288,20 @@ public class InternalStreamsBuilderTest {
         assertEquals(Collections.singletonList(APP_ID + 
"-KSTREAM-MAP-0000000003-repartition"), 
builder.internalTopologyBuilder.sourceTopicsForStore("count"));
     }
 
+    @Test
+    public void shouldAddTopicToNoneAutoOffsetResetList() {
+        final String topicName = "topic-1";
+        final ConsumedInternal<String, String> consumed = new 
ConsumedInternal<>(Consumed.with(AutoOffsetReset.none()));
+        builder.stream(Collections.singleton(topicName), consumed);
+        builder.buildAndOptimizeTopology();
+
+        
assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicName), 
equalTo(AutoOffsetResetStrategy.NONE));
+    }
+
     @Test
     public void shouldAddTopicToEarliestAutoOffsetResetList() {
         final String topicName = "topic-1";
-        final ConsumedInternal<String, String> consumed = new 
ConsumedInternal<>(Consumed.with(AutoOffsetReset.EARLIEST));
+        final ConsumedInternal<String, String> consumed = new 
ConsumedInternal<>(Consumed.with(AutoOffsetReset.earliest()));
         builder.stream(Collections.singleton(topicName), consumed);
         builder.buildAndOptimizeTopology();
 
@@ -301,16 +312,35 @@ public class InternalStreamsBuilderTest {
     public void shouldAddTopicToLatestAutoOffsetResetList() {
         final String topicName = "topic-1";
 
-        final ConsumedInternal<String, String> consumed = new 
ConsumedInternal<>(Consumed.with(AutoOffsetReset.LATEST));
+        final ConsumedInternal<String, String> consumed = new 
ConsumedInternal<>(Consumed.with(AutoOffsetReset.latest()));
         builder.stream(Collections.singleton(topicName), consumed);
         builder.buildAndOptimizeTopology();
         
assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicName), 
equalTo(AutoOffsetResetStrategy.LATEST));
     }
 
+    @Test
+    public void shouldAddTopicToDurationAutoOffsetResetList() {
+        final String topicName = "topic-1";
+
+        final ConsumedInternal<String, String> consumed = new 
ConsumedInternal<>(Consumed.with(new 
AutoOffsetResetInternal(AutoOffsetReset.byDuration(Duration.ofSeconds(42L)))));
+        builder.stream(Collections.singleton(topicName), consumed);
+        builder.buildAndOptimizeTopology();
+        
assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicName).type(),
 equalTo(AutoOffsetResetStrategy.StrategyType.BY_DURATION));
+        
assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicName).duration().get().toSeconds(),
 equalTo(42L));
+    }
+
+    @Test
+    public void shouldAddTableToNoneAutoOffsetResetList() {
+        final String topicName = "topic-1";
+        builder.table(topicName, new 
ConsumedInternal<>(Consumed.with(AutoOffsetReset.none())), materialized);
+        builder.buildAndOptimizeTopology();
+        
assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicName), 
equalTo(AutoOffsetResetStrategy.NONE));
+    }
+
     @Test
     public void shouldAddTableToEarliestAutoOffsetResetList() {
         final String topicName = "topic-1";
-        builder.table(topicName, new 
ConsumedInternal<>(Consumed.with(AutoOffsetReset.EARLIEST)), materialized);
+        builder.table(topicName, new 
ConsumedInternal<>(Consumed.with(AutoOffsetReset.earliest())), materialized);
         builder.buildAndOptimizeTopology();
         
assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicName), 
equalTo(AutoOffsetResetStrategy.EARLIEST));
     }
@@ -318,11 +348,20 @@ public class InternalStreamsBuilderTest {
     @Test
     public void shouldAddTableToLatestAutoOffsetResetList() {
         final String topicName = "topic-1";
-        builder.table(topicName, new 
ConsumedInternal<>(Consumed.with(AutoOffsetReset.LATEST)), materialized);
+        builder.table(topicName, new 
ConsumedInternal<>(Consumed.with(AutoOffsetReset.latest())), materialized);
         builder.buildAndOptimizeTopology();
         
assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicName), 
equalTo(AutoOffsetResetStrategy.LATEST));
     }
 
+    @Test
+    public void shouldAddTableToDurationAutoOffsetResetList() {
+        final String topicName = "topic-1";
+        builder.table(topicName, new 
ConsumedInternal<>(Consumed.with(AutoOffsetResetInternal.byDuration(Duration.ofSeconds(42L)))),
 materialized);
+        builder.buildAndOptimizeTopology();
+        
assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicName).type(),
 equalTo(AutoOffsetResetStrategy.StrategyType.BY_DURATION));
+        
assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicName).duration().get().toSeconds(),
 equalTo(42L));
+    }
+
     @Test
     public void shouldNotAddTableToOffsetResetLists() {
         final String topicName = "topic-1";
@@ -330,7 +369,7 @@ public class InternalStreamsBuilderTest {
         builder.table(topicName, consumed, materialized);
         builder.buildAndOptimizeTopology();
 
-        
assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicName), 
equalTo(AutoOffsetResetStrategy.NONE));
+        
assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicName), 
equalTo(null));
     }
 
     @Test
@@ -341,7 +380,7 @@ public class InternalStreamsBuilderTest {
         builder.stream(topicPattern, consumed);
         builder.buildAndOptimizeTopology();
 
-        assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topic), 
equalTo(AutoOffsetResetStrategy.NONE));
+        assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topic), 
equalTo(null));
     }
 
     @Test
@@ -349,7 +388,7 @@ public class InternalStreamsBuilderTest {
         final Pattern topicPattern = Pattern.compile("topic-\\d+");
         final String topicTwo = "topic-500000";
 
-        builder.stream(topicPattern, new 
ConsumedInternal<>(Consumed.with(AutoOffsetReset.EARLIEST)));
+        builder.stream(topicPattern, new 
ConsumedInternal<>(Consumed.with(AutoOffsetReset.earliest())));
         builder.buildAndOptimizeTopology();
 
         
assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicTwo), 
equalTo(AutoOffsetResetStrategy.EARLIEST));
@@ -360,12 +399,24 @@ public class InternalStreamsBuilderTest {
         final Pattern topicPattern = Pattern.compile("topic-\\d+");
         final String topicTwo = "topic-1000000";
 
-        builder.stream(topicPattern, new 
ConsumedInternal<>(Consumed.with(AutoOffsetReset.LATEST)));
+        builder.stream(topicPattern, new 
ConsumedInternal<>(Consumed.with(AutoOffsetReset.latest())));
         builder.buildAndOptimizeTopology();
 
         
assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicTwo), 
equalTo(AutoOffsetResetStrategy.LATEST));
     }
 
+    @Test
+    public void shouldAddRegexTopicToDurationAutoOffsetResetList() {
+        final Pattern topicPattern = Pattern.compile("topic-\\d+");
+        final String topicTwo = "topic-1000000";
+
+        builder.stream(topicPattern, new 
ConsumedInternal<>(Consumed.with(AutoOffsetResetInternal.byDuration(Duration.ofSeconds(42L)))));
+        builder.buildAndOptimizeTopology();
+
+        
assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicTwo).type(),
 equalTo(AutoOffsetResetStrategy.StrategyType.BY_DURATION));
+        
assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicTwo).duration().get().toSeconds(),
 equalTo(42L));
+    }
+
     @Test
     public void shouldHaveNullTimestampExtractorWhenNoneSupplied() {
         builder.stream(Collections.singleton("topic"), consumed);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index b0afe364985..5802518cd26 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -21,13 +21,14 @@ import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.AutoOffsetReset;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyConfig;
 import org.apache.kafka.streams.TopologyDescription;
 import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
 import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
 import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.internals.AutoOffsetResetInternal;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TopicNameExtractor;
@@ -100,28 +101,43 @@ public class InternalTopologyBuilderTest {
 
     @Test
     public void shouldAddSourceWithOffsetReset() {
+        final String noneTopic = "noneTopic";
         final String earliestTopic = "earliestTopic";
         final String latestTopic = "latestTopic";
+        final String durationTopic = "durationTopic";
 
-        builder.addSource(Topology.AutoOffsetReset.EARLIEST, "source", null, 
null, null, earliestTopic);
-        builder.addSource(Topology.AutoOffsetReset.LATEST, "source2", null, 
null, null, latestTopic);
+        builder.addSource(new AutoOffsetResetInternal(AutoOffsetReset.none()), 
"source0", null, null, null, noneTopic);
+        builder.addSource(new 
AutoOffsetResetInternal(AutoOffsetReset.earliest()), "source1", null, null, 
null, earliestTopic);
+        builder.addSource(new 
AutoOffsetResetInternal(AutoOffsetReset.latest()), "source2", null, null, null, 
latestTopic);
+        builder.addSource(new 
AutoOffsetResetInternal(AutoOffsetReset.byDuration(Duration.ofSeconds(42))), 
"source3", null, null, null, durationTopic);
         builder.initializeSubscription();
 
+        assertThat(builder.offsetResetStrategy(noneTopic), 
equalTo(AutoOffsetResetStrategy.NONE));
         assertThat(builder.offsetResetStrategy(earliestTopic), 
equalTo(AutoOffsetResetStrategy.EARLIEST));
         assertThat(builder.offsetResetStrategy(latestTopic), 
equalTo(AutoOffsetResetStrategy.LATEST));
+        assertThat(builder.offsetResetStrategy(durationTopic).type(), 
equalTo(AutoOffsetResetStrategy.StrategyType.BY_DURATION));
+        
assertThat(builder.offsetResetStrategy(durationTopic).duration().get().toSeconds(),
 equalTo(42L));
     }
 
     @Test
     public void shouldAddSourcePatternWithOffsetReset() {
+        final String noneTopicPattern = "none.*Topic";
         final String earliestTopicPattern = "earliest.*Topic";
         final String latestTopicPattern = "latest.*Topic";
+        final String durationTopicPattern = "duration.*Topic";
+
+        builder.addSource(new AutoOffsetResetInternal(AutoOffsetReset.none()), 
"source0", null, null, null, Pattern.compile(noneTopicPattern));
+        builder.addSource(new 
AutoOffsetResetInternal(AutoOffsetReset.earliest()), "source1", null, null, 
null, Pattern.compile(earliestTopicPattern));
+        builder.addSource(new 
AutoOffsetResetInternal(AutoOffsetReset.latest()), "source2", null, null, null, 
 Pattern.compile(latestTopicPattern));
+        builder.addSource(new 
AutoOffsetResetInternal(AutoOffsetReset.byDuration(Duration.ofSeconds(42))), 
"source3", null, null, null, Pattern.compile(durationTopicPattern));
 
-        builder.addSource(Topology.AutoOffsetReset.EARLIEST, "source", null, 
null, null, Pattern.compile(earliestTopicPattern));
-        builder.addSource(Topology.AutoOffsetReset.LATEST, "source2", null, 
null, null,  Pattern.compile(latestTopicPattern));
         builder.initializeSubscription();
 
+        assertThat(builder.offsetResetStrategy("noneTestTopic"), 
equalTo(AutoOffsetResetStrategy.NONE));
         assertThat(builder.offsetResetStrategy("earliestTestTopic"), 
equalTo(AutoOffsetResetStrategy.EARLIEST));
         assertThat(builder.offsetResetStrategy("latestTestTopic"), 
equalTo(AutoOffsetResetStrategy.LATEST));
+        assertThat(builder.offsetResetStrategy("durationTestTopic").type(), 
equalTo(AutoOffsetResetStrategy.StrategyType.BY_DURATION));
+        
assertThat(builder.offsetResetStrategy("durationTestTopic").duration().get().toSeconds(),
 equalTo(42L));
     }
 
     @Test
@@ -131,7 +147,7 @@ public class InternalTopologyBuilderTest {
 
         assertEquals(Collections.singletonList("test-topic"), 
builder.fullSourceTopicNames());
 
-        assertThat(builder.offsetResetStrategy("test-topic"), 
equalTo(AutoOffsetResetStrategy.NONE));
+        assertThat(builder.offsetResetStrategy("test-topic"), equalTo(null));
     }
 
     @Test
@@ -143,20 +159,20 @@ public class InternalTopologyBuilderTest {
 
         assertThat(expectedPattern.pattern(), 
builder.sourceTopicPatternString(), equalTo("test-.*"));
 
-        assertThat(builder.offsetResetStrategy("test-topic"), 
equalTo(AutoOffsetResetStrategy.NONE));
+        assertThat(builder.offsetResetStrategy("test-topic"), equalTo(null));
     }
 
     @Test
     public void shouldNotAllowOffsetResetSourceWithoutTopics() {
-        assertThrows(TopologyException.class, () -> 
builder.addSource(Topology.AutoOffsetReset.EARLIEST, "source",
+        assertThrows(TopologyException.class, () -> builder.addSource(new 
AutoOffsetResetInternal(AutoOffsetReset.earliest()), "source",
             null, stringSerde.deserializer(), stringSerde.deserializer()));
     }
 
     @Test
     public void shouldNotAllowOffsetResetSourceWithDuplicateSourceName() {
-        builder.addSource(Topology.AutoOffsetReset.EARLIEST, "source", null, 
stringSerde.deserializer(), stringSerde.deserializer(), "topic-1");
+        builder.addSource(new 
AutoOffsetResetInternal(AutoOffsetReset.earliest()), "source", null, 
stringSerde.deserializer(), stringSerde.deserializer(), "topic-1");
         try {
-            builder.addSource(Topology.AutoOffsetReset.LATEST, "source", null, 
stringSerde.deserializer(), stringSerde.deserializer(), "topic-2");
+            builder.addSource(new 
AutoOffsetResetInternal(AutoOffsetReset.latest()), "source", null, 
stringSerde.deserializer(), stringSerde.deserializer(), "topic-2");
             fail("Should throw TopologyException for duplicate source name");
         } catch (final TopologyException expected) { /* ok */ }
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 8a3e6c17d8a..e48d9275b3a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -62,6 +62,7 @@ import 
org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
 import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
@@ -187,7 +188,7 @@ public class StreamThreadTest {
     private final MockTime mockTime = new MockTime();
     private final String stateDir = TestUtils.tempDirectory().getPath();
     private final MockClientSupplier clientSupplier = new MockClientSupplier();
-    private final ConsumedInternal<Object, Object> consumed = new 
ConsumedInternal<>();
+    private final ConsumedInternal<Object, Object> consumed = new 
ConsumedInternal<>(Consumed.with(null, null));
     private final ChangelogReader changelogReader = new MockChangelogReader();
     private StateDirectory stateDirectory = null;
     private final InternalTopologyBuilder internalTopologyBuilder = new 
InternalTopologyBuilder();
@@ -2087,7 +2088,7 @@ public class StreamThreadTest {
             .count(Materialized.as(storeName1));
         final MaterializedInternal<Object, Object, KeyValueStore<Bytes, 
byte[]>> materialized
             = new MaterializedInternal<>(Materialized.as(storeName2), 
internalStreamsBuilder, "");
-        internalStreamsBuilder.table(topic2, new ConsumedInternal<>(), 
materialized);
+        internalStreamsBuilder.table(topic2, new 
ConsumedInternal<>(Consumed.with(null, null)), materialized);
 
         internalStreamsBuilder.buildAndOptimizeTopology();
         restoreConsumer.updatePartitions(changelogName1,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 6cf72e91db3..592326fae87 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -44,6 +44,7 @@ import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsConfig.InternalConfig;
 import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
@@ -2702,8 +2703,8 @@ public class StreamsPartitionAssignorTest {
 
         final InternalStreamsBuilder streamsBuilder = new 
InternalStreamsBuilder(builder);
 
-        final KStream<String, String> inputTopic = 
streamsBuilder.stream(singleton("topic1"), new ConsumedInternal<>());
-        final KTable<String, String> inputTable = 
streamsBuilder.table("topic2", new ConsumedInternal<>(), new 
MaterializedInternal<>(Materialized.as("store")));
+        final KStream<String, String> inputTopic = 
streamsBuilder.stream(singleton("topic1"), new 
ConsumedInternal<>(Consumed.with(null, null)));
+        final KTable<String, String> inputTable = 
streamsBuilder.table("topic2", new ConsumedInternal<>(Consumed.with(null, 
null)), new MaterializedInternal<>(Materialized.as("store")));
         inputTopic
             .groupBy(
                 (k, v) -> k,

Reply via email to