This is an automated email from the ASF dual-hosted git repository.
wcarlson pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b5c24974ae2 Kafka 12317: Relax non-null key requirement in Kafka
Streams (#14174)
b5c24974ae2 is described below
commit b5c24974ae2967824d1ec34b5b02121399a5e2f9
Author: Florin Akermann <[email protected]>
AuthorDate: Tue Oct 31 17:09:42 2023 +0100
Kafka 12317: Relax non-null key requirement in Kafka Streams (#14174)
[KIP-962](https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams)
The key requirements were relaxed for the following Streams DSL operators:
left join KStream-KStream: no longer drop left records with null-key and
call ValueJoiner with 'null' for right value.
outer join KStream-KStream: no longer drop left/right records with null-key
and call ValueJoiner with 'null' for right/left value.
left-foreign-key join KTable-KTable: no longer drop left records with
null-foreign-key returned by the ForeignKeyExtractor and call ValueJoiner with
'null' for right value.
left join KStream-KTable: no longer drop left records with null-key and
call ValueJoiner with 'null' for right value.
left join KStream-GlobalTable: no longer drop records when KeyValueMapper
returns 'null' and call ValueJoiner with 'null' for right value.
Reviewers: Walker Carlson <[email protected]>
---
docs/streams/developer-guide/dsl-api.html | 8 +-
docs/streams/upgrade-guide.html | 45 ++++++
.../org/apache/kafka/streams/kstream/KStream.java | 54 ++++----
.../kstream/internals/InternalStreamsBuilder.java | 17 +++
.../streams/kstream/internals/KStreamImpl.java | 9 +-
.../streams/kstream/internals/KStreamImplJoin.java | 3 +
.../kstream/internals/KStreamJoinWindow.java | 2 +-
.../kstream/internals/KStreamKStreamJoin.java | 18 +--
.../internals/KStreamKTableJoinProcessor.java | 16 ++-
.../internals/graph/BaseRepartitionNode.java | 10 +-
.../streams/kstream/internals/graph/GraphNode.java | 15 ++
.../NodesWithRelaxedNullKeyJoinDownstream.java | 54 ++++++++
.../integration/AbstractJoinIntegrationTest.java | 10 +-
.../integration/KStreamKStreamIntegrationTest.java | 4 +-
.../KStreamRepartitionIntegrationTest.java | 7 +-
.../RelaxedNullKeyRequirementJoinTest.java | 151 +++++++++++++++++++++
.../StreamStreamJoinIntegrationTest.java | 34 ++++-
.../integration/TableTableJoinIntegrationTest.java | 36 +++--
18 files changed, 426 insertions(+), 67 deletions(-)
diff --git a/docs/streams/developer-guide/dsl-api.html
b/docs/streams/developer-guide/dsl-api.html
index 08bf2ef8cf8..88f7d6f4ae2 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -1893,7 +1893,7 @@ KStream<String, String> joined =
left.leftJoin(right,
join output records.</p>
<blockquote>
<div><ul class="simple">
- <li>Input records with a
<code class="docutils literal"><span class="pre">null</span></code> key or a
<code class="docutils literal"><span class="pre">null</span></code> value are
ignored and do not trigger the join.</li>
+ <li>Input records with a
<code class="docutils literal"><span class="pre">null</span></code> value are
ignored and do not trigger the join.</li>
</ul>
</div></blockquote>
</li>
@@ -1954,7 +1954,7 @@ KStream<String, String> joined =
left.outerJoin(right,
join output records.</p>
<blockquote>
<div><ul class="simple">
- <li>Input records with a
<code class="docutils literal"><span class="pre">null</span></code> key or a
<code class="docutils literal"><span class="pre">null</span></code> value are
ignored and do not trigger the join.</li>
+ <li>Input records with a
<code class="docutils literal"><span class="pre">null</span></code> value are
ignored and do not trigger the join.</li>
</ul>
</div></blockquote>
</li>
@@ -2894,7 +2894,7 @@ KStream<String, String> joined =
left.leftJoin(right,
<blockquote>
<div><ul class="simple">
<li>Only input records for
the left side (stream) trigger the join. Input records for the right side
(table) update only the internal right-side join state.</li>
- <li>Input records for the
stream with a <code class="docutils literal"><span
class="pre">null</span></code> key or a <code class="docutils literal"><span
class="pre">null</span></code> value are ignored and do not trigger the
join.</li>
+ <li>Input records for the
stream with a <code class="docutils literal"><span
class="pre">null</span></code> value are ignored and do not trigger the
join.</li>
<li>Input records for the
table with a <code class="docutils literal"><span
class="pre">null</span></code> value are interpreted as <em>tombstones</em> for
the corresponding key, which indicate the deletion of the key from the table.
Tombstones do not
trigger the join.</li>
</ul>
@@ -3165,7 +3165,7 @@ KStream<String, String> joined =
left.leftJoin(right,
<blockquote>
<div><ul class="simple">
<li>Only input records for
the left side (stream) trigger the join. Input records for the right side
(table) update only the internal right-side join state.</li>
- <li>Input records for the
stream with a <code class="docutils literal"><span
class="pre">null</span></code> key or a <code class="docutils literal"><span
class="pre">null</span></code> value are ignored and do not trigger the
join.</li>
+ <li>Input records for the
stream with a <code class="docutils literal"><span
class="pre">null</span></code> value are ignored and do not trigger the
join.</li>
<li>Input records for the
table with a <code class="docutils literal"><span
class="pre">null</span></code> value are interpreted as <em>tombstones</em>,
which indicate the deletion of a record key from the table. Tombstones do not
trigger the
join.</li>
</ul>
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 71aac5c6401..1f122b6e366 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -153,6 +153,51 @@
as upper and lower bound (with semantics "no bound") to simplify the
usage of the <code>RangeQuery</code> class.
</p>
+ <p>
+ The non-null key requirements for Kafka Streams join operators were
relaxed as part of <a
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams">KIP-962</a>.
+ The behavior of the following operators changed.
+ <ul>
+ <li>left join KStream-KStream: no longer drop left records with
null-key and call ValueJoiner with 'null' for right value.</li>
+ <li>outer join KStream-KStream: no longer drop left/right records with
null-key and call ValueJoiner with 'null' for right/left value.</li>
+ <li>left-foreign-key join KTable-KTable: no longer drop left records
with null-foreign-key returned by the ForeignKeyExtractor and call ValueJoiner
with 'null' for right value.</li>
+ <li>left join KStream-KTable: no longer drop left records with
null-key and call ValueJoiner with 'null' for right value.</li>
+ <li>left join KStream-GlobalTable: no longer drop records when
KeyValueMapper returns 'null' and call ValueJoiner with 'null' for right
value.</li>
+ </ul>
+ Stream-DSL users who want to keep the current behavior can prepend a
.filter() operator to the aforementioned operators and filter accordingly.
+ The following snippets illustrate how to keep the old behavior.
+ <pre>
+ <code class="java">
+ //left join KStream-KStream
+ leftStream
+ .filter((key, value) -> key != null)
+ .leftJoin(rightStream, (leftValue, rightValue) -> join(leftValue,
rightValue), windows);
+
+ //outer join KStream-KStream
+ rightStream
+ .filter((key, value) -> key != null);
+ leftStream
+ .filter((key, value) -> key != null)
+ .outerJoin(rightStream, (leftValue, rightValue) -> join(leftValue,
rightValue), windows);
+
+ //left-foreign-key join KTable-KTable
+ Function<String, String> foreignKeyExtractor = leftValue -> ...
+ leftTable
+ .filter((key, value) -> foreignKeyExtractor.apply(value) != null)
+ .leftJoin(rightTable, foreignKeyExtractor, (leftValue, rightValue)
-> join(leftValue, rightValue), Named.as("left-foreign-key-table-join"));
+
+ //left join KStream-KTable
+ leftStream
+ .filter((key, value) -> key != null)
+ .leftJoin(kTable, (k, leftValue, rightValue) -> join(leftValue,
rightValue));
+
+ //left join KStream-GlobalTable
+ KeyValueMapper<String, String, String> keyValueMapper = (key,
value) -> ...;
+ leftStream
+ .filter((key, value) -> keyValueMapper.apply(key,value) != null)
+ .leftJoin(globalTable, keyValueMapper, (leftValue, rightValue) ->
join(leftValue, rightValue));
+ </code>
+ </pre>
+ </p>
<h3><a id="streams_api_changes_350"
href="#streams_api_changes_350">Streams API changes in 3.5.0</a></h3>
<p>
A new state store type, versioned key-value stores, was introduced in
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 0ea7b78764d..b8870620e90 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -1508,7 +1508,7 @@ public interface KStream<K, V> {
* The key of the result record is the same as for both joining input
records.
* Furthermore, for each input record of this {@code KStream} that does
not satisfy the join predicate the provided
* {@link ValueJoiner} will be called with a {@code null} value for the
other stream.
- * If an input record key or value is {@code null} the record will not be
included in the join operation and thus no
+ * If an input record value is {@code null} the record will not be
included in the join operation and thus no
* output record will be added to the resulting {@code KStream}.
* <p>
* Example (assuming all input records belong to the correct windows):
@@ -1588,7 +1588,7 @@ public interface KStream<K, V> {
* The key of the result record is the same as for both joining input
records.
* Furthermore, for each input record of this {@code KStream} that does
not satisfy the join predicate the provided
* {@link ValueJoinerWithKey} will be called with a {@code null} value for
the other stream.
- * If an input record key or value is {@code null} the record will not be
included in the join operation and thus no
+ * If an input record value is {@code null} the record will not be
included in the join operation and thus no
* output record will be added to the resulting {@code KStream}.
* <p>
* Example (assuming all input records belong to the correct windows):
@@ -1669,7 +1669,7 @@ public interface KStream<K, V> {
* The key of the result record is the same as for both joining input
records.
* Furthermore, for each input record of this {@code KStream} that does
not satisfy the join predicate the provided
* {@link ValueJoiner} will be called with a {@code null} value for the
other stream.
- * If an input record key or value is {@code null} the record will not be
included in the join operation and thus no
+ * If an input record value is {@code null} the record will not be
included in the join operation and thus no
* output record will be added to the resulting {@code KStream}.
* <p>
* Example (assuming all input records belong to the correct windows):
@@ -1754,7 +1754,7 @@ public interface KStream<K, V> {
* The key of the result record is the same as for both joining input
records.
* Furthermore, for each input record of this {@code KStream} that does
not satisfy the join predicate the provided
* {@link ValueJoinerWithKey} will be called with a {@code null} value for
the other stream.
- * If an input record key or value is {@code null} the record will not be
included in the join operation and thus no
+ * If an input record value is {@code null} the record will not be
included in the join operation and thus no
* output record will be added to the resulting {@code KStream}.
* <p>
* Example (assuming all input records belong to the correct windows):
@@ -1837,7 +1837,7 @@ public interface KStream<K, V> {
* The key of the result record is the same as for both joining input
records.
* Furthermore, for each input record of both {@code KStream}s that does
not satisfy the join predicate the provided
* {@link ValueJoiner} will be called with a {@code null} value for the
this/other stream, respectively.
- * If an input record key or value is {@code null} the record will not be
included in the join operation and thus no
+ * If an input record value is {@code null} the record will not be
included in the join operation and thus no
* output record will be added to the resulting {@code KStream}.
* <p>
* Example (assuming all input records belong to the correct windows):
@@ -1918,7 +1918,7 @@ public interface KStream<K, V> {
* The key of the result record is the same as for both joining input
records.
* Furthermore, for each input record of both {@code KStream}s that does
not satisfy the join predicate the provided
* {@link ValueJoinerWithKey} will be called with a {@code null} value for
the this/other stream, respectively.
- * If an input record key or value is {@code null} the record will not be
included in the join operation and thus no
+ * If an input record value is {@code null} the record will not be
included in the join operation and thus no
* output record will be added to the resulting {@code KStream}.
* <p>
* Example (assuming all input records belong to the correct windows):
@@ -2086,7 +2086,7 @@ public interface KStream<K, V> {
* The key of the result record is the same as for both joining input
records.
* Furthermore, for each input record of both {@code KStream}s that does
not satisfy the join predicate the provided
* {@link ValueJoinerWithKey} will be called with a {@code null} value for
this/other stream, respectively.
- * If an input record key or value is {@code null} the record will not be
included in the join operation and thus no
+ * If an input record value is {@code null} the record will not be
included in the join operation and thus no
* output record will be added to the resulting {@code KStream}.
* <p>
* Example (assuming all input records belong to the correct windows):
@@ -2484,7 +2484,7 @@ public interface KStream<K, V> {
* {@link ValueJoiner} will be called to compute a value (with arbitrary
type) for the result record.
* If no {@link KTable} record was found during lookup, a {@code null}
value will be provided to {@link ValueJoiner}.
* The key of the result record is the same as for both joining input
records.
- * If an {@code KStream} input record key or value is {@code null} the
record will not be included in the join
+ * If an {@code KStream} input record value is {@code null} the record
will not be included in the join
* operation and thus no output record will be added to the resulting
{@code KStream}.
* <p>
* Example:
@@ -2564,7 +2564,7 @@ public interface KStream<K, V> {
* If no {@link KTable} record was found during lookup, a {@code null}
value will be provided to {@link ValueJoinerWithKey}.
* The key of the result record is the same as for both joining input
records.
* Note that the key is read-only and should not be modified, as this can
lead to undefined behaviour.
- * If an {@code KStream} input record key or value is {@code null} the
record will not be included in the join
+ * If an {@code KStream} input record value is {@code null} the record
will not be included in the join
* operation and thus no output record will be added to the resulting
{@code KStream}.
* <p>
* Example:
@@ -2643,7 +2643,7 @@ public interface KStream<K, V> {
* {@link ValueJoiner} will be called to compute a value (with arbitrary
type) for the result record.
* If no {@link KTable} record was found during lookup, a {@code null}
value will be provided to {@link ValueJoiner}.
* The key of the result record is the same as for both joining input
records.
- * If an {@code KStream} input record key or value is {@code null} the
record will not be included in the join
+ * If an {@code KStream} input record value is {@code null} the record
will not be included in the join
* operation and thus no output record will be added to the resulting
{@code KStream}.
* <p>
* Example:
@@ -2726,7 +2726,7 @@ public interface KStream<K, V> {
* If no {@link KTable} record was found during lookup, a {@code null}
value will be provided to {@link ValueJoinerWithKey}.
* The key of the result record is the same as for both joining input
records.
* Note that the key is read-only and should not be modified, as this can
lead to undefined behaviour.
- * If an {@code KStream} input record key or value is {@code null} the
record will not be included in the join
+ * If an {@code KStream} input record value is {@code null} the record
will not be included in the join
* operation and thus no output record will be added to the resulting
{@code KStream}.
* <p>
* Example:
@@ -2805,8 +2805,8 @@ public interface KStream<K, V> {
* For each {@code KStream} record that finds a corresponding record in
{@link GlobalKTable} the provided
* {@link ValueJoiner} will be called to compute a value (with arbitrary
type) for the result record.
* The key of the result record is the same as the key of this {@code
KStream}.
- * If a {@code KStream} input value is {@code null} or if {@code
keyValueMapper} returns {@code null} the record
- * will not be included in the join operation and thus no output record
will be added to the resulting {@code KStream}.
+ * If a {@code KStream} input value is {@code null} the record will not be
included in the join operation
+ * and thus no output record will be added to the resulting {@code
KStream}.
*
* @param globalTable the {@link GlobalKTable} to be joined with this
stream
* @param keySelector instance of {@link KeyValueMapper} used to map
from the (key, value) of this stream
@@ -2837,8 +2837,8 @@ public interface KStream<K, V> {
* {@link ValueJoinerWithKey} will be called to compute a value (with
arbitrary type) for the result record.
* The key of the result record is the same as the key of this {@code
KStream}.
* Note that the key is read-only and should not be modified, as this can
lead to undefined behaviour.
- * If a {@code KStream} input value is {@code null} or if {@code
keyValueMapper} returns {@code null} the record
- * will not be included in the join operation and thus no output record
will be added to the resulting {@code KStream}.
+ * If a {@code KStream} input value is {@code null} the record will not be
included in the join operation
+ * and thus no output record will be added to the resulting {@code
KStream}.
*
* @param globalTable the {@link GlobalKTable} to be joined with this
stream
* @param keySelector instance of {@link KeyValueMapper} used to map
from the (key, value) of this stream
@@ -2868,8 +2868,8 @@ public interface KStream<K, V> {
* For each {@code KStream} record that finds a corresponding record in
{@link GlobalKTable} the provided
* {@link ValueJoiner} will be called to compute a value (with arbitrary
type) for the result record.
* The key of the result record is the same as the key of this {@code
KStream}.
- * If a {@code KStream} input value is {@code null} or if {@code
keyValueMapper} returns {@code null} the record
- * will not be included in the join operation and thus no output record
will be added to the resulting {@code KStream}.
+ * If a {@code KStream} input value is {@code null} the record will not be
included in the join operation
+ * and thus no output record will be added to the resulting {@code
KStream}.
*
* @param globalTable the {@link GlobalKTable} to be joined with this
stream
* @param keySelector instance of {@link KeyValueMapper} used to map
from the (key, value) of this stream
@@ -2902,8 +2902,8 @@ public interface KStream<K, V> {
* {@link ValueJoinerWithKey} will be called to compute a value (with
arbitrary type) for the result record.
* The key of the result record is the same as the key of this {@code
KStream}.
* Note that the key is read-only and should not be modified, as this can
lead to undefined behaviour.
- * If a {@code KStream} input value is {@code null} or if {@code
keyValueMapper} returns {@code null} the record
- * will not be included in the join operation and thus no output record
will be added to the resulting {@code KStream}.
+ * If a {@code KStream} input value is {@code null} the record will not be
included in the join operation
+ * and thus no output record will be added to the resulting {@code
KStream}.
*
* @param globalTable the {@link GlobalKTable} to be joined with this
stream
* @param keySelector instance of {@link KeyValueMapper} used to map
from the (key, value) of this stream
@@ -2937,8 +2937,8 @@ public interface KStream<K, V> {
* For each {@code KStream} record whether or not it finds a corresponding
record in {@link GlobalKTable} the
* provided {@link ValueJoiner} will be called to compute a value (with
arbitrary type) for the result record.
* The key of the result record is the same as this {@code KStream}.
- * If a {@code KStream} input value is {@code null} or if {@code
keyValueMapper} returns {@code null} the record
- * will not be included in the join operation and thus no output record
will be added to the resulting {@code KStream}.
+ * If a {@code KStream} input value is {@code null} the record will not be
included in the join operation
+ * and thus no output record will be added to the resulting {@code
KStream}.
* If no {@link GlobalKTable} record was found during lookup, a {@code
null} value will be provided to
* {@link ValueJoiner}.
*
@@ -2973,8 +2973,8 @@ public interface KStream<K, V> {
* provided {@link ValueJoinerWithKey} will be called to compute a value
(with arbitrary type) for the result record.
* The key of the result record is the same as this {@code KStream}.
* Note that the key is read-only and should not be modified, as this can
lead to undefined behaviour.
- * If a {@code KStream} input value is {@code null} or if {@code
keyValueMapper} returns {@code null} the record
- * will not be included in the join operation and thus no output record
will be added to the resulting {@code KStream}.
+ * If a {@code KStream} input value is {@code null} the record will not be
included in the join operation
+ * and thus no output record will be added to the resulting {@code
KStream}.
* If no {@link GlobalKTable} record was found during lookup, a {@code
null} value will be provided to
* {@link ValueJoiner}.
*
@@ -3008,8 +3008,8 @@ public interface KStream<K, V> {
* For each {@code KStream} record whether or not it finds a corresponding
record in {@link GlobalKTable} the
* provided {@link ValueJoiner} will be called to compute a value (with
arbitrary type) for the result record.
* The key of the result record is the same as this {@code KStream}.
- * If a {@code KStream} input value is {@code null} or if {@code
keyValueMapper} returns {@code null} the record
- * will not be included in the join operation and thus no output record
will be added to the resulting {@code KStream}.
+ * If a {@code KStream} input value is {@code null} the record will not be
included in the join operation
+ * and thus no output record will be added to the resulting {@code
KStream}.
* If no {@link GlobalKTable} record was found during lookup, a {@code
null} value will be provided to
* {@link ValueJoiner}.
*
@@ -3045,8 +3045,8 @@ public interface KStream<K, V> {
* For each {@code KStream} record whether or not it finds a corresponding
record in {@link GlobalKTable} the
* provided {@link ValueJoinerWithKey} will be called to compute a value
(with arbitrary type) for the result record.
* The key of the result record is the same as this {@code KStream}.
- * If a {@code KStream} input value is {@code null} or if {@code
keyValueMapper} returns {@code null} the record
- * will not be included in the join operation and thus no output record
will be added to the resulting {@code KStream}.
+ * If a {@code KStream} input value is {@code null} the record will not be
included in the join operation
+ * and thus no output record will be added to the resulting {@code
KStream}.
* If no {@link GlobalKTable} record was found during lookup, a {@code
null} value will be provided to
* {@link ValueJoinerWithKey}.
*
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index d928f5b92d3..4959cd50241 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -28,7 +28,9 @@ import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.internals.graph.BaseRepartitionNode;
import org.apache.kafka.streams.kstream.internals.graph.GlobalStoreNode;
+import
org.apache.kafka.streams.kstream.internals.graph.NodesWithRelaxedNullKeyJoinDownstream;
import
org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StateStoreNode;
@@ -351,8 +353,23 @@ public class InternalStreamsBuilder implements
InternalNameProvider {
LOG.debug("Optimizing the Kafka Streams graph for self-joins");
rewriteSingleStoreSelfJoin(root, new IdentityHashMap<>());
}
+ LOG.debug("Optimizing the Kafka Streams graph for null-key records");
+ rewriteRepartitionNodes();
}
+ private void rewriteRepartitionNodes() {
+ final Set<BaseRepartitionNode<?, ?>> nodes = new
NodesWithRelaxedNullKeyJoinDownstream(root).find();
+ for (final BaseRepartitionNode<?, ?> partitionNode : nodes) {
+ if (partitionNode.getProcessorParameters() != null) {
+ partitionNode.setProcessorParameters(new ProcessorParameters<>(
+ new KStreamFilter<>((k, v) -> k != null, false),
+ partitionNode.getProcessorParameters().processorName()
+ ));
+ }
+ }
+ }
+
+
private void mergeDuplicateSourceNodes() {
final Map<String, StreamSourceNode<?, ?>> topicsToSourceNodes = new
HashMap<>();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index ceec1ebd7de..f656702bea4 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -1041,9 +1041,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K,
V> implements KStream<K
nullKeyFilterProcessorName = repartitionTopicName + "-filter";
}
- final Predicate<K1, V1> notNullKeyPredicate = (k, v) -> k != null;
final ProcessorParameters<K1, V1, ?, ?> processorParameters = new
ProcessorParameters<>(
- new KStreamFilter<>(notNullKeyPredicate, false),
+ new KStreamFilter<>((k, v) -> true, false),
nullKeyFilterProcessorName
);
@@ -1231,6 +1230,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K,
V> implements KStream<K
final StreamTableJoinNode<K, V> streamTableJoinNode =
new StreamTableJoinNode<>(name, processorParameters, new String[]
{}, null, null, Optional.empty());
+ if (leftJoin) {
+
streamTableJoinNode.labels().add(GraphNode.Label.NULL_KEY_RELAXED_JOIN);
+ }
builder.addGraphNode(graphNode, streamTableJoinNode);
// do not have serde for joined result
@@ -1287,6 +1289,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K,
V> implements KStream<K
);
builder.addGraphNode(graphNode, streamTableJoinNode);
+ if (leftJoin) {
+
streamTableJoinNode.labels().add(GraphNode.Label.NULL_KEY_RELAXED_JOIN);
+ }
// do not have serde for joined result
return new KStreamImpl<>(
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
index 8e0fcfece0e..850677c7cb0 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
@@ -230,6 +230,9 @@ class KStreamImplJoin {
final GraphNode joinGraphNode = joinBuilder.build();
+ if (leftOuter || rightOuter) {
+ joinGraphNode.addLabel(GraphNode.Label.NULL_KEY_RELAXED_JOIN);
+ }
builder.addGraphNode(Arrays.asList(thisGraphNode, otherGraphNode),
joinGraphNode);
final Set<String> allSourceNodes = new HashSet<>(((KStreamImpl<K, V1>)
lhs).subTopologySourceNodes);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
index 317943a3c34..13cfa0db29d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
@@ -51,8 +51,8 @@ class KStreamJoinWindow<K, V> implements ProcessorSupplier<K,
V, K, V> {
public void process(final Record<K, V> record) {
// if the key is null, we do not need to put the record into
window store
// since it will never be considered for join operations
+ context().forward(record);
if (record.key() != null) {
- context().forward(record);
// Every record basically starts a new window. We're using a
window store mostly for the retention.
window.put(record.key(), record.value(), record.timestamp());
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index 1fd74e74647..603e1e82550 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
import
org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier;
@@ -32,9 +33,8 @@ import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
-import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
import org.apache.kafka.streams.state.internals.LeftOrRightValue;
-import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -124,17 +124,19 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements
ProcessorSupplier<K, V1, K,
@SuppressWarnings("unchecked")
@Override
public void process(final Record<K, V1> record) {
- if (StreamStreamJoinUtil.skipRecord(record, LOG,
droppedRecordsSensor, context())) {
- return;
- }
- boolean needOuterJoin = outer;
-
final long inputRecordTimestamp = record.timestamp();
final long timeFrom = Math.max(0L, inputRecordTimestamp -
joinBeforeMs);
final long timeTo = Math.max(0L, inputRecordTimestamp +
joinAfterMs);
-
sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
+ if (outer && record.key() == null && record.value() != null) {
+ context().forward(record.withValue(joiner.apply(record.key(),
record.value(), null)));
+ return;
+ } else if (StreamStreamJoinUtil.skipRecord(record, LOG,
droppedRecordsSensor, context())) {
+ return;
+ }
+
+ boolean needOuterJoin = outer;
// Emit all non-joined records which window has closed
if (inputRecordTimestamp == sharedTimeTracker.streamTime) {
outerJoinStore.ifPresent(store ->
emitNonJoinedOuterRecords(store, record));
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
index af312665c7b..ddb171310a0 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
@@ -125,15 +125,20 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut>
extends ContextualProcess
@SuppressWarnings("unchecked")
private void doJoin(final Record<K1, V1> record) {
final K2 mappedKey = keyMapper.apply(record.key(), record.value());
- final ValueAndTimestamp<V2> valueAndTimestamp2 =
valueGetter.isVersioned()
- ? valueGetter.get(mappedKey, record.timestamp())
- : valueGetter.get(mappedKey);
- final V2 value2 = getValueOrNull(valueAndTimestamp2);
+ final V2 value2 = getValue2(record, mappedKey);
if (leftJoin || value2 != null) {
internalProcessorContext.forward(record.withValue(joiner.apply(record.key(),
record.value(), value2)));
}
}
+ private V2 getValue2(final Record<K1, V1> record, final K2 mappedKey) {
+ if (mappedKey == null) return null;
+ final ValueAndTimestamp<V2> valueAndTimestamp =
valueGetter.isVersioned()
+ ? valueGetter.get(mappedKey, record.timestamp())
+ : valueGetter.get(mappedKey);
+ return getValueOrNull(valueAndTimestamp);
+ }
+
private boolean maybeDropRecord(final Record<K1, V1> record) {
// we do join iff the join keys are equal, thus, if {@code keyMapper}
returns {@code null} we
// cannot join and just ignore the record. Note for KTables, this is
the same as having a null key
@@ -144,6 +149,9 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut>
extends ContextualProcess
// furthermore, on left/outer joins 'null' in ValueJoiner#apply()
indicates a missing record --
// thus, to be consistent and to avoid ambiguous null semantics, null
values are ignored
final K2 mappedKey = keyMapper.apply(record.key(), record.value());
+ if (leftJoin && record.key() == null && record.value() != null) {
+ return false;
+ }
if (mappedKey == null || record.value() == null) {
if (context().recordMetadata().isPresent()) {
final RecordMetadata recordMetadata =
context().recordMetadata().get();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java
index 533d8200a1b..dc6f289b136 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java
@@ -30,9 +30,9 @@ public abstract class BaseRepartitionNode<K, V> extends
GraphNode {
protected final String sinkName;
protected final String sourceName;
protected final String repartitionTopic;
- protected final ProcessorParameters<K, V, ?, ?> processorParameters;
protected final StreamPartitioner<K, V> partitioner;
protected final InternalTopicProperties internalTopicProperties;
+ protected ProcessorParameters<K, V, ?, ?> processorParameters;
BaseRepartitionNode(final String nodeName,
final String sourceName,
@@ -72,6 +72,14 @@ public abstract class BaseRepartitionNode<K, V> extends
GraphNode {
return keySerde != null ? keySerde.deserializer() : null;
}
+ public void setProcessorParameters(final ProcessorParameters<K, V, ?, ?>
processorParameters) {
+ this.processorParameters = processorParameters;
+ }
+
+ public ProcessorParameters<K, V, ?, ?> getProcessorParameters() {
+ return processorParameters;
+ }
+
@Override
public String toString() {
return "BaseRepartitionNode{" +
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java
index 98fb98e2536..108fa98adaa 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java
@@ -18,6 +18,7 @@
package org.apache.kafka.streams.kstream.internals.graph;
+import java.util.LinkedList;
import java.util.Optional;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
@@ -29,6 +30,8 @@ public abstract class GraphNode {
private final Collection<GraphNode> childNodes = new LinkedHashSet<>();
private final Collection<GraphNode> parentNodes = new LinkedHashSet<>();
+
+ private final Collection<Label> labels = new LinkedList<>();
private final String nodeName;
private boolean keyChangingOperation;
private boolean valueChangingOperation;
@@ -152,4 +155,16 @@ public abstract class GraphNode {
", mergeNode=" + mergeNode +
", parentNodes=" + Arrays.toString(parentNames) + '}';
}
+
+ public void addLabel(final Label label) {
+ labels.add(label);
+ }
+
+ public Collection<Label> labels() {
+ return labels;
+ }
+
+ public enum Label {
+ NULL_KEY_RELAXED_JOIN
+ }
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/NodesWithRelaxedNullKeyJoinDownstream.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/NodesWithRelaxedNullKeyJoinDownstream.java
new file mode 100644
index 00000000000..0cbed194450
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/NodesWithRelaxedNullKeyJoinDownstream.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.graph;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class NodesWithRelaxedNullKeyJoinDownstream {
+
+ private final HashSet<GraphNode> visited;
+ private final HashSet<GraphNode> nonOptimizable;
+ private final GraphNode start;
+
+ public NodesWithRelaxedNullKeyJoinDownstream(final GraphNode root) {
+ this.start = root;
+ this.visited = new HashSet<>();
+ this.nonOptimizable = new HashSet<>();
+ }
+
+ public Set<BaseRepartitionNode<?, ?>> find() {
+ traverseGraph(this.start);
+ return visited.stream()
+ .filter(node -> node instanceof BaseRepartitionNode &&
!nonOptimizable.contains(node))
+ .map(node -> (BaseRepartitionNode<?, ?>) node)
+ .collect(Collectors.toSet());
+ }
+
+ private void traverseGraph(final GraphNode node) {
+ if (!visited.contains(node)) {
+ for (final GraphNode child : node.children()) {
+ traverseGraph(child);
+ if
(child.labels().contains(GraphNode.Label.NULL_KEY_RELAXED_JOIN) ||
nonOptimizable.contains(child)) {
+ nonOptimizable.add(node);
+ }
+ }
+ visited.add(node);
+ }
+ }
+}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
index ff4d2e69e1a..8d6e2d61c56 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
@@ -133,7 +133,9 @@ public abstract class AbstractJoinIntegrationTest {
new Input<>(INPUT_TOPIC_LEFT, null, 12),
new Input<>(INPUT_TOPIC_RIGHT, null, 13),
new Input<>(INPUT_TOPIC_RIGHT, "d", 14),
- new Input<>(INPUT_TOPIC_LEFT, "D", 15)
+ new Input<>(INPUT_TOPIC_LEFT, "D", 15),
+ new Input<>(INPUT_TOPIC_LEFT, null, "E", 16),
+ new Input<>(INPUT_TOPIC_RIGHT, null, "e", 17)
);
// used for stream-stream self joins where only one input topic is needed
@@ -299,5 +301,11 @@ public abstract class AbstractJoinIntegrationTest {
record = KeyValue.pair(ANY_UNIQUE_KEY, value);
this.timestamp = timestamp;
}
+
+ Input(final String topic, final Long key, final V value, final long
timestamp) {
+ this.topic = topic;
+ record = KeyValue.pair(key, value);
+ this.timestamp = timestamp;
+ }
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java
index cc55e5711ed..c6cb077e6c0 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java
@@ -117,6 +117,7 @@ public class KStreamKStreamIntegrationTest {
expected.add(new KeyValue<>("Key-2", "value1=left-2a,value2=null"));
expected.add(new KeyValue<>("Key-3", "value1=left-3a,value2=null"));
expected.add(new KeyValue<>("Key-4", "value1=left-4a,value2=null"));
+ expected.add(new KeyValue<>(null, "value1=left-5a,value2=null"));
verifyKStreamKStreamOuterJoin(expected);
}
@@ -135,7 +136,8 @@ public class KStreamKStreamIntegrationTest {
new KeyValue<>("Key-1", "left-1a"),
new KeyValue<>("Key-2", "left-2a"),
new KeyValue<>("Key-3", "left-3a"),
- new KeyValue<>("Key-4", "left-4a")
+ new KeyValue<>("Key-4", "left-4a"),
+ new KeyValue<>(null, "left-5a")
);
final List<KeyValue<String, String>> left2 = asList(
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
index f086b720e93..61f9cde5974 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
@@ -265,8 +265,11 @@ public class KStreamRepartitionIntegrationTest {
new KeyValue<>(2, "B")
);
- sendEvents(timestamp, expectedRecords);
- sendEvents(topicB, timestamp, expectedRecords);
+ final List<KeyValue<Integer, String>> recordsToSend = new
ArrayList<>(expectedRecords);
+ recordsToSend.add(new KeyValue<>(null, "C"));
+
+ sendEvents(timestamp, recordsToSend);
+ sendEvents(topicB, timestamp, recordsToSend);
final StreamsBuilder builder = new StreamsBuilder();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java
new file mode 100644
index 00000000000..54da72208a4
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.GlobalKTable;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+
+import static
org.apache.kafka.streams.kstream.JoinWindows.ofTimeDifferenceAndGrace;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class RelaxedNullKeyRequirementJoinTest {
+
+ private static final JoinWindows WINDOW =
ofTimeDifferenceAndGrace(Duration.ofSeconds(60), Duration.ofSeconds(10));
+ private static final ValueJoiner<String, String, String> JOINER = (lv, rv)
-> lv + "|" + rv;
+ private static final String LEFT = "left";
+ private static final String RIGHT = "right";
+ private static final String OUT = "out";
+ private TopologyTestDriver testDriver;
+ private StreamsBuilder builder;
+ private KStream<String, String> leftStream;
+ private KStream<String, String> rightStream;
+ private TestInputTopic<String, String> left;
+ private TestInputTopic<String, String> right;
+ private TestOutputTopic<String, String> out;
+
+ @BeforeEach
+ void beforeEach() {
+ builder = new StreamsBuilder();
+ leftStream = builder.<String, String>stream(LEFT).repartition();
+ rightStream = builder.<String, String>stream(RIGHT).repartition();
+ }
+
+ @AfterEach
+ void afterEach() {
+ testDriver.close();
+ }
+
+ @Test
+ void testRelaxedLeftStreamStreamJoin() {
+ leftStream
+ .leftJoin(rightStream, JOINER, WINDOW)
+ .to(OUT);
+ initTopology();
+ left.pipeInput(null, "leftValue", 1);
+ assertEquals(Collections.singletonList(new KeyValue<>(null,
"leftValue|null")), out.readKeyValuesToList());
+ }
+
+ @Test
+ void testRelaxedLeftStreamTableJoin() {
+ leftStream
+ .leftJoin(rightStream.toTable(), JOINER)
+ .to(OUT);
+ initTopology();
+ left.pipeInput(null, "leftValue", 1);
+ assertEquals(Collections.singletonList(new KeyValue<>(null,
"leftValue|null")), out.readKeyValuesToList());
+ }
+
+ @Test
+ void testRelaxedOuterStreamStreamJoin() {
+ leftStream
+ .outerJoin(rightStream, JOINER, WINDOW)
+ .to(OUT);
+ initTopology();
+ right.pipeInput(null, "rightValue", 1);
+ left.pipeInput(null, "leftValue");
+ assertEquals(
+ Arrays.asList(new KeyValue<>(null, "null|rightValue"), new
KeyValue<>(null, "leftValue|null")),
+ out.readKeyValuesToList()
+ );
+ }
+
+ @Test
+ void testRelaxedLeftStreamGlobalTableJoin() {
+ final GlobalKTable<String, String> global =
builder.globalTable("global");
+ leftStream
+ .leftJoin(global, (key, value) -> null, JOINER)
+ .to(OUT);
+ initTopology();
+ left.pipeInput(null, "leftValue", 1);
+ assertEquals(Collections.singletonList(new KeyValue<>(null,
"leftValue|null")), out.readKeyValuesToList());
+ }
+
+ @Test
+ void
testDropNullKeyRecordsForRepartitionNodesWithNoRelaxedJoinDownstream() {
+ leftStream
+ .repartition()
+ .to(OUT);
+ initTopology();
+ left.pipeInput(null, "leftValue", 1);
+ assertEquals(Collections.<KeyValue<String, String>>emptyList(),
out.readKeyValuesToList());
+ }
+
+ private void initTopology() {
+ testDriver = new TopologyTestDriver(builder.build(), props());
+ left = testDriver.createInputTopic(
+ LEFT,
+ new StringSerializer(),
+ new StringSerializer()
+ );
+ right = testDriver.createInputTopic(
+ RIGHT,
+ new StringSerializer(),
+ new StringSerializer()
+ );
+ out = testDriver.createOutputTopic(
+ OUT,
+ new StringDeserializer(),
+ new StringDeserializer()
+ );
+ }
+
+ private static Properties props() {
+ final Properties props = new Properties();
+ props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
+ props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
+ return props;
+ }
+}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
index 554c5692a96..b1d2390dc83 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
@@ -137,7 +137,9 @@ public class StreamStreamJoinIntegrationTest extends
AbstractJoinIntegrationTest
new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
- new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L))
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
+ null,
+ null
);
leftStream.join(
@@ -182,7 +184,9 @@ public class StreamStreamJoinIntegrationTest extends
AbstractJoinIntegrationTest
new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
- new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L))
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
+ null,
+ null
);
leftStream.map(MockMapper.noOpKeyValueMapper())
@@ -229,7 +233,10 @@ public class StreamStreamJoinIntegrationTest extends
AbstractJoinIntegrationTest
new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
- new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L))
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
+ Arrays.asList(
+ new TestRecord<>(null, "E-null", null, 16L)),
+ null
);
leftStream.leftJoin(
@@ -274,7 +281,10 @@ public class StreamStreamJoinIntegrationTest extends
AbstractJoinIntegrationTest
new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
- new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L))
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
+ Arrays.asList(
+ new TestRecord<>(null, "E-null", null, 16L)),
+ null
);
leftStream.map(MockMapper.noOpKeyValueMapper())
@@ -321,7 +331,11 @@ public class StreamStreamJoinIntegrationTest extends
AbstractJoinIntegrationTest
new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
- new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L))
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
+ Arrays.asList(
+ new TestRecord<>(null, "E-null", null, 16L)),
+ Arrays.asList(
+ new TestRecord<>(null, "null-e", null, 17L))
);
leftStream.outerJoin(
@@ -366,7 +380,11 @@ public class StreamStreamJoinIntegrationTest extends
AbstractJoinIntegrationTest
new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
- new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L))
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
+ Arrays.asList(
+ new TestRecord<>(null, "E-null", null, 16L)),
+ Arrays.asList(
+ new TestRecord<>(null, "null-e", null, 17L))
);
leftStream.map(MockMapper.noOpKeyValueMapper())
@@ -461,7 +479,9 @@ public class StreamStreamJoinIntegrationTest extends
AbstractJoinIntegrationTest
new TestRecord<>(ANY_UNIQUE_KEY, "D-d-a", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-d-b", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-d-c", null, 15L),
- new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L))
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)),
+ null,
+ null
);
leftStream.join(
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
index 1d17cbb53a2..24f4dc1face 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
@@ -620,7 +620,9 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
null,
null,
null,
- Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY,
"D-d-d", null, 15L))
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY,
"D-d-d", null, 15L)),
+ null,
+ null
);
runTestWithDriver(inputWithoutOutOfOrderData, expectedResult,
storeName);
@@ -662,7 +664,9 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
null,
null,
null,
- Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY,
"D-d-d", null, 15L))
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY,
"D-d-d", null, 15L)),
+ null,
+ null
);
runTestWithDriver(inputWithoutOutOfOrderData, expectedResult,
storeName);
@@ -707,7 +711,9 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
null,
// incorrect result `null-d` is caused by self-join of
`rightTable`
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY,
"null-d", null, 14L)),
- Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY,
"D-d-d", null, 15L))
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY,
"D-d-d", null, 15L)),
+ null,
+ null
);
runTestWithDriver(inputWithoutOutOfOrderData, expectedResult,
storeName);
@@ -749,7 +755,9 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
null,
null,
null,
- Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY,
"D-d-d", null, 15L))
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY,
"D-d-d", null, 15L)),
+ null,
+ null
);
runTestWithDriver(inputWithoutOutOfOrderData, expectedResult,
storeName);
@@ -793,7 +801,9 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY,
null, null, 12L)),
null,
null,
- Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY,
"D-d-d", null, 15L))
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY,
"D-d-d", null, 15L)),
+ null,
+ null
);
runTestWithDriver(inputWithoutOutOfOrderData, expectedResult,
storeName);
@@ -837,7 +847,9 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY,
null, null, 12L)),
null,
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY,
"null-d", null, 14L)),
- Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY,
"D-d-d", null, 15L))
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY,
"D-d-d", null, 15L)),
+ null,
+ null
);
runTestWithDriver(inputWithoutOutOfOrderData, expectedResult,
storeName);
@@ -881,7 +893,9 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L)),
- Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY,
"D-d-d", null, 15L))
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY,
"D-d-d", null, 15L)),
+ null,
+ null
);
runTestWithDriver(inputWithoutOutOfOrderData, expectedResult,
storeName);
@@ -927,7 +941,9 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L)),
- Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY,
"D-d-d", null, 15L))
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY,
"D-d-d", null, 15L)),
+ null,
+ null
);
runTestWithDriver(inputWithoutOutOfOrderData, expectedResult,
storeName);
@@ -975,7 +991,9 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L)),
- Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY,
"D-d-d", null, 15L))
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY,
"D-d-d", null, 15L)),
+ null,
+ null
);
runTestWithDriver(inputWithoutOutOfOrderData, expectedResult,
storeName);
}