Repository: kafka Updated Branches: refs/heads/trunk 96209b1e7 -> c67ca6588
MINOR: putting back kstream stateful transform methods guozhangwang * added back type safe stateful transform methods (kstream.transform() and kstream.transformValues()) * changed kstream.process() to void Author: Yasuhiro Matsuda <[email protected]> Reviewers: Guozhang Wang Closes #292 from ymatsuda/transform_method Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c67ca658 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c67ca658 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c67ca658 Branch: refs/heads/trunk Commit: c67ca65889721e3af7527ab26df49a9fb4db87ef Parents: 96209b1 Author: Yasuhiro Matsuda <[email protected]> Authored: Fri Oct 9 16:28:40 2015 -0700 Committer: Guozhang Wang <[email protected]> Committed: Fri Oct 9 16:28:40 2015 -0700 ---------------------------------------------------------------------- .../apache/kafka/streams/kstream/KStream.java | 21 ++++- .../kafka/streams/kstream/Transformer.java | 57 ++++++++++++ .../streams/kstream/TransformerSupplier.java | 24 +++++ .../kafka/streams/kstream/ValueTransformer.java | 56 ++++++++++++ .../kstream/ValueTransformerSupplier.java | 24 +++++ .../streams/kstream/internals/KStreamImpl.java | 30 ++++++- .../kstream/internals/KStreamTransform.java | 71 +++++++++++++++ .../internals/KStreamTransformValues.java | 69 +++++++++++++++ .../processor/internals/StreamThread.java | 10 ++- .../kstream/internals/KStreamImplTest.java | 5 +- .../kstream/internals/KStreamTransformTest.java | 93 ++++++++++++++++++++ .../internals/KStreamTransformValuesTest.java | 92 +++++++++++++++++++ 12 files changed, 539 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/c67ca658/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java ---------------------------------------------------------------------- diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index ecec882..915cf1c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -23,7 +23,7 @@ import org.apache.kafka.streams.processor.ProcessorSupplier; /** * KStream is an abstraction of a stream of key-value pairs. - * + * * @param <K> the type of keys * @param <V> the type of values */ @@ -151,10 +151,27 @@ public interface KStream<K, V> { void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer); /** + * Applies a stateful transformation to all elements in this stream. + * + * @param transformerSupplier the supplier of the Transformer to use + * @return the new stream containing the transformed elements + */ + <K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier); + + /** + * Applies a stateful transformation to all values in this stream. + * + * @param valueTransformerSupplier the supplier of the ValueTransformer to use + * @return the new stream containing the transformed values + */ + <R> KStream<K, R> transformValues(ValueTransformerSupplier<V, R> valueTransformerSupplier); + + /** * Processes all elements in this stream by applying a processor. 
* * @param processorSupplier the supplier of the Processor to use * @return the new stream containing the processed output */ - <K1, V1> KStream<K1, V1> process(ProcessorSupplier<K, V> processorSupplier); + void process(ProcessorSupplier<K, V> processorSupplier); + } http://git-wip-us.apache.org/repos/asf/kafka/blob/c67ca658/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java new file mode 100644 index 0000000..b67f619 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream; + +import org.apache.kafka.streams.processor.ProcessorContext; + +public interface Transformer<K, V, R> { + + /** + * Initialize this transformer with the given context. The framework ensures this is called once per processor when the topology + * that contains it is initialized. 
+ * <p> + * If this transformer is to be {@link #punctuate(long) called periodically} by the framework, then this method should + * {@link ProcessorContext#schedule(long) schedule itself} with the provided context. + * + * @param context the context; may not be null + */ + void init(ProcessorContext context); + + /** + * Transform the message with the given key and value. + * + * @param key the key for the message + * @param value the value for the message + * @return new value + */ + R transform(K key, V value); + + /** + * Perform any periodic operations, if this transformer {@link ProcessorContext#schedule(long) scheduled itself} with the context + * during {@link #init(ProcessorContext) initialization}. + * + * @param timestamp the stream time when this method is being called + */ + void punctuate(long timestamp); + + /** + * Close this transformer and clean up any resources. + */ + void close(); + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/c67ca658/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java new file mode 100644 index 0000000..2c2d8dd --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream; + +public interface TransformerSupplier<K, V, R> { + + Transformer<K, V, R> get(); + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/c67ca658/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java new file mode 100644 index 0000000..5b9e2ff --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.kstream; + +import org.apache.kafka.streams.processor.ProcessorContext; + +public interface ValueTransformer<V, R> { + + /** + * Initialize this transformer with the given context. The framework ensures this is called once per processor when the topology + * that contains it is initialized. + * <p> + * If this transformer is to be {@link #punctuate(long) called periodically} by the framework, then this method should + * {@link ProcessorContext#schedule(long) schedule itself} with the provided context. + * + * @param context the context; may not be null + */ + void init(ProcessorContext context); + + /** + * Transform the message with the given value. + * + * @param value the value for the message + * @return new value + */ + R transform(V value); + + /** + * Perform any periodic operations, if this transformer {@link ProcessorContext#schedule(long) scheduled itself} with the context + * during {@link #init(ProcessorContext) initialization}. + * + * @param timestamp the stream time when this method is being called + */ + void punctuate(long timestamp); + + /** + * Close this transformer and clean up any resources. + */ + void close(); + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/c67ca658/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java new file mode 100644 index 0000000..5c053c7 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream; + +public interface ValueTransformerSupplier<V, R> { + + ValueTransformer<V, R> get(); + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/c67ca658/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index cff97d6..8f56e09 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -19,9 +19,11 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.kstream.KeyValue; +import org.apache.kafka.streams.kstream.TransformerSupplier; +import org.apache.kafka.streams.kstream.ValueTransformerSupplier; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamWindowed; -import org.apache.kafka.streams.kstream.KeyValue; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.Predicate; import 
org.apache.kafka.streams.kstream.ValueMapper; @@ -44,6 +46,10 @@ public class KStreamImpl<K, V> implements KStream<K, V> { private static final String FLATMAPVALUES_NAME = "KAFKA-FLATMAPVALUES-"; + private static final String TRANSFORM_NAME = "KAFKA-TRANSFORM-"; + + private static final String TRANSFORMVALUES_NAME = "KAFKA-TRANSFORMVALUES-"; + private static final String PROCESSOR_NAME = "KAFKA-PROCESSOR-"; private static final String BRANCH_NAME = "KAFKA-BRANCH-"; @@ -191,11 +197,27 @@ public class KStreamImpl<K, V> implements KStream<K, V> { } @Override - public <K1, V1> KStream<K1, V1> process(final ProcessorSupplier<K, V> processorSupplier) { - String name = PROCESSOR_NAME + INDEX.getAndIncrement(); + public <K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier) { + String name = TRANSFORM_NAME + INDEX.getAndIncrement(); - topology.addProcessor(name, processorSupplier, this.name); + topology.addProcessor(name, new KStreamTransform<>(transformerSupplier), this.name); + + return new KStreamImpl<>(topology, name); + } + + @Override + public <V1> KStream<K, V1> transformValues(ValueTransformerSupplier<V, V1> valueTransformerSupplier) { + String name = TRANSFORMVALUES_NAME + INDEX.getAndIncrement(); + + topology.addProcessor(name, new KStreamTransformValues<>(valueTransformerSupplier), this.name); return new KStreamImpl<>(topology, name); } + + @Override + public void process(final ProcessorSupplier<K, V> processorSupplier) { + String name = PROCESSOR_NAME + INDEX.getAndIncrement(); + + topology.addProcessor(name, processorSupplier, this.name); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/c67ca658/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java new file mode 100644 index 0000000..7ebab0e --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.kstream.KeyValue; +import org.apache.kafka.streams.kstream.Transformer; +import org.apache.kafka.streams.kstream.TransformerSupplier; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorSupplier; + +public class KStreamTransform<K1, V1, K2, V2> implements ProcessorSupplier<K1, V1> { + + private final TransformerSupplier<K1, V1, KeyValue<K2, V2>> transformerSupplier; + + public KStreamTransform(TransformerSupplier<K1, V1, KeyValue<K2, V2>> transformerSupplier) { + this.transformerSupplier = transformerSupplier; + } + + @Override + public Processor<K1, V1> get() { + return new KStreamTransformProcessor(transformerSupplier.get()); + } + + public static class KStreamTransformProcessor<K1, V1, K2, V2> implements Processor<K1, V1> { + + private final Transformer<K1, V1, KeyValue<K2, V2>> transformer; + private ProcessorContext context; + + public KStreamTransformProcessor(Transformer<K1, V1, KeyValue<K2, V2>> transformer) { + this.transformer = transformer; + } + + @Override + public void init(ProcessorContext context) { + transformer.init(context); + this.context = context; + } + + @Override + public void process(K1 key, V1 value) { + KeyValue<K2, V2> pair = transformer.transform(key, value); + context.forward(pair.key, pair.value); + } + + @Override + public void punctuate(long timestamp) { + transformer.punctuate(timestamp); + } + + @Override + public void close() { + transformer.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/c67ca658/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java new file mode 100644 index 0000000..6f989e6 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.kstream.ValueTransformer; +import org.apache.kafka.streams.kstream.ValueTransformerSupplier; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorSupplier; + +public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V> { + + private final ValueTransformerSupplier<V, R> valueTransformerSupplier; + + public KStreamTransformValues(ValueTransformerSupplier valueTransformerSupplier) { + this.valueTransformerSupplier = valueTransformerSupplier; + } + + @Override + public Processor<K, V> get() { + return new KStreamTransformValuesProcessor(valueTransformerSupplier.get()); + } + + public static class KStreamTransformValuesProcessor<K, V, R> implements Processor<K, V> { + + private final ValueTransformer valueTransformer; + private ProcessorContext context; + + public KStreamTransformValuesProcessor(ValueTransformer<V, R> valueTransformer) { + this.valueTransformer = valueTransformer; + } + + @Override + public void init(ProcessorContext context) { + valueTransformer.init(context); + this.context = context; + } + + @Override + public void process(K key, V value) { + context.forward(key, valueTransformer.transform(value)); + } + + @Override + public void punctuate(long timestamp) { + valueTransformer.punctuate(timestamp); + } + + @Override + public void close() { + valueTransformer.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/c67ca658/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 95a923d..4a68332 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -159,7 +159,7 @@ public class StreamThread extends Thread { new ByteArrayDeserializer(), new ByteArrayDeserializer()); } - + private Consumer<byte[], byte[]> createRestoreConsumer() { log.info("Creating restore consumer client for stream thread [" + this.getName() + "]"); return new KafkaConsumer<>(config.getConsumerConfigs(), @@ -240,9 +240,11 @@ public class StreamThread extends Thread { // try to fetch some records if necessary ConsumerRecords<byte[], byte[]> records = consumer.poll(totalNumBuffered == 0 ? this.pollTimeMs : 0); - for (StreamTask task : tasks.values()) { - for (TopicPartition partition : task.partitions()) { - task.addRecords(partition, records.records(partition)); + if (!records.isEmpty()) { + for (StreamTask task : tasks.values()) { + for (TopicPartition partition : task.partitions()) { + task.addRecords(partition, records.records(partition)); + } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/c67ca658/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index 875712a..2db488c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -119,7 +119,7 @@ public class KStreamImplTest { stream4.to("topic-5"); - stream5.through("topic-6").process(new MockProcessorSupplier<>()).to("topic-7"); + stream5.through("topic-6").process(new MockProcessorSupplier<>()); assertEquals(2 + // sources 2 + // stream1 @@ -131,8 +131,7 @@ public class 
KStreamImplTest { 2 + 3 + // stream5 1 + // to 2 + // through - 1 + // process - 1, // to + 1, // process builder.build().processors().size()); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/c67ca658/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java new file mode 100644 index 0000000..e397dd1 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.KeyValue; +import org.apache.kafka.streams.kstream.Transformer; +import org.apache.kafka.streams.kstream.TransformerSupplier; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.test.MockProcessorSupplier; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class KStreamTransformTest { + + private String topicName = "topic"; + + private IntegerDeserializer keyDeserializer = new IntegerDeserializer(); + private IntegerDeserializer valDeserializer = new IntegerDeserializer(); + + @Test + public void testTransform() { + KStreamBuilder builder = new KStreamBuilder(); + + TransformerSupplier<Integer, Integer, KeyValue<Integer, Integer>> transformerSupplier = + new TransformerSupplier<Integer, Integer, KeyValue<Integer, Integer>>() { + public Transformer<Integer, Integer, KeyValue<Integer, Integer>> get() { + return new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() { + + private int total = 0; + + @Override + public void init(ProcessorContext context) { + } + + @Override + public KeyValue<Integer, Integer> transform(Integer key, Integer value) { + total += value; + return KeyValue.pair(key * 2, total); + } + + @Override + public void punctuate(long timestamp) { + } + + @Override + public void close() { + } + }; + } + }; + + final int[] expectedKeys = {1, 10, 100, 1000}; + + KStream<Integer, Integer> stream; + MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>(); + stream = builder.from(keyDeserializer, valDeserializer, topicName); + stream.transform(transformerSupplier).process(processor); + + KStreamTestDriver driver = new 
KStreamTestDriver(builder); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topicName, expectedKeys[i], expectedKeys[i] * 10); + } + + assertEquals(4, processor.processed.size()); + + String[] expected = {"2:10", "20:110", "200:1110", "2000:11110"}; + + for (int i = 0; i < expected.length; i++) { + assertEquals(expected[i], processor.processed.get(i)); + } + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/c67ca658/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java new file mode 100644 index 0000000..c5c9b39 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.ValueTransformer; +import org.apache.kafka.streams.kstream.ValueTransformerSupplier; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.test.MockProcessorSupplier; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class KStreamTransformValuesTest { + + private String topicName = "topic"; + + private IntegerDeserializer keyDeserializer = new IntegerDeserializer(); + private IntegerDeserializer valDeserializer = new IntegerDeserializer(); + + @Test + public void testTransform() { + KStreamBuilder builder = new KStreamBuilder(); + + ValueTransformerSupplier<Integer, Integer> valueTransformerSupplier = + new ValueTransformerSupplier<Integer, Integer>() { + public ValueTransformer<Integer, Integer> get() { + return new ValueTransformer<Integer, Integer>() { + + private int total = 0; + + @Override + public void init(ProcessorContext context) { + } + + @Override + public Integer transform(Integer value) { + total += value; + return total; + } + + @Override + public void punctuate(long timestamp) { + } + + @Override + public void close() { + } + }; + } + }; + + final int[] expectedKeys = {1, 10, 100, 1000}; + + KStream<Integer, Integer> stream; + MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>(); + stream = builder.from(keyDeserializer, valDeserializer, topicName); + stream.transformValues(valueTransformerSupplier).process(processor); + + KStreamTestDriver driver = new KStreamTestDriver(builder); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topicName, expectedKeys[i], expectedKeys[i] * 10); + } + + assertEquals(4, 
processor.processed.size()); + + String[] expected = {"1:10", "10:110", "100:1110", "1000:11110"}; + + for (int i = 0; i < expected.length; i++) { + assertEquals(expected[i], processor.processed.get(i)); + } + } + +}
