This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 65961516fdc MINOR: cleanup KStream JavaDocs (4/N) -
stream-table-inner-join (#18721)
65961516fdc is described below
commit 65961516fdc2281763633ab6574680c3b7ca274c
Author: Matthias J. Sax <[email protected]>
AuthorDate: Mon Feb 3 17:48:49 2025 -0800
MINOR: cleanup KStream JavaDocs (4/N) - stream-table-inner-join (#18721)
Reviewers: Lucas Brutschy <[email protected]>, Bill Bejeck
<[email protected]>
---
.../org/apache/kafka/streams/kstream/KStream.java | 375 ++++++---------------
.../streams/kstream/internals/KStreamImpl.java | 52 +--
.../kstream/internals/KStreamKTableJoin.java | 23 +-
.../internals/KStreamKTableJoinProcessor.java | 42 +--
4 files changed, 171 insertions(+), 321 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 8d6c1e33ab7..f32607e9d88 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -34,6 +34,9 @@ import
org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
+
+import java.time.Duration;
/**
* {@code KStream} is an abstraction of a <i>record stream</i> of {@link
KeyValue} pairs, i.e., each record is an
@@ -1977,23 +1980,26 @@ public interface KStream<K, V> {
final ValueJoinerWithKey<? super K, ?
super V, ? super VO, ? extends VR> joiner,
final JoinWindows windows,
final StreamJoined<K, V, VO>
streamJoined);
+
/**
- * Join records of this stream with {@link KTable}'s records using
non-windowed inner equi join with default
- * serializers and deserializers.
- * The join is a primary key table lookup join with join attribute {@code
stream.key == table.key}.
+ * Join records of this stream with {@link KTable}'s records using
non-windowed inner equi-join.
+ * The join is a primary key table lookup join with join attribute {@code
streamRecord.key == tableRecord.key}.
* "Table lookup join" means, that results are only computed if {@code
KStream} records are processed.
- * This is done by performing a lookup for matching records in the
<em>current</em> (i.e., processing time) internal
- * {@link KTable} state.
+ * This is done by performing a lookup for matching records into the
internal {@link KTable} state.
* In contrast, processing {@link KTable} input records will only update
the internal {@link KTable} state and
* will not produce any result records.
- * <p>
- * For each {@code KStream} record that finds a corresponding record in
{@link KTable} the provided
+ *
+ * <p>For each {@code KStream} record that finds a joining record in the
{@link KTable} the provided
* {@link ValueJoiner} will be called to compute a value (with arbitrary
type) for the result record.
* The key of the result record is the same as for both joining input
records.
- * If an {@code KStream} input record key or value is {@code null} the
record will not be included in the join
- * operation and thus no output record will be added to the resulting
{@code KStream}.
- * <p>
- * Example:
+ * If you need read access to the join key, use {@link #join(KTable,
ValueJoinerWithKey)}.
+ * If a {@code KStream} input record's key or value is {@code null} the
input record will be dropped, and no join
+ * computation is triggered.
+ * If a {@link KTable} input record's key is {@code null} the input record
will be dropped, and the table state
+ * won't be updated.
+ * {@link KTable} input records with {@code null} values are considered
deletes (so-called tombstones) for the table.
+ *
+ * <p>Example:
* <table border='1'>
* <tr>
* <th>KStream</th>
@@ -2020,276 +2026,115 @@ public interface KStream<K, V> {
* <td><K1:ValueJoiner(C,b)></td>
* </tr>
* </table>
- * Both input streams (or to be more precise, their underlying source
topics) need to have the same number of
- * partitions.
- * If this is not the case, you would need to call {@link
#repartition(Repartitioned)} for this {@code KStream}
- * before doing the join, specifying the same number of partitions via
{@link Repartitioned} parameter as the given
+ *
+ * By default, {@code KStream} records are processed by performing a
lookup for matching records in the
+ * <em>current</em> (i.e., processing time) internal {@link KTable} state.
+ * This default implementation does not handle out-of-order records in
either input of the join well.
+ * See {@link #join(KTable, ValueJoiner, Joined)} on how to configure a
stream-table join to handle out-of-order
+ * data.
+ *
+ * <p>{@code KStream} and {@link KTable} (or to be more precise, their
underlying source topics) need to have the
+ * same number of partitions (cf. {@link #join(GlobalKTable,
KeyValueMapper, ValueJoiner)}).
+ * If this is not the case (and if no auto-repartitioning happens for the
{@code KStream}, see further below),
+ * you would need to call {@link #repartition(Repartitioned)} for this
{@code KStream} before doing the join,
+ * specifying the same number of partitions via {@link Repartitioned}
parameter as the given {@link KTable}.
+ * Furthermore, {@code KStream} and {@link KTable} need to be
co-partitioned on the join key
+ * (i.e., use the same partitioner).
+ * Note: Kafka Streams cannot verify the used partitioner, so it is the
user's responsibility to ensure
+ * that the same partitioner is used for both inputs of the join.
+ *
+ * <p>If a key-changing operator was used on this {@code KStream} before
this operation
+ * (e.g., {@link #selectKey(KeyValueMapper)}, {@link
#map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or
+ * {@link #process(ProcessorSupplier, String...)}) Kafka Streams will
automatically repartition the data of this
+ * {@code KStream}, i.e., it will create an internal repartitioning topic
in Kafka and write and re-read
+ * the data via this topic such that data is correctly partitioned by the
{@link KTable}'s key.
+ *
+ * <p>The repartitioning topic will be named
"${applicationId}-&lt;name&gt;-repartition",
+ * where "applicationId" is user-specified in {@link StreamsConfig} via
parameter
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
+ * "&lt;name&gt;" is an internally generated name, and "-repartition" is a
fixed suffix.
+ * The number of partitions for the repartition topic is determined based
on the number of partitions of the
* {@link KTable}.
- * Furthermore, both input streams need to be co-partitioned on the join
key (i.e., use the same partitioner);
- * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}.
- * If this requirement is not met, Kafka Streams will automatically
repartition the data, i.e., it will create an
- * internal repartitioning topic in Kafka and write and re-read the data
via this topic before the actual join.
- * The repartitioning topic will be named
"${applicationId}-<name>-repartition", where "applicationId" is
- * user-specified in {@link StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
"<name>" is an internally generated
- * name, and "-repartition" is a fixed suffix.
- * <p>
- * You can retrieve all generated internal topic names via {@link
Topology#describe()}.
- * <p>
- * Repartitioning can happen only for this {@code KStream} but not for the
provided {@link KTable}.
- * For this case, all data of the stream will be redistributed through the
repartitioning topic by writing all
- * records to it, and rereading all records from it, such that the join
input {@code KStream} is partitioned
- * correctly on its key.
+ * Furthermore, the topic(s) will be created with infinite retention time
and data will be automatically purged
+ * by Kafka Streams.
+ *
+ * <p>You can retrieve all generated internal topic names via {@link
Topology#describe()}.
+ * To explicitly set key/value serdes or to customize the names of the
repartition topic,
+ * use {@link #join(KTable, ValueJoiner, Joined)}.
+ * For more control over the repartitioning, use {@link
#repartition(Repartitioned)} before {@code join()}.
+ *
+ * @param table
+ * the {@link KTable} to be joined with this stream
+ * @param joiner
+ * a {@link ValueJoiner} that computes the join result for a pair
of matching records
+ *
+ * @param <TableValue> the value type of the table
+ * @param <VOut> the value type of the result stream
+ *
+ * @return A {@code KStream} that contains join-records, one for each
matched stream record, with the corresponding
+ * key and a value computed by the given {@link ValueJoiner}.
*
- * @param table the {@link KTable} to be joined with this stream
- * @param joiner a {@link ValueJoiner} that computes the join result for a
pair of matching records
- * @param <VT> the value type of the table
- * @param <VR> the value type of the result stream
- * @return a {@code KStream} that contains join-records for each key and
values computed by the given
- * {@link ValueJoiner}, one for each matched record-pair with the same key
* @see #leftJoin(KTable, ValueJoiner)
- * @see #join(GlobalKTable, KeyValueMapper, ValueJoiner)
*/
- <VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
- final ValueJoiner<? super V, ? super VT, ?
extends VR> joiner);
+ <TableValue, VOut> KStream<K, VOut> join(final KTable<K, TableValue> table,
+ final ValueJoiner<? super V, ?
super TableValue, ? extends VOut> joiner);
/**
- * Join records of this stream with {@link KTable}'s records using
non-windowed inner equi join with default
- * serializers and deserializers.
- * The join is a primary key table lookup join with join attribute {@code
stream.key == table.key}.
- * "Table lookup join" means, that results are only computed if {@code
KStream} records are processed.
- * This is done by performing a lookup for matching records in the
<em>current</em> (i.e., processing time) internal
- * {@link KTable} state.
- * In contrast, processing {@link KTable} input records will only update
the internal {@link KTable} state and
- * will not produce any result records.
- * <p>
- * For each {@code KStream} record that finds a corresponding record in
{@link KTable} the provided
- * {@link ValueJoinerWithKey} will be called to compute a value (with
arbitrary type) for the result record.
- * Note that the key is read-only and should not be modified, as this can
lead to undefined behaviour.
- *
- * The key of the result record is the same as for both joining input
records.
- * If an {@code KStream} input record key or value is {@code null} the
record will not be included in the join
- * operation and thus no output record will be added to the resulting
{@code KStream}.
- * <p>
- * Example:
- * <table border='1'>
- * <tr>
- * <th>KStream</th>
- * <th>KTable</th>
- * <th>state</th>
- * <th>result</th>
- * </tr>
- * <tr>
- * <td><K1:A></td>
- * <td></td>
- * <td></td>
- * <td></td>
- * </tr>
- * <tr>
- * <td></td>
- * <td><K1:b></td>
- * <td><K1:b></td>
- * <td></td>
- * </tr>
- * <tr>
- * <td><K1:C></td>
- * <td></td>
- * <td><K1:b></td>
- * <td><K1:ValueJoinerWithKey(K1,C,b)></td>
- * </tr>
- * </table>
- * Both input streams (or to be more precise, their underlying source
topics) need to have the same number of
- * partitions.
- * If this is not the case, you would need to call {@link
#repartition(Repartitioned)} for this {@code KStream}
- * before doing the join, specifying the same number of partitions via
{@link Repartitioned} parameter as the given
- * {@link KTable}.
- * Furthermore, both input streams need to be co-partitioned on the join
key (i.e., use the same partitioner);
- * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)}.
- * If this requirement is not met, Kafka Streams will automatically
repartition the data, i.e., it will create an
- * internal repartitioning topic in Kafka and write and re-read the data
via this topic before the actual join.
- * The repartitioning topic will be named
"${applicationId}-<name>-repartition", where "applicationId" is
- * user-specified in {@link StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
"<name>" is an internally generated
- * name, and "-repartition" is a fixed suffix.
- * <p>
- * You can retrieve all generated internal topic names via {@link
Topology#describe()}.
- * <p>
- * Repartitioning can happen only for this {@code KStream} but not for the
provided {@link KTable}.
- * For this case, all data of the stream will be redistributed through the
repartitioning topic by writing all
- * records to it, and rereading all records from it, such that the join
input {@code KStream} is partitioned
- * correctly on its key.
+ * See {@link #join(KTable, ValueJoiner)}.
*
- * @param table the {@link KTable} to be joined with this stream
- * @param joiner a {@link ValueJoinerWithKey} that computes the join
result for a pair of matching records
- * @param <VT> the value type of the table
- * @param <VR> the value type of the result stream
- * @return a {@code KStream} that contains join-records for each key and
values computed by the given
- * {@link ValueJoinerWithKey}, one for each matched record-pair with the
same key
- * @see #leftJoin(KTable, ValueJoinerWithKey)
- * @see #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)
+ * <p>Note that the key is read-only and must not be modified, as this can
lead to corrupt partitioning.
*/
- <VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
- final ValueJoinerWithKey<? super K, ? super
V, ? super VT, ? extends VR> joiner);
+ <TableValue, VOut> KStream<K, VOut> join(final KTable<K, TableValue> table,
+ final ValueJoinerWithKey<? super
K, ? super V, ? super TableValue, ? extends VOut> joiner);
/**
- * Join records of this stream with {@link KTable}'s records using
non-windowed inner equi join with default
- * serializers and deserializers.
- * The join is a primary key table lookup join with join attribute {@code
stream.key == table.key}.
- * "Table lookup join" means, that results are only computed if {@code
KStream} records are processed.
- * This is done by performing a lookup for matching records in the
<em>current</em> (i.e., processing time) internal
- * {@link KTable} state.
- * In contrast, processing {@link KTable} input records will only update
the internal {@link KTable} state and
- * will not produce any result records.
- * <p>
- * For each {@code KStream} record that finds a corresponding record in
{@link KTable} the provided
- * {@link ValueJoiner} will be called to compute a value (with arbitrary
type) for the result record.
- * The key of the result record is the same as for both joining input
records.
- * If an {@code KStream} input record key or value is {@code null} the
record will not be included in the join
- * operation and thus no output record will be added to the resulting
{@code KStream}.
- * <p>
- * Example:
- * <table border='1'>
- * <tr>
- * <th>KStream</th>
- * <th>KTable</th>
- * <th>state</th>
- * <th>result</th>
- * </tr>
- * <tr>
- * <td><K1:A></td>
- * <td></td>
- * <td></td>
- * <td></td>
- * </tr>
- * <tr>
- * <td></td>
- * <td><K1:b></td>
- * <td><K1:b></td>
- * <td></td>
- * </tr>
- * <tr>
- * <td><K1:C></td>
- * <td></td>
- * <td><K1:b></td>
- * <td><K1:ValueJoiner(C,b)></td>
- * </tr>
- * </table>
- * Both input streams (or to be more precise, their underlying source
topics) need to have the same number of
- * partitions.
- * If this is not the case, you would need to call {@link
#repartition(Repartitioned)} for this {@code KStream}
- * before doing the join, specifying the same number of partitions via
{@link Repartitioned} parameter as the given
- * {@link KTable}.
- * Furthermore, both input streams need to be co-partitioned on the join
key (i.e., use the same partitioner);
- * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}.
- * If this requirement is not met, Kafka Streams will automatically
repartition the data, i.e., it will create an
- * internal repartitioning topic in Kafka and write and re-read the data
via this topic before the actual join.
- * The repartitioning topic will be named
"${applicationId}-<name>-repartition", where "applicationId" is
- * user-specified in {@link StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
"<name>" is an internally generated
- * name, and "-repartition" is a fixed suffix.
- * <p>
- * You can retrieve all generated internal topic names via {@link
Topology#describe()}.
- * <p>
- * Repartitioning can happen only for this {@code KStream} but not for the
provided {@link KTable}.
- * For this case, all data of the stream will be redistributed through the
repartitioning topic by writing all
- * records to it, and rereading all records from it, such that the join
input {@code KStream} is partitioned
- * correctly on its key.
+ * Join records of this stream with {@link KTable}'s records using
non-windowed inner equi-join.
+ * In contrast to {@link #join(KTable, ValueJoiner)}, but only if the used
{@link KTable} is backed by a
+ * {@link org.apache.kafka.streams.state.VersionedKeyValueStore
VersionedKeyValueStore}, the additional
+ * {@link Joined} parameter allows you to specify a join grace-period to
handle out-of-order data gracefully.
*
- * @param table the {@link KTable} to be joined with this stream
- * @param joiner a {@link ValueJoiner} that computes the join result for a
pair of matching records
- * @param joined a {@link Joined} instance that defines the serdes to
- * be used to serialize/deserialize inputs of the
joined streams
- * @param <VT> the value type of the table
- * @param <VR> the value type of the result stream
- * @return a {@code KStream} that contains join-records for each key and
values computed by the given
- * {@link ValueJoiner}, one for each matched record-pair with the same key
- * @see #leftJoin(KTable, ValueJoiner, Joined)
- * @see #join(GlobalKTable, KeyValueMapper, ValueJoiner)
+ * <p>For details about stream-table semantics, including co-partitioning
requirements, (auto-)repartitioning,
+ * and more see {@link #join(KTable, ValueJoiner)}.
+ * If you specify a grace-period to handle out-of-order data, see further
details below.
+ *
+ * <p>To handle out-of-order records, the input {@link KTable} must use a
+ * {@link org.apache.kafka.streams.state.VersionedKeyValueStore
VersionedKeyValueStore} (specified via a
+ * {@link Materialized} parameter when the {@link KTable} is created), and
a join
+ * {@link Joined#withGracePeriod(Duration) grace-period} must be specified.
+ * For this case, {@code KStream} records are buffered until the end of
the grace period and the {@link KTable}
+ * lookup is performed with some delay.
+ * Given that the {@link KTable} state is versioned, the lookup can use
"event time", allowing out-of-order
+ * {@code KStream} records to join to the right (older) version of a
{@link KTable} record with the same key.
+ * Also, {@link KTable} out-of-order updates are handled correctly by the
versioned state store.
+ * Note that using a join grace-period introduces the notion of late
records, i.e., records with a timestamp
+ * smaller than the defined grace-period allows; these late records will
be dropped, and no join computation
+ * is triggered.
+ * Using a versioned state store for the {@link KTable} also implies that
the defined
+ * {@link VersionedBytesStoreSupplier#historyRetentionMs() history
retention} provides
+ * a cut-off point, and late records will be dropped, not updating the
{@link KTable} state.
+ *
+ * <p>If a join grace-period is specified, the {@code KStream} will be
materialized in a local state store.
+ * For failure and recovery this store will be backed by an internal
changelog topic that will be created in Kafka.
+ * The changelog topic will be named
"${applicationId}-&lt;storeName&gt;-changelog",
+ * where "applicationId" is user-specified in {@link StreamsConfig} via
parameter
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
+ * "&lt;storeName&gt;" is an internally generated name, and "-changelog" is a
fixed suffix.
+ *
+ * <p>You can retrieve all generated internal topic names via {@link
Topology#describe()}.
+ * To customize the name of the changelog topic, use the {@link Joined} input
parameter.
*/
- <VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
- final ValueJoiner<? super V, ? super VT, ?
extends VR> joiner,
- final Joined<K, V, VT> joined);
+ <TableValue, VOut> KStream<K, VOut> join(final KTable<K, TableValue> table,
+ final ValueJoiner<? super V, ?
super TableValue, ? extends VOut> joiner,
+ final Joined<K, V, TableValue>
joined);
/**
- * Join records of this stream with {@link KTable}'s records using
non-windowed inner equi join with default
- * serializers and deserializers.
- * The join is a primary key table lookup join with join attribute {@code
stream.key == table.key}.
- * "Table lookup join" means, that results are only computed if {@code
KStream} records are processed.
- * This is done by performing a lookup for matching records in the
<em>current</em> (i.e., processing time) internal
- * {@link KTable} state.
- * In contrast, processing {@link KTable} input records will only update
the internal {@link KTable} state and
- * will not produce any result records.
- * <p>
- * For each {@code KStream} record that finds a corresponding record in
{@link KTable} the provided
- * {@link ValueJoinerWithKey} will be called to compute a value (with
arbitrary type) for the result record.
- * The key of the result record is the same as for both joining input
records.
- * Note that the key is read-only and should not be modified, as this can
lead to undefined behaviour.
+ * See {@link #join(KTable, ValueJoiner, Joined)}.
*
- * If an {@code KStream} input record key or value is {@code null} the
record will not be included in the join
- * operation and thus no output record will be added to the resulting
{@code KStream}.
- * <p>
- * Example:
- * <table border='1'>
- * <tr>
- * <th>KStream</th>
- * <th>KTable</th>
- * <th>state</th>
- * <th>result</th>
- * </tr>
- * <tr>
- * <td><K1:A></td>
- * <td></td>
- * <td></td>
- * <td></td>
- * </tr>
- * <tr>
- * <td></td>
- * <td><K1:b></td>
- * <td><K1:b></td>
- * <td></td>
- * </tr>
- * <tr>
- * <td><K1:C></td>
- * <td></td>
- * <td><K1:b></td>
- * <td><K1:ValueJoinerWithKey(K1,C,b)></td>
- * </tr>
- * </table>
- * Both input streams (or to be more precise, their underlying source
topics) need to have the same number of
- * partitions.
- * If this is not the case, you would need to call {@link
#repartition(Repartitioned)} for this {@code KStream}
- * before doing the join, specifying the same number of partitions via
{@link Repartitioned} parameter as the given
- * {@link KTable}.
- * Furthermore, both input streams need to be co-partitioned on the join
key (i.e., use the same partitioner);
- * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)}.
- * If this requirement is not met, Kafka Streams will automatically
repartition the data, i.e., it will create an
- * internal repartitioning topic in Kafka and write and re-read the data
via this topic before the actual join.
- * The repartitioning topic will be named
"${applicationId}-<name>-repartition", where "applicationId" is
- * user-specified in {@link StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
"<name>" is an internally generated
- * name, and "-repartition" is a fixed suffix.
- * <p>
- * You can retrieve all generated internal topic names via {@link
Topology#describe()}.
- * <p>
- * Repartitioning can happen only for this {@code KStream} but not for the
provided {@link KTable}.
- * For this case, all data of the stream will be redistributed through the
repartitioning topic by writing all
- * records to it, and rereading all records from it, such that the join
input {@code KStream} is partitioned
- * correctly on its key.
- *
- * @param table the {@link KTable} to be joined with this stream
- * @param joiner a {@link ValueJoinerWithKey} that computes the join
result for a pair of matching records
- * @param joined a {@link Joined} instance that defines the serdes to
- * be used to serialize/deserialize inputs of the
joined streams
- * @param <VT> the value type of the table
- * @param <VR> the value type of the result stream
- * @return a {@code KStream} that contains join-records for each key and
values computed by the given
- * {@link ValueJoinerWithKey}, one for each matched record-pair with the
same key
- * @see #leftJoin(KTable, ValueJoinerWithKey, Joined)
- * @see #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)
+ * <p>Note that the key is read-only and must not be modified, as this can
lead to corrupt partitioning.
*/
- <VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
- final ValueJoinerWithKey<? super K, ? super
V, ? super VT, ? extends VR> joiner,
- final Joined<K, V, VT> joined);
+ <TableValue, VOut> KStream<K, VOut> join(final KTable<K, TableValue> table,
+ final ValueJoinerWithKey<? super
K, ? super V, ? super TableValue, ? extends VOut> joiner,
+ final Joined<K, V, TableValue>
joined);
/**
* Join records of this stream with {@link KTable}'s records using
non-windowed left equi join with default
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 35ef36bb816..5101d02a034 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -961,21 +961,27 @@ public class KStreamImpl<K, V> extends AbstractStream<K,
V> implements KStream<K
}
@Override
- public <VO, VR> KStream<K, VR> join(final KTable<K, VO> table,
- final ValueJoiner<? super V, ? super
VO, ? extends VR> joiner) {
+ public <TableValue, VOut> KStream<K, VOut> join(
+ final KTable<K, TableValue> table,
+ final ValueJoiner<? super V, ? super TableValue, ? extends VOut> joiner
+ ) {
return join(table, toValueJoinerWithKey(joiner));
}
@Override
- public <VO, VR> KStream<K, VR> join(final KTable<K, VO> table,
- final ValueJoinerWithKey<? super K, ?
super V, ? super VO, ? extends VR> joiner) {
+ public <TableValue, VOut> KStream<K, VOut> join(
+ final KTable<K, TableValue> table,
+ final ValueJoinerWithKey<? super K, ? super V, ? super TableValue, ?
extends VOut> joiner
+ ) {
return join(table, joiner, Joined.with(null, null, null));
}
@Override
- public <VO, VR> KStream<K, VR> join(final KTable<K, VO> table,
- final ValueJoiner<? super V, ? super
VO, ? extends VR> joiner,
- final Joined<K, V, VO> joined) {
+ public <TableValue, VOut> KStream<K, VOut> join(
+ final KTable<K, TableValue> table,
+ final ValueJoiner<? super V, ? super TableValue, ? extends VOut>
joiner,
+ final Joined<K, V, TableValue> joined
+ ) {
Objects.requireNonNull(table, "table can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
Objects.requireNonNull(joined, "joined can't be null");
@@ -983,14 +989,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K,
V> implements KStream<K
}
@Override
- public <VO, VR> KStream<K, VR> join(final KTable<K, VO> table,
- final ValueJoinerWithKey<? super K, ?
super V, ? super VO, ? extends VR> joiner,
- final Joined<K, V, VO> joined) {
+ public <TableValue, VOut> KStream<K, VOut> join(
+ final KTable<K, TableValue> table,
+ final ValueJoinerWithKey<? super K, ? super V, ? super TableValue, ?
extends VOut> joiner,
+ final Joined<K, V, TableValue> joined
+ ) {
Objects.requireNonNull(table, "table can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
Objects.requireNonNull(joined, "joined can't be null");
- final JoinedInternal<K, V, VO> joinedInternal = new
JoinedInternal<>(joined);
+ final JoinedInternal<K, V, TableValue> joinedInternal = new
JoinedInternal<>(joined);
final String name = joinedInternal.name();
if (repartitionRequired) {
@@ -1149,14 +1157,14 @@ public class KStreamImpl<K, V> extends
AbstractStream<K, V> implements KStream<K
}
@SuppressWarnings({"unchecked", "resource"})
- private <VO, VR> KStream<K, VR> doStreamTableJoin(final KTable<K, VO>
table,
- final
ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner,
- final JoinedInternal<K,
V, VO> joinedInternal,
- final boolean leftJoin) {
+ private <VTable, VOut> KStream<K, VOut> doStreamTableJoin(final KTable<K,
VTable> table,
+ final
ValueJoinerWithKey<? super K, ? super V, ? super VTable, ? extends VOut> joiner,
+ final
JoinedInternal<K, V, VTable> joinedInternal,
+ final boolean
leftJoin) {
Objects.requireNonNull(table, "table can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
- final Set<String> allSourceNodes =
ensureCopartitionWith(Collections.singleton((AbstractStream<K, VO>) table));
+ final Set<String> allSourceNodes =
ensureCopartitionWith(Collections.singleton((AbstractStream<K, VTable>) table));
final NamedInternal renamed = new NamedInternal(joinedInternal.name());
@@ -1165,7 +1173,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K,
V> implements KStream<K
Optional<StoreBuilder<?>> bufferStoreBuilder = Optional.empty();
if (joinedInternal.gracePeriod() != null) {
- if (!((KTableImpl<K, ?, VO>)
table).graphNode.isOutputVersioned().orElse(true)) {
+ if (!((KTableImpl<K, ?, VTable>)
table).graphNode.isOutputVersioned().orElse(true)) {
throw new IllegalArgumentException("KTable must be versioned
to use a grace period in a stream table join.");
}
final String bufferName = name + "-Buffer";
@@ -1178,19 +1186,19 @@ public class KStreamImpl<K, V> extends
AbstractStream<K, V> implements KStream<K
);
}
- final ProcessorSupplier<K, V, K, VR> processorSupplier = new
KStreamKTableJoin<>(
- ((KTableImpl<K, ?, VO>) table).valueGetterSupplier(),
+ final ProcessorSupplier<K, V, K, VOut> processorSupplier = new
KStreamKTableJoin<>(
+ ((KTableImpl<K, ?, VTable>) table).valueGetterSupplier(),
joiner,
leftJoin,
Optional.ofNullable(joinedInternal.gracePeriod()),
bufferStoreBuilder
);
- final ProcessorParameters<K, V, K, VR> processorParameters = new
ProcessorParameters<>(processorSupplier, name);
- final StreamTableJoinNode<K, V, VR> streamTableJoinNode = new
StreamTableJoinNode<>(
+ final ProcessorParameters<K, V, K, VOut> processorParameters = new
ProcessorParameters<>(processorSupplier, name);
+ final StreamTableJoinNode<K, V, VOut> streamTableJoinNode = new
StreamTableJoinNode<>(
name,
processorParameters,
- ((KTableImpl<K, ?, VO>) table).valueGetterSupplier().storeNames(),
+ ((KTableImpl<K, ?, VTable>)
table).valueGetterSupplier().storeNames(),
this.name,
joinedInternal.gracePeriod()
);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java
index 23fbe9e701c..dc33a167721 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java
@@ -23,23 +23,22 @@ import
org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.state.StoreBuilder;
import java.time.Duration;
+import java.util.Collections;
import java.util.Optional;
import java.util.Set;
-import static java.util.Collections.singleton;
+class KStreamKTableJoin<StreamKey, StreamValue, TableValue, VOut> implements
ProcessorSupplier<StreamKey, StreamValue, StreamKey, VOut> {
-class KStreamKTableJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1,
K, VOut> {
-
- private final KeyValueMapper<K, V1, K> keyValueMapper = (key, value) ->
key;
- private final KTableValueGetterSupplier<K, V2> valueGetterSupplier;
- private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ?
extends VOut> joiner;
+ private final KeyValueMapper<StreamKey, StreamValue, StreamKey>
keyValueMapper = (key, value) -> key;
+ private final KTableValueGetterSupplier<StreamKey, TableValue>
valueGetterSupplier;
+ private final ValueJoinerWithKey<? super StreamKey, ? super StreamValue, ?
super TableValue, ? extends VOut> joiner;
private final boolean leftJoin;
private final Optional<Duration> gracePeriod;
private final Optional<String> storeName;
private final Set<StoreBuilder<?>> stores;
- KStreamKTableJoin(final KTableValueGetterSupplier<K, V2>
valueGetterSupplier,
- final ValueJoinerWithKey<? super K, ? super V1, ? super
V2, ? extends VOut> joiner,
+ KStreamKTableJoin(final KTableValueGetterSupplier<StreamKey, TableValue>
valueGetterSupplier,
+ final ValueJoinerWithKey<? super StreamKey, ? super
StreamValue, ? super TableValue, ? extends VOut> joiner,
final boolean leftJoin,
final Optional<Duration> gracePeriod,
final Optional<StoreBuilder<?>> bufferStoreBuilder) {
@@ -49,11 +48,7 @@ class KStreamKTableJoin<K, V1, V2, VOut> implements
ProcessorSupplier<K, V1, K,
this.gracePeriod = gracePeriod;
this.storeName = bufferStoreBuilder.map(StoreBuilder::name);
- if (bufferStoreBuilder.isEmpty()) {
- this.stores = null;
- } else {
- this.stores = singleton(bufferStoreBuilder.get());
- }
+ this.stores =
bufferStoreBuilder.<Set<StoreBuilder<?>>>map(Collections::singleton).orElse(null);
}
@Override
@@ -62,7 +57,7 @@ class KStreamKTableJoin<K, V1, V2, VOut> implements
ProcessorSupplier<K, V1, K,
}
@Override
- public Processor<K, V1, K, VOut> get() {
+ public Processor<StreamKey, StreamValue, StreamKey, VOut> get() {
return new KStreamKTableJoinProcessor<>(valueGetterSupplier.get(),
keyValueMapper, joiner, leftJoin, gracePeriod, storeName);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
index 637d870ee7e..19231486c6b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
@@ -42,24 +42,26 @@ import static
org.apache.kafka.streams.processor.internals.ProcessorContextUtils
import static
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
-class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut> extends
ContextualProcessor<K1, V1, K1, VOut> {
+class KStreamKTableJoinProcessor<StreamKey, StreamValue, TableKey, TableValue,
VOut>
+ extends ContextualProcessor<StreamKey, StreamValue, StreamKey, VOut> {
+
private static final Logger LOG =
LoggerFactory.getLogger(KStreamKTableJoin.class);
- private final KTableValueGetter<K2, V2> valueGetter;
- private final KeyValueMapper<? super K1, ? super V1, ? extends K2>
keyMapper;
- private final ValueJoinerWithKey<? super K1, ? super V1, ? super V2, ?
extends VOut> joiner;
+ private final KTableValueGetter<TableKey, TableValue> valueGetter;
+ private final KeyValueMapper<? super StreamKey, ? super StreamValue, ?
extends TableKey> keyMapper;
+ private final ValueJoinerWithKey<? super StreamKey, ? super StreamValue, ?
super TableValue, ? extends VOut> joiner;
private final boolean leftJoin;
private Sensor droppedRecordsSensor;
private final Optional<Duration> gracePeriod;
- private TimeOrderedKeyValueBuffer<K1, V1, V1> buffer;
+ private TimeOrderedKeyValueBuffer<StreamKey, StreamValue, StreamValue>
buffer;
protected long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
- private InternalProcessorContext<K1, VOut> internalProcessorContext;
+ private InternalProcessorContext<StreamKey, VOut> internalProcessorContext;
private final boolean useBuffer;
private final String storeName;
- KStreamKTableJoinProcessor(final KTableValueGetter<K2, V2> valueGetter,
- final KeyValueMapper<? super K1, ? super V1, ?
extends K2> keyMapper,
- final ValueJoinerWithKey<? super K1, ? super
V1, ? super V2, ? extends VOut> joiner,
+ KStreamKTableJoinProcessor(final KTableValueGetter<TableKey, TableValue>
valueGetter,
+ final KeyValueMapper<? super StreamKey, ? super
StreamValue, ? extends TableKey> keyMapper,
+ final ValueJoinerWithKey<? super StreamKey, ?
super StreamValue, ? super TableValue, ? extends VOut> joiner,
final boolean leftJoin,
final Optional<Duration> gracePeriod,
final Optional<String> storeName) {
@@ -73,7 +75,7 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut>
extends ContextualProcess
}
@Override
- public void init(final ProcessorContext<K1, VOut> context) {
+ public void init(final ProcessorContext<StreamKey, VOut> context) {
super.init(context);
final StreamsMetricsImpl metrics = (StreamsMetricsImpl)
context.metrics();
droppedRecordsSensor =
droppedRecordsSensor(Thread.currentThread().getName(),
context.taskId().toString(), metrics);
@@ -89,7 +91,7 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut>
extends ContextualProcess
}
@Override
- public void process(final Record<K1, V1> record) {
+ public void process(final Record<StreamKey, StreamValue> record) {
updateObservedStreamTime(record.timestamp());
if (maybeDropRecord(record)) {
return;
@@ -106,8 +108,8 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut>
extends ContextualProcess
}
}
- private void emit(final TimeOrderedKeyValueBuffer.Eviction<K1, V1> toEmit)
{
- final Record<K1, V1> record = new Record<>(toEmit.key(),
toEmit.value(), toEmit.recordContext().timestamp())
+ private void emit(final TimeOrderedKeyValueBuffer.Eviction<StreamKey,
StreamValue> toEmit) {
+ final Record<StreamKey, StreamValue> record = new
Record<>(toEmit.key(), toEmit.value(), toEmit.recordContext().timestamp())
.withHeaders(toEmit.recordContext().headers());
final ProcessorRecordContext prevRecordContext =
internalProcessorContext.recordContext();
try {
@@ -122,23 +124,23 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut>
extends ContextualProcess
observedStreamTime = Math.max(observedStreamTime, timestamp);
}
- private void doJoin(final Record<K1, V1> record) {
- final K2 mappedKey = keyMapper.apply(record.key(), record.value());
- final V2 value2 = getValue2(record, mappedKey);
+ private void doJoin(final Record<StreamKey, StreamValue> record) {
+ final TableKey mappedKey = keyMapper.apply(record.key(),
record.value());
+ final TableValue value2 = getValue2(record, mappedKey);
if (leftJoin || value2 != null) {
internalProcessorContext.forward(record.withValue(joiner.apply(record.key(),
record.value(), value2)));
}
}
- private V2 getValue2(final Record<K1, V1> record, final K2 mappedKey) {
+ private TableValue getValue2(final Record<StreamKey, StreamValue> record,
final TableKey mappedKey) {
if (mappedKey == null) return null;
- final ValueAndTimestamp<V2> valueAndTimestamp =
valueGetter.isVersioned()
+ final ValueAndTimestamp<TableValue> valueAndTimestamp =
valueGetter.isVersioned()
? valueGetter.get(mappedKey, record.timestamp())
: valueGetter.get(mappedKey);
return getValueOrNull(valueAndTimestamp);
}
- private boolean maybeDropRecord(final Record<K1, V1> record) {
+ private boolean maybeDropRecord(final Record<StreamKey, StreamValue>
record) {
// we do join iff the join keys are equal, thus, if {@code keyMapper}
returns {@code null} we
// cannot join and just ignore the record. Note for KTables, this is
the same as having a null key
// since keyMapper just returns the key, but for GlobalKTables we can
have other keyMappers
@@ -147,7 +149,7 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut>
extends ContextualProcess
// an empty message (ie, there is nothing to be joined) -- this
contrast SQL NULL semantics
// furthermore, on left/outer joins 'null' in ValueJoiner#apply()
indicates a missing record --
// thus, to be consistent and to avoid ambiguous null semantics, null
values are ignored
- final K2 mappedKey = keyMapper.apply(record.key(), record.value());
+ final TableKey mappedKey = keyMapper.apply(record.key(),
record.value());
if (leftJoin && record.key() == null && record.value() != null) {
return false;
}