Repository: kafka Updated Branches: refs/heads/trunk 9d71489ff -> 5c547475d
KAFKA-3337: Extract selector as a separate groupBy operator for KTable aggregations Author: Matthias J. Sax <[email protected]> Reviewers: Guozhang Wang <[email protected]> Closes #1231 from mjsax/kafka-3337-extact-key-selector-from-agg Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5c547475 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5c547475 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5c547475 Branch: refs/heads/trunk Commit: 5c547475d86aa336f8b3c4bb69faff39759d5df5 Parents: 9d71489 Author: Matthias J. Sax <[email protected]> Authored: Thu Apr 21 13:42:17 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Thu Apr 21 13:42:17 2016 -0700 ---------------------------------------------------------------------- .../kafka/streams/kstream/KGroupedTable.java | 82 +++++++++ .../apache/kafka/streams/kstream/KTable.java | 109 ++---------- .../kstream/internals/KGroupedTableImpl.java | 172 +++++++++++++++++++ .../streams/kstream/internals/KTableImpl.java | 162 ++--------------- .../kstream/internals/KTableAggregateTest.java | 14 +- .../streams/smoketest/SmokeTestClient.java | 17 +- 6 files changed, 291 insertions(+), 265 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/5c547475/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java new file mode 100644 index 0000000..86c34b1 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream; + +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.serialization.Serde; + +/** + * {@link KGroupedTable} is an abstraction of a <i>grouped changelog stream</i> from a primary-keyed table. + * + * @param <K> Type of primary keys + * @param <V> Type of value changes + */ +@InterfaceStability.Unstable +public interface KGroupedTable<K, V> { + + /** + * Combine updating values of this stream by the selected key into a new instance of {@link KTable}. + * + * @param adder the instance of {@link Reducer} for addition + * @param subtractor the instance of {@link Reducer} for subtraction + * @param name the name of the resulted {@link KTable} + */ + KTable<K, V> reduce(Reducer<V> adder, + Reducer<V> subtractor, + String name); + + /** + * Aggregate updating values of this stream by the selected key into a new instance of {@link KTable}. 
+ * + * @param initializer the instance of {@link Initializer} + * @param adder the instance of {@link Aggregator} for addition + * @param substractor the instance of {@link Aggregator} for subtraction + * @param aggValueSerde value serdes for materializing the aggregated table, + * if not specified the default serdes defined in the configs will be used + * @param name the name of the resulted table + * @param <T> the value type of the aggregated {@link KTable} + */ + <T> KTable<K, T> aggregate(Initializer<T> initializer, + Aggregator<K, V, T> adder, + Aggregator<K, V, T> substractor, + Serde<T> aggValueSerde, + String name); + + /** + * Aggregate updating values of this stream by the selected key into a new instance of {@link KTable} + * using default serializers and deserializers. + * + * @param initializer the instance of {@link Initializer} + * @param adder the instance of {@link Aggregator} for addition + * @param substractor the instance of {@link Aggregator} for subtraction + * @param name the name of the resulted {@link KTable} + * @param <T> the value type of the aggregated {@link KTable} + */ + <T> KTable<K, T> aggregate(Initializer<T> initializer, + Aggregator<K, V, T> adder, + Aggregator<K, V, T> substractor, + String name); + + /** + * Count number of records of this stream by the selected key into a new instance of {@link KTable}. 
+ * + * @param name the name of the resulted {@link KTable} + */ + KTable<K, Long> count(String name); + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/5c547475/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java index 1e44cb5..8414279 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java @@ -242,113 +242,26 @@ public interface KTable<K, V> { <V1, R> KTable<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner); /** - * Combine updating values of this stream by the selected key into a new instance of {@link KTable}. - * - * @param adder the instance of {@link Reducer} for addition - * @param subtractor the instance of {@link Reducer} for subtraction - * @param selector the instance of {@link KeyValueMapper} that select the aggregate key - * @param keySerde key serdes for materializing the aggregated table, - * if not specified the default serdes defined in the configs will be used - * @param valueSerde value serdes for materializing the aggregated table, - * if not specified the default serdes defined in the configs will be used - * @param name the name of the resulted {@link KTable} - * @param <K1> the key type of the aggregated {@link KTable} - * @param <V1> the value type of the aggregated {@link KTable} - */ - <K1, V1> KTable<K1, V1> reduce(Reducer<V1> adder, - Reducer<V1> subtractor, - KeyValueMapper<K, V, KeyValue<K1, V1>> selector, - Serde<K1> keySerde, - Serde<V1> valueSerde, - String name); - - /** - * Combine updating values of this stream by the selected key into a new instance of {@link KTable} - * using default serializers and deserializers. 
- * - * @param adder the instance of {@link Reducer} for addition - * @param subtractor the instance of {@link Reducer} for subtraction - * @param selector the instance of {@link KeyValueMapper} that select the aggregate key - * @param name the name of the resulted {@link KTable} - * @param <K1> the key type of the aggregated {@link KTable} - * @param <V1> the value type of the aggregated {@link KTable} - */ - <K1, V1> KTable<K1, V1> reduce(Reducer<V1> adder, - Reducer<V1> subtractor, - KeyValueMapper<K, V, KeyValue<K1, V1>> selector, - String name); - - /** - * Aggregate updating values of this stream by the selected key into a new instance of {@link KTable}. - * - * @param initializer the instance of {@link Initializer} - * @param adder the instance of {@link Aggregator} for addition - * @param substractor the instance of {@link Aggregator} for subtraction - * @param selector the instance of {@link KeyValueMapper} that select the aggregate key - * @param keySerde key serdes for materializing this stream and the aggregated table, - * if not specified the default serdes defined in the configs will be used - * @param valueSerde value serdes for materializing this stream, - * if not specified the default serdes defined in the configs will be used - * @param aggValueSerde value serdes for materializing the aggregated table, - * if not specified the default serdes defined in the configs will be used - * @param name the name of the resulted table - * @param <K1> the key type of this {@link KTable} - * @param <V1> the value type of this {@link KTable} - * @param <T> the value type of the aggregated {@link KTable} - */ - <K1, V1, T> KTable<K1, T> aggregate(Initializer<T> initializer, - Aggregator<K1, V1, T> adder, - Aggregator<K1, V1, T> substractor, - KeyValueMapper<K, V, KeyValue<K1, V1>> selector, - Serde<K1> keySerde, - Serde<V1> valueSerde, - Serde<T> aggValueSerde, - String name); - - /** - * Aggregate updating values of this stream by the selected key into a new 
instance of {@link KTable} - * using default serializers and deserializers. - * - * @param initializer the instance of {@link Initializer} - * @param adder the instance of {@link Aggregator} for addition - * @param substractor the instance of {@link Aggregator} for subtraction - * @param selector the instance of {@link KeyValueMapper} that select the aggregate key - * @param name the name of the resulted {@link KTable} - * @param <K1> the key type of the aggregated {@link KTable} - * @param <V1> the value type of the aggregated {@link KTable} - * @param <T> the value type of the aggregated {@link KTable} - */ - <K1, V1, T> KTable<K1, T> aggregate(Initializer<T> initializer, - Aggregator<K1, V1, T> adder, - Aggregator<K1, V1, T> substractor, - KeyValueMapper<K, V, KeyValue<K1, V1>> selector, - String name); - - /** - * Count number of records of this stream by the selected key into a new instance of {@link KTable}. - * - * @param selector the instance of {@link KeyValueMapper} that select the aggregate key + * Group the records of this {@link KTable} using the provided {@link KeyValueMapper}. 
+ * + * @param selector select the grouping key and value to be aggregated * @param keySerde key serdes for materializing this stream, * if not specified the default serdes defined in the configs will be used * @param valueSerde value serdes for materializing this stream, * if not specified the default serdes defined in the configs will be used - * @param name the name of the resulted table - * @param <K1> the key type of the aggregated {@link KTable} + * @param <K1> the key type of the {@link KGroupedTable} + * @param <V1> the value type of the {@link KGroupedTable} */ - <K1> KTable<K1, Long> count(KeyValueMapper<K, V, K1> selector, - Serde<K1> keySerde, - Serde<V> valueSerde, - String name); + <K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<K, V, KeyValue<K1, V1>> selector, Serde<K1> keySerde, Serde<V1> valueSerde); /** - * Count number of records of this stream by the selected key into a new instance of {@link KTable} - * using default serializers and deserializers. + * Group the records of this {@link KTable} using the provided {@link KeyValueMapper} and default serializers and deserializers. * - * @param selector the instance of {@link KeyValueMapper} that select the aggregate key - * @param name the name of the resulted {@link KTable} - * @param <K1> the key type of the aggregated {@link KTable} + * @param selector select the grouping key and value to be aggregated + * @param <K1> the key type of the {@link KGroupedTable} + * @param <V1> the value type of the {@link KGroupedTable} */ - <K1> KTable<K1, Long> count(KeyValueMapper<K, V, K1> selector, String name); + <K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<K, V, KeyValue<K1, V1>> selector); /** * Perform an action on each element of {@link KTable}. 
http://git-wip-us.apache.org/repos/asf/kafka/blob/5c547475/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java new file mode 100644 index 0000000..d9b0f3d --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.KGroupedTable; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Reducer; +import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.StateStoreSupplier; +import org.apache.kafka.streams.state.Stores; + +import java.util.Collections; + +/** + * The implementation class of {@link KGroupedTable}. + * + * @param <K> the key type + * @param <V> the value type + */ +public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroupedTable<K, V> { + + private static final String AGGREGATE_NAME = "KTABLE-AGGREGATE-"; + + private static final String REDUCE_NAME = "KTABLE-REDUCE-"; + + private static final String REPARTITION_TOPIC_SUFFIX = "-repartition"; + + protected final Serde<K> keySerde; + protected final Serde<V> valSerde; + + private final String sourceName; + + public KGroupedTableImpl(KStreamBuilder topology, + String name, + String sourceName, + Serde<K> keySerde, + Serde<V> valSerde) { + super(topology, name, Collections.singleton(sourceName)); + this.sourceName = sourceName; + this.keySerde = keySerde; + this.valSerde = valSerde; + } + + @Override + public <T> KTable<K, T> aggregate(Initializer<T> initializer, + Aggregator<K, V, T> adder, + Aggregator<K, V, T> subtractor, + Serde<T> aggValueSerde, + String name) { + + String sinkName = topology.newName(KStreamImpl.SINK_NAME); + String sourceName = topology.newName(KStreamImpl.SOURCE_NAME); + String aggregateName = topology.newName(AGGREGATE_NAME); + + String topic = name + REPARTITION_TOPIC_SUFFIX; + + ChangedSerializer<V> changedValueSerializer = 
new ChangedSerializer<>(valSerde.serializer()); + ChangedDeserializer<V> changedValueDeserializer = new ChangedDeserializer<>(valSerde.deserializer()); + + ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(name, initializer, adder, subtractor); + + StateStoreSupplier aggregateStore = Stores.create(name) + .withKeys(keySerde) + .withValues(aggValueSerde) + .persistent() + .build(); + + // send the aggregate key-value pairs to the intermediate topic for partitioning + topology.addInternalTopic(topic); + topology.addSink(sinkName, topic, keySerde.serializer(), changedValueSerializer, this.name); + + // read the intermediate topic + topology.addSource(sourceName, keySerde.deserializer(), changedValueDeserializer, topic); + + // aggregate the values with the aggregator and local store + topology.addProcessor(aggregateName, aggregateSupplier, sourceName); + topology.addStateStore(aggregateStore, aggregateName); + + // return the KTable representation with the intermediate topic as the sources + return new KTableImpl<>(topology, aggregateName, aggregateSupplier, Collections.singleton(sourceName)); + } + + @Override + public <T> KTable<K, T> aggregate(Initializer<T> initializer, + Aggregator<K, V, T> adder, + Aggregator<K, V, T> substractor, + String name) { + + return aggregate(initializer, adder, substractor, null, name); + } + + @Override + public KTable<K, Long> count(String name) { + return this.aggregate( + new Initializer<Long>() { + @Override + public Long apply() { + return 0L; + } + }, + new Aggregator<K, V, Long>() { + @Override + public Long apply(K aggKey, V value, Long aggregate) { + return aggregate + 1L; + } + }, new Aggregator<K, V, Long>() { + @Override + public Long apply(K aggKey, V value, Long aggregate) { + return aggregate - 1L; + } + }, + Serdes.Long(), name); + } + + @Override + public KTable<K, V> reduce(Reducer<V> adder, + Reducer<V> subtractor, + String name) { + + String sinkName = 
topology.newName(KStreamImpl.SINK_NAME); + String sourceName = topology.newName(KStreamImpl.SOURCE_NAME); + String reduceName = topology.newName(REDUCE_NAME); + + String topic = name + REPARTITION_TOPIC_SUFFIX; + + ChangedSerializer<V> changedValueSerializer = new ChangedSerializer<>(valSerde.serializer()); + ChangedDeserializer<V> changedValueDeserializer = new ChangedDeserializer<>(valSerde.deserializer()); + + ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableReduce<>(name, adder, subtractor); + + StateStoreSupplier aggregateStore = Stores.create(name) + .withKeys(keySerde) + .withValues(valSerde) + .persistent() + .build(); + + // send the aggregate key-value pairs to the intermediate topic for partitioning + topology.addInternalTopic(topic); + topology.addSink(sinkName, topic, keySerde.serializer(), changedValueSerializer, this.name); + + // read the intermediate topic + topology.addSource(sourceName, keySerde.deserializer(), changedValueDeserializer, topic); + + // aggregate the values with the aggregator and local store + topology.addProcessor(reduceName, aggregateSupplier, sourceName); + topology.addStateStore(aggregateStore, reduceName); + + // return the KTable representation with the intermediate topic as the sources + return new KTableImpl<>(topology, reduceName, aggregateSupplier, Collections.singleton(sourceName)); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/5c547475/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 5c291f5..51d4cb4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -18,45 +18,38 @@ package 
org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.TopologyBuilderException; -import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.ForeachAction; -import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.KGroupedTable; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.Predicate; -import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.processor.StreamPartitioner; -import org.apache.kafka.streams.state.Stores; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.PrintStream; -import java.util.Collections; import java.util.Set; /** - * The implementation class of KTable + * The implementation class of {@link KTable}. 
* @param <K> the key type * @param <S> the source's (parent's) value type * @param <V> the value type */ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, V> { - private static final String REPARTITION_TOPIC_SUFFIX = "-repartition"; - - private static final String AGGREGATE_NAME = "KTABLE-AGGREGATE-"; - private static final String FILTER_NAME = "KTABLE-FILTER-"; + private static final String FOREACH_NAME = "KTABLE-FOREACH-"; + public static final String JOINTHIS_NAME = "KTABLE-JOINTHIS-"; public static final String JOINOTHER_NAME = "KTABLE-JOINOTHER-"; @@ -75,16 +68,12 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, private static final String PRINTING_NAME = "KSTREAM-PRINTER-"; - private static final String REDUCE_NAME = "KTABLE-REDUCE-"; - private static final String SELECT_NAME = "KTABLE-SELECT-"; public static final String SOURCE_NAME = "KTABLE-SOURCE-"; private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-"; - private static final String FOREACH_NAME = "KTABLE-FOREACH-"; - public final ProcessorSupplier<?, ?> processorSupplier; private final Serde<K> keySerde; @@ -172,7 +161,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, } } - @Override public KTable<K, V> through(Serde<K> keySerde, Serde<V> valSerde, @@ -319,154 +307,24 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, } @Override - public <K1, V1, T> KTable<K1, T> aggregate(Initializer<T> initializer, - Aggregator<K1, V1, T> adder, - Aggregator<K1, V1, T> subtractor, - KeyValueMapper<K, V, KeyValue<K1, V1>> selector, - Serde<K1> keySerde, - Serde<V1> valueSerde, - Serde<T> aggValueSerde, - String name) { + public <K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<K, V, KeyValue<K1, V1>> selector, + Serde<K1> keySerde, + Serde<V1> valueSerde) { String selectName = topology.newName(SELECT_NAME); - String sinkName = topology.newName(KStreamImpl.SINK_NAME); - String 
sourceName = topology.newName(KStreamImpl.SOURCE_NAME); - String aggregateName = topology.newName(AGGREGATE_NAME); - - String topic = name + REPARTITION_TOPIC_SUFFIX; - - ChangedSerializer<V1> changedValueSerializer = new ChangedSerializer<>(valueSerde.serializer()); - ChangedDeserializer<V1> changedValueDeserializer = new ChangedDeserializer<>(valueSerde.deserializer()); KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector); - ProcessorSupplier<K1, Change<V1>> aggregateSupplier = new KTableAggregate<>(name, initializer, adder, subtractor); - - StateStoreSupplier aggregateStore = Stores.create(name) - .withKeys(keySerde) - .withValues(aggValueSerde) - .persistent() - .build(); - // select the aggregate key and values (old and new), it would require parent to send old values topology.addProcessor(selectName, selectSupplier, this.name); this.enableSendingOldValues(); - // send the aggregate key-value pairs to the intermediate topic for partitioning - topology.addInternalTopic(topic); - topology.addSink(sinkName, topic, keySerde.serializer(), changedValueSerializer, selectName); - - // read the intermediate topic - topology.addSource(sourceName, keySerde.deserializer(), changedValueDeserializer, topic); - - // aggregate the values with the aggregator and local store - topology.addProcessor(aggregateName, aggregateSupplier, sourceName); - topology.addStateStore(aggregateStore, aggregateName); - - // return the KTable representation with the intermediate topic as the sources - return new KTableImpl<>(topology, aggregateName, aggregateSupplier, Collections.singleton(sourceName)); + return new KGroupedTableImpl<>(topology, selectName, this.name, keySerde, valueSerde); } @Override - public <K1, V1, T> KTable<K1, T> aggregate(Initializer<T> initializer, - Aggregator<K1, V1, T> adder, - Aggregator<K1, V1, T> substractor, - KeyValueMapper<K, V, KeyValue<K1, V1>> selector, - String name) { - - return aggregate(initializer, 
adder, substractor, selector, null, null, null, name); - } - - @Override - public <K1> KTable<K1, Long> count(final KeyValueMapper<K, V, K1> selector, - Serde<K1> keySerde, - Serde<V> valueSerde, - String name) { - return this.aggregate( - new Initializer<Long>() { - @Override - public Long apply() { - return 0L; - } - }, - new Aggregator<K1, V, Long>() { - @Override - public Long apply(K1 aggKey, V value, Long aggregate) { - return aggregate + 1L; - } - }, new Aggregator<K1, V, Long>() { - @Override - public Long apply(K1 aggKey, V value, Long aggregate) { - return aggregate - 1L; - } - }, new KeyValueMapper<K, V, KeyValue<K1, V>>() { - @Override - public KeyValue<K1, V> apply(K key, V value) { - return new KeyValue<>(selector.apply(key, value), value); - } - }, - keySerde, valueSerde, Serdes.Long(), name); - } - - @Override - public <K1> KTable<K1, Long> count(final KeyValueMapper<K, V, K1> selector, String name) { - return count(selector, null, null, name); - } - - @Override - public <K1, V1> KTable<K1, V1> reduce(Reducer<V1> adder, - Reducer<V1> subtractor, - KeyValueMapper<K, V, KeyValue<K1, V1>> selector, - Serde<K1> keySerde, - Serde<V1> valueSerde, - String name) { - - String selectName = topology.newName(SELECT_NAME); - String sinkName = topology.newName(KStreamImpl.SINK_NAME); - String sourceName = topology.newName(KStreamImpl.SOURCE_NAME); - String reduceName = topology.newName(REDUCE_NAME); - - String topic = name + REPARTITION_TOPIC_SUFFIX; - - ChangedSerializer<V1> changedValueSerializer = new ChangedSerializer<>(valueSerde.serializer()); - ChangedDeserializer<V1> changedValueDeserializer = new ChangedDeserializer<>(valueSerde.deserializer()); - - KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector); - - ProcessorSupplier<K1, Change<V1>> aggregateSupplier = new KTableReduce<>(name, adder, subtractor); - - StateStoreSupplier aggregateStore = Stores.create(name) - .withKeys(keySerde) - 
.withValues(valueSerde) - .persistent() - .build(); - - // select the aggregate key and values (old and new), it would require parent to send old values - topology.addProcessor(selectName, selectSupplier, this.name); - this.enableSendingOldValues(); - - // send the aggregate key-value pairs to the intermediate topic for partitioning - topology.addInternalTopic(topic); - topology.addSink(sinkName, topic, keySerde.serializer(), changedValueSerializer, selectName); - - // read the intermediate topic - topology.addSource(sourceName, keySerde.deserializer(), changedValueDeserializer, topic); - - // aggregate the values with the aggregator and local store - topology.addProcessor(reduceName, aggregateSupplier, sourceName); - topology.addStateStore(aggregateStore, reduceName); - - // return the KTable representation with the intermediate topic as the sources - return new KTableImpl<>(topology, reduceName, aggregateSupplier, Collections.singleton(sourceName)); - } - - @Override - public <K1, V1> KTable<K1, V1> reduce(Reducer<V1> adder, - Reducer<V1> subtractor, - KeyValueMapper<K, V, KeyValue<K1, V1>> selector, - String name) { - - return reduce(adder, subtractor, selector, null, null, name); + public <K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<K, V, KeyValue<K1, V1>> selector) { + return this.groupBy(selector, null, null); } @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/kafka/blob/5c547475/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java index fc01e5e..1564e95 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java +++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java @@ -72,12 +72,14 @@ public class KTableAggregateTest { String topic1 = "topic1"; KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1); - KTable<String, String> table2 = table1.aggregate(new StringInit(), new StringAdd(), new StringRemove(), - new NoOpKeyValueMapper<String, String>(), - stringSerde, - stringSerde, - stringSerde, - "topic1-Canonized"); + KTable<String, String> table2 = table1.groupBy(new NoOpKeyValueMapper<String, String>(), + stringSerde, + stringSerde + ).aggregate(new StringInit(), + new StringAdd(), + new StringRemove(), + stringSerde, + "topic1-Canonized"); MockProcessorSupplier<String, String> proc2 = new MockProcessorSupplier<>(); table2.toStream().process(proc2); http://git-wip-us.apache.org/repos/asf/kafka/blob/5c547475/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java index 0a02824..95e0fbf 100644 --- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java @@ -220,15 +220,14 @@ public class SmokeTestClient extends SmokeTestUtil { // test repartition Agg agg = new Agg(); - cntTable.aggregate( - agg.init(), - agg.adder(), - agg.remover(), - agg.selector(), - stringSerde, - longSerde, - longSerde, - "cntByCnt" + cntTable.groupBy(agg.selector(), + stringSerde, + longSerde + ).aggregate(agg.init(), + agg.adder(), + agg.remover(), + longSerde, + "cntByCnt" ).to(stringSerde, longSerde, "tagg"); return new KafkaStreams(builder, props);
