Repository: kafka Updated Branches: refs/heads/0.11.0 9a21bf20b -> a8dbce47f
KAFKA-4144 Follow-up: add one missing overload function to maintain backward compatibility A follow up RP to fix [issue](https://github.com/confluentinc/examples/commit/2cd0b87bc8a7eab0e7199fa0079db6417f0e6b63#commitcomment-22200864) Author: Jeyhun Karimov <[email protected]> Reviewers: Matthias J. Sax, Eno Thereska, Bill Bejeck, Guozhang Wang Closes #3109 from jeyhunkarimov/KAFKA-4144-follow-up (cherry picked from commit c5d44af77473abb36cb9bf7ca3dead36490b8320) Signed-off-by: Guozhang Wang <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a8dbce47 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a8dbce47 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a8dbce47 Branch: refs/heads/0.11.0 Commit: a8dbce47f5a39ebc0fb21b7e0ef43f837b252313 Parents: 9a21bf2 Author: Jeyhun Karimov <[email protected]> Authored: Wed May 24 19:00:37 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Wed May 24 19:00:45 2017 -0700 ---------------------------------------------------------------------- .../kafka/streams/kstream/KStreamBuilder.java | 39 +++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/a8dbce47/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java index fb05e4d..59b8c6f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java @@ -1088,7 +1088,44 @@ public class KStreamBuilder extends TopologyBuilder { final StateStoreSupplier<KeyValueStore> storeSupplier) { return doGlobalTable(keySerde, valSerde, null, topic, storeSupplier); } - + + /** + * Create a {@link GlobalKTable} for the specified topic. + * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used. + * Input {@link KeyValue} pairs with {@code null} key will be dropped. + * <p> + * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given + * {@code queryableStoreName}. + * However, no internal changelog topic is created since the original input topic can be used for recovery (cf. + * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). + * <p> + * To query the local {@link KeyValueStore} it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: + * <pre>{@code + * KafkaStreams streams = ... + * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore()); + * String key = "some-key"; + * Long valueForKey = localStore.get(key); + * }</pre> + * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"} + * regardless of the specified value in {@link StreamsConfig}. + * + * @param keySerde key serde used to send key-value pairs, + * if not specified the default key serde defined in the configuration will be used + * @param valSerde value serde used to send key-value pairs, + * if not specified the default value serde defined in the configuration will be used + * @param topic the topic name; cannot be {@code null} + * @param queryableStoreName the state store name; If {@code null} this is the equivalent of + * {@link KStreamBuilder#globalTable(Serde, Serde, String)} ()} + * @return a {@link GlobalKTable} for the specified topic + */ + @SuppressWarnings("unchecked") + public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde, + final Serde<V> valSerde, + final String topic, + final String queryableStoreName) { + return globalTable(keySerde, valSerde, null, topic, queryableStoreName); + } /** * Create a {@link GlobalKTable} for the specified topic.
