Repository: kafka
Updated Branches:
  refs/heads/0.11.0 9a21bf20b -> a8dbce47f


KAFKA-4144 Follow-up: add one missing overload function to maintain backward 
compatibility

A follow up RP to fix 
[issue](https://github.com/confluentinc/examples/commit/2cd0b87bc8a7eab0e7199fa0079db6417f0e6b63#commitcomment-22200864)

Author: Jeyhun Karimov <[email protected]>

Reviewers: Matthias J. Sax, Eno Thereska, Bill Bejeck, Guozhang Wang

Closes #3109 from jeyhunkarimov/KAFKA-4144-follow-up

(cherry picked from commit c5d44af77473abb36cb9bf7ca3dead36490b8320)
Signed-off-by: Guozhang Wang <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a8dbce47
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a8dbce47
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a8dbce47

Branch: refs/heads/0.11.0
Commit: a8dbce47f5a39ebc0fb21b7e0ef43f837b252313
Parents: 9a21bf2
Author: Jeyhun Karimov <[email protected]>
Authored: Wed May 24 19:00:37 2017 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Wed May 24 19:00:45 2017 -0700

----------------------------------------------------------------------
 .../kafka/streams/kstream/KStreamBuilder.java   | 39 +++++++++++++++++++-
 1 file changed, 38 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a8dbce47/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index fb05e4d..59b8c6f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -1088,7 +1088,44 @@ public class KStreamBuilder extends TopologyBuilder {
                                                  final 
StateStoreSupplier<KeyValueStore> storeSupplier) {
         return doGlobalTable(keySerde, valSerde, null, topic, storeSupplier);
     }
-
+    
+    /**
+     * Create a {@link GlobalKTable} for the specified topic.
+     * The default {@link TimestampExtractor} as specified in the {@link 
StreamsConfig config} is used.
+     * Input {@link KeyValue} pairs with {@code null} key will be dropped.
+     * <p>
+     * The resulting {@link GlobalKTable} will be materialized in a local 
{@link KeyValueStore} with the given
+     * {@code queryableStoreName}.
+     * However, no internal changelog topic is created since the original 
input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return 
a {@link KTable}).
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) 
KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ...
+     * ReadOnlyKeyValueStore<String,Long> localStore = 
streams.store(queryableStoreName, QueryableStoreTypes.<String, 
Long>keyValueStore());
+     * String key = "some-key";
+     * Long valueForKey = localStore.get(key);
+     * }</pre>
+     * Note that {@link GlobalKTable} always applies {@code 
"auto.offset.reset"} strategy {@code "earliest"}
+     * regardless of the specified value in {@link StreamsConfig}.
+     *
+     * @param keySerde           key serde used to send key-value pairs,
+     *                           if not specified the default key serde 
defined in the configuration will be used
+     * @param valSerde           value serde used to send key-value pairs,
+     *                           if not specified the default value serde 
defined in the configuration will be used
+     * @param topic              the topic name; cannot be {@code null}
+     * @param queryableStoreName the state store name; If {@code null} this is 
the equivalent of
+     *                           {@link KStreamBuilder#globalTable(Serde, 
Serde, String)} ()}
+     * @return a {@link GlobalKTable} for the specified topic
+     */
+    @SuppressWarnings("unchecked")
+    public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
+                                                 final Serde<V> valSerde,
+                                                 final String topic,
+                                                 final String 
queryableStoreName) {
+        return globalTable(keySerde, valSerde, null, topic, 
queryableStoreName);
+    }
 
     /**
      * Create a {@link GlobalKTable} for the specified topic.

Reply via email to