This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 649e2cbc8f3 MINOR: Fix `Consumed` to return new object instead of 
`this` (#14550)
649e2cbc8f3 is described below

commit 649e2cbc8f3b13c39b2ed559f3f9e4a15013b64e
Author: Matthias J. Sax <[email protected]>
AuthorDate: Sun Oct 15 19:28:54 2023 -0700

    MINOR: Fix `Consumed` to return new object instead of `this` (#14550)
    
    We embrace immutability and thus should return a new object instead of
    `this`, similar to other config classes we use in the DSL.
    
    Side JavaDocs cleanup for a bunch of classes.
    
    Reviewers: Guozhang Wang <[email protected]>
---
 .../apache/kafka/streams/kstream/Aggregator.java   |  19 ++--
 .../org/apache/kafka/streams/kstream/Branched.java | 107 ++++++++++++---------
 .../kafka/streams/kstream/BranchedKStream.java     |  38 +++++---
 .../kafka/streams/kstream/CogroupedKStream.java    |  84 +++++++++-------
 .../org/apache/kafka/streams/kstream/Consumed.java | 107 ++++++++++++---------
 .../apache/kafka/streams/kstream/EmitStrategy.java |   7 +-
 .../kafka/streams/kstream/ForeachAction.java       |   7 +-
 .../apache/kafka/streams/kstream/GlobalKTable.java |   1 +
 .../org/apache/kafka/streams/kstream/Grouped.java  |  61 +++++++++---
 .../apache/kafka/streams/kstream/Initializer.java  |   8 +-
 10 files changed, 275 insertions(+), 164 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
index 217a145e620..e02146fff1f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
@@ -24,10 +24,11 @@ package org.apache.kafka.streams.kstream;
  * {@code Aggregator} is used in combination with {@link Initializer} that 
provides an initial aggregation value.
  * <p>
  * {@code Aggregator} can be used to implement aggregation functions like 
count.
-
+ *
  * @param <K> key type
  * @param <V> input value type
- * @param <VA> aggregate value type
+ * @param <VAgg> aggregate value type
+ *
  * @see Initializer
  * @see KGroupedStream#aggregate(Initializer, Aggregator)
  * @see KGroupedStream#aggregate(Initializer, Aggregator, Materialized)
@@ -37,15 +38,19 @@ package org.apache.kafka.streams.kstream;
  * @see SessionWindowedKStream#aggregate(Initializer, Aggregator, Merger, 
Materialized)
  * @see Reducer
  */
-public interface Aggregator<K, V, VA> {
+public interface Aggregator<K, V, VAgg> {
 
     /**
      * Compute a new aggregate from the key and value of a record and the 
current aggregate of the same key.
      *
-     * @param key       the key of the record
-     * @param value     the value of the record
-     * @param aggregate the current aggregate value
+     * @param key
+     *        the key of the record
+     * @param value
+     *        the value of the record
+     * @param aggregate
+     *        the current aggregate value
+     *
      * @return the new aggregate value
      */
-    VA apply(final K key, final V value, final VA aggregate);
+    VAgg apply(final K key, final V value, final VAgg aggregate);
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java
index 3e85a73c346..2f55f80899a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java
@@ -41,12 +41,19 @@ public class Branched<K, V> implements 
NamedOperation<Branched<K, V>> {
         this.chainConsumer = chainConsumer;
     }
 
+    protected Branched(final Branched<K, V> branched) {
+        this(branched.name, branched.chainFunction, branched.chainConsumer);
+    }
+
     /**
      * Create an instance of {@code Branched} with provided branch name suffix.
      *
-     * @param name the branch name suffix to be used (see {@link 
BranchedKStream} description for details)
-     * @param <K>  key type
-     * @param <V>  value type
+     * @param name
+     *        the branch name suffix to be used (see {@link BranchedKStream} 
description for details)
+     *
+     * @param <K> key type
+     * @param <V> value type
+     *
      * @return a new instance of {@code Branched}
      */
     public static <K, V> Branched<K, V> as(final String name) {
@@ -57,16 +64,20 @@ public class Branched<K, V> implements 
NamedOperation<Branched<K, V>> {
     /**
      * Create an instance of {@code Branched} with provided chain function.
      *
-     * @param chain A function that will be applied to the branch. If the 
provided function returns
-     *              {@code null}, its result is ignored, otherwise it is added 
to the {@code Map} returned
-     *              by {@link BranchedKStream#defaultBranch()} or {@link 
BranchedKStream#noDefaultBranch()} (see
-     *              {@link BranchedKStream} description for details).
-     * @param <K>   key type
-     * @param <V>   value type
+     * @param chain
+     *        A function that will be applied to the branch. If the provided 
function returns
+     *        {@code null}, its result is ignored, otherwise it is added to 
the {@code Map} returned
+     *        by {@link BranchedKStream#defaultBranch()} or {@link 
BranchedKStream#noDefaultBranch()} (see
+     *        {@link BranchedKStream} description for details).
+     *
+     * @param <K> key type
+     * @param <V> value type
+     *
      * @return a new instance of {@code Branched}
      */
     public static <K, V> Branched<K, V> withFunction(
-            final Function<? super KStream<K, V>, ? extends KStream<K, V>> 
chain) {
+            final Function<? super KStream<K, V>, ? extends KStream<K, V>> 
chain
+    ) {
         Objects.requireNonNull(chain, "chain function cannot be null");
         return new Branched<>(null, chain, null);
     }
@@ -74,12 +85,15 @@ public class Branched<K, V> implements 
NamedOperation<Branched<K, V>> {
     /**
      * Create an instance of {@code Branched} with provided chain consumer.
      *
-     * @param chain A consumer to which the branch will be sent. If a consumer 
is provided,
-     *              the respective branch will not be added to the resulting 
{@code Map} returned
-     *              by {@link BranchedKStream#defaultBranch()} or {@link 
BranchedKStream#noDefaultBranch()} (see
-     *              {@link BranchedKStream} description for details).
-     * @param <K>   key type
-     * @param <V>   value type
+     * @param chain
+     *        A consumer to which the branch will be sent. If a consumer is 
provided,
+     *        the respective branch will not be added to the resulting {@code 
Map} returned
+     *        by {@link BranchedKStream#defaultBranch()} or {@link 
BranchedKStream#noDefaultBranch()} (see
+     *        {@link BranchedKStream} description for details).
+     *
+     * @param <K> key type
+     * @param <V> value type
+     *
      * @return a new instance of {@code Branched}
      */
     public static <K, V> Branched<K, V> withConsumer(final Consumer<KStream<K, 
V>> chain) {
@@ -90,18 +104,24 @@ public class Branched<K, V> implements 
NamedOperation<Branched<K, V>> {
     /**
      * Create an instance of {@code Branched} with provided chain function and 
branch name suffix.
      *
-     * @param chain A function that will be applied to the branch. If the 
provided function returns
-     *              {@code null}, its result is ignored, otherwise it is added 
to the {@code Map} returned
-     *              by {@link BranchedKStream#defaultBranch()} or {@link 
BranchedKStream#noDefaultBranch()} (see
-     *              {@link BranchedKStream} description for details).
-     * @param name  the branch name suffix to be used. If {@code null}, a 
default branch name suffix will be generated
-     *              (see {@link BranchedKStream} description for details)
-     * @param <K>   key type
-     * @param <V>   value type
+     * @param chain
+     *        A function that will be applied to the branch. If the provided 
function returns
+     *        {@code null}, its result is ignored, otherwise it is added to 
the {@code Map} returned
+     *        by {@link BranchedKStream#defaultBranch()} or {@link 
BranchedKStream#noDefaultBranch()} (see
+     *        {@link BranchedKStream} description for details).
+     * @param name
+     *        the branch name suffix to be used. If {@code null}, a default 
branch name suffix will be generated
+     *        (see {@link BranchedKStream} description for details)
+     *
+     * @param <K> key type
+     * @param <V> value type
+     *
      * @return a new instance of {@code Branched}
      */
     public static <K, V> Branched<K, V> withFunction(
-            final Function<? super KStream<K, V>, ? extends KStream<K, V>> 
chain, final String name) {
+        final Function<? super KStream<K, V>, ? extends KStream<K, V>> chain,
+        final String name
+    ) {
         Objects.requireNonNull(chain, "chain function cannot be null");
         return new Branched<>(name, chain, null);
     }
@@ -109,14 +129,18 @@ public class Branched<K, V> implements 
NamedOperation<Branched<K, V>> {
     /**
      * Create an instance of {@code Branched} with provided chain consumer and 
branch name suffix.
      *
-     * @param chain A consumer to which the branch will be sent. If a non-null 
consumer is provided,
-     *              the respective branch will not be added to the resulting 
{@code Map} returned
-     *              by {@link BranchedKStream#defaultBranch()} or {@link 
BranchedKStream#noDefaultBranch()} (see
-     *              {@link BranchedKStream} description for details).
-     * @param name  the branch name suffix to be used. If {@code null}, a 
default branch name suffix will be generated
-     *              (see {@link BranchedKStream} description for details)
-     * @param <K>   key type
-     * @param <V>   value type
+     * @param chain
+     *        A consumer to which the branch will be sent. If a non-null 
consumer is provided,
+     *        the respective branch will not be added to the resulting {@code 
Map} returned
+     *        by {@link BranchedKStream#defaultBranch()} or {@link 
BranchedKStream#noDefaultBranch()} (see
+     *        {@link BranchedKStream} description for details).
+     * @param name
+     *        the branch name suffix to be used. If {@code null}, a default 
branch name suffix will be generated
+     *        (see {@link BranchedKStream} description for details)
+     *
+     * @param <K> key type
+     * @param <V> value type
+     *
      * @return a new instance of {@code Branched}
      */
     public static <K, V> Branched<K, V> withConsumer(final Consumer<? super 
KStream<K, V>> chain,
@@ -125,21 +149,14 @@ public class Branched<K, V> implements 
NamedOperation<Branched<K, V>> {
         return new Branched<>(name, null, chain);
     }
 
-    /**
-     * Create an instance of {@code Branched} from an existing instance.
-     *
-     * @param branched the instance of {@code Branched} to copy
-     */
-    protected Branched(final Branched<K, V> branched) {
-        this(branched.name, branched.chainFunction, branched.chainConsumer);
-    }
-
     /**
      * Configure the instance of {@code Branched} with a branch name suffix.
      *
-     * @param name the branch name suffix to be used. If {@code null} a 
default branch name suffix will be generated (see
-     *             {@link BranchedKStream} description for details)
-     * @return {@code this}
+     * @param name
+     *        the branch name suffix to be used. If {@code null} a default 
branch name suffix will be generated (see
+     *        {@link BranchedKStream} description for details)
+     *
+     * @return {@code this} to facilitate method chaining
      */
     @Override
     public Branched<K, V> withName(final String name) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/BranchedKStream.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/BranchedKStream.java
index 2115170cc94..43475a4227c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/BranchedKStream.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/BranchedKStream.java
@@ -78,7 +78,7 @@ import java.util.Map;
  *
  * <h3>Direct Branch Consuming</h3>
  * In many cases we do not need to have a single scope for all the branches, 
each branch being processed completely
- * independently from others. Then we can use 'consuming' lambdas or method 
references in {@link Branched} parameter:
+ * independently of others. Then we can use 'consuming' lambdas or method 
references in {@link Branched} parameter:
  *
  * <pre> {@code
  * source.split()
@@ -112,16 +112,19 @@ import java.util.Map;
  *
  * @param <K> Type of keys
  * @param <V> Type of values
+ *
  * @see KStream
  */
 public interface BranchedKStream<K, V> {
     /**
      * Define a branch for records that match the predicate.
      *
-     * @param predicate A {@link Predicate} instance, against which each 
record will be evaluated.
-     *                  If this predicate returns {@code true} for a given 
record, the record will be
-     *                  routed to the current branch and will not be evaluated 
against the predicates
-     *                  for the remaining branches.
+     * @param predicate
+     *        A {@link Predicate} instance, against which each record will be 
evaluated.
+     *        If this predicate returns {@code true} for a given record, the 
record will be
+     *        routed to the current branch and will not be evaluated against 
the predicates
+     *        for the remaining branches.
+     *
      * @return {@code this} to facilitate method chaining
      */
     BranchedKStream<K, V> branch(Predicate<? super K, ? super V> predicate);
@@ -129,13 +132,16 @@ public interface BranchedKStream<K, V> {
     /**
      * Define a branch for records that match the predicate.
      *
-     * @param predicate A {@link Predicate} instance, against which each 
record will be evaluated.
-     *                  If this predicate returns {@code true} for a given 
record, the record will be
-     *                  routed to the current branch and will not be evaluated 
against the predicates
-     *                  for the remaining branches.
-     * @param branched  A {@link Branched} parameter, that allows to define a 
branch name, an in-place
-     *                  branch consumer or branch mapper (see <a 
href="#examples">code examples</a>
-     *                  for {@link BranchedKStream})
+     * @param predicate
+     *        A {@link Predicate} instance, against which each record will be 
evaluated.
+     *        If this predicate returns {@code true} for a given record, the 
record will be
+     *        routed to the current branch and will not be evaluated against 
the predicates
+     *        for the remaining branches.
+     * @param branched
+     *        A {@link Branched} parameter, that allows to define a branch 
name, an in-place
+     *        branch consumer or branch mapper (see <a href="#examples">code 
examples</a>
+     *        for {@link BranchedKStream})
+     *
      * @return {@code this} to facilitate method chaining
      */
     BranchedKStream<K, V> branch(Predicate<? super K, ? super V> predicate, 
Branched<K, V> branched);
@@ -153,9 +159,11 @@ public interface BranchedKStream<K, V> {
      * Finalize the construction of branches and defines the default branch 
for the messages not intercepted
      * by other branches. Calling {@code defaultBranch} or {@link 
#noDefaultBranch()} is optional.
      *
-     * @param branched A {@link Branched} parameter, that allows to define a 
branch name, an in-place
-     *                 branch consumer or branch mapper (see <a 
href="#examples">code examples</a>
-     *                 for {@link BranchedKStream})
+     * @param branched
+     *        A {@link Branched} parameter, that allows to define a branch 
name, an in-place
+     *        branch consumer or branch mapper (see <a href="#examples">code 
examples</a>
+     *        for {@link BranchedKStream})
+     *
      * @return {@link Map} of named branches. For rules of forming the 
resulting map, see {@link BranchedKStream}
      * <a href="#maprules">description</a>.
      */
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
index ccb003ea755..f4c975848b2 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
@@ -36,9 +36,9 @@ import 
org.apache.kafka.streams.state.TimestampedKeyValueStore;
  * {@link KGroupedStream#cogroup(Aggregator) cogroup(...)}.
  *
  * @param <K> Type of keys
- * @param <VOut> Type of values after agg
+ * @param <VAgg> Type of values after agg
  */
-public interface CogroupedKStream<K, VOut> {
+public interface CogroupedKStream<K, VAgg> {
 
     /**
      * Add an already {@link KGroupedStream grouped KStream} to this {@code 
CogroupedKStream}.
@@ -54,13 +54,17 @@ public interface CogroupedKStream<K, VOut> {
      * using the initial intermediate aggregation result provided via the 
{@link Initializer} that is passed into
      * {@link #aggregate(Initializer)}) and the record's value.
      *
-     * @param groupedStream a group stream
-     * @param aggregator    an {@link Aggregator} that computes a new 
aggregate result
-     * @param <VIn> Type of input values
+     * @param groupedStream
+     *        a group stream
+     * @param aggregator
+     *        an {@link Aggregator} that computes a new aggregate result
+     *
+     * @param <V> Type of input values
+     *
      * @return a {@code CogroupedKStream}
      */
-    <VIn> CogroupedKStream<K, VOut> cogroup(final KGroupedStream<K, VIn> 
groupedStream,
-                                            final Aggregator<? super K, ? 
super VIn, VOut> aggregator);
+    <V> CogroupedKStream<K, VAgg> cogroup(final KGroupedStream<K, V> 
groupedStream,
+                                          final Aggregator<? super K, ? super 
V, VAgg> aggregator);
 
     /**
      * Aggregate the values of records in these streams by the grouped key.
@@ -105,12 +109,14 @@ public interface CogroupedKStream<K, VOut> {
      * <p>
      * You can retrieve all generated internal topic names via {@link 
Topology#describe()}.
      *
-     * @param initializer  an {@link Initializer} that computes an initial 
intermediate aggregation
-     *                     result. Cannot be {@code null}.
+     * @param initializer
+     *        an {@link Initializer} that computes an initial intermediate 
aggregation
+     *        result. Cannot be {@code null}.
+     *
      * @return a {@link KTable} that contains "update" records with unmodified 
keys, and values that
      * represent the latest (rolling) aggregate for each key
      */
-    KTable<K, VOut> aggregate(final Initializer<VOut> initializer);
+    KTable<K, VAgg> aggregate(final Initializer<VAgg> initializer);
 
     /**
      * Aggregate the values of records in these streams by the grouped key.
@@ -156,13 +162,15 @@ public interface CogroupedKStream<K, VOut> {
      * <p>
      * You can retrieve all generated internal topic names via {@link 
Topology#describe()}.
      *
-     * @param initializer  an {@link Initializer} that computes an initial 
intermediate aggregation
-     *                     result. Cannot be {@code null}.
-     * @param named        name the processor. Cannot be {@code null}.
+     * @param initializer
+     *        an {@link Initializer} that computes an initial intermediate 
aggregation result. Cannot be {@code null}.
+     * @param named
+     *        name the processor. Cannot be {@code null}.
+     *
      * @return a {@link KTable} that contains "update" records with unmodified 
keys, and values that
      * represent the latest (rolling) aggregate for each key
      */
-    KTable<K, VOut> aggregate(final Initializer<VOut> initializer,
+    KTable<K, VAgg> aggregate(final Initializer<VAgg> initializer,
                               final Named named);
 
     /**
@@ -208,15 +216,16 @@ public interface CogroupedKStream<K, VOut> {
      * <p>
      * You can retrieve all generated internal topic names via {@link 
Topology#describe()}.
      *
-     * @param initializer  an {@link Initializer} that computes an initial 
intermediate aggregation
-     *                     result. Cannot be {@code null}.
-     * @param materialized an instance of {@link Materialized} used to 
materialize a state store.
-     *                     Cannot be {@code null}.
+     * @param initializer
+     *        an {@link Initializer} that computes an initial intermediate 
aggregation result. Cannot be {@code null}.
+     * @param materialized
+     *        an instance of {@link Materialized} used to materialize a state 
store. Cannot be {@code null}.
+     *
      * @return a {@link KTable} that contains "update" records with unmodified 
keys, and values that
      * represent the latest (rolling) aggregate for each key
      */
-    KTable<K, VOut> aggregate(final Initializer<VOut> initializer,
-                              final Materialized<K, VOut, KeyValueStore<Bytes, 
byte[]>> materialized);
+    KTable<K, VAgg> aggregate(final Initializer<VAgg> initializer,
+                              final Materialized<K, VAgg, KeyValueStore<Bytes, 
byte[]>> materialized);
 
     /**
      * Aggregate the values of records in these streams by the grouped key.
@@ -262,44 +271,53 @@ public interface CogroupedKStream<K, VOut> {
      * <p>
      * You can retrieve all generated internal topic names via {@link 
Topology#describe()}.
      *
-     * @param initializer  an {@link Initializer} that computes an initial 
intermediate aggregation
-     *                     result. Cannot be {@code null}.
-     * @param materialized an instance of {@link Materialized} used to 
materialize a state store.
-     *                     Cannot be {@code null}.
-     * @param named        name the processors. Cannot be {@code null}.
+     * @param initializer
+     *        an {@link Initializer} that computes an initial intermediate 
aggregation result. Cannot be {@code null}.
+     * @param materialized
+     *        an instance of {@link Materialized} used to materialize a state 
store. Cannot be {@code null}.
+     * @param named
+     *        name the processors. Cannot be {@code null}.
+     *
      * @return a {@link KTable} that contains "update" records with unmodified 
keys, and values that
      * represent the latest (rolling) aggregate for each key
      */
-    KTable<K, VOut> aggregate(final Initializer<VOut> initializer,
+    KTable<K, VAgg> aggregate(final Initializer<VAgg> initializer,
                               final Named named,
-                              final Materialized<K, VOut, KeyValueStore<Bytes, 
byte[]>> materialized);
+                              final Materialized<K, VAgg, KeyValueStore<Bytes, 
byte[]>> materialized);
 
     /**
      * Create a new {@link TimeWindowedCogroupedKStream} instance that can be 
used to perform windowed
      * aggregations.
      *
-     * @param windows the specification of the aggregation {@link Windows}
+     * @param windows
+     *        the specification of the aggregation {@link Windows}
+     *
      * @param <W>     the window type
+     *
      * @return an instance of {@link TimeWindowedCogroupedKStream}
      */
-    <W extends Window> TimeWindowedCogroupedKStream<K, VOut> windowedBy(final 
Windows<W> windows);
+    <W extends Window> TimeWindowedCogroupedKStream<K, VAgg> windowedBy(final 
Windows<W> windows);
 
     /**
      * Create a new {@link TimeWindowedCogroupedKStream} instance that can be 
used to perform sliding
      * windowed aggregations.
      *
-     * @param windows the specification of the aggregation {@link 
SlidingWindows}
+     * @param windows
+     *        the specification of the aggregation {@link SlidingWindows}
+     *
      * @return an instance of {@link TimeWindowedCogroupedKStream}
      */
-    TimeWindowedCogroupedKStream<K, VOut> windowedBy(final SlidingWindows 
windows);
+    TimeWindowedCogroupedKStream<K, VAgg> windowedBy(final SlidingWindows 
windows);
 
     /**
      * Create a new {@link SessionWindowedCogroupedKStream} instance that can 
be used to perform session
      * windowed aggregations.
      *
-     * @param windows the specification of the aggregation {@link 
SessionWindows}
+     * @param windows
+     *        the specification of the aggregation {@link SessionWindows}
+     *
      * @return an instance of {@link SessionWindowedCogroupedKStream}
      */
-    SessionWindowedCogroupedKStream<K, VOut> windowedBy(final SessionWindows 
windows);
+    SessionWindowedCogroupedKStream<K, VAgg> windowedBy(final SessionWindows 
windows);
 
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java
index 423ca6022d8..d1713ab20a1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java
@@ -70,10 +70,6 @@ public class Consumed<K, V> implements 
NamedOperation<Consumed<K, V>> {
         this.processorName = processorName;
     }
 
-    /**
-     * Create an instance of {@link Consumed} from an existing instance.
-     * @param consumed  the instance of {@link Consumed} to copy
-     */
     protected Consumed(final Consumed<K, V> consumed) {
         this(consumed.keySerde,
              consumed.valueSerde,
@@ -86,12 +82,18 @@ public class Consumed<K, V> implements 
NamedOperation<Consumed<K, V>> {
     /**
      * Create an instance of {@link Consumed} with the supplied arguments. 
{@code null} values are acceptable.
      *
-     * @param keySerde           the key serde. If {@code null} the default 
key serde from config will be used
-     * @param valueSerde         the value serde. If {@code null} the default 
value serde from config will be used
-     * @param timestampExtractor the timestamp extractor to used. If {@code 
null} the default timestamp extractor from config will be used
-     * @param resetPolicy        the offset reset policy to be used. If {@code 
null} the default reset policy from config will be used
-     * @param <K>                key type
-     * @param <V>                value type
+     * @param keySerde
+     *        the key serde. If {@code null} the default key serde from config 
will be used
+     * @param valueSerde
+     *        the value serde. If {@code null} the default value serde from 
config will be used
+     * @param timestampExtractor
+     *        the timestamp extractor to used. If {@code null} the default 
timestamp extractor from config will be used
+     * @param resetPolicy
+     *        the offset reset policy to be used. If {@code null} the default 
reset policy from config will be used
+     *
+     * @param <K> key type
+     * @param <V> value type
+     *
      * @return a new instance of {@link Consumed}
      */
     public static <K, V> Consumed<K, V> with(final Serde<K> keySerde,
@@ -99,16 +101,19 @@ public class Consumed<K, V> implements 
NamedOperation<Consumed<K, V>> {
                                              final TimestampExtractor 
timestampExtractor,
                                              final Topology.AutoOffsetReset 
resetPolicy) {
         return new Consumed<>(keySerde, valueSerde, timestampExtractor, 
resetPolicy, null);
-
     }
 
     /**
      * Create an instance of {@link Consumed} with key and value {@link 
Serde}s.
      *
-     * @param keySerde   the key serde. If {@code null} the default key serde 
from config will be used
-     * @param valueSerde the value serde. If {@code null} the default value 
serde from config will be used
-     * @param <K>        key type
-     * @param <V>        value type
+     * @param keySerde
+     *        the key serde. If {@code null} the default key serde from config 
will be used
+     * @param valueSerde
+     *        the value serde. If {@code null} the default value serde from 
config will be used
+     *
+     * @param <K> key type
+     * @param <V> value type
+     *
      * @return a new instance of {@link Consumed}
      */
     public static <K, V> Consumed<K, V> with(final Serde<K> keySerde,
@@ -119,9 +124,12 @@ public class Consumed<K, V> implements 
NamedOperation<Consumed<K, V>> {
     /**
      * Create an instance of {@link Consumed} with a {@link 
TimestampExtractor}.
      *
-     * @param timestampExtractor the timestamp extractor to used. If {@code 
null} the default timestamp extractor from config will be used
-     * @param <K>                key type
-     * @param <V>                value type
+     * @param timestampExtractor
+     *        the timestamp extractor to used. If {@code null} the default 
timestamp extractor from config will be used
+     *
+     * @param <K> key type
+     * @param <V> value type
+     *
      * @return a new instance of {@link Consumed}
      */
     public static <K, V> Consumed<K, V> with(final TimestampExtractor 
timestampExtractor) {
@@ -131,9 +139,12 @@ public class Consumed<K, V> implements 
NamedOperation<Consumed<K, V>> {
     /**
      * Create an instance of {@link Consumed} with a {@link 
org.apache.kafka.streams.Topology.AutoOffsetReset Topology.AutoOffsetReset}.
      *
-     * @param resetPolicy the offset reset policy to be used. If {@code null} 
the default reset policy from config will be used
-     * @param <K>         key type
-     * @param <V>         value type
+     * @param resetPolicy
+     *        the offset reset policy to be used. If {@code null} the default 
reset policy from config will be used
+     *
+     * @param <K> key type
+     * @param <V> value type
+     *
      * @return a new instance of {@link Consumed}
      */
     public static <K, V> Consumed<K, V> with(final Topology.AutoOffsetReset 
resetPolicy) {
@@ -143,9 +154,12 @@ public class Consumed<K, V> implements 
NamedOperation<Consumed<K, V>> {
     /**
      * Create an instance of {@link Consumed} with provided processor name.
      *
-     * @param processorName the processor name to be used. If {@code null} a 
default processor name will be generated
-     * @param <K>         key type
-     * @param <V>         value type
+     * @param processorName
+     *        the processor name to be used. If {@code null} a default 
processor name will be generated
+     *
+     * @param <K> key type
+     * @param <V> value type
+     *
      * @return a new instance of {@link Consumed}
      */
     public static <K, V> Consumed<K, V> as(final String processorName) {
@@ -155,57 +169,62 @@ public class Consumed<K, V> implements 
NamedOperation<Consumed<K, V>> {
     /**
      * Configure the instance of {@link Consumed} with a key {@link Serde}.
      *
-     * @param keySerde the key serde. If {@code null}the default key serde 
from config will be used
-     * @return this
+     * @param keySerde
+     *        the key serde. If {@code null} the default key serde from config 
will be used
+     *
+     * @return a new instance of {@link Consumed}
      */
     public Consumed<K, V> withKeySerde(final Serde<K> keySerde) {
-        this.keySerde = keySerde;
-        return this;
+        return new Consumed<K, V>(keySerde, valueSerde, timestampExtractor, 
resetPolicy, processorName);
     }
 
     /**
      * Configure the instance of {@link Consumed} with a value {@link Serde}.
      *
-     * @param valueSerde the value serde. If {@code null} the default value 
serde from config will be used
-     * @return this
+     * @param valueSerde
+     *        the value serde. If {@code null} the default value serde from 
config will be used
+     *
+     * @return a new instance of {@link Consumed}
      */
     public Consumed<K, V> withValueSerde(final Serde<V> valueSerde) {
-        this.valueSerde = valueSerde;
-        return this;
+        return new Consumed<K, V>(keySerde, valueSerde, timestampExtractor, 
resetPolicy, processorName);
     }
 
     /**
      * Configure the instance of {@link Consumed} with a {@link 
TimestampExtractor}.
      *
-     * @param timestampExtractor the timestamp extractor to used. If {@code 
null} the default timestamp extractor from config will be used
-     * @return this
+     * @param timestampExtractor
+     *        the timestamp extractor to be used. If {@code null} the default 
timestamp extractor from config will be used
+     *
+     * @return a new instance of {@link Consumed}
      */
     public Consumed<K, V> withTimestampExtractor(final TimestampExtractor 
timestampExtractor) {
-        this.timestampExtractor = timestampExtractor;
-        return this;
+        return new Consumed<K, V>(keySerde, valueSerde, timestampExtractor, 
resetPolicy, processorName);
     }
 
     /**
      * Configure the instance of {@link Consumed} with a {@link 
org.apache.kafka.streams.Topology.AutoOffsetReset Topology.AutoOffsetReset}.
      *
-     * @param resetPolicy the offset reset policy to be used. If {@code null} 
the default reset policy from config will be used
-     * @return this
+     * @param resetPolicy
+     *        the offset reset policy to be used. If {@code null} the default 
reset policy from config will be used
+     *
+     * @return a new instance of {@link Consumed}
      */
     public Consumed<K, V> withOffsetResetPolicy(final Topology.AutoOffsetReset 
resetPolicy) {
-        this.resetPolicy = resetPolicy;
-        return this;
+        return new Consumed<K, V>(keySerde, valueSerde, timestampExtractor, 
resetPolicy, processorName);
     }
 
     /**
      * Configure the instance of {@link Consumed} with a processor name.
      *
-     * @param processorName the processor name to be used. If {@code null} a 
default processor name will be generated
-     * @return this
+     * @param processorName
+     *        the processor name to be used. If {@code null} a default 
processor name will be generated
+     *
+     * @return a new instance of {@link Consumed}
      */
     @Override
     public Consumed<K, V> withName(final String processorName) {
-        this.processorName = processorName;
-        return this;
+        return new Consumed<K, V>(keySerde, valueSerde, timestampExtractor, 
resetPolicy, processorName);
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/EmitStrategy.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/EmitStrategy.java
index 365a19c1426..bacb4352b93 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/EmitStrategy.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/EmitStrategy.java
@@ -68,7 +68,8 @@ public interface EmitStrategy {
     }
 
     /**
-     * Returns the strategy type
+     * Returns the strategy type.
+     *
      * @return Emit strategy type
      */
     StrategyType type();
@@ -87,7 +88,7 @@ public interface EmitStrategy {
      * @see UnlimitedWindows
      * @see WindowUpdateStrategy
      *
-     * @return WindowCloseStrategy instance
+     * @return "window close" {@code EmitStrategy} instance
      */
     static EmitStrategy onWindowClose() {
         return new WindowCloseStrategy();
@@ -103,7 +104,7 @@ public interface EmitStrategy {
      * @see UnlimitedWindows
      * @see WindowCloseStrategy
      *
-     * @return WindowCloseStrategy instance
+     * @return "window update" {@code EmitStrategy} instance
      */
     static EmitStrategy onWindowUpdate() {
         return new WindowUpdateStrategy();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java
index f20d77ccb1c..afdc972985c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java
@@ -27,6 +27,7 @@ package org.apache.kafka.streams.kstream;
  *
  * @param <K> key type
  * @param <V> value type
+ *
  * @see KStream#foreach(ForeachAction)
  */
 public interface ForeachAction<K, V> {
@@ -34,8 +35,10 @@ public interface ForeachAction<K, V> {
     /**
      * Perform an action for each record of a stream.
      *
-     * @param key   the key of the record
-     * @param value the value of the record
+     * @param key
+     *        the key of the record
+     * @param value
+     *        the value of the record
      */
     void apply(final K key, final V value);
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
index 73efbc3a512..2f4c380012e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
@@ -59,6 +59,7 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
  *
  * @param <K> Type of primary keys
  * @param <V> Type of value changes
+ *
  * @see KTable
  * @see StreamsBuilder#globalTable(String)
  * @see KStream#join(GlobalKTable, KeyValueMapper, ValueJoiner)
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/Grouped.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/Grouped.java
index 44b07408201..631969734bf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Grouped.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Grouped.java
@@ -48,8 +48,14 @@ public class Grouped<K, V> implements 
NamedOperation<Grouped<K, V>> {
     /**
      * Create a {@link Grouped} instance with the provided name used as part 
of the repartition topic if required.
      *
-     * @param name the name used for a repartition topic if required
+     * @param name
+     *        the name used for a repartition topic if required
+     *
+     * @param <K> the key type
+     * @param <V> the value type
+     *
      * @return a new {@link Grouped} configured with the name
+     *
      * @see KStream#groupByKey(Grouped)
      * @see KStream#groupBy(KeyValueMapper, Grouped)
      * @see KTable#groupBy(KeyValueMapper, Grouped)
@@ -62,8 +68,14 @@ public class Grouped<K, V> implements 
NamedOperation<Grouped<K, V>> {
     /**
      * Create a {@link Grouped} instance with the provided keySerde. If {@code 
null} the default key serde from config will be used.
      *
-     * @param keySerde the Serde used for serializing the key. If {@code null} 
the default key serde from config will be used
+     * @param keySerde
+     *        the Serde used for serializing the key. If {@code null} the 
default key serde from config will be used
+     *
+     * @param <K> the key type
+     * @param <V> the value type
+     *
      * @return a new {@link Grouped} configured with the keySerde
+     *
      * @see KStream#groupByKey(Grouped)
      * @see KStream#groupBy(KeyValueMapper, Grouped)
      * @see KTable#groupBy(KeyValueMapper, Grouped)
@@ -76,8 +88,14 @@ public class Grouped<K, V> implements 
NamedOperation<Grouped<K, V>> {
     /**
      * Create a {@link Grouped} instance with the provided valueSerde.  If 
{@code null} the default value serde from config will be used.
      *
-     * @param valueSerde the {@link Serde} used for serializing the value. If 
{@code null} the default value serde from config will be used
+     * @param valueSerde
+     *        the {@link Serde} used for serializing the value. If {@code 
null} the default value serde from config will be used
+     *
+     * @param <K> the key type
+     * @param <V> the value type
+     *
      * @return a new {@link Grouped} configured with the valueSerde
+     *
      * @see KStream#groupByKey(Grouped)
      * @see KStream#groupBy(KeyValueMapper, Grouped)
      * @see KTable#groupBy(KeyValueMapper, Grouped)
@@ -90,10 +108,18 @@ public class Grouped<K, V> implements 
NamedOperation<Grouped<K, V>> {
      * Create a {@link Grouped} instance with the provided  name, keySerde, 
and valueSerde. If the keySerde and/or the valueSerde is
      * {@code null} the default value for the respective serde from config 
will be used.
      *
-     * @param name       the name used as part of the repartition topic name 
if required
-     * @param keySerde   the {@link Serde} used for serializing the key. If 
{@code null} the default key serde from config will be used
-     * @param valueSerde the {@link Serde} used for serializing the value. If 
{@code null} the default value serde from config will be used
+     * @param name
+     *        the name used as part of the repartition topic name if required
+     * @param keySerde
+     *        the {@link Serde} used for serializing the key. If {@code null} 
the default key serde from config will be used
+     * @param valueSerde
+     *        the {@link Serde} used for serializing the value. If {@code 
null} the default value serde from config will be used
+     *
+     * @param <K> the key type
+     * @param <V> the value type
+     *
      * @return a new {@link Grouped} configured with the name, keySerde, and 
valueSerde
+     *
      * @see KStream#groupByKey(Grouped)
      * @see KStream#groupBy(KeyValueMapper, Grouped)
      * @see KTable#groupBy(KeyValueMapper, Grouped)
@@ -109,9 +135,16 @@ public class Grouped<K, V> implements 
NamedOperation<Grouped<K, V>> {
      * Create a {@link Grouped} instance with the provided keySerde and 
valueSerde.  If the keySerde and/or the valueSerde is
      * {@code null} the default value for the respective serde from config 
will be used.
      *
-     * @param keySerde   the {@link Serde} used for serializing the key. If 
{@code null} the default key serde from config will be used
-     * @param valueSerde the {@link Serde} used for serializing the value. If 
{@code null} the default value serde from config will be used
+     * @param keySerde
+     *        the {@link Serde} used for serializing the key. If {@code null} 
the default key serde from config will be used
+     * @param valueSerde
+     *        the {@link Serde} used for serializing the value. If {@code 
null} the default value serde from config will be used
+     *
+     * @param <K> the key type
+     * @param <V> the value type
+     *
      * @return a new {@link Grouped} configured with the keySerde, and 
valueSerde
+     *
      * @see KStream#groupByKey(Grouped)
      * @see KStream#groupBy(KeyValueMapper, Grouped)
      * @see KTable#groupBy(KeyValueMapper, Grouped)
@@ -125,7 +158,9 @@ public class Grouped<K, V> implements 
NamedOperation<Grouped<K, V>> {
      * Perform the grouping operation with the name for a repartition topic if 
required.  Note
      * that Kafka Streams does not always create repartition topics for 
grouping operations.
      *
-     * @param name the name used for the processor name and as part of the 
repartition topic name if required
+     * @param name
+     *        the name used for the processor name and as part of the 
repartition topic name if required
+     *
      * @return a new {@link Grouped} instance configured with the name
      * */
     @Override
@@ -136,7 +171,9 @@ public class Grouped<K, V> implements 
NamedOperation<Grouped<K, V>> {
     /**
      * Perform the grouping operation using the provided keySerde for 
serializing the key.
      *
-     * @param keySerde {@link Serde} to use for serializing the key. If {@code 
null} the default key serde from config will be used
+     * @param keySerde
+     *        {@link Serde} to use for serializing the key. If {@code null} 
the default key serde from config will be used
+     *
      * @return a new {@link Grouped} instance configured with the keySerde
      */
     public Grouped<K, V> withKeySerde(final Serde<K> keySerde) {
@@ -146,7 +183,9 @@ public class Grouped<K, V> implements 
NamedOperation<Grouped<K, V>> {
     /**
      * Perform the grouping operation using the provided valueSerde for 
serializing the value.
      *
-     * @param valueSerde {@link Serde} to use for serializing the value. If 
{@code null} the default value serde from config will be used
+     * @param valueSerde
+     *        {@link Serde} to use for serializing the value. If {@code null} 
the default value serde from config will be used
+     *
      * @return a new {@link Grouped} instance configured with the valueSerde
      */
     public Grouped<K, V> withValueSerde(final Serde<V> valueSerde) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
index 1b59c6409c1..9cd03fcf9e9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
@@ -16,12 +16,12 @@
  */
 package org.apache.kafka.streams.kstream;
 
-
 /**
  * The {@code Initializer} interface for creating an initial value in 
aggregations.
  * {@code Initializer} is used in combination with {@link Aggregator}.
  *
- * @param <VA> aggregate value type
+ * @param <VAgg> aggregate value type
+ *
  * @see Aggregator
  * @see KGroupedStream#aggregate(Initializer, Aggregator)
  * @see KGroupedStream#aggregate(Initializer, Aggregator, Materialized)
@@ -30,12 +30,12 @@ package org.apache.kafka.streams.kstream;
  * @see SessionWindowedKStream#aggregate(Initializer, Aggregator, Merger)
  * @see SessionWindowedKStream#aggregate(Initializer, Aggregator, Merger, 
Materialized)
  */
-public interface Initializer<VA> {
+public interface Initializer<VAgg> {
 
     /**
      * Return the initial value for an aggregation.
      *
      * @return the initial value for an aggregation
      */
-    VA apply();
+    VAgg apply();
 }


Reply via email to