This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a916a1db821 MINOR: cleanup KStream JavaDocs (1/N) -
filter[Not]/selectKey (#18703)
a916a1db821 is described below
commit a916a1db82153a86f2319a936b9daa239c2aa438
Author: Matthias J. Sax <[email protected]>
AuthorDate: Thu Jan 30 09:31:47 2025 -0800
MINOR: cleanup KStream JavaDocs (1/N) - filter[Not]/selectKey (#18703)
Reviewers: Alieh Saeedi <[email protected]>, Lucas Brutschy
<[email protected]>
---
.../org/apache/kafka/streams/kstream/KStream.java | 257 ++++++++++-----------
.../streams/kstream/internals/KStreamImpl.java | 68 +++---
2 files changed, 150 insertions(+), 175 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 13d17aa6796..65d26fd8b74 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -62,23 +62,22 @@ public interface KStream<K, V> {
/**
* Create a new {@code KStream} that consists of all records of this
stream which satisfy the given predicate.
* All records that do not satisfy the predicate are dropped.
- * This is a stateless record-by-record operation.
+ * This is a stateless record-by-record operation (cf. {@link
#processValues(FixedKeyProcessorSupplier, String...)}
+ * for stateful record processing).
+ *
+ * @param predicate
+ * a filter {@link Predicate} that is applied to each record
+ *
+ * @return A {@code KStream} that contains only those records that satisfy
the given predicate.
*
- * @param predicate a filter {@link Predicate} that is applied to each
record
- * @return a {@code KStream} that contains only those records that satisfy
the given predicate
* @see #filterNot(Predicate)
*/
KStream<K, V> filter(final Predicate<? super K, ? super V> predicate);
/**
- * Create a new {@code KStream} that consists of all records of this
stream which satisfy the given predicate.
- * All records that do not satisfy the predicate are dropped.
- * This is a stateless record-by-record operation.
+ * See {@link #filter(Predicate)}.
*
- * @param predicate a filter {@link Predicate} that is applied to each
record
- * @param named a {@link Named} config used to name the processor in
the topology
- * @return a {@code KStream} that contains only those records that satisfy
the given predicate
- * @see #filterNot(Predicate)
+ * <p>Takes an additional {@link Named} parameter that is used to name the
processor in the topology.
*/
KStream<K, V> filter(final Predicate<? super K, ? super V> predicate,
final Named named);
@@ -86,68 +85,36 @@ public interface KStream<K, V> {
* Create a new {@code KStream} that consists all records of this stream
which do <em>not</em> satisfy the given
* predicate.
* All records that <em>do</em> satisfy the predicate are dropped.
- * This is a stateless record-by-record operation.
+ * This is a stateless record-by-record operation (cf. {@link
#processValues(FixedKeyProcessorSupplier, String...)}
+ * for stateful record processing).
+ *
+ * @param predicate
+ * a filter {@link Predicate} that is applied to each record
+ *
+ * @return A {@code KStream} that contains only those records that do
<em>not</em> satisfy the given predicate.
*
- * @param predicate a filter {@link Predicate} that is applied to each
record
- * @return a {@code KStream} that contains only those records that do
<em>not</em> satisfy the given predicate
* @see #filter(Predicate)
*/
KStream<K, V> filterNot(final Predicate<? super K, ? super V> predicate);
/**
- * Create a new {@code KStream} that consists all records of this stream
which do <em>not</em> satisfy the given
- * predicate.
- * All records that <em>do</em> satisfy the predicate are dropped.
- * This is a stateless record-by-record operation.
+ * See {@link #filterNot(Predicate)}.
*
- * @param predicate a filter {@link Predicate} that is applied to each
record
- * @param named a {@link Named} config used to name the processor in
the topology
- * @return a {@code KStream} that contains only those records that do
<em>not</em> satisfy the given predicate
- * @see #filter(Predicate)
+ * <p>Takes an additional {@link Named} parameter that is used to name the
processor in the topology.
*/
KStream<K, V> filterNot(final Predicate<? super K, ? super V> predicate,
final Named named);
/**
- * Set a new key (with possibly new type) for each input record.
- * The provided {@link KeyValueMapper} is applied to each input record and
computes a new key for it.
+ * Create a new {@code KStream} that consists of all records of this
stream but with a modified key.
+ * The provided {@link KeyValueMapper} is applied to each input record and
computes a new key (possibly of a
+ * different type) for it.
* Thus, an input record {@code <K,V>} can be transformed into an output
record {@code <K':V>}.
- * This is a stateless record-by-record operation.
- * <p>
- * For example, you can use this transformation to set a key for a
key-less input record {@code <null,V>} by
- * extracting a key from the value within your {@link KeyValueMapper}. The
example below computes the new key as the
- * length of the value string.
- * <pre>{@code
- * KStream<Byte[], String> keyLessStream =
builder.stream("key-less-topic");
- * KStream<Integer, String> keyedStream = keyLessStream.selectKey(new
KeyValueMapper<Byte[], String, Integer> {
- * Integer apply(Byte[] key, String value) {
- * return value.length();
- * }
- * });
- * }</pre>
- * Setting a new key might result in an internal data redistribution if a
key based operator (like an aggregation or
- * join) is applied to the result {@code KStream}.
+ * This is a stateless record-by-record operation (cf. {@link
#process(ProcessorSupplier, String...)} for
+ * stateful record processing).
*
- * @param mapper a {@link KeyValueMapper} that computes a new key for each
record
- * @param <KR> the new key type of the result stream
- * @return a {@code KStream} that contains records with new key (possibly
of different type) and unmodified value
- * @see #map(KeyValueMapper)
- * @see #flatMap(KeyValueMapper)
- * @see #mapValues(ValueMapper)
- * @see #mapValues(ValueMapperWithKey)
- * @see #flatMapValues(ValueMapper)
- * @see #flatMapValues(ValueMapperWithKey)
- */
- <KR> KStream<KR, V> selectKey(final KeyValueMapper<? super K, ? super V, ?
extends KR> mapper);
-
- /**
- * Set a new key (with possibly new type) for each input record.
- * The provided {@link KeyValueMapper} is applied to each input record and
computes a new key for it.
- * Thus, an input record {@code <K,V>} can be transformed into an output
record {@code <K':V>}.
- * This is a stateless record-by-record operation.
- * <p>
- * For example, you can use this transformation to set a key for a
key-less input record {@code <null,V>} by
- * extracting a key from the value within your {@link KeyValueMapper}. The
example below computes the new key as the
- * length of the value string.
+ * <p>For example, you can use this transformation to set a key for a
key-less input record {@code <null,V>}
+ * by extracting a key from the value within your {@link KeyValueMapper}.
The example below computes the new key
+ * as the length of the value string.
* <pre>{@code
* KStream<Byte[], String> keyLessStream =
builder.stream("key-less-topic");
* KStream<Integer, String> keyedStream = keyLessStream.selectKey(new
KeyValueMapper<Byte[], String, Integer> {
@@ -156,98 +123,30 @@ public interface KStream<K, V> {
* }
* });
* }</pre>
- * Setting a new key might result in an internal data redistribution if a
key based operator (like an aggregation or
- * join) is applied to the result {@code KStream}.
+ * Setting a new key might result in an internal data redistribution if a
key-based operator (like an aggregation
+ * or join) is applied to the result {@code KStream}.
+ *
+ * @param mapper
+ * a {@link KeyValueMapper} that computes a new key for each input
record
+ *
+ * @param <KOut> the new key type of the result {@code KStream}
+ *
+ * @return A {@code KStream} that contains records with a new key (possibly
of a different type) and unmodified value.
*
- * @param mapper a {@link KeyValueMapper} that computes a new key for each
record
- * @param named a {@link Named} config used to name the processor in the
topology
- * @param <KR> the new key type of the result stream
- * @return a {@code KStream} that contains records with new key (possibly
of different type) and unmodified value
* @see #map(KeyValueMapper)
- * @see #flatMap(KeyValueMapper)
* @see #mapValues(ValueMapper)
- * @see #mapValues(ValueMapperWithKey)
- * @see #flatMapValues(ValueMapper)
- * @see #flatMapValues(ValueMapperWithKey)
- */
- <KR> KStream<KR, V> selectKey(final KeyValueMapper<? super K, ? super V, ?
extends KR> mapper,
- final Named named);
-
- /**
- * Transform each record of the input stream into a new record in the
output stream (both key and value type can be
- * altered arbitrarily).
- * The provided {@link KeyValueMapper} is applied to each input record and
computes a new output record.
- * Thus, an input record {@code <K,V>} can be transformed into an output
record {@code <K':V'>}.
- * This is a stateless record-by-record operation (cf. {@link
#process(ProcessorSupplier, String...)} for
- * stateful record processing).
- * <p>
- * The example below normalizes the String key to upper-case letters and
counts the number of token of the value string.
- * <pre>{@code
- * KStream<String, String> inputStream = builder.stream("topic");
- * KStream<String, Integer> outputStream = inputStream.map(new
KeyValueMapper<String, String, KeyValue<String, Integer>> {
- * KeyValue<String, Integer> apply(String key, String value) {
- * return new KeyValue<>(key.toUpperCase(), value.split("
").length);
- * }
- * });
- * }</pre>
- * The provided {@link KeyValueMapper} must return a {@link KeyValue} type
and must not return {@code null}.
- * <p>
- * Mapping records might result in an internal data redistribution if a
key based operator (like an aggregation or
- * join) is applied to the result {@code KStream}. (cf. {@link
#mapValues(ValueMapper)})
- *
- * @param mapper a {@link KeyValueMapper} that computes a new output record
- * @param <KR> the key type of the result stream
- * @param <VR> the value type of the result stream
- * @return a {@code KStream} that contains records with new key and value
(possibly both of different type)
- * @see #selectKey(KeyValueMapper)
* @see #flatMap(KeyValueMapper)
- * @see #mapValues(ValueMapper)
- * @see #mapValues(ValueMapperWithKey)
* @see #flatMapValues(ValueMapper)
- * @see #flatMapValues(ValueMapperWithKey)
- * @see #process(ProcessorSupplier, String...)
- * @see #processValues(FixedKeyProcessorSupplier, String...)
*/
- <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ?
extends KeyValue<? extends KR, ? extends VR>> mapper);
+ <KOut> KStream<KOut, V> selectKey(final KeyValueMapper<? super K, ? super
V, ? extends KOut> mapper);
/**
- * Transform each record of the input stream into a new record in the
output stream (both key and value type can be
- * altered arbitrarily).
- * The provided {@link KeyValueMapper} is applied to each input record and
computes a new output record.
- * Thus, an input record {@code <K,V>} can be transformed into an output
record {@code <K':V'>}.
- * This is a stateless record-by-record operation (cf. {@link
#process(ProcessorSupplier, String...)} for
- * stateful record processing).
- * <p>
- * The example below normalizes the String key to upper-case letters and
counts the number of token of the value string.
- * <pre>{@code
- * KStream<String, String> inputStream = builder.stream("topic");
- * KStream<String, Integer> outputStream = inputStream.map(new
KeyValueMapper<String, String, KeyValue<String, Integer>> {
- * KeyValue<String, Integer> apply(String key, String value) {
- * return new KeyValue<>(key.toUpperCase(), value.split("
").length);
- * }
- * });
- * }</pre>
- * The provided {@link KeyValueMapper} must return a {@link KeyValue} type
and must not return {@code null}.
- * <p>
- * Mapping records might result in an internal data redistribution if a
key based operator (like an aggregation or
- * join) is applied to the result {@code KStream}. (cf. {@link
#mapValues(ValueMapper)})
+ * See {@link #selectKey(KeyValueMapper)}.
*
- * @param mapper a {@link KeyValueMapper} that computes a new output record
- * @param named a {@link Named} config used to name the processor in the
topology
- * @param <KR> the key type of the result stream
- * @param <VR> the value type of the result stream
- * @return a {@code KStream} that contains records with new key and value
(possibly both of different type)
- * @see #selectKey(KeyValueMapper)
- * @see #flatMap(KeyValueMapper)
- * @see #mapValues(ValueMapper)
- * @see #mapValues(ValueMapperWithKey)
- * @see #flatMapValues(ValueMapper)
- * @see #flatMapValues(ValueMapperWithKey)
- * @see #process(ProcessorSupplier, String...)
- * @see #processValues(FixedKeyProcessorSupplier, String...)
+ * <p>Takes an additional {@link Named} parameter that is used to name the
processor in the topology.
*/
- <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ?
extends KeyValue<? extends KR, ? extends VR>> mapper,
- final Named named);
+ <KOut> KStream<KOut, V> selectKey(final KeyValueMapper<? super K, ? super
V, ? extends KOut> mapper,
+ final Named named);
/**
* Transform the value of each input record into a new value (with
possible new type) of the output record.
@@ -387,6 +286,82 @@ public interface KStream<K, V> {
<VR> KStream<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super
V, ? extends VR> mapper,
final Named named);
+ /**
+ * Transform each record of the input stream into a new record in the
output stream (both key and value type can be
+ * altered arbitrarily).
+ * The provided {@link KeyValueMapper} is applied to each input record and
computes a new output record.
+ * Thus, an input record {@code <K,V>} can be transformed into an output
record {@code <K':V'>}.
+ * This is a stateless record-by-record operation (cf. {@link
#process(ProcessorSupplier, String...)} for
+ * stateful record processing).
+ * <p>
+ * The example below normalizes the String key to upper-case letters and
counts the number of tokens of the value string.
+ * <pre>{@code
+ * KStream<String, String> inputStream = builder.stream("topic");
+ * KStream<String, Integer> outputStream = inputStream.map(new
KeyValueMapper<String, String, KeyValue<String, Integer>>() {
+ * public KeyValue<String, Integer> apply(String key, String value) {
+ * return new KeyValue<>(key.toUpperCase(), value.split("
").length);
+ * }
+ * });
+ * }</pre>
+ * The provided {@link KeyValueMapper} must return a {@link KeyValue} type
and must not return {@code null}.
+ * <p>
+ * Mapping records might result in an internal data redistribution if a
key-based operator (like an aggregation or
+ * join) is applied to the result {@code KStream} (cf. {@link
#mapValues(ValueMapper)}).
+ *
+ * @param mapper a {@link KeyValueMapper} that computes a new output record
+ * @param <KR> the key type of the result stream
+ * @param <VR> the value type of the result stream
+ * @return a {@code KStream} that contains records with new key and value
(possibly both of different type)
+ * @see #selectKey(KeyValueMapper)
+ * @see #flatMap(KeyValueMapper)
+ * @see #mapValues(ValueMapper)
+ * @see #mapValues(ValueMapperWithKey)
+ * @see #flatMapValues(ValueMapper)
+ * @see #flatMapValues(ValueMapperWithKey)
+ * @see #process(ProcessorSupplier, String...)
+ * @see #processValues(FixedKeyProcessorSupplier, String...)
+ */
+ <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ?
extends KeyValue<? extends KR, ? extends VR>> mapper);
+
+ /**
+ * Transform each record of the input stream into a new record in the
output stream (both key and value type can be
+ * altered arbitrarily).
+ * The provided {@link KeyValueMapper} is applied to each input record and
computes a new output record.
+ * Thus, an input record {@code <K,V>} can be transformed into an output
record {@code <K':V'>}.
+ * This is a stateless record-by-record operation (cf. {@link
#process(ProcessorSupplier, String...)} for
+ * stateful record processing).
+ * <p>
+ * The example below normalizes the String key to upper-case letters and
counts the number of tokens of the value string.
+ * <pre>{@code
+ * KStream<String, String> inputStream = builder.stream("topic");
+ * KStream<String, Integer> outputStream = inputStream.map(new
KeyValueMapper<String, String, KeyValue<String, Integer>>() {
+ * public KeyValue<String, Integer> apply(String key, String value) {
+ * return new KeyValue<>(key.toUpperCase(), value.split("
").length);
+ * }
+ * });
+ * }</pre>
+ * The provided {@link KeyValueMapper} must return a {@link KeyValue} type
and must not return {@code null}.
+ * <p>
+ * Mapping records might result in an internal data redistribution if a
key-based operator (like an aggregation or
+ * join) is applied to the result {@code KStream} (cf. {@link
#mapValues(ValueMapper)}).
+ *
+ * @param mapper a {@link KeyValueMapper} that computes a new output record
+ * @param named a {@link Named} config used to name the processor in the
topology
+ * @param <KR> the key type of the result stream
+ * @param <VR> the value type of the result stream
+ * @return a {@code KStream} that contains records with new key and value
(possibly both of different type)
+ * @see #selectKey(KeyValueMapper)
+ * @see #flatMap(KeyValueMapper)
+ * @see #mapValues(ValueMapper)
+ * @see #mapValues(ValueMapperWithKey)
+ * @see #flatMapValues(ValueMapper)
+ * @see #flatMapValues(ValueMapperWithKey)
+ * @see #process(ProcessorSupplier, String...)
+ * @see #processValues(FixedKeyProcessorSupplier, String...)
+ */
+ <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ?
extends KeyValue<? extends KR, ? extends VR>> mapper,
+ final Named named);
+
/**
* Transform each record of the input stream into zero or more records in
the output stream (both key and value type
* can be altered arbitrarily).
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 94e0e9a0e36..da728c3e410 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -200,13 +200,13 @@ public class KStreamImpl<K, V> extends AbstractStream<K,
V> implements KStream<K
}
@Override
- public <KR> KStream<KR, V> selectKey(final KeyValueMapper<? super K, ?
super V, ? extends KR> mapper) {
+ public <KOut> KStream<KOut, V> selectKey(final KeyValueMapper<? super K, ?
super V, ? extends KOut> mapper) {
return selectKey(mapper, NamedInternal.empty());
}
@Override
- public <KR> KStream<KR, V> selectKey(final KeyValueMapper<? super K, ?
super V, ? extends KR> mapper,
- final Named named) {
+ public <KOut> KStream<KOut, V> selectKey(final KeyValueMapper<? super K, ?
super V, ? extends KOut> mapper,
+ final Named named) {
Objects.requireNonNull(mapper, "mapper can't be null");
Objects.requireNonNull(named, "named can't be null");
@@ -236,37 +236,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K,
V> implements KStream<K
return new ProcessorGraphNode<>(name, processorParameters);
}
- @Override
- public <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ?
super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper) {
- return map(mapper, NamedInternal.empty());
- }
-
- @Override
- public <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ?
super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper,
- final Named named) {
- Objects.requireNonNull(mapper, "mapper can't be null");
- Objects.requireNonNull(named, "named can't be null");
-
- final String name = new
NamedInternal(named).orElseGenerateWithPrefix(builder, MAP_NAME);
- final ProcessorParameters<? super K, ? super V, ?, ?>
processorParameters =
- new ProcessorParameters<>(new KStreamMap<>(mapper), name);
- final ProcessorGraphNode<? super K, ? super V> mapProcessorNode =
- new ProcessorGraphNode<>(name, processorParameters);
- mapProcessorNode.keyChangingOperation(true);
-
- builder.addGraphNode(graphNode, mapProcessorNode);
-
- // key and value serde cannot be preserved
- return new KStreamImpl<>(
- name,
- null,
- null,
- subTopologySourceNodes,
- true,
- mapProcessorNode,
- builder);
- }
-
@Override
public <VR> KStream<K, VR> mapValues(final ValueMapper<? super V, ?
extends VR> valueMapper) {
return mapValues(withKey(valueMapper));
@@ -309,6 +278,37 @@ public class KStreamImpl<K, V> extends AbstractStream<K,
V> implements KStream<K
builder);
}
+ @Override
+ public <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ?
super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper) {
+ return map(mapper, NamedInternal.empty());
+ }
+
+ @Override
+ public <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ?
super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper,
+ final Named named) {
+ Objects.requireNonNull(mapper, "mapper can't be null");
+ Objects.requireNonNull(named, "named can't be null");
+
+ final String name = new
NamedInternal(named).orElseGenerateWithPrefix(builder, MAP_NAME);
+ final ProcessorParameters<? super K, ? super V, ?, ?>
processorParameters =
+ new ProcessorParameters<>(new KStreamMap<>(mapper), name);
+ final ProcessorGraphNode<? super K, ? super V> mapProcessorNode =
+ new ProcessorGraphNode<>(name, processorParameters);
+ mapProcessorNode.keyChangingOperation(true);
+
+ builder.addGraphNode(graphNode, mapProcessorNode);
+
+ // key and value serde cannot be preserved
+ return new KStreamImpl<>(
+ name,
+ null,
+ null,
+ subTopologySourceNodes,
+ true,
+ mapProcessorNode,
+ builder);
+ }
+
@Override
public <KR, VR> KStream<KR, VR> flatMap(final KeyValueMapper<? super K, ?
super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>>
mapper) {
return flatMap(mapper, NamedInternal.empty());