This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d83f09d014f KAFKA-18015: Add support for duration based offset reset 
strategy to Kafka Streams (#17973)
d83f09d014f is described below

commit d83f09d014ff574784212cfdc317b5004c90c687
Author: KApolinario1120 <[email protected]>
AuthorDate: Wed Dec 11 12:47:25 2024 -0600

    KAFKA-18015: Add support for duration based offset reset strategy to Kafka 
Streams (#17973)
    
    Part of KIP-1106.
    
    Adds the public APIs to Kafka Streams, to support the newly added 
"by_duration" reset policy,
    plus adds the missing "none" reset policy. Deprecates the enum 
`Topology.AutoOffsetReset` and
    all related methods, and replaces them with new overloads using the new 
`AutoOffsetReset` class.
    
    Co-authored-by: Matthias J. Sax <[email protected]>
    
    Reviewers: Matthias J. Sax <[email protected]>, Bill Bejeck 
<[email protected]>, Manikumar Reddy <[email protected]>
---
 .../org/apache/kafka/streams/AutoOffsetReset.java  | 103 ++++++++++
 .../org/apache/kafka/streams/StreamsBuilder.java   |   2 +-
 .../java/org/apache/kafka/streams/Topology.java    | 220 ++++++++++++++++++++-
 .../streams/internals/AutoOffsetResetInternal.java |  37 ++++
 .../org/apache/kafka/streams/kstream/Consumed.java | 127 ++++++++++--
 .../kstream/internals/ConsumedInternal.java        |  14 +-
 .../kstream/internals/graph/StreamSourceNode.java  |   8 +-
 .../kstream/internals/graph/TableSourceNode.java   |   3 +-
 .../internals/InternalTopologyBuilder.java         |   3 +
 .../apache/kafka/streams/AutoOffsetResetTest.java  |  83 ++++++++
 .../org/apache/kafka/streams/TopologyTest.java     |   4 +-
 .../kafka/streams/scala/kstream/Consumed.scala     |  36 +++-
 .../kafka/streams/scala/kstream/ConsumedTest.scala |  14 +-
 13 files changed, 615 insertions(+), 39 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java 
b/streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java
new file mode 100644
index 00000000000..f3f3a941d20
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import 
org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy.StrategyType;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+
+import java.time.Duration;
+import java.util.Optional;
+
+/**
+ * Sets the {@code auto.offset.reset} configuration when
+ * {@link Topology#addSource(AutoOffsetReset, String, String...) adding a 
source processor}
+ * or when creating {@link KStream} or {@link KTable} via {@link 
StreamsBuilder}.
+ */
+public class AutoOffsetReset {
+    protected final StrategyType offsetResetStrategy;
+    protected final Optional<Duration> duration;
+
+    private AutoOffsetReset(final StrategyType offsetResetStrategy, final 
Optional<Duration> duration) {
+        this.offsetResetStrategy = offsetResetStrategy;
+        this.duration = duration;
+    }
+
+    protected AutoOffsetReset(final AutoOffsetReset autoOffsetReset) {
+        this(autoOffsetReset.offsetResetStrategy, autoOffsetReset.duration);
+    }
+
+    /**
+     * Creates an {@code AutoOffsetReset} instance representing "none".
+     *
+     * @return An {@link AutoOffsetReset} instance for no reset.
+     */
+    public static AutoOffsetReset none() {
+        return new AutoOffsetReset(StrategyType.NONE, Optional.empty());
+    }
+
+    /**
+     * Creates an {@code AutoOffsetReset} instance representing "earliest".
+     *
+     * @return An {@link AutoOffsetReset} instance for the "earliest" offset.
+     */
+    public static AutoOffsetReset earliest() {
+        return new AutoOffsetReset(StrategyType.EARLIEST, Optional.empty());
+    }
+
+    /**
+     * Creates an {@code AutoOffsetReset} instance representing "latest".
+     * 
+     * @return An {@code AutoOffsetReset} instance for the "latest" offset.
+     */
+    public static AutoOffsetReset latest() {
+        return new AutoOffsetReset(StrategyType.LATEST, Optional.empty());
+    }
+
+    /**
+     * Creates an {@code AutoOffsetReset} instance for the specified reset 
duration.
+     * 
+     * @param duration The duration to use for the offset reset; must be 
non-negative.
+     * @return An {@code AutoOffsetReset} instance with the specified duration.
+     * @throws IllegalArgumentException If the duration is negative.
+     */
+    public static AutoOffsetReset byDuration(final Duration duration) {
+        if (duration.isNegative()) {
+            throw new IllegalArgumentException("Duration cannot be negative");
+        }
+        return new AutoOffsetReset(StrategyType.BY_DURATION, 
Optional.of(duration));
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final AutoOffsetReset that = (AutoOffsetReset) o;
+        return offsetResetStrategy == that.offsetResetStrategy && 
duration.equals(that.duration);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = offsetResetStrategy.hashCode();
+        result = 31 * result + duration.hashCode();
+        return result;
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index b9cc75b9fde..7badb3016f8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -144,7 +144,7 @@ public class StreamsBuilder {
      * @return a {@link KStream} for the specified topics
      */
     public synchronized <K, V> KStream<K, V> stream(final Collection<String> 
topics) {
-        return stream(topics, Consumed.with(null, null, null, null));
+        return stream(topics, Consumed.with(null, null));
     }
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java 
b/streams/src/main/java/org/apache/kafka/streams/Topology.java
index 35fe13faa38..320e0babf77 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Topology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -73,7 +73,10 @@ public class Topology {
      * Sets the {@code auto.offset.reset} configuration when
      * {@link #addSource(AutoOffsetReset, String, String...) adding a source 
processor} or when creating {@link KStream}
      * or {@link KTable} via {@link StreamsBuilder}.
+     *
+     * @deprecated Since 4.0. Use {@link 
org.apache.kafka.streams.AutoOffsetReset} instead.
      */
+    @Deprecated
     public enum AutoOffsetReset {
         EARLIEST, LATEST
     }
@@ -130,7 +133,9 @@ public class Topology {
      * @param topics the name of one or more Kafka topics that this source is 
to consume
      * @return itself
      * @throws TopologyException if processor is already added or if topics 
have already been registered by another source
+     * @deprecated Since 4.0. Use {@link 
#addSource(org.apache.kafka.streams.AutoOffsetReset, String, String...)} 
instead.
      */
+    @Deprecated
     public synchronized Topology addSource(final AutoOffsetReset offsetReset,
                                            final String name,
                                            final String... topics) {
@@ -138,6 +143,24 @@ public class Topology {
         return this;
     }
 
+    /**
+     * Adds a new source that consumes the specified topics and forwards the 
records to child processor and/or sink nodes.
+     * The source will use the specified {@link 
org.apache.kafka.streams.AutoOffsetReset offset reset policy} if no committed 
offsets are found.
+     *
+     * @param offsetReset the auto offset reset policy to use for this source 
if no committed offsets are found
+     * @param name the unique name of the source used to reference this node 
when {@link #addProcessor(String, ProcessorSupplier, String...) adding 
processor children}
+     * @param topics the name of one or more Kafka topics that this source is 
to consume
+     * @return itself
+     * @throws TopologyException if a processor is already added or if topics 
have already been registered by another source
+     */
+    public synchronized Topology addSource(final 
org.apache.kafka.streams.AutoOffsetReset offsetReset,
+                                           final String name,
+                                           final String... topics) {
+        // TODO mjsax
+        //internalTopologyBuilder.addSource(offsetReset, name, null, null, 
null, topics);
+        return this;
+    }
+
     /**
      * Add a new source that consumes from topics matching the given pattern
      * and forward the records to child processor and/or sink nodes.
@@ -152,7 +175,9 @@ public class Topology {
      * @param topicPattern regular expression pattern to match Kafka topics 
that this source is to consume
      * @return itself
      * @throws TopologyException if processor is already added or if topics 
have already been registered by another source
+     * @deprecated Since 4.0. Use {@link 
#addSource(org.apache.kafka.streams.AutoOffsetReset, String, Pattern)} instead.
      */
+    @Deprecated
     public synchronized Topology addSource(final AutoOffsetReset offsetReset,
                                            final String name,
                                            final Pattern topicPattern) {
@@ -160,6 +185,29 @@ public class Topology {
         return this;
     }
 
+    /**
+     * Add a new source that consumes from topics matching the given pattern
+     * and forward the records to child processor and/or sink nodes.
+     * The source will use the {@link 
StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
+     * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value 
deserializer} specified in the
+     * {@link StreamsConfig stream configuration}.
+     * The default {@link TimestampExtractor} as specified in the {@link 
StreamsConfig config} is used.
+     *
+     * @param offsetReset the auto offset reset policy value for this source 
if no committed offsets found
+     * @param name the unique name of the source used to reference this node 
when
+     * {@link #addProcessor(String, ProcessorSupplier, String...) adding 
processor children}.
+     * @param topicPattern regular expression pattern to match Kafka topics 
that this source is to consume
+     * @return itself
+     * @throws TopologyException if processor is already added or if topics 
have already been registered by another source
+     */
+    public synchronized Topology addSource(final 
org.apache.kafka.streams.AutoOffsetReset offsetReset,
+                                           final String name,
+                                           final Pattern topicPattern) {
+        // TODO: mjsax
+        //internalTopologyBuilder.addSource(offsetReset, name, null, null, 
null, topicPattern);
+        return this;
+    }
+
     /**
      * Add a new source that consumes the named topics and forward the records 
to child processor and/or sink nodes.
      * The source will use the {@link 
StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
@@ -218,7 +266,9 @@ public class Topology {
      * @param topics             the name of one or more Kafka topics that 
this source is to consume
      * @return itself
      * @throws TopologyException if processor is already added or if topics 
have already been registered by another source
+     * @deprecated Since 4.0. Use {@link 
#addSource(org.apache.kafka.streams.AutoOffsetReset, TimestampExtractor, 
String, String...)} instead.
      */
+    @Deprecated
     public synchronized Topology addSource(final AutoOffsetReset offsetReset,
                                            final TimestampExtractor 
timestampExtractor,
                                            final String name,
@@ -227,6 +277,27 @@ public class Topology {
         return this;
     }
 
+    /**
+     * Adds a new source that consumes the specified topics with a specified 
{@link TimestampExtractor}
+     * and forwards the records to child processor and/or sink nodes.
+     * The source will use the provided timestamp extractor to determine the 
timestamp of each record.
+     *
+     * @param offsetReset the auto offset reset policy to use if no committed 
offsets are found
+     * @param timestampExtractor the timestamp extractor to use for this source
+     * @param name the unique name of the source used to reference this node 
when {@link #addProcessor(String, ProcessorSupplier, String...) adding 
processor children}
+     * @param topics the name of one or more Kafka topics that this source is 
to consume
+     * @return itself
+     * @throws TopologyException if a processor is already added or if topics 
have already been registered by another source
+     */
+    public synchronized Topology addSource(final 
org.apache.kafka.streams.AutoOffsetReset offsetReset,
+                                           final TimestampExtractor 
timestampExtractor,
+                                           final String name,
+                                           final String... topics) {
+        // TODO mjsax
+        //internalTopologyBuilder.addSource(offsetReset, name, 
timestampExtractor, null, null, topics);
+        return this;
+    }
+
     /**
      * Add a new source that consumes from topics matching the given pattern 
and forward the records to child processor
      * and/or sink nodes.
@@ -243,7 +314,9 @@ public class Topology {
      * @param topicPattern       regular expression pattern to match Kafka 
topics that this source is to consume
      * @return itself
      * @throws TopologyException if processor is already added or if topics 
have already been registered by another source
+     * @deprecated Since 4.0. Use {@link 
#addSource(org.apache.kafka.streams.AutoOffsetReset, TimestampExtractor, 
String, Pattern)} instead.
      */
+    @Deprecated
     public synchronized Topology addSource(final AutoOffsetReset offsetReset,
                                            final TimestampExtractor 
timestampExtractor,
                                            final String name,
@@ -252,6 +325,27 @@ public class Topology {
         return this;
     }
 
+    /**
+     * Adds a new source that consumes from topics matching the given pattern 
with a specified {@link TimestampExtractor}
+     * and forwards the records to child processor and/or sink nodes.
+     * The source will use the provided timestamp extractor to determine the 
timestamp of each record.
+     *
+     * @param offsetReset the auto offset reset policy to use if no committed 
offsets are found
+     * @param timestampExtractor the timestamp extractor to use for this source
+     * @param name the unique name of the source used to reference this node 
when {@link #addProcessor(String, ProcessorSupplier, String...) adding 
processor children}
+     * @param topicPattern the regular expression pattern to match Kafka 
topics that this source is to consume
+     * @return itself
+     * @throws TopologyException if a processor is already added or if topics 
have already been registered by another source
+     */
+    public synchronized Topology addSource(final 
org.apache.kafka.streams.AutoOffsetReset offsetReset,
+                                           final TimestampExtractor 
timestampExtractor,
+                                           final String name,
+                                           final Pattern topicPattern) {
+        // TODO
+        //internalTopologyBuilder.addSource(offsetReset, name, 
timestampExtractor, null, null, topicPattern);
+        return this;
+    }
+
     /**
      * Add a new source that consumes the named topics and forwards the 
records to child processor and/or sink nodes.
      * The source will use the specified key and value deserializers.
@@ -319,8 +413,9 @@ public class Topology {
      * @param topics             the name of one or more Kafka topics that 
this source is to consume
      * @return itself
      * @throws TopologyException if processor is already added or if topics 
have already been registered by name
+     * @deprecated Since 4.0. Use {@link 
#addSource(org.apache.kafka.streams.AutoOffsetReset, String, Deserializer, 
Deserializer, String...)} instead.
      */
-    @SuppressWarnings("overloads")
+    @Deprecated
     public synchronized Topology addSource(final AutoOffsetReset offsetReset,
                                            final String name,
                                            final Deserializer<?> 
keyDeserializer,
@@ -330,6 +425,34 @@ public class Topology {
         return this;
     }
 
+    /**
+     * Add a new source that consumes from topics matching the given pattern 
and forwards the records to child processor
+     * and/or sink nodes.
+     * The source will use the specified key and value deserializers.
+     * The provided de-/serializers will be used for all the specified topics, 
so care should be taken when specifying
+     * topics that share the same key-value data format.
+     *
+     * @param offsetReset        the auto offset reset policy to use for this 
stream if no committed offsets found
+     * @param name               the unique name of the source used to 
reference this node when
+     *                           {@link #addProcessor(String, 
ProcessorSupplier, String...) adding processor children}
+     * @param keyDeserializer    key deserializer used to read this source, if 
not specified the default
+     *                           key deserializer defined in the configs will 
be used
+     * @param valueDeserializer  value deserializer used to read this source,
+     *                           if not specified the default value 
deserializer defined in the configs will be used
+     * @param topics             the name of one or more Kafka topics that 
this source is to consume
+     * @return itself
+     * @throws TopologyException if processor is already added or if topics 
have already been registered by name
+     */
+    public synchronized Topology addSource(final 
org.apache.kafka.streams.AutoOffsetReset offsetReset,
+                                           final String name,
+                                           final Deserializer<?> 
keyDeserializer,
+                                           final Deserializer<?> 
valueDeserializer,
+                                           final String... topics) {
+        // TODO mjsax
+        //internalTopologyBuilder.addSource(offsetReset, name, null, 
keyDeserializer, valueDeserializer, topics);
+        return this;
+    }
+
     /**
      * Add a new source that consumes from topics matching the given pattern 
and forwards the records to child processor
      * and/or sink nodes.
@@ -348,7 +471,9 @@ public class Topology {
      * @param topicPattern       regular expression pattern to match Kafka 
topics that this source is to consume
      * @return itself
      * @throws TopologyException if processor is already added or if topics 
have already been registered by name
+     * @deprecated Since 4.0. Use {@link 
#addSource(org.apache.kafka.streams.AutoOffsetReset, String, Deserializer, 
Deserializer, Pattern)} instead.
      */
+    @Deprecated
     public synchronized Topology addSource(final AutoOffsetReset offsetReset,
                                            final String name,
                                            final Deserializer<?> 
keyDeserializer,
@@ -358,6 +483,34 @@ public class Topology {
         return this;
     }
 
+    /**
+     * Add a new source that consumes from topics matching the given pattern 
and forwards the records to child processor
+     * and/or sink nodes.
+     * The source will use the specified key and value deserializers.
+     * The provided de-/serializers will be used for all matched topics, so 
care should be taken to specify patterns for
+     * topics that share the same key-value data format.
+     *
+     * @param offsetReset        the auto offset reset policy to use for this 
stream if no committed offsets found
+     * @param name               the unique name of the source used to 
reference this node when
+     *                           {@link #addProcessor(String, 
ProcessorSupplier, String...) adding processor children}
+     * @param keyDeserializer    key deserializer used to read this source, if 
not specified the default
+     *                           key deserializer defined in the configs will 
be used
+     * @param valueDeserializer  value deserializer used to read this source,
+     *                           if not specified the default value 
deserializer defined in the configs will be used
+     * @param topicPattern       regular expression pattern to match Kafka 
topics that this source is to consume
+     * @return itself
+     * @throws TopologyException if processor is already added or if topics 
have already been registered by name
+     */
+    public synchronized Topology addSource(final 
org.apache.kafka.streams.AutoOffsetReset offsetReset,
+                                           final String name,
+                                           final Deserializer<?> 
keyDeserializer,
+                                           final Deserializer<?> 
valueDeserializer,
+                                           final Pattern topicPattern) {
+        // TODO mjsax
+        //internalTopologyBuilder.addSource(offsetReset, name, null, 
keyDeserializer, valueDeserializer, topicPattern);
+        return this;
+    }
+
     /**
      * Add a new source that consumes the named topics and forwards the 
records to child processor and/or sink nodes.
      * The source will use the specified key and value deserializers.
@@ -375,8 +528,9 @@ public class Topology {
      * @param topics             the name of one or more Kafka topics that 
this source is to consume
      * @return itself
      * @throws TopologyException if processor is already added or if topics 
have already been registered by another source
+     * @deprecated Since 4.0. Use {@link 
#addSource(org.apache.kafka.streams.AutoOffsetReset, String, 
TimestampExtractor, Deserializer, Deserializer, String...)} instead.
      */
-    @SuppressWarnings("overloads")
+    @Deprecated
     public synchronized Topology addSource(final AutoOffsetReset offsetReset,
                                            final String name,
                                            final TimestampExtractor 
timestampExtractor,
@@ -387,6 +541,34 @@ public class Topology {
         return this;
     }
 
+    /**
+     * Add a new source that consumes the named topics and forwards the 
records to child processor and/or sink nodes.
+     * The source will use the specified key and value deserializers.
+     *
+     * @param offsetReset        the auto offset reset policy to use for this 
stream if no committed offsets found
+     * @param name               the unique name of the source used to 
reference this node when
+     *                           {@link #addProcessor(String, 
ProcessorSupplier, String...) adding processor children}.
+     * @param timestampExtractor the stateless timestamp extractor used for 
this source,
+     *                           if not specified the default extractor 
defined in the configs will be used
+     * @param keyDeserializer    key deserializer used to read this source, if 
not specified the default
+     *                           key deserializer defined in the configs will 
be used
+     * @param valueDeserializer  value deserializer used to read this source,
+     *                           if not specified the default value 
deserializer defined in the configs will be used
+     * @param topics             the name of one or more Kafka topics that 
this source is to consume
+     * @return itself
+     * @throws TopologyException if processor is already added or if topics 
have already been registered by another source
+     */
+    public synchronized Topology addSource(final 
org.apache.kafka.streams.AutoOffsetReset offsetReset,
+                                           final String name,
+                                           final TimestampExtractor 
timestampExtractor,
+                                           final Deserializer<?> 
keyDeserializer,
+                                           final Deserializer<?> 
valueDeserializer,
+                                           final String... topics) {
+        // TODO mjsax
+        //internalTopologyBuilder.addSource(offsetReset, name, 
timestampExtractor, keyDeserializer, valueDeserializer, topics);
+        return this;
+    }
+
     /**
      * Add a new source that consumes from topics matching the given pattern 
and forwards the records to child processor
      * and/or sink nodes.
@@ -407,8 +589,9 @@ public class Topology {
      * @param topicPattern       regular expression pattern to match Kafka 
topics that this source is to consume
      * @return itself
      * @throws TopologyException if processor is already added or if topics 
have already been registered by name
+     * @deprecated Since 4.0. Use {@link 
#addSource(org.apache.kafka.streams.AutoOffsetReset, String, 
TimestampExtractor, Deserializer, Deserializer, Pattern)} instead.
      */
-    @SuppressWarnings("overloads")
+    @Deprecated
     public synchronized Topology addSource(final AutoOffsetReset offsetReset,
                                            final String name,
                                            final TimestampExtractor 
timestampExtractor,
@@ -419,6 +602,37 @@ public class Topology {
         return this;
     }
 
+    /**
+     * Add a new source that consumes from topics matching the given pattern 
and forwards the records to child processor
+     * and/or sink nodes.
+     * The source will use the specified key and value deserializers.
+     * The provided de-/serializers will be used for all matched topics, so 
care should be taken to specify patterns for
+     * topics that share the same key-value data format.
+     *
+     * @param offsetReset        the auto offset reset policy to use for this 
stream if no committed offsets found
+     * @param name               the unique name of the source used to 
reference this node when
+     *                           {@link #addProcessor(String, 
ProcessorSupplier, String...) adding processor children}.
+     * @param timestampExtractor the stateless timestamp extractor used for 
this source,
+     *                           if not specified the default extractor 
defined in the configs will be used
+     * @param keyDeserializer    key deserializer used to read this source, if 
not specified the default
+     *                           key deserializer defined in the configs will 
be used
+     * @param valueDeserializer  value deserializer used to read this source,
+     *                           if not specified the default value 
deserializer defined in the configs will be used
+     * @param topicPattern       regular expression pattern to match Kafka 
topics that this source is to consume
+     * @return itself
+     * @throws TopologyException if processor is already added or if topics 
have already been registered by name
+     */
+    public synchronized Topology addSource(final 
org.apache.kafka.streams.AutoOffsetReset offsetReset,
+                                           final String name,
+                                           final TimestampExtractor 
timestampExtractor,
+                                           final Deserializer<?> 
keyDeserializer,
+                                           final Deserializer<?> 
valueDeserializer,
+                                           final Pattern topicPattern) {
+        // TODO mjsax
+        //internalTopologyBuilder.addSource(offsetReset, name, 
timestampExtractor, keyDeserializer, valueDeserializer, topicPattern);
+        return this;
+    }
+
     /**
      * Add a new sink that forwards records from upstream parent processor 
and/or source nodes to the named Kafka topic.
      * The sink will use the {@link 
StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/internals/AutoOffsetResetInternal.java
 
b/streams/src/main/java/org/apache/kafka/streams/internals/AutoOffsetResetInternal.java
new file mode 100644
index 00000000000..51054ee2cae
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/internals/AutoOffsetResetInternal.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.internals;
+
+import 
org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy.StrategyType;
+import org.apache.kafka.streams.AutoOffsetReset;
+
+import java.time.Duration;
+import java.util.Optional;
+
+public class AutoOffsetResetInternal extends AutoOffsetReset {
+
+    public AutoOffsetResetInternal(final AutoOffsetReset autoOffsetReset) {
+        super(autoOffsetReset);
+    }
+
+    public StrategyType offsetResetStrategy() {
+        return offsetResetStrategy;
+    }
+    public Optional<Duration> duration() {
+        return duration;
+    }
+}
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java
index d1713ab20a1..046ff336fcc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.AutoOffsetReset;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.processor.TimestampExtractor;
@@ -55,30 +56,52 @@ public class Consumed<K, V> implements 
NamedOperation<Consumed<K, V>> {
     protected Serde<K> keySerde;
     protected Serde<V> valueSerde;
     protected TimestampExtractor timestampExtractor;
-    protected Topology.AutoOffsetReset resetPolicy;
+    @Deprecated
+    protected Topology.AutoOffsetReset legacyResetPolicy;
+    protected AutoOffsetReset resetPolicy;
     protected String processorName;
 
+    @SuppressWarnings("deprecation")
     private Consumed(final Serde<K> keySerde,
                      final Serde<V> valueSerde,
                      final TimestampExtractor timestampExtractor,
-                     final Topology.AutoOffsetReset resetPolicy,
+                     final Topology.AutoOffsetReset legacyResetPolicy,
+                     final AutoOffsetReset resetPolicy,
                      final String processorName) {
         this.keySerde = keySerde;
         this.valueSerde = valueSerde;
         this.timestampExtractor = timestampExtractor;
+        this.legacyResetPolicy = legacyResetPolicy;
         this.resetPolicy = resetPolicy;
         this.processorName = processorName;
     }
 
+    /**
+     * Create an instance of {@link Consumed} from an existing instance.
+     * @param consumed  the instance of {@link Consumed} to copy
+     */
     protected Consumed(final Consumed<K, V> consumed) {
-        this(consumed.keySerde,
-             consumed.valueSerde,
-             consumed.timestampExtractor,
-             consumed.resetPolicy,
-             consumed.processorName
+        this(
+            consumed.keySerde,
+            consumed.valueSerde,
+            consumed.timestampExtractor,
+            consumed.legacyResetPolicy,
+            consumed.resetPolicy,
+            consumed.processorName
         );
     }
 
+    @Deprecated
+    private static AutoOffsetReset convertOldToNew(final 
Topology.AutoOffsetReset resetPolicy) {
+        if (resetPolicy == null) {
+            return null;
+        }
+
+        return resetPolicy == 
org.apache.kafka.streams.Topology.AutoOffsetReset.EARLIEST
+            ? AutoOffsetReset.earliest()
+            : AutoOffsetReset.latest();
+    }
+
     /**
      * Create an instance of {@link Consumed} with the supplied arguments. 
{@code null} values are acceptable.
      *
@@ -95,12 +118,39 @@ public class Consumed<K, V> implements 
NamedOperation<Consumed<K, V>> {
      * @param <V> value type
      *
      * @return a new instance of {@link Consumed}
+     *
+     * @deprecated Since 4.0. Use {@link #with(Serde, Serde, 
TimestampExtractor, AutoOffsetReset)} instead.
      */
+    @Deprecated
     public static <K, V> Consumed<K, V> with(final Serde<K> keySerde,
                                              final Serde<V> valueSerde,
                                              final TimestampExtractor 
timestampExtractor,
                                              final Topology.AutoOffsetReset 
resetPolicy) {
-        return new Consumed<>(keySerde, valueSerde, timestampExtractor, 
resetPolicy, null);
+        return new Consumed<>(keySerde, valueSerde, timestampExtractor, 
resetPolicy, convertOldToNew(resetPolicy), null);
+    }
+
+    /**
+     * Create an instance of {@link Consumed} with the supplied arguments. 
{@code null} values are acceptable.
+     *
+     * @param keySerde
+     *        the key serde. If {@code null} the default key serde from config 
will be used
+     * @param valueSerde
+     *        the value serde. If {@code null} the default value serde from 
config will be used
+     * @param timestampExtractor
+     *        the timestamp extractor to be used. If {@code null} the default 
timestamp extractor from config will be used
+     * @param resetPolicy
+     *        the offset reset policy to be used. If {@code null} the default 
reset policy from config will be used
+     *
+     * @param <K> key type
+     * @param <V> value type
+     *
+     * @return a new instance of {@link Consumed}
+     */
+    public static <K, V> Consumed<K, V> with(final Serde<K> keySerde,
+                                             final Serde<V> valueSerde,
+                                             final TimestampExtractor 
timestampExtractor,
+                                             final AutoOffsetReset 
resetPolicy) {
+        return new Consumed<>(keySerde, valueSerde, timestampExtractor, null, 
resetPolicy, null);
     }
 
     /**
@@ -118,7 +168,7 @@ public class Consumed<K, V> implements 
NamedOperation<Consumed<K, V>> {
      */
     public static <K, V> Consumed<K, V> with(final Serde<K> keySerde,
                                              final Serde<V> valueSerde) {
-        return new Consumed<>(keySerde, valueSerde, null, null, null);
+        return new Consumed<>(keySerde, valueSerde, null, null, null, null);
     }
 
     /**
@@ -133,7 +183,7 @@ public class Consumed<K, V> implements 
NamedOperation<Consumed<K, V>> {
      * @return a new instance of {@link Consumed}
      */
     public static <K, V> Consumed<K, V> with(final TimestampExtractor 
timestampExtractor) {
-        return new Consumed<>(null, null, timestampExtractor, null, null);
+        return new Consumed<>(null, null, timestampExtractor, null, null, 
null);
     }
 
     /**
@@ -146,9 +196,27 @@ public class Consumed<K, V> implements 
NamedOperation<Consumed<K, V>> {
      * @param <V> value type
      *
      * @return a new instance of {@link Consumed}
+     *
+     * @deprecated Since 4.0. Use {@link #with(AutoOffsetReset)} instead.
      */
+    @Deprecated
     public static <K, V> Consumed<K, V> with(final Topology.AutoOffsetReset 
resetPolicy) {
-        return new Consumed<>(null, null, null, resetPolicy, null);
+        return new Consumed<>(null, null, null, resetPolicy, 
convertOldToNew(resetPolicy), null);
+    }
+
+    /**
+     * Create an instance of {@link Consumed} with a {@link AutoOffsetReset}.
+     *
+     * @param resetPolicy
+     *        the offset reset policy to be used. If {@code null} the default 
reset policy from config will be used
+     *
+     * @param <K> key type
+     * @param <V> value type
+     *
+     * @return a new instance of {@link Consumed}
+     */
+    public static <K, V> Consumed<K, V> with(final AutoOffsetReset 
resetPolicy) {
+        return new Consumed<>(null, null, null, null, resetPolicy, null);
     }
 
     /**
@@ -163,7 +231,7 @@ public class Consumed<K, V> implements 
NamedOperation<Consumed<K, V>> {
      * @return a new instance of {@link Consumed}
      */
     public static <K, V> Consumed<K, V> as(final String processorName) {
-        return new Consumed<>(null, null, null, null, processorName);
+        return new Consumed<>(null, null, null, null, null, processorName);
     }
 
     /**
@@ -175,7 +243,7 @@ public class Consumed<K, V> implements 
NamedOperation<Consumed<K, V>> {
      * @return a new instance of {@link Consumed}
      */
     public Consumed<K, V> withKeySerde(final Serde<K> keySerde) {
-        return new Consumed<K, V>(keySerde, valueSerde, timestampExtractor, 
resetPolicy, processorName);
+        return new Consumed<>(keySerde, valueSerde, timestampExtractor, 
legacyResetPolicy, resetPolicy, processorName);
     }
 
     /**
@@ -187,7 +255,7 @@ public class Consumed<K, V> implements 
NamedOperation<Consumed<K, V>> {
      * @return a new instance of {@link Consumed}
      */
     public Consumed<K, V> withValueSerde(final Serde<V> valueSerde) {
-        return new Consumed<K, V>(keySerde, valueSerde, timestampExtractor, 
resetPolicy, processorName);
+        return new Consumed<>(keySerde, valueSerde, timestampExtractor, 
legacyResetPolicy, resetPolicy, processorName);
     }
 
     /**
@@ -199,7 +267,7 @@ public class Consumed<K, V> implements 
NamedOperation<Consumed<K, V>> {
      * @return a new instance of {@link Consumed}
      */
     public Consumed<K, V> withTimestampExtractor(final TimestampExtractor 
timestampExtractor) {
-        return new Consumed<K, V>(keySerde, valueSerde, timestampExtractor, 
resetPolicy, processorName);
+        return new Consumed<>(keySerde, valueSerde, timestampExtractor, 
legacyResetPolicy, resetPolicy, processorName);
     }
 
     /**
@@ -209,9 +277,31 @@ public class Consumed<K, V> implements 
NamedOperation<Consumed<K, V>> {
      *        the offset reset policy to be used. If {@code null} the default 
reset policy from config will be used
      *
      * @return a new instance of {@link Consumed}
+     *
+     * @deprecated Since 4.0. Use {@link 
#withOffsetResetPolicy(AutoOffsetReset)} instead.
      */
+    @Deprecated
     public Consumed<K, V> withOffsetResetPolicy(final Topology.AutoOffsetReset 
resetPolicy) {
-        return new Consumed<K, V>(keySerde, valueSerde, timestampExtractor, 
resetPolicy, processorName);
+        return new Consumed<>(
+            keySerde,
+            valueSerde,
+            timestampExtractor,
+            resetPolicy,
+            convertOldToNew(resetPolicy),
+            processorName
+        );
+    }
+
+    /**
+     * Configure the instance of {@link Consumed} with a {@link AutoOffsetReset}.
+     *
+     * @param resetPolicy
+     *        the offset reset policy to be used. If {@code null} the default 
reset policy from config will be used
+     *
+     * @return a new instance of {@link Consumed}
+     */
+    public Consumed<K, V> withOffsetResetPolicy(final AutoOffsetReset 
resetPolicy) {
+        return new Consumed<>(keySerde, valueSerde, timestampExtractor, null, 
resetPolicy, processorName);
     }
 
     /**
@@ -224,7 +314,7 @@ public class Consumed<K, V> implements 
NamedOperation<Consumed<K, V>> {
      */
     @Override
     public Consumed<K, V> withName(final String processorName) {
-        return new Consumed<K, V>(keySerde, valueSerde, timestampExtractor, 
resetPolicy, processorName);
+        return new Consumed<>(keySerde, valueSerde, timestampExtractor, 
legacyResetPolicy, resetPolicy, processorName);
     }
 
     @Override
@@ -239,11 +329,13 @@ public class Consumed<K, V> implements 
NamedOperation<Consumed<K, V>> {
         return Objects.equals(keySerde, consumed.keySerde) &&
                Objects.equals(valueSerde, consumed.valueSerde) &&
                Objects.equals(timestampExtractor, consumed.timestampExtractor) 
&&
-               resetPolicy == consumed.resetPolicy;
+               legacyResetPolicy == consumed.legacyResetPolicy &&
+               // AutoOffsetReset is a class, not an enum: compare by value, not identity
+               Objects.equals(resetPolicy, consumed.resetPolicy);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(keySerde, valueSerde, timestampExtractor, 
resetPolicy);
+        return Objects.hash(keySerde, valueSerde, timestampExtractor, 
legacyResetPolicy, resetPolicy);
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java
index 40bd53a0b8c..3f5f63c7b77 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.internals.AutoOffsetResetInternal;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 
@@ -28,11 +29,10 @@ public class ConsumedInternal<K, V> extends Consumed<K, V> {
         super(consumed);
     }
 
-
     public ConsumedInternal(final Serde<K> keySerde,
                             final Serde<V> valueSerde,
                             final TimestampExtractor timestampExtractor,
-                            final Topology.AutoOffsetReset offsetReset) {
+                            final AutoOffsetResetInternal offsetReset) {
         this(Consumed.with(keySerde, valueSerde, timestampExtractor, 
offsetReset));
     }
 
@@ -60,8 +60,14 @@ public class ConsumedInternal<K, V> extends Consumed<K, V> {
         return timestampExtractor;
     }
 
-    public Topology.AutoOffsetReset offsetResetPolicy() {
-        return resetPolicy;
+    public AutoOffsetResetInternal offsetResetPolicy() {
+        return resetPolicy == null ? null : new 
AutoOffsetResetInternal(resetPolicy);
+    }
+
+    @SuppressWarnings("deprecation")
+    // TODO mjsax remove
+    public Topology.AutoOffsetReset legacyOffsetResetPolicy() {
+        return legacyResetPolicy;
     }
 
     public String name() {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
index 97b686eaff6..b13477c546e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
@@ -17,7 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals.graph;
 
-import org.apache.kafka.streams.Topology.AutoOffsetReset;
+import org.apache.kafka.streams.AutoOffsetReset;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
@@ -73,14 +73,16 @@ public class StreamSourceNode<K, V> extends 
SourceGraphNode<K, V> {
     public void writeToTopology(final InternalTopologyBuilder topologyBuilder) 
{
 
         if (topicPattern().isPresent()) {
-            topologyBuilder.addSource(consumedInternal().offsetResetPolicy(),
+            // TODO mjsax
+            
topologyBuilder.addSource(consumedInternal().legacyOffsetResetPolicy(),
                                       nodeName(),
                                       consumedInternal().timestampExtractor(),
                                       consumedInternal().keyDeserializer(),
                                       consumedInternal().valueDeserializer(),
                                       topicPattern().get());
         } else {
-            topologyBuilder.addSource(consumedInternal().offsetResetPolicy(),
+            // TODO mjsax
+            
topologyBuilder.addSource(consumedInternal().legacyOffsetResetPolicy(),
                                       nodeName(),
                                       consumedInternal().timestampExtractor(),
                                       consumedInternal().keyDeserializer(),
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
index 5e776a5c733..81b44703a2d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
@@ -96,7 +96,8 @@ public class TableSourceNode<K, V> extends SourceGraphNode<K, 
V> {
                 false
             );
         } else {
-            topologyBuilder.addSource(consumedInternal().offsetResetPolicy(),
+            // TODO mjsax
+            
topologyBuilder.addSource(consumedInternal().legacyOffsetResetPolicy(),
                                       sourceName,
                                       consumedInternal().timestampExtractor(),
                                       consumedInternal().keyDeserializer(),
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index eeb076fc0cf..23bdcfde6f0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -438,6 +438,7 @@ public class InternalTopologyBuilder {
         return this;
     }
 
+    @SuppressWarnings("deprecation")
     public final void addSource(final Topology.AutoOffsetReset offsetReset,
                                 final String name,
                                 final TimestampExtractor timestampExtractor,
@@ -465,6 +466,7 @@ public class InternalTopologyBuilder {
         nodeGroups = null;
     }
 
+    @SuppressWarnings("deprecation")
     public final void addSource(final Topology.AutoOffsetReset offsetReset,
                                 final String name,
                                 final TimestampExtractor timestampExtractor,
@@ -915,6 +917,7 @@ public class InternalTopologyBuilder {
 
     }
 
+    @SuppressWarnings("deprecation")
     private <T> void maybeAddToResetList(final Collection<T> earliestResets,
                                          final Collection<T> latestResets,
                                          final Topology.AutoOffsetReset 
offsetReset,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/AutoOffsetResetTest.java 
b/streams/src/test/java/org/apache/kafka/streams/AutoOffsetResetTest.java
new file mode 100644
index 00000000000..2dad17cd81f
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/AutoOffsetResetTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.streams.internals.AutoOffsetResetInternal;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class AutoOffsetResetTest {
+
+    @Test
+    void latestShouldReturnAnEmptyDuration() {
+        final AutoOffsetResetInternal latest = new 
AutoOffsetResetInternal(AutoOffsetReset.latest());
+        assertTrue(latest.duration().isEmpty(), "Latest should have an empty 
duration.");
+    }
+
+    @Test
+    void earliestShouldReturnAnEmptyDuration() {
+        final AutoOffsetResetInternal earliest = new 
AutoOffsetResetInternal(AutoOffsetReset.earliest());
+        assertTrue(earliest.duration().isEmpty(), "Earliest should have an 
empty duration.");
+    }
+
+    @Test
+    void customDurationShouldMatchExpectedValue() {
+        final Duration duration = Duration.ofSeconds(10L);
+        final AutoOffsetResetInternal custom = new 
AutoOffsetResetInternal(AutoOffsetReset.byDuration(duration));
+        assertEquals(10L, custom.duration().get().toSeconds(), "Duration 
should match the specified value in seconds.");
+    }
+
+    @Test
+    void shouldThrowExceptionIfDurationIsNegative() {
+        final IllegalArgumentException exception = assertThrows(
+            IllegalArgumentException.class,
+            () -> AutoOffsetReset.byDuration(Duration.ofSeconds(-1)),
+            "Creating an AutoOffsetReset with a negative duration should throw 
an IllegalArgumentException."
+        );
+        assertEquals("Duration cannot be negative", exception.getMessage(), 
"Exception message should indicate the duration cannot be negative.");
+    }
+
+    @Test
+    void twoInstancesCreatedAtTheSameTimeWithSameOptionsShouldBeEqual() {
+        final AutoOffsetReset latest1 = AutoOffsetReset.latest();
+        final AutoOffsetReset latest2 = AutoOffsetReset.latest();
+        final AutoOffsetReset earliest1 = AutoOffsetReset.earliest();
+        final  AutoOffsetReset earliest2 = AutoOffsetReset.earliest();
+        final AutoOffsetReset custom1 = 
AutoOffsetReset.byDuration(Duration.ofSeconds(5));
+        final AutoOffsetReset custom2 = 
AutoOffsetReset.byDuration(Duration.ofSeconds(5));
+        final AutoOffsetReset customDifferent = 
AutoOffsetReset.byDuration(Duration.ofSeconds(10));
+
+        // Equals
+        assertEquals(latest1, latest2, "Two latest instances should be 
equal.");
+        assertEquals(earliest1, earliest2, "Two earliest instances should be 
equal.");
+        assertEquals(custom1, custom2, "Two custom instances with the same 
duration should be equal.");
+        assertNotEquals(latest1, earliest1, "Latest and earliest should not be 
equal.");
+        assertNotEquals(custom1, customDifferent, "Custom instances with 
different durations should not be equal.");
+
+        // HashCode
+        assertEquals(latest1.hashCode(), latest2.hashCode(), "HashCode for 
equal instances should be the same.");
+        assertEquals(custom1.hashCode(), custom2.hashCode(), "HashCode for 
equal custom instances should be the same.");
+        assertNotEquals(custom1.hashCode(), customDifferent.hashCode(), 
"HashCode for different custom instances should not match.");
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java 
b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index f43747d3cf2..0dc0179c6e5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -2257,7 +2257,7 @@ public class TopologyTest {
 
     private TopologyDescription.Source addSource(final String sourceName,
                                                  final String... sourceTopic) {
-        topology.addSource(null, sourceName, null, null, null, sourceTopic);
+        topology.addSource((Topology.AutoOffsetReset) null, sourceName, null, 
null, null, sourceTopic);
         final StringBuilder allSourceTopics = new 
StringBuilder(sourceTopic[0]);
         for (int i = 1; i < sourceTopic.length; ++i) {
             allSourceTopics.append(", ").append(sourceTopic[i]);
@@ -2267,7 +2267,7 @@ public class TopologyTest {
 
     private TopologyDescription.Source addSource(final String sourceName,
                                                  final Pattern sourcePattern) {
-        topology.addSource(null, sourceName, null, null, null, sourcePattern);
+        topology.addSource((Topology.AutoOffsetReset) null, sourceName, null, 
null, null, sourcePattern);
         return new InternalTopologyBuilder.Source(sourceName, null, 
sourcePattern);
     }
 
diff --git 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala
 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala
index 9a8034bac5a..89f461a8fea 100644
--- 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala
+++ 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala
@@ -18,7 +18,7 @@ package org.apache.kafka.streams.scala.kstream
 
 import org.apache.kafka.common.serialization.Serde
 import org.apache.kafka.streams.kstream.{Consumed => ConsumedJ}
-import org.apache.kafka.streams.Topology
+import org.apache.kafka.streams.{AutoOffsetReset, Topology}
 import org.apache.kafka.streams.processor.TimestampExtractor
 
 object Consumed {
@@ -36,12 +36,32 @@ object Consumed {
    * @param valueSerde         the value serde to use.
    * @return a new instance of [[Consumed]]
    */
+  @deprecated("Use `with` method that accepts `AutoOffsetReset` instead", 
"4.0.0")
   def `with`[K, V](
     timestampExtractor: TimestampExtractor,
     resetPolicy: Topology.AutoOffsetReset
   )(implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] =
     ConsumedJ.`with`(keySerde, valueSerde, timestampExtractor, resetPolicy)
 
+  /**
+   * Create an instance of [[Consumed]] with the supplied arguments. `null` 
values are acceptable.
+   *
+   * @tparam K                 key type
+   * @tparam V                 value type
+   * @param timestampExtractor the timestamp extractor to be used. If `null` the 
default timestamp extractor from
+   *                           config will be used
+   * @param resetPolicy        the offset reset policy to be used. If `null` 
the default reset policy from config
+   *                           will be used
+   * @param keySerde           the key serde to use.
+   * @param valueSerde         the value serde to use.
+   * @return a new instance of [[Consumed]]
+   */
+  def `with`[K, V](
+    timestampExtractor: TimestampExtractor,
+    resetPolicy: AutoOffsetReset
+  )(implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] =
+    ConsumedJ.`with`(keySerde, valueSerde, timestampExtractor, resetPolicy)
+
   /**
    * Create an instance of [[Consumed]] with key and value Serdes.
    *
@@ -74,8 +94,22 @@ object Consumed {
    * @param resetPolicy the offset reset policy to be used. If `null` the 
default reset policy from config will be used
    * @return a new instance of [[Consumed]]
    */
+  @deprecated("Use `with` method that accepts `AutoOffsetReset` instead", 
"4.0.0")
   def `with`[K, V](
     resetPolicy: Topology.AutoOffsetReset
   )(implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] =
     
ConsumedJ.`with`(resetPolicy).withKeySerde(keySerde).withValueSerde(valueSerde)
+
+  /**
+   * Create an instance of [[Consumed]] with a 
`org.apache.kafka.streams.AutoOffsetReset`.
+   *
+   * @tparam K          key type
+   * @tparam V          value type
+   * @param resetPolicy the offset reset policy to be used. If `null` the 
default reset policy from config will be used
+   * @return a new instance of [[Consumed]]
+   */
+  def `with`[K, V](
+    resetPolicy: AutoOffsetReset
+  )(implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] =
+    
ConsumedJ.`with`(resetPolicy).withKeySerde(keySerde).withValueSerde(valueSerde)
 }
diff --git 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/ConsumedTest.scala
 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/ConsumedTest.scala
index 0b44165164b..4656a4d12fc 100644
--- 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/ConsumedTest.scala
+++ 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/ConsumedTest.scala
@@ -16,7 +16,8 @@
  */
 package org.apache.kafka.streams.scala.kstream
 
-import org.apache.kafka.streams.Topology
+import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy
+import org.apache.kafka.streams.AutoOffsetReset
 import org.apache.kafka.streams.kstream.internals.ConsumedInternal
 import org.apache.kafka.streams.processor.FailOnInvalidTimestamp
 import org.apache.kafka.streams.scala.serialization.Serdes
@@ -38,15 +39,15 @@ class ConsumedTest {
   @Test
   def testCreateConsumedWithTimestampExtractorAndResetPolicy(): Unit = {
     val timestampExtractor = new FailOnInvalidTimestamp()
-    val resetPolicy = Topology.AutoOffsetReset.LATEST
+    val resetPolicy = AutoOffsetReset.latest()
     val consumed: Consumed[String, Long] =
-      Consumed.`with`[String, Long](timestampExtractor, resetPolicy)
+      Consumed.`with`(timestampExtractor, resetPolicy)
 
     val internalConsumed = new ConsumedInternal(consumed)
     assertEquals(Serdes.stringSerde.getClass, 
internalConsumed.keySerde.getClass)
     assertEquals(Serdes.longSerde.getClass, 
internalConsumed.valueSerde.getClass)
     assertEquals(timestampExtractor, internalConsumed.timestampExtractor)
-    assertEquals(resetPolicy, internalConsumed.offsetResetPolicy)
+    assertEquals(AutoOffsetResetStrategy.StrategyType.LATEST, 
internalConsumed.offsetResetPolicy.offsetResetStrategy())
   }
 
   @Test
@@ -59,14 +60,15 @@ class ConsumedTest {
     assertEquals(Serdes.longSerde.getClass, 
internalConsumed.valueSerde.getClass)
     assertEquals(timestampExtractor, internalConsumed.timestampExtractor)
   }
+
   @Test
   def testCreateConsumedWithResetPolicy(): Unit = {
-    val resetPolicy = Topology.AutoOffsetReset.LATEST
+    val resetPolicy = AutoOffsetReset.latest()
     val consumed: Consumed[String, Long] = Consumed.`with`[String, 
Long](resetPolicy)
 
     val internalConsumed = new ConsumedInternal(consumed)
     assertEquals(Serdes.stringSerde.getClass, 
internalConsumed.keySerde.getClass)
     assertEquals(Serdes.longSerde.getClass, 
internalConsumed.valueSerde.getClass)
-    assertEquals(resetPolicy, internalConsumed.offsetResetPolicy)
+    assertEquals(AutoOffsetResetStrategy.StrategyType.LATEST, 
internalConsumed.offsetResetPolicy.offsetResetStrategy())
   }
 }

Reply via email to