This is an automated email from the ASF dual-hosted git repository.

wcarlson pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b5c24974ae2 Kafka 12317:  Relax non-null key requirement in Kafka 
Streams (#14174)
b5c24974ae2 is described below

commit b5c24974ae2967824d1ec34b5b02121399a5e2f9
Author: Florin Akermann <[email protected]>
AuthorDate: Tue Oct 31 17:09:42 2023 +0100

    Kafka 12317:  Relax non-null key requirement in Kafka Streams (#14174)
    
    
[KIP-962](https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams)
    
    The key requirements got relaxed for the following Streams DSL operators:
    
    left join KStream-KStream: no longer drop left records with null-key and 
call ValueJoiner with 'null' for right value.
    outer join KStream-KStream: no longer drop left/right records with null-key 
and call ValueJoiner with 'null' for right/left value.
    left-foreign-key join KTable-KTable: no longer drop left records with 
null-foreign-key returned by the ForeignKeyExtractor and call ValueJoiner with 
'null' for right value.
    left join KStream-KTable: no longer drop left records with null-key and 
call ValueJoiner with 'null' for right value.
    left join KStream-GlobalTable: no longer drop records when KeyValueMapper 
returns 'null' and call ValueJoiner with 'null' for right value.
    
    Reviewers: Walker Carlson <[email protected]>
---
 docs/streams/developer-guide/dsl-api.html          |   8 +-
 docs/streams/upgrade-guide.html                    |  45 ++++++
 .../org/apache/kafka/streams/kstream/KStream.java  |  54 ++++----
 .../kstream/internals/InternalStreamsBuilder.java  |  17 +++
 .../streams/kstream/internals/KStreamImpl.java     |   9 +-
 .../streams/kstream/internals/KStreamImplJoin.java |   3 +
 .../kstream/internals/KStreamJoinWindow.java       |   2 +-
 .../kstream/internals/KStreamKStreamJoin.java      |  18 +--
 .../internals/KStreamKTableJoinProcessor.java      |  16 ++-
 .../internals/graph/BaseRepartitionNode.java       |  10 +-
 .../streams/kstream/internals/graph/GraphNode.java |  15 ++
 .../NodesWithRelaxedNullKeyJoinDownstream.java     |  54 ++++++++
 .../integration/AbstractJoinIntegrationTest.java   |  10 +-
 .../integration/KStreamKStreamIntegrationTest.java |   4 +-
 .../KStreamRepartitionIntegrationTest.java         |   7 +-
 .../RelaxedNullKeyRequirementJoinTest.java         | 151 +++++++++++++++++++++
 .../StreamStreamJoinIntegrationTest.java           |  34 ++++-
 .../integration/TableTableJoinIntegrationTest.java |  36 +++--
 18 files changed, 426 insertions(+), 67 deletions(-)

diff --git a/docs/streams/developer-guide/dsl-api.html 
b/docs/streams/developer-guide/dsl-api.html
index 08bf2ef8cf8..88f7d6f4ae2 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -1893,7 +1893,7 @@ KStream&lt;String, String&gt; joined = 
left.leftJoin(right,
                                             join output records.</p>
                                             <blockquote>
                                                 <div><ul class="simple">
-                                                    <li>Input records with a 
<code class="docutils literal"><span class="pre">null</span></code> key or a 
<code class="docutils literal"><span class="pre">null</span></code> value are 
ignored and do not trigger the join.</li>
+                                                    <li>Input records with a 
<code class="docutils literal"><span class="pre">null</span></code> value are 
ignored and do not trigger the join.</li>
                                                 </ul>
                                                 </div></blockquote>
                                         </li>
@@ -1954,7 +1954,7 @@ KStream&lt;String, String&gt; joined = 
left.outerJoin(right,
                                             join output records.</p>
                                             <blockquote>
                                                 <div><ul class="simple">
-                                                    <li>Input records with a 
<code class="docutils literal"><span class="pre">null</span></code> key or a 
<code class="docutils literal"><span class="pre">null</span></code> value are 
ignored and do not trigger the join.</li>
+                                                    <li>Input records with a 
<code class="docutils literal"><span class="pre">null</span></code> value are 
ignored and do not trigger the join.</li>
                                                 </ul>
                                                 </div></blockquote>
                                         </li>
@@ -2894,7 +2894,7 @@ KStream&lt;String, String&gt; joined = 
left.leftJoin(right,
                                             <blockquote>
                                                 <div><ul class="simple">
                                                     <li>Only input records for 
the left side (stream) trigger the join.  Input records for the right side 
(table) update only the internal right-side join state.</li>
-                                                    <li>Input records for the 
stream with a <code class="docutils literal"><span 
class="pre">null</span></code> key or a <code class="docutils literal"><span 
class="pre">null</span></code> value are ignored and do not trigger the 
join.</li>
+                                                    <li>Input records for the 
stream with a <code class="docutils literal"><span 
class="pre">null</span></code> value are ignored and do not trigger the 
join.</li>
                                                     <li>Input records for the 
table with a <code class="docutils literal"><span 
class="pre">null</span></code> value are interpreted as <em>tombstones</em> for 
the corresponding key, which indicate the deletion of the key from the table.
                                                         Tombstones do not 
trigger the join.</li>
                                                 </ul>
@@ -3165,7 +3165,7 @@ KStream&lt;String, String&gt; joined = 
left.leftJoin(right,
                                             <blockquote>
                                                 <div><ul class="simple">
                                                     <li>Only input records for 
the left side (stream) trigger the join.  Input records for the right side 
(table) update only the internal right-side join state.</li>
-                                                    <li>Input records for the 
stream with a <code class="docutils literal"><span 
class="pre">null</span></code> key or a <code class="docutils literal"><span 
class="pre">null</span></code> value are ignored and do not trigger the 
join.</li>
+                                                    <li>Input records for the 
stream with a <code class="docutils literal"><span 
class="pre">null</span></code> value are ignored and do not trigger the 
join.</li>
                                                     <li>Input records for the 
table with a <code class="docutils literal"><span 
class="pre">null</span></code> value are interpreted as <em>tombstones</em>, 
which indicate the deletion of a record key from the table.  Tombstones do not 
trigger the
                                                         join.</li>
                                                 </ul>
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 71aac5c6401..1f122b6e366 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -153,6 +153,51 @@
       as upper and lower bound (with semantics "no bound") to simplify the 
usage of the <code>RangeQuery</code> class.
     </p>
 
+    <p>
+        The non-null key requirements for Kafka Streams join operators were 
relaxed as part of <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams">KIP-962</a>.
+        The behavior of the following operators changed.
+    <ul>
+        <li>left join KStream-KStream: no longer drop left records with 
null-key and call ValueJoiner with 'null' for right value.</li>
+        <li>outer join KStream-KStream: no longer drop left/right records with 
null-key and call ValueJoiner with 'null' for right/left value.</li>
+        <li>left-foreign-key join KTable-KTable: no longer drop left records 
with null-foreign-key returned by the ForeignKeyExtractor and call ValueJoiner 
with 'null' for right value.</li>
+        <li>left join KStream-KTable: no longer drop left records with 
null-key and call ValueJoiner with 'null' for right value.</li>
+        <li>left join KStream-GlobalTable: no longer drop records when 
KeyValueMapper returns 'null' and call ValueJoiner with 'null' for right 
value.</li>
+    </ul>
+    Streams DSL users who want to keep the previous behavior can prepend a 
.filter() operator to the aforementioned operators and filter accordingly.
+    The following snippets illustrate how to keep the old behavior.
+    <pre>
+    <code class="java">
+            //left join KStream-KStream
+            leftStream
+            .filter((key, value) -> key != null)
+            .leftJoin(rightStream, (leftValue, rightValue) -> join(leftValue, 
rightValue), windows);
+
+            //outer join KStream-KStream
+            leftStream
+            .filter((key, value) -> key != null)
+            .outerJoin(
+                rightStream.filter((key, value) -> key != null),
+                (leftValue, rightValue) -> join(leftValue, rightValue), windows);
+
+            //left-foreign-key join KTable-KTable
+            Function&lt;String, String&gt; foreignKeyExtractor = leftValue -> ...;
+            leftTable
+            .filter((key, value) -> foreignKeyExtractor.apply(value) != null)
+            .leftJoin(rightTable, foreignKeyExtractor, (leftValue, rightValue) 
-> join(leftValue, rightValue), Named.as("left-foreign-key-table-join"));
+
+            //left join KStream-KTable
+            leftStream
+            .filter((key, value) -> key != null)
+            .leftJoin(kTable, (k, leftValue, rightValue) -> join(leftValue, 
rightValue));
+
+            //left join KStream-GlobalTable
+            KeyValueMapper&lt;String, String, String&gt; keyValueMapper = (key, 
value) -> ...;
+            leftStream
+            .filter((key, value) -> keyValueMapper.apply(key,value) != null)
+            .leftJoin(globalTable, keyValueMapper, (leftValue, rightValue) -> 
join(leftValue, rightValue));
+    </code>
+    </pre>
+    </p>
     <h3><a id="streams_api_changes_350" 
href="#streams_api_changes_350">Streams API changes in 3.5.0</a></h3>
     <p>      
       A new state store type, versioned key-value stores, was introduced in
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 0ea7b78764d..b8870620e90 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -1508,7 +1508,7 @@ public interface KStream<K, V> {
      * The key of the result record is the same as for both joining input 
records.
      * Furthermore, for each input record of this {@code KStream} that does 
not satisfy the join predicate the provided
      * {@link ValueJoiner} will be called with a {@code null} value for the 
other stream.
-     * If an input record key or value is {@code null} the record will not be 
included in the join operation and thus no
+     * If an input record value is {@code null} the record will not be 
included in the join operation and thus no
      * output record will be added to the resulting {@code KStream}.
      * <p>
      * Example (assuming all input records belong to the correct windows):
@@ -1588,7 +1588,7 @@ public interface KStream<K, V> {
      * The key of the result record is the same as for both joining input 
records.
      * Furthermore, for each input record of this {@code KStream} that does 
not satisfy the join predicate the provided
      * {@link ValueJoinerWithKey} will be called with a {@code null} value for 
the other stream.
-     * If an input record key or value is {@code null} the record will not be 
included in the join operation and thus no
+     * If an input record value is {@code null} the record will not be 
included in the join operation and thus no
      * output record will be added to the resulting {@code KStream}.
      * <p>
      * Example (assuming all input records belong to the correct windows):
@@ -1669,7 +1669,7 @@ public interface KStream<K, V> {
      * The key of the result record is the same as for both joining input 
records.
      * Furthermore, for each input record of this {@code KStream} that does 
not satisfy the join predicate the provided
      * {@link ValueJoiner} will be called with a {@code null} value for the 
other stream.
-     * If an input record key or value is {@code null} the record will not be 
included in the join operation and thus no
+     * If an input record value is {@code null} the record will not be 
included in the join operation and thus no
      * output record will be added to the resulting {@code KStream}.
      * <p>
      * Example (assuming all input records belong to the correct windows):
@@ -1754,7 +1754,7 @@ public interface KStream<K, V> {
      * The key of the result record is the same as for both joining input 
records.
      * Furthermore, for each input record of this {@code KStream} that does 
not satisfy the join predicate the provided
      * {@link ValueJoinerWithKey} will be called with a {@code null} value for 
the other stream.
-     * If an input record key or value is {@code null} the record will not be 
included in the join operation and thus no
+     * If an input record value is {@code null} the record will not be 
included in the join operation and thus no
      * output record will be added to the resulting {@code KStream}.
      * <p>
      * Example (assuming all input records belong to the correct windows):
@@ -1837,7 +1837,7 @@ public interface KStream<K, V> {
      * The key of the result record is the same as for both joining input 
records.
      * Furthermore, for each input record of both {@code KStream}s that does 
not satisfy the join predicate the provided
      * {@link ValueJoiner} will be called with a {@code null} value for the 
this/other stream, respectively.
-     * If an input record key or value is {@code null} the record will not be 
included in the join operation and thus no
+     * If an input record value is {@code null} the record will not be 
included in the join operation and thus no
      * output record will be added to the resulting {@code KStream}.
      * <p>
      * Example (assuming all input records belong to the correct windows):
@@ -1918,7 +1918,7 @@ public interface KStream<K, V> {
      * The key of the result record is the same as for both joining input 
records.
      * Furthermore, for each input record of both {@code KStream}s that does 
not satisfy the join predicate the provided
      * {@link ValueJoinerWithKey} will be called with a {@code null} value for 
the this/other stream, respectively.
-     * If an input record key or value is {@code null} the record will not be 
included in the join operation and thus no
+     * If an input record value is {@code null} the record will not be 
included in the join operation and thus no
      * output record will be added to the resulting {@code KStream}.
      * <p>
      * Example (assuming all input records belong to the correct windows):
@@ -2086,7 +2086,7 @@ public interface KStream<K, V> {
      * The key of the result record is the same as for both joining input 
records.
      * Furthermore, for each input record of both {@code KStream}s that does 
not satisfy the join predicate the provided
      * {@link ValueJoinerWithKey} will be called with a {@code null} value for 
this/other stream, respectively.
-     * If an input record key or value is {@code null} the record will not be 
included in the join operation and thus no
+     * If an input record value is {@code null} the record will not be 
included in the join operation and thus no
      * output record will be added to the resulting {@code KStream}.
      * <p>
      * Example (assuming all input records belong to the correct windows):
@@ -2484,7 +2484,7 @@ public interface KStream<K, V> {
      * {@link ValueJoiner} will be called to compute a value (with arbitrary 
type) for the result record.
      * If no {@link KTable} record was found during lookup, a {@code null} 
value will be provided to {@link ValueJoiner}.
      * The key of the result record is the same as for both joining input 
records.
-     * If an {@code KStream} input record key or value is {@code null} the 
record will not be included in the join
+     * If an {@code KStream} input record value is {@code null} the record 
will not be included in the join
      * operation and thus no output record will be added to the resulting 
{@code KStream}.
      * <p>
      * Example:
@@ -2564,7 +2564,7 @@ public interface KStream<K, V> {
      * If no {@link KTable} record was found during lookup, a {@code null} 
value will be provided to {@link ValueJoinerWithKey}.
      * The key of the result record is the same as for both joining input 
records.
      * Note that the key is read-only and should not be modified, as this can 
lead to undefined behaviour.
-     * If an {@code KStream} input record key or value is {@code null} the 
record will not be included in the join
+     * If an {@code KStream} input record value is {@code null} the record 
will not be included in the join
      * operation and thus no output record will be added to the resulting 
{@code KStream}.
      * <p>
      * Example:
@@ -2643,7 +2643,7 @@ public interface KStream<K, V> {
      * {@link ValueJoiner} will be called to compute a value (with arbitrary 
type) for the result record.
      * If no {@link KTable} record was found during lookup, a {@code null} 
value will be provided to {@link ValueJoiner}.
      * The key of the result record is the same as for both joining input 
records.
-     * If an {@code KStream} input record key or value is {@code null} the 
record will not be included in the join
+     * If an {@code KStream} input record value is {@code null} the record 
will not be included in the join
      * operation and thus no output record will be added to the resulting 
{@code KStream}.
      * <p>
      * Example:
@@ -2726,7 +2726,7 @@ public interface KStream<K, V> {
      * If no {@link KTable} record was found during lookup, a {@code null} 
value will be provided to {@link ValueJoinerWithKey}.
      * The key of the result record is the same as for both joining input 
records.
      * Note that the key is read-only and should not be modified, as this can 
lead to undefined behaviour.
-     * If an {@code KStream} input record key or value is {@code null} the 
record will not be included in the join
+     * If an {@code KStream} input record value is {@code null} the record 
will not be included in the join
      * operation and thus no output record will be added to the resulting 
{@code KStream}.
      * <p>
      * Example:
@@ -2805,8 +2805,8 @@ public interface KStream<K, V> {
      * For each {@code KStream} record that finds a corresponding record in 
{@link GlobalKTable} the provided
      * {@link ValueJoiner} will be called to compute a value (with arbitrary 
type) for the result record.
      * The key of the result record is the same as the key of this {@code 
KStream}.
-     * If a {@code KStream} input value is {@code null} or if {@code 
keyValueMapper} returns {@code null} the record
-     * will not be included in the join operation and thus no output record 
will be added to the resulting {@code KStream}.
+     * If a {@code KStream} input value is {@code null} the record will not be 
included in the join operation
+     * and thus no output record will be added to the resulting {@code 
KStream}.
      *
      * @param globalTable    the {@link GlobalKTable} to be joined with this 
stream
      * @param keySelector    instance of {@link KeyValueMapper} used to map 
from the (key, value) of this stream
@@ -2837,8 +2837,8 @@ public interface KStream<K, V> {
      * {@link ValueJoinerWithKey} will be called to compute a value (with 
arbitrary type) for the result record.
      * The key of the result record is the same as the key of this {@code 
KStream}.
      * Note that the key is read-only and should not be modified, as this can 
lead to undefined behaviour.
-     * If a {@code KStream} input value is {@code null} or if {@code 
keyValueMapper} returns {@code null} the record
-     * will not be included in the join operation and thus no output record 
will be added to the resulting {@code KStream}.
+     * If a {@code KStream} input value is {@code null} the record will not be 
included in the join operation
+     * and thus no output record will be added to the resulting {@code 
KStream}.
      *
      * @param globalTable    the {@link GlobalKTable} to be joined with this 
stream
      * @param keySelector    instance of {@link KeyValueMapper} used to map 
from the (key, value) of this stream
@@ -2868,8 +2868,8 @@ public interface KStream<K, V> {
      * For each {@code KStream} record that finds a corresponding record in 
{@link GlobalKTable} the provided
      * {@link ValueJoiner} will be called to compute a value (with arbitrary 
type) for the result record.
      * The key of the result record is the same as the key of this {@code 
KStream}.
-     * If a {@code KStream} input value is {@code null} or if {@code 
keyValueMapper} returns {@code null} the record
-     * will not be included in the join operation and thus no output record 
will be added to the resulting {@code KStream}.
+     * If a {@code KStream} input value is {@code null} the record will not be 
included in the join operation
+     * and thus no output record will be added to the resulting {@code 
KStream}.
      *
      * @param globalTable    the {@link GlobalKTable} to be joined with this 
stream
      * @param keySelector    instance of {@link KeyValueMapper} used to map 
from the (key, value) of this stream
@@ -2902,8 +2902,8 @@ public interface KStream<K, V> {
      * {@link ValueJoinerWithKey} will be called to compute a value (with 
arbitrary type) for the result record.
      * The key of the result record is the same as the key of this {@code 
KStream}.
      * Note that the key is read-only and should not be modified, as this can 
lead to undefined behaviour.
-     * If a {@code KStream} input value is {@code null} or if {@code 
keyValueMapper} returns {@code null} the record
-     * will not be included in the join operation and thus no output record 
will be added to the resulting {@code KStream}.
+     * If a {@code KStream} input value is {@code null} the record will not be 
included in the join operation
+     * and thus no output record will be added to the resulting {@code 
KStream}.
      *
      * @param globalTable    the {@link GlobalKTable} to be joined with this 
stream
      * @param keySelector    instance of {@link KeyValueMapper} used to map 
from the (key, value) of this stream
@@ -2937,8 +2937,8 @@ public interface KStream<K, V> {
      * For each {@code KStream} record whether or not it finds a corresponding 
record in {@link GlobalKTable} the
      * provided {@link ValueJoiner} will be called to compute a value (with 
arbitrary type) for the result record.
      * The key of the result record is the same as this {@code KStream}.
-     * If a {@code KStream} input value is {@code null} or if {@code 
keyValueMapper} returns {@code null} the record
-     * will not be included in the join operation and thus no output record 
will be added to the resulting {@code KStream}.
+     * If a {@code KStream} input value is {@code null} the record will not be 
included in the join operation
+     * and thus no output record will be added to the resulting {@code 
KStream}.
      * If no {@link GlobalKTable} record was found during lookup, a {@code 
null} value will be provided to
      * {@link ValueJoiner}.
      *
@@ -2973,8 +2973,8 @@ public interface KStream<K, V> {
      * provided {@link ValueJoinerWithKey} will be called to compute a value 
(with arbitrary type) for the result record.
      * The key of the result record is the same as this {@code KStream}.
      * Note that the key is read-only and should not be modified, as this can 
lead to undefined behaviour.
-     * If a {@code KStream} input value is {@code null} or if {@code 
keyValueMapper} returns {@code null} the record
-     * will not be included in the join operation and thus no output record 
will be added to the resulting {@code KStream}.
+     * If a {@code KStream} input value is {@code null} the record will not be 
included in the join operation
+     * and thus no output record will be added to the resulting {@code 
KStream}.
      * If no {@link GlobalKTable} record was found during lookup, a {@code 
null} value will be provided to
      * {@link ValueJoiner}.
      *
@@ -3008,8 +3008,8 @@ public interface KStream<K, V> {
      * For each {@code KStream} record whether or not it finds a corresponding 
record in {@link GlobalKTable} the
      * provided {@link ValueJoiner} will be called to compute a value (with 
arbitrary type) for the result record.
      * The key of the result record is the same as this {@code KStream}.
-     * If a {@code KStream} input value is {@code null} or if {@code 
keyValueMapper} returns {@code null} the record
-     * will not be included in the join operation and thus no output record 
will be added to the resulting {@code KStream}.
+     * If a {@code KStream} input value is {@code null} the record will not be 
included in the join operation
+     * and thus no output record will be added to the resulting {@code 
KStream}.
      * If no {@link GlobalKTable} record was found during lookup, a {@code 
null} value will be provided to
      * {@link ValueJoiner}.
      *
@@ -3045,8 +3045,8 @@ public interface KStream<K, V> {
      * For each {@code KStream} record whether or not it finds a corresponding 
record in {@link GlobalKTable} the
      * provided {@link ValueJoinerWithKey} will be called to compute a value 
(with arbitrary type) for the result record.
      * The key of the result record is the same as this {@code KStream}.
-     * If a {@code KStream} input value is {@code null} or if {@code 
keyValueMapper} returns {@code null} the record
-     * will not be included in the join operation and thus no output record 
will be added to the resulting {@code KStream}.
+     * If a {@code KStream} input value is {@code null} the record will not be 
included in the join operation
+     * and thus no output record will be added to the resulting {@code 
KStream}.
      * If no {@link GlobalKTable} record was found during lookup, a {@code 
null} value will be provided to
      * {@link ValueJoinerWithKey}.
      *
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index d928f5b92d3..4959cd50241 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -28,7 +28,9 @@ import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.internals.graph.BaseRepartitionNode;
 import org.apache.kafka.streams.kstream.internals.graph.GlobalStoreNode;
+import 
org.apache.kafka.streams.kstream.internals.graph.NodesWithRelaxedNullKeyJoinDownstream;
 import 
org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
 import org.apache.kafka.streams.kstream.internals.graph.StateStoreNode;
@@ -351,8 +353,23 @@ public class InternalStreamsBuilder implements 
InternalNameProvider {
             LOG.debug("Optimizing the Kafka Streams graph for self-joins");
             rewriteSingleStoreSelfJoin(root, new IdentityHashMap<>());
         }
+        LOG.debug("Optimizing the Kafka Streams graph for null-key records");
+        rewriteRepartitionNodes();
     }
 
+    private void rewriteRepartitionNodes() {
+        final Set<BaseRepartitionNode<?, ?>> nodes = new 
NodesWithRelaxedNullKeyJoinDownstream(root).find();
+        for (final BaseRepartitionNode<?, ?> partitionNode : nodes) {
+            if (partitionNode.getProcessorParameters() != null) {
+                partitionNode.setProcessorParameters(new ProcessorParameters<>(
+                    new KStreamFilter<>((k, v) -> k != null, false),
+                    partitionNode.getProcessorParameters().processorName()
+                ));
+            }
+        }
+    }
+
+
     private void mergeDuplicateSourceNodes() {
         final Map<String, StreamSourceNode<?, ?>> topicsToSourceNodes = new 
HashMap<>();
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index ceec1ebd7de..f656702bea4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -1041,9 +1041,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K, 
V> implements KStream<K
             nullKeyFilterProcessorName = repartitionTopicName + "-filter";
         }
 
-        final Predicate<K1, V1> notNullKeyPredicate = (k, v) -> k != null;
         final ProcessorParameters<K1, V1, ?, ?> processorParameters = new 
ProcessorParameters<>(
-            new KStreamFilter<>(notNullKeyPredicate, false),
+            new KStreamFilter<>((k, v) -> true, false),
             nullKeyFilterProcessorName
         );
 
@@ -1231,6 +1230,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K, 
V> implements KStream<K
         final StreamTableJoinNode<K, V> streamTableJoinNode =
             new StreamTableJoinNode<>(name, processorParameters, new String[] 
{}, null, null, Optional.empty());
 
+        if (leftJoin) {
+            
streamTableJoinNode.labels().add(GraphNode.Label.NULL_KEY_RELAXED_JOIN);
+        }
         builder.addGraphNode(graphNode, streamTableJoinNode);
 
         // do not have serde for joined result
@@ -1287,6 +1289,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K, 
V> implements KStream<K
         );
 
         builder.addGraphNode(graphNode, streamTableJoinNode);
+        if (leftJoin) {
+            
streamTableJoinNode.labels().add(GraphNode.Label.NULL_KEY_RELAXED_JOIN);
+        }
 
         // do not have serde for joined result
         return new KStreamImpl<>(
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
index 8e0fcfece0e..850677c7cb0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
@@ -230,6 +230,9 @@ class KStreamImplJoin {
 
         final GraphNode joinGraphNode = joinBuilder.build();
 
+        if (leftOuter || rightOuter) {
+            joinGraphNode.addLabel(GraphNode.Label.NULL_KEY_RELAXED_JOIN);
+        }
         builder.addGraphNode(Arrays.asList(thisGraphNode, otherGraphNode), 
joinGraphNode);
 
         final Set<String> allSourceNodes = new HashSet<>(((KStreamImpl<K, V1>) 
lhs).subTopologySourceNodes);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
index 317943a3c34..13cfa0db29d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
@@ -51,8 +51,8 @@ class KStreamJoinWindow<K, V> implements ProcessorSupplier<K, 
V, K, V> {
         public void process(final Record<K, V> record) {
             // if the key is null, we do not need to put the record into 
window store
             // since it will never be considered for join operations
+            context().forward(record);
             if (record.key() != null) {
-                context().forward(record);
                 // Every record basically starts a new window. We're using a 
window store mostly for the retention.
                 window.put(record.key(), record.value(), record.timestamp());
             }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index 1fd74e74647..603e1e82550 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
 import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
 import 
org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier;
@@ -32,9 +33,8 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
-import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
 import org.apache.kafka.streams.state.internals.LeftOrRightValue;
-import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -124,17 +124,19 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements 
ProcessorSupplier<K, V1, K,
         @SuppressWarnings("unchecked")
         @Override
         public void process(final Record<K, V1> record) {
-            if (StreamStreamJoinUtil.skipRecord(record, LOG, 
droppedRecordsSensor, context())) {
-                return;
-            }
-            boolean needOuterJoin = outer;
-
             final long inputRecordTimestamp = record.timestamp();
             final long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + 
joinAfterMs);
-
             sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
 
+            if (outer && record.key() == null && record.value() != null) {
+                context().forward(record.withValue(joiner.apply(record.key(), 
record.value(), null)));
+                return;
+            } else if (StreamStreamJoinUtil.skipRecord(record, LOG, 
droppedRecordsSensor, context())) {
+                return;
+            }
+
+            boolean needOuterJoin = outer;
             // Emit all non-joined records which window has closed
             if (inputRecordTimestamp == sharedTimeTracker.streamTime) {
                 outerJoinStore.ifPresent(store -> 
emitNonJoinedOuterRecords(store, record));
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
index af312665c7b..ddb171310a0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
@@ -125,15 +125,20 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut> 
extends ContextualProcess
     @SuppressWarnings("unchecked")
     private void doJoin(final Record<K1, V1> record) {
         final K2 mappedKey = keyMapper.apply(record.key(), record.value());
-        final ValueAndTimestamp<V2> valueAndTimestamp2 = 
valueGetter.isVersioned()
-            ? valueGetter.get(mappedKey, record.timestamp())
-            : valueGetter.get(mappedKey);
-        final V2 value2 = getValueOrNull(valueAndTimestamp2);
+        final V2 value2 = getValue2(record, mappedKey);
         if (leftJoin || value2 != null) {
             
internalProcessorContext.forward(record.withValue(joiner.apply(record.key(), 
record.value(), value2)));
         }
     }
 
+    private V2 getValue2(final Record<K1, V1> record, final K2 mappedKey) {
+        if (mappedKey == null) return null;
+        final ValueAndTimestamp<V2> valueAndTimestamp = 
valueGetter.isVersioned()
+            ? valueGetter.get(mappedKey, record.timestamp())
+            : valueGetter.get(mappedKey);
+        return getValueOrNull(valueAndTimestamp);
+    }
+
     private boolean maybeDropRecord(final Record<K1, V1> record) {
         // we do join iff the join keys are equal, thus, if {@code keyMapper} 
returns {@code null} we
         // cannot join and just ignore the record. Note for KTables, this is 
the same as having a null key
@@ -144,6 +149,9 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut> 
extends ContextualProcess
         // furthermore, on left/outer joins 'null' in ValueJoiner#apply() 
indicates a missing record --
         // thus, to be consistent and to avoid ambiguous null semantics, null 
values are ignored
         final K2 mappedKey = keyMapper.apply(record.key(), record.value());
+        if (leftJoin && record.key() == null && record.value() != null) {
+            return false;
+        }
         if (mappedKey == null || record.value() == null) {
             if (context().recordMetadata().isPresent()) {
                 final RecordMetadata recordMetadata = 
context().recordMetadata().get();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java
index 533d8200a1b..dc6f289b136 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java
@@ -30,9 +30,9 @@ public abstract class BaseRepartitionNode<K, V> extends 
GraphNode {
     protected final String sinkName;
     protected final String sourceName;
     protected final String repartitionTopic;
-    protected final ProcessorParameters<K, V, ?, ?> processorParameters;
     protected final StreamPartitioner<K, V> partitioner;
     protected final InternalTopicProperties internalTopicProperties;
+    protected ProcessorParameters<K, V, ?, ?> processorParameters;
 
     BaseRepartitionNode(final String nodeName,
                         final String sourceName,
@@ -72,6 +72,14 @@ public abstract class BaseRepartitionNode<K, V> extends 
GraphNode {
         return keySerde != null ? keySerde.deserializer() : null;
     }
 
+    public void setProcessorParameters(final ProcessorParameters<K, V, ?, ?> 
processorParameters) {
+        this.processorParameters = processorParameters;
+    }
+
+    public ProcessorParameters<K, V, ?, ?> getProcessorParameters() {
+        return processorParameters;
+    }
+
     @Override
     public String toString() {
         return "BaseRepartitionNode{" +
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java
index 98fb98e2536..108fa98adaa 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.kstream.internals.graph;
 
 
+import java.util.LinkedList;
 import java.util.Optional;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 
@@ -29,6 +30,8 @@ public abstract class GraphNode {
 
     private final Collection<GraphNode> childNodes = new LinkedHashSet<>();
     private final Collection<GraphNode> parentNodes = new LinkedHashSet<>();
+
+    private final Collection<Label> labels = new LinkedList<>();
     private final String nodeName;
     private boolean keyChangingOperation;
     private boolean valueChangingOperation;
@@ -152,4 +155,16 @@ public abstract class GraphNode {
                ", mergeNode=" + mergeNode +
                ", parentNodes=" + Arrays.toString(parentNames) + '}';
     }
+
+    public void addLabel(final Label label) {
+        labels.add(label);
+    }
+
+    public Collection<Label> labels() {
+        return labels;
+    }
+
+    public enum Label {
+        NULL_KEY_RELAXED_JOIN
+    }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/NodesWithRelaxedNullKeyJoinDownstream.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/NodesWithRelaxedNullKeyJoinDownstream.java
new file mode 100644
index 00000000000..0cbed194450
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/NodesWithRelaxedNullKeyJoinDownstream.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.graph;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class NodesWithRelaxedNullKeyJoinDownstream {
+
+    private final HashSet<GraphNode> visited;
+    private final HashSet<GraphNode> nonOptimizable;
+    private final GraphNode start;
+
+    public NodesWithRelaxedNullKeyJoinDownstream(final GraphNode root) {
+        this.start = root;
+        this.visited = new HashSet<>();
+        this.nonOptimizable = new HashSet<>();
+    }
+
+    public Set<BaseRepartitionNode<?, ?>> find() {
+        traverseGraph(this.start);
+        return visited.stream()
+            .filter(node -> node instanceof BaseRepartitionNode && 
!nonOptimizable.contains(node))
+            .map(node -> (BaseRepartitionNode<?, ?>) node)
+            .collect(Collectors.toSet());
+    }
+
+    private void traverseGraph(final GraphNode node) {
+        if (!visited.contains(node)) {
+            for (final GraphNode child : node.children()) {
+                traverseGraph(child);
+                if 
(child.labels().contains(GraphNode.Label.NULL_KEY_RELAXED_JOIN) || 
nonOptimizable.contains(child)) {
+                    nonOptimizable.add(node);
+                }
+            }
+            visited.add(node);
+        }
+    }
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
index ff4d2e69e1a..8d6e2d61c56 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
@@ -133,7 +133,9 @@ public abstract class AbstractJoinIntegrationTest {
         new Input<>(INPUT_TOPIC_LEFT, null, 12),
         new Input<>(INPUT_TOPIC_RIGHT, null, 13),
         new Input<>(INPUT_TOPIC_RIGHT, "d", 14),
-        new Input<>(INPUT_TOPIC_LEFT, "D", 15)
+        new Input<>(INPUT_TOPIC_LEFT, "D", 15),
+        new Input<>(INPUT_TOPIC_LEFT, null, "E", 16),
+        new Input<>(INPUT_TOPIC_RIGHT, null, "e", 17)
     );
 
     // used for stream-stream self joins where only one input topic is needed
@@ -299,5 +301,11 @@ public abstract class AbstractJoinIntegrationTest {
             record = KeyValue.pair(ANY_UNIQUE_KEY, value);
             this.timestamp = timestamp;
         }
+
+        Input(final String topic, final Long key, final V value, final long 
timestamp) {
+            this.topic = topic;
+            record = KeyValue.pair(key, value);
+            this.timestamp = timestamp;
+        }
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java
index cc55e5711ed..c6cb077e6c0 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java
@@ -117,6 +117,7 @@ public class KStreamKStreamIntegrationTest {
         expected.add(new KeyValue<>("Key-2", "value1=left-2a,value2=null"));
         expected.add(new KeyValue<>("Key-3", "value1=left-3a,value2=null"));
         expected.add(new KeyValue<>("Key-4", "value1=left-4a,value2=null"));
+        expected.add(new KeyValue<>(null, "value1=left-5a,value2=null"));
 
         verifyKStreamKStreamOuterJoin(expected);
     }
@@ -135,7 +136,8 @@ public class KStreamKStreamIntegrationTest {
                 new KeyValue<>("Key-1", "left-1a"),
                 new KeyValue<>("Key-2", "left-2a"),
                 new KeyValue<>("Key-3", "left-3a"),
-                new KeyValue<>("Key-4", "left-4a")
+                new KeyValue<>("Key-4", "left-4a"),
+                new KeyValue<>(null, "left-5a")
         );
 
         final List<KeyValue<String, String>> left2 = asList(
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
index f086b720e93..61f9cde5974 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
@@ -265,8 +265,11 @@ public class KStreamRepartitionIntegrationTest {
             new KeyValue<>(2, "B")
         );
 
-        sendEvents(timestamp, expectedRecords);
-        sendEvents(topicB, timestamp, expectedRecords);
+        final List<KeyValue<Integer, String>> recordsToSend = new 
ArrayList<>(expectedRecords);
+        recordsToSend.add(new KeyValue<>(null, "C"));
+
+        sendEvents(timestamp, recordsToSend);
+        sendEvents(topicB, timestamp, recordsToSend);
 
         final StreamsBuilder builder = new StreamsBuilder();
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java
new file mode 100644
index 00000000000..54da72208a4
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.GlobalKTable;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+
+import static 
org.apache.kafka.streams.kstream.JoinWindows.ofTimeDifferenceAndGrace;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class RelaxedNullKeyRequirementJoinTest {
+
+    private static final JoinWindows WINDOW = 
ofTimeDifferenceAndGrace(Duration.ofSeconds(60), Duration.ofSeconds(10));
+    private static final ValueJoiner<String, String, String> JOINER = (lv, rv) 
-> lv + "|" + rv;
+    private static final String LEFT = "left";
+    private static final String RIGHT = "right";
+    private static final String OUT = "out";
+    private TopologyTestDriver testDriver;
+    private StreamsBuilder builder;
+    private KStream<String, String> leftStream;
+    private KStream<String, String> rightStream;
+    private TestInputTopic<String, String> left;
+    private TestInputTopic<String, String> right;
+    private TestOutputTopic<String, String> out;
+
+    @BeforeEach
+    void beforeEach() {
+        builder = new StreamsBuilder();
+        leftStream = builder.<String, String>stream(LEFT).repartition();
+        rightStream = builder.<String, String>stream(RIGHT).repartition();
+    }
+
+    @AfterEach
+    void afterEach() {
+        testDriver.close();
+    }
+
+    @Test
+    void testRelaxedLeftStreamStreamJoin() {
+        leftStream
+            .leftJoin(rightStream, JOINER, WINDOW)
+            .to(OUT);
+        initTopology();
+        left.pipeInput(null, "leftValue", 1);
+        assertEquals(Collections.singletonList(new KeyValue<>(null, 
"leftValue|null")), out.readKeyValuesToList());
+    }
+
+    @Test
+    void testRelaxedLeftStreamTableJoin() {
+        leftStream
+            .leftJoin(rightStream.toTable(), JOINER)
+            .to(OUT);
+        initTopology();
+        left.pipeInput(null, "leftValue", 1);
+        assertEquals(Collections.singletonList(new KeyValue<>(null, 
"leftValue|null")), out.readKeyValuesToList());
+    }
+
+    @Test
+    void testRelaxedOuterStreamStreamJoin() {
+        leftStream
+            .outerJoin(rightStream, JOINER, WINDOW)
+            .to(OUT);
+        initTopology();
+        right.pipeInput(null, "rightValue", 1);
+        left.pipeInput(null, "leftValue");
+        assertEquals(
+            Arrays.asList(new KeyValue<>(null, "null|rightValue"), new 
KeyValue<>(null, "leftValue|null")),
+            out.readKeyValuesToList()
+        );
+    }
+
+    @Test
+    void testRelaxedLeftStreamGlobalTableJoin() {
+        final GlobalKTable<String, String> global = 
builder.globalTable("global");
+        leftStream
+            .leftJoin(global, (key, value) -> null, JOINER)
+            .to(OUT);
+        initTopology();
+        left.pipeInput(null, "leftValue", 1);
+        assertEquals(Collections.singletonList(new KeyValue<>(null, 
"leftValue|null")), out.readKeyValuesToList());
+    }
+
+    @Test
+    void 
testDropNullKeyRecordsForRepartitionNodesWithNoRelaxedJoinDownstream() {
+        leftStream
+            .repartition()
+            .to(OUT);
+        initTopology();
+        left.pipeInput(null, "leftValue", 1);
+        assertEquals(Collections.<KeyValue<String, String>>emptyList(), 
out.readKeyValuesToList());
+    }
+
+    private void initTopology() {
+        testDriver = new TopologyTestDriver(builder.build(), props());
+        left = testDriver.createInputTopic(
+            LEFT,
+            new StringSerializer(),
+            new StringSerializer()
+        );
+        right = testDriver.createInputTopic(
+            RIGHT,
+            new StringSerializer(),
+            new StringSerializer()
+        );
+        out = testDriver.createOutputTopic(
+            OUT,
+            new StringDeserializer(),
+            new StringDeserializer()
+        );
+    }
+
+    private static Properties props() {
+        final Properties props = new Properties();
+        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass().getName());
+        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass().getName());
+        return props;
+    }
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
index 554c5692a96..b1d2390dc83 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
@@ -137,7 +137,9 @@ public class StreamStreamJoinIntegrationTest extends 
AbstractJoinIntegrationTest
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L))
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
+            null,
+            null
         );
 
         leftStream.join(
@@ -182,7 +184,9 @@ public class StreamStreamJoinIntegrationTest extends 
AbstractJoinIntegrationTest
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L))
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
+            null,
+            null
         );
 
         leftStream.map(MockMapper.noOpKeyValueMapper())
@@ -229,7 +233,10 @@ public class StreamStreamJoinIntegrationTest extends 
AbstractJoinIntegrationTest
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L))
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
+            Arrays.asList(
+                new TestRecord<>(null, "E-null", null, 16L)),
+            null
         );
 
         leftStream.leftJoin(
@@ -274,7 +281,10 @@ public class StreamStreamJoinIntegrationTest extends 
AbstractJoinIntegrationTest
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L))
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
+            Arrays.asList(
+                new TestRecord<>(null, "E-null", null, 16L)),
+            null
         );
 
         leftStream.map(MockMapper.noOpKeyValueMapper())
@@ -321,7 +331,11 @@ public class StreamStreamJoinIntegrationTest extends 
AbstractJoinIntegrationTest
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L))
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
+            Arrays.asList(
+                new TestRecord<>(null, "E-null", null, 16L)),
+            Arrays.asList(
+                new TestRecord<>(null, "null-e", null, 17L))
         );
 
         leftStream.outerJoin(
@@ -366,7 +380,11 @@ public class StreamStreamJoinIntegrationTest extends 
AbstractJoinIntegrationTest
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L))
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
+            Arrays.asList(
+                new TestRecord<>(null, "E-null", null, 16L)),
+            Arrays.asList(
+                new TestRecord<>(null, "null-e", null, 17L))
         );
 
         leftStream.map(MockMapper.noOpKeyValueMapper())
@@ -461,7 +479,9 @@ public class StreamStreamJoinIntegrationTest extends 
AbstractJoinIntegrationTest
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-d-a", null, 15L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-d-b", null, 15L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-d-c", null, 15L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L))
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)),
+            null,
+            null
         );
 
         leftStream.join(
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
index 1d17cbb53a2..24f4dc1face 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
@@ -620,7 +620,9 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
                 null,
                 null,
                 null,
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, 
"D-d-d", null, 15L))
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, 
"D-d-d", null, 15L)),
+                null,
+                null
             );
 
             runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, 
storeName);
@@ -662,7 +664,9 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
                 null,
                 null,
                 null,
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, 
"D-d-d", null, 15L))
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, 
"D-d-d", null, 15L)),
+                null,
+                null
             );
 
             runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, 
storeName);
@@ -707,7 +711,9 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
                 null,
                 // incorrect result `null-d` is caused by self-join of 
`rightTable`
                 Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, 
"null-d", null, 14L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, 
"D-d-d", null, 15L))
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, 
"D-d-d", null, 15L)),
+                null,
+                null
             );
 
             runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, 
storeName);
@@ -749,7 +755,9 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
                 null,
                 null,
                 null,
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, 
"D-d-d", null, 15L))
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, 
"D-d-d", null, 15L)),
+                null,
+                null
             );
 
             runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, 
storeName);
@@ -793,7 +801,9 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
                 Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, 
null, null, 12L)),
                 null,
                 null,
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, 
"D-d-d", null, 15L))
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, 
"D-d-d", null, 15L)),
+                null,
+                null
             );
 
             runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, 
storeName);
@@ -837,7 +847,9 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
                 Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, 
null, null, 12L)),
                 null,
                 Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, 
"null-d", null, 14L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, 
"D-d-d", null, 15L))
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, 
"D-d-d", null, 15L)),
+                null,
+                null
             );
 
             runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, 
storeName);
@@ -881,7 +893,9 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
                 Arrays.asList(
                     new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L),
                     new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, 
"D-d-d", null, 15L))
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, 
"D-d-d", null, 15L)),
+                null,
+                null
             );
 
             runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, 
storeName);
@@ -927,7 +941,9 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
                 Arrays.asList(
                     new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L),
                     new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, 
"D-d-d", null, 15L))
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, 
"D-d-d", null, 15L)),
+                null,
+                null
             );
 
             runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, 
storeName);
@@ -975,7 +991,9 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
                 Arrays.asList(
                     new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L),
                     new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, 
"D-d-d", null, 15L))
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, 
"D-d-d", null, 15L)),
+                null,
+                null
             );
             runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, 
storeName);
         }

Reply via email to