This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 65961516fdc MINOR: cleanup KStream JavaDocs (4/N) - 
stream-table-inner-join (#18721)
65961516fdc is described below

commit 65961516fdc2281763633ab6574680c3b7ca274c
Author: Matthias J. Sax <[email protected]>
AuthorDate: Mon Feb 3 17:48:49 2025 -0800

    MINOR: cleanup KStream JavaDocs (4/N) - stream-table-inner-join (#18721)
    
    Reviewers: Lucas Brutschy <[email protected]>, Bill Bejeck 
<[email protected]>
---
 .../org/apache/kafka/streams/kstream/KStream.java  | 375 ++++++---------------
 .../streams/kstream/internals/KStreamImpl.java     |  52 +--
 .../kstream/internals/KStreamKTableJoin.java       |  23 +-
 .../internals/KStreamKTableJoinProcessor.java      |  42 +--
 4 files changed, 171 insertions(+), 321 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 8d6c1e33ab7..f32607e9d88 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -34,6 +34,9 @@ import 
org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
+
+import java.time.Duration;
 
 /**
  * {@code KStream} is an abstraction of a <i>record stream</i> of {@link 
KeyValue} pairs, i.e., each record is an
@@ -1977,23 +1980,26 @@ public interface KStream<K, V> {
                                       final ValueJoinerWithKey<? super K, ? 
super V, ? super VO, ? extends VR> joiner,
                                       final JoinWindows windows,
                                       final StreamJoined<K, V, VO> 
streamJoined);
+
     /**
-     * Join records of this stream with {@link KTable}'s records using 
non-windowed inner equi join with default
-     * serializers and deserializers.
-     * The join is a primary key table lookup join with join attribute {@code 
stream.key == table.key}.
+     * Join records of this stream with {@link KTable}'s records using 
non-windowed inner equi-join.
+     * The join is a primary key table lookup join with join attribute {@code 
streamRecord.key == tableRecord.key}.
      * "Table lookup join" means, that results are only computed if {@code 
KStream} records are processed.
-     * This is done by performing a lookup for matching records in the 
<em>current</em> (i.e., processing time) internal
-     * {@link KTable} state.
+     * This is done by performing a lookup for matching records into the 
internal {@link KTable} state.
      * In contrast, processing {@link KTable} input records will only update 
the internal {@link KTable} state and
      * will not produce any result records.
-     * <p>
-     * For each {@code KStream} record that finds a corresponding record in 
{@link KTable} the provided
+     *
+     * <p>For each {@code KStream} record that finds a joining record in the 
{@link KTable} the provided
      * {@link ValueJoiner} will be called to compute a value (with arbitrary 
type) for the result record.
      * The key of the result record is the same as for both joining input 
records.
-     * If an {@code KStream} input record key or value is {@code null} the 
record will not be included in the join
-     * operation and thus no output record will be added to the resulting 
{@code KStream}.
-     * <p>
-     * Example:
+     * If you need read access to the join key, use {@link #join(KTable, 
ValueJoinerWithKey)}.
+     * If a {@code KStream} input record's key or value is {@code null} the 
input record will be dropped, and no join
+     * computation is triggered.
+     * If a {@link KTable} input record's key is {@code null} the input record 
will be dropped, and the table state
+     * won't be updated.
+     * {@link KTable} input records with {@code null} values are considered 
deletes (so-called tombstone) for the table.
+     *
+     * <p>Example:
      * <table border='1'>
      * <tr>
      * <th>KStream</th>
@@ -2020,276 +2026,115 @@ public interface KStream<K, V> {
      * <td>&lt;K1:ValueJoiner(C,b)&gt;</td>
      * </tr>
      * </table>
-     * Both input streams (or to be more precise, their underlying source 
topics) need to have the same number of
-     * partitions.
-     * If this is not the case, you would need to call {@link 
#repartition(Repartitioned)} for this {@code KStream}
-     * before doing the join, specifying the same number of partitions via 
{@link Repartitioned} parameter as the given
+     *
+     * By default, {@code KStream} records are processed by performing a 
lookup for matching records in the
+     * <em>current</em> (i.e., processing time) internal {@link KTable} state.
+     * This default implementation does not handle out-of-order records in 
either input of the join well.
+     * See {@link #join(KTable, ValueJoiner, Joined)} on how to configure a 
stream-table join to handle out-of-order
+     * data.
+     *
+     * <p>{@code KStream} and {@link KTable} (or to be more precise, their 
underlying source topics) need to have the
+     * same number of partitions (cf. {@link #join(GlobalKTable, 
KeyValueMapper, ValueJoiner)}).
+     * If this is not the case (and if no auto-repartitioning happens for the 
{@code KStream}, see further below),
+     * you would need to call {@link #repartition(Repartitioned)} for this 
{@code KStream} before doing the join,
+     * specifying the same number of partitions via {@link Repartitioned} 
parameter as the given {@link KTable}.
+     * Furthermore, {@code KStream} and {@link KTable} need to be 
co-partitioned on the join key
+     * (i.e., use the same partitioner).
+     * Note: Kafka Streams cannot verify the used partitioner, so it is the 
user's responsibility to ensure
+     * that the same partitioner is used for both inputs of the join.
+     *
+     * <p>If a key changing operator was used on this {@code KStream} before 
this operation
+     * (e.g., {@link #selectKey(KeyValueMapper)}, {@link 
#map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or
+     * {@link #process(ProcessorSupplier, String...)}) Kafka Streams will 
automatically repartition the data of this
+     * {@code KStream}, i.e., it will create an internal repartitioning topic 
in Kafka and write and re-read
+     * the data via this topic such that data is correctly partitioned by the 
{@link KTable}'s key.
+     *
+     * <p>The repartitioning topic will be named 
"${applicationId}-&lt;name&gt;-repartition",
+     * where "applicationId" is user-specified in {@link StreamsConfig} via 
parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
+     * "&lt;name&gt;" is an internally generated name, and "-repartition" is a 
fixed suffix.
+     * The number of partitions for the repartition topic is determined based 
on number of partitions of the
      * {@link KTable}.
-     * Furthermore, both input streams need to be co-partitioned on the join 
key (i.e., use the same partitioner);
-     * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}.
-     * If this requirement is not met, Kafka Streams will automatically 
repartition the data, i.e., it will create an
-     * internal repartitioning topic in Kafka and write and re-read the data 
via this topic before the actual join.
-     * The repartitioning topic will be named 
"${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
-     * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, 
"&lt;name&gt;" is an internally generated
-     * name, and "-repartition" is a fixed suffix.
-     * <p>
-     * You can retrieve all generated internal topic names via {@link 
Topology#describe()}.
-     * <p>
-     * Repartitioning can happen only for this {@code KStream} but not for the 
provided {@link KTable}.
-     * For this case, all data of the stream will be redistributed through the 
repartitioning topic by writing all
-     * records to it, and rereading all records from it, such that the join 
input {@code KStream} is partitioned
-     * correctly on its key.
+     * Furthermore, the topic(s) will be created with infinite retention time 
and data will be automatically purged
+     * by Kafka Streams.
+     *
+     * <p>You can retrieve all generated internal topic names via {@link 
Topology#describe()}.
+     * To explicitly set key/value serdes or to customize the names of the 
repartition topic,
+     * use {@link #join(KTable, ValueJoiner, Joined)}.
+     * For more control over the repartitioning, use {@link 
#repartition(Repartitioned)} before {@code join()}.
+     *
+     * @param table
+     *        the {@link KTable} to be joined with this stream
+     * @param joiner
+     *        a {@link ValueJoiner} that computes the join result for a pair 
of matching records
+     *
+     * @param <TableValue> the value type of the table
+     * @param <VOut> the value type of the result stream
+     *
+     * @return A {@code KStream} that contains join-records, one for each 
matched stream record, with the corresponding
+     *         key and a value computed by the given {@link ValueJoiner}.
      *
-     * @param table  the {@link KTable} to be joined with this stream
-     * @param joiner a {@link ValueJoiner} that computes the join result for a 
pair of matching records
-     * @param <VT>   the value type of the table
-     * @param <VR>   the value type of the result stream
-     * @return a {@code KStream} that contains join-records for each key and 
values computed by the given
-     * {@link ValueJoiner}, one for each matched record-pair with the same key
      * @see #leftJoin(KTable, ValueJoiner)
-     * @see #join(GlobalKTable, KeyValueMapper, ValueJoiner)
      */
-    <VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
-                                 final ValueJoiner<? super V, ? super VT, ? 
extends VR> joiner);
+    <TableValue, VOut> KStream<K, VOut> join(final KTable<K, TableValue> table,
+                                             final ValueJoiner<? super V, ? 
super TableValue, ? extends VOut> joiner);
 
     /**
-     * Join records of this stream with {@link KTable}'s records using 
non-windowed inner equi join with default
-     * serializers and deserializers.
-     * The join is a primary key table lookup join with join attribute {@code 
stream.key == table.key}.
-     * "Table lookup join" means, that results are only computed if {@code 
KStream} records are processed.
-     * This is done by performing a lookup for matching records in the 
<em>current</em> (i.e., processing time) internal
-     * {@link KTable} state.
-     * In contrast, processing {@link KTable} input records will only update 
the internal {@link KTable} state and
-     * will not produce any result records.
-     * <p>
-     * For each {@code KStream} record that finds a corresponding record in 
{@link KTable} the provided
-     * {@link ValueJoinerWithKey} will be called to compute a value (with 
arbitrary type) for the result record.
-     * Note that the key is read-only and should not be modified, as this can 
lead to undefined behaviour.
-     *
-     * The key of the result record is the same as for both joining input 
records.
-     * If an {@code KStream} input record key or value is {@code null} the 
record will not be included in the join
-     * operation and thus no output record will be added to the resulting 
{@code KStream}.
-     * <p>
-     * Example:
-     * <table border='1'>
-     * <tr>
-     * <th>KStream</th>
-     * <th>KTable</th>
-     * <th>state</th>
-     * <th>result</th>
-     * </tr>
-     * <tr>
-     * <td>&lt;K1:A&gt;</td>
-     * <td></td>
-     * <td></td>
-     * <td></td>
-     * </tr>
-     * <tr>
-     * <td></td>
-     * <td>&lt;K1:b&gt;</td>
-     * <td>&lt;K1:b&gt;</td>
-     * <td></td>
-     * </tr>
-     * <tr>
-     * <td>&lt;K1:C&gt;</td>
-     * <td></td>
-     * <td>&lt;K1:b&gt;</td>
-     * <td>&lt;K1:ValueJoinerWithKey(K1,C,b)&gt;</td>
-     * </tr>
-     * </table>
-     * Both input streams (or to be more precise, their underlying source 
topics) need to have the same number of
-     * partitions.
-     * If this is not the case, you would need to call {@link 
#repartition(Repartitioned)} for this {@code KStream}
-     * before doing the join, specifying the same number of partitions via 
{@link Repartitioned} parameter as the given
-     * {@link KTable}.
-     * Furthermore, both input streams need to be co-partitioned on the join 
key (i.e., use the same partitioner);
-     * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)}.
-     * If this requirement is not met, Kafka Streams will automatically 
repartition the data, i.e., it will create an
-     * internal repartitioning topic in Kafka and write and re-read the data 
via this topic before the actual join.
-     * The repartitioning topic will be named 
"${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
-     * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, 
"&lt;name&gt;" is an internally generated
-     * name, and "-repartition" is a fixed suffix.
-     * <p>
-     * You can retrieve all generated internal topic names via {@link 
Topology#describe()}.
-     * <p>
-     * Repartitioning can happen only for this {@code KStream} but not for the 
provided {@link KTable}.
-     * For this case, all data of the stream will be redistributed through the 
repartitioning topic by writing all
-     * records to it, and rereading all records from it, such that the join 
input {@code KStream} is partitioned
-     * correctly on its key.
+     * See {@link #join(KTable, ValueJoiner)}.
      *
-     * @param table  the {@link KTable} to be joined with this stream
-     * @param joiner a {@link ValueJoinerWithKey} that computes the join 
result for a pair of matching records
-     * @param <VT>   the value type of the table
-     * @param <VR>   the value type of the result stream
-     * @return a {@code KStream} that contains join-records for each key and 
values computed by the given
-     * {@link ValueJoinerWithKey}, one for each matched record-pair with the 
same key
-     * @see #leftJoin(KTable, ValueJoinerWithKey)
-     * @see #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)
+     * <p>Note that the key is read-only and must not be modified, as this can 
lead to corrupt partitioning.
      */
-    <VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
-                                 final ValueJoinerWithKey<? super K, ? super 
V, ? super VT, ? extends VR> joiner);
+    <TableValue, VOut> KStream<K, VOut> join(final KTable<K, TableValue> table,
+                                             final ValueJoinerWithKey<? super 
K, ? super V, ? super TableValue, ? extends VOut> joiner);
 
     /**
-     * Join records of this stream with {@link KTable}'s records using 
non-windowed inner equi join with default
-     * serializers and deserializers.
-     * The join is a primary key table lookup join with join attribute {@code 
stream.key == table.key}.
-     * "Table lookup join" means, that results are only computed if {@code 
KStream} records are processed.
-     * This is done by performing a lookup for matching records in the 
<em>current</em> (i.e., processing time) internal
-     * {@link KTable} state.
-     * In contrast, processing {@link KTable} input records will only update 
the internal {@link KTable} state and
-     * will not produce any result records.
-     * <p>
-     * For each {@code KStream} record that finds a corresponding record in 
{@link KTable} the provided
-     * {@link ValueJoiner} will be called to compute a value (with arbitrary 
type) for the result record.
-     * The key of the result record is the same as for both joining input 
records.
-     * If an {@code KStream} input record key or value is {@code null} the 
record will not be included in the join
-     * operation and thus no output record will be added to the resulting 
{@code KStream}.
-     * <p>
-     * Example:
-     * <table border='1'>
-     * <tr>
-     * <th>KStream</th>
-     * <th>KTable</th>
-     * <th>state</th>
-     * <th>result</th>
-     * </tr>
-     * <tr>
-     * <td>&lt;K1:A&gt;</td>
-     * <td></td>
-     * <td></td>
-     * <td></td>
-     * </tr>
-     * <tr>
-     * <td></td>
-     * <td>&lt;K1:b&gt;</td>
-     * <td>&lt;K1:b&gt;</td>
-     * <td></td>
-     * </tr>
-     * <tr>
-     * <td>&lt;K1:C&gt;</td>
-     * <td></td>
-     * <td>&lt;K1:b&gt;</td>
-     * <td>&lt;K1:ValueJoiner(C,b)&gt;</td>
-     * </tr>
-     * </table>
-     * Both input streams (or to be more precise, their underlying source 
topics) need to have the same number of
-     * partitions.
-     * If this is not the case, you would need to call {@link 
#repartition(Repartitioned)} for this {@code KStream}
-     * before doing the join, specifying the same number of partitions via 
{@link Repartitioned} parameter as the given
-     * {@link KTable}.
-     * Furthermore, both input streams need to be co-partitioned on the join 
key (i.e., use the same partitioner);
-     * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}.
-     * If this requirement is not met, Kafka Streams will automatically 
repartition the data, i.e., it will create an
-     * internal repartitioning topic in Kafka and write and re-read the data 
via this topic before the actual join.
-     * The repartitioning topic will be named 
"${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
-     * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, 
"&lt;name&gt;" is an internally generated
-     * name, and "-repartition" is a fixed suffix.
-     * <p>
-     * You can retrieve all generated internal topic names via {@link 
Topology#describe()}.
-     * <p>
-     * Repartitioning can happen only for this {@code KStream} but not for the 
provided {@link KTable}.
-     * For this case, all data of the stream will be redistributed through the 
repartitioning topic by writing all
-     * records to it, and rereading all records from it, such that the join 
input {@code KStream} is partitioned
-     * correctly on its key.
+     * Join records of this stream with {@link KTable}'s records using 
non-windowed inner equi-join.
+     * In contrast to {@link #join(KTable, ValueJoiner)}, but only if the used 
{@link KTable} is backed by a
+     * {@link org.apache.kafka.streams.state.VersionedKeyValueStore 
VersionedKeyValueStore}, the additional
+     * {@link Joined} parameter allows to specify a join grace-period, to 
handle out-of-order data gracefully.
      *
-     * @param table  the {@link KTable} to be joined with this stream
-     * @param joiner a {@link ValueJoiner} that computes the join result for a 
pair of matching records
-     * @param joined      a {@link Joined} instance that defines the serdes to
-     *                    be used to serialize/deserialize inputs of the 
joined streams
-     * @param <VT>   the value type of the table
-     * @param <VR>   the value type of the result stream
-     * @return a {@code KStream} that contains join-records for each key and 
values computed by the given
-     * {@link ValueJoiner}, one for each matched record-pair with the same key
-     * @see #leftJoin(KTable, ValueJoiner, Joined)
-     * @see #join(GlobalKTable, KeyValueMapper, ValueJoiner)
+     * <p>For details about stream-table semantics, including co-partitioning 
requirements, (auto-)repartitioning,
+     * and more see {@link #join(KTable, ValueJoiner)}.
+     * If you specify a grace-period to handle out-of-order data, see further 
details below.
+     *
+     * <p>To handle out-of-order records, the input {@link KTable} must use a
+     * {@link org.apache.kafka.streams.state.VersionedKeyValueStore 
VersionedKeyValueStore} (specified via a
+     * {@link Materialized} parameter when the {@link KTable} is created), and 
a join
+     * {@link Joined#withGracePeriod(Duration) grace-period} must be specified.
+     * For this case, {@code KStream} records are buffered until the end of 
the grace period and the {@link KTable}
+     * lookup is performed with some delay.
+     * Given that the {@link KTable} state is versioned, the lookup can use 
"event time", allowing out-of-order
+     * {@code KStream} records, to join to the right (older) version of a 
{@link KTable} record with the same key.
+     * Also, {@link KTable} out-of-order updates are handled correctly by the 
versioned state store.
+     * Note, that using a join grace-period introduces the notion of late 
records, i.e., records with a timestamp
+     * smaller than the defined grace-period allows; these late records will 
be dropped, and not join computation
+     * is triggered.
+     * Using a versioned state store for the {@link KTable} also implies that 
the defined
+     * {@link VersionedBytesStoreSupplier#historyRetentionMs() history 
retention} provides
+     * a cut-off point, and late records will be dropped, not updating the 
{@link KTable} state.
+     *
+     * <p>If a join grace-period is specified, the {@code KStream} will be 
materialized in a local state store.
+     * For failure and recovery this store will be backed by an internal 
changelog topic that will be created in Kafka.
+     * The changelog topic will be named 
"${applicationId}-&lt;storename&gt;-changelog",
+     * where "applicationId" is user-specified in {@link StreamsConfig} via 
parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
+     * "storeName" is an internally generated name, and "-changelog" is a 
fixed suffix.
+     *
+     * <p>You can retrieve all generated internal topic names via {@link 
Topology#describe()}.
+     * To customize the name of the changelog topic, use {@link Joined} input 
parameter.
      */
-    <VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
-                                 final ValueJoiner<? super V, ? super VT, ? 
extends VR> joiner,
-                                 final Joined<K, V, VT> joined);
+    <TableValue, VOut> KStream<K, VOut> join(final KTable<K, TableValue> table,
+                                             final ValueJoiner<? super V, ? 
super TableValue, ? extends VOut> joiner,
+                                             final Joined<K, V, TableValue> 
joined);
 
     /**
-     * Join records of this stream with {@link KTable}'s records using 
non-windowed inner equi join with default
-     * serializers and deserializers.
-     * The join is a primary key table lookup join with join attribute {@code 
stream.key == table.key}.
-     * "Table lookup join" means, that results are only computed if {@code 
KStream} records are processed.
-     * This is done by performing a lookup for matching records in the 
<em>current</em> (i.e., processing time) internal
-     * {@link KTable} state.
-     * In contrast, processing {@link KTable} input records will only update 
the internal {@link KTable} state and
-     * will not produce any result records.
-     * <p>
-     * For each {@code KStream} record that finds a corresponding record in 
{@link KTable} the provided
-     * {@link ValueJoinerWithKey} will be called to compute a value (with 
arbitrary type) for the result record.
-     * The key of the result record is the same as for both joining input 
records.
-     * Note that the key is read-only and should not be modified, as this can 
lead to undefined behaviour.
+     * See {@link #join(KTable, ValueJoiner, Joined)}.
      *
-     * If an {@code KStream} input record key or value is {@code null} the 
record will not be included in the join
-     * operation and thus no output record will be added to the resulting 
{@code KStream}.
-     * <p>
-     * Example:
-     * <table border='1'>
-     * <tr>
-     * <th>KStream</th>
-     * <th>KTable</th>
-     * <th>state</th>
-     * <th>result</th>
-     * </tr>
-     * <tr>
-     * <td>&lt;K1:A&gt;</td>
-     * <td></td>
-     * <td></td>
-     * <td></td>
-     * </tr>
-     * <tr>
-     * <td></td>
-     * <td>&lt;K1:b&gt;</td>
-     * <td>&lt;K1:b&gt;</td>
-     * <td></td>
-     * </tr>
-     * <tr>
-     * <td>&lt;K1:C&gt;</td>
-     * <td></td>
-     * <td>&lt;K1:b&gt;</td>
-     * <td>&lt;K1:ValueJoinerWithKey(K1,C,b)&gt;</td>
-     * </tr>
-     * </table>
-     * Both input streams (or to be more precise, their underlying source 
topics) need to have the same number of
-     * partitions.
-     * If this is not the case, you would need to call {@link 
#repartition(Repartitioned)} for this {@code KStream}
-     * before doing the join, specifying the same number of partitions via 
{@link Repartitioned} parameter as the given
-     * {@link KTable}.
-     * Furthermore, both input streams need to be co-partitioned on the join 
key (i.e., use the same partitioner);
-     * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)}.
-     * If this requirement is not met, Kafka Streams will automatically 
repartition the data, i.e., it will create an
-     * internal repartitioning topic in Kafka and write and re-read the data 
via this topic before the actual join.
-     * The repartitioning topic will be named 
"${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
-     * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, 
"&lt;name&gt;" is an internally generated
-     * name, and "-repartition" is a fixed suffix.
-     * <p>
-     * You can retrieve all generated internal topic names via {@link 
Topology#describe()}.
-     * <p>
-     * Repartitioning can happen only for this {@code KStream} but not for the 
provided {@link KTable}.
-     * For this case, all data of the stream will be redistributed through the 
repartitioning topic by writing all
-     * records to it, and rereading all records from it, such that the join 
input {@code KStream} is partitioned
-     * correctly on its key.
-     *
-     * @param table  the {@link KTable} to be joined with this stream
-     * @param joiner a {@link ValueJoinerWithKey} that computes the join 
result for a pair of matching records
-     * @param joined      a {@link Joined} instance that defines the serdes to
-     *                    be used to serialize/deserialize inputs of the 
joined streams
-     * @param <VT>   the value type of the table
-     * @param <VR>   the value type of the result stream
-     * @return a {@code KStream} that contains join-records for each key and 
values computed by the given
-     * {@link ValueJoinerWithKey}, one for each matched record-pair with the 
same key
-     * @see #leftJoin(KTable, ValueJoinerWithKey, Joined)
-     * @see #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)
+     * <p>Note that the key is read-only and must not be modified, as this can 
lead to corrupt partitioning.
      */
-    <VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
-                                 final ValueJoinerWithKey<? super K, ? super 
V, ? super VT, ? extends VR> joiner,
-                                 final Joined<K, V, VT> joined);
+    <TableValue, VOut> KStream<K, VOut> join(final KTable<K, TableValue> table,
+                                             final ValueJoinerWithKey<? super 
K, ? super V, ? super TableValue, ? extends VOut> joiner,
+                                             final Joined<K, V, TableValue> 
joined);
 
     /**
      * Join records of this stream with {@link KTable}'s records using 
non-windowed left equi join with default
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 35ef36bb816..5101d02a034 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -961,21 +961,27 @@ public class KStreamImpl<K, V> extends AbstractStream<K, 
V> implements KStream<K
     }
 
     @Override
-    public <VO, VR> KStream<K, VR> join(final KTable<K, VO> table,
-                                        final ValueJoiner<? super V, ? super 
VO, ? extends VR> joiner) {
+    public <TableValue, VOut> KStream<K, VOut> join(
+        final KTable<K, TableValue> table,
+        final ValueJoiner<? super V, ? super TableValue, ? extends VOut> joiner
+    ) {
         return join(table, toValueJoinerWithKey(joiner));
     }
 
     @Override
-    public <VO, VR> KStream<K, VR> join(final KTable<K, VO> table,
-                                        final ValueJoinerWithKey<? super K, ? 
super V, ? super VO, ? extends VR> joiner) {
+    public <TableValue, VOut> KStream<K, VOut> join(
+        final KTable<K, TableValue> table,
+        final ValueJoinerWithKey<? super K, ? super V, ? super TableValue, ? 
extends VOut> joiner
+    ) {
         return join(table, joiner, Joined.with(null, null, null));
     }
 
     @Override
-    public <VO, VR> KStream<K, VR> join(final KTable<K, VO> table,
-                                        final ValueJoiner<? super V, ? super 
VO, ? extends VR> joiner,
-                                        final Joined<K, V, VO> joined) {
+    public <TableValue, VOut> KStream<K, VOut> join(
+        final KTable<K, TableValue> table,
+        final ValueJoiner<? super V, ? super TableValue, ? extends VOut> 
joiner,
+        final Joined<K, V, TableValue> joined
+    ) {
         Objects.requireNonNull(table, "table can't be null");
         Objects.requireNonNull(joiner, "joiner can't be null");
         Objects.requireNonNull(joined, "joined can't be null");
@@ -983,14 +989,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K, 
V> implements KStream<K
     }
 
     @Override
-    public <VO, VR> KStream<K, VR> join(final KTable<K, VO> table,
-                                        final ValueJoinerWithKey<? super K, ? 
super V, ? super VO, ? extends VR> joiner,
-                                        final Joined<K, V, VO> joined) {
+    public <TableValue, VOut> KStream<K, VOut> join(
+        final KTable<K, TableValue> table,
+        final ValueJoinerWithKey<? super K, ? super V, ? super TableValue, ? 
extends VOut> joiner,
+        final Joined<K, V, TableValue> joined
+    ) {
         Objects.requireNonNull(table, "table can't be null");
         Objects.requireNonNull(joiner, "joiner can't be null");
         Objects.requireNonNull(joined, "joined can't be null");
 
-        final JoinedInternal<K, V, VO> joinedInternal = new 
JoinedInternal<>(joined);
+        final JoinedInternal<K, V, TableValue> joinedInternal = new 
JoinedInternal<>(joined);
         final String name = joinedInternal.name();
 
         if (repartitionRequired) {
@@ -1149,14 +1157,14 @@ public class KStreamImpl<K, V> extends 
AbstractStream<K, V> implements KStream<K
     }
 
     @SuppressWarnings({"unchecked", "resource"})
-    private <VO, VR> KStream<K, VR> doStreamTableJoin(final KTable<K, VO> 
table,
-                                                      final 
ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner,
-                                                      final JoinedInternal<K, 
V, VO> joinedInternal,
-                                                      final boolean leftJoin) {
+    private <VTable, VOut> KStream<K, VOut> doStreamTableJoin(final KTable<K, 
VTable> table,
+                                                              final 
ValueJoinerWithKey<? super K, ? super V, ? super VTable, ? extends VOut> joiner,
+                                                              final 
JoinedInternal<K, V, VTable> joinedInternal,
+                                                              final boolean 
leftJoin) {
         Objects.requireNonNull(table, "table can't be null");
         Objects.requireNonNull(joiner, "joiner can't be null");
 
-        final Set<String> allSourceNodes = 
ensureCopartitionWith(Collections.singleton((AbstractStream<K, VO>) table));
+        final Set<String> allSourceNodes = 
ensureCopartitionWith(Collections.singleton((AbstractStream<K, VTable>) table));
 
         final NamedInternal renamed = new NamedInternal(joinedInternal.name());
 
@@ -1165,7 +1173,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, 
V> implements KStream<K
         Optional<StoreBuilder<?>> bufferStoreBuilder = Optional.empty();
 
         if (joinedInternal.gracePeriod() != null) {
-            if (!((KTableImpl<K, ?, VO>) 
table).graphNode.isOutputVersioned().orElse(true)) {
+            if (!((KTableImpl<K, ?, VTable>) 
table).graphNode.isOutputVersioned().orElse(true)) {
                 throw new IllegalArgumentException("KTable must be versioned 
to use a grace period in a stream table join.");
             }
             final String bufferName = name + "-Buffer";
@@ -1178,19 +1186,19 @@ public class KStreamImpl<K, V> extends 
AbstractStream<K, V> implements KStream<K
             );
         }
 
-        final ProcessorSupplier<K, V, K, VR> processorSupplier = new 
KStreamKTableJoin<>(
-            ((KTableImpl<K, ?, VO>) table).valueGetterSupplier(),
+        final ProcessorSupplier<K, V, K, VOut> processorSupplier = new 
KStreamKTableJoin<>(
+            ((KTableImpl<K, ?, VTable>) table).valueGetterSupplier(),
             joiner,
             leftJoin,
             Optional.ofNullable(joinedInternal.gracePeriod()),
             bufferStoreBuilder
         );
 
-        final ProcessorParameters<K, V, K, VR> processorParameters = new 
ProcessorParameters<>(processorSupplier, name);
-        final StreamTableJoinNode<K, V, VR> streamTableJoinNode = new 
StreamTableJoinNode<>(
+        final ProcessorParameters<K, V, K, VOut> processorParameters = new 
ProcessorParameters<>(processorSupplier, name);
+        final StreamTableJoinNode<K, V, VOut> streamTableJoinNode = new 
StreamTableJoinNode<>(
             name,
             processorParameters,
-            ((KTableImpl<K, ?, VO>) table).valueGetterSupplier().storeNames(),
+            ((KTableImpl<K, ?, VTable>) 
table).valueGetterSupplier().storeNames(),
             this.name,
             joinedInternal.gracePeriod()
         );
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java
index 23fbe9e701c..dc33a167721 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java
@@ -23,23 +23,22 @@ import 
org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.state.StoreBuilder;
 
 import java.time.Duration;
+import java.util.Collections;
 import java.util.Optional;
 import java.util.Set;
 
-import static java.util.Collections.singleton;
+class KStreamKTableJoin<StreamKey, StreamValue, TableValue, VOut> implements 
ProcessorSupplier<StreamKey, StreamValue, StreamKey, VOut> {
 
-class KStreamKTableJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, 
K, VOut> {
-
-    private final KeyValueMapper<K, V1, K> keyValueMapper = (key, value) -> 
key;
-    private final KTableValueGetterSupplier<K, V2> valueGetterSupplier;
-    private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? 
extends VOut> joiner;
+    private final KeyValueMapper<StreamKey, StreamValue, StreamKey> 
keyValueMapper = (key, value) -> key;
+    private final KTableValueGetterSupplier<StreamKey, TableValue> 
valueGetterSupplier;
+    private final ValueJoinerWithKey<? super StreamKey, ? super StreamValue, ? 
super TableValue, ? extends VOut> joiner;
     private final boolean leftJoin;
     private final Optional<Duration> gracePeriod;
     private final Optional<String> storeName;
     private final Set<StoreBuilder<?>> stores;
 
-    KStreamKTableJoin(final KTableValueGetterSupplier<K, V2> 
valueGetterSupplier,
-                      final ValueJoinerWithKey<? super K, ? super V1, ? super 
V2, ? extends VOut> joiner,
+    KStreamKTableJoin(final KTableValueGetterSupplier<StreamKey, TableValue> 
valueGetterSupplier,
+                      final ValueJoinerWithKey<? super StreamKey, ? super 
StreamValue, ? super TableValue, ? extends VOut> joiner,
                       final boolean leftJoin,
                       final Optional<Duration> gracePeriod,
                       final Optional<StoreBuilder<?>> bufferStoreBuilder) {
@@ -49,11 +48,7 @@ class KStreamKTableJoin<K, V1, V2, VOut> implements 
ProcessorSupplier<K, V1, K,
         this.gracePeriod = gracePeriod;
         this.storeName = bufferStoreBuilder.map(StoreBuilder::name);
 
-        if (bufferStoreBuilder.isEmpty()) {
-            this.stores = null;
-        } else {
-            this.stores = singleton(bufferStoreBuilder.get());
-        }
+        this.stores = 
bufferStoreBuilder.<Set<StoreBuilder<?>>>map(Collections::singleton).orElse(null);
     }
 
     @Override
@@ -62,7 +57,7 @@ class KStreamKTableJoin<K, V1, V2, VOut> implements 
ProcessorSupplier<K, V1, K,
     }
 
     @Override
-    public Processor<K, V1, K, VOut> get() {
+    public Processor<StreamKey, StreamValue, StreamKey, VOut> get() {
         return new KStreamKTableJoinProcessor<>(valueGetterSupplier.get(), 
keyValueMapper, joiner, leftJoin, gracePeriod, storeName);
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
index 637d870ee7e..19231486c6b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
@@ -42,24 +42,26 @@ import static 
org.apache.kafka.streams.processor.internals.ProcessorContextUtils
 import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
 import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
 
-class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut> extends 
ContextualProcessor<K1, V1, K1, VOut> {
+class KStreamKTableJoinProcessor<StreamKey, StreamValue, TableKey, TableValue, 
VOut>
+    extends ContextualProcessor<StreamKey, StreamValue, StreamKey, VOut> {
+
     private static final Logger LOG = 
LoggerFactory.getLogger(KStreamKTableJoin.class);
 
-    private final KTableValueGetter<K2, V2> valueGetter;
-    private final KeyValueMapper<? super K1, ? super V1, ? extends K2> 
keyMapper;
-    private final ValueJoinerWithKey<? super K1, ? super V1, ? super V2, ? 
extends VOut> joiner;
+    private final KTableValueGetter<TableKey, TableValue> valueGetter;
+    private final KeyValueMapper<? super StreamKey, ? super StreamValue, ? 
extends TableKey> keyMapper;
+    private final ValueJoinerWithKey<? super StreamKey, ? super StreamValue, ? 
super TableValue, ? extends VOut> joiner;
     private final boolean leftJoin;
     private Sensor droppedRecordsSensor;
     private final Optional<Duration> gracePeriod;
-    private TimeOrderedKeyValueBuffer<K1, V1, V1> buffer;
+    private TimeOrderedKeyValueBuffer<StreamKey, StreamValue, StreamValue> 
buffer;
     protected long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
-    private InternalProcessorContext<K1, VOut> internalProcessorContext;
+    private InternalProcessorContext<StreamKey, VOut> internalProcessorContext;
     private final boolean useBuffer;
     private final String storeName;
 
-    KStreamKTableJoinProcessor(final KTableValueGetter<K2, V2> valueGetter,
-                               final KeyValueMapper<? super K1, ? super V1, ? 
extends K2> keyMapper,
-                               final ValueJoinerWithKey<? super K1, ? super 
V1, ? super V2, ? extends VOut> joiner,
+    KStreamKTableJoinProcessor(final KTableValueGetter<TableKey, TableValue> 
valueGetter,
+                               final KeyValueMapper<? super StreamKey, ? super 
StreamValue, ? extends TableKey> keyMapper,
+                               final ValueJoinerWithKey<? super StreamKey, ? 
super StreamValue, ? super TableValue, ? extends VOut> joiner,
                                final boolean leftJoin,
                                final Optional<Duration> gracePeriod,
                                final Optional<String> storeName) {
@@ -73,7 +75,7 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut> 
extends ContextualProcess
     }
 
     @Override
-    public void init(final ProcessorContext<K1, VOut> context) {
+    public void init(final ProcessorContext<StreamKey, VOut> context) {
         super.init(context);
         final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
         droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
@@ -89,7 +91,7 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut> 
extends ContextualProcess
     }
 
     @Override
-    public void process(final Record<K1, V1> record) {
+    public void process(final Record<StreamKey, StreamValue> record) {
         updateObservedStreamTime(record.timestamp());
         if (maybeDropRecord(record)) {
             return;
@@ -106,8 +108,8 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut> 
extends ContextualProcess
         }
     }
 
-    private void emit(final TimeOrderedKeyValueBuffer.Eviction<K1, V1> toEmit) 
{
-        final Record<K1, V1> record = new Record<>(toEmit.key(), 
toEmit.value(), toEmit.recordContext().timestamp())
+    private void emit(final TimeOrderedKeyValueBuffer.Eviction<StreamKey, 
StreamValue> toEmit) {
+        final Record<StreamKey, StreamValue> record = new 
Record<>(toEmit.key(), toEmit.value(), toEmit.recordContext().timestamp())
             .withHeaders(toEmit.recordContext().headers());
         final ProcessorRecordContext prevRecordContext = 
internalProcessorContext.recordContext();
         try {
@@ -122,23 +124,23 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut> 
extends ContextualProcess
         observedStreamTime = Math.max(observedStreamTime, timestamp);
     }
 
-    private void doJoin(final Record<K1, V1> record) {
-        final K2 mappedKey = keyMapper.apply(record.key(), record.value());
-        final V2 value2 = getValue2(record, mappedKey);
+    private void doJoin(final Record<StreamKey, StreamValue> record) {
+        final TableKey mappedKey = keyMapper.apply(record.key(), 
record.value());
+        final TableValue value2 = getValue2(record, mappedKey);
         if (leftJoin || value2 != null) {
             
internalProcessorContext.forward(record.withValue(joiner.apply(record.key(), 
record.value(), value2)));
         }
     }
 
-    private V2 getValue2(final Record<K1, V1> record, final K2 mappedKey) {
+    private TableValue getValue2(final Record<StreamKey, StreamValue> record, 
final TableKey mappedKey) {
         if (mappedKey == null) return null;
-        final ValueAndTimestamp<V2> valueAndTimestamp = 
valueGetter.isVersioned()
+        final ValueAndTimestamp<TableValue> valueAndTimestamp = 
valueGetter.isVersioned()
             ? valueGetter.get(mappedKey, record.timestamp())
             : valueGetter.get(mappedKey);
         return getValueOrNull(valueAndTimestamp);
     }
 
-    private boolean maybeDropRecord(final Record<K1, V1> record) {
+    private boolean maybeDropRecord(final Record<StreamKey, StreamValue> 
record) {
         // we do join iff the join keys are equal, thus, if {@code keyMapper} 
returns {@code null} we
         // cannot join and just ignore the record. Note for KTables, this is 
the same as having a null key
         // since keyMapper just returns the key, but for GlobalKTables we can 
have other keyMappers
@@ -147,7 +149,7 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut> 
extends ContextualProcess
         // an empty message (ie, there is nothing to be joined) -- this 
contrast SQL NULL semantics
         // furthermore, on left/outer joins 'null' in ValueJoiner#apply() 
indicates a missing record --
         // thus, to be consistent and to avoid ambiguous null semantics, null 
values are ignored
-        final K2 mappedKey = keyMapper.apply(record.key(), record.value());
+        final TableKey mappedKey = keyMapper.apply(record.key(), 
record.value());
         if (leftJoin && record.key() == null && record.value() != null) {
             return false;
         }


Reply via email to