KAFKA-4144: Allow per stream/table timestamp extractor

Author: Jeyhun Karimov <[email protected]>
Reviewers: Damian Guy, Eno Thereska, Matthias J. Sax, Guozhang Wang

Closes #2466 from jeyhunkarimov/KAFKA-4144

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9198467e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9198467e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9198467e

Branch: refs/heads/trunk
Commit: 9198467eb293435385edcef9028a59fa41a3828a
Parents: a1c8e7d
Author: Jeyhun Karimov <[email protected]>
Authored: Fri May 12 21:38:49 2017 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Fri May 12 21:38:49 2017 -0700

----------------------------------------------------------------------
 .../examples/pageview/PageViewTypedDemo.java    |   2 +-
 .../examples/pageview/PageViewUntypedDemo.java  |   2 +-
 .../kafka/streams/examples/pipe/PipeDemo.java   |   4 +-
 .../examples/wordcount/WordCountDemo.java       |   4 +-
 .../wordcount/WordCountProcessorDemo.java       |   4 +-
 .../org/apache/kafka/streams/KafkaStreams.java  |   4 +-
 .../org/apache/kafka/streams/StreamsConfig.java | 117 ++++-
 .../kafka/streams/kstream/KGroupedTable.java    |   2 +-
 .../kafka/streams/kstream/KStreamBuilder.java   | 434 ++++++++++++++++---
 .../streams/processor/TimestampExtractor.java   |   1 +
 .../streams/processor/TopologyBuilder.java      | 317 ++++++++++----
 .../processor/internals/PartitionGroup.java     |   9 +-
 .../streams/processor/internals/SourceNode.java |  14 +-
 .../streams/processor/internals/StreamTask.java |   8 +-
 .../apache/kafka/streams/StreamsConfigTest.java |  16 +-
 .../integration/FanoutIntegrationTest.java      |   2 +-
 .../GlobalKTableIntegrationTest.java            |   2 +-
 .../InternalTopicIntegrationTest.java           |   8 +-
 .../integration/JoinIntegrationTest.java        |   4 +-
 .../KStreamKTableJoinIntegrationTest.java       |   4 +-
 .../KTableKTableJoinIntegrationTest.java        |   4 +-
 .../QueryableStateIntegrationTest.java          |   4 +-
 .../integration/ResetIntegrationTest.java       |   4 +-
 .../streams/kstream/KStreamBuilderTest.java     |  54 +++
 .../internals/GlobalKTableJoinsTest.java        |   2 +-
 .../kstream/internals/KStreamImplTest.java      |   8 +-
 .../kafka/streams/perf/SimpleBenchmark.java     |   4 +-
 .../streams/processor/TopologyBuilderTest.java  |  64 ++-
 .../internals/InternalTopicManagerTest.java     |   2 +-
 .../processor/internals/PartitionGroupTest.java |   2 +-
 .../internals/ProcessorTopologyTest.java        |   6 +-
 .../processor/internals/StandbyTaskTest.java    |   2 +-
 .../internals/StreamPartitionAssignorTest.java  |   2 +-
 .../processor/internals/StreamTaskTest.java     |  17 +-
 .../processor/internals/StreamThreadTest.java   |  21 +-
 .../streams/state/KeyValueStoreTestDriver.java  |   6 +-
 .../StreamThreadStateStoreProviderTest.java     |   5 +-
 .../streams/tests/BrokerCompatibilityTest.java  |   4 +-
 .../kafka/test/ProcessorTopologyTestDriver.java |   2 +-
 .../org/apache/kafka/test/StreamsTestUtils.java |   4 +-
 40 files changed, 947 insertions(+), 228 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
index f235044..bae7ed6 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
+++ 
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java @@ -83,7 +83,7 @@ public class PageViewTypedDemo { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-typed"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class); + props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class); // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java index 39e84e8..ed05f77 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java @@ -59,7 +59,7 @@ public class PageViewUntypedDemo { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-untyped"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class); + props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class); // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java index 62b52c0..86182a3 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java @@ -41,8 +41,8 @@ public class PipeDemo { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java ---------------------------------------------------------------------- diff --git 
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java index 616228a..50e906f 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java @@ -49,8 +49,8 @@ public class WordCountDemo { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data // Note: To re-run the demo, you need to use the offset reset tool: http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java index 9ef24f0..f2c79d6 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java @@ -104,8 +104,8 @@ public class WordCountProcessorDemo { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index bc2a433..74f5fc1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -99,8 +99,8 @@ import static org.apache.kafka.common.utils.Utils.getPort; * Map<String, Object> props = new HashMap<>(); * props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application"); * props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - * props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - * 
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + * props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + * props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); * StreamsConfig config = new StreamsConfig(props); * * KStreamBuilder builder = new KStreamBuilder(); http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 35e6e3d..c1fd2d6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -32,6 +32,7 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.DefaultPartitionGrouper; import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; +import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor; import org.apache.kafka.streams.processor.internals.StreamThread; @@ -134,9 +135,23 @@ public class StreamsConfig extends AbstractConfig { public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG; private static final String CONNECTIONS_MAX_IDLE_MS_DOC = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC; + /** {@code default key.serde} */ + public static final String DEFAULT_KEY_SERDE_CLASS_CONFIG = "default.key.serde"; + private static final String DEFAULT_KEY_SERDE_CLASS_DOC = " Default serializer / deserializer class for key that implements the <code>Serde</code> interface."; + + /** {@code default timestamp.extractor} */ + public static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "default.timestamp.extractor"; + private static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC = "Default timestamp extractor class that implements the <code>TimestampExtractor</code> interface."; + + /** {@code default value.serde} */ + public static final String DEFAULT_VALUE_SERDE_CLASS_CONFIG = "default.value.serde"; + private static final String DEFAULT_VALUE_SERDE_CLASS_DOC = "Default serializer / deserializer class for value that implements the <code>Serde</code> interface."; + /** {@code key.serde} */ + @Deprecated public static final String KEY_SERDE_CLASS_CONFIG = "key.serde"; - private static final String KEY_SERDE_CLASS_DOC = "Serializer / deserializer class for key that implements the <code>Serde</code> interface."; + @Deprecated + private static final String KEY_SERDE_CLASS_DOC = "Serializer / deserializer class for key that implements the <code>Serde</code> interface. 
This config is deprecated, use \"default.key.serde\" instead"; /** {@code metadata.max.age.ms} */ public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG; @@ -216,12 +231,16 @@ public class StreamsConfig extends AbstractConfig { private static final String STATE_DIR_DOC = "Directory location for state store."; /** {@code timestamp.extractor} */ + @Deprecated public static final String TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "timestamp.extractor"; - private static final String TIMESTAMP_EXTRACTOR_CLASS_DOC = "Timestamp extractor class that implements the <code>TimestampExtractor</code> interface."; + @Deprecated + private static final String TIMESTAMP_EXTRACTOR_CLASS_DOC = "Timestamp extractor class that implements the <code>TimestampExtractor</code> interface. This config is deprecated, use \"default.timestamp.extractor\" instead"; /** {@code value.serde} */ + @Deprecated public static final String VALUE_SERDE_CLASS_CONFIG = "value.serde"; - private static final String VALUE_SERDE_CLASS_DOC = "Serializer / deserializer class for value that implements the <code>Serde</code> interface."; + @Deprecated + private static final String VALUE_SERDE_CLASS_DOC = "Serializer / deserializer class for value that implements the <code>Serde</code> interface. This config is deprecated, use \"default.value.serde\" instead"; /** {@code windowstore.changelog.additional.retention.ms} */ public static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG = "windowstore.changelog.additional.retention.ms"; @@ -267,24 +286,39 @@ public class StreamsConfig extends AbstractConfig { REPLICATION_FACTOR_DOC) .define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG, Type.CLASS, - FailOnInvalidTimestamp.class.getName(), + null, Importance.MEDIUM, TIMESTAMP_EXTRACTOR_CLASS_DOC) + .define(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, + Type.CLASS, + FailOnInvalidTimestamp.class.getName(), + Importance.MEDIUM, + DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC) .define(PARTITION_GROUPER_CLASS_CONFIG, Type.CLASS, DefaultPartitionGrouper.class.getName(), Importance.MEDIUM, PARTITION_GROUPER_CLASS_DOC) .define(KEY_SERDE_CLASS_CONFIG, - Type.CLASS, - Serdes.ByteArraySerde.class.getName(), - Importance.MEDIUM, - KEY_SERDE_CLASS_DOC) + Type.CLASS, + null, + Importance.MEDIUM, + KEY_SERDE_CLASS_DOC) + .define(DEFAULT_KEY_SERDE_CLASS_CONFIG, + Type.CLASS, + Serdes.ByteArraySerde.class.getName(), + Importance.MEDIUM, + DEFAULT_KEY_SERDE_CLASS_DOC) .define(VALUE_SERDE_CLASS_CONFIG, Type.CLASS, - Serdes.ByteArraySerde.class.getName(), + null, Importance.MEDIUM, VALUE_SERDE_CLASS_DOC) + .define(DEFAULT_VALUE_SERDE_CLASS_CONFIG, + Type.CLASS, + Serdes.ByteArraySerde.class.getName(), + Importance.MEDIUM, + DEFAULT_VALUE_SERDE_CLASS_DOC) .define(COMMIT_INTERVAL_MS_CONFIG, Type.LONG, 30000, @@ -590,14 +624,20 @@ public class StreamsConfig extends AbstractConfig { /** * Return an {@link Serde#configure(Map, boolean) configured} instance of {@link #KEY_SERDE_CLASS_CONFIG key Serde - * class}. + * class}. This method is deprecated. Use {@link #defaultKeySerde()} method instead. 
* * @return an configured instance of key Serde class */ + @Deprecated public Serde keySerde() { try { - final Serde<?> serde = getConfiguredInstance(KEY_SERDE_CLASS_CONFIG, Serde.class); - serde.configure(originals(), true); + Serde<?> serde = getConfiguredInstance(KEY_SERDE_CLASS_CONFIG, Serde.class); + // the default value of deprecated key serde is null + if (serde == null) { + serde = defaultKeySerde(); + } else { + serde.configure(originals(), true); + } return serde; } catch (final Exception e) { throw new StreamsException(String.format("Failed to configure key serde %s", get(KEY_SERDE_CLASS_CONFIG)), e); @@ -605,15 +645,37 @@ public class StreamsConfig extends AbstractConfig { } /** + * Return an {@link Serde#configure(Map, boolean) configured} instance of {@link #DEFAULT_KEY_SERDE_CLASS_CONFIG key Serde + * class}. + * + * @return an configured instance of key Serde class + */ + public Serde defaultKeySerde() { + try { + Serde<?> serde = getConfiguredInstance(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serde.class); + serde.configure(originals(), true); + return serde; + } catch (final Exception e) { + throw new StreamsException(String.format("Failed to configure key serde %s", get(DEFAULT_KEY_SERDE_CLASS_CONFIG)), e); + } + } + + /** * Return an {@link Serde#configure(Map, boolean) configured} instance of {@link #VALUE_SERDE_CLASS_CONFIG value - * Serde class}. + * Serde class}. This method is deprecated. Use {@link #defaultValueSerde()} instead. * * @return an configured instance of value Serde class */ + @Deprecated public Serde valueSerde() { try { - final Serde<?> serde = getConfiguredInstance(VALUE_SERDE_CLASS_CONFIG, Serde.class); - serde.configure(originals(), false); + Serde<?> serde = getConfiguredInstance(VALUE_SERDE_CLASS_CONFIG, Serde.class); + // the default value of deprecated value serde is null + if (serde == null) { + serde = defaultValueSerde(); + } else { + serde.configure(originals(), false); + } return serde; } catch (final Exception e) { throw new StreamsException(String.format("Failed to configure value serde %s", get(VALUE_SERDE_CLASS_CONFIG)), e); @@ -621,6 +683,31 @@ public class StreamsConfig extends AbstractConfig { } /** + * Return an {@link Serde#configure(Map, boolean) configured} instance of {@link #DEFAULT_VALUE_SERDE_CLASS_CONFIG value + * Serde class}. + * + * @return an configured instance of value Serde class + */ + public Serde defaultValueSerde() { + try { + Serde<?> serde = getConfiguredInstance(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serde.class); + serde.configure(originals(), false); + return serde; + } catch (final Exception e) { + throw new StreamsException(String.format("Failed to configure value serde %s", get(DEFAULT_VALUE_SERDE_CLASS_CONFIG)), e); + } + } + + + public TimestampExtractor defaultTimestampExtractor() { + TimestampExtractor timestampExtractor = getConfiguredInstance(TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class); + if (timestampExtractor == null) { + return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class); + } + return timestampExtractor; + } + + /** * Override any client properties in the original configs with overrides * * @param configNames The given set of configuration names. 
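[Editor's note] Taken together, the StreamsConfig changes above deprecate the old global keys ("key.serde", "value.serde", "timestamp.extractor"), default them to null, and add "default.*" replacements together with defaultKeySerde(), defaultValueSerde(), and defaultTimestampExtractor() accessors that fall back accordingly. The following is a minimal sketch, not part of this commit, of how an application would move to the new keys; the application id, bootstrap servers, and the choice of WallclockTimestampExtractor are illustrative assumptions only:

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.processor.TimestampExtractor;
    import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

    public class DefaultConfigExample {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "timestamp-extractor-example");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // the new, non-deprecated keys introduced by this change
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);

            final StreamsConfig config = new StreamsConfig(props);

            // resolves via default.key.serde (the deprecated keySerde() falls back to the same value)
            final Serde<?> keySerde = config.defaultKeySerde();
            // checks the deprecated "timestamp.extractor" first, then "default.timestamp.extractor"
            final TimestampExtractor extractor = config.defaultTimestampExtractor();
            System.out.println(keySerde.getClass() + " / " + extractor.getClass());
        }
    }

Note that the deprecated keys are still honored when set: keySerde() and valueSerde() use them directly, and defaultTimestampExtractor() consults "timestamp.extractor" before "default.timestamp.extractor".
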
http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java index d14e600..2079860 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java @@ -353,7 +353,7 @@ public interface KGroupedTable<K, V> { * Records with {@code null} key are ignored. * Aggregating is a generalization of {@link #reduce(Reducer, Reducer, String) combining via reduce(...)} as it, * for example, allows the result to have a different type than the input values. - * If the result value type does not match the {@link StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value + * If the result value type does not match the {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value * serde} you should use {@link KGroupedTable#aggregate(Initializer, Aggregator, Aggregator, Serde, String) * aggregate(Initializer, Aggregator, Aggregator, Serde, String)}. * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view) http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java index 0e02c8f..fb05e4d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java @@ -28,6 +28,7 @@ import org.apache.kafka.streams.kstream.internals.KTableSource; import org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier; import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.StateStoreSupplier; +import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.TopologyBuilder; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.QueryableStoreType; @@ -52,10 +53,10 @@ public class KStreamBuilder extends TopologyBuilder { /** * Create a {@link KStream} from the specified topics. - * The default {@code "auto.offset.reset"} strategy and default key and value deserializers as specified in the - * {@link StreamsConfig config} are used. + * The default {@code "auto.offset.reset"} strategy, default {@link TimestampExtractor}, and default key and value + * deserializers as specified in the {@link StreamsConfig config} are used. * <p> - * If multiple topics are specified there are no ordering guaranteed for records from different topics. + * If multiple topics are specified there is no ordering guarantee for records from different topics. * <p> * Note that the specified input topics must be partitioned by key. * If this is not the case it is the user's responsibility to repartition the date before any key based operation @@ -65,14 +66,15 @@ public class KStreamBuilder extends TopologyBuilder { * @return a {@link KStream} for the specified topics */ public <K, V> KStream<K, V> stream(final String... 
topics) { - return stream(null, null, null, topics); + return stream(null, null, null, null, topics); } /** * Create a {@link KStream} from the specified topics. - * The default key and value deserializers as specified in the {@link StreamsConfig config} are used. + * The default {@link TimestampExtractor} and default key and value deserializers as specified in the + * {@link StreamsConfig config} are used. * <p> - * If multiple topics are specified there are no ordering guaranteed for records from different topics. + * If multiple topics are specified there is no ordering guarantee for records from different topics. * <p> * Note that the specified input topics must be partitioned by key. * If this is not the case it is the user's responsibility to repartition the date before any key based operation @@ -85,13 +87,14 @@ public class KStreamBuilder extends TopologyBuilder { */ public <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final String... topics) { - return stream(offsetReset, null, null, topics); + return stream(offsetReset, null, null, null, topics); } + /** * Create a {@link KStream} from the specified topic pattern. - * The default {@code "auto.offset.reset"} strategy and default key and value deserializers as specified in the - * {@link StreamsConfig config} are used. + * The default {@code "auto.offset.reset"} strategy, default {@link TimestampExtractor}, and default key and value + * deserializers as specified in the {@link StreamsConfig config} are used. * <p> * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of * them and there is no ordering guarantee between records from different topics. @@ -104,12 +107,13 @@ public class KStreamBuilder extends TopologyBuilder { * @return a {@link KStream} for topics matching the regex pattern. */ public <K, V> KStream<K, V> stream(final Pattern topicPattern) { - return stream(null, null, null, topicPattern); + return stream(null, null, null, null, topicPattern); } /** * Create a {@link KStream} from the specified topic pattern. - * The default key and value deserializers as specified in the {@link StreamsConfig config} are used. + * The default {@link TimestampExtractor} and default key and value deserializers as specified in the + * {@link StreamsConfig config} are used. * <p> * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of * them and there is no ordering guarantee between records from different topics. @@ -124,14 +128,15 @@ public class KStreamBuilder extends TopologyBuilder { * @return a {@link KStream} for topics matching the regex pattern. */ public <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Pattern topicPattern) { - return stream(offsetReset, null, null, topicPattern); + return stream(offsetReset, null, null, null, topicPattern); } /** * Create a {@link KStream} from the specified topics. - * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used. + * The default {@code "auto.offset.reset"} strategy and default {@link TimestampExtractor} as specified in the + * {@link StreamsConfig config} are used. * <p> - * If multiple topics are specified there are no ordering guaranteed for records from different topics. + * If multiple topics are specified there is no ordering guarantee for records from different topics. * <p> * Note that the specified input topics must be partitioned by key. 
* If this is not the case it is the user's responsibility to repartition the date before any key based operation @@ -145,14 +150,15 @@ public class KStreamBuilder extends TopologyBuilder { * @return a {@link KStream} for the specified topics */ public <K, V> KStream<K, V> stream(final Serde<K> keySerde, final Serde<V> valSerde, final String... topics) { - return stream(null, keySerde, valSerde, topics); + return stream(null, null, keySerde, valSerde, topics); } /** * Create a {@link KStream} from the specified topics. + * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used. * <p> - * If multiple topics are specified there are no ordering guaranteed for records from different topics. + * If multiple topics are specified there is no ordering guarantee for records from different topics. * <p> * Note that the specified input topics must be partitioned by key. * If this is not the case it is the user's responsibility to repartition the date before any key based operation @@ -171,9 +177,65 @@ public class KStreamBuilder extends TopologyBuilder { final Serde<K> keySerde, final Serde<V> valSerde, final String... topics) { + return stream(offsetReset, null, keySerde, valSerde, topics); + } + + + /** + * Create a {@link KStream} from the specified topics. + * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used. + * <p> + * If multiple topics are specified there is no ordering guarantee for records from different topics. + * <p> + * Note that the specified input topics must be partitioned by key. + * If this is not the case it is the user's responsibility to repartition the date before any key based operation + * (like aggregation or join) is applied to the returned {@link KStream}. + * + * @param timestampExtractor the stateless timestamp extractor used for this source {@link KStream}, + * if not specified the default extractor defined in the configs will be used + * @param keySerde key serde used to read this source {@link KStream}, if not specified the default + * serde defined in the configs will be used + * @param valSerde value serde used to read this source {@link KStream}, + * if not specified the default serde defined in the configs will be used + * @param topics the topic names; must contain at least one topic name + * @return a {@link KStream} for the specified topics + */ + public <K, V> KStream<K, V> stream(final TimestampExtractor timestampExtractor, + final Serde<K> keySerde, + final Serde<V> valSerde, + final String... topics) { + return stream(null, timestampExtractor, keySerde, valSerde, topics); + } + + + /** + * Create a {@link KStream} from the specified topics. + * <p> + * If multiple topics are specified there is no ordering guarantee for records from different topics. + * <p> + * Note that the specified input topics must be partitioned by key. + * If this is not the case it is the user's responsibility to repartition the date before any key based operation + * (like aggregation or join) is applied to the returned {@link KStream}. 
+ * + * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topics + * if no valid committed offsets are available + * @param timestampExtractor the stateless timestamp extractor used for this source {@link KStream}, + * if not specified the default extractor defined in the configs will be used + * @param keySerde key serde used to read this source {@link KStream}, + * if not specified the default serde defined in the configs will be used + * @param valSerde value serde used to read this source {@link KStream}, + * if not specified the default serde defined in the configs will be used + * @param topics the topic names; must contain at least one topic name + * @return a {@link KStream} for the specified topics + */ + public <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, + final TimestampExtractor timestampExtractor, + final Serde<K> keySerde, + final Serde<V> valSerde, + final String... topics) { final String name = newName(KStreamImpl.SOURCE_NAME); - addSource(offsetReset, name, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topics); + addSource(offsetReset, name, timestampExtractor, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topics); return new KStreamImpl<>(this, name, Collections.singleton(name), false); } @@ -181,7 +243,8 @@ public class KStreamBuilder extends TopologyBuilder { /** * Create a {@link KStream} from the specified topic pattern. - * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used. + * The default {@code "auto.offset.reset"} strategy and default {@link TimestampExtractor} + * as specified in the {@link StreamsConfig config} are used. * <p> * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of * them and there is no ordering guarantee between records from different topics. @@ -200,11 +263,12 @@ public class KStreamBuilder extends TopologyBuilder { public <K, V> KStream<K, V> stream(final Serde<K> keySerde, final Serde<V> valSerde, final Pattern topicPattern) { - return stream(null, keySerde, valSerde, topicPattern); + return stream(null, null, keySerde, valSerde, topicPattern); } /** * Create a {@link KStream} from the specified topic pattern. + * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used. * <p> * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of * them and there is no ordering guarantee between records from different topics. @@ -226,20 +290,78 @@ public class KStreamBuilder extends TopologyBuilder { final Serde<K> keySerde, final Serde<V> valSerde, final Pattern topicPattern) { + return stream(offsetReset, null, keySerde, valSerde, topicPattern); + } + + /** + * Create a {@link KStream} from the specified topic pattern. + * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used. + * <p> + * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of + * them and there is no ordering guarantee between records from different topics. + * <p> + * Note that the specified input topics must be partitioned by key. 
+ * If this is not the case it is the user's responsibility to repartition the date before any key based operation + * (like aggregation or join) is applied to the returned {@link KStream}. + * + * @param timestampExtractor the stateless timestamp extractor used for this source {@link KStream}, + * if not specified the default extractor defined in the configs will be used + * @param keySerde key serde used to read this source {@link KStream}, + * if not specified the default serde defined in the configs will be used + * @param valSerde value serde used to read this source {@link KStream}, + * if not specified the default serde defined in the configs will be used + * @param topicPattern the pattern to match for topic names + * @return a {@link KStream} for topics matching the regex pattern. + */ + public <K, V> KStream<K, V> stream(final TimestampExtractor timestampExtractor, + final Serde<K> keySerde, + final Serde<V> valSerde, + final Pattern topicPattern) { + return stream(null, timestampExtractor, keySerde, valSerde, topicPattern); + } + + + /** + * Create a {@link KStream} from the specified topic pattern. + * <p> + * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of + * them and there is no ordering guarantee between records from different topics. + * <p> + * Note that the specified input topics must be partitioned by key. + * If this is not the case it is the user's responsibility to repartition the date before any key based operation + * (like aggregation or join) is applied to the returned {@link KStream}. + * + * @param offsetReset the {@code "auto.offset.reset"} policy to use for the matched topics if no valid + * committed offsets are available + * @param timestampExtractor the stateless timestamp extractor used for this source {@link KStream}, + * if not specified the default extractor defined in the configs will be used + * @param keySerde key serde used to read this source {@link KStream}, + * if not specified the default serde defined in the configs will be used + * @param valSerde value serde used to read this source {@link KStream}, + * if not specified the default serde defined in the configs will be used + * @param topicPattern the pattern to match for topic names + * @return a {@link KStream} for topics matching the regex pattern. + */ + public <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, + final TimestampExtractor timestampExtractor, + final Serde<K> keySerde, + final Serde<V> valSerde, + final Pattern topicPattern) { final String name = newName(KStreamImpl.SOURCE_NAME); - addSource(offsetReset, name, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topicPattern); + addSource(offsetReset, name, timestampExtractor, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topicPattern); return new KStreamImpl<>(this, name, Collections.singleton(name), false); } + /** * Create a {@link KTable} for the specified topic. - * The default {@code "auto.offset.reset"} strategy and default key and value deserializers as specified in the - * {@link StreamsConfig config} are used. + * The default {@code "auto.offset.reset"} strategy, default {@link TimestampExtractor}, and + * default key and value deserializers as specified in the {@link StreamsConfig config} are used. * Input {@link KeyValue records} with {@code null} key will be dropped. 
* <p> - * Note that the specified input topics must be partitioned by key. + * Note that the specified input topic must be partitioned by key. * If this is not the case the returned {@link KTable} will be corrupted. * <p> * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given @@ -264,7 +386,7 @@ public class KStreamBuilder extends TopologyBuilder { */ public <K, V> KTable<K, V> table(final String topic, final String queryableStoreName) { - return table(null, null, null, topic, queryableStoreName); + return table(null, null, null, null, topic, queryableStoreName); } /** @@ -298,7 +420,7 @@ public class KStreamBuilder extends TopologyBuilder { */ public <K, V> KTable<K, V> table(final String topic, final StateStoreSupplier<KeyValueStore> storeSupplier) { - return table(null, null, null, topic, storeSupplier); + return table(null, null, null, null, topic, storeSupplier); } @@ -319,7 +441,7 @@ public class KStreamBuilder extends TopologyBuilder { * @return a {@link KTable} for the specified topic */ public <K, V> KTable<K, V> table(final String topic) { - return table(null, null, null, topic, (String) null); + return table(null, null, null, null, topic, (String) null); } /** @@ -355,15 +477,16 @@ public class KStreamBuilder extends TopologyBuilder { public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset, final String topic, final String queryableStoreName) { - return table(offsetReset, null, null, topic, queryableStoreName); + return table(offsetReset, null, null, null, topic, queryableStoreName); } /** * Create a {@link KTable} for the specified topic. - * The default key and value deserializers as specified in the {@link StreamsConfig config} are used. + * The default {@link TimestampExtractor} and default key and value deserializers + * as specified in the {@link StreamsConfig config} are used. * Input {@link KeyValue records} with {@code null} key will be dropped. * <p> - * Note that the specified input topics must be partitioned by key. + * Note that the specified input topic must be partitioned by key. * If this is not the case the returned {@link KTable} will be corrupted. * <p> * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given @@ -391,7 +514,7 @@ public class KStreamBuilder extends TopologyBuilder { public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset, final String topic, final StateStoreSupplier<KeyValueStore> storeSupplier) { - return table(offsetReset, null, null, topic, storeSupplier); + return table(offsetReset, null, null, null, topic, storeSupplier); } /** @@ -414,15 +537,93 @@ public class KStreamBuilder extends TopologyBuilder { */ public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset, final String topic) { - return table(offsetReset, null, null, topic, (String) null); + return table(offsetReset, null, null, null, topic, (String) null); } + /** * Create a {@link KTable} for the specified topic. - * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used. + * The default {@code "auto.offset.reset"} strategy and default key and value deserializers + * as specified in the {@link StreamsConfig config} are used. + * Input {@link KeyValue} pairs with {@code null} key will be dropped. + * <p> + * Note that the specified input topic must be partitioned by key. + * If this is not the case the returned {@link KTable} will be corrupted. 
+ * <p> + * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given + * {@code storeName}. + * However, no internal changelog topic is created since the original input topic can be used for recovery (cf. + * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). + * <p> + * To query the local {@link KeyValueStore} it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: + * <pre>{@code + * KafkaStreams streams = ... + * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()); + * String key = "some-key"; + * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) + * }</pre> + * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to + * query the value of the key on a parallel running instance of your Kafka Streams application. + * + * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable}, + * if not specified the default extractor defined in the configs will be used + * @param topic the topic name; cannot be {@code null} + * @param storeName the state store name; cannot be {@code null} + * @return a {@link KTable} for the specified topic + */ + public <K, V> KTable<K, V> table(final TimestampExtractor timestampExtractor, + final String topic, + final String storeName) { + return table(null, timestampExtractor, null, null, topic, storeName); + } + + + /** + * Create a {@link KTable} for the specified topic. + * The default key and value deserializers as specified in the {@link StreamsConfig config} are used. + * Input {@link KeyValue} pairs with {@code null} key will be dropped. + * <p> + * Note that the specified input topic must be partitioned by key. + * If this is not the case the returned {@link KTable} will be corrupted. + * <p> + * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given + * {@code storeName}. + * However, no internal changelog topic is created since the original input topic can be used for recovery (cf. + * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). + * <p> + * To query the local {@link KeyValueStore} it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: + * <pre>{@code + * KafkaStreams streams = ... + * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()); + * String key = "some-key"; + * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) + * }</pre> + * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to + * query the value of the key on a parallel running instance of your Kafka Streams application. 
+ * + * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed + * offsets are available + * @param topic the topic name; cannot be {@code null} + * @param storeName the state store name; cannot be {@code null} + * @return a {@link KTable} for the specified topic + */ + public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset, + final TimestampExtractor timestampExtractor, + final String topic, + final String storeName) { + return table(offsetReset, timestampExtractor, null, null, topic, storeName); + } + + + /** + * Create a {@link KTable} for the specified topic. + * The default {@code "auto.offset.reset"} strategy and default {@link TimestampExtractor} + * as specified in the {@link StreamsConfig config} are used. * Input {@link KeyValue records} with {@code null} key will be dropped. * <p> - * Note that the specified input topics must be partitioned by key. + * Note that the specified input topic must be partitioned by key. * If this is not the case the returned {@link KTable} will be corrupted. * <p> * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given @@ -453,15 +654,16 @@ public class KStreamBuilder extends TopologyBuilder { final Serde<V> valSerde, final String topic, final String queryableStoreName) { - return table(null, keySerde, valSerde, topic, queryableStoreName); + return table(null, null, keySerde, valSerde, topic, queryableStoreName); } /** * Create a {@link KTable} for the specified topic. + * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used. * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used. * Input {@link KeyValue records} with {@code null} key will be dropped. * <p> - * Note that the specified input topics must be partitioned by key. + * Note that the specified input topic must be partitioned by key. * If this is not the case the returned {@link KTable} will be corrupted. * <p> * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given @@ -492,7 +694,7 @@ public class KStreamBuilder extends TopologyBuilder { final Serde<V> valSerde, final String topic, final StateStoreSupplier<KeyValueStore> storeSupplier) { - return table(null, keySerde, valSerde, topic, storeSupplier); + return table(null, null, keySerde, valSerde, topic, storeSupplier); } /** @@ -518,12 +720,13 @@ public class KStreamBuilder extends TopologyBuilder { public <K, V> KTable<K, V> table(final Serde<K> keySerde, final Serde<V> valSerde, final String topic) { - return table(null, keySerde, valSerde, topic, (String) null); + return table(null, null, keySerde, valSerde, topic, (String) null); } private <K, V> KTable<K, V> doTable(final AutoOffsetReset offsetReset, final Serde<K> keySerde, final Serde<V> valSerde, + final TimestampExtractor timestampExtractor, final String topic, final StateStoreSupplier<KeyValueStore> storeSupplier, final boolean isQueryable) { @@ -531,7 +734,7 @@ public class KStreamBuilder extends TopologyBuilder { final String name = newName(KTableImpl.SOURCE_NAME); final ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(storeSupplier.name()); - addSource(offsetReset, source, keySerde == null ? null : keySerde.deserializer(), + addSource(offsetReset, source, timestampExtractor, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? 
null : valSerde.deserializer(), topic); addProcessor(name, processorSupplier, source); @@ -582,6 +785,96 @@ public class KStreamBuilder extends TopologyBuilder { final Serde<V> valSerde, final String topic, final String queryableStoreName) { + return table(offsetReset, null, keySerde, valSerde, topic, queryableStoreName); + } + + + + /** + * Create a {@link KTable} for the specified topic. + * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used. + * Input {@link KeyValue} pairs with {@code null} key will be dropped. + * <p> + * Note that the specified input topic must be partitioned by key. + * If this is not the case the returned {@link KTable} will be corrupted. + * <p> + * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given + * {@code storeName}. + * However, no internal changelog topic is created since the original input topic can be used for recovery (cf. + * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). + * <p> + * To query the local {@link KeyValueStore} it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: + * <pre>{@code + * KafkaStreams streams = ... + * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()); + * String key = "some-key"; + * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) + * }</pre> + * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to + * query the value of the key on a parallel running instance of your Kafka Streams application. + * + * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable}, + * if not specified the default extractor defined in the configs will be used + * @param keySerde key serde used to send key-value pairs, + * if not specified the default key serde defined in the configuration will be used + * @param valSerde value serde used to send key-value pairs, + * if not specified the default value serde defined in the configuration will be used + * @param topic the topic name; cannot be {@code null} + * @param storeName the state store name; cannot be {@code null} + * @return a {@link KTable} for the specified topic + */ + public <K, V> KTable<K, V> table(final TimestampExtractor timestampExtractor, + final Serde<K> keySerde, + final Serde<V> valSerde, + final String topic, + final String storeName) { + return table(null, timestampExtractor, keySerde, valSerde, topic, storeName); + } + + + + /** + * Create a {@link KTable} for the specified topic. + * Input {@link KeyValue} pairs with {@code null} key will be dropped. + * <p> + * Note that the specified input topic must be partitioned by key. + * If this is not the case the returned {@link KTable} will be corrupted. + * <p> + * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given + * {@code storeName}. + * However, no internal changelog topic is created since the original input topic can be used for recovery (cf. + * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). + * <p> + * To query the local {@link KeyValueStore} it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: + * <pre>{@code + * KafkaStreams streams = ... 
+ * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()); + * String key = "some-key"; + * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) + * }</pre> + * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to + * query the value of the key on a parallel running instance of your Kafka Streams application. + * + * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid + * committed offsets are available + * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable}, + * if not specified the default extractor defined in the configs will be used + * @param keySerde key serde used to send key-value pairs, + * if not specified the default key serde defined in the configuration will be used + * @param valSerde value serde used to send key-value pairs, + * if not specified the default value serde defined in the configuration will be used + * @param topic the topic name; cannot be {@code null} + * @param queryableStoreName the state store name; If {@code null} this is the equivalent of {@link KStreamBuilder#table(AutoOffsetReset, Serde, Serde, String)} ()} ()}. + * @return a {@link KTable} for the specified topic + */ + public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset, + final TimestampExtractor timestampExtractor, + final Serde<K> keySerde, + final Serde<V> valSerde, + final String topic, + final String queryableStoreName) { final String internalStoreName = queryableStoreName != null ? queryableStoreName : newStoreName(KTableImpl.SOURCE_NAME); final StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(internalStoreName, keySerde, @@ -589,7 +882,7 @@ public class KStreamBuilder extends TopologyBuilder { false, Collections.<String, String>emptyMap(), true); - return doTable(offsetReset, keySerde, valSerde, topic, storeSupplier, queryableStoreName != null); + return doTable(offsetReset, keySerde, valSerde, timestampExtractor, topic, storeSupplier, queryableStoreName != null); } /** @@ -618,7 +911,7 @@ public class KStreamBuilder extends TopologyBuilder { final Serde<K> keySerde, final Serde<V> valSerde, final String topic) { - return table(offsetReset, keySerde, valSerde, topic, (String) null); + return table(offsetReset, null, keySerde, valSerde, topic, (String) null); } /** * Create a {@link KTable} for the specified topic. 
@@ -654,12 +947,13 @@ public class KStreamBuilder extends TopologyBuilder { * @return a {@link KTable} for the specified topic */ public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset, + final TimestampExtractor timestampExtractor, final Serde<K> keySerde, final Serde<V> valSerde, final String topic, final StateStoreSupplier<KeyValueStore> storeSupplier) { Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); - return doTable(offsetReset, keySerde, valSerde, topic, storeSupplier, true); + return doTable(offsetReset, keySerde, valSerde, timestampExtractor, topic, storeSupplier, true); } /** @@ -689,7 +983,7 @@ public class KStreamBuilder extends TopologyBuilder { */ public <K, V> GlobalKTable<K, V> globalTable(final String topic, final String queryableStoreName) { - return globalTable(null, null, topic, queryableStoreName); + return globalTable(null, null, null, topic, queryableStoreName); } /** @@ -709,13 +1003,15 @@ public class KStreamBuilder extends TopologyBuilder { * @return a {@link GlobalKTable} for the specified topic */ public <K, V> GlobalKTable<K, V> globalTable(final String topic) { - return globalTable(null, null, topic, (String) null); + return globalTable(null, null, null, topic, (String) null); } + /** * Create a {@link GlobalKTable} for the specified topic. - * The default key and value deserializers as specified in the {@link StreamsConfig config} are used. - * Input {@link KeyValue records} with {@code null} key will be dropped. + * The default {@link TimestampExtractor} and default key and value deserializers as specified in + * the {@link StreamsConfig config} are used. + * Input {@link KeyValue} pairs with {@code null} key will be dropped. * <p> * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given * {@code queryableStoreName}. @@ -744,10 +1040,11 @@ public class KStreamBuilder extends TopologyBuilder { @SuppressWarnings("unchecked") public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde, final Serde<V> valSerde, + final TimestampExtractor timestampExtractor, final String topic, final String queryableStoreName) { final String internalStoreName = queryableStoreName != null ? queryableStoreName : newStoreName(KTableImpl.SOURCE_NAME); - return doGlobalTable(keySerde, valSerde, topic, new RocksDBKeyValueStoreSupplier<>(internalStoreName, + return doGlobalTable(keySerde, valSerde, timestampExtractor, topic, new RocksDBKeyValueStoreSupplier<>(internalStoreName, keySerde, valSerde, false, @@ -757,8 +1054,8 @@ public class KStreamBuilder extends TopologyBuilder { /** * Create a {@link GlobalKTable} for the specified topic. - * The default key and value deserializers as specified in the {@link StreamsConfig config} are used. - * Input {@link KeyValue records} with {@code null} key will be dropped. + * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used. + * Input {@link KeyValue} pairs with {@code null} key will be dropped. * <p> * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given * {@code queryableStoreName}. 
@@ -789,11 +1086,44 @@ public class KStreamBuilder extends TopologyBuilder { final Serde<V> valSerde, final String topic, final StateStoreSupplier<KeyValueStore> storeSupplier) { - return doGlobalTable(keySerde, valSerde, topic, storeSupplier); + return doGlobalTable(keySerde, valSerde, null, topic, storeSupplier); } + + /** + * Create a {@link GlobalKTable} for the specified topic. + * Input {@link KeyValue} pairs with {@code null} key will be dropped. + * <p> + * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given + * {@code storeName}. + * However, no internal changelog topic is created since the original input topic can be used for recovery (cf. + * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). + * <p> + * To query the local {@link KeyValueStore} it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: + * <pre>{@code + * KafkaStreams streams = ... + * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()); + * String key = "some-key"; + * Long valueForKey = localStore.get(key); + * }</pre> + * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"} + * regardless of the specified value in {@link StreamsConfig}. + * + * @param timestampExtractor the stateless timestamp extractor used for this source {@link GlobalKTable}, + * if not specified the default extractor defined in the configs will be used + * @param keySerde key serde used to send key-value pairs, + * if not specified the default key serde defined in the configuration will be used + * @param valSerde value serde used to send key-value pairs, + * if not specified the default value serde defined in the configuration will be used + * @param topic the topic name; cannot be {@code null} + * @param storeSupplier user defined state store supplier. Cannot be {@code null}. + * @return a {@link GlobalKTable} for the specified topic + */ + @SuppressWarnings("unchecked") private <K, V> GlobalKTable<K, V> doGlobalTable(final Serde<K> keySerde, final Serde<V> valSerde, + final TimestampExtractor timestampExtractor, final String topic, final StateStoreSupplier<KeyValueStore> storeSupplier) { Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); @@ -805,7 +1135,7 @@ public class KStreamBuilder extends TopologyBuilder { final Deserializer<K> keyDeserializer = keySerde == null ? null : keySerde.deserializer(); final Deserializer<V> valueDeserializer = valSerde == null ? null : valSerde.deserializer(); - addGlobalStore(storeSupplier, sourceName, keyDeserializer, valueDeserializer, topic, processorName, tableSource); + addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer, valueDeserializer, topic, processorName, tableSource); return new GlobalKTableImpl(new KTableSourceValueGetterSupplier<>(storeSupplier.name())); } @@ -834,13 +1164,13 @@ public class KStreamBuilder extends TopologyBuilder { final Serde<V> valSerde, final String topic) { - return globalTable(keySerde, valSerde, topic, (String) null); + return globalTable(keySerde, valSerde, null, topic, (String) null); } /** * Create a new instance of {@link KStream} by merging the given {@link KStream}s. * <p> - * There are nor ordering guaranteed for records from different {@link KStream}s. + * There is no ordering guarantee for records from different {@link KStream}s. 
* * @param streams the {@link KStream}s to be merged * @return a {@link KStream} containing all records of the given streams http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java index 51c156a..f5e0e1d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java @@ -28,6 +28,7 @@ public interface TimestampExtractor { /** * Extracts a timestamp from a record. The timestamp must be positive to be considered a valid timestamp. * Returning a negative timestamp will cause the record not to be processed but rather silently skipped. + * The timestamp extractor implementation must be stateless. * <p> * The extracted timestamp MUST represent the milliseconds since midnight, January 1, 1970 UTC. * <p>
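[Editor's note] The KStreamBuilder and TopologyBuilder changes above make it possible to attach a TimestampExtractor to an individual source rather than only globally through the config default. Below is a minimal sketch, not part of this commit, of the new overloads; the topic names, store name, and PrefixTimestampExtractor are made-up illustrations, and the snippet assumes the two-argument extract(record, previousTimestamp) signature that TimestampExtractor has on this branch:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    public class PerSourceExtractorExample {

        // Stateless extractor that reads an event time from values of the form
        // "<epochMillis>,<payload>" and otherwise falls back to the record timestamp.
        public static class PrefixTimestampExtractor implements TimestampExtractor {
            @Override
            public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
                final Object value = record.value();
                if (value instanceof String) {
                    final String s = (String) value;
                    final int comma = s.indexOf(',');
                    if (comma > 0) {
                        try {
                            return Long.parseLong(s.substring(0, comma));
                        } catch (final NumberFormatException ignored) {
                            // malformed prefix: fall through to the record timestamp
                        }
                    }
                }
                return record.timestamp();
            }
        }

        public static void main(final String[] args) {
            final KStreamBuilder builder = new KStreamBuilder();

            // "orders" and the "customers" table get their own extractor; "clicks" keeps
            // whatever default.timestamp.extractor is configured in StreamsConfig.
            final KStream<String, String> orders =
                    builder.stream(new PrefixTimestampExtractor(), Serdes.String(), Serdes.String(), "orders");
            final KStream<String, String> clicks =
                    builder.stream(Serdes.String(), Serdes.String(), "clicks");
            final KTable<String, String> customers =
                    builder.table(new PrefixTimestampExtractor(), "customers", "customers-store");
            // ... define the rest of the topology and start KafkaStreams as usual
        }
    }

As the added TimestampExtractor javadoc requires, any extractor supplied per source must be stateless; sources that do not pass one keep using whatever "default.timestamp.extractor" (or the deprecated "timestamp.extractor") resolves to.
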
