This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 649e2cbc8f3 MINOR: Fix `Consumed` to return new object instead of
`this` (#14550)
649e2cbc8f3 is described below
commit 649e2cbc8f3b13c39b2ed559f3f9e4a15013b64e
Author: Matthias J. Sax <[email protected]>
AuthorDate: Sun Oct 15 19:28:54 2023 -0700
MINOR: Fix `Consumed` to return new object instead of `this` (#14550)
We embrace immutability and thus should return a new object instead of
`this`, similar to other config classed we use in the DSL.
Side JavaDocs cleanup for a bunch of classes.
Reviewers: Guozhang Wang <[email protected]>
---
.../apache/kafka/streams/kstream/Aggregator.java | 19 ++--
.../org/apache/kafka/streams/kstream/Branched.java | 107 ++++++++++++---------
.../kafka/streams/kstream/BranchedKStream.java | 38 +++++---
.../kafka/streams/kstream/CogroupedKStream.java | 84 +++++++++-------
.../org/apache/kafka/streams/kstream/Consumed.java | 107 ++++++++++++---------
.../apache/kafka/streams/kstream/EmitStrategy.java | 7 +-
.../kafka/streams/kstream/ForeachAction.java | 7 +-
.../apache/kafka/streams/kstream/GlobalKTable.java | 1 +
.../org/apache/kafka/streams/kstream/Grouped.java | 61 +++++++++---
.../apache/kafka/streams/kstream/Initializer.java | 8 +-
10 files changed, 275 insertions(+), 164 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
index 217a145e620..e02146fff1f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
@@ -24,10 +24,11 @@ package org.apache.kafka.streams.kstream;
* {@code Aggregator} is used in combination with {@link Initializer} that
provides an initial aggregation value.
* <p>
* {@code Aggregator} can be used to implement aggregation functions like
count.
-
+ *
* @param <K> key type
* @param <V> input value type
- * @param <VA> aggregate value type
+ * @param <VAgg> aggregate value type
+ *
* @see Initializer
* @see KGroupedStream#aggregate(Initializer, Aggregator)
* @see KGroupedStream#aggregate(Initializer, Aggregator, Materialized)
@@ -37,15 +38,19 @@ package org.apache.kafka.streams.kstream;
* @see SessionWindowedKStream#aggregate(Initializer, Aggregator, Merger,
Materialized)
* @see Reducer
*/
-public interface Aggregator<K, V, VA> {
+public interface Aggregator<K, V, VAgg> {
/**
* Compute a new aggregate from the key and value of a record and the
current aggregate of the same key.
*
- * @param key the key of the record
- * @param value the value of the record
- * @param aggregate the current aggregate value
+ * @param key
+ * the key of the record
+ * @param value
+ * the value of the record
+ * @param aggregate
+ * the current aggregate value
+ *
* @return the new aggregate value
*/
- VA apply(final K key, final V value, final VA aggregate);
+ VAgg apply(final K key, final V value, final VAgg aggregate);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java
index 3e85a73c346..2f55f80899a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java
@@ -41,12 +41,19 @@ public class Branched<K, V> implements
NamedOperation<Branched<K, V>> {
this.chainConsumer = chainConsumer;
}
+ protected Branched(final Branched<K, V> branched) {
+ this(branched.name, branched.chainFunction, branched.chainConsumer);
+ }
+
/**
* Create an instance of {@code Branched} with provided branch name suffix.
*
- * @param name the branch name suffix to be used (see {@link
BranchedKStream} description for details)
- * @param <K> key type
- * @param <V> value type
+ * @param name
+ * the branch name suffix to be used (see {@link BranchedKStream}
description for details)
+ *
+ * @param <K> key type
+ * @param <V> value type
+ *
* @return a new instance of {@code Branched}
*/
public static <K, V> Branched<K, V> as(final String name) {
@@ -57,16 +64,20 @@ public class Branched<K, V> implements
NamedOperation<Branched<K, V>> {
/**
* Create an instance of {@code Branched} with provided chain function.
*
- * @param chain A function that will be applied to the branch. If the
provided function returns
- * {@code null}, its result is ignored, otherwise it is added
to the {@code Map} returned
- * by {@link BranchedKStream#defaultBranch()} or {@link
BranchedKStream#noDefaultBranch()} (see
- * {@link BranchedKStream} description for details).
- * @param <K> key type
- * @param <V> value type
+ * @param chain
+ * A function that will be applied to the branch. If the provided
function returns
+ * {@code null}, its result is ignored, otherwise it is added to
the {@code Map} returned
+ * by {@link BranchedKStream#defaultBranch()} or {@link
BranchedKStream#noDefaultBranch()} (see
+ * {@link BranchedKStream} description for details).
+ *
+ * @param <K> key type
+ * @param <V> value type
+ *
* @return a new instance of {@code Branched}
*/
public static <K, V> Branched<K, V> withFunction(
- final Function<? super KStream<K, V>, ? extends KStream<K, V>>
chain) {
+ final Function<? super KStream<K, V>, ? extends KStream<K, V>>
chain
+ ) {
Objects.requireNonNull(chain, "chain function cannot be null");
return new Branched<>(null, chain, null);
}
@@ -74,12 +85,15 @@ public class Branched<K, V> implements
NamedOperation<Branched<K, V>> {
/**
* Create an instance of {@code Branched} with provided chain consumer.
*
- * @param chain A consumer to which the branch will be sent. If a consumer
is provided,
- * the respective branch will not be added to the resulting
{@code Map} returned
- * by {@link BranchedKStream#defaultBranch()} or {@link
BranchedKStream#noDefaultBranch()} (see
- * {@link BranchedKStream} description for details).
- * @param <K> key type
- * @param <V> value type
+ * @param chain
+ * A consumer to which the branch will be sent. If a consumer is
provided,
+ * the respective branch will not be added to the resulting {@code
Map} returned
+ * by {@link BranchedKStream#defaultBranch()} or {@link
BranchedKStream#noDefaultBranch()} (see
+ * {@link BranchedKStream} description for details).
+ *
+ * @param <K> key type
+ * @param <V> value type
+ *
* @return a new instance of {@code Branched}
*/
public static <K, V> Branched<K, V> withConsumer(final Consumer<KStream<K,
V>> chain) {
@@ -90,18 +104,24 @@ public class Branched<K, V> implements
NamedOperation<Branched<K, V>> {
/**
* Create an instance of {@code Branched} with provided chain function and
branch name suffix.
*
- * @param chain A function that will be applied to the branch. If the
provided function returns
- * {@code null}, its result is ignored, otherwise it is added
to the {@code Map} returned
- * by {@link BranchedKStream#defaultBranch()} or {@link
BranchedKStream#noDefaultBranch()} (see
- * {@link BranchedKStream} description for details).
- * @param name the branch name suffix to be used. If {@code null}, a
default branch name suffix will be generated
- * (see {@link BranchedKStream} description for details)
- * @param <K> key type
- * @param <V> value type
+ * @param chain
+ * A function that will be applied to the branch. If the provided
function returns
+ * {@code null}, its result is ignored, otherwise it is added to
the {@code Map} returned
+ * by {@link BranchedKStream#defaultBranch()} or {@link
BranchedKStream#noDefaultBranch()} (see
+ * {@link BranchedKStream} description for details).
+ * @param name
+ * the branch name suffix to be used. If {@code null}, a default
branch name suffix will be generated
+ * (see {@link BranchedKStream} description for details)
+ *
+ * @param <K> key type
+ * @param <V> value type
+ *
* @return a new instance of {@code Branched}
*/
public static <K, V> Branched<K, V> withFunction(
- final Function<? super KStream<K, V>, ? extends KStream<K, V>>
chain, final String name) {
+ final Function<? super KStream<K, V>, ? extends KStream<K, V>> chain,
+ final String name
+ ) {
Objects.requireNonNull(chain, "chain function cannot be null");
return new Branched<>(name, chain, null);
}
@@ -109,14 +129,18 @@ public class Branched<K, V> implements
NamedOperation<Branched<K, V>> {
/**
* Create an instance of {@code Branched} with provided chain consumer and
branch name suffix.
*
- * @param chain A consumer to which the branch will be sent. If a non-null
consumer is provided,
- * the respective branch will not be added to the resulting
{@code Map} returned
- * by {@link BranchedKStream#defaultBranch()} or {@link
BranchedKStream#noDefaultBranch()} (see
- * {@link BranchedKStream} description for details).
- * @param name the branch name suffix to be used. If {@code null}, a
default branch name suffix will be generated
- * (see {@link BranchedKStream} description for details)
- * @param <K> key type
- * @param <V> value type
+ * @param chain
+ * A consumer to which the branch will be sent. If a non-null
consumer is provided,
+ * the respective branch will not be added to the resulting {@code
Map} returned
+ * by {@link BranchedKStream#defaultBranch()} or {@link
BranchedKStream#noDefaultBranch()} (see
+ * {@link BranchedKStream} description for details).
+ * @param name
+ * the branch name suffix to be used. If {@code null}, a default
branch name suffix will be generated
+ * (see {@link BranchedKStream} description for details)
+ *
+ * @param <K> key type
+ * @param <V> value type
+ *
* @return a new instance of {@code Branched}
*/
public static <K, V> Branched<K, V> withConsumer(final Consumer<? super
KStream<K, V>> chain,
@@ -125,21 +149,14 @@ public class Branched<K, V> implements
NamedOperation<Branched<K, V>> {
return new Branched<>(name, null, chain);
}
- /**
- * Create an instance of {@code Branched} from an existing instance.
- *
- * @param branched the instance of {@code Branched} to copy
- */
- protected Branched(final Branched<K, V> branched) {
- this(branched.name, branched.chainFunction, branched.chainConsumer);
- }
-
/**
* Configure the instance of {@code Branched} with a branch name suffix.
*
- * @param name the branch name suffix to be used. If {@code null} a
default branch name suffix will be generated (see
- * {@link BranchedKStream} description for details)
- * @return {@code this}
+ * @param name
+ * the branch name suffix to be used. If {@code null} a default
branch name suffix will be generated (see
+ * {@link BranchedKStream} description for details)
+ *
+ * @return {@code this} to facilitate method chaining
*/
@Override
public Branched<K, V> withName(final String name) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/BranchedKStream.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/BranchedKStream.java
index 2115170cc94..43475a4227c 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/BranchedKStream.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/BranchedKStream.java
@@ -78,7 +78,7 @@ import java.util.Map;
*
* <h3>Direct Branch Consuming</h3>
* In many cases we do not need to have a single scope for all the branches,
each branch being processed completely
- * independently from others. Then we can use 'consuming' lambdas or method
references in {@link Branched} parameter:
+ * independently of others. Then we can use 'consuming' lambdas or method
references in {@link Branched} parameter:
*
* <pre> {@code
* source.split()
@@ -112,16 +112,19 @@ import java.util.Map;
*
* @param <K> Type of keys
* @param <V> Type of values
+ *
* @see KStream
*/
public interface BranchedKStream<K, V> {
/**
* Define a branch for records that match the predicate.
*
- * @param predicate A {@link Predicate} instance, against which each
record will be evaluated.
- * If this predicate returns {@code true} for a given
record, the record will be
- * routed to the current branch and will not be evaluated
against the predicates
- * for the remaining branches.
+ * @param predicate
+ * A {@link Predicate} instance, against which each record will be
evaluated.
+ * If this predicate returns {@code true} for a given record, the
record will be
+ * routed to the current branch and will not be evaluated against
the predicates
+ * for the remaining branches.
+ *
* @return {@code this} to facilitate method chaining
*/
BranchedKStream<K, V> branch(Predicate<? super K, ? super V> predicate);
@@ -129,13 +132,16 @@ public interface BranchedKStream<K, V> {
/**
* Define a branch for records that match the predicate.
*
- * @param predicate A {@link Predicate} instance, against which each
record will be evaluated.
- * If this predicate returns {@code true} for a given
record, the record will be
- * routed to the current branch and will not be evaluated
against the predicates
- * for the remaining branches.
- * @param branched A {@link Branched} parameter, that allows to define a
branch name, an in-place
- * branch consumer or branch mapper (see <a
href="#examples">code examples</a>
- * for {@link BranchedKStream})
+ * @param predicate
+ * A {@link Predicate} instance, against which each record will be
evaluated.
+ * If this predicate returns {@code true} for a given record, the
record will be
+ * routed to the current branch and will not be evaluated against
the predicates
+ * for the remaining branches.
+ * @param branched
+ * A {@link Branched} parameter, that allows to define a branch
name, an in-place
+ * branch consumer or branch mapper (see <a href="#examples">code
examples</a>
+ * for {@link BranchedKStream})
+ *
* @return {@code this} to facilitate method chaining
*/
BranchedKStream<K, V> branch(Predicate<? super K, ? super V> predicate,
Branched<K, V> branched);
@@ -153,9 +159,11 @@ public interface BranchedKStream<K, V> {
* Finalize the construction of branches and defines the default branch
for the messages not intercepted
* by other branches. Calling {@code defaultBranch} or {@link
#noDefaultBranch()} is optional.
*
- * @param branched A {@link Branched} parameter, that allows to define a
branch name, an in-place
- * branch consumer or branch mapper (see <a
href="#examples">code examples</a>
- * for {@link BranchedKStream})
+ * @param branched
+ * A {@link Branched} parameter, that allows to define a branch
name, an in-place
+ * branch consumer or branch mapper (see <a href="#examples">code
examples</a>
+ * for {@link BranchedKStream})
+ *
* @return {@link Map} of named branches. For rules of forming the
resulting map, see {@link BranchedKStream}
* <a href="#maprules">description</a>.
*/
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
index ccb003ea755..f4c975848b2 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
@@ -36,9 +36,9 @@ import
org.apache.kafka.streams.state.TimestampedKeyValueStore;
* {@link KGroupedStream#cogroup(Aggregator) cogroup(...)}.
*
* @param <K> Type of keys
- * @param <VOut> Type of values after agg
+ * @param <VAgg> Type of values after agg
*/
-public interface CogroupedKStream<K, VOut> {
+public interface CogroupedKStream<K, VAgg> {
/**
* Add an already {@link KGroupedStream grouped KStream} to this {@code
CogroupedKStream}.
@@ -54,13 +54,17 @@ public interface CogroupedKStream<K, VOut> {
* using the initial intermediate aggregation result provided via the
{@link Initializer} that is passed into
* {@link #aggregate(Initializer)}) and the record's value.
*
- * @param groupedStream a group stream
- * @param aggregator an {@link Aggregator} that computes a new
aggregate result
- * @param <VIn> Type of input values
+ * @param groupedStream
+ * a group stream
+ * @param aggregator
+ * an {@link Aggregator} that computes a new aggregate result
+ *
+ * @param <V> Type of input values
+ *
* @return a {@code CogroupedKStream}
*/
- <VIn> CogroupedKStream<K, VOut> cogroup(final KGroupedStream<K, VIn>
groupedStream,
- final Aggregator<? super K, ?
super VIn, VOut> aggregator);
+ <V> CogroupedKStream<K, VAgg> cogroup(final KGroupedStream<K, V>
groupedStream,
+ final Aggregator<? super K, ? super
V, VAgg> aggregator);
/**
* Aggregate the values of records in these streams by the grouped key.
@@ -105,12 +109,14 @@ public interface CogroupedKStream<K, VOut> {
* <p>
* You can retrieve all generated internal topic names via {@link
Topology#describe()}.
*
- * @param initializer an {@link Initializer} that computes an initial
intermediate aggregation
- * result. Cannot be {@code null}.
+ * @param initializer
+ * an {@link Initializer} that computes an initial intermediate
aggregation
+ * result. Cannot be {@code null}.
+ *
* @return a {@link KTable} that contains "update" records with unmodified
keys, and values that
* represent the latest (rolling) aggregate for each key
*/
- KTable<K, VOut> aggregate(final Initializer<VOut> initializer);
+ KTable<K, VAgg> aggregate(final Initializer<VAgg> initializer);
/**
* Aggregate the values of records in these streams by the grouped key.
@@ -156,13 +162,15 @@ public interface CogroupedKStream<K, VOut> {
* <p>
* You can retrieve all generated internal topic names via {@link
Topology#describe()}.
*
- * @param initializer an {@link Initializer} that computes an initial
intermediate aggregation
- * result. Cannot be {@code null}.
- * @param named name the processor. Cannot be {@code null}.
+ * @param initializer
+ * an {@link Initializer} that computes an initial intermediate
aggregation result. Cannot be {@code null}.
+ * @param named
+ * name the processor. Cannot be {@code null}.
+ *
* @return a {@link KTable} that contains "update" records with unmodified
keys, and values that
* represent the latest (rolling) aggregate for each key
*/
- KTable<K, VOut> aggregate(final Initializer<VOut> initializer,
+ KTable<K, VAgg> aggregate(final Initializer<VAgg> initializer,
final Named named);
/**
@@ -208,15 +216,16 @@ public interface CogroupedKStream<K, VOut> {
* <p>
* You can retrieve all generated internal topic names via {@link
Topology#describe()}.
*
- * @param initializer an {@link Initializer} that computes an initial
intermediate aggregation
- * result. Cannot be {@code null}.
- * @param materialized an instance of {@link Materialized} used to
materialize a state store.
- * Cannot be {@code null}.
+ * @param initializer
+ * an {@link Initializer} that computes an initial intermediate
aggregation result. Cannot be {@code null}.
+ * @param materialized
+ * an instance of {@link Materialized} used to materialize a state
store. Cannot be {@code null}.
+ *
* @return a {@link KTable} that contains "update" records with unmodified
keys, and values that
* represent the latest (rolling) aggregate for each key
*/
- KTable<K, VOut> aggregate(final Initializer<VOut> initializer,
- final Materialized<K, VOut, KeyValueStore<Bytes,
byte[]>> materialized);
+ KTable<K, VAgg> aggregate(final Initializer<VAgg> initializer,
+ final Materialized<K, VAgg, KeyValueStore<Bytes,
byte[]>> materialized);
/**
* Aggregate the values of records in these streams by the grouped key.
@@ -262,44 +271,53 @@ public interface CogroupedKStream<K, VOut> {
* <p>
* You can retrieve all generated internal topic names via {@link
Topology#describe()}.
*
- * @param initializer an {@link Initializer} that computes an initial
intermediate aggregation
- * result. Cannot be {@code null}.
- * @param materialized an instance of {@link Materialized} used to
materialize a state store.
- * Cannot be {@code null}.
- * @param named name the processors. Cannot be {@code null}.
+ * @param initializer
+ * an {@link Initializer} that computes an initial intermediate
aggregation result. Cannot be {@code null}.
+ * @param materialized
+ * an instance of {@link Materialized} used to materialize a state
store. Cannot be {@code null}.
+ * @param named
+ * name the processors. Cannot be {@code null}.
+ *
* @return a {@link KTable} that contains "update" records with unmodified
keys, and values that
* represent the latest (rolling) aggregate for each key
*/
- KTable<K, VOut> aggregate(final Initializer<VOut> initializer,
+ KTable<K, VAgg> aggregate(final Initializer<VAgg> initializer,
final Named named,
- final Materialized<K, VOut, KeyValueStore<Bytes,
byte[]>> materialized);
+ final Materialized<K, VAgg, KeyValueStore<Bytes,
byte[]>> materialized);
/**
* Create a new {@link TimeWindowedCogroupedKStream} instance that can be
used to perform windowed
* aggregations.
*
- * @param windows the specification of the aggregation {@link Windows}
+ * @param windows
+ * the specification of the aggregation {@link Windows}
+ *
* @param <W> the window type
+ *
* @return an instance of {@link TimeWindowedCogroupedKStream}
*/
- <W extends Window> TimeWindowedCogroupedKStream<K, VOut> windowedBy(final
Windows<W> windows);
+ <W extends Window> TimeWindowedCogroupedKStream<K, VAgg> windowedBy(final
Windows<W> windows);
/**
* Create a new {@link TimeWindowedCogroupedKStream} instance that can be
used to perform sliding
* windowed aggregations.
*
- * @param windows the specification of the aggregation {@link
SlidingWindows}
+ * @param windows
+ * the specification of the aggregation {@link SlidingWindows}
+ *
* @return an instance of {@link TimeWindowedCogroupedKStream}
*/
- TimeWindowedCogroupedKStream<K, VOut> windowedBy(final SlidingWindows
windows);
+ TimeWindowedCogroupedKStream<K, VAgg> windowedBy(final SlidingWindows
windows);
/**
* Create a new {@link SessionWindowedCogroupedKStream} instance that can
be used to perform session
* windowed aggregations.
*
- * @param windows the specification of the aggregation {@link
SessionWindows}
+ * @param windows
+ * the specification of the aggregation {@link SessionWindows}
+ *
* @return an instance of {@link SessionWindowedCogroupedKStream}
*/
- SessionWindowedCogroupedKStream<K, VOut> windowedBy(final SessionWindows
windows);
+ SessionWindowedCogroupedKStream<K, VAgg> windowedBy(final SessionWindows
windows);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java
index 423ca6022d8..d1713ab20a1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java
@@ -70,10 +70,6 @@ public class Consumed<K, V> implements
NamedOperation<Consumed<K, V>> {
this.processorName = processorName;
}
- /**
- * Create an instance of {@link Consumed} from an existing instance.
- * @param consumed the instance of {@link Consumed} to copy
- */
protected Consumed(final Consumed<K, V> consumed) {
this(consumed.keySerde,
consumed.valueSerde,
@@ -86,12 +82,18 @@ public class Consumed<K, V> implements
NamedOperation<Consumed<K, V>> {
/**
* Create an instance of {@link Consumed} with the supplied arguments.
{@code null} values are acceptable.
*
- * @param keySerde the key serde. If {@code null} the default
key serde from config will be used
- * @param valueSerde the value serde. If {@code null} the default
value serde from config will be used
- * @param timestampExtractor the timestamp extractor to used. If {@code
null} the default timestamp extractor from config will be used
- * @param resetPolicy the offset reset policy to be used. If {@code
null} the default reset policy from config will be used
- * @param <K> key type
- * @param <V> value type
+ * @param keySerde
+ * the key serde. If {@code null} the default key serde from config
will be used
+ * @param valueSerde
+ * the value serde. If {@code null} the default value serde from
config will be used
+ * @param timestampExtractor
+ * the timestamp extractor to used. If {@code null} the default
timestamp extractor from config will be used
+ * @param resetPolicy
+ * the offset reset policy to be used. If {@code null} the default
reset policy from config will be used
+ *
+ * @param <K> key type
+ * @param <V> value type
+ *
* @return a new instance of {@link Consumed}
*/
public static <K, V> Consumed<K, V> with(final Serde<K> keySerde,
@@ -99,16 +101,19 @@ public class Consumed<K, V> implements
NamedOperation<Consumed<K, V>> {
final TimestampExtractor
timestampExtractor,
final Topology.AutoOffsetReset
resetPolicy) {
return new Consumed<>(keySerde, valueSerde, timestampExtractor,
resetPolicy, null);
-
}
/**
* Create an instance of {@link Consumed} with key and value {@link
Serde}s.
*
- * @param keySerde the key serde. If {@code null} the default key serde
from config will be used
- * @param valueSerde the value serde. If {@code null} the default value
serde from config will be used
- * @param <K> key type
- * @param <V> value type
+ * @param keySerde
+ * the key serde. If {@code null} the default key serde from config
will be used
+ * @param valueSerde
+ * the value serde. If {@code null} the default value serde from
config will be used
+ *
+ * @param <K> key type
+ * @param <V> value type
+ *
* @return a new instance of {@link Consumed}
*/
public static <K, V> Consumed<K, V> with(final Serde<K> keySerde,
@@ -119,9 +124,12 @@ public class Consumed<K, V> implements
NamedOperation<Consumed<K, V>> {
/**
* Create an instance of {@link Consumed} with a {@link
TimestampExtractor}.
*
- * @param timestampExtractor the timestamp extractor to used. If {@code
null} the default timestamp extractor from config will be used
- * @param <K> key type
- * @param <V> value type
+ * @param timestampExtractor
+ * the timestamp extractor to used. If {@code null} the default
timestamp extractor from config will be used
+ *
+ * @param <K> key type
+ * @param <V> value type
+ *
* @return a new instance of {@link Consumed}
*/
public static <K, V> Consumed<K, V> with(final TimestampExtractor
timestampExtractor) {
@@ -131,9 +139,12 @@ public class Consumed<K, V> implements
NamedOperation<Consumed<K, V>> {
/**
* Create an instance of {@link Consumed} with a {@link
org.apache.kafka.streams.Topology.AutoOffsetReset Topology.AutoOffsetReset}.
*
- * @param resetPolicy the offset reset policy to be used. If {@code null}
the default reset policy from config will be used
- * @param <K> key type
- * @param <V> value type
+ * @param resetPolicy
+ * the offset reset policy to be used. If {@code null} the default
reset policy from config will be used
+ *
+ * @param <K> key type
+ * @param <V> value type
+ *
* @return a new instance of {@link Consumed}
*/
public static <K, V> Consumed<K, V> with(final Topology.AutoOffsetReset
resetPolicy) {
@@ -143,9 +154,12 @@ public class Consumed<K, V> implements
NamedOperation<Consumed<K, V>> {
/**
* Create an instance of {@link Consumed} with provided processor name.
*
- * @param processorName the processor name to be used. If {@code null} a
default processor name will be generated
- * @param <K> key type
- * @param <V> value type
+ * @param processorName
+ * the processor name to be used. If {@code null} a default
processor name will be generated
+ *
+ * @param <K> key type
+ * @param <V> value type
+ *
* @return a new instance of {@link Consumed}
*/
public static <K, V> Consumed<K, V> as(final String processorName) {
@@ -155,57 +169,62 @@ public class Consumed<K, V> implements
NamedOperation<Consumed<K, V>> {
/**
* Configure the instance of {@link Consumed} with a key {@link Serde}.
*
- * @param keySerde the key serde. If {@code null}the default key serde
from config will be used
- * @return this
+ * @param keySerde
+ * the key serde. If {@code null} the default key serde from config
will be used
+ *
+ * @return a new instance of {@link Consumed}
*/
public Consumed<K, V> withKeySerde(final Serde<K> keySerde) {
- this.keySerde = keySerde;
- return this;
+ return new Consumed<K, V>(keySerde, valueSerde, timestampExtractor,
resetPolicy, processorName);
}
/**
* Configure the instance of {@link Consumed} with a value {@link Serde}.
*
- * @param valueSerde the value serde. If {@code null} the default value
serde from config will be used
- * @return this
+ * @param valueSerde
+ * the value serde. If {@code null} the default value serde from
config will be used
+ *
+ * @return a new instance of {@link Consumed}
*/
public Consumed<K, V> withValueSerde(final Serde<V> valueSerde) {
- this.valueSerde = valueSerde;
- return this;
+ return new Consumed<K, V>(keySerde, valueSerde, timestampExtractor,
resetPolicy, processorName);
}
/**
* Configure the instance of {@link Consumed} with a {@link
TimestampExtractor}.
*
- * @param timestampExtractor the timestamp extractor to used. If {@code
null} the default timestamp extractor from config will be used
- * @return this
+ * @param timestampExtractor
+ * the timestamp extractor to used. If {@code null} the default
timestamp extractor from config will be used
+ *
+ * @return a new instance of {@link Consumed}
*/
public Consumed<K, V> withTimestampExtractor(final TimestampExtractor
timestampExtractor) {
- this.timestampExtractor = timestampExtractor;
- return this;
+ return new Consumed<K, V>(keySerde, valueSerde, timestampExtractor,
resetPolicy, processorName);
}
/**
* Configure the instance of {@link Consumed} with a {@link
org.apache.kafka.streams.Topology.AutoOffsetReset Topology.AutoOffsetReset}.
*
- * @param resetPolicy the offset reset policy to be used. If {@code null}
the default reset policy from config will be used
- * @return this
+ * @param resetPolicy
+ * the offset reset policy to be used. If {@code null} the default
reset policy from config will be used
+ *
+ * @return a new instance of {@link Consumed}
*/
public Consumed<K, V> withOffsetResetPolicy(final Topology.AutoOffsetReset
resetPolicy) {
- this.resetPolicy = resetPolicy;
- return this;
+ return new Consumed<K, V>(keySerde, valueSerde, timestampExtractor,
resetPolicy, processorName);
}
/**
* Configure the instance of {@link Consumed} with a processor name.
*
- * @param processorName the processor name to be used. If {@code null} a
default processor name will be generated
- * @return this
+ * @param processorName
+ * the processor name to be used. If {@code null} a default
processor name will be generated
+ *
+ * @return a new instance of {@link Consumed}
*/
@Override
public Consumed<K, V> withName(final String processorName) {
- this.processorName = processorName;
- return this;
+ return new Consumed<K, V>(keySerde, valueSerde, timestampExtractor,
resetPolicy, processorName);
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/EmitStrategy.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/EmitStrategy.java
index 365a19c1426..bacb4352b93 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/EmitStrategy.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/EmitStrategy.java
@@ -68,7 +68,8 @@ public interface EmitStrategy {
}
/**
- * Returns the strategy type
+ * Returns the strategy type.
+ *
* @return Emit strategy type
*/
StrategyType type();
@@ -87,7 +88,7 @@ public interface EmitStrategy {
* @see UnlimitedWindows
* @see WindowUpdateStrategy
*
- * @return WindowCloseStrategy instance
+ * @return "window close" {@code EmitStrategy} instance
*/
static EmitStrategy onWindowClose() {
return new WindowCloseStrategy();
@@ -103,7 +104,7 @@ public interface EmitStrategy {
* @see UnlimitedWindows
* @see WindowCloseStrategy
*
- * @return WindowCloseStrategy instance
+ * @return "window update" {@code EmitStrategy} instance
*/
static EmitStrategy onWindowUpdate() {
return new WindowUpdateStrategy();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java
index f20d77ccb1c..afdc972985c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java
@@ -27,6 +27,7 @@ package org.apache.kafka.streams.kstream;
*
* @param <K> key type
* @param <V> value type
+ *
* @see KStream#foreach(ForeachAction)
*/
public interface ForeachAction<K, V> {
@@ -34,8 +35,10 @@ public interface ForeachAction<K, V> {
/**
* Perform an action for each record of a stream.
*
- * @param key the key of the record
- * @param value the value of the record
+ * @param key
+ * the key of the record
+ * @param value
+ * the value of the record
*/
void apply(final K key, final V value);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
index 73efbc3a512..2f4c380012e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
@@ -59,6 +59,7 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
*
* @param <K> Type of primary keys
* @param <V> Type of value changes
+ *
* @see KTable
* @see StreamsBuilder#globalTable(String)
* @see KStream#join(GlobalKTable, KeyValueMapper, ValueJoiner)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/Grouped.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/Grouped.java
index 44b07408201..631969734bf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Grouped.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Grouped.java
@@ -48,8 +48,14 @@ public class Grouped<K, V> implements
NamedOperation<Grouped<K, V>> {
/**
* Create a {@link Grouped} instance with the provided name used as part
of the repartition topic if required.
*
- * @param name the name used for a repartition topic if required
+ * @param name
+ * the name used for a repartition topic if required
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ *
* @return a new {@link Grouped} configured with the name
+ *
* @see KStream#groupByKey(Grouped)
* @see KStream#groupBy(KeyValueMapper, Grouped)
* @see KTable#groupBy(KeyValueMapper, Grouped)
@@ -62,8 +68,14 @@ public class Grouped<K, V> implements
NamedOperation<Grouped<K, V>> {
/**
* Create a {@link Grouped} instance with the provided keySerde. If {@code
null} the default key serde from config will be used.
*
- * @param keySerde the Serde used for serializing the key. If {@code null}
the default key serde from config will be used
+ * @param keySerde
+ * the Serde used for serializing the key. If {@code null} the
default key serde from config will be used
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ *
* @return a new {@link Grouped} configured with the keySerde
+ *
* @see KStream#groupByKey(Grouped)
* @see KStream#groupBy(KeyValueMapper, Grouped)
* @see KTable#groupBy(KeyValueMapper, Grouped)
@@ -76,8 +88,14 @@ public class Grouped<K, V> implements
NamedOperation<Grouped<K, V>> {
/**
* Create a {@link Grouped} instance with the provided valueSerde. If
{@code null} the default value serde from config will be used.
*
- * @param valueSerde the {@link Serde} used for serializing the value. If
{@code null} the default value serde from config will be used
+ * @param valueSerde
+ * the {@link Serde} used for serializing the value. If {@code
null} the default value serde from config will be used
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ *
* @return a new {@link Grouped} configured with the valueSerde
+ *
* @see KStream#groupByKey(Grouped)
* @see KStream#groupBy(KeyValueMapper, Grouped)
* @see KTable#groupBy(KeyValueMapper, Grouped)
@@ -90,10 +108,18 @@ public class Grouped<K, V> implements
NamedOperation<Grouped<K, V>> {
* Create a {@link Grouped} instance with the provided name, keySerde,
and valueSerde. If the keySerde and/or the valueSerde is
* {@code null} the default value for the respective serde from config
will be used.
*
- * @param name the name used as part of the repartition topic name
if required
- * @param keySerde the {@link Serde} used for serializing the key. If
{@code null} the default key serde from config will be used
- * @param valueSerde the {@link Serde} used for serializing the value. If
{@code null} the default value serde from config will be used
+ * @param name
+ * the name used as part of the repartition topic name if required
+ * @param keySerde
+ * the {@link Serde} used for serializing the key. If {@code null}
the default key serde from config will be used
+ * @param valueSerde
+ * the {@link Serde} used for serializing the value. If {@code
null} the default value serde from config will be used
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ *
* @return a new {@link Grouped} configured with the name, keySerde, and
valueSerde
+ *
* @see KStream#groupByKey(Grouped)
* @see KStream#groupBy(KeyValueMapper, Grouped)
* @see KTable#groupBy(KeyValueMapper, Grouped)
@@ -109,9 +135,16 @@ public class Grouped<K, V> implements
NamedOperation<Grouped<K, V>> {
* Create a {@link Grouped} instance with the provided keySerde and
valueSerde. If the keySerde and/or the valueSerde is
* {@code null} the default value for the respective serde from config
will be used.
*
- * @param keySerde the {@link Serde} used for serializing the key. If
{@code null} the default key serde from config will be used
- * @param valueSerde the {@link Serde} used for serializing the value. If
{@code null} the default value serde from config will be used
+ * @param keySerde
+ * the {@link Serde} used for serializing the key. If {@code null}
the default key serde from config will be used
+ * @param valueSerde
+ * the {@link Serde} used for serializing the value. If {@code
null} the default value serde from config will be used
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ *
* @return a new {@link Grouped} configured with the keySerde, and
valueSerde
+ *
* @see KStream#groupByKey(Grouped)
* @see KStream#groupBy(KeyValueMapper, Grouped)
* @see KTable#groupBy(KeyValueMapper, Grouped)
@@ -125,7 +158,9 @@ public class Grouped<K, V> implements
NamedOperation<Grouped<K, V>> {
* Perform the grouping operation with the name for a repartition topic if
required. Note
* that Kafka Streams does not always create repartition topics for
grouping operations.
*
- * @param name the name used for the processor name and as part of the
repartition topic name if required
+ * @param name
+ * the name used for the processor name and as part of the
repartition topic name if required
+ *
* @return a new {@link Grouped} instance configured with the name
* */
@Override
@@ -136,7 +171,9 @@ public class Grouped<K, V> implements
NamedOperation<Grouped<K, V>> {
/**
* Perform the grouping operation using the provided keySerde for
serializing the key.
*
- * @param keySerde {@link Serde} to use for serializing the key. If {@code
null} the default key serde from config will be used
+ * @param keySerde
+ * {@link Serde} to use for serializing the key. If {@code null}
the default key serde from config will be used
+ *
* @return a new {@link Grouped} instance configured with the keySerde
*/
public Grouped<K, V> withKeySerde(final Serde<K> keySerde) {
@@ -146,7 +183,9 @@ public class Grouped<K, V> implements
NamedOperation<Grouped<K, V>> {
/**
* Perform the grouping operation using the provided valueSerde for
serializing the value.
*
- * @param valueSerde {@link Serde} to use for serializing the value. If
{@code null} the default value serde from config will be used
+ * @param valueSerde
+ * {@link Serde} to use for serializing the value. If {@code null}
the default value serde from config will be used
+ *
* @return a new {@link Grouped} instance configured with the valueSerde
*/
public Grouped<K, V> withValueSerde(final Serde<V> valueSerde) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
index 1b59c6409c1..9cd03fcf9e9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
@@ -16,12 +16,12 @@
*/
package org.apache.kafka.streams.kstream;
-
/**
* The {@code Initializer} interface for creating an initial value in
aggregations.
* {@code Initializer} is used in combination with {@link Aggregator}.
*
- * @param <VA> aggregate value type
+ * @param <VAgg> aggregate value type
+ *
* @see Aggregator
* @see KGroupedStream#aggregate(Initializer, Aggregator)
* @see KGroupedStream#aggregate(Initializer, Aggregator, Materialized)
@@ -30,12 +30,12 @@ package org.apache.kafka.streams.kstream;
* @see SessionWindowedKStream#aggregate(Initializer, Aggregator, Merger)
* @see SessionWindowedKStream#aggregate(Initializer, Aggregator, Merger,
Materialized)
*/
-public interface Initializer<VA> {
+public interface Initializer<VAgg> {
/**
* Return the initial value for an aggregation.
*
* @return the initial value for an aggregation
*/
- VA apply();
+ VAgg apply();
}