http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java index 290142b..e6219c2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java @@ -24,7 +24,10 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.internals.WindowedSerializer; import org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner; +import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.processor.StreamPartitioner; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.QueryableStoreType; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; /** @@ -47,7 +50,7 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; * final KafkaStreams streams = ...; * streams.start() * ... - * final String queryableStoreName = table.getStoreName(); // returns null if KTable is not queryable + * final String queryableStoreName = table.queryableStoreName(); // returns null if KTable is not queryable * ReadOnlyKeyValueStore view = streams.store(queryableStoreName, QueryableStoreTypes.keyValueStore()); * view.get(key); *}</pre> @@ -87,6 +90,79 @@ public interface KTable<K, V> { KTable<K, V> filter(final Predicate<? super K, ? super V> predicate); /** + * Create a new {@code KTable} that consists of all records of this {@code KTable} which satisfy the given + * predicate. + * All records that do not satisfy the predicate are dropped. 
+ * For each {@code KTable} update the filter is evaluated on the update record to produce an update record for the + * result {@code KTable}. + * This is a stateless record-by-record operation. + * <p> + * Note that {@code filter} for a <i>changelog stream</i> works different to {@link KStream#filter(Predicate) + * record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records) + * have delete semantics. + * Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded + * directly if required (i.e., if there is anything to be deleted). + * Furthermore, for each record that gets dropped (i.e., does not satisfy the given predicate) a tombstone record + * is forwarded. + * <p> + * To query the local {@link KeyValueStore} it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: + * <pre>{@code + * KafkaStreams streams = ... // filtering words + * ReadOnlyKeyValueStore<K,V> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, V>keyValueStore()); + * K key = "some-word"; + * V valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) + * }</pre> + * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to + * query the value of the key on a parallel running instance of your Kafka Streams application. + * <p> + * + * @param predicate a filter {@link Predicate} that is applied to each record + * @param queryableStoreName a user-provided name of the underlying {@link KTable} that can be + * used to subsequently query the operation results; valid characters are ASCII + * alphanumerics, '.', '_' and '-'. If {@code null} then the results cannot be queried + * (i.e., that would be equivalent to calling {@link KTable#filter(Predicate)}. 
+ * @return a {@code KTable} that contains only those records that satisfy the given predicate + * @see #filterNot(Predicate) + */ + KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final String queryableStoreName); + + /** + * Create a new {@code KTable} that consists of all records of this {@code KTable} which satisfy the given + * predicate. + * All records that do not satisfy the predicate are dropped. + * For each {@code KTable} update the filter is evaluated on the update record to produce an update record for the + * result {@code KTable}. + * This is a stateless record-by-record operation. + * <p> + * Note that {@code filter} for a <i>changelog stream</i> works different to {@link KStream#filter(Predicate) + * record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records) + * have delete semantics. + * Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded + * directly if required (i.e., if there is anything to be deleted). + * Furthermore, for each record that gets dropped (i.e., does not satisfy the given predicate) a tombstone record + * is forwarded. + * <p> + * To query the local {@link KeyValueStore} it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: + * <pre>{@code + * KafkaStreams streams = ... // filtering words + * ReadOnlyKeyValueStore<K,V> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, V>keyValueStore()); + * K key = "some-word"; + * V valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) + * }</pre> + * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to + * query the value of the key on a parallel running instance of your Kafka Streams application. 
+ * <p> + * + * @param predicate a filter {@link Predicate} that is applied to each record + * @param storeSupplier user defined state store supplier. Cannot be {@code null}. + * @return a {@code KTable} that contains only those records that satisfy the given predicate + * @see #filterNot(Predicate) + */ + KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier); + + /** * Create a new {@code KTable} that consists all records of this {@code KTable} which do <em>not</em> satisfy the * given predicate. * All records that <em>do</em> satisfy the predicate are dropped. @@ -109,6 +185,78 @@ public interface KTable<K, V> { KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate); /** + * Create a new {@code KTable} that consists all records of this {@code KTable} which do <em>not</em> satisfy the + * given predicate. + * All records that <em>do</em> satisfy the predicate are dropped. + * For each {@code KTable} update the filter is evaluated on the update record to produce an update record for the + * result {@code KTable}. + * This is a stateless record-by-record operation. + * <p> + * Note that {@code filterNot} for a <i>changelog stream</i> works different to {@link KStream#filterNot(Predicate) + * record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records) + * have delete semantics. + * Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded + * directly if required (i.e., if there is anything to be deleted). + * Furthermore, for each record that gets dropped (i.e., does satisfy the given predicate) a tombstone record is + * forwarded. + * <p> + * To query the local {@link KeyValueStore} it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: + * <pre>{@code + * KafkaStreams streams = ... 
// filtering words + * ReadOnlyKeyValueStore<K,V> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, V>keyValueStore()); + * K key = "some-word"; + * V valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) + * }</pre> + * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to + * query the value of the key on a parallel running instance of your Kafka Streams application. + * <p> + * @param predicate a filter {@link Predicate} that is applied to each record + * @param storeSupplier user defined state store supplier. Cannot be {@code null}. + * @return a {@code KTable} that contains only those records that do <em>not</em> satisfy the given predicate + * @see #filter(Predicate) + */ + KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier); + + /** + * Create a new {@code KTable} that consists all records of this {@code KTable} which do <em>not</em> satisfy the + * given predicate. + * All records that <em>do</em> satisfy the predicate are dropped. + * For each {@code KTable} update the filter is evaluated on the update record to produce an update record for the + * result {@code KTable}. + * This is a stateless record-by-record operation. + * <p> + * Note that {@code filterNot} for a <i>changelog stream</i> works different to {@link KStream#filterNot(Predicate) + * record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records) + * have delete semantics. + * Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded + * directly if required (i.e., if there is anything to be deleted). + * Furthermore, for each record that gets dropped (i.e., does satisfy the given predicate) a tombstone record is + * forwarded. 
+ * <p> + * To query the local {@link KeyValueStore} it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: + * <pre>{@code + * KafkaStreams streams = ... // filtering words + * ReadOnlyKeyValueStore<K,V> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, V>keyValueStore()); + * K key = "some-word"; + * V valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) + * }</pre> + * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to + * query the value of the key on a parallel running instance of your Kafka Streams application. + * <p> + * @param predicate a filter {@link Predicate} that is applied to each record + * @param queryableStoreName a user-provided name of the underlying {@link KTable} that can be + * used to subsequently query the operation results; valid characters are ASCII + * alphanumerics, '.', '_' and '-'. If {@code null} then the results cannot be queried + * (i.e., that would be equivalent to calling {@link KTable#filterNot(Predicate)}. + * @return a {@code KTable} that contains only those records that do <em>not</em> satisfy the given predicate + * @see #filter(Predicate) + */ + KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final String queryableStoreName); + + + /** * Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value * (with possible new type)in the new {@code KTable}. * For each {@code KTable} update the provided {@link ValueMapper} is applied to the value of the update record and @@ -144,6 +292,97 @@ public interface KTable<K, V> { /** + * Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value + * (with possible new type)in the new {@code KTable}. 
+ * For each {@code KTable} update the provided {@link ValueMapper} is applied to the value of the update record and + * computes a new value for it, resulting in an update record for the result {@code KTable}. + * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}. + * This is a stateless record-by-record operation. + * <p> + * The example below counts the number of tokens of the value string. + * <pre>{@code + * KTable<String, String> inputTable = builder.table("topic"); + * KTable<String, Integer> outputTable = inputTable.mapValues(new ValueMapper<String, Integer> { + * Integer apply(String value) { + * return value.split(" ").length; + * } + * }); + * }</pre> + * <p> + * To query the local {@link KeyValueStore} representing outputTable above it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: + * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to + * query the value of the key on a parallel running instance of your Kafka Streams application. + * <p> + * <p> + * This operation preserves data co-location with respect to the key. + * Thus, <em>no</em> internal data redistribution is required if a key based operator (like a join) is applied to + * the result {@code KTable}. + * <p> + * Note that {@code mapValues} for a <i>changelog stream</i> works different to {@link KStream#mapValues(ValueMapper) + * record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records) + * have delete semantics. + * Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to + * delete the corresponding record in the result {@code KTable}. 
+ * + * @param mapper a {@link ValueMapper} that computes a new output value + * @param queryableStoreName a user-provided name of the underlying {@link KTable} that can be + * used to subsequently query the operation results; valid characters are ASCII + * alphanumerics, '.', '_' and '-'. If {@code null} then the results cannot be queried + * (i.e., that would be equivalent to calling {@link KTable#mapValues(ValueMapper)}. + * @param valueSerde serializer for new value type + * @param <VR> the value type of the result {@code KTable} + * + * @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type) + */ + <VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper, final Serde<VR> valueSerde, final String queryableStoreName); + + /** + * Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value + * (with possible new type)in the new {@code KTable}. + * For each {@code KTable} update the provided {@link ValueMapper} is applied to the value of the update record and + * computes a new value for it, resulting in an update record for the result {@code KTable}. + * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}. + * This is a stateless record-by-record operation. + * <p> + * The example below counts the number of token of the value string. 
+ * <pre>{@code + * KTable<String, String> inputTable = builder.table("topic"); + * KTable<String, Integer> outputTable = inputTable.mapValue(new ValueMapper<String, Integer> { + * Integer apply(String value) { + * return value.split(" ").length; + * } + * }); + * }</pre> + * <p> + * To query the local {@link KeyValueStore} representing outputTable above it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: + * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to + * query the value of the key on a parallel running instance of your Kafka Streams application. + * <p> + * <p> + * This operation preserves data co-location with respect to the key. + * Thus, <em>no</em> internal data redistribution is required if a key based operator (like a join) is applied to + * the result {@code KTable}. + * <p> + * Note that {@code mapValues} for a <i>changelog stream</i> works different to {@link KStream#mapValues(ValueMapper) + * record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records) + * have delete semantics. + * Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to + * delete the corresponding record in the result {@code KTable}. + * + * @param mapper a {@link ValueMapper} that computes a new output value + * @param valueSerde serializer for new value type + * @param storeSupplier user defined state store supplier. Cannot be {@code null}. + * @param <VR> the value type of the result {@code KTable} + * @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type) + */ + <VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? 
extends VR> mapper, + final Serde<VR> valueSerde, + final StateStoreSupplier<KeyValueStore> storeSupplier); + + + /** * Print the update records of this {@code KTable} to {@code System.out}. * This function will use the generated name of the parent processor node to label the key/value pairs printed to * the console. @@ -156,7 +395,11 @@ public interface KTable<K, V> { * <p> * Note that {@code print()} is not applied to the internal state store and only called for each new {@code KTable} * update record. + * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) } + * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively + * convert to a KStream using {@code toStream()} and then use {@link KStream#print()} on the result. */ + @Deprecated void print(); /** @@ -173,7 +416,11 @@ public interface KTable<K, V> { * update record. * * @param streamName the name used to label the key/value pairs printed to the console + * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) } + * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively + * convert to a KStream using {@code toStream()} and then use {@link KStream#print(String)} on the result. */ + @Deprecated void print(final String streamName); /** @@ -191,8 +438,12 @@ public interface KTable<K, V> { * update record. * * @param keySerde key serde used to deserialize key if type is {@code byte[]}, - * @param valSerde value serde used to deserialize value if type is {@code byte[]}, + * @param valSerde value serde used to deserialize value if type is {@code byte[]} + * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) } + * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. 
Alternatively + * convert to a KStream using {@code toStream()} and then use {@link KStream#print(Serde, Serde)} on the result. */ + @Deprecated void print(final Serde<K> keySerde, final Serde<V> valSerde); @@ -212,7 +463,11 @@ public interface KTable<K, V> { * @param keySerde key serde used to deserialize key if type is {@code byte[]}, * @param valSerde value serde used to deserialize value if type is {@code byte[]}, * @param streamName the name used to label the key/value pairs printed to the console + * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) } + * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively + * convert to a KStream using {@code toStream()} and then use {@link KStream#print(Serde, Serde, String)} on the result. */ + @Deprecated void print(final Serde<K> keySerde, final Serde<V> valSerde, final String streamName); @@ -232,7 +487,11 @@ public interface KTable<K, V> { * {@code KTable} update record. * * @param filePath name of file to write to + * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) } + * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively + * convert to a KStream using {@code toStream()} and then use {@link KStream#writeAsText(String)}} on the result. */ + @Deprecated void writeAsText(final String filePath); /** @@ -250,7 +509,11 @@ public interface KTable<K, V> { * * @param filePath name of file to write to * @param streamName the name used to label the key/value pairs printed out to the console + * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) } + * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. 
Alternatively + * convert to a KStream using {@code toStream()} and then use {@link KStream#writeAsText(String, String)}} on the result. */ + @Deprecated void writeAsText(final String filePath, final String streamName); @@ -270,8 +533,12 @@ public interface KTable<K, V> { * * @param filePath name of file to write to * @param keySerde key serde used to deserialize key if type is {@code byte[]}, - * @param valSerde value serde used to deserialize value if type is {@code byte[]}, + * @param valSerde value serde used to deserialize value if type is {@code byte[]} + * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) } + * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively + * convert to a KStream using {@code toStream()} and then use {@link KStream#writeAsText(String, Serde, Serde)}} on the result. */ + @Deprecated void writeAsText(final String filePath, final Serde<K> keySerde, final Serde<V> valSerde); @@ -292,8 +559,13 @@ public interface KTable<K, V> { * @param filePath name of file to write to * @param streamName the name used to label the key/value pairs printed to the console * @param keySerde key serde used to deserialize key if type is {@code byte[]}, - * @param valSerde value serde used to deserialize value if type is {@code byte[]}, + * @param valSerde value serde used to deserialize value if type is {@code byte[]} + * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) } + * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively + * convert to a KStream using {@code toStream()} and then use {@link KStream#writeAsText(String, String, Serde, Serde)}} on the result. + */ + @Deprecated void writeAsText(final String filePath, final String streamName, final Serde<K> keySerde, @@ -307,7 +579,11 @@ public interface KTable<K, V> { * {@code KTable} update record. 
* * @param action an action to perform on each record + * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) } + * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively + * convert to a KStream using {@code toStream()} and then use {@link KStream#foreach(ForeachAction)}} on the result. */ + @Deprecated void foreach(final ForeachAction<? super K, ? super V> action); /** @@ -361,12 +637,94 @@ public interface KTable<K, V> { * The store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'. * * @param topic the topic name - * @param storeName the state store name used for the result {@code KTable}; valid characters are ASCII - * alphanumerics, '.', '_' and '-' + * @param queryableStoreName the state store name used for the result {@code KTable}; valid characters are ASCII + * alphanumerics, '.', '_' and '-'. If {@code null} this is the equivalent of {@link KTable#through(String)()} + * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable} + */ + KTable<K, V> through(final String topic, + final String queryableStoreName); + + /** + * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using default + * serializers and deserializers and producer's {@link DefaultPartitioner}. + * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is + * started). + * <p> + * This is equivalent to calling {@link #to(String) #to(someTopicName)} and + * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}. + * <p> + * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf. 
+ * {@link KStreamBuilder#table(String, String)}) + * The store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'. + * + * @param topic the topic name + * @param storeSupplier user defined state store supplier. Cannot be {@code null}. * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable} */ KTable<K, V> through(final String topic, - final String storeName); + final StateStoreSupplier<KeyValueStore> storeSupplier); + + /** + * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using default + * serializers and deserializers and producer's {@link DefaultPartitioner}. + * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is + * started). + * <p> + * This is equivalent to calling {@link #to(String) #to(someTopicName)} and + * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName)}. + * <p> + * The resulting {@code KTable} will be materialized in a local state store with an internal store name (cf. + * {@link KStreamBuilder#table(String)}) + * + * @param topic the topic name + * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable} + */ + KTable<K, V> through(final String topic); + + /** + * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using default + * serializers and deserializers and a customizable {@link StreamPartitioner} to determine the distribution of + * records to partitions. + * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is + * started). 
+ * <p> + * This is equivalent to calling {@link #to(StreamPartitioner, String) #to(partitioner, someTopicName)} and + * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName)}. + * <p> + * The resulting {@code KTable} will be materialized in a local state store with an internal store name (cf. + * {@link KStreamBuilder#table(String)}) + * + * @param partitioner the function used to determine how records are distributed among partitions of the topic, + * if not specified producer's {@link DefaultPartitioner} will be used + * @param topic the topic name + * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable} + */ + KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner, + final String topic); + + /** + * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using default + * serializers and deserializers and a customizable {@link StreamPartitioner} to determine the distribution of + * records to partitions. + * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is + * started). + * <p> + * This is equivalent to calling {@link #to(StreamPartitioner, String) #to(partitioner, someTopicName)} and + * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}. + * <p> + * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf. + * {@link KStreamBuilder#table(String, String)}) + * + * @param partitioner the function used to determine how records are distributed among partitions of the topic, + * if not specified producer's {@link DefaultPartitioner} will be used + * @param topic the topic name + * @param queryableStoreName the state store name used for the result {@code KTable}. 
+ * If {@code null} this is the equivalent of {@link KTable#through(StreamPartitioner, String)} + * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable} + */ + KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner, + final String topic, + final String queryableStoreName); /** * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using default @@ -384,12 +742,12 @@ public interface KTable<K, V> { * @param partitioner the function used to determine how records are distributed among partitions of the topic, * if not specified producer's {@link DefaultPartitioner} will be used * @param topic the topic name - * @param storeName the state store name used for the result {@code KTable} + * @param storeSupplier user defined state store supplier. Cannot be {@code null}. * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable} */ KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner, final String topic, - final String storeName); + final StateStoreSupplier<KeyValueStore> storeSupplier); /** * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic. @@ -410,43 +768,156 @@ public interface KTable<K, V> { * @param valSerde value serde used to send key-value pairs, * if not specified the default value serde defined in the configuration will be used * @param topic the topic name - * @param storeName the state store name used for the result {@code KTable} + * @param queryableStoreName the state store name used for the result {@code KTable}. 
+ * If {@code null} this is the equivalent of {@link KTable#through(Serde, Serde, String)()} * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable} */ KTable<K, V> through(final Serde<K> keySerde, Serde<V> valSerde, final String topic, - final String storeName); + final String queryableStoreName); /** - * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using a customizable - * {@link StreamPartitioner} to determine the distribution of records to partitions. + * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic. * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is * started). * <p> - * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String) - * #to(keySerde, valueSerde, partitioner, someTopicName)} and + * If {@code keySerde} provides a {@link WindowedSerializer} for the key {@link WindowedStreamPartitioner} is + * used—otherwise producer's {@link DefaultPartitioner} is used. + * <p> + * This is equivalent to calling {@link #to(Serde, Serde, String) #to(keySerde, valueSerde, someTopicName)} and * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}. * <p> * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf. 
* {@link KStreamBuilder#table(String, String)}) * - * @param keySerde key serde used to send key-value pairs, - * if not specified the default key serde defined in the configuration will be used - * @param valSerde value serde used to send key-value pairs, - * if not specified the default value serde defined in the configuration will be used - * @param partitioner the function used to determine how records are distributed among partitions of the topic, - * if not specified and {@code keySerde} provides a {@link WindowedSerializer} for the key + * @param keySerde key serde used to send key-value pairs, + * if not specified the default key serde defined in the configuration will be used + * @param valSerde value serde used to send key-value pairs, + * if not specified the default value serde defined in the configuration will be used + * @param topic the topic name + * @param storeSupplier user defined state store supplier. Cannot be {@code null}. + * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable} + */ + KTable<K, V> through(final Serde<K> keySerde, Serde<V> valSerde, + final String topic, + final StateStoreSupplier<KeyValueStore> storeSupplier); + + /** + * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic. + * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is + * started). + * <p> + * If {@code keySerde} provides a {@link WindowedSerializer} for the key {@link WindowedStreamPartitioner} is + * used—otherwise producer's {@link DefaultPartitioner} is used. + * <p> + * This is equivalent to calling {@link #to(Serde, Serde, String) #to(keySerde, valueSerde, someTopicName)} and + * {@link KStreamBuilder#table(String) KStreamBuilder#table(someTopicName)}. + * <p> + * The resulting {@code KTable} will be materialized in a local state store with an internal store name (cf. 
+ * {@link KStreamBuilder#table(String)}) + * + * @param keySerde key serde used to send key-value pairs, + * if not specified the default key serde defined in the configuration will be used + * @param valSerde value serde used to send key-value pairs, + * if not specified the default value serde defined in the configuration will be used + * @param topic the topic name + * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable} + */ + KTable<K, V> through(final Serde<K> keySerde, Serde<V> valSerde, + final String topic); + + /** + * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using a customizable + * {@link StreamPartitioner} to determine the distribution of records to partitions. + * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is + * started). + * <p> + * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String) + * #to(keySerde, valueSerde, partitioner, someTopicName)} and + * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}. + * <p> + * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf. 
+ * {@link KStreamBuilder#table(String, String)}) + * + * @param keySerde key serde used to send key-value pairs, + * if not specified the default key serde defined in the configuration will be used + * @param valSerde value serde used to send key-value pairs, + * if not specified the default value serde defined in the configuration will be used + * @param partitioner the function used to determine how records are distributed among partitions of the topic, + * if not specified and {@code keySerde} provides a {@link WindowedSerializer} for the key + * {@link WindowedStreamPartitioner} will be used—otherwise {@link DefaultPartitioner} will + * be used + * @param topic the topic name + * @param queryableStoreName the state store name used for the result {@code KTable}. + * If {@code null} this is the equivalent of {@link KTable#through(Serde, Serde, StreamPartitioner, String)} + * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable} + */ + KTable<K, V> through(final Serde<K> keySerde, + final Serde<V> valSerde, + final StreamPartitioner<? super K, ? super V> partitioner, + final String topic, + final String queryableStoreName); + + /** + * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using a customizable + * {@link StreamPartitioner} to determine the distribution of records to partitions. + * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is + * started). + * <p> + * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String) + * #to(keySerde, valueSerde, partitioner, someTopicName)} and + * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}. + * <p> + * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf. 
+ * {@link KStreamBuilder#table(String, String)}) + * + * @param keySerde key serde used to send key-value pairs, + * if not specified the default key serde defined in the configuration will be used + * @param valSerde value serde used to send key-value pairs, + * if not specified the default value serde defined in the configuration will be used + * @param partitioner the function used to determine how records are distributed among partitions of the topic, + * if not specified and {@code keySerde} provides a {@link WindowedSerializer} for the key * {@link WindowedStreamPartitioner} will be used—otherwise {@link DefaultPartitioner} will * be used * @param topic the topic name - * @param storeName the state store name used for the result {@code KTable} + * @param storeSupplier user defined state store supplier. Cannot be {@code null}. * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable} */ KTable<K, V> through(final Serde<K> keySerde, final Serde<V> valSerde, final StreamPartitioner<? super K, ? super V> partitioner, final String topic, - final String storeName); + final StateStoreSupplier<KeyValueStore> storeSupplier); + + /** + * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using a customizable + * {@link StreamPartitioner} to determine the distribution of records to partitions. + * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is + * started). + * <p> + * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String) + * #to(keySerde, valueSerde, partitioner, someTopicName)} and + * {@link KStreamBuilder#table(String) KStreamBuilder#table(someTopicName)}. + * <p> + * The resulting {@code KTable} will be materialized in a local state store with an internal store name (cf. 
+ * {@link KStreamBuilder#table(String)}) + * + * @param keySerde key serde used to send key-value pairs, + * if not specified the default key serde defined in the configuration will be used + * @param valSerde value serde used to send key-value pairs, + * if not specified the default value serde defined in the configuration will be used + * @param partitioner the function used to determine how records are distributed among partitions of the topic, + * if not specified and {@code keySerde} provides a {@link WindowedSerializer} for the key + * {@link WindowedStreamPartitioner} will be used—otherwise {@link DefaultPartitioner} will + * be used + * @param topic the topic name + * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable} + */ + KTable<K, V> through(final Serde<K> keySerde, + final Serde<V> valSerde, + final StreamPartitioner<? super K, ? super V> partitioner, + final String topic); + /** * Materialize this changelog stream to a topic using default serializers and deserializers and producer's @@ -647,11 +1118,8 @@ public interface KTable<K, V> { final ValueJoiner<? super V, ? super VO, ? extends VR> joiner); /** - * Join records of this {@code KTable} (left input) with another {@code KTable}'s (right input) records using - * non-windowed left equi join. + * Join records of this {@code KTable} with another {@code KTable}'s records using non-windowed inner equi join. * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}. - * In contrast to {@link #join(KTable, ValueJoiner) inner-join}, all records from left {@code KTable} will produce - * an output record (cf. below). * The result is an ever updating {@code KTable} that represents the <em>current</em> (i.e., processing time) result * of the join. 
* <p> @@ -660,17 +1128,13 @@ public interface KTable<K, V> { * This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input * {@code KTable} the result gets updated. * <p> - * For each {@code KTable} record that finds a corresponding record in the other {@code KTable}'s state the - * provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record. - * Additionally, for each record of left {@code KTable} that does not find a corresponding record in the - * right {@code KTable}'s state the provided {@link ValueJoiner} will be called with {@code rightValue = - * null} to compute a value (with arbitrary type) for the result record. + * For each {@code KTable} record that finds a corresponding record in the other {@code KTable} the provided + * {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record. * The key of the result record is the same as for both joining input records. * <p> * Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics. - * For example, for left input tombstones the provided value-joiner is not called but a tombstone record is - * forwarded directly to delete a record in the result {@code KTable} if required (i.e., if there is anything to be - * deleted). + * Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded + * directly to delete a record in the result {@code KTable} if required (i.e., if there is anything to be deleted). * <p> * Input records with {@code null} key will be dropped and no join computation is performed. 
* <p> @@ -688,7 +1152,7 @@ public interface KTable<K, V> { * <td><K1:A></td> * <td></td> * <td></td> - * <td><K1:ValueJoiner(A,null)></td> + * <td></td> * </tr> * <tr> * <td></td> @@ -698,18 +1162,99 @@ public interface KTable<K, V> { * <td><K1:ValueJoiner(A,b)></td> * </tr> * <tr> + * <td><K1:C></td> + * <td><K1:C></td> + * <td></td> + * <td><K1:b></td> + * <td><K1:ValueJoiner(C,b)></td> + * </tr> + * <tr> + * <td></td> + * <td><K1:C></td> * <td><K1:null></td> * <td></td> + * <td><K1:null></td> + * </tr> + * </table> + * Both input streams (or to be more precise, their underlying source topics) need to have the same number of + * partitions. + * + * @param other the other {@code KTable} to be joined with this {@code KTable} + * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records + * @param <VO> the value type of the other {@code KTable} + * @param <VR> the value type of the result {@code KTable} + * @param joinSerde serializer for join result value type + * @param queryableStoreName a user-provided name of the underlying {@link KTable} that can be + * used to subsequently query the operation results; valid characters are ASCII + * alphanumerics, '.', '_' and '-'. If {@code null} then the results cannot be queried + * (i.e., that would be equivalent to calling {@link KTable#join(KTable, ValueJoiner)}. + * @return a {@code KTable} that contains join-records for each key and values computed by the given + * {@link ValueJoiner}, one for each matched record-pair with the same key + * @see #leftJoin(KTable, ValueJoiner) + * @see #outerJoin(KTable, ValueJoiner) + */ + <VO, VR> KTable<K, VR> join(final KTable<K, VO> other, + final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, + final Serde<VR> joinSerde, + final String queryableStoreName); + + /** + * Join records of this {@code KTable} with another {@code KTable}'s records using non-windowed inner equi join. 
+ * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}. + * The result is an ever updating {@code KTable} that represents the <em>current</em> (i.e., processing time) result + * of the join. + * <p> + * The join is computed by (1) updating the internal state of one {@code KTable} and (2) performing a lookup for a + * matching record in the <em>current</em> (i.e., processing time) internal state of the other {@code KTable}. + * This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input + * {@code KTable} the result gets updated. + * <p> + * For each {@code KTable} record that finds a corresponding record in the other {@code KTable} the provided + * {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record. + * The key of the result record is the same as for both joining input records. + * <p> + * Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics. + * Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded + * directly to delete a record in the result {@code KTable} if required (i.e., if there is anything to be deleted). + * <p> + * Input records with {@code null} key will be dropped and no join computation is performed. 
+ * <p> + * Example: + * <table border='1'> + * <tr> + * <th>thisKTable</th> + * <th>thisState</th> + * <th>otherKTable</th> + * <th>otherState</th> + * <th>result update record</th> + * </tr> + * <tr> + * <td><K1:A></td> + * <td><K1:A></td> + * <td></td> + * <td></td> * <td></td> + * </tr> + * <tr> + * <td></td> + * <td><K1:A></td> * <td><K1:b></td> - * <td><K1:null></td> + * <td><K1:b></td> + * <td><K1:ValueJoiner(A,b)></td> * </tr> * <tr> + * <td><K1:C></td> + * <td><K1:C></td> * <td></td> + * <td><K1:b></td> + * <td><K1:ValueJoiner(C,b)></td> + * </tr> + * <tr> * <td></td> + * <td><K1:C></td> * <td><K1:null></td> * <td></td> - * <td></td> + * <td><K1:null></td> * </tr> * </table> * Both input streams (or to be more precise, their underlying source topics) need to have the same number of @@ -719,21 +1264,23 @@ public interface KTable<K, V> { * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records * @param <VO> the value type of the other {@code KTable} * @param <VR> the value type of the result {@code KTable} + * @param storeSupplier user defined state store supplier. Cannot be {@code null}. * @return a {@code KTable} that contains join-records for each key and values computed by the given - * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of - * left {@code KTable} - * @see #join(KTable, ValueJoiner) + * {@link ValueJoiner}, one for each matched record-pair with the same key + * @see #leftJoin(KTable, ValueJoiner) * @see #outerJoin(KTable, ValueJoiner) */ - <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other, - final ValueJoiner<? super V, ? super VO, ? extends VR> joiner); + <VO, VR> KTable<K, VR> join(final KTable<K, VO> other, + final ValueJoiner<? super V, ? super VO, ? 
extends VR> joiner, + final StateStoreSupplier<KeyValueStore> storeSupplier); + /** * Join records of this {@code KTable} (left input) with another {@code KTable}'s (right input) records using - * non-windowed outer equi join. + * non-windowed left equi join. * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}. - * In contrast to {@link #join(KTable, ValueJoiner) inner-join} or {@link #leftJoin(KTable, ValueJoiner) left-join}, - * all records from both input {@code KTable}s will produce an output record (cf. below). + * In contrast to {@link #join(KTable, ValueJoiner) inner-join}, all records from left {@code KTable} will produce + * an output record (cf. below). * The result is an ever updating {@code KTable} that represents the <em>current</em> (i.e., processing time) result * of the join. * <p> @@ -744,14 +1291,15 @@ public interface KTable<K, V> { * <p> * For each {@code KTable} record that finds a corresponding record in the other {@code KTable}'s state the * provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record. - * Additionally, for each record that does not find a corresponding record in the corresponding other - * {@code KTable}'s state the provided {@link ValueJoiner} will be called with {@code null} value for the - * corresponding other value to compute a value (with arbitrary type) for the result record. + * Additionally, for each record of left {@code KTable} that does not find a corresponding record in the + * right {@code KTable}'s state the provided {@link ValueJoiner} will be called with {@code rightValue = + * null} to compute a value (with arbitrary type) for the result record. * The key of the result record is the same as for both joining input records. * <p> * Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics. 
- * Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly - * to delete a record in the result {@code KTable} if required (i.e., if there is anything to be deleted). + * For example, for left input tombstones the provided value-joiner is not called but a tombstone record is + * forwarded directly to delete a record in the result {@code KTable} if required (i.e., if there is anything to be + * deleted). * <p> * Input records with {@code null} key will be dropped and no join computation is performed. * <p> @@ -783,14 +1331,14 @@ public interface KTable<K, V> { * <td></td> * <td></td> * <td><K1:b></td> - * <td><K1:ValueJoiner(null,b)></td> + * <td><K1:null></td> * </tr> * <tr> * <td></td> * <td></td> * <td><K1:null></td> * <td></td> - * <td><K1:null></td> + * <td></td> * </tr> * </table> * Both input streams (or to be more precise, their underlying source topics) need to have the same number of @@ -802,17 +1350,443 @@ public interface KTable<K, V> { * @param <VR> the value type of the result {@code KTable} * @return a {@code KTable} that contains join-records for each key and values computed by the given * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of - * both {@code KTable}s + * left {@code KTable} * @see #join(KTable, ValueJoiner) - * @see #leftJoin(KTable, ValueJoiner) + * @see #outerJoin(KTable, ValueJoiner) */ - <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other, - final ValueJoiner<? super V, ? super VO, ? extends VR> joiner); + <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other, + final ValueJoiner<? super V, ? super VO, ? extends VR> joiner); + + /** + * Join records of this {@code KTable} (left input) with another {@code KTable}'s (right input) records using + * non-windowed left equi join. + * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}. 
+ * In contrast to {@link #join(KTable, ValueJoiner) inner-join}, all records from left {@code KTable} will produce + * an output record (cf. below). + * The result is an ever updating {@code KTable} that represents the <em>current</em> (i.e., processing time) result + * of the join. + * <p> + * The join is computed by (1) updating the internal state of one {@code KTable} and (2) performing a lookup for a + * matching record in the <em>current</em> (i.e., processing time) internal state of the other {@code KTable}. + * This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input + * {@code KTable} the result gets updated. + * <p> + * For each {@code KTable} record that finds a corresponding record in the other {@code KTable}'s state the + * provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record. + * Additionally, for each record of left {@code KTable} that does not find a corresponding record in the + * right {@code KTable}'s state the provided {@link ValueJoiner} will be called with {@code rightValue = + * null} to compute a value (with arbitrary type) for the result record. + * The key of the result record is the same as for both joining input records. + * <p> + * Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics. + * For example, for left input tombstones the provided value-joiner is not called but a tombstone record is + * forwarded directly to delete a record in the result {@code KTable} if required (i.e., if there is anything to be + * deleted). + * <p> + * Input records with {@code null} key will be dropped and no join computation is performed. 
+ * <p> + * Example: + * <table border='1'> + * <tr> + * <th>thisKTable</th> + * <th>thisState</th> + * <th>otherKTable</th> + * <th>otherState</th> + * <th>result update record</th> + * </tr> + * <tr> + * <td><K1:A></td> + * <td><K1:A></td> + * <td></td> + * <td></td> + * <td><K1:ValueJoiner(A,null)></td> + * </tr> + * <tr> + * <td></td> + * <td><K1:A></td> + * <td><K1:b></td> + * <td><K1:b></td> + * <td><K1:ValueJoiner(A,b)></td> + * </tr> + * <tr> + * <td><K1:null></td> + * <td></td> + * <td></td> + * <td><K1:b></td> + * <td><K1:null></td> + * </tr> + * <tr> + * <td></td> + * <td></td> + * <td><K1:null></td> + * <td></td> + * <td></td> + * </tr> + * </table> + * Both input streams (or to be more precise, their underlying source topics) need to have the same number of + * partitions. + * + * @param other the other {@code KTable} to be joined with this {@code KTable} + * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records + * @param <VO> the value type of the other {@code KTable} + * @param <VR> the value type of the result {@code KTable} + * @param joinSerde serializer for join result value type + * @param queryableStoreName a user-provided name of the underlying {@link KTable} that can be + * used to subsequently query the operation results; valid characters are ASCII + * alphanumerics, '.', '_' and '-'. If {@code null} then the results cannot be queried + * (i.e., that would be equivalent to calling {@link KTable#leftJoin(KTable, ValueJoiner)}. + * @return a {@code KTable} that contains join-records for each key and values computed by the given + * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of + * left {@code KTable} + * @see #join(KTable, ValueJoiner) + * @see #outerJoin(KTable, ValueJoiner) + */ + <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other, + final ValueJoiner<? super V, ? super VO, ? 
extends VR> joiner, + final Serde<VR> joinSerde, + final String queryableStoreName); + + /** + * Join records of this {@code KTable} (left input) with another {@code KTable}'s (right input) records using + * non-windowed left equi join. + * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}. + * In contrast to {@link #join(KTable, ValueJoiner) inner-join}, all records from left {@code KTable} will produce + * an output record (cf. below). + * The result is an ever updating {@code KTable} that represents the <em>current</em> (i.e., processing time) result + * of the join. + * <p> + * The join is computed by (1) updating the internal state of one {@code KTable} and (2) performing a lookup for a + * matching record in the <em>current</em> (i.e., processing time) internal state of the other {@code KTable}. + * This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input + * {@code KTable} the result gets updated. + * <p> + * For each {@code KTable} record that finds a corresponding record in the other {@code KTable}'s state the + * provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record. + * Additionally, for each record of left {@code KTable} that does not find a corresponding record in the + * right {@code KTable}'s state the provided {@link ValueJoiner} will be called with {@code rightValue = + * null} to compute a value (with arbitrary type) for the result record. + * The key of the result record is the same as for both joining input records. + * <p> + * Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics. + * For example, for left input tombstones the provided value-joiner is not called but a tombstone record is + * forwarded directly to delete a record in the result {@code KTable} if required (i.e., if there is anything to be + * deleted). 
+ * <p> + * Input records with {@code null} key will be dropped and no join computation is performed. + * <p> + * Example: + * <table border='1'> + * <tr> + * <th>thisKTable</th> + * <th>thisState</th> + * <th>otherKTable</th> + * <th>otherState</th> + * <th>result update record</th> + * </tr> + * <tr> + * <td><K1:A></td> + * <td><K1:A></td> + * <td></td> + * <td></td> + * <td><K1:ValueJoiner(A,null)></td> + * </tr> + * <tr> + * <td></td> + * <td><K1:A></td> + * <td><K1:b></td> + * <td><K1:b></td> + * <td><K1:ValueJoiner(A,b)></td> + * </tr> + * <tr> + * <td><K1:null></td> + * <td></td> + * <td></td> + * <td><K1:b></td> + * <td><K1:null></td> + * </tr> + * <tr> + * <td></td> + * <td></td> + * <td><K1:null></td> + * <td></td> + * <td></td> + * </tr> + * </table> + * Both input streams (or to be more precise, their underlying source topics) need to have the same number of + * partitions. + * + * @param other the other {@code KTable} to be joined with this {@code KTable} + * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records + * @param <VO> the value type of the other {@code KTable} + * @param <VR> the value type of the result {@code KTable} + * @param storeSupplier user defined state store supplier. Cannot be {@code null}. + * @return a {@code KTable} that contains join-records for each key and values computed by the given + * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of + * left {@code KTable} + * @see #join(KTable, ValueJoiner) + * @see #outerJoin(KTable, ValueJoiner) + */ + <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other, + final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, + final StateStoreSupplier<KeyValueStore> storeSupplier); + + + /** + * Join records of this {@code KTable} (left input) with another {@code KTable}'s (right input) records using + * non-windowed outer equi join. 
+ * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}. + * In contrast to {@link #join(KTable, ValueJoiner) inner-join} or {@link #leftJoin(KTable, ValueJoiner) left-join}, + * all records from both input {@code KTable}s will produce an output record (cf. below). + * The result is an ever updating {@code KTable} that represents the <em>current</em> (i.e., processing time) result + * of the join. + * <p> + * The join is computed by (1) updating the internal state of one {@code KTable} and (2) performing a lookup for a + * matching record in the <em>current</em> (i.e., processing time) internal state of the other {@code KTable}. + * This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input + * {@code KTable} the result gets updated. + * <p> + * For each {@code KTable} record that finds a corresponding record in the other {@code KTable}'s state the + * provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record. + * Additionally, for each record that does not find a corresponding record in the corresponding other + * {@code KTable}'s state the provided {@link ValueJoiner} will be called with {@code null} value for the + * corresponding other value to compute a value (with arbitrary type) for the result record. + * The key of the result record is the same as for both joining input records. + * <p> + * Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics. + * Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly + * to delete a record in the result {@code KTable} if required (i.e., if there is anything to be deleted). + * <p> + * Input records with {@code null} key will be dropped and no join computation is performed. 
+ * <p> + * Example: + * <table border='1'> + * <tr> + * <th>thisKTable</th> + * <th>thisState</th> + * <th>otherKTable</th> + * <th>otherState</th> + * <th>result update record</th> + * </tr> + * <tr> + * <td><K1:A></td> + * <td><K1:A></td> + * <td></td> + * <td></td> + * <td><K1:ValueJoiner(A,null)></td> + * </tr> + * <tr> + * <td></td> + * <td><K1:A></td> + * <td><K1:b></td> + * <td><K1:b></td> + * <td><K1:ValueJoiner(A,b)></td> + * </tr> + * <tr> + * <td><K1:null></td> + * <td></td> + * <td></td> + * <td><K1:b></td> + * <td><K1:ValueJoiner(null,b)></td> + * </tr> + * <tr> + * <td></td> + * <td></td> + * <td><K1:null></td> + * <td></td> + * <td><K1:null></td> + * </tr> + * </table> + * Both input streams (or to be more precise, their underlying source topics) need to have the same number of + * partitions. + * + * @param other the other {@code KTable} to be joined with this {@code KTable} + * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records + * @param <VO> the value type of the other {@code KTable} + * @param <VR> the value type of the result {@code KTable} + * @return a {@code KTable} that contains join-records for each key and values computed by the given + * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of + * both {@code KTable}s + * @see #join(KTable, ValueJoiner) + * @see #leftJoin(KTable, ValueJoiner) + */ + <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other, + final ValueJoiner<? super V, ? super VO, ? extends VR> joiner); + + /** + * Join records of this {@code KTable} (left input) with another {@code KTable}'s (right input) records using + * non-windowed outer equi join. + * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}. 
+ * In contrast to {@link #join(KTable, ValueJoiner) inner-join} or {@link #leftJoin(KTable, ValueJoiner) left-join}, + * all records from both input {@code KTable}s will produce an output record (cf. below). + * The result is an ever updating {@code KTable} that represents the <em>current</em> (i.e., processing time) result + * of the join. + * <p> + * The join is computed by (1) updating the internal state of one {@code KTable} and (2) performing a lookup for a + * matching record in the <em>current</em> (i.e., processing time) internal state of the other {@code KTable}. + * This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input + * {@code KTable} the result gets updated. + * <p> + * For each {@code KTable} record that finds a corresponding record in the other {@code KTable}'s state the + * provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record. + * Additionally, for each record that does not find a corresponding record in the corresponding other + * {@code KTable}'s state the provided {@link ValueJoiner} will be called with {@code null} value for the + * corresponding other value to compute a value (with arbitrary type) for the result record. + * The key of the result record is the same as for both joining input records. + * <p> + * Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics. + * Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly + * to delete a record in the result {@code KTable} if required (i.e., if there is anything to be deleted). + * <p> + * Input records with {@code null} key will be dropped and no join computation is performed. 
+ * <p> + * Example: + * <table border='1'> + * <tr> + * <th>thisKTable</th> + * <th>thisState</th> + * <th>otherKTable</th> + * <th>otherState</th> + * <th>result update record</th> + * </tr> + * <tr> + * <td><K1:A></td> + * <td><K1:A></td> + * <td></td> + * <td></td> + * <td><K1:ValueJoiner(A,null)></td> + * </tr> + * <tr> + * <td></td> + * <td><K1:A></td> + * <td><K1:b></td> + * <td><K1:b></td> + * <td><K1:ValueJoiner(A,b)></td> + * </tr> + * <tr> + * <td><K1:null></td> + * <td></td> + * <td></td> + * <td><K1:b></td> + * <td><K1:ValueJoiner(null,b)></td> + * </tr> + * <tr> + * <td></td> + * <td></td> + * <td><K1:null></td> + * <td></td> + * <td><K1:null></td> + * </tr> + * </table> + * Both input streams (or to be more precise, their underlying source topics) need to have the same number of + * partitions. + * + * @param other the other {@code KTable} to be joined with this {@code KTable} + * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records + * @param <VO> the value type of the other {@code KTable} + * @param <VR> the value type of the result {@code KTable} + * @param joinSerde serializer for join result value type + * @param queryableStoreName a user-provided name of the underlying {@link KTable} that can be + * used to subsequently query the operation results; valid characters are ASCII + * alphanumerics, '.', '_' and '-'. If {@code null} then the results cannot be queried + * (i.e., that would be equivalent to calling {@link KTable#outerJoin(KTable, ValueJoiner)}. + * @return a {@code KTable} that contains join-records for each key and values computed by the given + * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of + * both {@code KTable}s + * @see #join(KTable, ValueJoiner) + * @see #leftJoin(KTable, ValueJoiner) + */ + <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other, + final ValueJoiner<? super V, ? super VO, ? 
extends VR> joiner, + final Serde<VR> joinSerde, + final String queryableStoreName); + + /** + * Join records of this {@code KTable} (left input) with another {@code KTable}'s (right input) records using + * non-windowed outer equi join. + * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}. + * In contrast to {@link #join(KTable, ValueJoiner) inner-join} or {@link #leftJoin(KTable, ValueJoiner) left-join}, + * all records from both input {@code KTable}s will produce an output record (cf. below). + * The result is an ever updating {@code KTable} that represents the <em>current</em> (i.e., processing time) result + * of the join. + * <p> + * The join is computed by (1) updating the internal state of one {@code KTable} and (2) performing a lookup for a + * matching record in the <em>current</em> (i.e., processing time) internal state of the other {@code KTable}. + * This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input + * {@code KTable} the result gets updated. + * <p> + * For each {@code KTable} record that finds a corresponding record in the other {@code KTable}'s state the + * provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record. + * Additionally, for each record that does not find a corresponding record in the corresponding other + * {@code KTable}'s state the provided {@link ValueJoiner} will be called with {@code null} value for the + * corresponding other value to compute a value (with arbitrary type) for the result record. + * The key of the result record is the same as for both joining input records. + * <p> + * Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics. 
+ * Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly + * to delete a record in the result {@code KTable} if required (i.e., if there is anything to be deleted). + * <p> + * Input records with {@code null} key will be dropped and no join computation is performed. + * <p> + * Example: + * <table border='1'> + * <tr> + * <th>thisKTable</th> + * <th>thisState</th> + * <th>otherKTable</th> + * <th>otherState</th> + * <th>result update record</th> + * </tr> + * <tr> + * <td><K1:A></td> + * <td><K1:A></td> + * <td></td> + * <td></td> + * <td><K1:ValueJoiner(A,null)></td> + * </tr> + * <tr> + * <td></td> + * <td><K1:A></td> + * <td><K1:b></td> + * <td><K1:b></td> + * <td><K1:ValueJoiner(A,b)></td> + * </tr> + * <tr> + * <td><K1:null></td> + * <td></td> + * <td></td> + * <td><K1:b></td> + * <td><K1:ValueJoiner(null,b)></td> + * </tr> + * <tr> + * <td></td> + * <td></td> + * <td><K1:null></td> + * <td></td> + * <td><K1:null></td> + * </tr> + * </table> + * Both input streams (or to be more precise, their underlying source topics) need to have the same number of + * partitions. + * + * @param other the other {@code KTable} to be joined with this {@code KTable} + * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records + * @param <VO> the value type of the other {@code KTable} + * @param <VR> the value type of the result {@code KTable} + * @param storeSupplier user defined state store supplier. Cannot be {@code null}. + * @return a {@code KTable} that contains join-records for each key and values computed by the given + * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of + * both {@code KTable}s + * @see #join(KTable, ValueJoiner) + * @see #leftJoin(KTable, ValueJoiner) + */ + <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other, + final ValueJoiner<? super V, ? super VO, ? 
extends VR> joiner, + final StateStoreSupplier<KeyValueStore> storeSupplier); /** - * Get the name of the local state store used for materializing this {@code KTable}. + * Get the name of the local state store that can be used to query this {@code KTable}. * - * @return the underlying state store name, or {@code null} if this {@code KTable} is not materialized + * @return the underlying state store name, or {@code null} if this {@code KTable} cannot be queried. */ - String getStoreName(); + String queryableStoreName(); }
http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java index dce5d12..8aea44d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java @@ -47,6 +47,7 @@ public abstract class AbstractStream<K> { this.sourceNodes = sourceNodes; } + Set<String> ensureJoinableWith(final AbstractStream<K> other) { Set<String> allSourceNodes = new HashSet<>(); allSourceNodes.addAll(sourceNodes); @@ -57,6 +58,12 @@ public abstract class AbstractStream<K> { return allSourceNodes; } + String getOrCreateName(final String queryableStoreName, final String prefix) { + final String returnName = queryableStoreName != null ? queryableStoreName : topology.newStoreName(prefix); + Topic.validate(returnName); + return returnName; + } + static <T2, T1, R> ValueJoiner<T2, T1, R> reverseJoiner(final ValueJoiner<T1, T2, R> joiner) { return new ValueJoiner<T2, T1, R>() { @Override