KAFKA-3422: Add overloading functions without serdes in Streams DSL

Also include:
1) remove streams-specific configs before passing the config map to the producer and consumer, to avoid warning messages;
2) add a `ConsumerRecord` timestamp extractor and set it as the default extractor.

Author: Guozhang Wang <[email protected]>
Reviewers: Michael G. Noll, Ewen Cheslack-Postava
Closes #1093 from guozhangwang/KConfigWarn

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5d0cd766
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5d0cd766
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5d0cd766

Branch: refs/heads/trunk
Commit: 5d0cd7667f7e584f05ab4e76ed139fbafa81e042
Parents: 0d8cbbc
Author: Guozhang Wang <[email protected]>
Authored: Fri Mar 18 12:39:41 2016 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Fri Mar 18 12:39:41 2016 -0700

----------------------------------------------------------------------
 .../examples/pageview/PageViewTypedDemo.java     |  15 +-
 .../examples/pageview/PageViewUntypedDemo.java   |   2 +-
 .../examples/wordcount/WordCountDemo.java        |   9 +-
 .../org/apache/kafka/streams/StreamsConfig.java  |  49 ++++--
 .../apache/kafka/streams/kstream/KStream.java    | 165 +++++++++++++++----
 .../apache/kafka/streams/kstream/KTable.java     |  76 +++++++--
 .../streams/kstream/internals/KStreamImpl.java   |  83 ++++++++--
 .../streams/kstream/internals/KTableImpl.java    |  52 ++++--
 .../kstream/internals/KTableStoreSupplier.java   |  11 +-
 .../ConsumerRecordTimestampExtractor.java        |  39 +++++
 .../processor/WallclockTimestampExtractor.java   |  35 ++++
 .../internals/WallclockTimestampExtractor.java   |  28 ----
 .../apache/kafka/streams/state/StateSerdes.java  |  22 +--
 .../org/apache/kafka/streams/state/Stores.java   |  18 +-
 .../kafka/streams/state/WindowStoreUtils.java    |  15 +-
 .../internals/InMemoryKeyValueLoggedStore.java   |  26 ++-
 .../InMemoryKeyValueStoreSupplier.java           |  47 +++---
 .../InMemoryLRUCacheStoreSupplier.java           |  16 +-
 .../streams/state/internals/MemoryLRUCache.java  |  25 +--
 .../internals/RocksDBKeyValueStoreSupplier.java  |  16 +-
 .../streams/state/internals/RocksDBStore.java    |  27 ++-
 .../state/internals/RocksDBWindowStore.java      |  24 ++-
 .../internals/RocksDBWindowStoreSupplier.java    |  16 +-
 .../kstream/internals/KTableImplTest.java        |   4 +-
 .../kstream/internals/KTableMapValuesTest.java   |   2 +-
 .../streams/smoketest/SmokeTestClient.java       |  18 +-
 .../state/internals/RocksDBWindowStoreTest.java  |  28 ++--
 27 files changed, 610 insertions(+), 258 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
index 15083b2..0385bde 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.examples.pageview;

 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.KafkaStreams;
@@ -100,6 +101,8
@@ public class PageViewTypedDemo { serdeProps.put("JsonPOJOClass", PageView.class); pageViewDeserializer.configure(serdeProps, false); + final Serde<PageView> pageViewSerde = Serdes.serdeFrom(pageViewSerializer, pageViewDeserializer); + final Serializer<UserProfile> userProfileSerializer = new JsonPOJOSerializer<>(); serdeProps.put("JsonPOJOClass", UserProfile.class); userProfileSerializer.configure(serdeProps, false); @@ -108,6 +111,8 @@ public class PageViewTypedDemo { serdeProps.put("JsonPOJOClass", UserProfile.class); userProfileDeserializer.configure(serdeProps, false); + final Serde<UserProfile> userProfileSerde = Serdes.serdeFrom(userProfileSerializer, userProfileDeserializer); + final Serializer<WindowedPageViewByRegion> wPageViewByRegionSerializer = new JsonPOJOSerializer<>(); serdeProps.put("JsonPOJOClass", WindowedPageViewByRegion.class); wPageViewByRegionSerializer.configure(serdeProps, false); @@ -116,6 +121,8 @@ public class PageViewTypedDemo { serdeProps.put("JsonPOJOClass", WindowedPageViewByRegion.class); wPageViewByRegionDeserializer.configure(serdeProps, false); + final Serde<WindowedPageViewByRegion> wPageViewByRegionSerde = Serdes.serdeFrom(wPageViewByRegionSerializer, wPageViewByRegionDeserializer); + final Serializer<RegionCount> regionCountSerializer = new JsonPOJOSerializer<>(); serdeProps.put("JsonPOJOClass", RegionCount.class); regionCountSerializer.configure(serdeProps, false); @@ -124,9 +131,11 @@ public class PageViewTypedDemo { serdeProps.put("JsonPOJOClass", RegionCount.class); regionCountDeserializer.configure(serdeProps, false); - KStream<String, PageView> views = builder.stream(Serdes.String(), Serdes.serdeFrom(pageViewSerializer, pageViewDeserializer), "streams-pageview-input"); + final Serde<RegionCount> regionCountSerde = Serdes.serdeFrom(regionCountSerializer, regionCountDeserializer); + + KStream<String, PageView> views = builder.stream(Serdes.String(), pageViewSerde, "streams-pageview-input"); - KTable<String, UserProfile> users = builder.table(Serdes.String(), Serdes.serdeFrom(userProfileSerializer, userProfileDeserializer), "streams-userprofile-input"); + KTable<String, UserProfile> users = builder.table(Serdes.String(), userProfileSerde, "streams-userprofile-input"); KStream<WindowedPageViewByRegion, RegionCount> regionCount = views .leftJoin(users, new ValueJoiner<PageView, UserProfile, PageViewByRegion>() { @@ -169,7 +178,7 @@ public class PageViewTypedDemo { }); // write to the result topic - regionCount.to("streams-pageviewstats-typed-output", Serdes.serdeFrom(wPageViewByRegionSerializer, wPageViewByRegionDeserializer), Serdes.serdeFrom(regionCountSerializer, regionCountDeserializer)); + regionCount.to(wPageViewByRegionSerde, regionCountSerde, "streams-pageviewstats-typed-output"); KafkaStreams streams = new KafkaStreams(builder, props); streams.start(); http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java index 5b80f64..6f5cdf2 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java @@ -116,7 +116,7 
@@ public class PageViewUntypedDemo { }); // write to the result topic - regionCount.to("streams-pageviewstats-untyped-output", jsonSerde, jsonSerde); + regionCount.to(jsonSerde, jsonSerde, "streams-pageviewstats-untyped-output"); KafkaStreams streams = new KafkaStreams(builder, props); streams.start(); http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java index ebd6050..e892abb 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java @@ -48,13 +48,15 @@ public class WordCountDemo { props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); + props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); KStreamBuilder builder = new KStreamBuilder(); - KStream<String, String> source = builder.stream(Serdes.String(), Serdes.String(), "streams-file-input"); + KStream<String, String> source = builder.stream("streams-file-input"); KTable<String, Long> counts = source .flatMapValues(new ValueMapper<String, Iterable<String>>() { @@ -68,9 +70,10 @@ public class WordCountDemo { return new KeyValue<String, String>(value, value); } }) - .countByKey(Serdes.String(), "Counts"); + .countByKey("Counts"); - counts.to("streams-wordcount-output", Serdes.String(), Serdes.Long()); + // need to override value serde to Long type + counts.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output"); KafkaStreams streams = new KafkaStreams(builder, props); streams.start(); http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 4e989be..d4efbee 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -26,10 +26,10 @@ import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.processor.ConsumerRecordTimestampExtractor; import org.apache.kafka.streams.processor.DefaultPartitionGrouper; import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor; import org.apache.kafka.streams.processor.internals.StreamThread; -import org.apache.kafka.streams.processor.internals.WallclockTimestampExtractor; import java.util.Map; @@ -149,7 +149,7 @@ public class StreamsConfig extends AbstractConfig { 
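The WordCountDemo change above is the template for the new API: default serdes are supplied once through StreamsConfig, the DSL is then called without serdes, and an explicit serde is passed only where a type differs from the defaults (the Long-valued counts topic). A condensed sketch of that pattern, with illustrative topic and store names:

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // defaults used by every DSL overload that omits explicit serdes
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

    KStreamBuilder builder = new KStreamBuilder();

    // default serdes: no serde arguments needed
    KStream<String, String> source = builder.stream("streams-file-input");
    KTable<String, Long> counts = source
        .map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
            @Override
            public KeyValue<String, String> apply(String key, String value) {
                return new KeyValue<>(value, value);
            }
        })
        .countByKey("Counts");

    // the value type is Long, so the value serde must be overridden on output
    counts.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");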
REPLICATION_FACTOR_DOC) .define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG, Type.CLASS, - WallclockTimestampExtractor.class.getName(), + ConsumerRecordTimestampExtractor.class.getName(), Importance.MEDIUM, TIMESTAMP_EXTRACTOR_CLASS_DOC) .define(PARTITION_GROUPER_CLASS_CONFIG, @@ -233,12 +233,18 @@ public class StreamsConfig extends AbstractConfig { public Map<String, Object> getConsumerConfigs(StreamThread streamThread, String groupId, String clientId) { Map<String, Object> props = getBaseConsumerConfigs(); + // add client id with stream client id prefix, and group id props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-consumer"); - props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG)); - props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamPartitionAssignor.class.getName()); + // add configs required for stream partition assignor props.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, streamThread); + props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG)); + props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG)); + props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamPartitionAssignor.class.getName()); + + if (!getString(ZOOKEEPER_CONNECT_CONFIG).equals("")) + props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, getString(ZOOKEEPER_CONNECT_CONFIG)); return props; } @@ -249,6 +255,7 @@ public class StreamsConfig extends AbstractConfig { // no need to set group id for a restore consumer props.remove(ConsumerConfig.GROUP_ID_CONFIG); + // add client id with stream client id prefix props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-restore-consumer"); return props; @@ -257,39 +264,49 @@ public class StreamsConfig extends AbstractConfig { private Map<String, Object> getBaseConsumerConfigs() { Map<String, Object> props = this.originals(); + // remove streams properties + removeStreamsSpecificConfigs(props); + // set consumer default property values props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); - // remove properties that are not required for consumers - removeStreamsSpecificConfigs(props); - return props; } public Map<String, Object> getProducerConfigs(String clientId) { Map<String, Object> props = this.originals(); - // set producer default property values - props.put(ProducerConfig.LINGER_MS_CONFIG, "100"); + // remove consumer properties that are not required for producers + props.remove(StreamsConfig.AUTO_OFFSET_RESET_CONFIG); - // remove properties that are not required for producers + // remove streams properties removeStreamsSpecificConfigs(props); - props.remove(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG); + // set producer default property values + props.put(ProducerConfig.LINGER_MS_CONFIG, "100"); + + // add client id with stream client id prefix props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-producer"); return props; } private void removeStreamsSpecificConfigs(Map<String, Object> props) { - props.remove(StreamsConfig.APPLICATION_ID_CONFIG); + props.remove(StreamsConfig.POLL_MS_CONFIG); props.remove(StreamsConfig.STATE_DIR_CONFIG); - props.remove(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG); - props.remove(StreamsConfig.NUM_STREAM_THREADS_CONFIG); - props.remove(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG); + props.remove(StreamsConfig.APPLICATION_ID_CONFIG); props.remove(StreamsConfig.KEY_SERDE_CLASS_CONFIG); 
props.remove(StreamsConfig.VALUE_SERDE_CLASS_CONFIG); - props.remove(InternalConfig.STREAM_THREAD_INSTANCE); + props.remove(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG); + props.remove(StreamsConfig.REPLICATION_FACTOR_CONFIG); + props.remove(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG); + props.remove(StreamsConfig.NUM_STREAM_THREADS_CONFIG); + props.remove(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG); + props.remove(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG); + props.remove(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG); + props.remove(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG); + props.remove(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG); + props.remove(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE); } public Serde keySerde() { http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index 1c78652..6f05c3b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -98,8 +98,9 @@ public interface KStream<K, V> { * Sends key-value to a topic, also creates a new instance of KStream from the topic. * This is equivalent to calling to(topic) and from(topic). * - * @param topic the topic name - * @return the instance of KStream that consumes the given topic + * @param topic the topic name + * + * @return the instance of {@link KStream} that consumes the given topic */ KStream<K, V> through(String topic); @@ -107,32 +108,33 @@ public interface KStream<K, V> { * Sends key-value to a topic, also creates a new instance of KStream from the topic. * This is equivalent to calling to(topic) and from(topic). * - * @param topic the topic name - * @param keySerde key serde used to send key-value pairs, - * if not specified the default key serde defined in the configuration will be used - * @param valSerde value serde used to send key-value pairs, - * if not specified the default value serde defined in the configuration will be used - * @return the instance of KStream that consumes the given topic + * @param keySerde key serde used to send key-value pairs, + * if not specified the default key serde defined in the configuration will be used + * @param valSerde value serde used to send key-value pairs, + * if not specified the default value serde defined in the configuration will be used + * @param topic the topic name + * + * @return the instance of {@link KStream} that consumes the given topic */ - KStream<K, V> through(String topic, Serde<K> keySerde, Serde<V> valSerde); + KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic); /** * Sends key-value to a topic using default serializers specified in the config. * - * @param topic the topic name + * @param topic the topic name */ void to(String topic); /** * Sends key-value to a topic. 
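An API note that applies to all of the signature changes in this file: to distinguish the new serde-free overloads cleanly, the topic argument moves from the first to the last parameter of through() and to(), so existing call sites need reordering. As a before/after sketch:

    // before this commit
    stream.to("streams-output", keySerde, valSerde);

    // after this commit
    stream.to(keySerde, valSerde, "streams-output");
    stream.to("streams-output");   // new overload: serdes come from the config defaults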
* - * @param topic the topic name - * @param keySerde key serde used to send key-value pairs, - * if not specified the default serde defined in the configs will be used - * @param keySerde value serde used to send key-value pairs, - * if not specified the default serde defined in the configs will be used + * @param keySerde key serde used to send key-value pairs, + * if not specified the default serde defined in the configs will be used + * @param valSerde value serde used to send key-value pairs, + * if not specified the default serde defined in the configs will be used + * @param topic the topic name */ - void to(String topic, Serde<K> keySerde, Serde<V> valSerde); + void to(Serde<K> keySerde, Serde<V> valSerde, String topic); /** * Applies a stateful transformation to all elements in this stream. @@ -184,6 +186,20 @@ public interface KStream<K, V> { Serde<V1> otherValueSerde); /** + * Combines values of this stream with another KStream using Windowed Inner Join. + * + * @param otherStream the instance of {@link KStream} joined with this stream + * @param joiner the instance of {@link ValueJoiner} + * @param windows the specification of the {@link JoinWindows} + * @param <V1> the value type of the other stream + * @param <R> the value type of the new stream + */ + <V1, R> KStream<K, R> join( + KStream<K, V1> otherStream, + ValueJoiner<V, V1, R> joiner, + JoinWindows windows); + + /** * Combines values of this stream with another KStream using Windowed Outer Join. * * @param otherStream the instance of KStream joined with this stream @@ -207,6 +223,20 @@ public interface KStream<K, V> { Serde<V1> otherValueSerde); /** + * Combines values of this stream with another KStream using Windowed Outer Join. + * + * @param otherStream the instance of {@link KStream} joined with this stream + * @param joiner the instance of {@link ValueJoiner} + * @param windows the specification of the {@link JoinWindows} + * @param <V1> the value type of the other stream + * @param <R> the value type of the new stream + */ + <V1, R> KStream<K, R> outerJoin( + KStream<K, V1> otherStream, + ValueJoiner<V, V1, R> joiner, + JoinWindows windows); + + /** * Combines values of this stream with another KStream using Windowed Left Join. * * @param otherStream the instance of KStream joined with this stream @@ -227,20 +257,34 @@ public interface KStream<K, V> { Serde<V1> otherValueSerde); /** + * Combines values of this stream with another KStream using Windowed Left Join. + * + * @param otherStream the instance of {@link KStream} joined with this stream + * @param joiner the instance of {@link ValueJoiner} + * @param windows the specification of the {@link JoinWindows} + * @param <V1> the value type of the other stream + * @param <R> the value type of the new stream + */ + <V1, R> KStream<K, R> leftJoin( + KStream<K, V1> otherStream, + ValueJoiner<V, V1, R> joiner, + JoinWindows windows); + + /** * Combines values of this stream with KTable using Left Join. 
* - * @param ktable the instance of KTable joined with this stream - * @param joiner ValueJoiner - * @param <V1> the value type of the table - * @param <V2> the value type of the new stream + * @param table the instance of {@link KTable} joined with this stream + * @param joiner the instance of {@link ValueJoiner} + * @param <V1> the value type of the table + * @param <V2> the value type of the new stream */ - <V1, V2> KStream<K, V2> leftJoin(KTable<K, V1> ktable, ValueJoiner<V, V1, V2> joiner); + <V1, V2> KStream<K, V2> leftJoin(KTable<K, V1> table, ValueJoiner<V, V1, V2> joiner); /** * Aggregate values of this stream by key on a window basis. * - * @param reducer the class of Reducer - * @param windows the specification of the aggregation window + * @param reducer the class of {@link Reducer} + * @param windows the specification of the aggregation {@link Windows} */ <W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer, Windows<W> windows, @@ -250,6 +294,14 @@ public interface KStream<K, V> { /** * Aggregate values of this stream by key on a window basis. * + * @param reducer the class of {@link Reducer} + * @param windows the specification of the aggregation {@link Windows} + */ + <W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer, Windows<W> windows); + + /** + * Aggregate values of this stream by key on a window basis. + * * @param reducer the class of Reducer */ KTable<K, V> reduceByKey(Reducer<V> reducer, @@ -260,9 +312,16 @@ public interface KStream<K, V> { /** * Aggregate values of this stream by key on a window basis. * + * @param reducer the class of {@link Reducer} + */ + KTable<K, V> reduceByKey(Reducer<V> reducer, String name); + + /** + * Aggregate values of this stream by key on a window basis. + * * @param initializer the class of Initializer * @param aggregator the class of Aggregator - * @param windows the specification of the aggregation window + * @param windows the specification of the aggregation {@link Windows} * @param <T> the value type of the aggregated table */ <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Initializer<T> initializer, @@ -272,12 +331,25 @@ public interface KStream<K, V> { Serde<T> aggValueSerde); /** + * Aggregate values of this stream by key on a window basis. 
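Each windowed join (join, outerJoin, leftJoin) likewise gains a serde-free overload; as the KStreamImpl changes further down show, these simply delegate with null serdes, which resolve to the configured defaults. A usage sketch, assuming String/String and String/Long streams named left and right and a JoinWindows instance named windows are in scope:

    KStream<String, String> joined = left.join(
        right,
        new ValueJoiner<String, Long, String>() {
            @Override
            public String apply(String leftValue, Long rightValue) {
                return leftValue + "=" + rightValue;
            }
        },
        windows);   // no serdes: key and value serdes fall back to the config defaults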
+ * + * @param initializer the class of {@link Initializer} + * @param aggregator the class of {@link Aggregator} + * @param windows the specification of the aggregation {@link Windows} + * @param <T> the value type of the aggregated table + */ + <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Initializer<T> initializer, + Aggregator<K, V, T> aggregator, + Windows<W> windows); + + /** * Aggregate values of this stream by key without a window basis, and hence * return an ever updating table * - * @param initializer the class of Initializer - * @param aggregator the class of Aggregator - * @param <T> the value type of the aggregated table + * @param initializer the class of {@link Initializer} + * @param aggregator the class of {@link Aggregator} + * @param name the name of the aggregated table + * @param <T> the value type of the aggregated table */ <T> KTable<K, T> aggregateByKey(Initializer<T> initializer, Aggregator<K, V, T> aggregator, @@ -286,17 +358,46 @@ public interface KStream<K, V> { String name); /** + * Aggregate values of this stream by key without a window basis, and hence + * return an ever updating table + * + * @param initializer the class of {@link Initializer} + * @param aggregator the class of {@link Aggregator} + * @param name the name of the aggregated table + * @param <T> the value type of the aggregated table + */ + <T> KTable<K, T> aggregateByKey(Initializer<T> initializer, + Aggregator<K, V, T> aggregator, + String name); + + /** + * Count number of messages of this stream by key on a window basis. + * + * @param windows the specification of the aggregation {@link Windows} + */ + <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows, Serde<K> keySerde); + + /** * Count number of messages of this stream by key on a window basis. * - * @param windows the specification of the aggregation window + * @param windows the specification of the aggregation {@link Windows} */ - <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows, - Serde<K> keySerde); + <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows); /** * Count number of messages of this stream by key without a window basis, and hence * return a ever updating counting table. + * + * @param name the name of the aggregated table + */ + KTable<K, Long> countByKey(Serde<K> keySerde, String name); + + /** + * Count number of messages of this stream by key without a window basis, and hence + * return a ever updating counting table. + * + * @param name the name of the aggregated table */ - KTable<K, Long> countByKey(Serde<K> keySerde, - String name); + KTable<K, Long> countByKey(String name); + } http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java index 0ae5150..997cb4d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java @@ -66,14 +66,14 @@ public interface KTable<K, V> { * Sends key-value to a topic, also creates a new instance of KTable from the topic. * This is equivalent to calling to(topic) and table(topic). 
* - * @param topic the topic name - * @param keySerde key serde used to send key-value pairs, - * if not specified the default key serde defined in the configuration will be used - * @param valSerde value serde used to send key-value pairs, - * if not specified the default value serde defined in the configuration will be used + * @param keySerde key serde used to send key-value pairs, + * if not specified the default key serde defined in the configuration will be used + * @param valSerde value serde used to send key-value pairs, + * if not specified the default value serde defined in the configuration will be used + * @param topic the topic name * @return the new stream that consumes the given topic */ - KTable<K, V> through(String topic, Serde<K> keySerde, Serde<V> valSerde); + KTable<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic); /** * Sends key-value to a topic using default serializers specified in the config. @@ -85,13 +85,13 @@ public interface KTable<K, V> { /** * Sends key-value to a topic. * - * @param topic the topic name - * @param keySerde key serde used to send key-value pairs, - * if not specified the default serde defined in the configs will be used - * @param valSerde value serde used to send key-value pairs, - * if not specified the default serde defined in the configs will be used + * @param keySerde key serde used to send key-value pairs, + * if not specified the default serde defined in the configs will be used + * @param valSerde value serde used to send key-value pairs, + * if not specified the default serde defined in the configs will be used + * @param topic the topic name */ - void to(String topic, Serde<K> keySerde, Serde<V> valSerde); + void to(Serde<K> keySerde, Serde<V> valSerde, String topic); /** * Creates a new instance of KStream from this KTable @@ -136,22 +136,38 @@ public interface KTable<K, V> { /** * Reduce values of this table by the selected key. * - * @param addReducer the class of Reducer - * @param removeReducer the class of Reducer + * @param adder the class of Reducer + * @param subtractor the class of Reducer * @param selector the KeyValue mapper that select the aggregate key * @param name the name of the resulted table * @param <K1> the key type of the aggregated table * @param <V1> the value type of the aggregated table * @return the instance of KTable */ - <K1, V1> KTable<K1, V1> reduce(Reducer<V1> addReducer, - Reducer<V1> removeReducer, + <K1, V1> KTable<K1, V1> reduce(Reducer<V1> adder, + Reducer<V1> subtractor, KeyValueMapper<K, V, KeyValue<K1, V1>> selector, Serde<K1> keySerde, Serde<V1> valueSerde, String name); /** + * Reduce values of this table by the selected key. + * + * @param adder the instance of {@link Reducer} for addition + * @param subtractor the instance of {@link Reducer} for subtraction + * @param selector the instance of {@link KeyValueMapper} that select the aggregate key + * @param name the name of the resulted table + * @param <K1> the key type of the aggregated table + * @param <V1> the value type of the aggregated table + * @return the instance of KTable + */ + <K1, V1> KTable<K1, V1> reduce(Reducer<V1> adder, + Reducer<V1> subtractor, + KeyValueMapper<K, V, KeyValue<K1, V1>> selector, + String name); + + /** * Aggregate values of this table by the selected key. * * @param initializer the class of Initializer @@ -173,6 +189,24 @@ public interface KTable<K, V> { String name); /** + * Aggregate values of this table by the selected key. 
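KTable receives matching serde-free variants of through(), to(), reduce(), aggregate(), and count() (declared below). For count(), only the key selector and the name of the backing store remain. A sketch, assuming a users table is in scope; the UserProfile type and its region field are hypothetical:

    KTable<String, Long> usersPerRegion = users.count(
        new KeyValueMapper<String, UserProfile, String>() {
            @Override
            public String apply(String userId, UserProfile profile) {
                return profile.region;   // hypothetical field
            }
        },
        "UsersPerRegion");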
+ * + * @param initializer the instance of {@link Initializer} + * @param adder the instance of {@link Aggregator} for addition + * @param substractor the instance of {@link Aggregator} for subtraction + * @param selector the instance of {@link KeyValueMapper} that select the aggregate key + * @param name the name of the resulted table + * @param <K1> the key type of the aggregated table + * @param <V1> the value type of the aggregated table + * @return the instance of aggregated {@link KTable} + */ + <K1, V1, T> KTable<K1, T> aggregate(Initializer<T> initializer, + Aggregator<K1, V1, T> adder, + Aggregator<K1, V1, T> substractor, + KeyValueMapper<K, V, KeyValue<K1, V1>> selector, + String name); + + /** * Count number of records of this table by the selected key. * * @param selector the KeyValue mapper that select the aggregate key @@ -184,4 +218,14 @@ public interface KTable<K, V> { Serde<K1> keySerde, Serde<V> valueSerde, String name); + + /** + * Count number of records of this table by the selected key. + * + * @param selector the instance of {@link KeyValueMapper} that select the aggregate key + * @param name the name of the resulted table + * @param <K1> the key type of the aggregated table + * @return the instance of aggregated {@link KTable} + */ + <K1> KTable<K1, Long> count(KeyValueMapper<K, V, K1> selector, String name); } http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index b293496..567b06c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -194,27 +194,25 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V } @Override - public KStream<K, V> through(String topic, - Serde<K> keySerde, - Serde<V> valSerde) { - to(topic, keySerde, valSerde); + public KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic) { + to(keySerde, valSerde, topic); - return topology.stream(keySerde, valSerde); + return topology.stream(keySerde, valSerde, topic); } @Override public KStream<K, V> through(String topic) { - return through(topic, null, null); + return through(null, null, topic); } @Override public void to(String topic) { - to(topic, null, null); + to(null, null, topic); } @SuppressWarnings("unchecked") @Override - public void to(String topic, Serde<K> keySerde, Serde<V> valSerde) { + public void to(Serde<K> keySerde, Serde<V> valSerde, String topic) { String name = topology.newName(SINK_NAME); StreamPartitioner<K, V> streamPartitioner = null; @@ -270,6 +268,15 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V } @Override + public <V1, R> KStream<K, R> join( + KStream<K, V1> other, + ValueJoiner<V, V1, R> joiner, + JoinWindows windows) { + + return join(other, joiner, windows, null, null, null, false); + } + + @Override public <V1, R> KStream<K, R> outerJoin( KStream<K, V1> other, ValueJoiner<V, V1, R> joiner, @@ -281,6 +288,15 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V return join(other, joiner, windows, keySerde, thisValueSerde, otherValueSerde, true); } + @Override + public <V1, R> KStream<K, R> 
outerJoin( + KStream<K, V1> other, + ValueJoiner<V, V1, R> joiner, + JoinWindows windows) { + + return join(other, joiner, windows, null, null, null, true); + } + @SuppressWarnings("unchecked") private <V1, R> KStream<K, R> join( KStream<K, V1> other, @@ -363,6 +379,15 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V return new KStreamImpl<>(topology, joinThisName, allSourceNodes); } + @Override + public <V1, R> KStream<K, R> leftJoin( + KStream<K, V1> other, + ValueJoiner<V, V1, R> joiner, + JoinWindows windows) { + + return leftJoin(other, joiner, windows, null, null); + } + @SuppressWarnings("unchecked") @Override public <V1, R> KStream<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) { @@ -402,6 +427,13 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V } @Override + public <W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer, + Windows<W> windows) { + + return reduceByKey(reducer, windows, null, null); + } + + @Override public KTable<K, V> reduceByKey(Reducer<V> reducer, Serde<K> keySerde, Serde<V> aggValueSerde, @@ -426,6 +458,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V } @Override + public KTable<K, V> reduceByKey(Reducer<V> reducer, String name) { + + return reduceByKey(reducer, null, null, name); + } + + @Override public <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Initializer<T> initializer, Aggregator<K, V, T> aggregator, Windows<W> windows, @@ -452,6 +490,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V } @Override + public <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Initializer<T> initializer, + Aggregator<K, V, T> aggregator, + Windows<W> windows) { + + return aggregateByKey(initializer, aggregator, windows, null, null); + } + + @Override public <T> KTable<K, T> aggregateByKey(Initializer<T> initializer, Aggregator<K, V, T> aggregator, Serde<K> keySerde, @@ -477,6 +523,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V } @Override + public <T> KTable<K, T> aggregateByKey(Initializer<T> initializer, + Aggregator<K, V, T> aggregator, + String name) { + + return aggregateByKey(initializer, aggregator, null, null, name); + } + + @Override public <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows, Serde<K> keySerde) { return this.aggregateByKey( @@ -495,8 +549,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V } @Override - public KTable<K, Long> countByKey(Serde<K> keySerde, - String name) { + public <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows) { + return countByKey(windows, null); + } + + @Override + public KTable<K, Long> countByKey(Serde<K> keySerde, String name) { return this.aggregateByKey( new Initializer<Long>() { @Override @@ -511,4 +569,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V } }, keySerde, Serdes.Long(), name); } + + @Override + public KTable<K, Long> countByKey(String name) { + return countByKey(null, name); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 496a476..ca1e659 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -131,27 +131,27 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, } @Override - public KTable<K, V> through(String topic, - Serde<K> keySerde, - Serde<V> valSerde) { - to(topic, keySerde, valSerde); + public KTable<K, V> through(Serde<K> keySerde, + Serde<V> valSerde, + String topic) { + to(keySerde, valSerde, topic); return topology.table(keySerde, valSerde, topic); } @Override public KTable<K, V> through(String topic) { - return through(topic, null, null); + return through(null, null, topic); } @Override public void to(String topic) { - to(topic, null, null); + to(null, null, topic); } @Override - public void to(String topic, Serde<K> keySerde, Serde<V> valSerde) { - this.toStream().to(topic, keySerde, valSerde); + public void to(Serde<K> keySerde, Serde<V> valSerde, String topic) { + this.toStream().to(keySerde, valSerde, topic); } @Override @@ -239,8 +239,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, @Override public <K1, V1, T> KTable<K1, T> aggregate(Initializer<T> initializer, - Aggregator<K1, V1, T> add, - Aggregator<K1, V1, T> remove, + Aggregator<K1, V1, T> adder, + Aggregator<K1, V1, T> subtractor, KeyValueMapper<K, V, KeyValue<K1, V1>> selector, Serde<K1> keySerde, Serde<V1> valueSerde, @@ -259,7 +259,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector); - ProcessorSupplier<K1, Change<V1>> aggregateSupplier = new KTableAggregate<>(name, initializer, add, remove); + ProcessorSupplier<K1, Change<V1>> aggregateSupplier = new KTableAggregate<>(name, initializer, adder, subtractor); StateStoreSupplier aggregateStore = Stores.create(name) .withKeys(keySerde) @@ -287,6 +287,16 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, } @Override + public <K1, V1, T> KTable<K1, T> aggregate(Initializer<T> initializer, + Aggregator<K1, V1, T> adder, + Aggregator<K1, V1, T> substractor, + KeyValueMapper<K, V, KeyValue<K1, V1>> selector, + String name) { + + return aggregate(initializer, adder, substractor, selector, null, null, null, name); + } + + @Override public <K1> KTable<K1, Long> count(final KeyValueMapper<K, V, K1> selector, Serde<K1> keySerde, Serde<V> valueSerde, @@ -318,8 +328,13 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, } @Override - public <K1, V1> KTable<K1, V1> reduce(Reducer<V1> addReducer, - Reducer<V1> removeReducer, + public <K1> KTable<K1, Long> count(final KeyValueMapper<K, V, K1> selector, String name) { + return count(selector, null, null, name); + } + + @Override + public <K1, V1> KTable<K1, V1> reduce(Reducer<V1> adder, + Reducer<V1> subtractor, KeyValueMapper<K, V, KeyValue<K1, V1>> selector, Serde<K1> keySerde, Serde<V1> valueSerde, @@ -337,7 +352,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector); - ProcessorSupplier<K1, Change<V1>> aggregateSupplier = new KTableReduce<>(name, addReducer, removeReducer); + ProcessorSupplier<K1, Change<V1>> aggregateSupplier = new 
KTableReduce<>(name, adder, subtractor); StateStoreSupplier aggregateStore = Stores.create(name) .withKeys(keySerde) @@ -364,6 +379,15 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, return new KTableImpl<>(topology, reduceName, aggregateSupplier, Collections.singleton(sourceName)); } + @Override + public <K1, V1> KTable<K1, V1> reduce(Reducer<V1> adder, + Reducer<V1> subtractor, + KeyValueMapper<K, V, KeyValue<K1, V1>> selector, + String name) { + + return reduce(adder, subtractor, selector, null, null, name); + } + @SuppressWarnings("unchecked") KTableValueGetterSupplier<K, V> valueGetterSupplier() { if (processorSupplier instanceof KTableSource) { http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java index af3c0d7..ff118da 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java @@ -23,7 +23,6 @@ import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.state.internals.MeteredKeyValueStore; import org.apache.kafka.streams.state.internals.RocksDBStore; -import org.apache.kafka.streams.state.StateSerdes; /** * A KTable storage. It stores all entries in a local RocksDB database. @@ -34,15 +33,17 @@ import org.apache.kafka.streams.state.StateSerdes; public class KTableStoreSupplier<K, V> implements StateStoreSupplier { private final String name; - private final StateSerdes<K, V> serdes; + private final Serde<K> keySerde; + private final Serde<V> valueSerde; private final Time time; protected KTableStoreSupplier(String name, Serde<K> keySerde, - Serde<V> valSerde, + Serde<V> valueSerde, Time time) { this.name = name; - this.serdes = new StateSerdes<>(name, keySerde, valSerde); + this.keySerde = keySerde; + this.valueSerde = valueSerde; this.time = time; } @@ -51,7 +52,7 @@ public class KTableStoreSupplier<K, V> implements StateStoreSupplier { } public StateStore get() { - return new MeteredKeyValueStore<>(new RocksDBStore<>(name, serdes), "rocksdb-state", time); + return new MeteredKeyValueStore<>(new RocksDBStore<>(name, keySerde, valueSerde), "rocksdb-state", time); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java new file mode 100644 index 0000000..61b1c98 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.processor; + +import org.apache.kafka.clients.consumer.ConsumerRecord; + +/** + * Retrieves built-in timestamps from Kafka messages (introduced in KIP-32: Add timestamps to Kafka message). + * + * Here, "built-in" refers to the fact that compatible Kafka producer clients automatically and + * transparently embed such timestamps into messages they sent to Kafka, which can then be retrieved + * via this timestamp extractor. + * + * If <i>CreateTime</i> is used to define the built-in timestamps, using this extractor effectively provide + * <i>event-time</i> semantics. + * + * If you need <i>processing-time</i> semantics, use {@link WallclockTimestampExtractor}. + */ +public class ConsumerRecordTimestampExtractor implements TimestampExtractor { + @Override + public long extract(ConsumerRecord<Object, Object> record) { + return record.timestamp(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java new file mode 100644 index 0000000..81821ce --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.processor; + +import org.apache.kafka.clients.consumer.ConsumerRecord; + +/** + * Retrieves current wall clock timestamps as {@link System#currentTimeMillis()}. + * + * Using this extractor effectively provides <i>processing-time</i> semantics. + * + * If you need <i>event-time</i> semantics, use {@link ConsumerRecordTimestampExtractor} with + * built-in <i>CreateTime</i> timestamp (see KIP-32: Add timestamps to Kafka message for details). 
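Since ConsumerRecordTimestampExtractor is now the default (see the StreamsConfig change above), a job that previously relied on wall-clock semantics has to opt back in explicitly, e.g.:

    Properties props = new Properties();
    // restore processing-time semantics; note the class now lives in
    // org.apache.kafka.streams.processor rather than the internals package
    props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
              WallclockTimestampExtractor.class.getName());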
+ */ +public class WallclockTimestampExtractor implements TimestampExtractor { + @Override + public long extract(ConsumerRecord<Object, Object> record) { + return System.currentTimeMillis(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/main/java/org/apache/kafka/streams/processor/internals/WallclockTimestampExtractor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/WallclockTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/WallclockTimestampExtractor.java deleted file mode 100644 index 60b3b96..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/WallclockTimestampExtractor.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.processor.internals; - -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.streams.processor.TimestampExtractor; - -public class WallclockTimestampExtractor implements TimestampExtractor { - @Override - public long extract(ConsumerRecord<Object, Object> record) { - return System.currentTimeMillis(); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java index 1a41a16..9daac98 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java @@ -33,7 +33,7 @@ public final class StateSerdes<K, V> { return new StateSerdes<>(topic, Serdes.serdeFrom(keyClass), Serdes.serdeFrom(valueClass)); } - private final String topic; + private final String stateName; private final Serde<K> keySerde; private final Serde<V> valueSerde; @@ -43,15 +43,15 @@ public final class StateSerdes<K, V> { * is provided to bind this serde factory to, so that future calls for serialize / deserialize do not * need to provide the topic name any more. 
* - * @param topic the name of the topic - * @param keySerde the serde for keys; cannot be null - * @param valueSerde the serde for values; cannot be null + * @param stateName the name of the state + * @param keySerde the serde for keys; cannot be null + * @param valueSerde the serde for values; cannot be null */ @SuppressWarnings("unchecked") - public StateSerdes(String topic, + public StateSerdes(String stateName, Serde<K> keySerde, Serde<V> valueSerde) { - this.topic = topic; + this.stateName = stateName; if (keySerde == null) throw new IllegalArgumentException("key serde cannot be null"); @@ -87,22 +87,22 @@ public final class StateSerdes<K, V> { } public String topic() { - return topic; + return stateName; } public K keyFrom(byte[] rawKey) { - return keySerde.deserializer().deserialize(topic, rawKey); + return keySerde.deserializer().deserialize(stateName, rawKey); } public V valueFrom(byte[] rawValue) { - return valueSerde.deserializer().deserialize(topic, rawValue); + return valueSerde.deserializer().deserialize(stateName, rawValue); } public byte[] rawKey(K key) { - return keySerde.serializer().serialize(topic, key); + return keySerde.serializer().serialize(stateName, key); } public byte[] rawValue(V value) { - return valueSerde.serializer().serialize(topic, value); + return valueSerde.serializer().serialize(stateName, value); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/main/java/org/apache/kafka/streams/state/Stores.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java index 33df13f..4e28187 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java @@ -44,8 +44,6 @@ public class Stores { return new ValueFactory<K>() { @Override public <V> KeyValueFactory<K, V> withValues(final Serde<V> valueSerde) { - final StateSerdes<K, V> serdes = - new StateSerdes<>(name, keySerde, valueSerde); return new KeyValueFactory<K, V>() { @Override public InMemoryKeyValueFactory<K, V> inMemory() { @@ -62,9 +60,9 @@ public class Stores { @Override public StateStoreSupplier build() { if (capacity < Integer.MAX_VALUE) { - return new InMemoryLRUCacheStoreSupplier<>(name, capacity, serdes, null); + return new InMemoryLRUCacheStoreSupplier<>(name, capacity, keySerde, valueSerde); } - return new InMemoryKeyValueStoreSupplier<>(name, serdes, null); + return new InMemoryKeyValueStoreSupplier<>(name, keySerde, valueSerde); } }; } @@ -88,10 +86,10 @@ public class Stores { @Override public StateStoreSupplier build() { if (numSegments > 0) { - return new RocksDBWindowStoreSupplier<>(name, retentionPeriod, numSegments, retainDuplicates, serdes, null); + return new RocksDBWindowStoreSupplier<>(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde); } - return new RocksDBKeyValueStoreSupplier<>(name, serdes, null); + return new RocksDBKeyValueStoreSupplier<>(name, keySerde, valueSerde); } }; } @@ -170,8 +168,8 @@ public class Stores { /** * Begin to create a {@link KeyValueStore} by specifying the serializer and deserializer for the keys. 
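The @param notes below flip from "may not be null" to "may be null": the store factories now tolerate null serdes and defer to the configured defaults when the store is initialized. A sketch built from the Stores.create(...) chain used elsewhere in this diff (store name illustrative):

    StateStoreSupplier countStore = Stores.create("Counts")
        .withKeys(Serdes.String())    // may now be null to use the default key serde
        .withValues(Serdes.Long())    // may now be null to use the default value serde
        .inMemory()
        .build();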
* - * @param keySerde the serialization factory for keys; may not be null - * @return the interface used to specify the type of values; never null + * @param keySerde the serialization factory for keys; may be null + * @return the interface used to specify the type of values; never null */ public abstract <K> ValueFactory<K> withKeys(Serde<K> keySerde); } @@ -249,8 +247,8 @@ public class Stores { /** * Use the specified serializer and deserializer for the values. * - * @param valueSerde the serialization factory for values; may not be null - * @return the interface used to specify the remaining key-value store options; never null + * @param valueSerde the serialization factory for values; may be null + * @return the interface used to specify the remaining key-value store options; never null */ public abstract <V> KeyValueFactory<K, V> withValues(Serde<V> valueSerde); } http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java index 66e1338..fdf3269 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java @@ -19,13 +19,22 @@ package org.apache.kafka.streams.state; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; + import java.nio.ByteBuffer; public class WindowStoreUtils { - public static final int TIMESTAMP_SIZE = 8; - public static final int SEQNUM_SIZE = 4; - public static final StateSerdes<byte[], byte[]> INNER_SERDES = StateSerdes.withBuiltinTypes("", byte[].class, byte[].class); + private static final int SEQNUM_SIZE = 4; + private static final int TIMESTAMP_SIZE = 8; + + /** Inner byte array serde used for segments */ + public static final Serde<byte[]> INNER_SERDE = Serdes.ByteArray(); + + /** Inner byte array state serde used for segments */ + public static final StateSerdes<byte[], byte[]> INNER_SERDES = new StateSerdes<>("", INNER_SERDE, INNER_SERDE); + @SuppressWarnings("unchecked") public static final KeyValueIterator<byte[], byte[]>[] NO_ITERATORS = (KeyValueIterator<byte[], byte[]>[]) new KeyValueIterator[0]; http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java index 32116dd..efcdac7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java @@ -17,8 +17,10 @@ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.KeyValueIterator; import 
org.apache.kafka.streams.state.KeyValueStore; @@ -29,16 +31,19 @@ import java.util.List; public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> { private final KeyValueStore<K, V> inner; - private final StateSerdes<K, V> serdes; + private final Serde<K> keySerde; + private final Serde<V> valueSerde; private final String storeName; + private StateSerdes<K, V> serdes; private StoreChangeLogger<K, V> changeLogger; private StoreChangeLogger.ValueGetter<K, V> getter; - public InMemoryKeyValueLoggedStore(final String storeName, final KeyValueStore<K, V> inner, final StateSerdes<K, V> serdes) { + public InMemoryKeyValueLoggedStore(final String storeName, final KeyValueStore<K, V> inner, Serde<K> keySerde, Serde<V> valueSerde) { this.storeName = storeName; this.inner = inner; - this.serdes = serdes; + this.keySerde = keySerde; + this.valueSerde = valueSerde; } @Override @@ -47,9 +52,24 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> { } @Override + @SuppressWarnings("unchecked") public void init(ProcessorContext context, StateStore root) { + // construct the serde + this.serdes = new StateSerdes<>(storeName, + keySerde == null ? (Serde<K>) context.keySerde() : keySerde, + valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde); + this.changeLogger = new StoreChangeLogger<>(storeName, context, serdes); + context.register(root, true, new StateRestoreCallback() { + @Override + public void restore(byte[] key, byte[] value) { + + // directly call inner functions so that the operation is not logged + inner.put(serdes.keyFrom(key), serdes.valueFrom(value)); + } + }); + inner.init(context, root); this.getter = new StoreChangeLogger.ValueGetter<K, V>() { http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java index 4054d68..3a5819c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java @@ -17,15 +17,14 @@ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.StateSerdes; import java.util.Iterator; import java.util.List; @@ -45,12 +44,18 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier { private final String name; private final Time time; - private final StateSerdes<K, V> serdes; + private final Serde<K> keySerde; + private final Serde<V> valueSerde; - public InMemoryKeyValueStoreSupplier(String name, StateSerdes<K, V> serdes, Time time) { + public InMemoryKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde) { + this(name, keySerde, 
http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
index 4054d68..3a5819c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
@@ -17,15 +17,14 @@

 package org.apache.kafka.streams.state.internals;

+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StateSerdes;

 import java.util.Iterator;
 import java.util.List;
@@ -45,12 +44,18 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {

     private final String name;
     private final Time time;
-    private final StateSerdes<K, V> serdes;
+    private final Serde<K> keySerde;
+    private final Serde<V> valueSerde;

-    public InMemoryKeyValueStoreSupplier(String name, StateSerdes<K, V> serdes, Time time) {
+    public InMemoryKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde) {
+        this(name, keySerde, valueSerde, null);
+    }
+
+    public InMemoryKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, Time time) {
         this.name = name;
         this.time = time;
-        this.serdes = serdes;
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
     }

     public String name() {
@@ -58,28 +63,24 @@
     }

     public StateStore get() {
-        return new MeteredKeyValueStore<>(new MemoryStore<K, V>(name).enableLogging(serdes), "in-memory-state", time);
+        return new MeteredKeyValueStore<>(new MemoryStore<K, V>(name, keySerde, valueSerde).enableLogging(), "in-memory-state", time);
     }

     private static class MemoryStore<K, V> implements KeyValueStore<K, V> {
-
         private final String name;
+        private final Serde<K> keySerde;
+        private final Serde<V> valueSerde;
         private final NavigableMap<K, V> map;

-        private boolean loggingEnabled = false;
-        private StateSerdes<K, V> serdes = null;
-
-        public MemoryStore(String name) {
-            super();
+        public MemoryStore(String name, Serde<K> keySerde, Serde<V> valueSerde) {
             this.name = name;
+            this.keySerde = keySerde;
+            this.valueSerde = valueSerde;
             this.map = new TreeMap<>();
         }

-        public KeyValueStore<K, V> enableLogging(StateSerdes<K, V> serdes) {
-            this.loggingEnabled = true;
-            this.serdes = serdes;
-
-            return new InMemoryKeyValueLoggedStore<>(this.name, this, serdes);
+        public KeyValueStore<K, V> enableLogging() {
+            return new InMemoryKeyValueLoggedStore<>(this.name, this, keySerde, valueSerde);
         }

         @Override
@@ -88,17 +89,9 @@
         }

         @Override
+        @SuppressWarnings("unchecked")
         public void init(ProcessorContext context, StateStore root) {
-            if (loggingEnabled) {
-                context.register(root, true, new StateRestoreCallback() {
-
-                    @Override
-                    public void restore(byte[] key, byte[] value) {
-                        put(serdes.keyFrom(key), serdes.valueFrom(value));
-                    }
-                });
-
-            }
+            // do nothing
        }

        @Override
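The supplier now telescopes its constructors: the new serde-only overload delegates with a null Time, which downstream (presumably in MeteredKeyValueStore, as far as this codebase goes) resolves to the system clock. A usage sketch with a hypothetical store name:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.processor.StateStoreSupplier;
    import org.apache.kafka.streams.state.internals.InMemoryKeyValueStoreSupplier;

    public class SupplierOverloadExample {
        public static void main(String[] args) {
            // serde-only overload; equivalent to passing (..., null) for Time
            StateStoreSupplier supplier =
                    new InMemoryKeyValueStoreSupplier<>("word-counts", Serdes.String(), Serdes.Long());
            System.out.println(supplier.name()); // word-counts
        }
    }

Either serde could also be null here, in which case the store falls back to the StreamsConfig defaults at init() time, as shown above.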
http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
index 1c2241f..4a4fa5f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
@@ -16,10 +16,10 @@
  */
 package org.apache.kafka.streams.state.internals;

+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.state.StateSerdes;

 /**
  * An in-memory key-value store that is limited in size and retains a maximum number of most recently used entries.
@@ -32,13 +32,19 @@ public class InMemoryLRUCacheStoreSupplier<K, V> implements StateStoreSupplier {

     private final String name;
     private final int capacity;
-    private final StateSerdes<K, V> serdes;
+    private final Serde<K> keySerde;
+    private final Serde<V> valueSerde;
     private final Time time;

-    public InMemoryLRUCacheStoreSupplier(String name, int capacity, StateSerdes<K, V> serdes, Time time) {
+    public InMemoryLRUCacheStoreSupplier(String name, int capacity, Serde<K> keySerde, Serde<V> valueSerde) {
+        this(name, capacity, keySerde, valueSerde, null);
+    }
+
+    public InMemoryLRUCacheStoreSupplier(String name, int capacity, Serde<K> keySerde, Serde<V> valueSerde, Time time) {
         this.name = name;
         this.capacity = capacity;
-        this.serdes = serdes;
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
         this.time = time;
     }

@@ -49,7 +55,7 @@
     @SuppressWarnings("unchecked")
     public StateStore get() {
         final MemoryNavigableLRUCache<K, V> cache = new MemoryNavigableLRUCache<K, V>(name, capacity);
-        final InMemoryKeyValueLoggedStore<K, V> loggedCache = (InMemoryKeyValueLoggedStore) cache.enableLogging(serdes);
+        final InMemoryKeyValueLoggedStore<K, V> loggedCache = (InMemoryKeyValueLoggedStore) cache.enableLogging(keySerde, valueSerde);
         final MeteredKeyValueStore<K, V> store = new MeteredKeyValueStore<>(loggedCache, "in-memory-lru-state", time);
         cache.whenEldestRemoved(new MemoryNavigableLRUCache.EldestEntryRemovalListener<K, V>() {
             @Override
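For background on what the cache underneath provides: the LRU eviction that whenEldestRemoved() hooks into is the classic access-ordered LinkedHashMap idiom. A minimal standalone equivalent (illustrative only, not the class's actual code):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class LruSketch {
        public static void main(String[] args) {
            final int capacity = 2;
            Map<String, Long> lru = new LinkedHashMap<String, Long>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, Long> eldest) {
                    return size() > capacity; // evict the least-recently-used entry
                }
            };
            lru.put("a", 1L);
            lru.put("b", 2L);
            lru.get("a");     // touch "a"; "b" is now the eldest
            lru.put("c", 3L); // evicts "b"
            System.out.println(lru.keySet()); // [a, c]
        }
    }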
http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index a5aaa06..a859bd2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -16,13 +16,12 @@
  */
 package org.apache.kafka.streams.state.internals;

+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StateSerdes;

 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -42,9 +41,6 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {

     protected EldestEntryRemovalListener<K, V> listener;

-    private boolean loggingEnabled = false;
-    private StateSerdes<K, V> serdes = null;
-
     // this is used for extended MemoryNavigableLRUCache only
     public MemoryLRUCache() {}

@@ -69,11 +65,8 @@
         };
     }

-    public KeyValueStore<K, V> enableLogging(StateSerdes<K, V> serdes) {
-        this.loggingEnabled = true;
-        this.serdes = serdes;
-
-        return new InMemoryKeyValueLoggedStore<>(this.name, this, serdes);
+    public KeyValueStore<K, V> enableLogging(Serde<K> keySerde, Serde<V> valueSerde) {
+        return new InMemoryKeyValueLoggedStore<>(this.name, this, keySerde, valueSerde);
     }

     public MemoryLRUCache<K, V> whenEldestRemoved(EldestEntryRemovalListener<K, V> listener) {
@@ -88,17 +81,9 @@
     }

     @Override
+    @SuppressWarnings("unchecked")
     public void init(ProcessorContext context, StateStore root) {
-        if (loggingEnabled) {
-            context.register(root, true, new StateRestoreCallback() {
-
-                @Override
-                public void restore(byte[] key, byte[] value) {
-                    put(serdes.keyFrom(key), serdes.valueFrom(value));
-                }
-            });
-
-        }
+        // do nothing
     }

     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
index ec10c3f..af98733 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
@@ -17,10 +17,10 @@

 package org.apache.kafka.streams.state.internals;

+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.state.StateSerdes;

 /**
  * A {@link org.apache.kafka.streams.state.KeyValueStore} that stores all entries in a local RocksDB database.
@@ -33,12 +33,18 @@ import org.apache.kafka.streams.state.StateSerdes;
 public class RocksDBKeyValueStoreSupplier<K, V> implements StateStoreSupplier {

     private final String name;
-    private final StateSerdes<K, V> serdes;
+    private final Serde<K> keySerde;
+    private final Serde<V> valueSerde;
     private final Time time;

-    public RocksDBKeyValueStoreSupplier(String name, StateSerdes<K, V> serdes, Time time) {
+    public RocksDBKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde) {
+        this(name, keySerde, valueSerde, null);
+    }
+
+    public RocksDBKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, Time time) {
         this.name = name;
-        this.serdes = serdes;
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
         this.time = time;
     }

@@ -47,6 +53,6 @@
     }

     public StateStore get() {
-        return new MeteredKeyValueStore<>(new RocksDBStore<>(name, serdes).enableLogging(), "rocksdb-state", time);
+        return new MeteredKeyValueStore<>(new RocksDBStore<>(name, keySerde, valueSerde).enableLogging(), "rocksdb-state", time);
     }
}
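Across these suppliers, get() builds the same decorator stack: a raw store, wrapped by the changelogging store, wrapped by the metering store. A sketch of that layering using the constructors visible in this patch (these are internal classes, and the Time argument is whatever the supplier was given, null meaning system time):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.utils.Time;
    import org.apache.kafka.streams.processor.StateStore;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.internals.MemoryNavigableLRUCache;
    import org.apache.kafka.streams.state.internals.MeteredKeyValueStore;

    public class StoreLayeringSketch {
        // mirrors InMemoryLRUCacheStoreSupplier.get():
        //   raw LRU cache -> changelogged wrapper -> metered wrapper
        static StateStore build(Time time) {
            MemoryNavigableLRUCache<String, Long> raw =
                    new MemoryNavigableLRUCache<String, Long>("lru-store", 1000);
            KeyValueStore<String, Long> logged =
                    raw.enableLogging(Serdes.String(), Serdes.Long());
            return new MeteredKeyValueStore<>(logged, "in-memory-lru-state", time);
        }
    }

The ordering matters: metering wraps logging, so time spent appending to the changelog is counted in the store's operation metrics.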
http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 3045856..b206f37 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -17,6 +17,7 @@

 package org.apache.kafka.streams.state.internals;

+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -67,7 +68,9 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {

     private final WriteOptions wOptions;
     private final FlushOptions fOptions;

-    private ProcessorContext context;
+    private final Serde<K> keySerde;
+    private final Serde<V> valueSerde;
+    private StateSerdes<K, V> serdes;
     protected File dbDir;
     private RocksDB db;

@@ -92,14 +95,15 @@
         return this;
     }

-    public RocksDBStore(String name, StateSerdes<K, V> serdes) {
-        this(name, DB_FILE_DIR, serdes);
+    public RocksDBStore(String name, Serde<K> keySerde, Serde<V> valueSerde) {
+        this(name, DB_FILE_DIR, keySerde, valueSerde);
     }

-    public RocksDBStore(String name, String parentDir, StateSerdes<K, V> serdes) {
+    public RocksDBStore(String name, String parentDir, Serde<K> keySerde, Serde<V> valueSerde) {
         this.name = name;
         this.parentDir = parentDir;
-        this.serdes = serdes;
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;

         // initialize the rocksdb options
         BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
@@ -136,15 +140,20 @@
         }
     }

+    @SuppressWarnings("unchecked")
     public void openDB(ProcessorContext context) {
-        this.context = context;
-        this.dbDir = new File(new File(this.context.stateDir(), parentDir), this.name);
+        // we need to construct the serde while opening DB since
+        // it is also triggered by windowed DB segments without initialization
+        this.serdes = new StateSerdes<>(name,
+                keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+                valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+
+        this.dbDir = new File(new File(context.stateDir(), parentDir), this.name);
         this.db = openDB(this.dbDir, this.options, TTL_SECONDS);
     }

-    @SuppressWarnings("unchecked")
     public void init(ProcessorContext context, StateStore root) {
-        // first open the DB dir
+        // open the DB dir
         openDB(context);

         this.changeLogger = this.loggingEnabled ? new RawStoreChangeLogger(name, context) : null;
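The serde construction is hoisted into openDB() rather than init() for the reason the inline comment gives: window-store segments are plain RocksDBStores that are created on demand and opened through openDB(context) directly, never via init(). A sketch of that second entry point (hypothetical wiring; the real segment management lives in RocksDBWindowStore below, and an already-initialized ProcessorContext is assumed):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.internals.RocksDBStore;

    public class SegmentOpenSketch {
        // window segments are byte[]-keyed stores opened directly, bypassing init()
        static RocksDBStore<byte[], byte[]> openSegment(ProcessorContext context,
                                                        String windowName,
                                                        long segmentId) {
            RocksDBStore<byte[], byte[]> segment = new RocksDBStore<>(
                    "segment-" + segmentId, windowName, Serdes.ByteArray(), Serdes.ByteArray());
            segment.openDB(context); // serdes resolved here, falling back to context defaults
            return segment;
        }
    }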
http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index 61c2e5e..4c6a229 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -19,6 +19,7 @@

 package org.apache.kafka.streams.state.internals;

+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -51,7 +52,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
         public final long id;

         Segment(String segmentName, String windowName, long id) {
-            super(segmentName, windowName, WindowStoreUtils.INNER_SERDES);
+            super(segmentName, windowName, WindowStoreUtils.INNER_SERDE, WindowStoreUtils.INNER_SERDE);
             this.id = id;
         }

@@ -114,7 +115,8 @@
     private final long segmentInterval;
     private final boolean retainDuplicates;
     private final Segment[] segments;
-    private final StateSerdes<K, V> serdes;
+    private final Serde<K> keySerde;
+    private final Serde<V> valueSerde;
     private final SimpleDateFormat formatter;
     private final StoreChangeLogger.ValueGetter<byte[], byte[]> getter;

@@ -122,17 +124,20 @@
     private int seqnum = 0;
     private long currentSegmentId = -1L;

+    private StateSerdes<K, V> serdes;
+
     private boolean loggingEnabled = false;
     private StoreChangeLogger<byte[], byte[]> changeLogger = null;

-    public RocksDBWindowStore(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, StateSerdes<K, V> serdes) {
+    public RocksDBWindowStore(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serde<K> keySerde, Serde<V> valueSerde) {
         this.name = name;

         // The segment interval must be greater than MIN_SEGMENT_INTERVAL
         this.segmentInterval = Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL);

         this.segments = new Segment[numSegments];
-        this.serdes = serdes;
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;

         this.retainDuplicates = retainDuplicates;

@@ -159,13 +164,18 @@
     }

     @Override
+    @SuppressWarnings("unchecked")
     public void init(ProcessorContext context, StateStore root) {
         this.context = context;

+        // construct the serde
+        this.serdes = new StateSerdes<>(name,
+                keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+                valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+
         openExistingSegments();

-        this.changeLogger = this.loggingEnabled ?
-            new RawStoreChangeLogger(name, context) : null;
+        this.changeLogger = this.loggingEnabled ? new RawStoreChangeLogger(name, context) : null;

         // register and possibly restore the state from the logs
         context.register(root, loggingEnabled, new StateRestoreCallback() {
@@ -202,7 +212,7 @@
                     dir.mkdir();
                 }
             } catch (Exception ex) {
-
+                // ignore
             }
         }
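A worked example of the segment-interval arithmetic in the constructor above, with MIN_SEGMENT_INTERVAL stubbed to a hypothetical floor of 60 seconds (the real constant is defined elsewhere in RocksDBWindowStore and is not shown in this diff):

    public class SegmentIntervalExample {
        private static final long MIN_SEGMENT_INTERVAL = 60_000L; // hypothetical stand-in

        public static void main(String[] args) {
            long retentionPeriod = 86_400_000L; // 24 hours of retention
            int numSegments = 3;
            // same formula as the constructor: spread retention across all but
            // one segment, but never go below the minimum interval
            long segmentInterval = Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL);
            System.out.println(segmentInterval); // 43200000, i.e. each segment spans 12 hours
        }
    }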
