Repository: kafka Updated Branches: refs/heads/trunk 18dd1986b -> 088ab3eaa
KAFKA-3614: Consolidate duplicate code in KGroupedTableImpl Feel free to review guozhangwang enothereska mjsax . Author: Michael G. Noll <[email protected]> Reviewers: Matthias J. Sax, Michael G. Noll, Eno Thereska Closes #1262 from miguno/KAFKA-3614 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/088ab3ea Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/088ab3ea Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/088ab3ea Branch: refs/heads/trunk Commit: 088ab3eaadb2389b52aedc049b6f1f0d4b5fb989 Parents: 18dd198 Author: Michael G. Noll <[email protected]> Authored: Tue Apr 26 09:59:37 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Tue Apr 26 09:59:37 2016 -0700 ---------------------------------------------------------------------- .../kstream/internals/KGroupedTableImpl.java | 73 ++++++-------------- 1 file changed, 23 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/088ab3ea/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java index f2e2eed..f7fe4e5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java @@ -67,41 +67,8 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup Serde<T> aggValueSerde, String name) { - String sinkName = topology.newName(KStreamImpl.SINK_NAME); - String sourceName = topology.newName(KStreamImpl.SOURCE_NAME); - String aggregateName = topology.newName(AGGREGATE_NAME); - - String topic = name + REPARTITION_TOPIC_SUFFIX; - - Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer(); - Deserializer<K> keyDeserializer = keySerde == null ? null : keySerde.deserializer(); - Serializer<V> valueSerializer = valSerde == null ? null : valSerde.serializer(); - Deserializer<V> valueDeserializer = valSerde == null ? null : valSerde.deserializer(); - - ChangedSerializer<V> changedValueSerializer = new ChangedSerializer<>(valueSerializer); - ChangedDeserializer<V> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer); - ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(name, initializer, adder, subtractor); - - StateStoreSupplier aggregateStore = Stores.create(name) - .withKeys(keySerde) - .withValues(aggValueSerde) - .persistent() - .build(); - - // send the aggregate key-value pairs to the intermediate topic for partitioning - topology.addInternalTopic(topic); - topology.addSink(sinkName, topic, keySerializer, changedValueSerializer, this.name); - - // read the intermediate topic - topology.addSource(sourceName, keyDeserializer, changedValueDeserializer, topic); - - // aggregate the values with the aggregator and local store - topology.addProcessor(aggregateName, aggregateSupplier, sourceName); - topology.addStateStore(aggregateStore, aggregateName); - - // return the KTable representation with the intermediate topic as the sources - return new KTableImpl<>(topology, aggregateName, aggregateSupplier, Collections.singleton(sourceName)); + return doAggregate(aggregateSupplier, aggValueSerde, AGGREGATE_NAME, name); } @Override @@ -113,14 +80,13 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup return aggregate(initializer, adder, substractor, null, name); } - @Override - public KTable<K, V> reduce(Reducer<V> adder, - Reducer<V> subtractor, - String name) { - + private <T> KTable<K, T> doAggregate(ProcessorSupplier<K, Change<V>> aggregateSupplier, + Serde<T> aggValueSerde, + String functionName, + String name) { String sinkName = topology.newName(KStreamImpl.SINK_NAME); String sourceName = topology.newName(KStreamImpl.SOURCE_NAME); - String reduceName = topology.newName(REDUCE_NAME); + String funcName = topology.newName(functionName); String topic = name + REPARTITION_TOPIC_SUFFIX; @@ -132,13 +98,11 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup ChangedSerializer<V> changedValueSerializer = new ChangedSerializer<>(valueSerializer); ChangedDeserializer<V> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer); - ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableReduce<>(name, adder, subtractor); - StateStoreSupplier aggregateStore = Stores.create(name) - .withKeys(keySerde) - .withValues(valSerde) - .persistent() - .build(); + .withKeys(keySerde) + .withValues(aggValueSerde) + .persistent() + .build(); // send the aggregate key-value pairs to the intermediate topic for partitioning topology.addInternalTopic(topic); @@ -148,11 +112,19 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup topology.addSource(sourceName, keyDeserializer, changedValueDeserializer, topic); // aggregate the values with the aggregator and local store - topology.addProcessor(reduceName, aggregateSupplier, sourceName); - topology.addStateStore(aggregateStore, reduceName); + topology.addProcessor(funcName, aggregateSupplier, sourceName); + topology.addStateStore(aggregateStore, funcName); // return the KTable representation with the intermediate topic as the sources - return new KTableImpl<>(topology, reduceName, aggregateSupplier, Collections.singleton(sourceName)); + return new KTableImpl<>(topology, funcName, aggregateSupplier, Collections.singleton(sourceName)); + } + + @Override + public KTable<K, V> reduce(Reducer<V> adder, + Reducer<V> subtractor, + String name) { + ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableReduce<>(name, adder, subtractor); + return doAggregate(aggregateSupplier, valSerde, REDUCE_NAME, name); } @Override @@ -177,4 +149,5 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup }, Serdes.Long(), name); } -} + +} \ No newline at end of file
