This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a916a1db821 MINOR: cleanup KStream JavaDocs (1/N) - 
filter[Not]/selectKey (#18703)
a916a1db821 is described below

commit a916a1db82153a86f2319a936b9daa239c2aa438
Author: Matthias J. Sax <[email protected]>
AuthorDate: Thu Jan 30 09:31:47 2025 -0800

    MINOR: cleanup KStream JavaDocs (1/N) - filter[Not]/selectKey (#18703)
    
    Reviewers: Alieh Saeedi <[email protected]>, Lucas Brutschy 
<[email protected]>
---
 .../org/apache/kafka/streams/kstream/KStream.java  | 257 ++++++++++-----------
 .../streams/kstream/internals/KStreamImpl.java     |  68 +++---
 2 files changed, 150 insertions(+), 175 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 13d17aa6796..65d26fd8b74 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -62,23 +62,22 @@ public interface KStream<K, V> {
     /**
      * Create a new {@code KStream} that consists of all records of this 
stream which satisfy the given predicate.
      * All records that do not satisfy the predicate are dropped.
-     * This is a stateless record-by-record operation.
+     * This is a stateless record-by-record operation (cf. {@link 
#processValues(FixedKeyProcessorSupplier, String...)}
+     * for stateful record processing).
+     *
+     * @param predicate
+     *        a filter {@link Predicate} that is applied to each record
+     *
+     * @return A {@code KStream} that contains only those records that satisfy 
the given predicate.
      *
-     * @param predicate a filter {@link Predicate} that is applied to each 
record
-     * @return a {@code KStream} that contains only those records that satisfy 
the given predicate
      * @see #filterNot(Predicate)
      */
     KStream<K, V> filter(final Predicate<? super K, ? super V> predicate);
 
     /**
-     * Create a new {@code KStream} that consists of all records of this 
stream which satisfy the given predicate.
-     * All records that do not satisfy the predicate are dropped.
-     * This is a stateless record-by-record operation.
+     * See {@link #filter(Predicate)}.
      *
-     * @param predicate a filter {@link Predicate} that is applied to each 
record
-     * @param named     a {@link Named} config used to name the processor in 
the topology
-     * @return a {@code KStream} that contains only those records that satisfy 
the given predicate
-     * @see #filterNot(Predicate)
+     * <p>Takes an additional {@link Named} parameter that is used to name the 
processor in the topology.
      */
     KStream<K, V> filter(final Predicate<? super K, ? super V> predicate, 
final Named named);
 
@@ -86,68 +85,36 @@ public interface KStream<K, V> {
      * Create a new {@code KStream} that consists all records of this stream 
which do <em>not</em> satisfy the given
      * predicate.
      * All records that <em>do</em> satisfy the predicate are dropped.
-     * This is a stateless record-by-record operation.
+     * This is a stateless record-by-record operation (cf. {@link 
#processValues(FixedKeyProcessorSupplier, String...)}
+     * for stateful record processing).
+     *
+     * @param predicate
+     *        a filter {@link Predicate} that is applied to each record
+     *
+     * @return A {@code KStream} that contains only those records that do 
<em>not</em> satisfy the given predicate.
      *
-     * @param predicate a filter {@link Predicate} that is applied to each 
record
-     * @return a {@code KStream} that contains only those records that do 
<em>not</em> satisfy the given predicate
      * @see #filter(Predicate)
      */
     KStream<K, V> filterNot(final Predicate<? super K, ? super V> predicate);
 
     /**
-     * Create a new {@code KStream} that consists all records of this stream 
which do <em>not</em> satisfy the given
-     * predicate.
-     * All records that <em>do</em> satisfy the predicate are dropped.
-     * This is a stateless record-by-record operation.
+     * See {@link #filterNot(Predicate)}.
      *
-     * @param predicate a filter {@link Predicate} that is applied to each 
record
-     * @param named     a {@link Named} config used to name the processor in 
the topology
-     * @return a {@code KStream} that contains only those records that do 
<em>not</em> satisfy the given predicate
-     * @see #filter(Predicate)
+     * <p>Takes an additional {@link Named} parameter that is used to name the 
processor in the topology.
      */
     KStream<K, V> filterNot(final Predicate<? super K, ? super V> predicate, 
final Named named);
 
     /**
-     * Set a new key (with possibly new type) for each input record.
-     * The provided {@link KeyValueMapper} is applied to each input record and 
computes a new key for it.
+     * Create a new {@code KStream} that consists of all records of this 
stream but with a modified key.
+     * The provided {@link KeyValueMapper} is applied to each input record and 
computes a new key (possibly of a
+     * different type) for it.
      * Thus, an input record {@code <K,V>} can be transformed into an output 
record {@code <K':V>}.
-     * This is a stateless record-by-record operation.
-     * <p>
-     * For example, you can use this transformation to set a key for a 
key-less input record {@code <null,V>} by
-     * extracting a key from the value within your {@link KeyValueMapper}. The 
example below computes the new key as the
-     * length of the value string.
-     * <pre>{@code
-     * KStream<Byte[], String> keyLessStream = 
builder.stream("key-less-topic");
-     * KStream<Integer, String> keyedStream = keyLessStream.selectKey(new 
KeyValueMapper<Byte[], String, Integer> {
-     *     Integer apply(Byte[] key, String value) {
-     *         return value.length();
-     *     }
-     * });
-     * }</pre>
-     * Setting a new key might result in an internal data redistribution if a 
key based operator (like an aggregation or
-     * join) is applied to the result {@code KStream}.
+     * This is a stateless record-by-record operation (cf. {@link 
#process(ProcessorSupplier, String...)} for
+     * stateful record processing).
      *
-     * @param mapper a {@link KeyValueMapper} that computes a new key for each 
record
-     * @param <KR>   the new key type of the result stream
-     * @return a {@code KStream} that contains records with new key (possibly 
of different type) and unmodified value
-     * @see #map(KeyValueMapper)
-     * @see #flatMap(KeyValueMapper)
-     * @see #mapValues(ValueMapper)
-     * @see #mapValues(ValueMapperWithKey)
-     * @see #flatMapValues(ValueMapper)
-     * @see #flatMapValues(ValueMapperWithKey)
-     */
-    <KR> KStream<KR, V> selectKey(final KeyValueMapper<? super K, ? super V, ? 
extends KR> mapper);
-
-    /**
-     * Set a new key (with possibly new type) for each input record.
-     * The provided {@link KeyValueMapper} is applied to each input record and 
computes a new key for it.
-     * Thus, an input record {@code <K,V>} can be transformed into an output 
record {@code <K':V>}.
-     * This is a stateless record-by-record operation.
-     * <p>
-     * For example, you can use this transformation to set a key for a 
key-less input record {@code <null,V>} by
-     * extracting a key from the value within your {@link KeyValueMapper}. The 
example below computes the new key as the
-     * length of the value string.
+     * <p>For example, you can use this transformation to set a key for a 
key-less input record {@code <null,V>}
+     * by extracting a key from the value within your {@link KeyValueMapper}. 
The example below computes the new key
+     * as the length of the value string.
      * <pre>{@code
      * KStream<Byte[], String> keyLessStream = 
builder.stream("key-less-topic");
      * KStream<Integer, String> keyedStream = keyLessStream.selectKey(new 
KeyValueMapper<Byte[], String, Integer> {
@@ -156,98 +123,30 @@ public interface KStream<K, V> {
      *     }
      * });
      * }</pre>
-     * Setting a new key might result in an internal data redistribution if a 
key based operator (like an aggregation or
-     * join) is applied to the result {@code KStream}.
+     * Setting a new key might result in an internal data redistribution if a 
key-based operator (like an aggregation
+     * or join) is applied to the result {@code KStream}.
+     *
+     * @param mapper
+     *        a {@link KeyValueMapper} that computes a new key for each input 
record
+     *
+     * @param <KOut> the new key type of the result {@code KStream}
+     *
+     * @return A {@code KStream} that contains records with new key (possibly 
of a different type) and unmodified value.
      *
-     * @param mapper a {@link KeyValueMapper} that computes a new key for each 
record
-     * @param named  a {@link Named} config used to name the processor in the 
topology
-     * @param <KR>   the new key type of the result stream
-     * @return a {@code KStream} that contains records with new key (possibly 
of different type) and unmodified value
      * @see #map(KeyValueMapper)
-     * @see #flatMap(KeyValueMapper)
      * @see #mapValues(ValueMapper)
-     * @see #mapValues(ValueMapperWithKey)
-     * @see #flatMapValues(ValueMapper)
-     * @see #flatMapValues(ValueMapperWithKey)
-     */
-    <KR> KStream<KR, V> selectKey(final KeyValueMapper<? super K, ? super V, ? 
extends KR> mapper,
-                                  final Named named);
-
-    /**
-     * Transform each record of the input stream into a new record in the 
output stream (both key and value type can be
-     * altered arbitrarily).
-     * The provided {@link KeyValueMapper} is applied to each input record and 
computes a new output record.
-     * Thus, an input record {@code <K,V>} can be transformed into an output 
record {@code <K':V'>}.
-     * This is a stateless record-by-record operation (cf. {@link 
#process(ProcessorSupplier, String...)} for
-     * stateful record processing).
-     * <p>
-     * The example below normalizes the String key to upper-case letters and 
counts the number of token of the value string.
-     * <pre>{@code
-     * KStream<String, String> inputStream = builder.stream("topic");
-     * KStream<String, Integer> outputStream = inputStream.map(new 
KeyValueMapper<String, String, KeyValue<String, Integer>> {
-     *     KeyValue<String, Integer> apply(String key, String value) {
-     *         return new KeyValue<>(key.toUpperCase(), value.split(" 
").length);
-     *     }
-     * });
-     * }</pre>
-     * The provided {@link KeyValueMapper} must return a {@link KeyValue} type 
and must not return {@code null}.
-     * <p>
-     * Mapping records might result in an internal data redistribution if a 
key based operator (like an aggregation or
-     * join) is applied to the result {@code KStream}. (cf. {@link 
#mapValues(ValueMapper)})
-     *
-     * @param mapper a {@link KeyValueMapper} that computes a new output record
-     * @param <KR>   the key type of the result stream
-     * @param <VR>   the value type of the result stream
-     * @return a {@code KStream} that contains records with new key and value 
(possibly both of different type)
-     * @see #selectKey(KeyValueMapper)
      * @see #flatMap(KeyValueMapper)
-     * @see #mapValues(ValueMapper)
-     * @see #mapValues(ValueMapperWithKey)
      * @see #flatMapValues(ValueMapper)
-     * @see #flatMapValues(ValueMapperWithKey)
-     * @see #process(ProcessorSupplier, String...)
-     * @see #processValues(FixedKeyProcessorSupplier, String...)
      */
-    <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ? 
extends KeyValue<? extends KR, ? extends VR>> mapper);
+    <KOut> KStream<KOut, V> selectKey(final KeyValueMapper<? super K, ? super 
V, ? extends KOut> mapper);
 
     /**
-     * Transform each record of the input stream into a new record in the 
output stream (both key and value type can be
-     * altered arbitrarily).
-     * The provided {@link KeyValueMapper} is applied to each input record and 
computes a new output record.
-     * Thus, an input record {@code <K,V>} can be transformed into an output 
record {@code <K':V'>}.
-     * This is a stateless record-by-record operation (cf. {@link 
#process(ProcessorSupplier, String...)} for
-     * stateful record processing).
-     * <p>
-     * The example below normalizes the String key to upper-case letters and 
counts the number of token of the value string.
-     * <pre>{@code
-     * KStream<String, String> inputStream = builder.stream("topic");
-     * KStream<String, Integer> outputStream = inputStream.map(new 
KeyValueMapper<String, String, KeyValue<String, Integer>> {
-     *     KeyValue<String, Integer> apply(String key, String value) {
-     *         return new KeyValue<>(key.toUpperCase(), value.split(" 
").length);
-     *     }
-     * });
-     * }</pre>
-     * The provided {@link KeyValueMapper} must return a {@link KeyValue} type 
and must not return {@code null}.
-     * <p>
-     * Mapping records might result in an internal data redistribution if a 
key based operator (like an aggregation or
-     * join) is applied to the result {@code KStream}. (cf. {@link 
#mapValues(ValueMapper)})
+     * See {@link #selectKey(KeyValueMapper)}.
      *
-     * @param mapper a {@link KeyValueMapper} that computes a new output record
-     * @param named  a {@link Named} config used to name the processor in the 
topology
-     * @param <KR>   the key type of the result stream
-     * @param <VR>   the value type of the result stream
-     * @return a {@code KStream} that contains records with new key and value 
(possibly both of different type)
-     * @see #selectKey(KeyValueMapper)
-     * @see #flatMap(KeyValueMapper)
-     * @see #mapValues(ValueMapper)
-     * @see #mapValues(ValueMapperWithKey)
-     * @see #flatMapValues(ValueMapper)
-     * @see #flatMapValues(ValueMapperWithKey)
-     * @see #process(ProcessorSupplier, String...)
-     * @see #processValues(FixedKeyProcessorSupplier, String...)
+     * <p>Takes an additional {@link Named} parameter that is used to name the 
processor in the topology.
      */
-    <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ? 
extends KeyValue<? extends KR, ? extends VR>> mapper,
-                                 final Named named);
+    <KOut> KStream<KOut, V> selectKey(final KeyValueMapper<? super K, ? super 
V, ? extends KOut> mapper,
+                                      final Named named);
 
     /**
      * Transform the value of each input record into a new value (with 
possible new type) of the output record.
@@ -387,6 +286,82 @@ public interface KStream<K, V> {
     <VR> KStream<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super 
V, ? extends VR> mapper,
                                   final Named named);
 
+    /**
+     * Transform each record of the input stream into a new record in the 
output stream (both key and value type can be
+     * altered arbitrarily).
+     * The provided {@link KeyValueMapper} is applied to each input record and 
computes a new output record.
+     * Thus, an input record {@code <K,V>} can be transformed into an output 
record {@code <K':V'>}.
+     * This is a stateless record-by-record operation (cf. {@link 
#process(ProcessorSupplier, String...)} for
+     * stateful record processing).
+     * <p>
+     * The example below normalizes the String key to upper-case letters and 
counts the number of tokens in the value string.
+     * <pre>{@code
+     * KStream<String, String> inputStream = builder.stream("topic");
+     * KStream<String, Integer> outputStream = inputStream.map(new 
KeyValueMapper<String, String, KeyValue<String, Integer>>() {
+     *     public KeyValue<String, Integer> apply(String key, String value) {
+     *         return new KeyValue<>(key.toUpperCase(), value.split(" 
").length);
+     *     }
+     * });
+     * }</pre>
+     * The provided {@link KeyValueMapper} must return a {@link KeyValue} type 
and must not return {@code null}.
+     * <p>
+     * Mapping records might result in an internal data redistribution if a 
key-based operator (like an aggregation or
+     * join) is applied to the result {@code KStream}. (cf. {@link 
#mapValues(ValueMapper)})
+     *
+     * @param mapper a {@link KeyValueMapper} that computes a new output record
+     * @param <KR>   the key type of the result stream
+     * @param <VR>   the value type of the result stream
+     * @return a {@code KStream} that contains records with new key and value 
(possibly both of different type)
+     * @see #selectKey(KeyValueMapper)
+     * @see #flatMap(KeyValueMapper)
+     * @see #mapValues(ValueMapper)
+     * @see #mapValues(ValueMapperWithKey)
+     * @see #flatMapValues(ValueMapper)
+     * @see #flatMapValues(ValueMapperWithKey)
+     * @see #process(ProcessorSupplier, String...)
+     * @see #processValues(FixedKeyProcessorSupplier, String...)
+     */
+    <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ? 
extends KeyValue<? extends KR, ? extends VR>> mapper);
+
+    /**
+     * Transform each record of the input stream into a new record in the 
output stream (both key and value type can be
+     * altered arbitrarily).
+     * The provided {@link KeyValueMapper} is applied to each input record and 
computes a new output record.
+     * Thus, an input record {@code <K,V>} can be transformed into an output 
record {@code <K':V'>}.
+     * This is a stateless record-by-record operation (cf. {@link 
#process(ProcessorSupplier, String...)} for
+     * stateful record processing).
+     * <p>
+     * The example below normalizes the String key to upper-case letters and 
counts the number of tokens in the value string.
+     * <pre>{@code
+     * KStream<String, String> inputStream = builder.stream("topic");
+     * KStream<String, Integer> outputStream = inputStream.map(new 
KeyValueMapper<String, String, KeyValue<String, Integer>>() {
+     *     public KeyValue<String, Integer> apply(String key, String value) {
+     *         return new KeyValue<>(key.toUpperCase(), value.split(" 
").length);
+     *     }
+     * });
+     * }</pre>
+     * The provided {@link KeyValueMapper} must return a {@link KeyValue} type 
and must not return {@code null}.
+     * <p>
+     * Mapping records might result in an internal data redistribution if a 
key-based operator (like an aggregation or
+     * join) is applied to the result {@code KStream}. (cf. {@link 
#mapValues(ValueMapper)})
+     *
+     * @param mapper a {@link KeyValueMapper} that computes a new output record
+     * @param named  a {@link Named} config used to name the processor in the 
topology
+     * @param <KR>   the key type of the result stream
+     * @param <VR>   the value type of the result stream
+     * @return a {@code KStream} that contains records with new key and value 
(possibly both of different type)
+     * @see #selectKey(KeyValueMapper)
+     * @see #flatMap(KeyValueMapper)
+     * @see #mapValues(ValueMapper)
+     * @see #mapValues(ValueMapperWithKey)
+     * @see #flatMapValues(ValueMapper)
+     * @see #flatMapValues(ValueMapperWithKey)
+     * @see #process(ProcessorSupplier, String...)
+     * @see #processValues(FixedKeyProcessorSupplier, String...)
+     */
+    <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ? 
extends KeyValue<? extends KR, ? extends VR>> mapper,
+                                 final Named named);
+
     /**
      * Transform each record of the input stream into zero or more records in 
the output stream (both key and value type
      * can be altered arbitrarily).
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 94e0e9a0e36..da728c3e410 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -200,13 +200,13 @@ public class KStreamImpl<K, V> extends AbstractStream<K, 
V> implements KStream<K
     }
 
     @Override
-    public <KR> KStream<KR, V> selectKey(final KeyValueMapper<? super K, ? 
super V, ? extends KR> mapper) {
+    public <KOut> KStream<KOut, V> selectKey(final KeyValueMapper<? super K, ? 
super V, ? extends KOut> mapper) {
         return selectKey(mapper, NamedInternal.empty());
     }
 
     @Override
-    public <KR> KStream<KR, V> selectKey(final KeyValueMapper<? super K, ? 
super V, ? extends KR> mapper,
-                                         final Named named) {
+    public <KOut> KStream<KOut, V> selectKey(final KeyValueMapper<? super K, ? 
super V, ? extends KOut> mapper,
+                                             final Named named) {
         Objects.requireNonNull(mapper, "mapper can't be null");
         Objects.requireNonNull(named, "named can't be null");
 
@@ -236,37 +236,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K, 
V> implements KStream<K
         return new ProcessorGraphNode<>(name, processorParameters);
     }
 
-    @Override
-    public <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? 
super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper) {
-        return map(mapper, NamedInternal.empty());
-    }
-
-    @Override
-    public <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? 
super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper,
-                                        final Named named) {
-        Objects.requireNonNull(mapper, "mapper can't be null");
-        Objects.requireNonNull(named, "named can't be null");
-
-        final String name = new 
NamedInternal(named).orElseGenerateWithPrefix(builder, MAP_NAME);
-        final ProcessorParameters<? super K, ? super V, ?, ?> 
processorParameters =
-            new ProcessorParameters<>(new KStreamMap<>(mapper), name);
-        final ProcessorGraphNode<? super K, ? super V> mapProcessorNode =
-            new ProcessorGraphNode<>(name, processorParameters);
-        mapProcessorNode.keyChangingOperation(true);
-
-        builder.addGraphNode(graphNode, mapProcessorNode);
-
-        // key and value serde cannot be preserved
-        return new KStreamImpl<>(
-            name,
-            null,
-            null,
-            subTopologySourceNodes,
-            true,
-            mapProcessorNode,
-            builder);
-    }
-
     @Override
     public <VR> KStream<K, VR> mapValues(final ValueMapper<? super V, ? 
extends VR> valueMapper) {
         return mapValues(withKey(valueMapper));
@@ -309,6 +278,37 @@ public class KStreamImpl<K, V> extends AbstractStream<K, 
V> implements KStream<K
             builder);
     }
 
+    @Override
+    public <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? 
super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper) {
+        return map(mapper, NamedInternal.empty());
+    }
+
+    @Override
+    public <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? 
super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper,
+                                        final Named named) {
+        Objects.requireNonNull(mapper, "mapper can't be null");
+        Objects.requireNonNull(named, "named can't be null");
+
+        final String name = new 
NamedInternal(named).orElseGenerateWithPrefix(builder, MAP_NAME);
+        final ProcessorParameters<? super K, ? super V, ?, ?> 
processorParameters =
+            new ProcessorParameters<>(new KStreamMap<>(mapper), name);
+        final ProcessorGraphNode<? super K, ? super V> mapProcessorNode =
+            new ProcessorGraphNode<>(name, processorParameters);
+        mapProcessorNode.keyChangingOperation(true);
+
+        builder.addGraphNode(graphNode, mapProcessorNode);
+
+        // key and value serde cannot be preserved
+        return new KStreamImpl<>(
+            name,
+            null,
+            null,
+            subTopologySourceNodes,
+            true,
+            mapProcessorNode,
+            builder);
+    }
+
     @Override
     public <KR, VR> KStream<KR, VR> flatMap(final KeyValueMapper<? super K, ? 
super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> 
mapper) {
         return flatMap(mapper, NamedInternal.empty());

Reply via email to