This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d83f09d014f KAFKA-18015: Add support for duration based offset reset
strategy to Kafka Streams (#17973)
d83f09d014f is described below
commit d83f09d014ff574784212cfdc317b5004c90c687
Author: KApolinario1120 <[email protected]>
AuthorDate: Wed Dec 11 12:47:25 2024 -0600
KAFKA-18015: Add support for duration based offset reset strategy to Kafka
Streams (#17973)
Part of KIP-1106.
Adds the public APIs to Kafka Streams to support the newly added
"by_duration" reset policy,
plus adds the missing "none" reset policy. Deprecates the enum
`Topology.AutoOffsetReset` and
all related methods, and replaces them with new overloads using the new
`AutoOffsetReset` class.
Co-authored-by: Matthias J. Sax <[email protected]>
Reviewers: Matthias J. Sax <[email protected]>, Bill Bejeck
<[email protected]>, Manikumar Reddy <[email protected]>
---
.../org/apache/kafka/streams/AutoOffsetReset.java | 103 ++++++++++
.../org/apache/kafka/streams/StreamsBuilder.java | 2 +-
.../java/org/apache/kafka/streams/Topology.java | 220 ++++++++++++++++++++-
.../streams/internals/AutoOffsetResetInternal.java | 37 ++++
.../org/apache/kafka/streams/kstream/Consumed.java | 127 ++++++++++--
.../kstream/internals/ConsumedInternal.java | 14 +-
.../kstream/internals/graph/StreamSourceNode.java | 8 +-
.../kstream/internals/graph/TableSourceNode.java | 3 +-
.../internals/InternalTopologyBuilder.java | 3 +
.../apache/kafka/streams/AutoOffsetResetTest.java | 83 ++++++++
.../org/apache/kafka/streams/TopologyTest.java | 4 +-
.../kafka/streams/scala/kstream/Consumed.scala | 36 +++-
.../kafka/streams/scala/kstream/ConsumedTest.scala | 14 +-
13 files changed, 615 insertions(+), 39 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java
b/streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java
new file mode 100644
index 00000000000..f3f3a941d20
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import
org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy.StrategyType;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+
+import java.time.Duration;
+import java.util.Optional;
+
+/**
+ * Sets the {@code auto.offset.reset} configuration when
+ * {@link Topology#addSource(AutoOffsetReset, String, String...) adding a
source processor}
+ * or when creating {@link KStream} or {@link KTable} via {@link
StreamsBuilder}.
+ */
+public class AutoOffsetReset {
+ protected final StrategyType offsetResetStrategy;
+ protected final Optional<Duration> duration;
+
+ private AutoOffsetReset(final StrategyType offsetResetStrategy, final
Optional<Duration> duration) {
+ this.offsetResetStrategy = offsetResetStrategy;
+ this.duration = duration;
+ }
+
+ protected AutoOffsetReset(final AutoOffsetReset autoOffsetReset) {
+ this(autoOffsetReset.offsetResetStrategy, autoOffsetReset.duration);
+ }
+
+ /**
+ * Creates an {@code AutoOffsetReset} instance representing "none".
+ *
+ * @return An {@link AutoOffsetReset} instance for no reset.
+ */
+ public static AutoOffsetReset none() {
+ return new AutoOffsetReset(StrategyType.NONE, Optional.empty());
+ }
+
+ /**
+ * Creates an {@code AutoOffsetReset} instance representing "earliest".
+ *
+ * @return An {@link AutoOffsetReset} instance for the "earliest" offset.
+ */
+ public static AutoOffsetReset earliest() {
+ return new AutoOffsetReset(StrategyType.EARLIEST, Optional.empty());
+ }
+
+ /**
+ * Creates an {@code AutoOffsetReset} instance representing "latest".
+ *
+ * @return An {@code AutoOffsetReset} instance for the "latest" offset.
+ */
+ public static AutoOffsetReset latest() {
+ return new AutoOffsetReset(StrategyType.LATEST, Optional.empty());
+ }
+
+ /**
+ * Creates an {@code AutoOffsetReset} instance for the specified reset
duration.
+ *
+ * @param duration The duration to use for the offset reset; must be
non-negative.
+ * @return An {@code AutoOffsetReset} instance with the specified duration.
+ * @throws IllegalArgumentException If the duration is negative.
+ */
+ public static AutoOffsetReset byDuration(final Duration duration) {
+ if (duration.isNegative()) {
+ throw new IllegalArgumentException("Duration cannot be negative");
+ }
+ return new AutoOffsetReset(StrategyType.BY_DURATION,
Optional.of(duration));
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final AutoOffsetReset that = (AutoOffsetReset) o;
+ return offsetResetStrategy == that.offsetResetStrategy &&
duration.equals(that.duration);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = offsetResetStrategy.hashCode();
+ result = 31 * result + duration.hashCode();
+ return result;
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index b9cc75b9fde..7badb3016f8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -144,7 +144,7 @@ public class StreamsBuilder {
* @return a {@link KStream} for the specified topics
*/
public synchronized <K, V> KStream<K, V> stream(final Collection<String>
topics) {
- return stream(topics, Consumed.with(null, null, null, null));
+ return stream(topics, Consumed.with(null, null));
}
/**
diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java
b/streams/src/main/java/org/apache/kafka/streams/Topology.java
index 35fe13faa38..320e0babf77 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Topology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -73,7 +73,10 @@ public class Topology {
* Sets the {@code auto.offset.reset} configuration when
* {@link #addSource(AutoOffsetReset, String, String...) adding a source
processor} or when creating {@link KStream}
* or {@link KTable} via {@link StreamsBuilder}.
+ *
+ * @deprecated Since 4.0. Use {@link
org.apache.kafka.streams.AutoOffsetReset} instead.
*/
+ @Deprecated
public enum AutoOffsetReset {
EARLIEST, LATEST
}
@@ -130,7 +133,9 @@ public class Topology {
* @param topics the name of one or more Kafka topics that this source is
to consume
* @return itself
* @throws TopologyException if processor is already added or if topics
have already been registered by another source
+ * @deprecated Since 4.0. Use {@link
#addSource(org.apache.kafka.streams.AutoOffsetReset, String, String...)}
instead.
*/
+ @Deprecated
public synchronized Topology addSource(final AutoOffsetReset offsetReset,
final String name,
final String... topics) {
@@ -138,6 +143,24 @@ public class Topology {
return this;
}
+ /**
+ * Adds a new source that consumes the specified topics and forwards the
records to child processor and/or sink nodes.
+ * The source will use the specified {@link
org.apache.kafka.streams.AutoOffsetReset offset reset policy} if no committed
offsets are found.
+ *
+ * @param offsetReset the auto offset reset policy to use for this source
if no committed offsets are found
+ * @param name the unique name of the source used to reference this node
when {@link #addProcessor(String, ProcessorSupplier, String...) adding
processor children}
+ * @param topics the name of one or more Kafka topics that this source is
to consume
+ * @return itself
+ * @throws TopologyException if a processor is already added or if topics
have already been registered by another source
+ */
+ public synchronized Topology addSource(final
org.apache.kafka.streams.AutoOffsetReset offsetReset,
+ final String name,
+ final String... topics) {
+ // TODO mjsax
+ //internalTopologyBuilder.addSource(offsetReset, name, null, null,
null, topics);
+ return this;
+ }
+
/**
* Add a new source that consumes from topics matching the given pattern
* and forward the records to child processor and/or sink nodes.
@@ -152,7 +175,9 @@ public class Topology {
* @param topicPattern regular expression pattern to match Kafka topics
that this source is to consume
* @return itself
* @throws TopologyException if processor is already added or if topics
have already been registered by another source
+ * @deprecated Since 4.0. Use {@link
#addSource(org.apache.kafka.streams.AutoOffsetReset, String, Pattern)} instead.
*/
+ @Deprecated
public synchronized Topology addSource(final AutoOffsetReset offsetReset,
final String name,
final Pattern topicPattern) {
@@ -160,6 +185,29 @@ public class Topology {
return this;
}
+ /**
+ * Add a new source that consumes from topics matching the given pattern
+ * and forward the records to child processor and/or sink nodes.
+ * The source will use the {@link
StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
+ * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value
deserializer} specified in the
+ * {@link StreamsConfig stream configuration}.
+ * The default {@link TimestampExtractor} as specified in the {@link
StreamsConfig config} is used.
+ *
+ * @param offsetReset the auto offset reset policy value for this source
if no committed offsets found
+ * @param name the unique name of the source used to reference this node
when
+ * {@link #addProcessor(String, ProcessorSupplier, String...) adding
processor children}.
+ * @param topicPattern regular expression pattern to match Kafka topics
that this source is to consume
+ * @return itself
+ * @throws TopologyException if processor is already added or if topics
have already been registered by another source
+ */
+ public synchronized Topology addSource(final
org.apache.kafka.streams.AutoOffsetReset offsetReset,
+ final String name,
+ final Pattern topicPattern) {
+ // TODO: mjsax
+ //internalTopologyBuilder.addSource(offsetReset, name, null, null,
null, topicPattern);
+ return this;
+ }
+
/**
* Add a new source that consumes the named topics and forward the records
to child processor and/or sink nodes.
* The source will use the {@link
StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
@@ -218,7 +266,9 @@ public class Topology {
* @param topics the name of one or more Kafka topics that
this source is to consume
* @return itself
* @throws TopologyException if processor is already added or if topics
have already been registered by another source
+ * @deprecated Since 4.0. Use {@link
#addSource(org.apache.kafka.streams.AutoOffsetReset, TimestampExtractor,
String, String...)} instead.
*/
+ @Deprecated
public synchronized Topology addSource(final AutoOffsetReset offsetReset,
final TimestampExtractor
timestampExtractor,
final String name,
@@ -227,6 +277,27 @@ public class Topology {
return this;
}
+ /**
+ * Adds a new source that consumes the specified topics with a specified
{@link TimestampExtractor}
+ * and forwards the records to child processor and/or sink nodes.
+ * The source will use the provided timestamp extractor to determine the
timestamp of each record.
+ *
+ * @param offsetReset the auto offset reset policy to use if no committed
offsets are found
+ * @param timestampExtractor the timestamp extractor to use for this source
+ * @param name the unique name of the source used to reference this node
when {@link #addProcessor(String, ProcessorSupplier, String...) adding
processor children}
+ * @param topics the name of one or more Kafka topics that this source is
to consume
+ * @return itself
+ * @throws TopologyException if a processor is already added or if topics
have already been registered by another source
+ */
+ public synchronized Topology addSource(final
org.apache.kafka.streams.AutoOffsetReset offsetReset,
+ final TimestampExtractor
timestampExtractor,
+ final String name,
+ final String... topics) {
+ // TODO mjsax
+ //internalTopologyBuilder.addSource(offsetReset, name,
timestampExtractor, null, null, topics);
+ return this;
+ }
+
/**
* Add a new source that consumes from topics matching the given pattern
and forward the records to child processor
* and/or sink nodes.
@@ -243,7 +314,9 @@ public class Topology {
* @param topicPattern regular expression pattern to match Kafka
topics that this source is to consume
* @return itself
* @throws TopologyException if processor is already added or if topics
have already been registered by another source
+ * @deprecated Since 4.0. Use {@link
#addSource(org.apache.kafka.streams.AutoOffsetReset, TimestampExtractor,
String, Pattern)} instead.
*/
+ @Deprecated
public synchronized Topology addSource(final AutoOffsetReset offsetReset,
final TimestampExtractor
timestampExtractor,
final String name,
@@ -252,6 +325,27 @@ public class Topology {
return this;
}
+ /**
+ * Adds a new source that consumes from topics matching the given pattern
with a specified {@link TimestampExtractor}
+ * and forwards the records to child processor and/or sink nodes.
+ * The source will use the provided timestamp extractor to determine the
timestamp of each record.
+ *
+ * @param offsetReset the auto offset reset policy to use if no committed
offsets are found
+ * @param timestampExtractor the timestamp extractor to use for this source
+ * @param name the unique name of the source used to reference this node
when {@link #addProcessor(String, ProcessorSupplier, String...) adding
processor children}
+ * @param topicPattern the regular expression pattern to match Kafka
topics that this source is to consume
+ * @return itself
+ * @throws TopologyException if a processor is already added or if topics
have already been registered by another source
+ */
+ public synchronized Topology addSource(final
org.apache.kafka.streams.AutoOffsetReset offsetReset,
+ final TimestampExtractor
timestampExtractor,
+ final String name,
+ final Pattern topicPattern) {
+ // TODO
+ //internalTopologyBuilder.addSource(offsetReset, name,
timestampExtractor, null, null, topicPattern);
+ return this;
+ }
+
/**
* Add a new source that consumes the named topics and forwards the
records to child processor and/or sink nodes.
* The source will use the specified key and value deserializers.
@@ -319,8 +413,9 @@ public class Topology {
* @param topics the name of one or more Kafka topics that
this source is to consume
* @return itself
* @throws TopologyException if processor is already added or if topics
have already been registered by name
+ * @deprecated Since 4.0. Use {@link
#addSource(org.apache.kafka.streams.AutoOffsetReset, String, Deserializer,
Deserializer, String...)} instead.
*/
- @SuppressWarnings("overloads")
+ @Deprecated
public synchronized Topology addSource(final AutoOffsetReset offsetReset,
final String name,
final Deserializer<?>
keyDeserializer,
@@ -330,6 +425,34 @@ public class Topology {
return this;
}
+ /**
+ * Add a new source that consumes the named topics
and forwards the records to child processor
+ * and/or sink nodes.
+ * The source will use the specified key and value deserializers.
+ * The provided de-/serializers will be used for all the specified topics,
so care should be taken when specifying
+ * topics that share the same key-value data format.
+ *
+ * @param offsetReset the auto offset reset policy to use for this
stream if no committed offsets found
+ * @param name the unique name of the source used to
reference this node when
+ * {@link #addProcessor(String,
ProcessorSupplier, String...) adding processor children}
+ * @param keyDeserializer key deserializer used to read this source, if
not specified the default
+ * key deserializer defined in the configs will
be used
+ * @param valueDeserializer value deserializer used to read this source,
+ * if not specified the default value
deserializer defined in the configs will be used
+ * @param topics the name of one or more Kafka topics that
this source is to consume
+ * @return itself
+ * @throws TopologyException if processor is already added or if topics
have already been registered by name
+ */
+ public synchronized Topology addSource(final
org.apache.kafka.streams.AutoOffsetReset offsetReset,
+ final String name,
+ final Deserializer<?>
keyDeserializer,
+ final Deserializer<?>
valueDeserializer,
+ final String... topics) {
+ // TODO mjsax
+ //internalTopologyBuilder.addSource(offsetReset, name, null,
keyDeserializer, valueDeserializer, topics);
+ return this;
+ }
+
/**
* Add a new source that consumes from topics matching the given pattern
and forwards the records to child processor
* and/or sink nodes.
@@ -348,7 +471,9 @@ public class Topology {
* @param topicPattern regular expression pattern to match Kafka
topics that this source is to consume
* @return itself
* @throws TopologyException if processor is already added or if topics
have already been registered by name
+ * @deprecated Since 4.0. Use {@link
#addSource(org.apache.kafka.streams.AutoOffsetReset, String, Deserializer,
Deserializer, Pattern)} instead.
*/
+ @Deprecated
public synchronized Topology addSource(final AutoOffsetReset offsetReset,
final String name,
final Deserializer<?>
keyDeserializer,
@@ -358,6 +483,34 @@ public class Topology {
return this;
}
+ /**
+ * Add a new source that consumes from topics matching the given pattern
and forwards the records to child processor
+ * and/or sink nodes.
+ * The source will use the specified key and value deserializers.
+ * The provided de-/serializers will be used for all matched topics, so
care should be taken to specify patterns for
+ * topics that share the same key-value data format.
+ *
+ * @param offsetReset the auto offset reset policy to use for this
stream if no committed offsets found
+ * @param name the unique name of the source used to
reference this node when
+ * {@link #addProcessor(String,
ProcessorSupplier, String...) adding processor children}
+ * @param keyDeserializer key deserializer used to read this source, if
not specified the default
+ * key deserializer defined in the configs will
be used
+ * @param valueDeserializer value deserializer used to read this source,
+ * if not specified the default value
deserializer defined in the configs will be used
+ * @param topicPattern regular expression pattern to match Kafka
topics that this source is to consume
+ * @return itself
+ * @throws TopologyException if processor is already added or if topics
have already been registered by name
+ */
+ public synchronized Topology addSource(final
org.apache.kafka.streams.AutoOffsetReset offsetReset,
+ final String name,
+ final Deserializer<?>
keyDeserializer,
+ final Deserializer<?>
valueDeserializer,
+ final Pattern topicPattern) {
+ // TODO mjsax
+ //internalTopologyBuilder.addSource(offsetReset, name, null,
keyDeserializer, valueDeserializer, topicPattern);
+ return this;
+ }
+
/**
* Add a new source that consumes the named topics and forwards the
records to child processor and/or sink nodes.
* The source will use the specified key and value deserializers.
@@ -375,8 +528,9 @@ public class Topology {
* @param topics the name of one or more Kafka topics that
this source is to consume
* @return itself
* @throws TopologyException if processor is already added or if topics
have already been registered by another source
+ * @deprecated Since 4.0. Use {@link
#addSource(org.apache.kafka.streams.AutoOffsetReset, String,
TimestampExtractor, Deserializer, Deserializer, String...)} instead.
*/
- @SuppressWarnings("overloads")
+ @Deprecated
public synchronized Topology addSource(final AutoOffsetReset offsetReset,
final String name,
final TimestampExtractor
timestampExtractor,
@@ -387,6 +541,34 @@ public class Topology {
return this;
}
+ /**
+ * Add a new source that consumes the named topics and forwards the
records to child processor and/or sink nodes.
+ * The source will use the specified key and value deserializers.
+ *
+ * @param offsetReset the auto offset reset policy to use for this
stream if no committed offsets found
+ * @param name the unique name of the source used to
reference this node when
+ * {@link #addProcessor(String,
ProcessorSupplier, String...) adding processor children}.
+ * @param timestampExtractor the stateless timestamp extractor used for
this source,
+ * if not specified the default extractor
defined in the configs will be used
+ * @param keyDeserializer key deserializer used to read this source, if
not specified the default
+ * key deserializer defined in the configs will
be used
+ * @param valueDeserializer value deserializer used to read this source,
+ * if not specified the default value
deserializer defined in the configs will be used
+ * @param topics the name of one or more Kafka topics that
this source is to consume
+ * @return itself
+ * @throws TopologyException if processor is already added or if topics
have already been registered by another source
+ */
+ public synchronized Topology addSource(final
org.apache.kafka.streams.AutoOffsetReset offsetReset,
+ final String name,
+ final TimestampExtractor
timestampExtractor,
+ final Deserializer<?>
keyDeserializer,
+ final Deserializer<?>
valueDeserializer,
+ final String... topics) {
+ // TODO mjsax
+ //internalTopologyBuilder.addSource(offsetReset, name,
timestampExtractor, keyDeserializer, valueDeserializer, topics);
+ return this;
+ }
+
/**
* Add a new source that consumes from topics matching the given pattern
and forwards the records to child processor
* and/or sink nodes.
@@ -407,8 +589,9 @@ public class Topology {
* @param topicPattern regular expression pattern to match Kafka
topics that this source is to consume
* @return itself
* @throws TopologyException if processor is already added or if topics
have already been registered by name
+ * @deprecated Since 4.0. Use {@link
#addSource(org.apache.kafka.streams.AutoOffsetReset, String,
TimestampExtractor, Deserializer, Deserializer, Pattern)} instead.
*/
- @SuppressWarnings("overloads")
+ @Deprecated
public synchronized Topology addSource(final AutoOffsetReset offsetReset,
final String name,
final TimestampExtractor
timestampExtractor,
@@ -419,6 +602,37 @@ public class Topology {
return this;
}
+ /**
+ * Add a new source that consumes from topics matching the given pattern
and forwards the records to child processor
+ * and/or sink nodes.
+ * The source will use the specified key and value deserializers.
+ * The provided de-/serializers will be used for all matched topics, so
care should be taken to specify patterns for
+ * topics that share the same key-value data format.
+ *
+ * @param offsetReset the auto offset reset policy to use for this
stream if no committed offsets found
+ * @param name the unique name of the source used to
reference this node when
+ * {@link #addProcessor(String,
ProcessorSupplier, String...) adding processor children}.
+ * @param timestampExtractor the stateless timestamp extractor used for
this source,
+ * if not specified the default extractor
defined in the configs will be used
+ * @param keyDeserializer key deserializer used to read this source, if
not specified the default
+ * key deserializer defined in the configs will
be used
+ * @param valueDeserializer value deserializer used to read this source,
+ * if not specified the default value
deserializer defined in the configs will be used
+ * @param topicPattern regular expression pattern to match Kafka
topics that this source is to consume
+ * @return itself
+ * @throws TopologyException if processor is already added or if topics
have already been registered by name
+ */
+ public synchronized Topology addSource(final
org.apache.kafka.streams.AutoOffsetReset offsetReset,
+ final String name,
+ final TimestampExtractor
timestampExtractor,
+ final Deserializer<?>
keyDeserializer,
+ final Deserializer<?>
valueDeserializer,
+ final Pattern topicPattern) {
+ // TODO mjsax
+ //internalTopologyBuilder.addSource(offsetReset, name,
timestampExtractor, keyDeserializer, valueDeserializer, topicPattern);
+ return this;
+ }
+
/**
* Add a new sink that forwards records from upstream parent processor
and/or source nodes to the named Kafka topic.
* The sink will use the {@link
StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and
diff --git
a/streams/src/main/java/org/apache/kafka/streams/internals/AutoOffsetResetInternal.java
b/streams/src/main/java/org/apache/kafka/streams/internals/AutoOffsetResetInternal.java
new file mode 100644
index 00000000000..51054ee2cae
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/internals/AutoOffsetResetInternal.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.internals;
+
+import
org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy.StrategyType;
+import org.apache.kafka.streams.AutoOffsetReset;
+
+import java.time.Duration;
+import java.util.Optional;
+
+public class AutoOffsetResetInternal extends AutoOffsetReset {
+
+ public AutoOffsetResetInternal(final AutoOffsetReset autoOffsetReset) {
+ super(autoOffsetReset);
+ }
+
+ public StrategyType offsetResetStrategy() {
+ return offsetResetStrategy;
+ }
+ public Optional<Duration> duration() {
+ return duration;
+ }
+}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java
index d1713ab20a1..046ff336fcc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.AutoOffsetReset;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.TimestampExtractor;
@@ -55,30 +56,52 @@ public class Consumed<K, V> implements
NamedOperation<Consumed<K, V>> {
protected Serde<K> keySerde;
protected Serde<V> valueSerde;
protected TimestampExtractor timestampExtractor;
- protected Topology.AutoOffsetReset resetPolicy;
+ @Deprecated
+ protected Topology.AutoOffsetReset legacyResetPolicy;
+ protected AutoOffsetReset resetPolicy;
protected String processorName;
+ @SuppressWarnings("deprecation")
private Consumed(final Serde<K> keySerde,
final Serde<V> valueSerde,
final TimestampExtractor timestampExtractor,
- final Topology.AutoOffsetReset resetPolicy,
+ final Topology.AutoOffsetReset legacyResetPolicy,
+ final AutoOffsetReset resetPolicy,
final String processorName) {
this.keySerde = keySerde;
this.valueSerde = valueSerde;
this.timestampExtractor = timestampExtractor;
+ this.legacyResetPolicy = legacyResetPolicy;
this.resetPolicy = resetPolicy;
this.processorName = processorName;
}
+ /**
+ * Create an instance of {@link Consumed} from an existing instance.
+ * @param consumed the instance of {@link Consumed} to copy
+ */
protected Consumed(final Consumed<K, V> consumed) {
- this(consumed.keySerde,
- consumed.valueSerde,
- consumed.timestampExtractor,
- consumed.resetPolicy,
- consumed.processorName
+ this(
+ consumed.keySerde,
+ consumed.valueSerde,
+ consumed.timestampExtractor,
+ consumed.legacyResetPolicy,
+ consumed.resetPolicy,
+ consumed.processorName
);
}
+ @Deprecated
+ private static AutoOffsetReset convertOldToNew(final
Topology.AutoOffsetReset resetPolicy) {
+ if (resetPolicy == null) {
+ return null;
+ }
+
+ return resetPolicy ==
org.apache.kafka.streams.Topology.AutoOffsetReset.EARLIEST
+ ? AutoOffsetReset.earliest()
+ : AutoOffsetReset.latest();
+ }
+
/**
* Create an instance of {@link Consumed} with the supplied arguments.
{@code null} values are acceptable.
*
@@ -95,12 +118,39 @@ public class Consumed<K, V> implements
NamedOperation<Consumed<K, V>> {
* @param <V> value type
*
* @return a new instance of {@link Consumed}
+ *
+ * @deprecated Since 4.0. Use {@link #with(Serde, Serde,
TimestampExtractor, AutoOffsetReset)} instead.
*/
+ @Deprecated
public static <K, V> Consumed<K, V> with(final Serde<K> keySerde,
final Serde<V> valueSerde,
final TimestampExtractor
timestampExtractor,
final Topology.AutoOffsetReset
resetPolicy) {
- return new Consumed<>(keySerde, valueSerde, timestampExtractor,
resetPolicy, null);
+ return new Consumed<>(keySerde, valueSerde, timestampExtractor,
resetPolicy, convertOldToNew(resetPolicy), null);
+ }
+
+ /**
+ * Create an instance of {@link Consumed} with the supplied arguments.
{@code null} values are acceptable.
+ *
+ * @param keySerde
+ * the key serde. If {@code null} the default key serde from config
will be used
+ * @param valueSerde
+ * the value serde. If {@code null} the default value serde from
config will be used
+ * @param timestampExtractor
+ * the timestamp extractor to be used. If {@code null} the default
timestamp extractor from config will be used
+ * @param resetPolicy
+ * the offset reset policy to be used. If {@code null} the default
reset policy from config will be used
+ *
+ * @param <K> key type
+ * @param <V> value type
+ *
+ * @return a new instance of {@link Consumed}
+ */
+ public static <K, V> Consumed<K, V> with(final Serde<K> keySerde,
+ final Serde<V> valueSerde,
+ final TimestampExtractor
timestampExtractor,
+ final AutoOffsetReset
resetPolicy) {
+ return new Consumed<>(keySerde, valueSerde, timestampExtractor, null,
resetPolicy, null);
}
/**
@@ -118,7 +168,7 @@ public class Consumed<K, V> implements
NamedOperation<Consumed<K, V>> {
*/
public static <K, V> Consumed<K, V> with(final Serde<K> keySerde,
final Serde<V> valueSerde) {
- return new Consumed<>(keySerde, valueSerde, null, null, null);
+ return new Consumed<>(keySerde, valueSerde, null, null, null, null);
}
/**
@@ -133,7 +183,7 @@ public class Consumed<K, V> implements
NamedOperation<Consumed<K, V>> {
* @return a new instance of {@link Consumed}
*/
public static <K, V> Consumed<K, V> with(final TimestampExtractor
timestampExtractor) {
- return new Consumed<>(null, null, timestampExtractor, null, null);
+ return new Consumed<>(null, null, timestampExtractor, null, null,
null);
}
/**
@@ -146,9 +196,27 @@ public class Consumed<K, V> implements
NamedOperation<Consumed<K, V>> {
* @param <V> value type
*
* @return a new instance of {@link Consumed}
+ *
+ * @deprecated Since 4.0. Use {@link #with(AutoOffsetReset)} instead.
*/
+ @Deprecated
public static <K, V> Consumed<K, V> with(final Topology.AutoOffsetReset
resetPolicy) {
- return new Consumed<>(null, null, null, resetPolicy, null);
+ return new Consumed<>(null, null, null, resetPolicy,
convertOldToNew(resetPolicy), null);
+ }
+
+ /**
+ * Create an instance of {@link Consumed} with a {@link
org.apache.kafka.streams.AutoOffsetReset AutoOffsetReset}.
+ *
+ * @param resetPolicy
+ * the offset reset policy to be used. If {@code null} the default
reset policy from config will be used
+ *
+ * @param <K> key type
+ * @param <V> value type
+ *
+ * @return a new instance of {@link Consumed}
+ */
+ public static <K, V> Consumed<K, V> with(final AutoOffsetReset
resetPolicy) {
+ return new Consumed<>(null, null, null, null, resetPolicy, null);
}
/**
@@ -163,7 +231,7 @@ public class Consumed<K, V> implements
NamedOperation<Consumed<K, V>> {
* @return a new instance of {@link Consumed}
*/
public static <K, V> Consumed<K, V> as(final String processorName) {
- return new Consumed<>(null, null, null, null, processorName);
+ return new Consumed<>(null, null, null, null, null, processorName);
}
/**
@@ -175,7 +243,7 @@ public class Consumed<K, V> implements
NamedOperation<Consumed<K, V>> {
* @return a new instance of {@link Consumed}
*/
public Consumed<K, V> withKeySerde(final Serde<K> keySerde) {
- return new Consumed<K, V>(keySerde, valueSerde, timestampExtractor,
resetPolicy, processorName);
+ return new Consumed<>(keySerde, valueSerde, timestampExtractor,
legacyResetPolicy, resetPolicy, processorName);
}
/**
@@ -187,7 +255,7 @@ public class Consumed<K, V> implements
NamedOperation<Consumed<K, V>> {
* @return a new instance of {@link Consumed}
*/
public Consumed<K, V> withValueSerde(final Serde<V> valueSerde) {
- return new Consumed<K, V>(keySerde, valueSerde, timestampExtractor,
resetPolicy, processorName);
+ return new Consumed<>(keySerde, valueSerde, timestampExtractor,
legacyResetPolicy, resetPolicy, processorName);
}
/**
@@ -199,7 +267,7 @@ public class Consumed<K, V> implements
NamedOperation<Consumed<K, V>> {
* @return a new instance of {@link Consumed}
*/
public Consumed<K, V> withTimestampExtractor(final TimestampExtractor
timestampExtractor) {
- return new Consumed<K, V>(keySerde, valueSerde, timestampExtractor,
resetPolicy, processorName);
+ return new Consumed<>(keySerde, valueSerde, timestampExtractor,
legacyResetPolicy, resetPolicy, processorName);
}
/**
@@ -209,9 +277,31 @@ public class Consumed<K, V> implements
NamedOperation<Consumed<K, V>> {
* the offset reset policy to be used. If {@code null} the default
reset policy from config will be used
*
* @return a new instance of {@link Consumed}
+ *
+ * @deprecated Since 4.0. Use {@link
#withOffsetResetPolicy(AutoOffsetReset)} instead.
*/
+ @Deprecated
public Consumed<K, V> withOffsetResetPolicy(final Topology.AutoOffsetReset
resetPolicy) {
- return new Consumed<K, V>(keySerde, valueSerde, timestampExtractor,
resetPolicy, processorName);
+ return new Consumed<>(
+ keySerde,
+ valueSerde,
+ timestampExtractor,
+ resetPolicy,
+ convertOldToNew(resetPolicy),
+ processorName
+ );
+ }
+
+ /**
+ * Configure the instance of {@link Consumed} with a {@link
org.apache.kafka.streams.AutoOffsetReset AutoOffsetReset}.
+ *
+ * @param resetPolicy
+ * the offset reset policy to be used. If {@code null} the default
reset policy from config will be used
+ *
+ * @return a new instance of {@link Consumed}
+ */
+ public Consumed<K, V> withOffsetResetPolicy(final AutoOffsetReset
resetPolicy) {
+ return new Consumed<>(keySerde, valueSerde, timestampExtractor, null,
resetPolicy, processorName);
}
/**
@@ -224,7 +314,7 @@ public class Consumed<K, V> implements
NamedOperation<Consumed<K, V>> {
*/
@Override
public Consumed<K, V> withName(final String processorName) {
- return new Consumed<K, V>(keySerde, valueSerde, timestampExtractor,
resetPolicy, processorName);
+ return new Consumed<>(keySerde, valueSerde, timestampExtractor,
legacyResetPolicy, resetPolicy, processorName);
}
@Override
@@ -239,11 +329,12 @@ public class Consumed<K, V> implements
NamedOperation<Consumed<K, V>> {
return Objects.equals(keySerde, consumed.keySerde) &&
Objects.equals(valueSerde, consumed.valueSerde) &&
Objects.equals(timestampExtractor, consumed.timestampExtractor)
&&
+ legacyResetPolicy == consumed.legacyResetPolicy &&
resetPolicy == consumed.resetPolicy;
}
@Override
public int hashCode() {
- return Objects.hash(keySerde, valueSerde, timestampExtractor,
resetPolicy);
+ return Objects.hash(keySerde, valueSerde, timestampExtractor,
legacyResetPolicy, resetPolicy);
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java
index 40bd53a0b8c..3f5f63c7b77 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.internals.AutoOffsetResetInternal;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.TimestampExtractor;
@@ -28,11 +29,10 @@ public class ConsumedInternal<K, V> extends Consumed<K, V> {
super(consumed);
}
-
public ConsumedInternal(final Serde<K> keySerde,
final Serde<V> valueSerde,
final TimestampExtractor timestampExtractor,
- final Topology.AutoOffsetReset offsetReset) {
+ final AutoOffsetResetInternal offsetReset) {
this(Consumed.with(keySerde, valueSerde, timestampExtractor,
offsetReset));
}
@@ -60,8 +60,14 @@ public class ConsumedInternal<K, V> extends Consumed<K, V> {
return timestampExtractor;
}
- public Topology.AutoOffsetReset offsetResetPolicy() {
- return resetPolicy;
+ public AutoOffsetResetInternal offsetResetPolicy() {
+ return resetPolicy == null ? null : new
AutoOffsetResetInternal(resetPolicy);
+ }
+
+ @SuppressWarnings("deprecation")
+ // TODO mjsax remove
+ public Topology.AutoOffsetReset legacyOffsetResetPolicy() {
+ return legacyResetPolicy;
}
public String name() {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
index 97b686eaff6..b13477c546e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
@@ -17,7 +17,7 @@
package org.apache.kafka.streams.kstream.internals.graph;
-import org.apache.kafka.streams.Topology.AutoOffsetReset;
+import org.apache.kafka.streams.AutoOffsetReset;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
@@ -73,14 +73,16 @@ public class StreamSourceNode<K, V> extends
SourceGraphNode<K, V> {
public void writeToTopology(final InternalTopologyBuilder topologyBuilder)
{
if (topicPattern().isPresent()) {
- topologyBuilder.addSource(consumedInternal().offsetResetPolicy(),
+ // TODO mjsax
+
topologyBuilder.addSource(consumedInternal().legacyOffsetResetPolicy(),
nodeName(),
consumedInternal().timestampExtractor(),
consumedInternal().keyDeserializer(),
consumedInternal().valueDeserializer(),
topicPattern().get());
} else {
- topologyBuilder.addSource(consumedInternal().offsetResetPolicy(),
+ // TODO mjsax
+
topologyBuilder.addSource(consumedInternal().legacyOffsetResetPolicy(),
nodeName(),
consumedInternal().timestampExtractor(),
consumedInternal().keyDeserializer(),
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
index 5e776a5c733..81b44703a2d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
@@ -96,7 +96,8 @@ public class TableSourceNode<K, V> extends SourceGraphNode<K,
V> {
false
);
} else {
- topologyBuilder.addSource(consumedInternal().offsetResetPolicy(),
+ // TODO mjsax
+
topologyBuilder.addSource(consumedInternal().legacyOffsetResetPolicy(),
sourceName,
consumedInternal().timestampExtractor(),
consumedInternal().keyDeserializer(),
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index eeb076fc0cf..23bdcfde6f0 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -438,6 +438,7 @@ public class InternalTopologyBuilder {
return this;
}
+ @SuppressWarnings("deprecation")
public final void addSource(final Topology.AutoOffsetReset offsetReset,
final String name,
final TimestampExtractor timestampExtractor,
@@ -465,6 +466,7 @@ public class InternalTopologyBuilder {
nodeGroups = null;
}
+ @SuppressWarnings("deprecation")
public final void addSource(final Topology.AutoOffsetReset offsetReset,
final String name,
final TimestampExtractor timestampExtractor,
@@ -915,6 +917,7 @@ public class InternalTopologyBuilder {
}
+ @SuppressWarnings("deprecation")
private <T> void maybeAddToResetList(final Collection<T> earliestResets,
final Collection<T> latestResets,
final Topology.AutoOffsetReset
offsetReset,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/AutoOffsetResetTest.java
b/streams/src/test/java/org/apache/kafka/streams/AutoOffsetResetTest.java
new file mode 100644
index 00000000000..2dad17cd81f
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/AutoOffsetResetTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.streams.internals.AutoOffsetResetInternal;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class AutoOffsetResetTest {
+
+ @Test
+ void latestShouldReturnAnEmptyDuration() {
+ final AutoOffsetResetInternal latest = new
AutoOffsetResetInternal(AutoOffsetReset.latest());
+ assertTrue(latest.duration().isEmpty(), "Latest should have an empty
duration.");
+ }
+
+ @Test
+ void earliestShouldReturnAnEmptyDuration() {
+ final AutoOffsetResetInternal earliest = new
AutoOffsetResetInternal(AutoOffsetReset.earliest());
+ assertTrue(earliest.duration().isEmpty(), "Earliest should have an
empty duration.");
+ }
+
+ @Test
+ void customDurationShouldMatchExpectedValue() {
+ final Duration duration = Duration.ofSeconds(10L);
+ final AutoOffsetResetInternal custom = new
AutoOffsetResetInternal(AutoOffsetReset.byDuration(duration));
+ assertEquals(10L, custom.duration().get().toSeconds(), "Duration
should match the specified value in milliseconds.");
+ }
+
+ @Test
+ void shouldThrowExceptionIfDurationIsNegative() {
+ final IllegalArgumentException exception = assertThrows(
+ IllegalArgumentException.class,
+ () -> AutoOffsetReset.byDuration(Duration.ofSeconds(-1)),
+ "Creating an AutoOffsetReset with a negative duration should throw
an IllegalArgumentException."
+ );
+ assertEquals("Duration cannot be negative", exception.getMessage(),
"Exception message should indicate the duration cannot be negative.");
+ }
+
+ @Test
+ void twoInstancesCreatedAtTheSameTimeWithSameOptionsShouldBeEqual() {
+ final AutoOffsetReset latest1 = AutoOffsetReset.latest();
+ final AutoOffsetReset latest2 = AutoOffsetReset.latest();
+ final AutoOffsetReset earliest1 = AutoOffsetReset.earliest();
+ final AutoOffsetReset earliest2 = AutoOffsetReset.earliest();
+ final AutoOffsetReset custom1 =
AutoOffsetReset.byDuration(Duration.ofSeconds(5));
+ final AutoOffsetReset custom2 =
AutoOffsetReset.byDuration(Duration.ofSeconds(5));
+ final AutoOffsetReset customDifferent =
AutoOffsetReset.byDuration(Duration.ofSeconds(10));
+
+ // Equals
+ assertEquals(latest1, latest2, "Two latest instances should be
equal.");
+ assertEquals(earliest1, earliest2, "Two earliest instances should be
equal.");
+ assertEquals(custom1, custom2, "Two custom instances with the same
duration should be equal.");
+ assertNotEquals(latest1, earliest1, "Latest and earliest should not be
equal.");
+ assertNotEquals(custom1, customDifferent, "Custom instances with
different durations should not be equal.");
+
+ // HashCode
+ assertEquals(latest1.hashCode(), latest2.hashCode(), "HashCode for
equal instances should be the same.");
+ assertEquals(custom1.hashCode(), custom2.hashCode(), "HashCode for
equal custom instances should be the same.");
+ assertNotEquals(custom1.hashCode(), customDifferent.hashCode(),
"HashCode for different custom instances should not match.");
+ }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index f43747d3cf2..0dc0179c6e5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -2257,7 +2257,7 @@ public class TopologyTest {
private TopologyDescription.Source addSource(final String sourceName,
final String... sourceTopic) {
- topology.addSource(null, sourceName, null, null, null, sourceTopic);
+ topology.addSource((Topology.AutoOffsetReset) null, sourceName, null,
null, null, sourceTopic);
final StringBuilder allSourceTopics = new
StringBuilder(sourceTopic[0]);
for (int i = 1; i < sourceTopic.length; ++i) {
allSourceTopics.append(", ").append(sourceTopic[i]);
@@ -2267,7 +2267,7 @@ public class TopologyTest {
private TopologyDescription.Source addSource(final String sourceName,
final Pattern sourcePattern) {
- topology.addSource(null, sourceName, null, null, null, sourcePattern);
+ topology.addSource((Topology.AutoOffsetReset) null, sourceName, null,
null, null, sourcePattern);
return new InternalTopologyBuilder.Source(sourceName, null,
sourcePattern);
}
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala
index 9a8034bac5a..89f461a8fea 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala
@@ -18,7 +18,7 @@ package org.apache.kafka.streams.scala.kstream
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.kstream.{Consumed => ConsumedJ}
-import org.apache.kafka.streams.Topology
+import org.apache.kafka.streams.{AutoOffsetReset, Topology}
import org.apache.kafka.streams.processor.TimestampExtractor
object Consumed {
@@ -36,12 +36,32 @@ object Consumed {
* @param valueSerde the value serde to use.
* @return a new instance of [[Consumed]]
*/
+ @deprecated("Use `with` method that accepts `AutoOffsetReset` instead",
"4.0.0")
def `with`[K, V](
timestampExtractor: TimestampExtractor,
resetPolicy: Topology.AutoOffsetReset
)(implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] =
ConsumedJ.`with`(keySerde, valueSerde, timestampExtractor, resetPolicy)
+ /**
+ * Create an instance of [[Consumed]] with the supplied arguments. `null`
values are acceptable.
+ *
+ * @tparam K key type
+ * @tparam V value type
+ * @param timestampExtractor the timestamp extractor to be used. If `null` the
default timestamp extractor from
+ * config will be used
+ * @param resetPolicy the offset reset policy to be used. If `null`
the default reset policy from config
+ * will be used
+ * @param keySerde the key serde to use.
+ * @param valueSerde the value serde to use.
+ * @return a new instance of [[Consumed]]
+ */
+ def `with`[K, V](
+ timestampExtractor: TimestampExtractor,
+ resetPolicy: AutoOffsetReset
+ )(implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] =
+ ConsumedJ.`with`(keySerde, valueSerde, timestampExtractor, resetPolicy)
+
/**
* Create an instance of [[Consumed]] with key and value Serdes.
*
@@ -74,8 +94,22 @@ object Consumed {
* @param resetPolicy the offset reset policy to be used. If `null` the
default reset policy from config will be used
* @return a new instance of [[Consumed]]
*/
+ @deprecated("Use `with` method that accepts `AutoOffsetReset` instead",
"4.0.0")
def `with`[K, V](
resetPolicy: Topology.AutoOffsetReset
)(implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] =
ConsumedJ.`with`(resetPolicy).withKeySerde(keySerde).withValueSerde(valueSerde)
+
+ /**
+ * Create an instance of [[Consumed]] with a
`org.apache.kafka.streams.AutoOffsetReset`.
+ *
+ * @tparam K key type
+ * @tparam V value type
+ * @param resetPolicy the offset reset policy to be used. If `null` the
default reset policy from config will be used
+ * @return a new instance of [[Consumed]]
+ */
+ def `with`[K, V](
+ resetPolicy: AutoOffsetReset
+ )(implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] =
+
ConsumedJ.`with`(resetPolicy).withKeySerde(keySerde).withValueSerde(valueSerde)
}
diff --git
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/ConsumedTest.scala
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/ConsumedTest.scala
index 0b44165164b..4656a4d12fc 100644
---
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/ConsumedTest.scala
+++
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/ConsumedTest.scala
@@ -16,7 +16,8 @@
*/
package org.apache.kafka.streams.scala.kstream
-import org.apache.kafka.streams.Topology
+import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy
+import org.apache.kafka.streams.AutoOffsetReset
import org.apache.kafka.streams.kstream.internals.ConsumedInternal
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp
import org.apache.kafka.streams.scala.serialization.Serdes
@@ -38,15 +39,15 @@ class ConsumedTest {
@Test
def testCreateConsumedWithTimestampExtractorAndResetPolicy(): Unit = {
val timestampExtractor = new FailOnInvalidTimestamp()
- val resetPolicy = Topology.AutoOffsetReset.LATEST
+ val resetPolicy = AutoOffsetReset.latest()
val consumed: Consumed[String, Long] =
- Consumed.`with`[String, Long](timestampExtractor, resetPolicy)
+ Consumed.`with`(timestampExtractor, resetPolicy)
val internalConsumed = new ConsumedInternal(consumed)
assertEquals(Serdes.stringSerde.getClass,
internalConsumed.keySerde.getClass)
assertEquals(Serdes.longSerde.getClass,
internalConsumed.valueSerde.getClass)
assertEquals(timestampExtractor, internalConsumed.timestampExtractor)
- assertEquals(resetPolicy, internalConsumed.offsetResetPolicy)
+ assertEquals(AutoOffsetResetStrategy.StrategyType.LATEST,
internalConsumed.offsetResetPolicy.offsetResetStrategy())
}
@Test
@@ -59,14 +60,15 @@ class ConsumedTest {
assertEquals(Serdes.longSerde.getClass,
internalConsumed.valueSerde.getClass)
assertEquals(timestampExtractor, internalConsumed.timestampExtractor)
}
+
@Test
def testCreateConsumedWithResetPolicy(): Unit = {
- val resetPolicy = Topology.AutoOffsetReset.LATEST
+ val resetPolicy = AutoOffsetReset.latest()
val consumed: Consumed[String, Long] = Consumed.`with`[String,
Long](resetPolicy)
val internalConsumed = new ConsumedInternal(consumed)
assertEquals(Serdes.stringSerde.getClass,
internalConsumed.keySerde.getClass)
assertEquals(Serdes.longSerde.getClass,
internalConsumed.valueSerde.getClass)
- assertEquals(resetPolicy, internalConsumed.offsetResetPolicy)
+ assertEquals(AutoOffsetResetStrategy.StrategyType.LATEST,
internalConsumed.offsetResetPolicy.offsetResetStrategy())
}
}