This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 61587edd79a MINOR: simplify FK-join logic (#20605)
61587edd79a is described below

commit 61587edd79a6688ca2dea4e2b6b98324b0233f27
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Oct 22 08:39:45 2025 -0700

    MINOR: simplify FK-join logic (#20605)
    
    The existing FK join logic is very convoluted due to incremental changes
    and bug-fixes, and thus very hard to understand.
    
    This PR rewrites the logic from scratch to make it easier to
    understand.
    
    Reviewers: Lucas Brutschy <[email protected]>, Liam
     Clarke-Hutchinson <[email protected]>, Nikita Shupletsov
     <[email protected]>
---
 .../KTableKTableForeignKeyJoinIntegrationTest.java |   5 +-
 .../SubscriptionReceiveProcessorSupplier.java      |   5 +-
 .../SubscriptionSendProcessorSupplier.java         | 122 +++++++++++++++------
 3 files changed, 97 insertions(+), 35 deletions(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
index 256eddd6f74..fa289ca5959 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
@@ -891,7 +891,10 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             rejoin,
             leftVersioned,
             rightVersioned,
-            value -> value.split("\\|")[1]
+            value -> {
+                final String[] tokens = value.split("\\|");
+                return tokens.length == 2 ? tokens[1] : null;
+            }
         );
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
index e654cd752af..3bb3c7a9396 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
@@ -82,7 +82,8 @@ public class SubscriptionReceiveProcessorSupplier<KLeft, 
KRight>
 
             @Override
             public void process(final Record<KRight, 
SubscriptionWrapper<KLeft>> record) {
-                if (record.key() == null && 
!SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE.equals(record.value().instruction()))
 {
+                final KRight foreignKey = record.key();
+                if (foreignKey == null && 
!SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE.equals(record.value().instruction()))
 {
                     dropRecord();
                     return;
                 }
@@ -93,7 +94,7 @@ public class SubscriptionReceiveProcessorSupplier<KLeft, 
KRight>
                     throw new UnsupportedVersionException("SubscriptionWrapper 
is of an incompatible version.");
                 }
                 context().forward(
-                    record.withKey(new CombinedKey<>(record.key(), 
record.value().primaryKey()))
+                    record.withKey(new CombinedKey<>(foreignKey, 
record.value().primaryKey()))
                         .withValue(inferChange(record))
                         .withTimestamp(record.timestamp())
                 );
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
index b161ce092c4..6e57ca4991f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
@@ -128,51 +128,109 @@ public class SubscriptionSendProcessorSupplier<KLeft, 
VLeft, KRight>
         }
 
         private void leftJoinInstructions(final Record<KLeft, Change<VLeft>> 
record) {
-            if (record.value().oldValue != null) {
-                final KRight oldForeignKey = 
foreignKeyExtractor.extract(record.key(), record.value().oldValue);
-                final KRight newForeignKey = record.value().newValue == null ? 
null : foreignKeyExtractor.extract(record.key(), record.value().newValue);
-                if (oldForeignKey != null && 
!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
+            final VLeft oldValue = record.value().oldValue;
+            final VLeft newValue = record.value().newValue;
+
+            if (oldValue == null && newValue == null) {
+                // no output for idempotent left hand side deletes
+                return;
+            }
+
+            final KRight oldForeignKey = oldValue == null ? null : 
foreignKeyExtractor.extract(record.key(), oldValue);
+            final KRight newForeignKey = newValue == null ? null : 
foreignKeyExtractor.extract(record.key(), newValue);
+
+            final boolean maybeUnsubscribe = oldForeignKey != null;
+            if (maybeUnsubscribe) {
+                // delete old subscription only if FK changed
+                //
+                // if FK did change, we need to explicitly delete the old 
subscription,
+                // because the new subscription goes to a different partition
+                if (foreignKeyChanged(newForeignKey, oldForeignKey)) {
+                    // this may lead to unnecessary tombstones if the old FK 
did not join;
+                    // however, we cannot avoid it as we have no means to know 
if the old FK joined or not
                     forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);
                 }
-                forward(record, newForeignKey, 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
-            } else if (record.value().newValue != null) {
-                final KRight newForeignKey = 
foreignKeyExtractor.extract(record.key(), record.value().newValue);
-                forward(record, newForeignKey, 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
             }
+
+            // for all cases (insert, update, and delete), we send a new 
subscription;
+            // we need to get a response back for all cases to always produce 
a left-join result
+            //
+            // note: for delete, `newForeignKey` is null, which is a "hack"
+            // no actual subscription will be added for null-FK on the right 
hand side, but we still get the response back we need
+            //
+            // this may lead to unnecessary tombstones if the old FK did not 
join;
+            // however, we cannot avoid it as we have no means to know if the 
old FK joined or not
+            forward(record, newForeignKey, 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
         }
 
         private void defaultJoinInstructions(final Record<KLeft, 
Change<VLeft>> record) {
-            if (record.value().oldValue != null) {
-                final KRight oldForeignKey = 
foreignKeyExtractor.extract(record.key(), record.value().oldValue);
-                final KRight newForeignKey = record.value().newValue == null ? 
null : foreignKeyExtractor.extract(record.key(), record.value().newValue);
+            final VLeft oldValue = record.value().oldValue;
+            final VLeft newValue = record.value().newValue;
+
+            final KRight oldForeignKey = oldValue == null ? null : 
foreignKeyExtractor.extract(record.key(), oldValue);
+            final boolean needToUnsubscribe = oldForeignKey != null;
+
+            // if left row is inserted or updated, subscribe to new FK (if new 
FK is valid)
+            if (newValue != null) {
+                final KRight newForeignKey = 
foreignKeyExtractor.extract(record.key(), newValue);
 
-                if (oldForeignKey == null && newForeignKey == null) {
+                if (newForeignKey == null) { // invalid FK
                     logSkippedRecordDueToNullForeignKey();
-                } else if (oldForeignKey == null) {
-                    forward(record, newForeignKey, 
PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE);
-                } else if (newForeignKey == null) {
-                    forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
-                } else if (!Arrays.equals(serialize(newForeignKey), 
serialize(oldForeignKey))) {
-                    //Different Foreign Key - delete the old key value and 
propagate the new one.
-                    //Delete it from the oldKey's state store
-                    forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);
-                    //Add to the newKey's state store. Additionally, propagate 
null if no FK is found there,
-                    //since we must "unset" any output set by the previous 
FK-join. This is true for both INNER
-                    //and LEFT join.
-                    forward(record, newForeignKey, 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
-                } else { // unchanged FK
-                    forward(record, newForeignKey, 
PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE);
+                    if (needToUnsubscribe) {
+                        // delete old subscription
+                        //
+                        // this may lead to unnecessary tombstones if the old 
FK did not join;
+                        // however, we cannot avoid it as we have no means to 
know if the old FK joined or not
+                        forward(record, oldForeignKey, 
DELETE_KEY_AND_PROPAGATE);
+                    }
+                } else { // valid FK
+                    // regular insert/update
+
+                    if (needToUnsubscribe) {
+                        // update case
+
+                        if (foreignKeyChanged(newForeignKey, oldForeignKey)) {
+                            // if FK did change, we need to explicitly delete 
the old subscription,
+                            // because the new subscription goes to a 
different partition
+                            //
+                            // we don't need any response, as we only want a 
response from the new subscription
+                            forward(record, oldForeignKey, 
DELETE_KEY_NO_PROPAGATE);
+
+                            // subscribe for new FK (note, could be on a 
different task/node than the old FK)
+                            // additionally, propagate null if no FK is found 
so we can delete the previous result (if any)
+                            //
+                            // this may lead to unnecessary tombstones if the 
old FK did not join and the new FK key does not join either;
+                            // however, we cannot avoid it as we have no means 
to know if the old FK joined or not
+                            forward(record, newForeignKey, 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
+                        } else {
+                            // if FK did not change, we only need a response 
from the new FK subscription, if there is a join
+                            // if there is no join, we know that the old row 
did not join either (as it used the same FK)
+                            // and thus we don't need to propagate an 
idempotent null result
+                            forward(record, newForeignKey, 
PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE);
+                        }
+                    } else {
+                        // insert case
+
+                        // subscribe to new key
+                        // don't propagate null if no FK is found:
+                        // for inserts, we know that there is no need to 
delete any previous result
+                        forward(record, newForeignKey, 
PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE);
+                    }
                 }
-            } else if (record.value().newValue != null) {
-                final KRight newForeignKey = 
foreignKeyExtractor.extract(record.key(), record.value().newValue);
-                if (newForeignKey == null) {
-                    logSkippedRecordDueToNullForeignKey();
-                } else {
-                    forward(record, newForeignKey, 
PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE);
+            } else {
+                // left row is deleted
+                if (needToUnsubscribe) {
+                    // this may lead to unnecessary tombstones if the old FK 
did not join;
+                    // however, we cannot avoid it as we have no means to know 
if the old FK joined or not
+                    forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
                 }
             }
         }
 
+        private boolean foreignKeyChanged(final KRight newForeignKey, final 
KRight oldForeignKey) {
+            return !Arrays.equals(serialize(newForeignKey), 
serialize(oldForeignKey));
+        }
+
         private byte[] serialize(final KRight key) {
             return foreignKeySerializer.serialize(foreignKeySerdeTopic, key);
         }

Reply via email to