Repository: kafka Updated Branches: refs/heads/trunk 3a58407e2 -> 9beafae23
KAFKA-3512: Added foreach operator miguno guozhangwang please have a look if you can. Author: Eno Thereska <[email protected]> Reviewers: Michael G. Noll <[email protected]>, Guozhang Wang <[email protected]> Closes #1193 from enothereska/kafka-3512-ForEach Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9beafae2 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9beafae2 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9beafae2 Branch: refs/heads/trunk Commit: 9beafae23a83774fc1d9ea811d449eac34240363 Parents: 3a58407 Author: Eno Thereska <[email protected]> Authored: Fri Apr 8 09:17:05 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Fri Apr 8 09:17:05 2016 -0700 ---------------------------------------------------------------------- .../kafka/streams/kstream/ForeachAction.java | 35 ++++++++ .../apache/kafka/streams/kstream/KStream.java | 8 ++ .../apache/kafka/streams/kstream/KTable.java | 8 ++ .../kstream/internals/KStreamForeach.java | 44 ++++++++++ .../streams/kstream/internals/KStreamImpl.java | 8 ++ .../streams/kstream/internals/KTableImpl.java | 15 ++++ .../kstream/internals/KStreamForeachTest.java | 85 ++++++++++++++++++++ .../kstream/internals/KTableForeachTest.java | 85 ++++++++++++++++++++ 8 files changed, 288 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/9beafae2/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java new file mode 100644 index 0000000..83064e8 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation 
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream; + + + +/** + * The ForeachAction interface for performing an action on a key-value pair. + * Note that this action is stateless. If stateful processing is required, consider + * using {@link KStream#transform(TransformerSupplier, String...)} or + * {@link KStream#process(org.apache.kafka.streams.processor.ProcessorSupplier, String...)} instead. + * + * @param <K> original key type + * @param <V> original value type + */ +public interface ForeachAction<K, V> { + void apply(K key, V value); +} + + http://git-wip-us.apache.org/repos/asf/kafka/blob/9beafae2/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index e4933cb..a55e726 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -101,6 +101,14 @@ public interface KStream<K, V> { KStream<K, V> through(String topic); /** + * Perform an action on each element of {@link KStream}.
+ * Note that this is a terminal operation that returns void. + * + * @param action An action to perform on each element + */ + void foreach(ForeachAction<K, V> action); + + /** * Materialize this stream to a topic, also creates a new instance of {@link KStream} from the topic * using default serializers and deserializers and a customizable {@link StreamPartitioner} to determine the distribution of records to partitions. * This is equivalent to calling {@link #to(StreamPartitioner, String)} and {@link org.apache.kafka.streams.kstream.KStreamBuilder#stream(String...)}. http://git-wip-us.apache.org/repos/asf/kafka/blob/9beafae2/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java index 581ee28..1f6ee68 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java @@ -297,4 +297,12 @@ public interface KTable<K, V> { * @param <K1> the key type of the aggregated {@link KTable} */ <K1> KTable<K1, Long> count(KeyValueMapper<K, V, K1> selector, String name); + + /** + * Perform an action on each element of {@link KTable}. + * Note that this is a terminal operation that returns void. 
+ * + * @param action An action to perform on each element + */ + void foreach(ForeachAction<K, V> action); } http://git-wip-us.apache.org/repos/asf/kafka/blob/9beafae2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamForeach.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamForeach.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamForeach.java new file mode 100644 index 0000000..2fd7ef9 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamForeach.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.kstream.ForeachAction; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorSupplier; + +class KStreamForeach<K, V> implements ProcessorSupplier<K, V> { + + private final ForeachAction<K, V> action; + + public KStreamForeach(ForeachAction<K, V> action) { + this.action = action; + } + + @Override + public Processor<K, V> get() { + return new KStreamForeachProcessor(); + } + + private class KStreamForeachProcessor extends AbstractProcessor<K, V> { + @Override + public void process(K key, V value) { + action.apply(key, value); + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9beafae2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 0fb3984..c266328 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.ForeachAction; import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.KStreamBuilder; @@ -92,6 +93,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V private static final String WINDOWED_NAME = "KSTREAM-WINDOWED-"; + private static final String 
FOREACH_NAME = "KSTREAM-FOREACH-"; + public KStreamImpl(KStreamBuilder topology, String name, Set<String> sourceNodes) { super(topology, name, sourceNodes); } @@ -201,6 +204,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V } @Override + public void foreach(ForeachAction<K, V> action) { + topology.addProcessor(topology.newName(FOREACH_NAME), new KStreamForeach<>(action), this.name); + } + + @Override public KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic) { return through(keySerde, valSerde, null, topic); } http://git-wip-us.apache.org/repos/asf/kafka/blob/9beafae2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 156f2db..8de9a0b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -20,6 +20,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.ForeachAction; import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; @@ -76,6 +77,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-"; + private static final String FOREACH_NAME = "KTABLE-FOREACH-"; + public final ProcessorSupplier<?, ?> processorSupplier; private final Serde<K> keySerde; @@ -142,6 +145,18 @@ public class KTableImpl<K, S, V> extends
AbstractStream<K> implements KTable<K, } @Override + public void foreach(final ForeachAction<K, V> action) { + String name = topology.newName(FOREACH_NAME); + KStreamForeach<K, Change<V>> foreachSupplier = new KStreamForeach<>(new ForeachAction<K, Change<V>>() { + @Override + public void apply(K key, Change<V> value) { + action.apply(key, value.newValue); + } + }); + topology.addProcessor(name, foreachSupplier, this.name); + } + + @Override public KTable<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic) { return through(keySerde, valSerde, null, topic); } http://git-wip-us.apache.org/repos/asf/kafka/blob/9beafae2/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java new file mode 100644 index 0000000..6573779 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.kstream.ForeachAction; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.test.KStreamTestDriver; +import org.junit.Test; +import java.util.List; +import java.util.ArrayList; +import java.util.Arrays; + +import static org.junit.Assert.assertEquals; + +public class KStreamForeachTest { + + final private String topicName = "topic"; + + final private Serde<Integer> intSerde = Serdes.Integer(); + final private Serde<String> stringSerde = Serdes.String(); + + @Test + public void testForeach() { + // Given + List<KeyValue<Integer, String>> inputRecords = Arrays.asList( + new KeyValue<>(0, "zero"), + new KeyValue<>(1, "one"), + new KeyValue<>(2, "two"), + new KeyValue<>(3, "three") + ); + + List<KeyValue<Integer, String>> expectedRecords = Arrays.asList( + new KeyValue<>(0, "ZERO"), + new KeyValue<>(2, "ONE"), + new KeyValue<>(4, "TWO"), + new KeyValue<>(6, "THREE") + ); + + final List<KeyValue<Integer, String>> actualRecords = new ArrayList<>(); + ForeachAction<Integer, String> action = + new ForeachAction<Integer, String>() { + @Override + public void apply(Integer key, String value) { + actualRecords.add(new KeyValue<>(key * 2, value.toUpperCase())); + } + }; + + // When + KStreamBuilder builder = new KStreamBuilder(); + KStream<Integer, String> stream = builder.stream(intSerde, stringSerde, topicName); + stream.foreach(action); + + // Then + KStreamTestDriver driver = new KStreamTestDriver(builder); + for (KeyValue<Integer, String> record: inputRecords) { + driver.process(topicName, record.key, record.value); + } + + assertEquals(expectedRecords.size(), actualRecords.size()); + for (int i = 0; i < expectedRecords.size(); i++) { + KeyValue<Integer, 
String> expectedRecord = expectedRecords.get(i); + KeyValue<Integer, String> actualRecord = actualRecords.get(i); + assertEquals(expectedRecord, actualRecord); + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9beafae2/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java new file mode 100644 index 0000000..4b612a5 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.kstream.ForeachAction; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.test.KStreamTestDriver; +import org.junit.Test; +import java.util.List; +import java.util.ArrayList; +import java.util.Arrays; + +import static org.junit.Assert.assertEquals; + +public class KTableForeachTest { + + final private String topicName = "topic"; + + final private Serde<Integer> intSerde = Serdes.Integer(); + final private Serde<String> stringSerde = Serdes.String(); + + @Test + public void testForeach() { + // Given + List<KeyValue<Integer, String>> inputRecords = Arrays.asList( + new KeyValue<>(0, "zero"), + new KeyValue<>(1, "one"), + new KeyValue<>(2, "two"), + new KeyValue<>(3, "three") + ); + + List<KeyValue<Integer, String>> expectedRecords = Arrays.asList( + new KeyValue<>(0, "ZERO"), + new KeyValue<>(2, "ONE"), + new KeyValue<>(4, "TWO"), + new KeyValue<>(6, "THREE") + ); + + final List<KeyValue<Integer, String>> actualRecords = new ArrayList<>(); + ForeachAction<Integer, String> action = + new ForeachAction<Integer, String>() { + @Override + public void apply(Integer key, String value) { + actualRecords.add(new KeyValue<>(key * 2, value.toUpperCase())); + } + }; + + // When + KStreamBuilder builder = new KStreamBuilder(); + KTable<Integer, String> table = builder.table(intSerde, stringSerde, topicName); + table.foreach(action); + + // Then + KStreamTestDriver driver = new KStreamTestDriver(builder); + for (KeyValue<Integer, String> record: inputRecords) { + driver.process(topicName, record.key, record.value); + } + + assertEquals(expectedRecords.size(), actualRecords.size()); + for (int i = 0; i < expectedRecords.size(); i++) { + KeyValue<Integer, 
String> expectedRecord = expectedRecords.get(i); + KeyValue<Integer, String> actualRecord = actualRecords.get(i); + assertEquals(expectedRecord, actualRecord); + } + } +}
