This is an automated email from the ASF dual-hosted git repository.

wcarlson pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4a958c6cb12 KAFKA-14748: Relax non-null FK left-join requirement 
(#14107)
4a958c6cb12 is described below

commit 4a958c6cb12392746b100431f2ff8ab3c5f53b1a
Author: Florin Akermann <[email protected]>
AuthorDate: Wed Dec 6 01:04:32 2023 +0100

    KAFKA-14748: Relax non-null FK left-join requirement (#14107)
    
    Relax the non-null foreign-key requirement for left joins: a record whose
    foreignKeyExtractor returns null is no longer dropped; instead it now
    produces a join result with a null right-hand side.
    
    Testing strategy: inject a foreign-key extractor that returns null for the
    first or second element.
    
    Reviewers: Walker Carlson <[email protected]>
---
 docs/streams/developer-guide/dsl-api.html          |   4 -
 docs/streams/upgrade-guide.html                    |  31 +++---
 .../org/apache/kafka/streams/kstream/KTable.java   |  21 ++--
 .../internals/foreignkeyjoin/CombinedKey.java      |   1 -
 .../SubscriptionJoinProcessorSupplier.java         |   5 +-
 .../SubscriptionReceiveProcessorSupplier.java      |  56 ++++++----
 .../SubscriptionSendProcessorSupplier.java         | 122 ++++++++++-----------
 .../KTableKTableForeignKeyJoinIntegrationTest.java | 103 ++++++++++++++++-
 8 files changed, 224 insertions(+), 119 deletions(-)
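Before diving into the diff: the behavioral change can be illustrated with a small stand-alone sketch. This is a simulation only, not the Kafka Streams API; the table is reduced to a `Map` and the join to a pure function, but the output format matches the expectations in the integration test below (`"(lhsValue1|rhs1,null)"`).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Illustration only: simulates the relaxed left-join semantics. A record whose
// foreign-key extractor returns null is no longer dropped; it joins against
// nothing and the right-hand side of the result is null.
public class NullFkLeftJoinSemantics {

    static String leftJoin(String leftValue,
                           Function<String, String> fkExtractor,
                           Map<String, String> rightTable) {
        final String fk = fkExtractor.apply(leftValue);
        // null FK: skip the lookup, emit a result with a null RHS
        final String rhs = fk == null ? null : rightTable.get(fk);
        return "(" + leftValue + "," + rhs + ")";
    }

    public static void main(String[] args) {
        final Map<String, String> right = new HashMap<>();
        right.put("rhs1", "rhsValue1");
        // FK resolves: a normal join result.
        System.out.println(leftJoin("lhsValue1|rhs1", v -> v.split("\\|")[1], right));
        // FK extractor returns null: previously dropped, now emitted with null RHS.
        System.out.println(leftJoin("lhsValue1|rhs1", v -> null, right));
    }
}
```

Inner joins are unaffected: as the unchanged code paths below show, a null extracted FK there still logs a skip and drops the update.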

diff --git a/docs/streams/developer-guide/dsl-api.html 
b/docs/streams/developer-guide/dsl-api.html
index 88f7d6f4ae2..cc729ee25c2 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -2542,10 +2542,6 @@ Function&lt;Long, Long&gt; foreignKeyExtractor = (x) 
-&gt; x;
                                   <blockquote>
                                     <div>
                                       <ul class="simple">
-                                      <li>
-                                            Records for which the <code 
class="docutils literal"><span class="pre">foreignKeyExtractor</span></code> 
produces <code class="docutils literal"><span class="pre">null</span></code> 
are ignored and do not trigger a join.
-                                            If you want to join with <code 
class="docutils literal"><span class="pre">null</span></code> foreign keys, use 
a suitable sentinel value to do so (i.e. <code class="docutils literal"><span 
class="pre">"NULL"</span></code> for a String field, or <code class="docutils 
literal"><span class="pre">-1</span></code> for an auto-incrementing integer 
field).
-                                        </li>
                                         <li>Input records with a <code 
class="docutils
                                             literal"><span 
class="pre">null</span></code>
                                           value are interpreted as 
<em>tombstones</em>
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 1f122b6e366..4f905a98d1d 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -136,21 +136,7 @@
     <h3><a id="streams_api_changes_370" 
href="#streams_api_changes_370">Streams API changes in 3.7.0</a></h3>
     <p>
         IQv2 supports <code>RangeQuery</code> that allows to specify 
unbounded, bounded, or half-open key-ranges, which return data in ascending 
(byte[]-lexicographical) order (per partition).
-        <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-985%3A+Add+reverseRange+and+reverseAll+query+over+kv-store+in+IQv2">KIP-985</a>
 extends this functionality by adding <code>.withDescendingKeys()<code> to 
allow user to receive data in descending order.
-    </p>
-
-    <h3><a id="streams_api_changes_360" 
href="#streams_api_changes_360">Streams API changes in 3.6.0</a></h3>
-    <p>
-      Rack aware task assignment was introduced in <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams">KIP-925</a>.
-      Rack aware task assignment can be enabled for 
<code>StickyTaskAssignor</code> or <code>HighAvailabilityTaskAssignor</code> to 
compute task assignments which can minimize cross rack traffic under certain 
conditions.
-      For more information, including how it can be enabled and further 
configured, see the <a 
href="/{{version}}/documentation/streams/developer-guide/config-streams.html#rack-aware-assignment-strategy"><b>Kafka
 Streams Developer Guide</b></a>.
-    </p>
-
-    <p>
-      IQv2 supports a <code>RangeQuery</code> that allows to specify 
unbounded, bounded, or half-open key-ranges. Users have to use 
<code>withUpperBound(K)</code>, <code>withLowerBound(K)</code>,
-      or <code>withNoBounds()</code> to specify half-open or unbounded ranges, 
but cannot use <code>withRange(K lower, K upper)</code> for the same.
-      <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-941%3A+Range+queries+to+accept+null+lower+and+upper+bounds">KIP-941</a>
 closes this gap by allowing to pass in <code>null</code>
-      as upper and lower bound (with semantics "no bound") to simplify the 
usage of the <code>RangeQuery</code> class.
+        <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-985%3A+Add+reverseRange+and+reverseAll+query+over+kv-store+in+IQv2">KIP-985</a>
 extends this functionality by adding <code>.withDescendingKeys()</code> to allow users to receive data in descending order.
     </p>
 
     <p>
@@ -198,6 +184,21 @@
     </code>
     </pre>
     </p>
+
+    <h3><a id="streams_api_changes_360" 
href="#streams_api_changes_360">Streams API changes in 3.6.0</a></h3>
+    <p>
+      Rack aware task assignment was introduced in <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams">KIP-925</a>.
+      Rack aware task assignment can be enabled for 
<code>StickyTaskAssignor</code> or <code>HighAvailabilityTaskAssignor</code> to 
compute task assignments which can minimize cross rack traffic under certain 
conditions.
+      For more information, including how it can be enabled and further 
configured, see the <a 
href="/{{version}}/documentation/streams/developer-guide/config-streams.html#rack-aware-assignment-strategy"><b>Kafka
 Streams Developer Guide</b></a>.
+    </p>
+
+    <p>
+      IQv2 supports a <code>RangeQuery</code> that allows to specify 
unbounded, bounded, or half-open key-ranges. Users have to use 
<code>withUpperBound(K)</code>, <code>withLowerBound(K)</code>,
+      or <code>withNoBounds()</code> to specify half-open or unbounded ranges, 
but cannot use <code>withRange(K lower, K upper)</code> for the same.
+      <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-941%3A+Range+queries+to+accept+null+lower+and+upper+bounds">KIP-941</a>
 closes this gap by allowing to pass in <code>null</code>
+      as upper and lower bound (with semantics "no bound") to simplify the 
usage of the <code>RangeQuery</code> class.
+    </p>
+
     <h3><a id="streams_api_changes_350" 
href="#streams_api_changes_350">Streams API changes in 3.5.0</a></h3>
     <p>      
       A new state store type, versioned key-value stores, was introduced in
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index e530769be4f..0222da70340 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -2237,7 +2237,7 @@ public interface KTable<K, V> {
      *
      * @param other               the other {@code KTable} to be joined with 
this {@code KTable}. Keyed by KO.
      * @param foreignKeyExtractor a {@link Function} that extracts the key 
(KO) from this table's value (V). If the
-     *                            result is null, the update is ignored as 
invalid.
+     *                            extracted foreign key is null, then the right-hand side of the result will be null.
      * @param joiner              a {@link ValueJoiner} that computes the join 
result for a pair of matching records
      * @param <VR>                the value type of the result {@code KTable}
      * @param <KO>                the key type of the other {@code KTable}
@@ -2254,8 +2254,8 @@ public interface KTable<K, V> {
      * This is a foreign key join, where the joining key is determined by the 
{@code foreignKeyExtractor}.
      *
      * @param other               the other {@code KTable} to be joined with 
this {@code KTable}. Keyed by KO.
-     * @param foreignKeyExtractor a {@link Function} that extracts the key 
(KO) from this table's value (V) If the
-     *                            result is null, the update is ignored as 
invalid.
+     * @param foreignKeyExtractor a {@link Function} that extracts the key 
(KO) from this table's value (V). If the
+     *                            extracted foreign key is null, then the right-hand side of the result will be null.
      * @param joiner              a {@link ValueJoiner} that computes the join 
result for a pair of matching records
      * @param named               a {@link Named} config used to name the 
processor in the topology
      * @param <VR>                the value type of the result {@code KTable}
@@ -2279,9 +2279,8 @@ public interface KTable<K, V> {
      * <p>
      * This is a foreign key join, where the joining key is determined by the 
{@code foreignKeyExtractor}.
      *
-     * @param other               the other {@code KTable} to be joined with 
this {@code KTable}. Keyed by KO.
-     * @param foreignKeyExtractor a {@link Function} that extracts the key 
(KO) from this table's value (V) If the
-     *                            result is null, the update is ignored as 
invalid.
+     * @param foreignKeyExtractor a {@link Function} that extracts the key 
(KO) from this table's value (V). If the
+     *                            extracted foreign key is null, then the right-hand side of the result will be null.
      * @param joiner              a {@link ValueJoiner} that computes the join 
result for a pair of matching records
      * @param tableJoined         a {@link TableJoined} used to configure 
partitioners and names of internal topics and stores
      * @param <VR>                the value type of the result {@code KTable}
@@ -2301,7 +2300,7 @@ public interface KTable<K, V> {
      *
      * @param other               the other {@code KTable} to be joined with 
this {@code KTable}. Keyed by KO.
      * @param foreignKeyExtractor a {@link Function} that extracts the key 
(KO) from this table's value (V). If the
-     *                            result is null, the update is ignored as 
invalid.
+     *                            extracted foreign key is null, then the right-hand side of the result will be null.
      * @param joiner              a {@link ValueJoiner} that computes the join 
result for a pair of matching records
      * @param materialized        a {@link Materialized} that describes how 
the {@link StateStore} for the resulting {@code KTable}
      *                            should be materialized. Cannot be {@code 
null}
@@ -2321,8 +2320,8 @@ public interface KTable<K, V> {
      * This is a foreign key join, where the joining key is determined by the 
{@code foreignKeyExtractor}.
      *
      * @param other               the other {@code KTable} to be joined with 
this {@code KTable}. Keyed by KO.
-     * @param foreignKeyExtractor a {@link Function} that extracts the key 
(KO) from this table's value (V) If the
-     *                            result is null, the update is ignored as 
invalid.
+     * @param foreignKeyExtractor a {@link Function} that extracts the key 
(KO) from this table's value (V). If the
+     *                            extracted foreign key is null, then the right-hand side of the result will be null.
      * @param joiner              a {@link ValueJoiner} that computes the join 
result for a pair of matching records
      * @param named               a {@link Named} config used to name the 
processor in the topology
      * @param materialized        a {@link Materialized} that describes how 
the {@link StateStore} for the resulting {@code KTable}
@@ -2350,8 +2349,8 @@ public interface KTable<K, V> {
      * This is a foreign key join, where the joining key is determined by the 
{@code foreignKeyExtractor}.
      *
      * @param other               the other {@code KTable} to be joined with 
this {@code KTable}. Keyed by KO.
-     * @param foreignKeyExtractor a {@link Function} that extracts the key 
(KO) from this table's value (V) If the
-     *                            result is null, the update is ignored as 
invalid.
+     * @param foreignKeyExtractor a {@link Function} that extracts the key 
(KO) from this table's value (V). If the
+     *                            extracted foreign key is null, then the right-hand side of the result will be null.
      * @param joiner              a {@link ValueJoiner} that computes the join 
result for a pair of matching records
      * @param tableJoined         a {@link TableJoined} used to configure 
partitioners and names of internal topics and stores
      * @param materialized        a {@link Materialized} that describes how 
the {@link StateStore} for the resulting {@code KTable}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java
index ae5f20fe533..01e94da345a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java
@@ -24,7 +24,6 @@ public class CombinedKey<KF, KP> {
     private final KP primaryKey;
 
     CombinedKey(final KF foreignKey, final KP primaryKey) {
-        Objects.requireNonNull(foreignKey, "foreignKey can't be null");
         Objects.requireNonNull(primaryKey, "primaryKey can't be null");
         this.foreignKey = foreignKey;
         this.primaryKey = primaryKey;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplier.java
index 7d31ef44223..a8677ce2958 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplier.java
@@ -77,7 +77,10 @@ public class SubscriptionJoinProcessorSupplier<K, KO, VO>
                     throw new UnsupportedVersionException("SubscriptionWrapper 
is of an incompatible version.");
                 }
 
-                final ValueAndTimestamp<VO> foreignValueAndTime = 
foreignValues.get(record.key().getForeignKey());
+                final ValueAndTimestamp<VO> foreignValueAndTime =
+                    record.key().getForeignKey() == null ?
+                        null :
+                        foreignValues.get(record.key().getForeignKey());
 
                 final long resultTimestamp =
                     foreignValueAndTime == null ?
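The guard added above matters because a null foreign key can now reach this processor, and the materialized store must not be queried with it (the `KeyValueStore` contract throws `NullPointerException` on a null key). A minimal stand-in, with the store reduced to a `Map` and the value simplified to a `String`:

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only: the null-guarded foreign-value lookup the patch adds.
// A real state-store get() must never be called with a null key.
public class NullSafeForeignLookupSketch {

    static String lookup(Map<String, String> foreignValues, String foreignKey) {
        // null foreign key: no right-hand value, join result gets a null RHS
        return foreignKey == null ? null : foreignValues.get(foreignKey);
    }

    public static void main(String[] args) {
        final Map<String, String> store = new HashMap<>();
        store.put("fk1", "rhsValue1");
        System.out.println(lookup(store, "fk1")); // rhsValue1
        System.out.println(lookup(store, null));  // null
    }
}
```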
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
index cf88aec6f9a..90d70bdf330 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
@@ -75,20 +75,8 @@ public class SubscriptionReceiveProcessorSupplier<K, KO>
 
             @Override
             public void process(final Record<KO, SubscriptionWrapper<K>> 
record) {
-                if (record.key() == null) {
-                    if (context().recordMetadata().isPresent()) {
-                        final RecordMetadata recordMetadata = 
context().recordMetadata().get();
-                        LOG.warn(
-                            "Skipping record due to null foreign key. "
-                                + "topic=[{}] partition=[{}] offset=[{}]",
-                            recordMetadata.topic(), 
recordMetadata.partition(), recordMetadata.offset()
-                        );
-                    } else {
-                        LOG.warn(
-                            "Skipping record due to null foreign key. Topic, 
partition, and offset not known."
-                        );
-                    }
-                    droppedRecordsSensor.record();
+                if (record.key() == null && 
!SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE.equals(record.value().getInstruction()))
 {
+                    dropRecord();
                     return;
                 }
                 if (record.value().getVersion() > 
SubscriptionWrapper.CURRENT_VERSION) {
@@ -97,7 +85,22 @@ public class SubscriptionReceiveProcessorSupplier<K, KO>
                     //from older SubscriptionWrapper versions to newer 
versions.
                     throw new UnsupportedVersionException("SubscriptionWrapper 
is of an incompatible version.");
                 }
+                context().forward(
+                    record.withKey(new CombinedKey<>(record.key(), 
record.value().getPrimaryKey()))
+                        .withValue(inferChange(record))
+                        .withTimestamp(record.timestamp())
+                );
+            }
 
+            private Change<ValueAndTimestamp<SubscriptionWrapper<K>>> 
inferChange(final Record<KO, SubscriptionWrapper<K>> record) {
+                if (record.key() == null) {
+                    return new Change<>(ValueAndTimestamp.make(record.value(), 
record.timestamp()), null);
+                } else {
+                    return inferBasedOnState(record);
+                }
+            }
+
+            private Change<ValueAndTimestamp<SubscriptionWrapper<K>>> 
inferBasedOnState(final Record<KO, SubscriptionWrapper<K>> record) {
                 final Bytes subscriptionKey = keySchema.toBytes(record.key(), 
record.value().getPrimaryKey());
 
                 final ValueAndTimestamp<SubscriptionWrapper<K>> newValue = 
ValueAndTimestamp.make(record.value(), record.timestamp());
@@ -110,14 +113,23 @@ public class SubscriptionReceiveProcessorSupplier<K, KO>
                 } else {
                     store.put(subscriptionKey, newValue);
                 }
-                final Change<ValueAndTimestamp<SubscriptionWrapper<K>>> change 
= new Change<>(newValue, oldValue);
-                // note: key is non-nullable
-                // note: newValue is non-nullable
-                context().forward(
-                    record.withKey(new CombinedKey<>(record.key(), 
record.value().getPrimaryKey()))
-                        .withValue(change)
-                        .withTimestamp(newValue.timestamp())
-                );
+                return new Change<>(newValue, oldValue);
+            }
+
+            private void dropRecord() {
+                if (context().recordMetadata().isPresent()) {
+                    final RecordMetadata recordMetadata = 
context().recordMetadata().get();
+                    LOG.warn(
+                        "Skipping record due to null foreign key. "
+                            + "topic=[{}] partition=[{}] offset=[{}]",
+                        recordMetadata.topic(), recordMetadata.partition(), 
recordMetadata.offset()
+                    );
+                } else {
+                    LOG.warn(
+                        "Skipping record due to null foreign key. Topic, 
partition, and offset not known."
+                    );
+                }
+                droppedRecordsSensor.record();
             }
         };
     }
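The receive-side change above boils down to one predicate: a subscription with a null foreign key is dropped unless the sender flagged it with `PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE` (i.e. left-join semantics). A reduced sketch of that gate, with the instruction enum mirroring `SubscriptionWrapper.Instruction` and everything else hypothetical:

```java
// Illustration only: the drop decision in the reworked process() method,
// expressed as a pure function.
public class NullKeyGateSketch {
    enum Instruction {
        DELETE_KEY_AND_PROPAGATE,
        DELETE_KEY_NO_PROPAGATE,
        PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE,
        PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE
    }

    // Null FKs are forwarded only for left-join propagation; all other
    // instructions still log a skip and hit the dropped-records sensor.
    static boolean shouldDrop(Object foreignKey, Instruction instruction) {
        return foreignKey == null
            && instruction != Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE;
    }

    public static void main(String[] args) {
        System.out.println(shouldDrop(null, Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE)); // false
        System.out.println(shouldDrop(null, Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE));    // true
        System.out.println(shouldDrop("fk1", Instruction.DELETE_KEY_AND_PROPAGATE));             // false
    }
}
```

Note that when the key is null there is no prior state to diff against, which is why the patched `inferChange` emits `Change(newValue, null)` without touching the store.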
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
index 8a6298c28de..09bd35339ab 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
@@ -37,6 +37,7 @@ import java.util.Arrays;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
+import static 
org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction;
 import static 
org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE;
 import static 
org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction.DELETE_KEY_NO_PROPAGATE;
 import static 
org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE;
@@ -86,6 +87,7 @@ public class SubscriptionSendProcessorSupplier<K, KO, V> 
implements ProcessorSup
         private Sensor droppedRecordsSensor;
         private String foreignKeySerdeTopic;
         private String valueSerdeTopic;
+        private long[] recordHash;
 
         @SuppressWarnings("unchecked")
         @Override
@@ -109,108 +111,102 @@ public class SubscriptionSendProcessorSupplier<K, KO, 
V> implements ProcessorSup
 
         @Override
         public void process(final Record<K, Change<V>> record) {
+            // clear cached hash from previous record
+            recordHash = null;
             // drop out-of-order records from versioned tables (cf. KIP-914)
             if (useVersionedSemantics && !record.value().isLatest) {
                 LOG.info("Skipping out-of-order record from versioned table 
while performing table-table join.");
                 droppedRecordsSensor.record();
                 return;
             }
+            if (leftJoin) {
+                leftJoinInstructions(record);
+            } else {
+                defaultJoinInstructions(record);
+            }
+        }
 
-            final long[] currentHash = record.value().newValue == null ?
-                null :
-                Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic, 
record.value().newValue));
-
-            final int partition = context().recordMetadata().get().partition();
+        private void leftJoinInstructions(final Record<K, Change<V>> record) {
             if (record.value().oldValue != null) {
                 final KO oldForeignKey = 
foreignKeyExtractor.apply(record.value().oldValue);
+                final KO newForeignKey = record.value().newValue == null ? 
null : foreignKeyExtractor.apply(record.value().newValue);
+                if (oldForeignKey != null && 
!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
+                    forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
+                }
+                forward(record, newForeignKey, 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
+            } else if (record.value().newValue != null) {
+                final KO newForeignKey = 
foreignKeyExtractor.apply(record.value().newValue);
+                forward(record, newForeignKey, 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
+            }
+        }
+
+        private void defaultJoinInstructions(final Record<K, Change<V>> 
record) {
+            if (record.value().oldValue != null) {
+                final KO oldForeignKey = foreignKeyExtractor.apply(record.value().oldValue);
                 if (oldForeignKey == null) {
                     logSkippedRecordDueToNullForeignKey();
                     return;
                 }
                 if (record.value().newValue != null) {
-                    final KO newForeignKey = 
foreignKeyExtractor.apply(record.value().newValue);
+                    final KO newForeignKey = record.value().newValue == null ? 
null : foreignKeyExtractor.apply(record.value().newValue);
                     if (newForeignKey == null) {
                         logSkippedRecordDueToNullForeignKey();
                         return;
                     }
-
-                    final byte[] serialOldForeignKey =
-                        foreignKeySerializer.serialize(foreignKeySerdeTopic, 
oldForeignKey);
-                    final byte[] serialNewForeignKey =
-                        foreignKeySerializer.serialize(foreignKeySerdeTopic, 
newForeignKey);
-                    if (!Arrays.equals(serialNewForeignKey, 
serialOldForeignKey)) {
+                    if (!Arrays.equals(serialize(newForeignKey), 
serialize(oldForeignKey))) {
                         //Different Foreign Key - delete the old key value and 
propagate the new one.
                         //Delete it from the oldKey's state store
-                        context().forward(
-                            record.withKey(oldForeignKey)
-                                .withValue(new SubscriptionWrapper<>(
-                                    currentHash,
-                                    DELETE_KEY_NO_PROPAGATE,
-                                    record.key(),
-                                    partition
-                                )));
-                        //Add to the newKey's state store. Additionally, 
propagate null if no FK is found there,
-                        //since we must "unset" any output set by the previous 
FK-join. This is true for both INNER
-                        //and LEFT join.
+                        forward(record, oldForeignKey, 
DELETE_KEY_NO_PROPAGATE);
                     }
-                    context().forward(
-                        record.withKey(newForeignKey)
-                            .withValue(new SubscriptionWrapper<>(
-                                currentHash,
-                                PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE,
-                                record.key(),
-                                partition
-                            )));
+                    //Add to the newKey's state store. Additionally, propagate 
null if no FK is found there,
+                    //since we must "unset" any output set by the previous 
FK-join. This is true for both INNER
+                    //and LEFT join.
+                    forward(record, newForeignKey, 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
                 } else {
-                    //A simple propagatable delete. Delete from the state 
store and propagate the delete onwards.
-                    context().forward(
-                        record.withKey(oldForeignKey)
-                           .withValue(new SubscriptionWrapper<>(
-                               currentHash,
-                               DELETE_KEY_AND_PROPAGATE,
-                               record.key(),
-                               partition
-                           )));
+                    forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
                 }
             } else if (record.value().newValue != null) {
-                //change.oldValue is null, which means it was deleted at least 
once before, or it is brand new.
-                //In either case, we only need to propagate if the FK_VAL is 
available, as the null from the delete would
-                //have been propagated otherwise.
-
-                final SubscriptionWrapper.Instruction instruction;
-                if (leftJoin) {
-                    //Want to send info even if RHS is null.
-                    instruction = PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE;
-                } else {
-                    instruction = PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE;
-                }
                 final KO newForeignKey = 
foreignKeyExtractor.apply(record.value().newValue);
                 if (newForeignKey == null) {
                     logSkippedRecordDueToNullForeignKey();
                 } else {
-                    context().forward(
-                        record.withKey(newForeignKey)
-                            .withValue(new SubscriptionWrapper<>(
-                                currentHash,
-                                instruction,
-                                record.key(),
-                                partition)));
+                    forward(record, newForeignKey, 
PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE);
                 }
             }
         }
 
+        private byte[] serialize(final KO key) {
+            return foreignKeySerializer.serialize(foreignKeySerdeTopic, key);
+        }
+
+        private void forward(final Record<K, Change<V>> record, final KO foreignKey, final Instruction instruction) {
+            final SubscriptionWrapper<K> wrapper = new SubscriptionWrapper<>(
+                hash(record),
+                instruction,
+                record.key(),
+                context().recordMetadata().get().partition()
+            );
+            context().forward(record.withKey(foreignKey).withValue(wrapper));
+        }
+
+        private long[] hash(final Record<K, Change<V>> record) {
+            if (recordHash == null) {
+                recordHash = record.value().newValue == null
+                    ? null
+                    : 
Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic, 
record.value().newValue));
+            }
+            return recordHash;
+        }
+
         private void logSkippedRecordDueToNullForeignKey() {
             if (context().recordMetadata().isPresent()) {
                 final RecordMetadata recordMetadata = 
context().recordMetadata().get();
                 LOG.warn(
-                    "Skipping record due to null foreign key. "
-                        + "topic=[{}] partition=[{}] offset=[{}]",
+                    "Skipping record due to null foreign key. topic=[{}] 
partition=[{}] offset=[{}]",
                     recordMetadata.topic(), recordMetadata.partition(), 
recordMetadata.offset()
                 );
             } else {
-                LOG.warn(
-                    "Skipping record due to null foreign key. Topic, 
partition, and offset not known."
-                );
+                LOG.warn("Skipping record due to null foreign key. Topic, 
partition, and offset not known.");
             }
             droppedRecordsSensor.record();
         }
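The core of the send-side rework is the new `leftJoinInstructions` decision table: which instruction is forwarded for which old/new foreign key. A stand-alone sketch of those decisions, returning `"<foreignKey>:<instruction>"` strings instead of forwarding records; `Objects.equals` stands in here for the serialized-byte comparison the real code performs, and the helper names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

// Illustration only: the forwarding decisions of leftJoinInstructions,
// reduced to a pure function.
public class LeftJoinInstructionsSketch {

    static <V, KO> List<String> instructions(V oldValue, V newValue, Function<V, KO> fkExtractor) {
        final List<String> out = new ArrayList<>();
        if (oldValue != null) {
            final KO oldFk = fkExtractor.apply(oldValue);
            final KO newFk = newValue == null ? null : fkExtractor.apply(newValue);
            if (oldFk != null && !Objects.equals(newFk, oldFk)) {
                // FK changed (possibly to null): clean up the old subscription
                out.add(oldFk + ":DELETE_KEY_AND_PROPAGATE");
            }
            // even a null new FK is propagated, yielding a (value, null) result
            out.add(newFk + ":PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE");
        } else if (newValue != null) {
            out.add(fkExtractor.apply(newValue) + ":PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE");
        }
        return out;
    }

    public static void main(String[] args) {
        final Function<String, String> fk =
            v -> v.endsWith("|null") ? null : v.split("\\|")[1];
        // brand-new record with a resolvable FK
        System.out.println(instructions(null, "lhsValue1|rhs1", fk));
        // update whose new FK extracts to null: old subscription is deleted,
        // and the null FK is still propagated
        System.out.println(instructions("lhsValue1|rhs1", "lhsValue1|null", fk));
    }
}
```

This is also why the hash is now cached per record: a single update can trigger two `forward` calls, and recomputing the Murmur3 hash of the serialized value for each would double the serialization work.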
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
index b76b5ddc0db..2bb7b6ea190 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.integration;
 
-import java.time.Duration;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -34,6 +33,7 @@ import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
@@ -41,11 +41,14 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.jupiter.api.Assertions;
 import org.junit.rules.TestName;
 import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.nio.ByteBuffer;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
@@ -646,6 +649,85 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
         }
     }
 
+    @Test
+    public void shouldEmitRecordOnNullForeignKeyForLeftJoins() {
+        final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, true, rejoin, leftVersioned, rightVersioned, value -> null);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
+            final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
+            final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
+            final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
+
+            left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp);
+            {
+                final Map<String, String> expected = mkMap(
+                    mkEntry("lhs1", "(lhsValue1|rhs1,null)")
+                );
+                assertThat(outputTopic.readKeyValuesToMap(), is(expected));
+                if (materialized) {
+                    assertThat(asMap(store), is(expected));
+                }
+            }
+        }
+    }
+
+    @Test
+    public void shouldEmitRecordWhenOldAndNewFkDiffer() {
+        final Function<String, String> foreignKeyExtractor = value -> {
+            final String split = value.split("\\|")[1];
+            if (split.equals("returnNull")) {
+                //new fk
+                return null;
+            } else {
+                //old fk
+                return split;
+            }
+        };
+        final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, true, rejoin, leftVersioned, rightVersioned, foreignKeyExtractor);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
+            final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
+            final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
+            final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
+            final String subscriptionStoreName = driver.getAllStateStores().entrySet().stream()
+                .filter(e -> e.getKey().contains("SUBSCRIPTION-STATE-STORE"))
+                .findAny().orElseThrow(() -> new RuntimeException("couldn't find store")).getKey();
+            final KeyValueStore<Bytes, ValueAndTimestamp<String>> subscriptionStore = driver.getKeyValueStore(subscriptionStoreName);
+            final Bytes key = subscriptionStoreKey("lhs1", "rhs1");
+            left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp);
+            {
+                final Map<String, String> expected = mkMap(
+                    mkEntry("lhs1", "(lhsValue1|rhs1,null)")
+                );
+                assertThat(outputTopic.readKeyValuesToMap(), is(expected));
+                if (materialized) {
+                    assertThat(asMap(store), is(expected));
+                }
+                Assertions.assertNotNull(subscriptionStore.get(key));
+            }
+            left.pipeInput("lhs1", "lhsValue1|returnNull", baseTimestamp);
+            {
+                final Map<String, String> expected = mkMap(
+                    mkEntry("lhs1", "(lhsValue1|returnNull,null)")
+                );
+                assertThat(outputTopic.readKeyValuesToMap(), is(expected));
+                if (materialized) {
+                    assertThat(asMap(store), is(expected));
+                }
+                Assertions.assertNull(subscriptionStore.get(key));
+            }
+        }
+    }
+
+    private static Bytes subscriptionStoreKey(final String lhs, final String rhs) {
+        final byte[] lhs1bytes = lhs.getBytes();
+        final byte[] rhs1bytes = rhs.getBytes();
+        final ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + lhs1bytes.length + rhs1bytes.length);
+        buf.putInt(rhs1bytes.length);
+        buf.put(rhs1bytes);
+        buf.put(lhs1bytes);
+        final Bytes key = Bytes.wrap(buf.array());
+        return key;
+    }
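(Editorial aside, not part of the commit: the `subscriptionStoreKey` helper above rebuilds the subscription store's binary key layout by hand, so the layout is easy to miss at a glance: a 4-byte big-endian length prefix for the foreign-key bytes, then the foreign-key bytes, then the primary-key bytes. A standalone sketch of that same layout, using a hypothetical class name and plain `byte[]` instead of Kafka's `Bytes` wrapper:)

```java
import java.nio.ByteBuffer;

public class SubscriptionKeyLayoutSketch {
    // Mirrors the test helper: [int fkLength][fk bytes][pk bytes].
    // The length prefix lets a reader split the foreign key from the primary key.
    static byte[] subscriptionStoreKey(final String primaryKey, final String foreignKey) {
        final byte[] pkBytes = primaryKey.getBytes();
        final byte[] fkBytes = foreignKey.getBytes();
        final ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + fkBytes.length + pkBytes.length);
        buf.putInt(fkBytes.length); // 4-byte big-endian length of the foreign key
        buf.put(fkBytes);           // foreign-key bytes
        buf.put(pkBytes);           // primary-key bytes fill the remainder
        return buf.array();
    }

    public static void main(final String[] args) {
        final byte[] key = subscriptionStoreKey("lhs1", "rhs1");
        // 4-byte prefix + 4 fk bytes + 4 pk bytes = 12 bytes
        System.out.println(key.length);                   // prints 12
        System.out.println(ByteBuffer.wrap(key).getInt()); // prints 4 (fk length)
    }
}
```

(This matches the byte composition in the test helper above; class and method names here are illustrative only.)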
+
     protected static Map<String, String> asMap(final KeyValueStore<String, String> store) {
         final HashMap<String, String> result = new HashMap<>();
         store.all().forEachRemaining(kv -> result.put(kv.key, kv.value));
@@ -658,6 +740,24 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                                           final boolean rejoin,
                                           final boolean leftVersioned,
                                           final boolean rightVersioned) {
+        return getTopology(
+            streamsConfig,
+            queryableStoreName,
+            leftJoin,
+            rejoin,
+            leftVersioned,
+            rightVersioned,
+            value -> value.split("\\|")[1]
+        );
+    }
+
+    protected static Topology getTopology(final Properties streamsConfig,
+                                          final String queryableStoreName,
+                                          final boolean leftJoin,
+                                          final boolean rejoin,
+                                          final boolean leftVersioned,
+                                          final boolean rightVersioned,
+                                          final Function<String, String> extractor) {
         final UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope();
         final StreamsBuilder builder = new StreamsBuilder();
 
@@ -693,7 +793,6 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             );
         }
 
-        final Function<String, String> extractor = value -> value.split("\\|")[1];
         final ValueJoiner<String, String, String> joiner = (value1, value2) -> "(" + value1 + "," + value2 + ")";
         final ValueJoiner<String, String, String> rejoiner = rejoin ? (value1, value2) -> "rejoin(" + value1 + "," + value2 + ")" : null;
 

