Repository: kafka Updated Branches: refs/heads/trunk 5c547475d -> 74e6dc842
KAFKA-3589: set inner serializer for ChangedSerde upon initialization Author: Guozhang Wang <[email protected]> Reviewers: Eno Thereska <[email protected]>, Ismael Juma <[email protected]> Closes #1246 from guozhangwang/K3589 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/74e6dc84 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/74e6dc84 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/74e6dc84 Branch: refs/heads/trunk Commit: 74e6dc842559d344241c70cb6607a69291e3a20d Parents: 5c54747 Author: Guozhang Wang <[email protected]> Authored: Thu Apr 21 14:51:21 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Thu Apr 21 14:51:21 2016 -0700 ---------------------------------------------------------------------- .../kstream/internals/ChangedDeserializer.java | 10 ++- .../kstream/internals/ChangedSerializer.java | 10 ++- .../kstream/internals/KGroupedTableImpl.java | 76 +++++++++++--------- .../streams/processor/internals/SinkNode.java | 14 ++++ .../streams/processor/internals/SourceNode.java | 18 ++++- .../kstream/internals/KTableAggregateTest.java | 39 ++-------- .../kstream/internals/KTableImplTest.java | 49 +++++++++++++ .../apache/kafka/test/KStreamTestDriver.java | 26 ++++++- .../org/apache/kafka/test/MockAggregator.java | 43 +++++++++++ .../org/apache/kafka/test/MockInitializer.java | 33 +++++++++ .../apache/kafka/test/MockKeyValueMapper.java | 36 ++++++++++ .../java/org/apache/kafka/test/MockReducer.java | 43 +++++++++++ .../apache/kafka/test/NoOpKeyValueMapper.java | 29 -------- 13 files changed, 325 insertions(+), 101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/74e6dc84/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java index d4c4e2d..ce9be49 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java @@ -26,12 +26,20 @@ public class ChangedDeserializer<T> implements Deserializer<Change<T>> { private static final int NEWFLAG_SIZE = 1; - private final Deserializer<T> inner; + private Deserializer<T> inner; public ChangedDeserializer(Deserializer<T> inner) { this.inner = inner; } + public Deserializer<T> inner() { + return inner; + } + + public void setInner(Deserializer<T> inner) { + this.inner = inner; + } + @Override public void configure(Map<String, ?> configs, boolean isKey) { // do nothing http://git-wip-us.apache.org/repos/asf/kafka/blob/74e6dc84/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java index 5dbbac9..12e06f8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java @@ -27,12 +27,20 @@ public class ChangedSerializer<T> implements Serializer<Change<T>> { private static final int NEWFLAG_SIZE = 1; - private final Serializer<T> inner; + private Serializer<T> inner; public ChangedSerializer(Serializer<T> inner) { this.inner = inner; } + public Serializer<T> inner() { + return inner; + } + + public void setInner(Serializer<T> inner) { + this.inner = inner; + } + @Override public void configure(Map<String, ?> configs, boolean isKey) { // do nothing http://git-wip-us.apache.org/repos/asf/kafka/blob/74e6dc84/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java index d9b0f3d..f2e2eed 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java @@ -17,8 +17,10 @@ package org.apache.kafka.streams.kstream.internals; +import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.KGroupedTable; @@ -48,15 +50,12 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup protected final Serde<K> keySerde; protected final Serde<V> valSerde; - private final String sourceName; - public KGroupedTableImpl(KStreamBuilder topology, String name, String sourceName, Serde<K> keySerde, Serde<V> valSerde) { super(topology, name, Collections.singleton(sourceName)); - this.sourceName = sourceName; this.keySerde = keySerde; this.valSerde = valSerde; } @@ -74,8 +73,13 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup String topic = name + REPARTITION_TOPIC_SUFFIX; - ChangedSerializer<V> changedValueSerializer = new ChangedSerializer<>(valSerde.serializer()); - ChangedDeserializer<V> changedValueDeserializer = new ChangedDeserializer<>(valSerde.deserializer()); + Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer(); + Deserializer<K> keyDeserializer = keySerde == null ? null : keySerde.deserializer(); + Serializer<V> valueSerializer = valSerde == null ? null : valSerde.serializer(); + Deserializer<V> valueDeserializer = valSerde == null ? null : valSerde.deserializer(); + + ChangedSerializer<V> changedValueSerializer = new ChangedSerializer<>(valueSerializer); + ChangedDeserializer<V> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer); ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(name, initializer, adder, subtractor); @@ -87,10 +91,10 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup // send the aggregate key-value pairs to the intermediate topic for partitioning topology.addInternalTopic(topic); - topology.addSink(sinkName, topic, keySerde.serializer(), changedValueSerializer, this.name); + topology.addSink(sinkName, topic, keySerializer, changedValueSerializer, this.name); // read the intermediate topic - topology.addSource(sourceName, keySerde.deserializer(), changedValueDeserializer, topic); + topology.addSource(sourceName, keyDeserializer, changedValueDeserializer, topic); // aggregate the values with the aggregator and local store topology.addProcessor(aggregateName, aggregateSupplier, sourceName); @@ -110,29 +114,6 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup } @Override - public KTable<K, Long> count(String name) { - return this.aggregate( - new Initializer<Long>() { - @Override - public Long apply() { - return 0L; - } - }, - new Aggregator<K, V, Long>() { - @Override - public Long apply(K aggKey, V value, Long aggregate) { - return aggregate + 1L; - } - }, new Aggregator<K, V, Long>() { - @Override - public Long apply(K aggKey, V value, Long aggregate) { - return aggregate - 1L; - } - }, - Serdes.Long(), name); - } - - @Override public KTable<K, V> reduce(Reducer<V> adder, Reducer<V> subtractor, String name) { @@ -143,8 +124,13 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup String topic = name + REPARTITION_TOPIC_SUFFIX; - ChangedSerializer<V> changedValueSerializer = new ChangedSerializer<>(valSerde.serializer()); - ChangedDeserializer<V> changedValueDeserializer = new ChangedDeserializer<>(valSerde.deserializer()); + Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer(); + Deserializer<K> keyDeserializer = keySerde == null ? null : keySerde.deserializer(); + Serializer<V> valueSerializer = valSerde == null ? null : valSerde.serializer(); + Deserializer<V> valueDeserializer = valSerde == null ? null : valSerde.deserializer(); + + ChangedSerializer<V> changedValueSerializer = new ChangedSerializer<>(valueSerializer); + ChangedDeserializer<V> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer); ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableReduce<>(name, adder, subtractor); @@ -156,10 +142,10 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup // send the aggregate key-value pairs to the intermediate topic for partitioning topology.addInternalTopic(topic); - topology.addSink(sinkName, topic, keySerde.serializer(), changedValueSerializer, this.name); + topology.addSink(sinkName, topic, keySerializer, changedValueSerializer, this.name); // read the intermediate topic - topology.addSource(sourceName, keySerde.deserializer(), changedValueDeserializer, topic); + topology.addSource(sourceName, keyDeserializer, changedValueDeserializer, topic); // aggregate the values with the aggregator and local store topology.addProcessor(reduceName, aggregateSupplier, sourceName); @@ -169,4 +155,26 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup return new KTableImpl<>(topology, reduceName, aggregateSupplier, Collections.singleton(sourceName)); } + @Override + public KTable<K, Long> count(String name) { + return this.aggregate( + new Initializer<Long>() { + @Override + public Long apply() { + return 0L; + } + }, + new Aggregator<K, V, Long>() { + @Override + public Long apply(K aggKey, V value, Long aggregate) { + return aggregate + 1L; + } + }, new Aggregator<K, V, Long>() { + @Override + public Long apply(K aggKey, V value, Long aggregate) { + return aggregate - 1L; + } + }, + Serdes.Long(), name); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/74e6dc84/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java index e9c2760..3795916 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.kstream.internals.ChangedSerializer; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StreamPartitioner; @@ -52,8 +53,16 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> { @Override public void init(ProcessorContext context) { this.context = context; + + // if serializers are null, get the default ones from the context if (this.keySerializer == null) this.keySerializer = (Serializer<K>) context.keySerde().serializer(); if (this.valSerializer == null) this.valSerializer = (Serializer<V>) context.valueSerde().serializer(); + + // if value serializers are for {@code Change} values, set the inner serializer when necessary + if (this.valSerializer instanceof ChangedSerializer && + ((ChangedSerializer) this.valSerializer).inner() == null) + ((ChangedSerializer) this.valSerializer).setInner(context.valueSerde().serializer()); + } @Override @@ -67,4 +76,9 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> { public void close() { // do nothing } + + // for test only + public Serializer<V> valueSerializer() { + return valSerializer; + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/74e6dc84/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java index 1868c1b..a550344 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.streams.kstream.internals.ChangedDeserializer; import org.apache.kafka.streams.processor.ProcessorContext; public class SourceNode<K, V> extends ProcessorNode<K, V> { @@ -46,9 +47,16 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> { public void init(ProcessorContext context) { this.context = context; - // if serializers are null, get the default ones from the context - if (this.keyDeserializer == null) this.keyDeserializer = (Deserializer<K>) context.keySerde().deserializer(); - if (this.valDeserializer == null) this.valDeserializer = (Deserializer<V>) context.valueSerde().deserializer(); + // if deserializers are null, get the default ones from the context + if (this.keyDeserializer == null) + this.keyDeserializer = (Deserializer<K>) context.keySerde().deserializer(); + if (this.valDeserializer == null) + this.valDeserializer = (Deserializer<V>) context.valueSerde().deserializer(); + + // if value deserializers are for {@code Change} values, set the inner deserializer when necessary + if (this.valDeserializer instanceof ChangedDeserializer && + ((ChangedDeserializer) this.valDeserializer).inner() == null) + ((ChangedDeserializer) this.valDeserializer).setInner(context.valueSerde().deserializer()); } @Override @@ -61,4 +69,8 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> { // do nothing } + // for test only + public Deserializer<V> valueDeserializer() { + return valDeserializer; + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/74e6dc84/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java index 1564e95..be0ec19 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java @@ -20,13 +20,13 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.streams.kstream.Aggregator; -import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.test.MockAggregator; +import org.apache.kafka.test.MockInitializer; +import org.apache.kafka.test.MockKeyValueMapper; import org.apache.kafka.test.MockProcessorSupplier; -import org.apache.kafka.test.NoOpKeyValueMapper; import org.junit.Test; import java.io.File; @@ -38,31 +38,6 @@ public class KTableAggregateTest { final private Serde<String> stringSerde = new Serdes.StringSerde(); - private class StringAdd implements Aggregator<String, String, String> { - - @Override - public String apply(String aggKey, String value, String aggregate) { - return aggregate + "+" + value; - } - } - - private class StringRemove implements Aggregator<String, String, String> { - - @Override - public String apply(String aggKey, String value, String aggregate) { - return aggregate + "-" + value; - } - } - - private class StringInit implements Initializer<String> { - - @Override - public String apply() { - return "0"; - } - } - - @Test public void testAggBasic() throws Exception { final File baseDir = Files.createTempDirectory("test").toFile(); @@ -72,12 +47,12 @@ public class KTableAggregateTest { String topic1 = "topic1"; KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1); - KTable<String, String> table2 = table1.groupBy(new NoOpKeyValueMapper<String, String>(), + KTable<String, String> table2 = table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper(), stringSerde, stringSerde - ).aggregate(new StringInit(), - new StringAdd(), - new StringRemove(), + ).aggregate(MockInitializer.STRING_INIT, + MockAggregator.STRING_ADDER, + MockAggregator.STRING_REMOVER, stringSerde, "topic1-Canonized"); http://git-wip-us.apache.org/repos/asf/kafka/blob/74e6dc84/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java index 6f49b6a..8a13e9a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java @@ -25,8 +25,14 @@ import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueMapper; +import org.apache.kafka.streams.processor.internals.SinkNode; +import org.apache.kafka.streams.processor.internals.SourceNode; import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.test.MockAggregator; +import org.apache.kafka.test.MockInitializer; +import org.apache.kafka.test.MockKeyValueMapper; import org.apache.kafka.test.MockProcessorSupplier; +import org.apache.kafka.test.MockReducer; import org.junit.Test; import java.io.File; @@ -34,7 +40,9 @@ import java.io.IOException; import java.nio.file.Files; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; public class KTableImplTest { @@ -295,6 +303,47 @@ public class KTableImplTest { } finally { Utils.delete(stateDir); } + } + + @Test + public void testRepartition() throws IOException { + String topic1 = "topic1"; + + File stateDir = Files.createTempDirectory("test").toFile(); + try { + KStreamBuilder builder = new KStreamBuilder(); + + KTableImpl<String, String, String> table1 = + (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1); + KTableImpl<String, String, String> table1Aggregated = (KTableImpl<String, String, String>) table1 + .groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper()) + .aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, MockAggregator.STRING_REMOVER, "mock-result1"); + + + KTableImpl<String, String, String> table1Reduced = (KTableImpl<String, String, String>) table1 + .groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper()) + .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, "mock-result2"); + + KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, stringSerde, stringSerde); + driver.setTime(0L); + + // three state store should be created, one for source, one for aggregate and one for reduce + assertEquals(3, driver.allStateStores().size()); + + // contains the corresponding repartition source / sink nodes + assertTrue(driver.allProcessorNames().contains("KSTREAM-SINK-0000000003")); + assertTrue(driver.allProcessorNames().contains("KSTREAM-SOURCE-0000000004")); + assertTrue(driver.allProcessorNames().contains("KSTREAM-SINK-0000000007")); + assertTrue(driver.allProcessorNames().contains("KSTREAM-SOURCE-0000000008")); + + assertNotNull(((ChangedSerializer) ((SinkNode) driver.processor("KSTREAM-SINK-0000000003")).valueSerializer()).inner()); + assertNotNull(((ChangedDeserializer) ((SourceNode) driver.processor("KSTREAM-SOURCE-0000000004")).valueDeserializer()).inner()); + assertNotNull(((ChangedSerializer) ((SinkNode) driver.processor("KSTREAM-SINK-0000000007")).valueSerializer()).inner()); + assertNotNull(((ChangedDeserializer) ((SourceNode) driver.processor("KSTREAM-SOURCE-0000000008")).valueDeserializer()).inner()); + + } finally { + Utils.delete(stateDir); + } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/74e6dc84/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java index 2ee8730..d738794 100644 --- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java @@ -31,8 +31,10 @@ import org.apache.kafka.streams.processor.internals.ProcessorTopology; import org.apache.kafka.streams.processor.internals.RecordCollector; import java.io.File; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; public class KStreamTestDriver { @@ -151,11 +153,33 @@ public class KStreamTestDriver { } } + public Set<String> allProcessorNames() { + Set<String> names = new HashSet<>(); + + List<ProcessorNode> nodes = topology.processors(); + + for (ProcessorNode node: nodes) { + names.add(node.name()); + } + + return names; + } + + public ProcessorNode processor(String name) { + List<ProcessorNode> nodes = topology.processors(); + + for (ProcessorNode node: nodes) { + if (node.name().equals(name)) + return node; + } + + return null; + } + public Map<String, StateStore> allStateStores() { return context.allStateStores(); } - private class MockRecordCollector extends RecordCollector { public MockRecordCollector() { super(null); http://git-wip-us.apache.org/repos/asf/kafka/blob/74e6dc84/streams/src/test/java/org/apache/kafka/test/MockAggregator.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/MockAggregator.java b/streams/src/test/java/org/apache/kafka/test/MockAggregator.java new file mode 100644 index 0000000..e8bb10b --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/test/MockAggregator.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.test; + +import org.apache.kafka.streams.kstream.Aggregator; + +public class MockAggregator { + + private static class StringAdd implements Aggregator<String, String, String> { + + @Override + public String apply(String aggKey, String value, String aggregate) { + return aggregate + "+" + value; + } + } + + private static class StringRemove implements Aggregator<String, String, String> { + + @Override + public String apply(String aggKey, String value, String aggregate) { + return aggregate + "-" + value; + } + } + + public final static Aggregator<String, String, String> STRING_ADDER = new StringAdd(); + + public final static Aggregator<String, String, String> STRING_REMOVER = new StringRemove(); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/74e6dc84/streams/src/test/java/org/apache/kafka/test/MockInitializer.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/MockInitializer.java b/streams/src/test/java/org/apache/kafka/test/MockInitializer.java new file mode 100644 index 0000000..9bfe7f8 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/test/MockInitializer.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.test; + +import org.apache.kafka.streams.kstream.Initializer; + +public class MockInitializer { + + private static class StringInit implements Initializer<String> { + + @Override + public String apply() { + return "0"; + } + } + + public final static Initializer<String> STRING_INIT = new StringInit(); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/74e6dc84/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java b/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java new file mode 100644 index 0000000..ae8c2fd --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.test; + +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.KeyValueMapper; + +public class MockKeyValueMapper { + + private static class NoOpKeyValueMapper<K, V> implements KeyValueMapper<K, V, KeyValue<K, V>> { + + @Override + public KeyValue<K, V> apply(K key, V value) { + return new KeyValue<>(key, value); + } + } + + public static <K, V> KeyValueMapper<K, V, KeyValue<K, V>> NoOpKeyValueMapper() { + return new NoOpKeyValueMapper<>(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/74e6dc84/streams/src/test/java/org/apache/kafka/test/MockReducer.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/MockReducer.java b/streams/src/test/java/org/apache/kafka/test/MockReducer.java new file mode 100644 index 0000000..24a8fea --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/test/MockReducer.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.test; + +import org.apache.kafka.streams.kstream.Reducer; + +public class MockReducer { + + private static class StringAdd implements Reducer<String> { + + @Override + public String apply(String value1, String value2) { + return value1 + "+" + value2; + } + } + + private static class StringRemove implements Reducer<String> { + + @Override + public String apply(String value1, String value2) { + return value1 + "-" + value2; + } + } + + public final static Reducer<String> STRING_ADDER = new StringAdd(); + + public final static Reducer<String> STRING_REMOVER = new StringRemove(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/74e6dc84/streams/src/test/java/org/apache/kafka/test/NoOpKeyValueMapper.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpKeyValueMapper.java b/streams/src/test/java/org/apache/kafka/test/NoOpKeyValueMapper.java deleted file mode 100644 index 828b5ae..0000000 --- a/streams/src/test/java/org/apache/kafka/test/NoOpKeyValueMapper.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.test; - -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.kstream.KeyValueMapper; - -public class NoOpKeyValueMapper<K, V> implements KeyValueMapper<K, V, KeyValue<K, V>> { - - @Override - public KeyValue<K, V> apply(K key, V value) { - return new KeyValue<>(key, value); - } -}
