This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6cdb8c352a8 KAFKA-18015: add byDuration auto.offset.reset to Kafka
Streams (#18115)
6cdb8c352a8 is described below
commit 6cdb8c352a8ae4e77e30f486390970aecb1d970a
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Dec 11 15:12:16 2024 -0800
KAFKA-18015: add byDuration auto.offset.reset to Kafka Streams (#18115)
Part of KIP-1106.
Adds support for "by_duration" and "none" reset strategy
to the Kafka Streams runtime.
Reviewers: Bill Bejeck <[email protected]>, Anna Sophie Blee-Goldman
<[email protected]>
---
.../internals/AutoOffsetResetStrategy.java | 4 +
gradle/spotbugs-exclude.xml | 6 +
.../FineGrainedAutoResetIntegrationTest.java | 154 ++++++++++++++++++---
.../org/apache/kafka/streams/StreamsBuilder.java | 2 +-
.../java/org/apache/kafka/streams/Topology.java | 63 +++++----
.../streams/internals/AutoOffsetResetInternal.java | 9 +-
.../org/apache/kafka/streams/kstream/Consumed.java | 4 -
.../kstream/internals/ConsumedInternal.java | 20 +--
.../kstream/internals/graph/StreamSourceNode.java | 6 +-
.../kstream/internals/graph/TableSourceNode.java | 3 +-
.../internals/InternalTopologyBuilder.java | 79 ++++++++---
.../streams/processor/internals/StreamThread.java | 144 +++++++++++++++++--
.../streams/processor/internals/TaskManager.java | 2 +-
.../processor/internals/TopologyMetadata.java | 7 +-
.../apache/kafka/streams/AutoOffsetResetTest.java | 20 ++-
.../internals/InternalStreamsBuilderTest.java | 71 ++++++++--
.../internals/InternalTopologyBuilderTest.java | 36 +++--
.../processor/internals/StreamThreadTest.java | 5 +-
.../internals/StreamsPartitionAssignorTest.java | 5 +-
19 files changed, 488 insertions(+), 152 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategy.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategy.java
index 6eecc4e09d8..a692bc45ea7 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategy.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategy.java
@@ -129,6 +129,10 @@ public class AutoOffsetResetStrategy {
return Optional.empty();
}
+ public Optional<Duration> duration() {
+ return duration;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index 8b6ca9f522c..3f0f9efd165 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -271,6 +271,12 @@ For a detailed description of spotbugs bug categories, see
https://spotbugs.read
<Bug pattern="MS_MUTABLE_ARRAY"/>
</Match>
+ <Match>
+ <Class
name="org.apache.kafka.streams.processor.internals.TopologyMetadata"/>
+ <Method name="offsetResetStrategy"/>
+ <Bug pattern="NP_OPTIONAL_RETURN_NULL"/>
+ </Match>
+
<Match>
<!-- Suppress warnings about ignoring the return value of await.
This is done intentionally because we use other clues to determine
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
index 413686c2ea5..3ea17e1a098 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
@@ -25,11 +25,11 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.streams.AutoOffsetReset;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
@@ -48,6 +48,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.io.IOException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -57,6 +58,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -70,6 +72,9 @@ public class FineGrainedAutoResetIntegrationTest {
private static final String OUTPUT_TOPIC_0 = "outputTopic_0";
private static final String OUTPUT_TOPIC_1 = "outputTopic_1";
private static final String OUTPUT_TOPIC_2 = "outputTopic_2";
+ private static final String OUTPUT_TOPIC_3 = "outputTopic_3";
+ private static final String OUTPUT_TOPIC_4 = "outputTopic_4";
+ private static final String OUTPUT_TOPIC_5 = "outputTopic_5";
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(NUM_BROKERS);
@@ -95,6 +100,9 @@ public class FineGrainedAutoResetIntegrationTest {
TOPIC_C_2,
TOPIC_Y_2,
TOPIC_Z_2,
+ TOPIC_DURATION_1,
+ TOPIC_DURATION_1,
+ TOPIC_DURATION_3,
NOOP,
DEFAULT_OUTPUT_TOPIC,
OUTPUT_TOPIC_0,
@@ -127,10 +135,13 @@ public class FineGrainedAutoResetIntegrationTest {
private static final String TOPIC_C_2 = "topic-C_2";
private static final String TOPIC_Y_2 = "topic-Y_2";
private static final String TOPIC_Z_2 = "topic-Z_2";
+ private static final String TOPIC_DURATION_1 = "durationTopic-1";
+ private static final String TOPIC_DURATION_2 = "durationTopic-2";
+ private static final String TOPIC_DURATION_3 = "durationTopic-3";
private static final String NOOP = "noop";
private final Serde<String> stringSerde = Serdes.String();
- private static final String STRING_SERDE_CLASSNAME =
Serdes.String().getClass().getName();
+ private static final String STRING_SERDE_CLASSNAME =
Serdes.StringSerde.class.getName();
private Properties streamsConfiguration;
private final String topic1TestMessage = "topic-1 test";
@@ -141,7 +152,7 @@ public class FineGrainedAutoResetIntegrationTest {
private final String topicZTestMessage = "topic-Z test";
@BeforeEach
- public void setUp() throws IOException {
+ public void setUp() throws Exception {
final Properties props = new Properties();
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
@@ -196,8 +207,8 @@ public class FineGrainedAutoResetIntegrationTest {
final StreamsBuilder builder = new StreamsBuilder();
- final KStream<String, String> pattern1Stream =
builder.stream(Pattern.compile("topic-\\d" + topicSuffix),
Consumed.with(Topology.AutoOffsetReset.EARLIEST));
- final KStream<String, String> pattern2Stream =
builder.stream(Pattern.compile("topic-[A-D]" + topicSuffix),
Consumed.with(Topology.AutoOffsetReset.LATEST));
+ final KStream<String, String> pattern1Stream =
builder.stream(Pattern.compile("topic-\\d" + topicSuffix),
Consumed.with(AutoOffsetReset.earliest()));
+ final KStream<String, String> pattern2Stream =
builder.stream(Pattern.compile("topic-[A-D]" + topicSuffix),
Consumed.with(AutoOffsetReset.latest()));
final KStream<String, String> namedTopicsStream =
builder.stream(Arrays.asList(topicY, topicZ));
pattern1Stream.to(outputTopic, Produced.with(stringSerde,
stringSerde));
@@ -215,17 +226,18 @@ public class FineGrainedAutoResetIntegrationTest {
final Properties consumerConfig =
TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class,
StringDeserializer.class);
- final KafkaStreams streams = new KafkaStreams(builder.build(),
streamsConfiguration);
- streams.start();
-
- final List<KeyValue<String, String>> receivedKeyValues =
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig,
outputTopic, expectedReceivedValues.size());
final List<String> actualValues = new
ArrayList<>(expectedReceivedValues.size());
- for (final KeyValue<String, String> receivedKeyValue :
receivedKeyValues) {
- actualValues.add(receivedKeyValue.value);
+ try (final KafkaStreams streams = new KafkaStreams(builder.build(),
streamsConfiguration)) {
+ streams.start();
+
+ final List<KeyValue<String, String>> receivedKeyValues =
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig,
outputTopic, expectedReceivedValues.size());
+
+ for (final KeyValue<String, String> receivedKeyValue :
receivedKeyValues) {
+ actualValues.add(receivedKeyValue.value);
+ }
}
- streams.close();
Collections.sort(actualValues);
Collections.sort(expectedReceivedValues);
assertThat(actualValues, equalTo(expectedReceivedValues));
@@ -251,14 +263,108 @@ public class FineGrainedAutoResetIntegrationTest {
consumer.close();
}
+ @Test
+ public void shouldFailForResetNone() throws Exception {
+ final Properties props = new Properties();
+ props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
+
+ final Properties localConfig = StreamsTestUtils.getStreamsConfig(
+ "testConfigAutoOffsetWithNone",
+ CLUSTER.bootstrapServers(),
+ STRING_SERDE_CLASSNAME,
+ STRING_SERDE_CLASSNAME,
+ props);
+
+ final StreamsBuilder builder = new StreamsBuilder();
+ final KStream<String, String> exceptionStream = builder.stream(NOOP,
Consumed.with(AutoOffsetReset.none()));
+
+ exceptionStream.to(DEFAULT_OUTPUT_TOPIC, Produced.with(stringSerde,
stringSerde));
+
+ try (final KafkaStreams streams = new KafkaStreams(builder.build(),
localConfig)) {
+ final TestingUncaughtExceptionHandler uncaughtExceptionHandler =
new TestingUncaughtExceptionHandler();
+ streams.setUncaughtExceptionHandler(uncaughtExceptionHandler);
+
+ streams.start();
+
+ waitForCondition(
+ () -> uncaughtExceptionHandler.correctExceptionThrown,
+ "The expected NoOffsetForPartitionException was never thrown"
+ );
+ }
+ }
+
+ @Test
+ public void shouldResetByDuration() throws Exception {
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ builder.<String, String>stream(TOPIC_DURATION_1,
Consumed.with(AutoOffsetReset.byDuration(Duration.ofHours(6L).plus(Duration.ofMinutes(40L)))))
+ .to(OUTPUT_TOPIC_3, Produced.with(stringSerde, stringSerde));
+ builder.<String, String>stream(TOPIC_DURATION_2,
Consumed.with(AutoOffsetReset.byDuration(Duration.ofMillis(mockTime.milliseconds()).minus(Duration.ofDays(1L)))))
+ .to(OUTPUT_TOPIC_4, Produced.with(stringSerde, stringSerde));
+ builder.<String, String>stream(TOPIC_DURATION_3,
Consumed.with(AutoOffsetReset.byDuration(Duration.ZERO)))
+ .to(OUTPUT_TOPIC_5, Produced.with(stringSerde, stringSerde));
+
+ final Properties producerConfig =
TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class,
StringSerializer.class);
+
+ for (int i = 0; i < 10; ++i) {
+ mockTime.sleep(Duration.ofHours(1L).toMillis());
+ IntegrationTestUtils.produceValuesSynchronously(TOPIC_DURATION_1,
Collections.singletonList("" + i), producerConfig, mockTime);
+ IntegrationTestUtils.produceValuesSynchronously(TOPIC_DURATION_2,
Collections.singletonList("" + i), producerConfig, mockTime);
+ IntegrationTestUtils.produceValuesSynchronously(TOPIC_DURATION_3,
Collections.singletonList("" + i), producerConfig, mockTime);
+ }
+
+ final Properties consumerConfig =
TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class,
StringDeserializer.class);
+
+ final List<String> expectedValues = Arrays.asList("3", "4", "5", "6",
"7", "8", "9");
+ final List<String> allExpectedValues = Arrays.asList("0", "1", "2",
"3", "4", "5", "6", "7", "8", "9");
+ final List<String> singleFinalExpectedValues = List.of("10");
+ final List<String> actualValuesOne = new
ArrayList<>(expectedValues.size());
+ final List<String> actualValuesTwo = new
ArrayList<>(allExpectedValues.size());
+ final List<String> actualValuesThree = new
ArrayList<>(singleFinalExpectedValues.size());
+
+ final MockTime streamsMockTime = new MockTime(mockTime.milliseconds()
+ Duration.ofMinutes(20).toMillis(), 0);
+ try (final KafkaStreams streams = new KafkaStreams(builder.build(),
streamsConfiguration, streamsMockTime)) {
+ streams.start();
+
+ final List<KeyValue<String, String>> receivedKeyValuesOne =
+
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_TOPIC_3, expectedValues.size());
+ for (final KeyValue<String, String> receivedKeyValue :
receivedKeyValuesOne) {
+ actualValuesOne.add(receivedKeyValue.value);
+ }
+
+ final List<KeyValue<String, String>> receivedKeyValuesTwo =
+
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_TOPIC_4, allExpectedValues.size());
+ for (final KeyValue<String, String> receivedKeyValue :
receivedKeyValuesTwo) {
+ actualValuesTwo.add(receivedKeyValue.value);
+ }
+
+ IntegrationTestUtils.produceValuesSynchronously(TOPIC_DURATION_3,
Collections.singletonList("10"), producerConfig, mockTime);
+ final List<KeyValue<String, String>> receivedKeyValuesThree =
+
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_TOPIC_5, singleFinalExpectedValues.size());
+ for (final KeyValue<String, String> receivedKeyValue :
receivedKeyValuesThree) {
+ actualValuesThree.add(receivedKeyValue.value);
+ }
+ }
+
+ Collections.sort(actualValuesOne);
+ Collections.sort(expectedValues);
+ assertThat(actualValuesOne, equalTo(expectedValues));
+
+ Collections.sort(actualValuesTwo);
+ Collections.sort(allExpectedValues);
+ assertThat(actualValuesTwo, equalTo(allExpectedValues));
+
+ assertThat(actualValuesThree, equalTo(singleFinalExpectedValues));
+ }
+
@Test
public void shouldThrowExceptionOverlappingPattern() {
final StreamsBuilder builder = new StreamsBuilder();
//NOTE this would realistically get caught when building topology, the
test is for completeness
- builder.stream(Pattern.compile("topic-[A-D]_1"),
Consumed.with(Topology.AutoOffsetReset.EARLIEST));
+ builder.stream(Pattern.compile("topic-[A-D]_1"),
Consumed.with(AutoOffsetReset.earliest()));
try {
- builder.stream(Pattern.compile("topic-[A-D]_1"),
Consumed.with(Topology.AutoOffsetReset.LATEST));
+ builder.stream(Pattern.compile("topic-[A-D]_1"),
Consumed.with(AutoOffsetReset.latest()));
builder.build();
fail("Should have thrown TopologyException");
} catch (final TopologyException expected) {
@@ -270,9 +376,9 @@ public class FineGrainedAutoResetIntegrationTest {
public void shouldThrowExceptionOverlappingTopic() {
final StreamsBuilder builder = new StreamsBuilder();
//NOTE this would realistically get caught when building topology, the
test is for completeness
- builder.stream(Pattern.compile("topic-[A-D]_1"),
Consumed.with(Topology.AutoOffsetReset.EARLIEST));
+ builder.stream(Pattern.compile("topic-[A-D]_1"),
Consumed.with(AutoOffsetReset.earliest()));
try {
- builder.stream(Arrays.asList(TOPIC_A_1, TOPIC_Z_1),
Consumed.with(Topology.AutoOffsetReset.LATEST));
+ builder.stream(Arrays.asList(TOPIC_A_1, TOPIC_Z_1),
Consumed.with(AutoOffsetReset.latest()));
builder.build();
fail("Should have thrown TopologyException");
} catch (final TopologyException expected) {
@@ -300,15 +406,17 @@ public class FineGrainedAutoResetIntegrationTest {
exceptionStream.to(DEFAULT_OUTPUT_TOPIC, Produced.with(stringSerde,
stringSerde));
- final KafkaStreams streams = new KafkaStreams(builder.build(),
localConfig);
+ try (final KafkaStreams streams = new KafkaStreams(builder.build(),
localConfig)) {
+ final TestingUncaughtExceptionHandler uncaughtExceptionHandler =
new TestingUncaughtExceptionHandler();
+ streams.setUncaughtExceptionHandler(uncaughtExceptionHandler);
- final TestingUncaughtExceptionHandler uncaughtExceptionHandler = new
TestingUncaughtExceptionHandler();
+ streams.start();
- streams.setUncaughtExceptionHandler(uncaughtExceptionHandler);
- streams.start();
- TestUtils.waitForCondition(() ->
uncaughtExceptionHandler.correctExceptionThrown,
- "The expected NoOffsetForPartitionException was never thrown");
- streams.close();
+ waitForCondition(
+ () -> uncaughtExceptionHandler.correctExceptionThrown,
+ "The expected NoOffsetForPartitionException was never thrown"
+ );
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index 7badb3016f8..d6016c327fd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -288,7 +288,7 @@ public class StreamsBuilder {
* @return a {@link KTable} for the specified topic
*/
public synchronized <K, V> KTable<K, V> table(final String topic) {
- return table(topic, new ConsumedInternal<>());
+ return table(topic, Consumed.with(null, null));
}
/**
diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java
b/streams/src/main/java/org/apache/kafka/streams/Topology.java
index 320e0babf77..6e1ace7efa7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Topology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.internals.AutoOffsetResetInternal;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.ConnectedStoreProvider;
@@ -81,6 +82,19 @@ public class Topology {
EARLIEST, LATEST
}
+ @Deprecated
+ private static AutoOffsetResetInternal convertOldToNew(final
Topology.AutoOffsetReset resetPolicy) {
+ if (resetPolicy == null) {
+ return null;
+ }
+
+ return new AutoOffsetResetInternal(
+ resetPolicy ==
org.apache.kafka.streams.Topology.AutoOffsetReset.EARLIEST
+ ? org.apache.kafka.streams.AutoOffsetReset.earliest()
+ : org.apache.kafka.streams.AutoOffsetReset.latest()
+ );
+ }
+
/**
* Add a new source that consumes the named topics and forward the records
to child processor and/or sink nodes.
* The source will use the {@link
StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
@@ -139,7 +153,7 @@ public class Topology {
public synchronized Topology addSource(final AutoOffsetReset offsetReset,
final String name,
final String... topics) {
- internalTopologyBuilder.addSource(offsetReset, name, null, null, null,
topics);
+ internalTopologyBuilder.addSource(convertOldToNew(offsetReset), name,
null, null, null, topics);
return this;
}
@@ -156,8 +170,7 @@ public class Topology {
public synchronized Topology addSource(final
org.apache.kafka.streams.AutoOffsetReset offsetReset,
final String name,
final String... topics) {
- // TODO mjsax
- //internalTopologyBuilder.addSource(offsetReset, name, null, null,
null, topics);
+ internalTopologyBuilder.addSource(new
AutoOffsetResetInternal(offsetReset), name, null, null, null, topics);
return this;
}
@@ -181,7 +194,7 @@ public class Topology {
public synchronized Topology addSource(final AutoOffsetReset offsetReset,
final String name,
final Pattern topicPattern) {
- internalTopologyBuilder.addSource(offsetReset, name, null, null, null,
topicPattern);
+ internalTopologyBuilder.addSource(convertOldToNew(offsetReset), name,
null, null, null, topicPattern);
return this;
}
@@ -203,8 +216,7 @@ public class Topology {
public synchronized Topology addSource(final
org.apache.kafka.streams.AutoOffsetReset offsetReset,
final String name,
final Pattern topicPattern) {
- // TODO: mjsax
- //internalTopologyBuilder.addSource(offsetReset, name, null, null,
null, topicPattern);
+ internalTopologyBuilder.addSource(new
AutoOffsetResetInternal(offsetReset), name, null, null, null, topicPattern);
return this;
}
@@ -273,7 +285,7 @@ public class Topology {
final TimestampExtractor
timestampExtractor,
final String name,
final String... topics) {
- internalTopologyBuilder.addSource(offsetReset, name,
timestampExtractor, null, null, topics);
+ internalTopologyBuilder.addSource(convertOldToNew(offsetReset), name,
timestampExtractor, null, null, topics);
return this;
}
@@ -293,8 +305,7 @@ public class Topology {
final TimestampExtractor
timestampExtractor,
final String name,
final String... topics) {
- // TODO mjsax
- //internalTopologyBuilder.addSource(offsetReset, name,
timestampExtractor, null, null, topics);
+ internalTopologyBuilder.addSource(new
AutoOffsetResetInternal(offsetReset), name, timestampExtractor, null, null,
topics);
return this;
}
@@ -321,7 +332,7 @@ public class Topology {
final TimestampExtractor
timestampExtractor,
final String name,
final Pattern topicPattern) {
- internalTopologyBuilder.addSource(offsetReset, name,
timestampExtractor, null, null, topicPattern);
+ internalTopologyBuilder.addSource(convertOldToNew(offsetReset), name,
timestampExtractor, null, null, topicPattern);
return this;
}
@@ -341,8 +352,7 @@ public class Topology {
final TimestampExtractor
timestampExtractor,
final String name,
final Pattern topicPattern) {
- // TODO
- //internalTopologyBuilder.addSource(offsetReset, name,
timestampExtractor, null, null, topicPattern);
+ internalTopologyBuilder.addSource(new
AutoOffsetResetInternal(offsetReset), name, timestampExtractor, null, null,
topicPattern);
return this;
}
@@ -421,7 +431,7 @@ public class Topology {
final Deserializer<?>
keyDeserializer,
final Deserializer<?>
valueDeserializer,
final String... topics) {
- internalTopologyBuilder.addSource(offsetReset, name, null,
keyDeserializer, valueDeserializer, topics);
+ internalTopologyBuilder.addSource(convertOldToNew(offsetReset), name,
null, keyDeserializer, valueDeserializer, topics);
return this;
}
@@ -448,8 +458,7 @@ public class Topology {
final Deserializer<?>
keyDeserializer,
final Deserializer<?>
valueDeserializer,
final String... topics) {
- // TODO mjsax
- //internalTopologyBuilder.addSource(offsetReset, name, null,
keyDeserializer, valueDeserializer, topics);
+ internalTopologyBuilder.addSource(new
AutoOffsetResetInternal(offsetReset), name, null, keyDeserializer,
valueDeserializer, topics);
return this;
}
@@ -479,7 +488,7 @@ public class Topology {
final Deserializer<?>
keyDeserializer,
final Deserializer<?>
valueDeserializer,
final Pattern topicPattern) {
- internalTopologyBuilder.addSource(offsetReset, name, null,
keyDeserializer, valueDeserializer, topicPattern);
+ internalTopologyBuilder.addSource(convertOldToNew(offsetReset), name,
null, keyDeserializer, valueDeserializer, topicPattern);
return this;
}
@@ -506,8 +515,7 @@ public class Topology {
final Deserializer<?>
keyDeserializer,
final Deserializer<?>
valueDeserializer,
final Pattern topicPattern) {
- // TODO mjsax
- //internalTopologyBuilder.addSource(offsetReset, name, null,
keyDeserializer, valueDeserializer, topicPattern);
+ internalTopologyBuilder.addSource(new
AutoOffsetResetInternal(offsetReset), name, null, keyDeserializer,
valueDeserializer, topicPattern);
return this;
}
@@ -537,7 +545,7 @@ public class Topology {
final Deserializer<?>
keyDeserializer,
final Deserializer<?>
valueDeserializer,
final String... topics) {
- internalTopologyBuilder.addSource(offsetReset, name,
timestampExtractor, keyDeserializer, valueDeserializer, topics);
+ internalTopologyBuilder.addSource(convertOldToNew(offsetReset), name,
timestampExtractor, keyDeserializer, valueDeserializer, topics);
return this;
}
@@ -564,8 +572,7 @@ public class Topology {
final Deserializer<?>
keyDeserializer,
final Deserializer<?>
valueDeserializer,
final String... topics) {
- // TODO mjsax
- //internalTopologyBuilder.addSource(offsetReset, name,
timestampExtractor, keyDeserializer, valueDeserializer, topics);
+ internalTopologyBuilder.addSource(new
AutoOffsetResetInternal(offsetReset), name, timestampExtractor,
keyDeserializer, valueDeserializer, topics);
return this;
}
@@ -598,7 +605,7 @@ public class Topology {
final Deserializer<?>
keyDeserializer,
final Deserializer<?>
valueDeserializer,
final Pattern topicPattern) {
- internalTopologyBuilder.addSource(offsetReset, name,
timestampExtractor, keyDeserializer, valueDeserializer, topicPattern);
+ internalTopologyBuilder.addSource(convertOldToNew(offsetReset), name,
timestampExtractor, keyDeserializer, valueDeserializer, topicPattern);
return this;
}
@@ -628,8 +635,7 @@ public class Topology {
final Deserializer<?>
keyDeserializer,
final Deserializer<?>
valueDeserializer,
final Pattern topicPattern) {
- // TODO mjsax
- //internalTopologyBuilder.addSource(offsetReset, name,
timestampExtractor, keyDeserializer, valueDeserializer, topicPattern);
+ internalTopologyBuilder.addSource(new
AutoOffsetResetInternal(offsetReset), name, timestampExtractor,
keyDeserializer, valueDeserializer, topicPattern);
return this;
}
@@ -987,7 +993,14 @@ public class Topology {
final
ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {
storeBuilder.withLoggingDisabled();
- internalTopologyBuilder.addSource(AutoOffsetReset.EARLIEST,
sourceName, timestampExtractor, keyDeserializer, valueDeserializer, topic);
+ internalTopologyBuilder.addSource(
+ new
AutoOffsetResetInternal(org.apache.kafka.streams.AutoOffsetReset.earliest()),
+ sourceName,
+ timestampExtractor,
+ keyDeserializer,
+ valueDeserializer,
+ topic
+ );
internalTopologyBuilder.addProcessor(processorName,
stateUpdateSupplier, sourceName);
internalTopologyBuilder.addStateStore(storeBuilder, processorName);
internalTopologyBuilder.connectSourceStoreAndTopic(storeBuilder.name(), topic);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/internals/AutoOffsetResetInternal.java
b/streams/src/main/java/org/apache/kafka/streams/internals/AutoOffsetResetInternal.java
index 51054ee2cae..dc38dec304c 100644
---
a/streams/src/main/java/org/apache/kafka/streams/internals/AutoOffsetResetInternal.java
+++
b/streams/src/main/java/org/apache/kafka/streams/internals/AutoOffsetResetInternal.java
@@ -20,7 +20,6 @@ import
org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy.Strat
import org.apache.kafka.streams.AutoOffsetReset;
import java.time.Duration;
-import java.util.Optional;
public class AutoOffsetResetInternal extends AutoOffsetReset {
@@ -31,7 +30,9 @@ public class AutoOffsetResetInternal extends AutoOffsetReset {
public StrategyType offsetResetStrategy() {
return offsetResetStrategy;
}
- public Optional<Duration> duration() {
- return duration;
+
+ @SuppressWarnings("all")
+ public Duration duration() {
+ return duration.get();
}
-}
\ No newline at end of file
+}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java
index 046ff336fcc..ade104d6118 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java
@@ -76,10 +76,6 @@ public class Consumed<K, V> implements
NamedOperation<Consumed<K, V>> {
this.processorName = processorName;
}
- /**
- * Create an instance of {@link Consumed} from an existing instance.
- * @param consumed the instance of {@link Consumed} to copy
- */
protected Consumed(final Consumed<K, V> consumed) {
this(
consumed.keySerde,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java
index 3f5f63c7b77..9ad1721dea8 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.internals.AutoOffsetResetInternal;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.TimestampExtractor;
@@ -29,17 +28,6 @@ public class ConsumedInternal<K, V> extends Consumed<K, V> {
super(consumed);
}
- public ConsumedInternal(final Serde<K> keySerde,
- final Serde<V> valueSerde,
- final TimestampExtractor timestampExtractor,
- final AutoOffsetResetInternal offsetReset) {
- this(Consumed.with(keySerde, valueSerde, timestampExtractor,
offsetReset));
- }
-
- public ConsumedInternal() {
- this(Consumed.with(null, null));
- }
-
public Serde<K> keySerde() {
return keySerde;
}
@@ -64,13 +52,7 @@ public class ConsumedInternal<K, V> extends Consumed<K, V> {
return resetPolicy == null ? null : new
AutoOffsetResetInternal(resetPolicy);
}
- @SuppressWarnings("deprecation")
- // TODO mjsax remove
- public Topology.AutoOffsetReset legacyOffsetResetPolicy() {
- return legacyResetPolicy;
- }
-
public String name() {
return processorName;
}
-}
+}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
index b13477c546e..d343bdb1bb5 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
@@ -73,16 +73,14 @@ public class StreamSourceNode<K, V> extends
SourceGraphNode<K, V> {
public void writeToTopology(final InternalTopologyBuilder topologyBuilder)
{
if (topicPattern().isPresent()) {
- // TODO mjsax
-
topologyBuilder.addSource(consumedInternal().legacyOffsetResetPolicy(),
+ topologyBuilder.addSource(consumedInternal().offsetResetPolicy(),
nodeName(),
consumedInternal().timestampExtractor(),
consumedInternal().keyDeserializer(),
consumedInternal().valueDeserializer(),
topicPattern().get());
} else {
- // TODO mjsax
-
topologyBuilder.addSource(consumedInternal().legacyOffsetResetPolicy(),
+ topologyBuilder.addSource(consumedInternal().offsetResetPolicy(),
nodeName(),
consumedInternal().timestampExtractor(),
consumedInternal().keyDeserializer(),
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
index 81b44703a2d..5e776a5c733 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
@@ -96,8 +96,7 @@ public class TableSourceNode<K, V> extends SourceGraphNode<K,
V> {
false
);
} else {
- // TODO mjsax
-
topologyBuilder.addSource(consumedInternal().legacyOffsetResetPolicy(),
+ topologyBuilder.addSource(consumedInternal().offsetResetPolicy(),
sourceName,
consumedInternal().timestampExtractor(),
consumedInternal().keyDeserializer(),
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 23bdcfde6f0..67dfa061729 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -22,10 +22,10 @@ import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.internals.ApiUtils;
+import org.apache.kafka.streams.internals.AutoOffsetResetInternal;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TimestampExtractor;
@@ -43,6 +43,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -148,14 +149,22 @@ public class InternalTopologyBuilder {
// all global topics
private final Set<String> globalTopics = new HashSet<>();
+ private final Set<String> noneResetTopics = new HashSet<>();
+
private final Set<String> earliestResetTopics = new HashSet<>();
private final Set<String> latestResetTopics = new HashSet<>();
+ private final Map<String, Duration> durationResetTopics = new HashMap<>();
+
+ private final Set<Pattern> noneResetPatterns = new HashSet<>();
+
private final Set<Pattern> earliestResetPatterns = new HashSet<>();
private final Set<Pattern> latestResetPatterns = new HashSet<>();
+ private final Map<Pattern, Duration> durationResetPatterns = new
HashMap<>();
+
private final QuickUnion<String> nodeGrouper = new QuickUnion<>();
// Used to capture subscribed topics via Patterns discovered during the
partition assignment process.
@@ -333,7 +342,7 @@ public class InternalTopologyBuilder {
@Override
Source describe() {
- return new Source(name, topics.size() == 0 ? null : new
HashSet<>(topics), pattern);
+ return new Source(name, topics.isEmpty() ? null : new
HashSet<>(topics), pattern);
}
}
@@ -438,8 +447,7 @@ public class InternalTopologyBuilder {
return this;
}
- @SuppressWarnings("deprecation")
- public final void addSource(final Topology.AutoOffsetReset offsetReset,
+ public final void addSource(final AutoOffsetResetInternal offsetReset,
final String name,
final TimestampExtractor timestampExtractor,
final Deserializer<?> keyDeserializer,
@@ -456,7 +464,7 @@ public class InternalTopologyBuilder {
for (final String topic : topics) {
Objects.requireNonNull(topic, "topic names cannot be null");
validateTopicNotAlreadyRegistered(topic);
- maybeAddToResetList(earliestResetTopics, latestResetTopics,
offsetReset, topic);
+ maybeAddToResetList(noneResetTopics, earliestResetTopics,
latestResetTopics, durationResetTopics, offsetReset, topic);
rawSourceTopicNames.add(topic);
}
@@ -466,8 +474,7 @@ public class InternalTopologyBuilder {
nodeGroups = null;
}
- @SuppressWarnings("deprecation")
- public final void addSource(final Topology.AutoOffsetReset offsetReset,
+ public final void addSource(final AutoOffsetResetInternal offsetReset,
final String name,
final TimestampExtractor timestampExtractor,
final Deserializer<?> keyDeserializer,
@@ -486,7 +493,7 @@ public class InternalTopologyBuilder {
}
}
- maybeAddToResetList(earliestResetPatterns, latestResetPatterns,
offsetReset, topicPattern);
+ maybeAddToResetList(noneResetPatterns, earliestResetPatterns,
latestResetPatterns, durationResetPatterns, offsetReset, topicPattern);
nodeFactories.put(name, new SourceNodeFactory<>(name, null,
topicPattern, timestampExtractor, keyDeserializer, valDeserializer));
nodeToSourcePatterns.put(name, topicPattern);
@@ -914,22 +921,28 @@ public class InternalTopologyBuilder {
Collections.unmodifiableSet(sourcePatterns)
);
}
-
}
- @SuppressWarnings("deprecation")
- private <T> void maybeAddToResetList(final Collection<T> earliestResets,
+ private <T> void maybeAddToResetList(final Collection<T> noneResets,
+ final Collection<T> earliestResets,
final Collection<T> latestResets,
- final Topology.AutoOffsetReset
offsetReset,
+ final Map<T, Duration> durationReset,
+ final AutoOffsetResetInternal
offsetReset,
final T item) {
if (offsetReset != null) {
- switch (offsetReset) {
+ switch (offsetReset.offsetResetStrategy()) {
+ case NONE:
+ noneResets.add(item);
+ break;
case EARLIEST:
earliestResets.add(item);
break;
case LATEST:
latestResets.add(item);
break;
+ case BY_DURATION:
+ durationReset.put(item, offsetReset.duration());
+ break;
default:
throw new TopologyException(String.format("Unrecognized
reset format %s", offsetReset));
}
@@ -1347,19 +1360,30 @@ public class InternalTopologyBuilder {
}
public boolean hasOffsetResetOverrides() {
- return !(earliestResetTopics.isEmpty() &&
earliestResetPatterns.isEmpty()
- && latestResetTopics.isEmpty() && latestResetPatterns.isEmpty());
+ return noneResetTopics.size() + noneResetPatterns.size()
+ + earliestResetTopics.size() + earliestResetPatterns.size()
+ + latestResetTopics.size() + latestResetPatterns.size()
+ + durationResetTopics.size() + durationResetPatterns.size() > 0;
}
public AutoOffsetResetStrategy offsetResetStrategy(final String topic) {
- if
(maybeDecorateInternalSourceTopics(earliestResetTopics).contains(topic) ||
+ final Optional<Duration> resetDuration;
+
+ if (maybeDecorateInternalSourceTopics(noneResetTopics).contains(topic)
||
+ noneResetPatterns.stream().anyMatch(p ->
p.matcher(topic).matches())) {
+ return AutoOffsetResetStrategy.NONE;
+ } else if
(maybeDecorateInternalSourceTopics(earliestResetTopics).contains(topic) ||
earliestResetPatterns.stream().anyMatch(p ->
p.matcher(topic).matches())) {
return AutoOffsetResetStrategy.EARLIEST;
} else if
(maybeDecorateInternalSourceTopics(latestResetTopics).contains(topic) ||
latestResetPatterns.stream().anyMatch(p ->
p.matcher(topic).matches())) {
return AutoOffsetResetStrategy.LATEST;
+ } else if
(maybeDecorateInternalSourceTopics(durationResetTopics.keySet()).contains(topic))
{
+ return AutoOffsetResetStrategy.fromString("by_duration:" +
durationResetTopics.get(topic).toString());
+ } else if ((resetDuration = findDuration(topic)).isPresent()) {
+ return AutoOffsetResetStrategy.fromString("by_duration:" +
resetDuration.get());
} else if (containsTopic(topic)) {
- return AutoOffsetResetStrategy.NONE;
+ return null;
} else {
throw new IllegalStateException(String.format(
"Unable to lookup offset reset strategy for the following
topic as it does not exist in the topology%s: %s",
@@ -1369,6 +1393,19 @@ public class InternalTopologyBuilder {
}
}
+ private Optional<Duration> findDuration(final String topic) {
+ final List<Duration> resetDuration =
durationResetPatterns.entrySet().stream()
+ .filter(e -> e.getKey().matcher(topic).matches())
+ .map(Map.Entry::getValue)
+ .collect(Collectors.toList());
+
+ if (resetDuration.size() > 1) {
+ throw new IllegalStateException("Found more than one reset
duration for topic: " + topic);
+ }
+
+ return resetDuration.isEmpty() ? Optional.empty() :
resetDuration.stream().findAny();
+ }
+
/**
* @return map from state store name to full names (including application
id/topology name prefix)
* of all source topics whose processors are connected to it
@@ -1450,9 +1487,10 @@ public class InternalTopologyBuilder {
+ "applicationId hasn't been set.
Call "
+ "setApplicationId first");
}
- final String prefix = topologyConfigs == null ?
- applicationId :
-
ProcessorContextUtils.topicNamePrefix(topologyConfigs.applicationConfigs.originals(),
applicationId);
+
+ final String prefix = topologyConfigs == null
+ ? applicationId
+ :
ProcessorContextUtils.topicNamePrefix(topologyConfigs.applicationConfigs.originals(),
applicationId);
if (hasNamedTopology()) {
return prefix + "-" + topologyName + "-" + topic;
@@ -1461,7 +1499,6 @@ public class InternalTopologyBuilder {
}
}
-
void initializeSubscription() {
if (usesPatternSubscription()) {
log.debug("Found pattern subscribed source topics, initializing
consumer's subscription pattern.");
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index ceb2c403c3e..ab4ed4656b3 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -23,6 +23,8 @@ import
org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
@@ -1291,29 +1293,77 @@ public class StreamThread extends Thread implements
ProcessingThread {
final Set<String> loggedTopics = new HashSet<>();
final Set<TopicPartition> seekToBeginning = new HashSet<>();
final Set<TopicPartition> seekToEnd = new HashSet<>();
+ final Map<TopicPartition, Duration> seekByDuration = new HashMap<>();
final Set<TopicPartition> notReset = new HashSet<>();
for (final TopicPartition partition : partitions) {
- final AutoOffsetResetStrategy offsetResetStrategy =
topologyMetadata.offsetResetStrategy(partition.topic());
+ final Optional<AutoOffsetResetStrategy> offsetResetStrategy =
topologyMetadata.offsetResetStrategy(partition.topic());
// This may be null if the task we are currently processing was
apart of a named topology that was just removed.
// TODO KAFKA-13713: keep the StreamThreads and TopologyMetadata
view of named topologies in sync until final thread has acked
if (offsetResetStrategy != null) {
- if (offsetResetStrategy == AutoOffsetResetStrategy.EARLIEST) {
- addToResetList(partition, seekToBeginning, "Setting topic
'{}' to consume from {} offset", "earliest", loggedTopics);
- } else if (offsetResetStrategy ==
AutoOffsetResetStrategy.LATEST) {
- addToResetList(partition, seekToEnd, "Setting topic '{}'
to consume from {} offset", "latest", loggedTopics);
- } else if (offsetResetStrategy ==
AutoOffsetResetStrategy.NONE) {
- final AutoOffsetResetStrategy autoOffsetResetStrategy =
AutoOffsetResetStrategy.fromString(originalReset);
- if (AutoOffsetResetStrategy.EARLIEST ==
autoOffsetResetStrategy) {
- addToResetList(partition, seekToBeginning, "No custom
setting defined for topic '{}' using original config '{}' for offset reset",
"earliest", loggedTopics);
- } else if (AutoOffsetResetStrategy.LATEST ==
autoOffsetResetStrategy) {
- addToResetList(partition, seekToEnd, "No custom
setting defined for topic '{}' using original config '{}' for offset reset",
"latest", loggedTopics);
- } else {
+ if (offsetResetStrategy.isPresent()) {
+ final AutoOffsetResetStrategy resetPolicy =
offsetResetStrategy.get();
+
+ if (resetPolicy == AutoOffsetResetStrategy.NONE) {
notReset.add(partition);
+ } else if (resetPolicy ==
AutoOffsetResetStrategy.EARLIEST) {
+ addToResetList(
+ partition,
+ seekToBeginning,
+ "Setting topic '{}' to consume from earliest
offset",
+ loggedTopics
+ );
+ } else if (resetPolicy == AutoOffsetResetStrategy.LATEST) {
+ addToResetList(
+ partition,
+ seekToEnd,
+ "Setting topic '{}' to consume from latest offset",
+ loggedTopics
+ );
+ } else if (resetPolicy.type() ==
AutoOffsetResetStrategy.StrategyType.BY_DURATION) {
+ addToResetList(
+ partition,
+ seekByDuration,
+ resetPolicy.duration().get(),
+ "Setting topic '{}' to consume from
by_duration:{}",
+ resetPolicy.duration().get().toString(),
+ loggedTopics
+ );
+ } else {
+ throw new IllegalStateException("Unknown reset policy
" + resetPolicy);
}
} else {
- throw new IllegalStateException("Unable to locate topic "
+ partition.topic() + " in the topology");
+ final AutoOffsetResetStrategy resetPolicy =
AutoOffsetResetStrategy.fromString(originalReset);
+
+ if (resetPolicy == AutoOffsetResetStrategy.NONE) {
+ notReset.add(partition);
+ } else if (resetPolicy ==
AutoOffsetResetStrategy.EARLIEST) {
+ addToResetList(
+ partition,
+ seekToBeginning,
+ "No custom setting defined for topic '{}' using
original config 'earliest' for offset reset",
+ loggedTopics
+ );
+ } else if (resetPolicy == AutoOffsetResetStrategy.LATEST) {
+ addToResetList(
+ partition,
+ seekToEnd,
+ "No custom setting defined for topic '{}' using
original config 'latest' for offset reset",
+ loggedTopics
+ );
+ } else if (resetPolicy.type() ==
AutoOffsetResetStrategy.StrategyType.BY_DURATION) {
+ addToResetList(
+ partition,
+ seekByDuration,
+ resetPolicy.duration().get(),
+ "No custom setting defined for topic '{}' using
original config 'by_duration:{}' for offset reset",
+ resetPolicy.duration().get().toString(),
+ loggedTopics
+ );
+ } else {
+ throw new IllegalStateException("Unknown reset policy
" + resetPolicy);
+ }
}
}
}
@@ -1326,6 +1376,50 @@ public class StreamThread extends Thread implements
ProcessingThread {
if (!seekToEnd.isEmpty()) {
mainConsumer.seekToEnd(seekToEnd);
}
+
+ if (!seekByDuration.isEmpty()) {
+ final long nowMs = time.milliseconds();
+ final Map<TopicPartition, Long> seekToTimestamps =
seekByDuration.entrySet().stream()
+ .map(e -> {
+ long seekMs = nowMs - e.getValue().toMillis();
+ if (seekMs < 0L) {
+ log.debug("Cannot reset offset to negative
timestamp {} for partition {}. Seeking to timestamp 0 instead.", seekMs,
e.getKey());
+ seekMs = 0L;
+ }
+ return Map.entry(e.getKey(), seekMs);
+ })
+ .collect(HashMap::new, (m, e) -> m.put(e.getKey(),
e.getValue()), Map::putAll);
+
+ try {
+ for (final Map.Entry<TopicPartition, OffsetAndTimestamp>
partitionAndOffset : mainConsumer.offsetsForTimes(seekToTimestamps).entrySet())
{
+ final TopicPartition partition =
partitionAndOffset.getKey();
+ final OffsetAndTimestamp seekOffset =
partitionAndOffset.getValue();
+ if (seekOffset != null) {
+ mainConsumer.seek(partition, new
OffsetAndMetadata(seekOffset.offset()));
+ } else {
+ log.debug(
+ "Cannot reset offset to non-existing timestamp
{} (larger than timestamp of last record)" +
+ " for partition {}. Seeking to end
instead.",
+ seekToTimestamps.get(partition),
+ partition
+ );
+
mainConsumer.seekToEnd(Collections.singleton(partitionAndOffset.getKey()));
+ }
+ }
+ } catch (final TimeoutException timeoutException) {
+ for (final TopicPartition partition :
seekByDuration.keySet()) {
+ final Task task = taskManager.getActiveTask(partition);
+ task.maybeInitTaskTimeoutOrThrow(now,
timeoutException);
+ stateUpdater.add(task);
+ }
+ log.debug(
+ String.format(
+ "Could not reset offset for %s due to the
following exception; will retry.",
+ seekByDuration.keySet()),
+ timeoutException
+ );
+ }
+ }
} else {
final String notResetString =
notReset.stream()
@@ -1349,14 +1443,34 @@ public class StreamThread extends Thread implements
ProcessingThread {
}
}
- private void addToResetList(final TopicPartition partition, final
Set<TopicPartition> partitions, final String logMessage, final String
resetPolicy, final Set<String> loggedTopics) {
+ private void addToResetList(
+ final TopicPartition partition,
+ final Set<TopicPartition> partitions,
+ final String resetPolicy,
+ final Set<String> loggedTopics
+ ) {
final String topic = partition.topic();
if (loggedTopics.add(topic)) {
- log.info(logMessage, topic, resetPolicy);
+ log.info("Setting topic '{}' to consume from {} offset", topic,
resetPolicy);
}
partitions.add(partition);
}
+ private void addToResetList(
+ final TopicPartition partition,
+ final Map<TopicPartition, Duration> durationForPartitions,
+ final Duration durationTime,
+ final String logMessage,
+ final String durationString,
+ final Set<String> loggedTopics
+ ) {
+ final String topic = partition.topic();
+ if (loggedTopics.add(topic)) {
+ log.info(logMessage, topic, durationString);
+ }
+ durationForPartitions.put(partition, durationTime);
+ }
+
// This method is added for usage in tests where mocking the underlying
native call is not possible.
public boolean isThreadAlive() {
return isAlive();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index f361d3dc320..6b90e354c27 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -1886,7 +1886,7 @@ public class TaskManager {
}
}
- private Task getActiveTask(final TopicPartition partition) {
+ Task getActiveTask(final TopicPartition partition) {
final Task activeTask = tasks.activeTasksForInputPartition(partition);
if (activeTask == null) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
index 88e4ede59ad..fe04f2c4613 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
@@ -427,10 +427,10 @@ public class TopologyMetadata {
return hasNamedTopologies() ||
evaluateConditionIsTrueForAnyBuilders(InternalTopologyBuilder::hasOffsetResetOverrides);
}
- public AutoOffsetResetStrategy offsetResetStrategy(final String topic) {
+ public Optional<AutoOffsetResetStrategy> offsetResetStrategy(final String
topic) {
for (final InternalTopologyBuilder builder : builders.values()) {
if (builder.containsTopic(topic)) {
- return builder.offsetResetStrategy(topic);
+ return Optional.ofNullable(builder.offsetResetStrategy(topic));
}
}
log.warn("Unable to look up offset reset strategy for topic {} " +
@@ -439,6 +439,9 @@ public class TopologyMetadata {
"persist or appear frequently.",
topic, namedTopologiesView()
);
+ // returning `null` for an Optional return type triggers spotbugs
+ // we added an exception for NP_OPTIONAL_RETURN_NULL for this method
+ // when we remove NamedTopologies, we can remove this exception
return null;
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/AutoOffsetResetTest.java
b/streams/src/test/java/org/apache/kafka/streams/AutoOffsetResetTest.java
index 2dad17cd81f..e5679e99d8f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/AutoOffsetResetTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/AutoOffsetResetTest.java
@@ -21,31 +21,37 @@ import
org.apache.kafka.streams.internals.AutoOffsetResetInternal;
import org.junit.jupiter.api.Test;
import java.time.Duration;
+import java.util.NoSuchElementException;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
class AutoOffsetResetTest {
@Test
- void latestShouldReturnAnEmptyDuration() {
- final AutoOffsetResetInternal latest = new
AutoOffsetResetInternal(AutoOffsetReset.latest());
- assertTrue(latest.duration().isEmpty(), "Latest should have an empty
duration.");
+ void shouldThrowExceptionOnDurationForNoneReset() {
+ final AutoOffsetResetInternal none = new
AutoOffsetResetInternal(AutoOffsetReset.none());
+ assertThrows(NoSuchElementException.class, none::duration, "None
should not have a duration.");
}
@Test
- void earliestShouldReturnAnEmptyDuration() {
+ void shouldThrowExceptionOnDurationForEarliestReset() {
final AutoOffsetResetInternal earliest = new
AutoOffsetResetInternal(AutoOffsetReset.earliest());
- assertTrue(earliest.duration().isEmpty(), "Earliest should have an
empty duration.");
+ assertThrows(NoSuchElementException.class, earliest::duration,
"Earliest should not have a duration.");
+ }
+
+ @Test
+ void shouldThrowExceptionOnDurationForLastetReset() {
+ final AutoOffsetResetInternal latest = new
AutoOffsetResetInternal(AutoOffsetReset.latest());
+ assertThrows(NoSuchElementException.class, latest::duration, "Latest
should not have a duration.");
}
@Test
void customDurationShouldMatchExpectedValue() {
final Duration duration = Duration.ofSeconds(10L);
final AutoOffsetResetInternal custom = new
AutoOffsetResetInternal(AutoOffsetReset.byDuration(duration));
- assertEquals(10L, custom.duration().get().toSeconds(), "Duration
should match the specified value in milliseconds.");
+ assertEquals(10L, custom.duration().toSeconds(), "Duration should
match the specified value in milliseconds.");
}
@Test
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
index 3df6840ff71..c761d1eda7c 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
@@ -18,9 +18,11 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.AutoOffsetReset;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.internals.AutoOffsetResetInternal;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.JoinWindows;
@@ -64,7 +66,6 @@ import java.util.regex.Pattern;
import static java.time.Duration.ofMillis;
import static java.util.Arrays.asList;
-import static org.apache.kafka.streams.Topology.AutoOffsetReset;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
@@ -81,7 +82,7 @@ public class InternalStreamsBuilderTest {
private static final String APP_ID = "app-id";
private final InternalStreamsBuilder builder = new
InternalStreamsBuilder(new InternalTopologyBuilder());
- private final ConsumedInternal<String, String> consumed = new
ConsumedInternal<>();
+ private final ConsumedInternal<String, String> consumed = new
ConsumedInternal<>(Consumed.with(null, null));
private final String storePrefix = "prefix-";
private final MaterializedInternal<String, String, KeyValueStore<Bytes,
byte[]>> materialized = new
MaterializedInternal<>(Materialized.as("test-store"), builder, storePrefix);
private final Properties props = StreamsTestUtils.getStreamsConfig();
@@ -287,10 +288,20 @@ public class InternalStreamsBuilderTest {
assertEquals(Collections.singletonList(APP_ID +
"-KSTREAM-MAP-0000000003-repartition"),
builder.internalTopologyBuilder.sourceTopicsForStore("count"));
}
+ @Test
+ public void shouldAddTopicToNoneAutoOffsetResetList() {
+ final String topicName = "topic-1";
+ final ConsumedInternal<String, String> consumed = new
ConsumedInternal<>(Consumed.with(AutoOffsetReset.none()));
+ builder.stream(Collections.singleton(topicName), consumed);
+ builder.buildAndOptimizeTopology();
+
+
assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicName),
equalTo(AutoOffsetResetStrategy.NONE));
+ }
+
@Test
public void shouldAddTopicToEarliestAutoOffsetResetList() {
final String topicName = "topic-1";
- final ConsumedInternal<String, String> consumed = new
ConsumedInternal<>(Consumed.with(AutoOffsetReset.EARLIEST));
+ final ConsumedInternal<String, String> consumed = new
ConsumedInternal<>(Consumed.with(AutoOffsetReset.earliest()));
builder.stream(Collections.singleton(topicName), consumed);
builder.buildAndOptimizeTopology();
@@ -301,16 +312,35 @@ public class InternalStreamsBuilderTest {
public void shouldAddTopicToLatestAutoOffsetResetList() {
final String topicName = "topic-1";
- final ConsumedInternal<String, String> consumed = new
ConsumedInternal<>(Consumed.with(AutoOffsetReset.LATEST));
+ final ConsumedInternal<String, String> consumed = new
ConsumedInternal<>(Consumed.with(AutoOffsetReset.latest()));
builder.stream(Collections.singleton(topicName), consumed);
builder.buildAndOptimizeTopology();
assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicName),
equalTo(AutoOffsetResetStrategy.LATEST));
}
+ @Test
+ public void shouldAddTopicToDurationAutoOffsetResetList() {
+ final String topicName = "topic-1";
+
+ final ConsumedInternal<String, String> consumed = new
ConsumedInternal<>(Consumed.with(new
AutoOffsetResetInternal(AutoOffsetReset.byDuration(Duration.ofSeconds(42L)))));
+ builder.stream(Collections.singleton(topicName), consumed);
+ builder.buildAndOptimizeTopology();
+
assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicName).type(),
equalTo(AutoOffsetResetStrategy.StrategyType.BY_DURATION));
+
assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicName).duration().get().toSeconds(),
equalTo(42L));
+ }
+
+ @Test
+ public void shouldAddTableToNoneAutoOffsetResetList() {
+ final String topicName = "topic-1";
+ builder.table(topicName, new
ConsumedInternal<>(Consumed.with(AutoOffsetReset.none())), materialized);
+ builder.buildAndOptimizeTopology();
+
assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicName),
equalTo(AutoOffsetResetStrategy.NONE));
+ }
+
@Test
public void shouldAddTableToEarliestAutoOffsetResetList() {
final String topicName = "topic-1";
- builder.table(topicName, new
ConsumedInternal<>(Consumed.with(AutoOffsetReset.EARLIEST)), materialized);
+ builder.table(topicName, new
ConsumedInternal<>(Consumed.with(AutoOffsetReset.earliest())), materialized);
builder.buildAndOptimizeTopology();
assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicName),
equalTo(AutoOffsetResetStrategy.EARLIEST));
}
@@ -318,11 +348,20 @@ public class InternalStreamsBuilderTest {
@Test
public void shouldAddTableToLatestAutoOffsetResetList() {
final String topicName = "topic-1";
- builder.table(topicName, new
ConsumedInternal<>(Consumed.with(AutoOffsetReset.LATEST)), materialized);
+ builder.table(topicName, new
ConsumedInternal<>(Consumed.with(AutoOffsetReset.latest())), materialized);
builder.buildAndOptimizeTopology();
assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicName),
equalTo(AutoOffsetResetStrategy.LATEST));
}
+ @Test
+ public void shouldAddTableToDurationAutoOffsetResetList() {
+ final String topicName = "topic-1";
+ builder.table(topicName, new
ConsumedInternal<>(Consumed.with(AutoOffsetResetInternal.byDuration(Duration.ofSeconds(42L)))),
materialized);
+ builder.buildAndOptimizeTopology();
+
assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicName).type(),
equalTo(AutoOffsetResetStrategy.StrategyType.BY_DURATION));
+
assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicName).duration().get().toSeconds(),
equalTo(42L));
+ }
+
@Test
public void shouldNotAddTableToOffsetResetLists() {
final String topicName = "topic-1";
@@ -330,7 +369,7 @@ public class InternalStreamsBuilderTest {
builder.table(topicName, consumed, materialized);
builder.buildAndOptimizeTopology();
-
assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicName),
equalTo(AutoOffsetResetStrategy.NONE));
+
assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicName),
equalTo(null));
}
@Test
@@ -341,7 +380,7 @@ public class InternalStreamsBuilderTest {
builder.stream(topicPattern, consumed);
builder.buildAndOptimizeTopology();
- assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topic),
equalTo(AutoOffsetResetStrategy.NONE));
+ assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topic),
equalTo(null));
}
@Test
@@ -349,7 +388,7 @@ public class InternalStreamsBuilderTest {
final Pattern topicPattern = Pattern.compile("topic-\\d+");
final String topicTwo = "topic-500000";
- builder.stream(topicPattern, new
ConsumedInternal<>(Consumed.with(AutoOffsetReset.EARLIEST)));
+ builder.stream(topicPattern, new
ConsumedInternal<>(Consumed.with(AutoOffsetReset.earliest())));
builder.buildAndOptimizeTopology();
assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicTwo),
equalTo(AutoOffsetResetStrategy.EARLIEST));
@@ -360,12 +399,24 @@ public class InternalStreamsBuilderTest {
final Pattern topicPattern = Pattern.compile("topic-\\d+");
final String topicTwo = "topic-1000000";
- builder.stream(topicPattern, new
ConsumedInternal<>(Consumed.with(AutoOffsetReset.LATEST)));
+ builder.stream(topicPattern, new
ConsumedInternal<>(Consumed.with(AutoOffsetReset.latest())));
builder.buildAndOptimizeTopology();
assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicTwo),
equalTo(AutoOffsetResetStrategy.LATEST));
}
+ @Test
+ public void shouldAddRegexTopicToDurationAutoOffsetResetList() {
+ final Pattern topicPattern = Pattern.compile("topic-\\d+");
+ final String topicTwo = "topic-1000000";
+
+ builder.stream(topicPattern, new
ConsumedInternal<>(Consumed.with(AutoOffsetResetInternal.byDuration(Duration.ofSeconds(42L)))));
+ builder.buildAndOptimizeTopology();
+
+
assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicTwo).type(),
equalTo(AutoOffsetResetStrategy.StrategyType.BY_DURATION));
+
assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicTwo).duration().get().toSeconds(),
equalTo(42L));
+ }
+
@Test
public void shouldHaveNullTimestampExtractorWhenNoneSupplied() {
builder.stream(Collections.singleton("topic"), consumed);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index b0afe364985..5802518cd26 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -21,13 +21,14 @@ import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.AutoOffsetReset;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.internals.AutoOffsetResetInternal;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TopicNameExtractor;
@@ -100,28 +101,43 @@ public class InternalTopologyBuilderTest {
@Test
public void shouldAddSourceWithOffsetReset() {
+ final String noneTopic = "noneTopic";
final String earliestTopic = "earliestTopic";
final String latestTopic = "latestTopic";
+ final String durationTopic = "durationTopic";
- builder.addSource(Topology.AutoOffsetReset.EARLIEST, "source", null,
null, null, earliestTopic);
- builder.addSource(Topology.AutoOffsetReset.LATEST, "source2", null,
null, null, latestTopic);
+ builder.addSource(new AutoOffsetResetInternal(AutoOffsetReset.none()),
"source0", null, null, null, noneTopic);
+ builder.addSource(new
AutoOffsetResetInternal(AutoOffsetReset.earliest()), "source1", null, null,
null, earliestTopic);
+ builder.addSource(new
AutoOffsetResetInternal(AutoOffsetReset.latest()), "source2", null, null, null,
latestTopic);
+ builder.addSource(new
AutoOffsetResetInternal(AutoOffsetReset.byDuration(Duration.ofSeconds(42))),
"source3", null, null, null, durationTopic);
builder.initializeSubscription();
+ assertThat(builder.offsetResetStrategy(noneTopic),
equalTo(AutoOffsetResetStrategy.NONE));
assertThat(builder.offsetResetStrategy(earliestTopic),
equalTo(AutoOffsetResetStrategy.EARLIEST));
assertThat(builder.offsetResetStrategy(latestTopic),
equalTo(AutoOffsetResetStrategy.LATEST));
+ assertThat(builder.offsetResetStrategy(durationTopic).type(),
equalTo(AutoOffsetResetStrategy.StrategyType.BY_DURATION));
+
assertThat(builder.offsetResetStrategy(durationTopic).duration().get().toSeconds(),
equalTo(42L));
}
@Test
public void shouldAddSourcePatternWithOffsetReset() {
+ final String noneTopicPattern = "none.*Topic";
final String earliestTopicPattern = "earliest.*Topic";
final String latestTopicPattern = "latest.*Topic";
+ final String durationTopicPattern = "duration.*Topic";
+
+ builder.addSource(new AutoOffsetResetInternal(AutoOffsetReset.none()),
"source0", null, null, null, Pattern.compile(noneTopicPattern));
+ builder.addSource(new
AutoOffsetResetInternal(AutoOffsetReset.earliest()), "sourc1", null, null,
null, Pattern.compile(earliestTopicPattern));
+ builder.addSource(new
AutoOffsetResetInternal(AutoOffsetReset.latest()), "source2", null, null, null,
Pattern.compile(latestTopicPattern));
+ builder.addSource(new
AutoOffsetResetInternal(AutoOffsetReset.byDuration(Duration.ofSeconds(42))),
"source3", null, null, null, Pattern.compile(durationTopicPattern));
- builder.addSource(Topology.AutoOffsetReset.EARLIEST, "source", null,
null, null, Pattern.compile(earliestTopicPattern));
- builder.addSource(Topology.AutoOffsetReset.LATEST, "source2", null,
null, null, Pattern.compile(latestTopicPattern));
builder.initializeSubscription();
+ assertThat(builder.offsetResetStrategy("noneTestTopic"),
equalTo(AutoOffsetResetStrategy.NONE));
assertThat(builder.offsetResetStrategy("earliestTestTopic"),
equalTo(AutoOffsetResetStrategy.EARLIEST));
assertThat(builder.offsetResetStrategy("latestTestTopic"),
equalTo(AutoOffsetResetStrategy.LATEST));
+ assertThat(builder.offsetResetStrategy("durationTestTopic").type(),
equalTo(AutoOffsetResetStrategy.StrategyType.BY_DURATION));
+
assertThat(builder.offsetResetStrategy("durationTestTopic").duration().get().toSeconds(),
equalTo(42L));
}
@Test
@@ -131,7 +147,7 @@ public class InternalTopologyBuilderTest {
assertEquals(Collections.singletonList("test-topic"),
builder.fullSourceTopicNames());
- assertThat(builder.offsetResetStrategy("test-topic"),
equalTo(AutoOffsetResetStrategy.NONE));
+ assertThat(builder.offsetResetStrategy("test-topic"), equalTo(null));
}
@Test
@@ -143,20 +159,20 @@ public class InternalTopologyBuilderTest {
assertThat(expectedPattern.pattern(),
builder.sourceTopicPatternString(), equalTo("test-.*"));
- assertThat(builder.offsetResetStrategy("test-topic"),
equalTo(AutoOffsetResetStrategy.NONE));
+ assertThat(builder.offsetResetStrategy("test-topic"), equalTo(null));
}
@Test
public void shouldNotAllowOffsetResetSourceWithoutTopics() {
- assertThrows(TopologyException.class, () ->
builder.addSource(Topology.AutoOffsetReset.EARLIEST, "source",
+ assertThrows(TopologyException.class, () -> builder.addSource(new
AutoOffsetResetInternal(AutoOffsetReset.earliest()), "source",
null, stringSerde.deserializer(), stringSerde.deserializer()));
}
@Test
public void shouldNotAllowOffsetResetSourceWithDuplicateSourceName() {
- builder.addSource(Topology.AutoOffsetReset.EARLIEST, "source", null,
stringSerde.deserializer(), stringSerde.deserializer(), "topic-1");
+ builder.addSource(new
AutoOffsetResetInternal(AutoOffsetReset.earliest()), "source", null,
stringSerde.deserializer(), stringSerde.deserializer(), "topic-1");
try {
- builder.addSource(Topology.AutoOffsetReset.LATEST, "source", null,
stringSerde.deserializer(), stringSerde.deserializer(), "topic-2");
+ builder.addSource(new
AutoOffsetResetInternal(AutoOffsetReset.latest()), "source", null,
stringSerde.deserializer(), stringSerde.deserializer(), "topic-2");
fail("Should throw TopologyException for duplicate source name");
} catch (final TopologyException expected) { /* ok */ }
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 8a3e6c17d8a..e48d9275b3a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -62,6 +62,7 @@ import
org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
+import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
@@ -187,7 +188,7 @@ public class StreamThreadTest {
private final MockTime mockTime = new MockTime();
private final String stateDir = TestUtils.tempDirectory().getPath();
private final MockClientSupplier clientSupplier = new MockClientSupplier();
- private final ConsumedInternal<Object, Object> consumed = new
ConsumedInternal<>();
+ private final ConsumedInternal<Object, Object> consumed = new
ConsumedInternal<>(Consumed.with(null, null));
private final ChangelogReader changelogReader = new MockChangelogReader();
private StateDirectory stateDirectory = null;
private final InternalTopologyBuilder internalTopologyBuilder = new
InternalTopologyBuilder();
@@ -2087,7 +2088,7 @@ public class StreamThreadTest {
.count(Materialized.as(storeName1));
final MaterializedInternal<Object, Object, KeyValueStore<Bytes,
byte[]>> materialized
= new MaterializedInternal<>(Materialized.as(storeName2),
internalStreamsBuilder, "");
- internalStreamsBuilder.table(topic2, new ConsumedInternal<>(),
materialized);
+ internalStreamsBuilder.table(topic2, new
ConsumedInternal<>(Consumed.with(null, null)), materialized);
internalStreamsBuilder.buildAndOptimizeTopology();
restoreConsumer.updatePartitions(changelogName1,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 6cf72e91db3..592326fae87 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -44,6 +44,7 @@ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsConfig.InternalConfig;
import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
@@ -2702,8 +2703,8 @@ public class StreamsPartitionAssignorTest {
final InternalStreamsBuilder streamsBuilder = new
InternalStreamsBuilder(builder);
- final KStream<String, String> inputTopic =
streamsBuilder.stream(singleton("topic1"), new ConsumedInternal<>());
- final KTable<String, String> inputTable =
streamsBuilder.table("topic2", new ConsumedInternal<>(), new
MaterializedInternal<>(Materialized.as("store")));
+ final KStream<String, String> inputTopic =
streamsBuilder.stream(singleton("topic1"), new
ConsumedInternal<>(Consumed.with(null, null)));
+ final KTable<String, String> inputTable =
streamsBuilder.table("topic2", new ConsumedInternal<>(Consumed.with(null,
null)), new MaterializedInternal<>(Materialized.as("store")));
inputTopic
.groupBy(
(k, v) -> k,