http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java index 8685e8b..d14e600 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java @@ -46,7 +46,7 @@ public interface KGroupedTable<K, V> { * the same key into a new instance of {@link KTable}. * Records with {@code null} key are ignored. * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view) - * that can be queried using the provided {@code storeName}. + * that can be queried using the provided {@code queryableStoreName}. * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. * <p> * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to @@ -60,7 +60,7 @@ public interface KGroupedTable<K, V> { * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: * <pre>{@code * KafkaStreams streams = ... // counting words - * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()); + * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore()); * String key = "some-word"; * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) * }</pre> @@ -68,20 +68,48 @@ public interface KGroupedTable<K, V> { * query the value of the key on a parallel running instance of your Kafka Streams application. * <p> * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. - * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is + * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is * user-specified in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the - * provide {@code storeName}, and "-changelog" is a fixed suffix. + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the + * provide {@code queryableStoreName}, and "-changelog" is a fixed suffix. * The store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, * '.', '_' and '-'. * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}. * - * @param storeName the name of the underlying {@link KTable} state store; valid characters are ASCII - * alphanumerics, '.', '_' and '-' + * @param queryableStoreName the name of the underlying {@link KTable} state store; valid characters are ASCII + * alphanumerics, '.', '_' and '-'. If {@code null} this is the equivalent of {@link KGroupedTable#count()}. 
* @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that * represent the latest (rolling) count (i.e., number of records) for each key */ - KTable<K, Long> count(final String storeName); + KTable<K, Long> count(final String queryableStoreName); + + /** + * Count number of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper) mapped} to + * the same key into a new instance of {@link KTable}. + * Records with {@code null} key are ignored. + * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view) + * that can be queried using the provided {@code queryableStoreName}. + * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. + * <p> + * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to + * the same key. + * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of + * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for + * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}. + * <p> + * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. + * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is + * user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name + * and "-changelog" is a fixed suffix. + * Note that the internal store name may not be queriable through Interactive Queries. + * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}. + * + * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that + * represent the latest (rolling) count (i.e., number of records) for each key + */ + KTable<K, Long> count(); /** * Count number of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper) mapped} to @@ -102,8 +130,8 @@ public interface KGroupedTable<K, V> { * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: * <pre>{@code * KafkaStreams streams = ... // counting words - * String storeName = storeSupplier.name(); - * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()); + * String queryableStoreName = storeSupplier.name(); + * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore()); * String key = "some-word"; * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) * }</pre> @@ -111,13 +139,13 @@ public interface KGroupedTable<K, V> { * query the value of the key on a parallel running instance of your Kafka Streams application. * <p> * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. 
- * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is + * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is * user-specified in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the - * provide {@code storeName}, and "-changelog" is a fixed suffix. + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the + * provide {@code queryableStoreName}, and "-changelog" is a fixed suffix. * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}. * - * @param storeSupplier user defined state store supplier + * @param storeSupplier user defined state store supplier. Cannot be {@code null}. * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that * represent the latest (rolling) count (i.e., number of records) for each key */ @@ -130,7 +158,7 @@ public interface KGroupedTable<K, V> { * Combining implies that the type of the aggregate result is the same as the type of the input value * (c.f. {@link #aggregate(Initializer, Aggregator, Aggregator, Serde, String)}). * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view) - * that can be queried using the provided {@code storeName}. + * that can be queried using the provided {@code queryableStoreName}. * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. * <p> * Each update to the original {@link KTable} results in a two step update of the result {@link KTable}. @@ -167,7 +195,7 @@ public interface KGroupedTable<K, V> { * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: * <pre>{@code * KafkaStreams streams = ... // counting words - * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()); + * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore()); * String key = "some-word"; * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) * }</pre> @@ -175,24 +203,80 @@ public interface KGroupedTable<K, V> { * query the value of the key on a parallel running instance of your Kafka Streams application. * <p> * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. - * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is + * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is * user-specified in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the - * provide {@code storeName}, and "-changelog" is a fixed suffix. + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the + * provide {@code queryableStoreName}, and "-changelog" is a fixed suffix. * The store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, * '.', '_' and '-'. * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}. 
* * @param adder a {@link Reducer} that adds a new value to the aggregate result * @param subtractor a {@link Reducer} that removed an old value from the aggregate result - * @param storeName the name of the underlying {@link KTable} state store; valid characters are ASCII alphanumerics, - * '.', '_' and '-' + * @param queryableStoreName the name of the underlying {@link KTable} state store; valid characters are ASCII alphanumerics, + * '.', '_' and '-'. If {@code null} this is the equivalent of {@link KGroupedTable#reduce(Reducer, Reducer)} ()}. * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the * latest (rolling) aggregate for each key */ KTable<K, V> reduce(final Reducer<V> adder, final Reducer<V> subtractor, - final String storeName); + final String queryableStoreName); + + /** + * Combine the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper) + * mapped} to the same key into a new instance of {@link KTable}. + * Records with {@code null} key are ignored. + * Combining implies that the type of the aggregate result is the same as the type of the input value + * (c.f. {@link #aggregate(Initializer, Aggregator, Aggregator, Serde, String)}). + * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view) + * that can be queried using the provided {@code queryableStoreName}. + * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. + * <p> + * Each update to the original {@link KTable} results in a two step update of the result {@link KTable}. + * The specified {@link Reducer adder} is applied for each update record and computes a new aggregate using the + * current aggregate and the record's value by adding the new record to the aggregate. + * The specified {@link Reducer substractor} is applied for each "replaced" record of the original {@link KTable} + * and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced" + * record from the aggregate. + * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's + * value as-is. + * Thus, {@code reduce(Reducer, Reducer, String)} can be used to compute aggregate functions like sum. + * For sum, the adder and substractor would work as follows: + * <pre>{@code + * public class SumAdder implements Reducer<Integer> { + * public Integer apply(Integer currentAgg, Integer newValue) { + * return currentAgg + newValue; + * } + * } + * + * public class SumSubtractor implements Reducer<Integer> { + * public Integer apply(Integer currentAgg, Integer oldValue) { + * return currentAgg - oldValue; + * } + * } + * }</pre> + * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to + * the same key. + * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of + * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for + * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}. + * <p> + * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. 
+ * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is + * user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name + * and "-changelog" is a fixed suffix. + * Note that the internal store name may not be queriable through Interactive Queries. + * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}. + * + * @param adder a {@link Reducer} that adds a new value to the aggregate result + * @param subtractor a {@link Reducer} that removed an old value from the aggregate result + * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the + * latest (rolling) aggregate for each key + */ + KTable<K, V> reduce(final Reducer<V> adder, + final Reducer<V> subtractor); /** * Combine the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper) @@ -238,8 +322,8 @@ public interface KGroupedTable<K, V> { * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: * <pre>{@code * KafkaStreams streams = ... // counting words - * String storeName = storeSupplier.name(); - * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()); + * String queryableStoreName = storeSupplier.name(); + * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore()); * String key = "some-word"; * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) * }</pre> @@ -247,15 +331,15 @@ public interface KGroupedTable<K, V> { * query the value of the key on a parallel running instance of your Kafka Streams application. * <p> * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. - * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is + * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is * user-specified in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the - * provide {@code storeName}, and "-changelog" is a fixed suffix. + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the + * provide {@code queryableStoreName}, and "-changelog" is a fixed suffix. * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}. * * @param adder a {@link Reducer} that adds a new value to the aggregate result * @param subtractor a {@link Reducer} that removed an old value from the aggregate result - * @param storeSupplier user defined state store supplier + * @param storeSupplier user defined state store supplier. Cannot be {@code null}. * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the * latest (rolling) aggregate for each key */ @@ -319,7 +403,7 @@ public interface KGroupedTable<K, V> { * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: * <pre>{@code * KafkaStreams streams = ... 
// counting words - * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()); + * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore()); * String key = "some-word"; * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) * }</pre> @@ -327,16 +411,17 @@ public interface KGroupedTable<K, V> { * query the value of the key on a parallel running instance of your Kafka Streams application. * <p> * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. - * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is + * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is * user-specified in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the - * provide {@code storeName}, and "-changelog" is a fixed suffix. + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the + * provide {@code queryableStoreName}, and "-changelog" is a fixed suffix. * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}. * * @param initializer a {@link Initializer} that provides an initial aggregate result value * @param adder a {@link Aggregator} that adds a new record to the aggregate result * @param subtractor a {@link Aggregator} that removed an old record from the aggregate result - * @param storeName the name of the underlying {@link KTable} state store + * @param queryableStoreName the name of the underlying {@link KTable} state store. + * If {@code null} this is the equivalent of {@link KGroupedTable#aggregate(Initializer, Aggregator, Aggregator)} ()}. * @param <VR> the value type of the aggregated {@link KTable} * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the * latest (rolling) aggregate for each key @@ -344,7 +429,7 @@ public interface KGroupedTable<K, V> { <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> adder, final Aggregator<? super K, ? super V, VR> subtractor, - final String storeName); + final String queryableStoreName); /** * Aggregate the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper) @@ -352,8 +437,79 @@ public interface KGroupedTable<K, V> { * Records with {@code null} key are ignored. * Aggregating is a generalization of {@link #reduce(Reducer, Reducer, String) combining via reduce(...)} as it, * for example, allows the result to have a different type than the input values. + * If the result value type does not match the {@link StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value + * serde} you should use {@link KGroupedTable#aggregate(Initializer, Aggregator, Aggregator, Serde, String) + * aggregate(Initializer, Aggregator, Aggregator, Serde, String)}. * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view) - * that can be queried using the provided {@code storeName}. + * provided by the given {@code storeSupplier}. + * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. 
+ * <p> + * The specified {@link Initializer} is applied once directly before the first input record is processed to + * provide an initial intermediate aggregation result that is used to process the first record. + * Each update to the original {@link KTable} results in a two step update of the result {@link KTable}. + * The specified {@link Aggregator adder} is applied for each update record and computes a new aggregate using the + * current aggregate (or for the very first record using the intermediate aggregation result provided via the + * {@link Initializer}) and the record's value by adding the new record to the aggregate. + * The specified {@link Aggregator substractor} is applied for each "replaced" record of the original {@link KTable} + * and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced" + * record from the aggregate. + * Thus, {@code aggregate(Initializer, Aggregator, Aggregator, String)} can be used to compute aggregate functions + * like sum. + * For sum, the initializer, adder, and substractor would work as follows: + * <pre>{@code + * // in this example, LongSerde.class must be set as default value serde in StreamsConfig + * public class SumInitializer implements Initializer<Long> { + * public Long apply() { + * return 0L; + * } + * } + * + * public class SumAdder implements Aggregator<String, Integer, Long> { + * public Long apply(String key, Integer newValue, Long aggregate) { + * return aggregate + newValue; + * } + * } + * + * public class SumSubstractor implements Aggregator<String, Integer, Long> { + * public Long apply(String key, Integer oldValue, Long aggregate) { + * return aggregate - oldValue; + * } + * } + * }</pre> + * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to + * the same key. + * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of + * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for + * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}. + * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. + * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is + * user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name + * and "-changelog" is a fixed suffix. + * Note that the internal store name may not be queriable through Interactive Queries. + * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}. + * + * @param initializer a {@link Initializer} that provides an initial aggregate result value + * @param adder a {@link Aggregator} that adds a new record to the aggregate result + * @param subtractor a {@link Aggregator} that removed an old record from the aggregate result + * @param <VR> the value type of the aggregated {@link KTable} + * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the + * latest (rolling) aggregate for each key + */ + <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, + final Aggregator<? super K, ? super V, VR> adder, + final Aggregator<? super K, ? 
super V, VR> subtractor); + + + /** + * Aggregate the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper) + * mapped} to the same key into a new instance of {@link KTable} using default serializers and deserializers. + * Records with {@code null} key are ignored. + * Aggregating is a generalization of {@link #reduce(Reducer, Reducer, String) combining via reduce(...)} as it, + * for example, allows the result to have a different type than the input values. + * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view) + * that can be queried using the provided {@code queryableStoreName}. * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. * <p> * The specified {@link Initializer} is applied once directly before the first input record is processed to @@ -398,7 +554,7 @@ public interface KGroupedTable<K, V> { * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: * <pre>{@code * KafkaStreams streams = ... // counting words - * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()); + * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore()); * String key = "some-word"; * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) * }</pre> @@ -406,10 +562,10 @@ public interface KGroupedTable<K, V> { * query the value of the key on a parallel running instance of your Kafka Streams application. * <p> * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. - * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is + * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is * user-specified in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the - * provide {@code storeName}, and "-changelog" is a fixed suffix. + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the + * provide {@code queryableStoreName}, and "-changelog" is a fixed suffix. * The store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, * '.', '_' and '-'. * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}. @@ -419,8 +575,8 @@ public interface KGroupedTable<K, V> { * @param subtractor a {@link Aggregator} that removed an old record from the aggregate result * @param aggValueSerde aggregate value serdes for materializing the aggregated table, * if not specified the default serdes defined in the configs will be used - * @param storeName the name of the underlying {@link KTable} state store; valid characters are ASCII - * alphanumerics, '.', '_' and '-' + * @param queryableStoreName the name of the underlying {@link KTable} state store; valid characters are ASCII + * alphanumerics, '.', '_' and '-'. If {@code null} this is the equivalent of {@link KGroupedTable#aggregate(Initializer, Aggregator, Aggregator, Serde)} ()}. 
* @param <VR> the value type of the aggregated {@link KTable} * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the * latest (rolling) aggregate for each key @@ -429,7 +585,78 @@ public interface KGroupedTable<K, V> { final Aggregator<? super K, ? super V, VR> adder, final Aggregator<? super K, ? super V, VR> subtractor, final Serde<VR> aggValueSerde, - final String storeName); + final String queryableStoreName); + + /** + * Aggregate the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper) + * mapped} to the same key into a new instance of {@link KTable} using default serializers and deserializers. + * Records with {@code null} key are ignored. + * Aggregating is a generalization of {@link #reduce(Reducer, Reducer, String) combining via reduce(...)} as it, + * for example, allows the result to have a different type than the input values. + * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view) + * that can be queried using the provided {@code queryableStoreName}. + * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. + * <p> + * The specified {@link Initializer} is applied once directly before the first input record is processed to + * provide an initial intermediate aggregation result that is used to process the first record. + * Each update to the original {@link KTable} results in a two step update of the result {@link KTable}. + * The specified {@link Aggregator adder} is applied for each update record and computes a new aggregate using the + * current aggregate (or for the very first record using the intermediate aggregation result provided via the + * {@link Initializer}) and the record's value by adding the new record to the aggregate. + * The specified {@link Aggregator substractor} is applied for each "replaced" record of the original {@link KTable} + * and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced" + * record from the aggregate. + * Thus, {@code aggregate(Initializer, Aggregator, Aggregator, String)} can be used to compute aggregate functions + * like sum. + * For sum, the initializer, adder, and substractor would work as follows: + * <pre>{@code + * public class SumInitializer implements Initializer<Long> { + * public Long apply() { + * return 0L; + * } + * } + * + * public class SumAdder implements Aggregator<String, Integer, Long> { + * public Long apply(String key, Integer newValue, Long aggregate) { + * return aggregate + newValue; + * } + * } + * + * public class SumSubstractor implements Aggregator<String, Integer, Long> { + * public Long apply(String key, Integer oldValue, Long aggregate) { + * return aggregate - oldValue; + * } + * } + * }</pre> + * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to + * the same key. + * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of + * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for + * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}. + * <p> + * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. 
+ * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is + * user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name + * and "-changelog" is a fixed suffix. + * Note that the internal store name may not be queriable through Interactive Queries. + * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}. + * + * @param initializer a {@link Initializer} that provides an initial aggregate result value + * @param adder a {@link Aggregator} that adds a new record to the aggregate result + * @param subtractor a {@link Aggregator} that removed an old record from the aggregate result + * @param aggValueSerde aggregate value serdes for materializing the aggregated table, + * if not specified the default serdes defined in the configs will be used + * @param <VR> the value type of the aggregated {@link KTable} + * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the + * latest (rolling) aggregate for each key + */ + <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, + final Aggregator<? super K, ? super V, VR> adder, + final Aggregator<? super K, ? super V, VR> subtractor, + final Serde<VR> aggValueSerde); + /** * Aggregate the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper) @@ -483,8 +710,8 @@ public interface KGroupedTable<K, V> { * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: * <pre>{@code * KafkaStreams streams = ... // counting words - * String storeName = storeSupplier.name(); - * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()); + * String queryableStoreName = storeSupplier.name(); + * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore()); * String key = "some-word"; * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) * }</pre> @@ -492,16 +719,16 @@ public interface KGroupedTable<K, V> { * query the value of the key on a parallel running instance of your Kafka Streams application. * <p> * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. - * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is + * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is * user-specified in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the - * provide {@code storeName}, and "-changelog" is a fixed suffix. + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the + * provide {@code queryableStoreName}, and "-changelog" is a fixed suffix. * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}. 
* * @param initializer a {@link Initializer} that provides an initial aggregate result value * @param adder a {@link Aggregator} that adds a new record to the aggregate result * @param subtractor a {@link Aggregator} that removed an old record from the aggregate result - * @param storeSupplier user defined state store supplier + * @param storeSupplier user defined state store supplier. Cannot be {@code null}. * @param <VR> the value type of the aggregated {@link KTable} * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the * latest (rolling) aggregate for each key
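
For context, the renamed {@code queryableStoreName} parameter and the new name-free overloads of count()/reduce() on KGroupedTable can be exercised roughly as follows. This is an illustrative sketch against the pre-1.0 KStreamBuilder API shown in this patch, not code from the patch itself; the topic name, store names, region-prefix grouping, and configuration values are all hypothetical.

// Illustrative usage sketch (not part of this patch); the topic "purchases", all store
// names, the region-prefix grouping, and the config values below are hypothetical.
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

import java.util.Properties;

public class RegionTotalsSketch {

    public static void main(final String[] args) {
        final KStreamBuilder builder = new KStreamBuilder();

        // key = userId, value = latest purchase total for that user
        final KTable<String, Long> purchases =
                builder.table(Serdes.String(), Serdes.Long(), "purchases", "purchases-store");

        // re-key by a (hypothetical) region prefix of the user id
        final KGroupedTable<String, Long> byRegion =
                purchases.groupBy((userId, amount) -> KeyValue.pair(userId.substring(0, 2), amount),
                                  Serdes.String(), Serdes.Long());

        // adder/subtractor pair: each update adds the new value and removes the replaced one
        final KTable<String, Long> totalByRegion =
                byRegion.reduce((aggregate, newValue) -> aggregate + newValue,
                                (aggregate, oldValue) -> aggregate - oldValue,
                                "total-by-region");

        // materialized under a queryable name ...
        final KTable<String, Long> usersByRegion = byRegion.count("users-by-region");
        // ... or, with the overload added in this patch, under an internal (non-queryable) name
        final KTable<String, Long> usersByRegionInternal = byRegion.count();

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "region-totals-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        final KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();

        // Interactive Queries against the named store (only once the store is ready,
        // and only for keys hosted by this instance)
        final ReadOnlyKeyValueStore<String, Long> localStore =
                streams.store("total-by-region", QueryableStoreTypes.<String, Long>keyValueStore());
        final Long totalForRegion = localStore.get("us");

        streams.close();
    }
}

The same pattern applies to the aggregate(...) overloads: passing a store name (or a StateStoreSupplier) keeps the result queryable, while the new argument-free variants materialize into an internally named store.
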
http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java index c361cad..0e02c8f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java @@ -27,7 +27,6 @@ import org.apache.kafka.streams.kstream.internals.KTableImpl; import org.apache.kafka.streams.kstream.internals.KTableSource; import org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier; import org.apache.kafka.streams.processor.ProcessorSupplier; -import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.processor.TopologyBuilder; import org.apache.kafka.streams.state.KeyValueStore; @@ -35,6 +34,7 @@ import org.apache.kafka.streams.state.QueryableStoreType; import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier; import java.util.Collections; +import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; @@ -243,7 +243,7 @@ public class KStreamBuilder extends TopologyBuilder { * If this is not the case the returned {@link KTable} will be corrupted. * <p> * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given - * {@code storeName}. + * {@code queryableStoreName}. * However, no internal changelog topic is created since the original input topic can be used for recovery (cf. * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). * <p> @@ -251,7 +251,7 @@ public class KStreamBuilder extends TopologyBuilder { * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: * <pre>{@code * KafkaStreams streams = ... - * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()); + * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore()); * String key = "some-key"; * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) * }</pre> @@ -259,12 +259,103 @@ public class KStreamBuilder extends TopologyBuilder { * query the value of the key on a parallel running instance of your Kafka Streams application. * * @param topic the topic name; cannot be {@code null} - * @param storeName the state store name; cannot be {@code null} + * @param queryableStoreName the state store name; If {@code null} this is the equivalent of {@link KStreamBuilder#table(String)} ()}. * @return a {@link KTable} for the specified topic */ public <K, V> KTable<K, V> table(final String topic, - final String storeName) { - return table(null, null, null, topic, storeName); + final String queryableStoreName) { + return table(null, null, null, topic, queryableStoreName); + } + + /** + * Create a {@link KTable} for the specified topic. + * The default {@code "auto.offset.reset"} strategy and default key and value deserializers as specified in the + * {@link StreamsConfig config} are used. + * Input {@link KeyValue records} with {@code null} key will be dropped. 
+ * <p> + * Note that the specified input topics must be partitioned by key. + * If this is not the case the returned {@link KTable} will be corrupted. + * <p> + * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given + * {@code queryableStoreName}. + * However, no internal changelog topic is created since the original input topic can be used for recovery (cf. + * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). + * <p> + * To query the local {@link KeyValueStore} it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: + * <pre>{@code + * KafkaStreams streams = ... + * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore()); + * String key = "some-key"; + * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) + * }</pre> + * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to + * query the value of the key on a parallel running instance of your Kafka Streams application. + * + * @param topic the topic name; cannot be {@code null} + * @param storeSupplier user defined state store supplier. Cannot be {@code null}. + * @return a {@link KTable} for the specified topic + */ + public <K, V> KTable<K, V> table(final String topic, + final StateStoreSupplier<KeyValueStore> storeSupplier) { + return table(null, null, null, topic, storeSupplier); + } + + + /** + * Create a {@link KTable} for the specified topic. + * The default {@code "auto.offset.reset"} strategy and default key and value deserializers as specified in the + * {@link StreamsConfig config} are used. + * Input {@link KeyValue records} with {@code null} key will be dropped. + * <p> + * Note that the specified input topics must be partitioned by key. + * If this is not the case the returned {@link KTable} will be corrupted. + * <p> + * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal + * store name. Note that that store name may not be queriable through Interactive Queries. + * No internal changelog topic is created since the original input topic can be used for recovery (cf. + * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). + * @param topic the topic name; cannot be {@code null} + * @return a {@link KTable} for the specified topic + */ + public <K, V> KTable<K, V> table(final String topic) { + return table(null, null, null, topic, (String) null); + } + + /** + * Create a {@link KTable} for the specified topic. + * The default key and value deserializers as specified in the {@link StreamsConfig config} are used. + * Input {@link KeyValue records} with {@code null} key will be dropped. + * <p> + * Note that the specified input topics must be partitioned by key. + * If this is not the case the returned {@link KTable} will be corrupted. + * <p> + * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given + * {@code queryableStoreName}. + * However, no internal changelog topic is created since the original input topic can be used for recovery (cf. + * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). 
+ * <p> + * To query the local {@link KeyValueStore} it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: + * <pre>{@code + * KafkaStreams streams = ... + * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore()); + * String key = "some-key"; + * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) + * }</pre> + * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to + * query the value of the key on a parallel running instance of your Kafka Streams application. + * + * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed + * offsets are available + * @param topic the topic name; cannot be {@code null} + * @param queryableStoreName the state store name; If {@code null} this is the equivalent of {@link KStreamBuilder#table(AutoOffsetReset, String)} ()}. + * @return a {@link KTable} for the specified topic + */ + public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset, + final String topic, + final String queryableStoreName) { + return table(offsetReset, null, null, topic, queryableStoreName); } /** @@ -276,7 +367,7 @@ public class KStreamBuilder extends TopologyBuilder { * If this is not the case the returned {@link KTable} will be corrupted. * <p> * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given - * {@code storeName}. + * {@code queryableStoreName}. * However, no internal changelog topic is created since the original input topic can be used for recovery (cf. * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). * <p> @@ -284,7 +375,7 @@ public class KStreamBuilder extends TopologyBuilder { * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: * <pre>{@code * KafkaStreams streams = ... - * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()); + * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore()); * String key = "some-key"; * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) * }</pre> @@ -294,13 +385,36 @@ public class KStreamBuilder extends TopologyBuilder { * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed * offsets are available * @param topic the topic name; cannot be {@code null} - * @param storeName the state store name; cannot be {@code null} + * @param storeSupplier user defined state store supplier. Cannot be {@code null}. * @return a {@link KTable} for the specified topic */ public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset, final String topic, - final String storeName) { - return table(offsetReset, null, null, topic, storeName); + final StateStoreSupplier<KeyValueStore> storeSupplier) { + return table(offsetReset, null, null, topic, storeSupplier); + } + + /** + * Create a {@link KTable} for the specified topic. + * The default key and value deserializers as specified in the {@link StreamsConfig config} are used. + * Input {@link KeyValue records} with {@code null} key will be dropped. 
+ * <p> + * Note that the specified input topics must be partitioned by key. + * If this is not the case the returned {@link KTable} will be corrupted. + * <p> + * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal + * store name. Note that that store name may not be queriable through Interactive Queries. + * No internal changelog topic is created since the original input topic can be used for recovery (cf. + * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). + * <p> + * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed + * offsets are available + * @param topic the topic name; if {@code null} an internal store name will be automatically given. + * @return a {@link KTable} for the specified topic + */ + public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset, + final String topic) { + return table(offsetReset, null, null, topic, (String) null); } /** @@ -312,7 +426,7 @@ public class KStreamBuilder extends TopologyBuilder { * If this is not the case the returned {@link KTable} will be corrupted. * <p> * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given - * {@code storeName}. + * {@code queryableStoreName}. * However, no internal changelog topic is created since the original input topic can be used for recovery (cf. * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). * <p> @@ -320,7 +434,7 @@ public class KStreamBuilder extends TopologyBuilder { * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: * <pre>{@code * KafkaStreams streams = ... - * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()); + * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore()); * String key = "some-key"; * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) * }</pre> @@ -332,25 +446,26 @@ public class KStreamBuilder extends TopologyBuilder { * @param valSerde value serde used to send key-value pairs, * if not specified the default value serde defined in the configuration will be used * @param topic the topic name; cannot be {@code null} - * @param storeName the state store name; cannot be {@code null} + * @param queryableStoreName the state store name; If {@code null} this is the equivalent of {@link KStreamBuilder#table(Serde, Serde, String)} ()}. * @return a {@link KTable} for the specified topic */ public <K, V> KTable<K, V> table(final Serde<K> keySerde, final Serde<V> valSerde, final String topic, - final String storeName) { - return table(null, keySerde, valSerde, topic, storeName); + final String queryableStoreName) { + return table(null, keySerde, valSerde, topic, queryableStoreName); } /** * Create a {@link KTable} for the specified topic. + * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used. * Input {@link KeyValue records} with {@code null} key will be dropped. * <p> * Note that the specified input topics must be partitioned by key. * If this is not the case the returned {@link KTable} will be corrupted. * <p> * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given - * {@code storeName}. 
+ * {@code queryableStoreName}. * However, no internal changelog topic is created since the original input topic can be used for recovery (cf. * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). * <p> @@ -358,7 +473,94 @@ public class KStreamBuilder extends TopologyBuilder { * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: * <pre>{@code * KafkaStreams streams = ... - * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()); + * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore()); + * String key = "some-key"; + * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) + * }</pre> + * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to + * query the value of the key on a parallel running instance of your Kafka Streams application. + * + * @param keySerde key serde used to send key-value pairs, + * if not specified the default key serde defined in the configuration will be used + * @param valSerde value serde used to send key-value pairs, + * if not specified the default value serde defined in the configuration will be used + * @param topic the topic name; cannot be {@code null} + * @param storeSupplier user defined state store supplier. Cannot be {@code null}. + * @return a {@link KTable} for the specified topic + */ + public <K, V> KTable<K, V> table(final Serde<K> keySerde, + final Serde<V> valSerde, + final String topic, + final StateStoreSupplier<KeyValueStore> storeSupplier) { + return table(null, keySerde, valSerde, topic, storeSupplier); + } + + /** + * Create a {@link KTable} for the specified topic. + * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used. + * Input {@link KeyValue records} with {@code null} key will be dropped. + * <p> + * Note that the specified input topics must be partitioned by key. + * If this is not the case the returned {@link KTable} will be corrupted. + * <p> + * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal + * store name. Note that that store name may not be queriable through Interactive Queries. + * No internal changelog topic is created since the original input topic can be used for recovery (cf. + * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). 
+ * <p> + * @param keySerde key serde used to send key-value pairs, + * if not specified the default key serde defined in the configuration will be used + * @param valSerde value serde used to send key-value pairs, + * if not specified the default value serde defined in the configuration will be used + * @param topic the topic name; cannot be {@code null} + * @return a {@link KTable} for the specified topic + */ + public <K, V> KTable<K, V> table(final Serde<K> keySerde, + final Serde<V> valSerde, + final String topic) { + return table(null, keySerde, valSerde, topic, (String) null); + } + + private <K, V> KTable<K, V> doTable(final AutoOffsetReset offsetReset, + final Serde<K> keySerde, + final Serde<V> valSerde, + final String topic, + final StateStoreSupplier<KeyValueStore> storeSupplier, + final boolean isQueryable) { + final String source = newName(KStreamImpl.SOURCE_NAME); + final String name = newName(KTableImpl.SOURCE_NAME); + final ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(storeSupplier.name()); + + addSource(offsetReset, source, keySerde == null ? null : keySerde.deserializer(), + valSerde == null ? null : valSerde.deserializer(), + topic); + addProcessor(name, processorSupplier, source); + + final KTableImpl<K, ?, V> kTable = new KTableImpl<>(this, name, processorSupplier, + keySerde, valSerde, Collections.singleton(source), storeSupplier.name(), isQueryable); + + addStateStore(storeSupplier, name); + connectSourceStoreAndTopic(storeSupplier.name(), topic); + + return kTable; + } + /** + * Create a {@link KTable} for the specified topic. + * Input {@link KeyValue records} with {@code null} key will be dropped. + * <p> + * Note that the specified input topics must be partitioned by key. + * If this is not the case the returned {@link KTable} will be corrupted. + * <p> + * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given + * {@code queryableStoreName}. + * However, no internal changelog topic is created since the original input topic can be used for recovery (cf. + * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). + * <p> + * To query the local {@link KeyValueStore} it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: + * <pre>{@code + * KafkaStreams streams = ... + * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore()); * String key = "some-key"; * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) * }</pre> @@ -372,37 +574,142 @@ public class KStreamBuilder extends TopologyBuilder { * @param valSerde value serde used to send key-value pairs, * if not specified the default value serde defined in the configuration will be used * @param topic the topic name; cannot be {@code null} - * @param storeName the state store name; cannot be {@code null} + * @param queryableStoreName the state store name; If {@code null} this is the equivalent of {@link KStreamBuilder#table(AutoOffsetReset, Serde, Serde, String)} ()} ()}. 
* @return a {@link KTable} for the specified topic */ public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset, final Serde<K> keySerde, final Serde<V> valSerde, final String topic, - final String storeName) { - final String source = newName(KStreamImpl.SOURCE_NAME); - final String name = newName(KTableImpl.SOURCE_NAME); - final ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(storeName); - - addSource(offsetReset, source, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topic); - addProcessor(name, processorSupplier, source); - - final KTableImpl<K, ?, V> kTable = new KTableImpl<>(this, name, processorSupplier, Collections.singleton(source), storeName); + final String queryableStoreName) { + final String internalStoreName = queryableStoreName != null ? queryableStoreName : newStoreName(KTableImpl.SOURCE_NAME); + final StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(internalStoreName, + keySerde, + valSerde, + false, + Collections.<String, String>emptyMap(), + true); + return doTable(offsetReset, keySerde, valSerde, topic, storeSupplier, queryableStoreName != null); + } - // only materialize the KTable into a state store if the storeName is not null - if (storeName != null) { - final StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(storeName, - keySerde, - valSerde, - false, - Collections.<String, String>emptyMap(), - true); + /** + * Create a {@link KTable} for the specified topic. + * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used. + * Input {@link KeyValue records} with {@code null} key will be dropped. + * <p> + * Note that the specified input topics must be partitioned by key. + * If this is not the case the returned {@link KTable} will be corrupted. + * <p> + * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal + * store name. Note that that store name may not be queriable through Interactive Queries. + * No internal changelog topic is created since the original input topic can be used for recovery (cf. + * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). + * <p> + * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed + * offsets are available + * @param keySerde key serde used to send key-value pairs, + * if not specified the default key serde defined in the configuration will be used + * @param valSerde value serde used to send key-value pairs, + * if not specified the default value serde defined in the configuration will be used + * @param topic the topic name; cannot be {@code null} + * @return a {@link KTable} for the specified topic + */ + public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset, + final Serde<K> keySerde, + final Serde<V> valSerde, + final String topic) { + return table(offsetReset, keySerde, valSerde, topic, (String) null); + } + /** + * Create a {@link KTable} for the specified topic. + * Input {@link KeyValue records} with {@code null} key will be dropped. + * <p> + * Note that the specified input topics must be partitioned by key. + * If this is not the case the returned {@link KTable} will be corrupted. + * <p> + * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given + * {@code queryableStoreName}. 
+     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ...
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     *
+     * @param offsetReset   the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
+     *                      offsets are available
+     * @param keySerde      key serde used to send key-value pairs,
+     *                      if not specified the default key serde defined in the configuration will be used
+     * @param valSerde      value serde used to send key-value pairs,
+     *                      if not specified the default value serde defined in the configuration will be used
+     * @param topic         the topic name; cannot be {@code null}
+     * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
+     * @return a {@link KTable} for the specified topic
+     */
+    public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
+                                     final Serde<K> keySerde,
+                                     final Serde<V> valSerde,
+                                     final String topic,
+                                     final StateStoreSupplier<KeyValueStore> storeSupplier) {
+        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
+        return doTable(offsetReset, keySerde, valSerde, topic, storeSupplier, true);
+    }
-        addStateStore(storeSupplier, name);
-        connectSourceStoreAndTopic(storeName, topic);
-    }
+    /**
+     * Create a {@link GlobalKTable} for the specified topic.
+     * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
+     * Input {@link KeyValue records} with {@code null} key will be dropped.
+     * <p>
+     * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given
+     * {@code queryableStoreName}.
+     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ...
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long valueForKey = localStore.get(key);
+     * }</pre>
+     * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
+     * regardless of the specified value in {@link StreamsConfig}.
+     *
+     * @param topic              the topic name; cannot be {@code null}
+     * @param queryableStoreName the state store name; If {@code null} this is the equivalent of {@link KStreamBuilder#globalTable(String)}.
+     * @return a {@link GlobalKTable} for the specified topic
+     */
+    public <K, V> GlobalKTable<K, V> globalTable(final String topic,
+                                                 final String queryableStoreName) {
+        return globalTable(null, null, topic, queryableStoreName);
+    }
-        return kTable;
+    /**
+     * Create a {@link GlobalKTable} for the specified topic.
+     * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
+     * Input {@link KeyValue records} with {@code null} key will be dropped.
+     * <p>
+     * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with an internal
+     * store name. Note that this store name may not be queryable through Interactive Queries.
+     * No internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     * <p>
+     * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
+     * regardless of the specified value in {@link StreamsConfig}.
+     *
+     * @param topic the topic name; cannot be {@code null}
+     * @return a {@link GlobalKTable} for the specified topic
+     */
+    public <K, V> GlobalKTable<K, V> globalTable(final String topic) {
+        return globalTable(null, null, topic, (String) null);
    }

    /**
@@ -411,7 +718,7 @@ public class KStreamBuilder extends TopologyBuilder {
     * Input {@link KeyValue records} with {@code null} key will be dropped.
     * <p>
     * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code storeName}.
+     * {@code queryableStoreName}.
     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
     * <p>
@@ -419,20 +726,33 @@ public class KStreamBuilder extends TopologyBuilder {
     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
     * <pre>{@code
     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
     * String key = "some-key";
     * Long valueForKey = localStore.get(key);
     * }</pre>
     * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
     * regardless of the specified value in {@link StreamsConfig}.
     *
+     * @param keySerde key serde used to send key-value pairs,
+     *                 if not specified the default key serde defined in the configuration will be used
+     * @param valSerde value serde used to send key-value pairs,
+     *                 if not specified the default value serde defined in the configuration will be used
     * @param topic the topic name; cannot be {@code null}
-     * @param storeName the state store name; cannot be {@code null}
+     * @param queryableStoreName the state store name; If {@code null} this is the equivalent of {@link KStreamBuilder#globalTable(Serde, Serde, String)}.
     * @return a {@link GlobalKTable} for the specified topic
     */
-    public <K, V> GlobalKTable<K, V> globalTable(final String topic,
-                                                 final String storeName) {
-        return globalTable(null, null, topic, storeName);
+    @SuppressWarnings("unchecked")
+    public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
+                                                 final Serde<V> valSerde,
+                                                 final String topic,
+                                                 final String queryableStoreName) {
+        final String internalStoreName = queryableStoreName != null ? queryableStoreName : newStoreName(KTableImpl.SOURCE_NAME);
+        return doGlobalTable(keySerde, valSerde, topic, new RocksDBKeyValueStoreSupplier<>(internalStoreName,
+                keySerde,
+                valSerde,
+                false,
+                Collections.<String, String>emptyMap(),
+                true));
    }

    /**
@@ -441,7 +761,7 @@ public class KStreamBuilder extends TopologyBuilder {
     * Input {@link KeyValue records} with {@code null} key will be dropped.
     * <p>
     * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code storeName}.
+     * {@code queryableStoreName}.
     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
     * <p>
@@ -449,7 +769,7 @@ public class KStreamBuilder extends TopologyBuilder {
     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
     * <pre>{@code
     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
     * String key = "some-key";
     * Long valueForKey = localStore.get(key);
     * }</pre>
@@ -461,31 +781,60 @@ public class KStreamBuilder extends TopologyBuilder {
     * @param valSerde value serde used to send key-value pairs,
     *                 if not specified the default value serde defined in the configuration will be used
     * @param topic    the topic name; cannot be {@code null}
-     * @param storeName the state store name; cannot be {@code null}
+     * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
     * @return a {@link GlobalKTable} for the specified topic
     */
    @SuppressWarnings("unchecked")
    public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
                                                 final Serde<V> valSerde,
                                                 final String topic,
-                                                 final String storeName) {
+                                                 final StateStoreSupplier<KeyValueStore> storeSupplier) {
+        return doGlobalTable(keySerde, valSerde, topic, storeSupplier);
+    }
+
+    private <K, V> GlobalKTable<K, V> doGlobalTable(final Serde<K> keySerde,
+                                                    final Serde<V> valSerde,
+                                                    final String topic,
+                                                    final StateStoreSupplier<KeyValueStore> storeSupplier) {
+        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
        final String sourceName = newName(KStreamImpl.SOURCE_NAME);
        final String processorName = newName(KTableImpl.SOURCE_NAME);
-        final KTableSource<K, V> tableSource = new KTableSource<>(storeName);
+        final KTableSource<K, V> tableSource = new KTableSource<>(storeSupplier.name());
        final Deserializer<K> keyDeserializer = keySerde == null ? null : keySerde.deserializer();
        final Deserializer<V> valueDeserializer = valSerde == null ? null : valSerde.deserializer();
-        final StateStore store = new RocksDBKeyValueStoreSupplier<>(storeName,
-                keySerde,
-                valSerde,
-                false,
-                Collections.<String, String>emptyMap(),
-                true).get();
+        addGlobalStore(storeSupplier, sourceName, keyDeserializer, valueDeserializer, topic, processorName, tableSource);
+        return new GlobalKTableImpl(new KTableSourceValueGetterSupplier<>(storeSupplier.name()));
+    }
-        addGlobalStore(store, sourceName, keyDeserializer, valueDeserializer, topic, processorName, tableSource);
-        return new GlobalKTableImpl(new KTableSourceValueGetterSupplier<>(storeName));
+    /**
+     * Create a {@link GlobalKTable} for the specified topic.
+     * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
+     * Input {@link KeyValue records} with {@code null} key will be dropped.
+     * <p>
+     * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with an internal
+     * store name. Note that this store name may not be queryable through Interactive Queries.
+     * No internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     * <p>
+     * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
+     * regardless of the specified value in {@link StreamsConfig}.
+     *
+     * @param keySerde key serde used to send key-value pairs,
+     *                 if not specified the default key serde defined in the configuration will be used
+     * @param valSerde value serde used to send key-value pairs,
+     *                 if not specified the default value serde defined in the configuration will be used
+     * @param topic    the topic name; cannot be {@code null}
+     * @return a {@link GlobalKTable} for the specified topic
+     */
+    @SuppressWarnings("unchecked")
+    public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
+                                                 final Serde<V> valSerde,
+                                                 final String topic) {
+
+        return globalTable(keySerde, valSerde, topic, (String) null);
    }

    /**
@@ -512,4 +861,16 @@ public class KStreamBuilder extends TopologyBuilder {
        return prefix + String.format("%010d", index.getAndIncrement());
    }

+    /**
+     * <strong>This function is for internal usage only and should not be called.</strong>
+     * <p>
+     * Create a unique state store name.
+     *
+     * @param prefix processor name prefix
+     * @return a new unique name
+     */
+    public String newStoreName(final String prefix) {
+        return prefix + String.format(KTableImpl.STATE_STORE_NAME + "%010d", index.getAndIncrement());
+    }
+
 }
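
A rough usage sketch of the new queryable table() overload follows; it is not part of the patch. The topic name "user-profiles", the store name "profiles-store", the application id, and the broker address are illustrative placeholders, and the store lookup assumes this instance has finished restoring its state.

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

    public class QueryableTableExample {
        public static void main(final String[] args) {
            final Properties config = new Properties();
            config.put(StreamsConfig.APPLICATION_ID_CONFIG, "queryable-table-example"); // illustrative app id
            config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");       // illustrative broker

            final KStreamBuilder builder = new KStreamBuilder();
            // a non-null queryableStoreName materializes the topic into a store that can
            // later be queried by that name through Interactive Queries
            final KTable<String, String> profiles =
                    builder.table(Serdes.String(), Serdes.String(), "user-profiles", "profiles-store");

            final KafkaStreams streams = new KafkaStreams(builder, config);
            streams.start();

            // once the instance is running and the store has been restored:
            final ReadOnlyKeyValueStore<String, String> localStore =
                    streams.store("profiles-store", QueryableStoreTypes.<String, String>keyValueStore());
            final String valueForKey = localStore.get("some-key"); // key must be hosted by this instance
        }
    }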
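
Along the same lines, a sketch of the queryable globalTable() overload, reusing the imports and config from the sketch above plus org.apache.kafka.streams.kstream.GlobalKTable; the topic and store names are again placeholders. Because a GlobalKTable is fully replicated to every instance, a local lookup can serve any key, so no RPC layer is needed.

    final KStreamBuilder builder = new KStreamBuilder();
    // "rates-store" is the queryable name; passing null instead would fall back to an
    // internally generated name that cannot be handed to KafkaStreams#store()
    final GlobalKTable<String, Long> rates =
            builder.globalTable(Serdes.String(), Serdes.Long(), "currency-rates", "rates-store");

    final KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();

    // the global store holds the full contents of the topic on this instance
    final ReadOnlyKeyValueStore<String, Long> rateStore =
            streams.store("rates-store", QueryableStoreTypes.<String, Long>keyValueStore());
    final Long rate = rateStore.get("EUR");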
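
For contrast, a minimal sketch of the store-name-free overloads added by this patch: they materialize into a store named by the internal newStoreName() helper, so there is no public name to pass to KafkaStreams#store() and the resulting table may not be queryable through Interactive Queries.

    // internally named stores: usable inside the topology, but not addressable from outside
    final KTable<String, String> notQueryable =
            builder.table(Serdes.String(), Serdes.String(), "user-profiles");

    final GlobalKTable<String, Long> alsoNotQueryable =
            builder.globalTable(Serdes.String(), Serdes.Long(), "currency-rates");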