This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 93ba962 KAFKA-7492 : Updated javadocs for aggregate and reduce methods returning null behavior. (#6285) 93ba962 is described below commit 93ba9621fe0ebe2945fe5d14a3c94abc5cffd7b4 Author: asutosh936 <asutosh.pan...@hotmail.com> AuthorDate: Fri Feb 22 11:07:30 2019 -0600 KAFKA-7492 : Updated javadocs for aggregate and reduce methods returning null behavior. (#6285) This is an update to the existing javadocs for KGroupedStream class. Reviewers: Guozhang Wang <wangg...@gmail.com>, Matthias J. Sax <mj...@apache.org>, John Roesler <j...@confluent.io>, Bill Bejeck <bbej...@gmail.com> --- .../org/apache/kafka/streams/kstream/KGroupedStream.java | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java index 05e4ac9..121d0a4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java @@ -146,7 +146,9 @@ public interface KGroupedStream<K, V> { * * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}. * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the - * latest (rolling) aggregate for each key + * latest (rolling) aggregate for each key. If the reduce function returns {@code null}, it is then interpreted as + * deletion for the key, and future messages of the same key coming from upstream operators + * will be handled as newly initialized value. */ KTable<K, V> reduce(final Reducer<V> reducer); @@ -208,7 +210,9 @@ public interface KGroupedStream<K, V> { * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}. 
* @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}. * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the - * latest (rolling) aggregate for each key + * latest (rolling) aggregate for each key. If the reduce function returns {@code null}, it is then interpreted as + * deletion for the key, and future messages of the same key coming from upstream operators + * will be handled as newly initialized value. */ KTable<K, V> reduce(final Reducer<V> reducer, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized); @@ -251,7 +255,9 @@ public interface KGroupedStream<K, V> { * @param aggregator an {@link Aggregator} that computes a new aggregate result * @param <VR> the value type of the resulting {@link KTable} * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the - * latest (rolling) aggregate for each key + * latest (rolling) aggregate for each key. If the aggregate function returns {@code null}, it is then interpreted as + * deletion for the key, and future messages of the same key coming from upstream operators + * will be handled as newly initialized value. */ <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator); @@ -308,7 +314,9 @@ public interface KGroupedStream<K, V> { * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}. * @param <VR> the value type of the resulting {@link KTable} * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the - * latest (rolling) aggregate for each key + * latest (rolling) aggregate for each key. 
If the aggregate function returns {@code null}, it is then interpreted as + * deletion for the key, and future messages of the same key coming from upstream operators + * will be handled as newly initialized value. */ <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator,