Repository: kafka Updated Branches: refs/heads/trunk 5236bf60d -> b1691cf49
KAFKA-3430: Allow users to set key in KTable.toStream and in KStream. … With KStream the method selectKey was added to enable getting a key from values before performing aggregation-by-key operations on original streams that have null keys. Author: bbejeck <[email protected]> Reviewers: Guozhang Wang <[email protected]> Closes #1222 from bbejeck/KAFKA-3430_allow_users_to_set_key_KTable_toStream Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b1691cf4 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b1691cf4 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b1691cf4 Branch: refs/heads/trunk Commit: b1691cf49e9de850ac8a2675c487af9fb60bfdaa Parents: 5236bf6 Author: Bill Bejeck <[email protected]> Authored: Sat Apr 16 20:18:27 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Sat Apr 16 20:18:27 2016 -0700 ---------------------------------------------------------------------- .../apache/kafka/streams/kstream/KStream.java | 9 ++ .../apache/kafka/streams/kstream/KTable.java | 9 ++ .../streams/kstream/internals/KStreamImpl.java | 15 ++++ .../streams/kstream/internals/KTableImpl.java | 5 ++ .../kstream/internals/KStreamSelectKeyTest.java | 83 ++++++++++++++++++ .../kstream/internals/KTableMapKeysTest.java | 88 ++++++++++++++++++++ 6 files changed, 209 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/b1691cf4/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index 27475aa..7e3562c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -46,6 +46,15 @@ 
public interface KStream<K, V> { */ KStream<K, V> filterNot(Predicate<K, V> predicate); + + /** + * Create a new key from the current key and value. + * + * @param mapper the instance of {@link KeyValueMapper} + * @param <K1> the new key type on the stream + */ + <K1> KStream<K1, V> selectKey(KeyValueMapper<K, V, K1> mapper); + /** * Create a new instance of {@link KStream} by transforming each element in this stream into a different element in the new stream. * http://git-wip-us.apache.org/repos/asf/kafka/blob/b1691cf4/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java index bb6878f..1e44cb5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java @@ -203,6 +203,15 @@ public interface KTable<K, V> { KStream<K, V> toStream(); /** + * Convert this stream to a new instance of {@link KStream} using the given {@link KeyValueMapper} to select + * the new key. + * + * @param mapper @param mapper the instance of {@link KeyValueMapper} + * @param <K1> the new key type + */ + <K1> KStream<K1, V> toStream(KeyValueMapper<K, V, K1> mapper); + + /** * Combine values of this stream with another {@link KTable} stream's elements of the same key using Inner Join. 
* * @param other the instance of {@link KTable} joined with this stream http://git-wip-us.apache.org/repos/asf/kafka/blob/b1691cf4/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 7030021..a84b4aa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -86,6 +86,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V private static final String REDUCE_NAME = "KSTREAM-REDUCE-"; + private static final String KEY_SELECT_NAME = "KSTREAM-KEY-SELECT-"; + public static final String SINK_NAME = "KSTREAM-SINK-"; public static final String SOURCE_NAME = "KSTREAM-SOURCE-"; @@ -121,6 +123,19 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V } @Override + @SuppressWarnings("unchecked") + public <K1> KStream<K1, V> selectKey(final KeyValueMapper<K, V, K1> mapper) { + String name = topology.newName(KEY_SELECT_NAME); + topology.addProcessor(name, new KStreamMap<>(new KeyValueMapper<K, V, KeyValue<K1, V>>() { + @Override + public KeyValue<K1, V> apply(K key, V value) { + return new KeyValue(mapper.apply(key, value), value); + } + }), this.name); + return new KStreamImpl<>(topology, name, sourceNodes); + } + + @Override public <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper) { String name = topology.newName(MAP_NAME); http://git-wip-us.apache.org/repos/asf/kafka/blob/b1691cf4/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java ---------------------------------------------------------------------- diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index f78169e..5c291f5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -244,6 +244,11 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, return new KStreamImpl<>(topology, name, sourceNodes); } + @Override + public <K1> KStream<K1, V> toStream(KeyValueMapper<K, V, K1> mapper) { + return toStream().selectKey(mapper); + } + @SuppressWarnings("unchecked") @Override public <V1, R> KTable<K, R> join(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) { http://git-wip-us.apache.org/repos/asf/kafka/blob/b1691cf4/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java new file mode 100644 index 0000000..5f19b9e --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.kafka.streams.kstream.internals; + + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.test.MockProcessorSupplier; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class KStreamSelectKeyTest { + + private String topicName = "topic_key_select"; + + final private Serde<Integer> integerSerde = Serdes.Integer(); + final private Serde<String> stringSerde = Serdes.String(); + + @Test + public void testSelectKey() { + KStreamBuilder builder = new KStreamBuilder(); + + final Map<Integer, String> keyMap = new HashMap<>(); + keyMap.put(1, "ONE"); + keyMap.put(2, "TWO"); + keyMap.put(3, "THREE"); + + + KeyValueMapper<String, Integer, String> selector = new KeyValueMapper<String, Integer, String>() { + @Override + public String apply(String key, Integer value) { + return keyMap.get(value); + } + }; + + final String[] expected = new String[]{"ONE:1", "TWO:2", "THREE:3"}; + final int[] expectedValues = new int[]{1, 2, 3}; + + KStream<String, Integer> stream = builder.stream(stringSerde, integerSerde, topicName); + + MockProcessorSupplier<String, Integer> processor = new MockProcessorSupplier<>(); + + stream.selectKey(selector).process(processor); + + 
KStreamTestDriver driver = new KStreamTestDriver(builder); + + for (int expectedValue : expectedValues) { + driver.process(topicName, null, expectedValue); + } + + assertEquals(3, processor.processed.size()); + + for (int i = 0; i < expected.length; i++) { + assertEquals(expected[i], processor.processed.get(i)); + } + + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/b1691cf4/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java new file mode 100644 index 0000000..ce1b9d6 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.kstream.internals; + + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.test.MockProcessorSupplier; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class KTableMapKeysTest { + + final private Serde<String> stringSerde = new Serdes.StringSerde(); + final private Serde<Integer> integerSerde = new Serdes.IntegerSerde(); + + @Test + public void testMapKeysConvertingToStream() { + final KStreamBuilder builder = new KStreamBuilder(); + + String topic1 = "topic_map_keys"; + + KTable<Integer, String> table1 = builder.table(integerSerde, stringSerde, topic1); + + final Map<Integer, String> keyMap = new HashMap<>(); + keyMap.put(1, "ONE"); + keyMap.put(2, "TWO"); + keyMap.put(3, "THREE"); + + KeyValueMapper<Integer, String, String> keyMapper = new KeyValueMapper<Integer, String, String>() { + @Override + public String apply(Integer key, String value) { + return keyMap.get(key); + } + }; + + KStream<String, String> convertedStream = table1.toStream(keyMapper); + + final String[] expected = new String[]{"ONE:V_ONE", "TWO:V_TWO", "THREE:V_THREE"}; + final int[] originalKeys = new int[]{1, 2, 3}; + final String[] values = new String[]{"V_ONE", "V_TWO", "V_THREE"}; + + + + MockProcessorSupplier<String, String> processor = new MockProcessorSupplier<>(); + + convertedStream.process(processor); + + KStreamTestDriver driver = new KStreamTestDriver(builder); + + for (int i = 0; i < originalKeys.length; i++) { + driver.process(topic1, originalKeys[i], values[i]); + } + + assertEquals(3, processor.processed.size()); + + for 
(int i = 0; i < expected.length; i++) { + assertEquals(expected[i], processor.processed.get(i)); + } + } + + + +} \ No newline at end of file
