Repository: kafka Updated Branches: refs/heads/trunk 8189f9d58 -> 95174337c
KAFKA-3121: Refactor KStream Aggregate to be Lambda-able. Author: Guozhang Wang <[email protected]> Reviewers: Yasuhiro Matsuda <[email protected]>, Ewen Cheslack-Postava <[email protected]> Closes #839 from guozhangwang/K3121s2 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/95174337 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/95174337 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/95174337 Branch: refs/heads/trunk Commit: 95174337c2f6cda90e213e5c3a73fc89854f42a7 Parents: 8189f9d Author: Guozhang Wang <[email protected]> Authored: Tue Feb 2 12:01:47 2016 -0800 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Tue Feb 2 12:01:47 2016 -0800 ---------------------------------------------------------------------- .../examples/pageview/PageViewTypedJob.java | 3 +- .../examples/pageview/PageViewUnTypedJob.java | 4 +-- .../examples/wordcount/WordCountJob.java | 4 +-- .../kafka/streams/kstream/Aggregator.java | 16 +-------- .../org/apache/kafka/streams/kstream/Count.java | 36 -------------------- .../kafka/streams/kstream/Initializer.java | 23 +++++++++++++ .../apache/kafka/streams/kstream/KStream.java | 24 +++++++++---- .../apache/kafka/streams/kstream/KTable.java | 26 ++++++++++++-- .../apache/kafka/streams/kstream/SumAsLong.java | 36 -------------------- .../kstream/internals/KStreamAggregate.java | 13 ++++--- .../streams/kstream/internals/KStreamImpl.java | 27 +++++++++++++-- .../kstream/internals/KTableAggregate.java | 17 +++++---- .../streams/kstream/internals/KTableImpl.java | 36 ++++++++++++++++++-- .../kstream/internals/KStreamAggregateTest.java | 23 ++++++------- .../kstream/internals/KTableAggregateTest.java | 23 ++++++++----- 15 files changed, 175 insertions(+), 136 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java ---------------------------------------------------------------------- diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java index 7f11512..358cbe8 100644 --- a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java +++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java @@ -25,7 +25,6 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.kstream.Count; import org.apache.kafka.streams.kstream.HoppingWindows; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KStream; @@ -99,7 +98,7 @@ public class PageViewTypedJob { return viewByRegion; }) .map((user, viewRegion) -> new KeyValue<>(viewRegion.region, viewRegion)) - .aggregateByKey(new Count<String, PageViewByRegion>(), HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000), + .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000), stringSerializer, longSerializer, stringDeserializer, longDeserializer) .toStream() http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java ---------------------------------------------------------------------- diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java index 013332e..2fdfa97 100644 --- a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java +++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java @@ -31,7 +31,7 @@ import org.apache.kafka.connect.json.JsonDeserializer; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.kstream.Count; +import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.HoppingWindows; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KStream; @@ -78,7 +78,7 @@ public class PageViewUnTypedJob { .put("region", region); }) .map((user, viewRegion) -> new KeyValue<>(viewRegion.get("region").textValue(), viewRegion)) - .aggregateByKey(new Count<String, JsonNode>(), HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000), + .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000), stringSerializer, longSerializer, stringDeserializer, longDeserializer) .toStream() http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java ---------------------------------------------------------------------- diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java index c66e965..b922695 100644 --- a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java +++ b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java @@ -31,7 +31,7 @@ import org.apache.kafka.connect.json.JsonSerializer; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.kstream.Count; +import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KeyValueMapper; @@ -79,7 +79,7 @@ public class WordCountJob { return new KeyValue<String, String>(value, value); } }) - .aggregateByKey(new Count<>(), UnlimitedWindows.of("Counts").startOn(0L), + .countByKey(UnlimitedWindows.of("Counts").startOn(0L), stringSerializer, longSerializer, stringDeserializer, longDeserializer) .toStream() http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java index c601024..e3eb18f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java @@ -18,20 +18,6 @@ package org.apache.kafka.streams.kstream; public interface Aggregator<K, V, T> { - /** - * Set the initial aggregate value - */ - T initialValue(K aggKey); - /** - * When a new record with the aggregate key is added, - * updating the aggregate value for this key - */ - T add(K aggKey, V value, T aggregate); - - /** - * when an old record with the aggregate key is removed, - * updating the aggregate value for this key - */ - T remove(K aggKey, V value, T aggregate); + T apply(K aggKey, V value, T aggregate); } http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java deleted file mode 100644 index 8780cc7..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream; - -public class Count<K, V> implements Aggregator<K, V, Long> { - - @Override - public Long initialValue(K aggKey) { - return 0L; - } - - @Override - public Long add(K aggKey, V value, Long aggregate) { - return aggregate + 1L; - } - - @Override - public Long remove(K aggKey, V value, Long aggregate) { - return aggregate - 1L; - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java new file mode 100644 index 0000000..0aeddc9 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream; + +public interface Initializer<T> { + + T apply(); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index 26f04f0..f6fa48d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -274,24 +274,36 @@ public interface KStream<K, V> { * @param windows the specification of the aggregation window */ <W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer, - Windows<W> windows, - Serializer<K> keySerializer, - Serializer<V> aggValueSerializer, - Deserializer<K> keyDeserializer, - Deserializer<V> aggValueDeserializer); + Windows<W> windows, + Serializer<K> keySerializer, + Serializer<V> aggValueSerializer, + Deserializer<K> keyDeserializer, + Deserializer<V> aggValueDeserializer); /** * Aggregate values of this stream by key on a window basis. * + * @param initializer the class of Initializer * @param aggregator the class of Aggregator * @param windows the specification of the aggregation window * @param <T> the value type of the aggregated table */ - <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Aggregator<K, V, T> aggregator, + <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Initializer<T> initializer, + Aggregator<K, V, T> aggregator, Windows<W> windows, Serializer<K> keySerializer, Serializer<T> aggValueSerializer, Deserializer<K> keyDeserializer, Deserializer<T> aggValueDeserializer); + /** + * Count number of messages of this stream by key on a window basis. + * + * @param windows the specification of the aggregation window + */ + <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows, + Serializer<K> keySerializer, + Serializer<Long> aggValueSerializer, + Deserializer<K> keyDeserializer, + Deserializer<Long> aggValueDeserializer); } http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java index feb28ab..5cd9d9c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java @@ -162,14 +162,18 @@ public interface KTable<K, V> { /** * Aggregate values of this table by the selected key. * - * @param aggregator the class of Aggregator + * @param initializer the class of Initializer + * @param add the class of Aggregator + * @param remove the class of Aggregator * @param selector the KeyValue mapper that select the aggregate key * @param name the name of the resulted table * @param <K1> the key type of the aggregated table * @param <V1> the value type of the aggregated table * @return the instance of KTable */ - <K1, V1, T> KTable<K1, T> aggregate(Aggregator<K1, V1, T> aggregator, + <K1, V1, T> KTable<K1, T> aggregate(Initializer<T> initializer, + Aggregator<K1, V1, T> add, + Aggregator<K1, V1, T> remove, KeyValueMapper<K, V, KeyValue<K1, V1>> selector, Serializer<K1> keySerializer, Serializer<V1> valueSerializer, @@ -178,4 +182,22 @@ public interface KTable<K, V> { Deserializer<V1> valueDeserializer, Deserializer<T> aggValueDeserializer, String name); + + /** + * Count number of records of this table by the selected key. + * + * @param selector the KeyValue mapper that select the aggregate key + * @param name the name of the resulted table + * @param <K1> the key type of the aggregated table + * @param <V1> the value type of the aggregated table + * @return the instance of KTable + */ + <K1, V1> KTable<K1, Long> count(KeyValueMapper<K, V, KeyValue<K1, V1>> selector, + Serializer<K1> keySerializer, + Serializer<V1> valueSerializer, + Serializer<Long> aggValueSerializer, + Deserializer<K1> keyDeserializer, + Deserializer<V1> valueDeserializer, + Deserializer<Long> aggValueDeserializer, + String name); } http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/src/main/java/org/apache/kafka/streams/kstream/SumAsLong.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SumAsLong.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SumAsLong.java deleted file mode 100644 index 1f8df04..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/SumAsLong.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream; - -public class SumAsLong<K> implements Aggregator<K, Long, Long> { - - @Override - public Long initialValue(K aggKey) { - return 0L; - } - - @Override - public Long add(K aggKey, Long value, Long aggregate) { - return aggregate + value; - } - - @Override - public Long remove(K aggKey, Long value, Long aggregate) { - return aggregate - value; - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java index 49f3e71..b64277c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.Window; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.Windows; @@ -35,13 +36,15 @@ public class KStreamAggregate<K, V, T, W extends Window> implements KTableProces private final String storeName; private final Windows<W> windows; + private final Initializer<T> initializer; private final Aggregator<K, V, T> aggregator; private boolean sendOldValues = false; - public KStreamAggregate(Windows<W> windows, String storeName, Aggregator<K, V, T> aggregator) { + public KStreamAggregate(Windows<W> windows, String storeName, Initializer<T> initializer, Aggregator<K, V, T> aggregator) { this.windows = windows; this.storeName = storeName; + this.initializer = initializer; this.aggregator = aggregator; } @@ -97,10 +100,10 @@ public class KStreamAggregate<K, V, T, W extends Window> implements KTableProces T oldAgg = entry.value; if (oldAgg == null) - oldAgg = aggregator.initialValue(key); + oldAgg = initializer.apply(); // try to add the new new value (there will never be old value) - T newAgg = aggregator.add(key, value, oldAgg); + T newAgg = aggregator.apply(key, value, oldAgg); // update the store with the new value windowStore.put(key, newAgg, window.start()); @@ -119,8 +122,8 @@ public class KStreamAggregate<K, V, T, W extends Window> implements KTableProces // create the new window for the rest of unmatched window that do not exist yet for (long windowStartMs : matchedWindows.keySet()) { - T oldAgg = aggregator.initialValue(key); - T newAgg = aggregator.add(key, value, oldAgg); + T oldAgg = initializer.apply(); + T newAgg = aggregator.apply(key, value, oldAgg); windowStore.put(key, newAgg, windowStartMs); http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 73f7266..79a3115 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -20,6 +20,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; @@ -421,7 +422,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V } @Override - public <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Aggregator<K, V, T> aggregator, + public <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Initializer<T> initializer, + Aggregator<K, V, T> aggregator, Windows<W> windows, Serializer<K> keySerializer, Serializer<T> aggValueSerializer, @@ -434,7 +436,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V String selectName = topology.newName(SELECT_NAME); ProcessorSupplier<K, V> aggWindowSupplier = new KStreamAggWindow<>(); - ProcessorSupplier<Windowed<K>, Change<V>> aggregateSupplier = new KStreamAggregate<>(windows, windows.name(), aggregator); + ProcessorSupplier<Windowed<K>, Change<V>> aggregateSupplier = new KStreamAggregate<>(windows, windows.name(), initializer, aggregator); StateStoreSupplier aggregateStore = Stores.create(windows.name()) .withKeys(keySerializer, keyDeserializer) @@ -451,4 +453,25 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V // return the KTable representation with the intermediate topic as the sources return new KTableImpl<>(topology, aggregateName, aggregateSupplier, sourceNodes); } + + @Override + public <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows, + Serializer<K> keySerializer, + Serializer<Long> aggValueSerializer, + Deserializer<K> keyDeserializer, + Deserializer<Long> aggValueDeserializer) { + return this.aggregateByKey( + new Initializer<Long>() { + @Override + public Long apply() { + return 0L; + } + }, + new Aggregator<K, V, Long>() { + @Override + public Long apply(K aggKey, V value, Long aggregate) { + return aggregate + 1L; + } + }, windows, keySerializer, aggValueSerializer, keyDeserializer, aggValueDeserializer); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java index 1730a8c..6ce776a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; @@ -26,13 +27,17 @@ import org.apache.kafka.streams.state.KeyValueStore; public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T> { private final String storeName; - private final Aggregator<K, V, T> aggregator; + private final Initializer<T> initializer; + private final Aggregator<K, V, T> add; + private final Aggregator<K, V, T> remove; private boolean sendOldValues = false; - public KTableAggregate(String storeName, Aggregator<K, V, T> aggregator) { + public KTableAggregate(String storeName, Initializer<T> initializer, Aggregator<K, V, T> add, Aggregator<K, V, T> remove) { this.storeName = storeName; - this.aggregator = aggregator; + this.initializer = initializer; + this.add = add; + this.remove = remove; } @Override @@ -62,18 +67,18 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T T oldAgg = store.get(key); if (oldAgg == null) - oldAgg = aggregator.initialValue(key); + oldAgg = initializer.apply(); T newAgg = oldAgg; // first try to remove the old value if (value.oldValue != null) { - newAgg = aggregator.remove(key, value.oldValue, newAgg); + newAgg = remove.apply(key, value.oldValue, newAgg); } // then try to add the new new value if (value.newValue != null) { - newAgg = aggregator.add(key, value.newValue, newAgg); + newAgg = add.apply(key, value.newValue, newAgg); } // update the store with the new value http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 4398e3f..9853737 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -20,6 +20,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; @@ -246,7 +247,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, } @Override - public <K1, V1, T> KTable<K1, T> aggregate(Aggregator<K1, V1, T> aggregator, + public <K1, V1, T> KTable<K1, T> aggregate(Initializer<T> initializer, + Aggregator<K1, V1, T> add, + Aggregator<K1, V1, T> remove, KeyValueMapper<K, V, KeyValue<K1, V1>> selector, Serializer<K1> keySerializer, Serializer<V1> valueSerializer, @@ -268,7 +271,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector); - ProcessorSupplier<K1, Change<V1>> aggregateSupplier = new KTableAggregate<>(name, aggregator); + ProcessorSupplier<K1, Change<V1>> aggregateSupplier = new KTableAggregate<>(name, initializer, add, remove); StateStoreSupplier aggregateStore = Stores.create(name) .withKeys(keySerializer, keyDeserializer) @@ -296,6 +299,35 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, } @Override + public <K1, V1> KTable<K1, Long> count(KeyValueMapper<K, V, KeyValue<K1, V1>> selector, + Serializer<K1> keySerializer, + Serializer<V1> valueSerializer, + Serializer<Long> aggValueSerializer, + Deserializer<K1> keyDeserializer, + Deserializer<V1> valueDeserializer, + Deserializer<Long> aggValueDeserializer, + String name) { + return this.aggregate( + new Initializer<Long>() { + @Override + public Long apply() { + return 0L; + } + }, + new Aggregator<K1, V1, Long>() { + @Override + public Long apply(K1 aggKey, V1 value, Long aggregate) { + return aggregate + 1L; + } + }, new Aggregator<K1, V1, Long>() { + @Override + public Long apply(K1 aggKey, V1 value, Long aggregate) { + return aggregate - 1L; + } + }, selector, keySerializer, valueSerializer, aggValueSerializer, keyDeserializer, valueDeserializer, aggValueDeserializer, name); + } + + @Override public <K1, V1> KTable<K1, V1> reduce(Reducer<V1> addReducer, Reducer<V1> removeReducer, KeyValueMapper<K, V, KeyValue<K1, V1>> selector, http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java index 8a81113..93c5df6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.HoppingWindows; +import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; @@ -43,21 +44,19 @@ public class KStreamAggregateTest { private final Serializer<String> strSerializer = new StringSerializer(); private final Deserializer<String> strDeserializer = new StringDeserializer(); - private class StringCanonizer implements Aggregator<String, String, String> { + private class StringAdd implements Aggregator<String, String, String> { @Override - public String initialValue(String aggKey) { - return "0"; - } - - @Override - public String add(String aggKey, String value, String aggregate) { + public String apply(String aggKey, String value, String aggregate) { return aggregate + "+" + value; } + } + + private class StringInit implements Initializer<String> { @Override - public String remove(String aggKey, String value, String aggregate) { - return aggregate + "-" + value; + public String apply() { + return "0"; } } @@ -70,7 +69,7 @@ public class KStreamAggregateTest { String topic1 = "topic1"; KStream<String, String> stream1 = builder.stream(strDeserializer, strDeserializer, topic1); - KTable<Windowed<String>, String> table2 = stream1.aggregateByKey(new StringCanonizer(), + KTable<Windowed<String>, String> table2 = stream1.aggregateByKey(new StringInit(), new StringAdd(), HoppingWindows.of("topic1-Canonized").with(10L).every(5L), strSerializer, strSerializer, @@ -149,7 +148,7 @@ public class KStreamAggregateTest { String topic2 = "topic2"; KStream<String, String> stream1 = builder.stream(strDeserializer, strDeserializer, topic1); - KTable<Windowed<String>, String> table1 = stream1.aggregateByKey(new StringCanonizer(), + KTable<Windowed<String>, String> table1 = stream1.aggregateByKey(new StringInit(), new StringAdd(), HoppingWindows.of("topic1-Canonized").with(10L).every(5L), strSerializer, strSerializer, @@ -160,7 +159,7 @@ public class KStreamAggregateTest { table1.toStream().process(proc1); KStream<String, String> stream2 = builder.stream(strDeserializer, strDeserializer, topic2); - KTable<Windowed<String>, String> table2 = stream2.aggregateByKey(new StringCanonizer(), + KTable<Windowed<String>, String> table2 = stream2.aggregateByKey(new StringInit(), new StringAdd(), HoppingWindows.of("topic2-Canonized").with(10L).every(5L), strSerializer, strSerializer, http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java index 439aa09..59711db 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.test.KStreamTestDriver; @@ -40,21 +41,27 @@ public class KTableAggregateTest { private final Serializer<String> strSerializer = new StringSerializer(); private final Deserializer<String> strDeserializer = new StringDeserializer(); - private class StringCanonizer implements Aggregator<String, String, String> { + private class StringAdd implements Aggregator<String, String, String> { @Override - public String initialValue(String aggKey) { - return "0"; + public String apply(String aggKey, String value, String aggregate) { + return aggregate + "+" + value; } + } + + private class StringRemove implements Aggregator<String, String, String> { @Override - public String add(String aggKey, String value, String aggregate) { - return aggregate + "+" + value; + public String apply(String aggKey, String value, String aggregate) { + return aggregate + "-" + value; } + } + + private class StringInit implements Initializer<String> { @Override - public String remove(String aggKey, String value, String aggregate) { - return aggregate + "-" + value; + public String apply() { + return "0"; } } @@ -67,7 +74,7 @@ public class KTableAggregateTest { String topic1 = "topic1"; KTable<String, String> table1 = builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1); - KTable<String, String> table2 = table1.<String, String, String>aggregate(new StringCanonizer(), + KTable<String, String> table2 = table1.<String, String, String>aggregate(new StringInit(), new StringAdd(), new StringRemove(), new NoOpKeyValueMapper<String, String>(), strSerializer, strSerializer,
