This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 61587edd79a MINOR: simplify FK-join logic (#20605)
61587edd79a is described below
commit 61587edd79a6688ca2dea4e2b6b98324b0233f27
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Oct 22 08:39:45 2025 -0700
MINOR: simplify FK-join logic (#20605)
The existing FK join logic is very convoluted due to incremental changes
and bug fixes, and thus very hard to understand.
This PR rewrites the logic from scratch to make it easier to
understand.
Reviewers: Lucas Brutschy <[email protected]>, Liam
Clarke-Hutchinson <[email protected]>, Nikita Shupletsov
<[email protected]>
---
.../KTableKTableForeignKeyJoinIntegrationTest.java | 5 +-
.../SubscriptionReceiveProcessorSupplier.java | 5 +-
.../SubscriptionSendProcessorSupplier.java | 122 +++++++++++++++------
3 files changed, 97 insertions(+), 35 deletions(-)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
index 256eddd6f74..fa289ca5959 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
@@ -891,7 +891,10 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
rejoin,
leftVersioned,
rightVersioned,
- value -> value.split("\\|")[1]
+ value -> {
+ final String[] tokens = value.split("\\|");
+ return tokens.length == 2 ? tokens[1] : null;
+ }
);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
index e654cd752af..3bb3c7a9396 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
@@ -82,7 +82,8 @@ public class SubscriptionReceiveProcessorSupplier<KLeft,
KRight>
@Override
public void process(final Record<KRight,
SubscriptionWrapper<KLeft>> record) {
- if (record.key() == null &&
!SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE.equals(record.value().instruction()))
{
+ final KRight foreignKey = record.key();
+ if (foreignKey == null &&
!SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE.equals(record.value().instruction()))
{
dropRecord();
return;
}
@@ -93,7 +94,7 @@ public class SubscriptionReceiveProcessorSupplier<KLeft,
KRight>
throw new UnsupportedVersionException("SubscriptionWrapper
is of an incompatible version.");
}
context().forward(
- record.withKey(new CombinedKey<>(record.key(),
record.value().primaryKey()))
+ record.withKey(new CombinedKey<>(foreignKey,
record.value().primaryKey()))
.withValue(inferChange(record))
.withTimestamp(record.timestamp())
);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
index b161ce092c4..6e57ca4991f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
@@ -128,51 +128,109 @@ public class SubscriptionSendProcessorSupplier<KLeft,
VLeft, KRight>
}
private void leftJoinInstructions(final Record<KLeft, Change<VLeft>>
record) {
- if (record.value().oldValue != null) {
- final KRight oldForeignKey =
foreignKeyExtractor.extract(record.key(), record.value().oldValue);
- final KRight newForeignKey = record.value().newValue == null ?
null : foreignKeyExtractor.extract(record.key(), record.value().newValue);
- if (oldForeignKey != null &&
!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
+ final VLeft oldValue = record.value().oldValue;
+ final VLeft newValue = record.value().newValue;
+
+ if (oldValue == null && newValue == null) {
+ // no output for idempotent left hand side deletes
+ return;
+ }
+
+ final KRight oldForeignKey = oldValue == null ? null :
foreignKeyExtractor.extract(record.key(), oldValue);
+ final KRight newForeignKey = newValue == null ? null :
foreignKeyExtractor.extract(record.key(), newValue);
+
+ final boolean maybeUnsubscribe = oldForeignKey != null;
+ if (maybeUnsubscribe) {
+ // delete old subscription only if FK changed
+ //
+ // if FK did change, we need to explicitly delete the old
subscription,
+ // because the new subscription goes to a different partition
+ if (foreignKeyChanged(newForeignKey, oldForeignKey)) {
+ // this may lead to unnecessary tombstones if the old FK
did not join;
+ // however, we cannot avoid it as we have no means to know
if the old FK joined or not
forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);
}
- forward(record, newForeignKey,
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
- } else if (record.value().newValue != null) {
- final KRight newForeignKey =
foreignKeyExtractor.extract(record.key(), record.value().newValue);
- forward(record, newForeignKey,
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
}
+
+ // for all cases (insert, update, and delete), we send a new
subscription;
+ // we need to get a response back for all cases to always produce
a left-join result
+ //
+ // note: for delete, `newForeignKey` is null, what is a "hack"
+ // no actual subscription will be added for null-FK on the right
hand side, but we still get the response back we need
+ //
+ // this may lead to unnecessary tombstones if the old FK did not
join;
+ // however, we cannot avoid it as we have no means to know if the
old FK joined or not
+ forward(record, newForeignKey,
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
}
private void defaultJoinInstructions(final Record<KLeft,
Change<VLeft>> record) {
- if (record.value().oldValue != null) {
- final KRight oldForeignKey =
foreignKeyExtractor.extract(record.key(), record.value().oldValue);
- final KRight newForeignKey = record.value().newValue == null ?
null : foreignKeyExtractor.extract(record.key(), record.value().newValue);
+ final VLeft oldValue = record.value().oldValue;
+ final VLeft newValue = record.value().newValue;
+
+ final KRight oldForeignKey = oldValue == null ? null :
foreignKeyExtractor.extract(record.key(), oldValue);
+ final boolean needToUnsubscribe = oldForeignKey != null;
+
+ // if left row is inserted or updated, subscribe to new FK (if new
FK is valid)
+ if (newValue != null) {
+ final KRight newForeignKey =
foreignKeyExtractor.extract(record.key(), newValue);
- if (oldForeignKey == null && newForeignKey == null) {
+ if (newForeignKey == null) { // invalid FK
logSkippedRecordDueToNullForeignKey();
- } else if (oldForeignKey == null) {
- forward(record, newForeignKey,
PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE);
- } else if (newForeignKey == null) {
- forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
- } else if (!Arrays.equals(serialize(newForeignKey),
serialize(oldForeignKey))) {
- //Different Foreign Key - delete the old key value and
propagate the new one.
- //Delete it from the oldKey's state store
- forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);
- //Add to the newKey's state store. Additionally, propagate
null if no FK is found there,
- //since we must "unset" any output set by the previous
FK-join. This is true for both INNER
- //and LEFT join.
- forward(record, newForeignKey,
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
- } else { // unchanged FK
- forward(record, newForeignKey,
PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE);
+ if (needToUnsubscribe) {
+ // delete old subscription
+ //
+ // this may lead to unnecessary tombstones if the old
FK did not join;
+ // however, we cannot avoid it as we have no means to
know if the old FK joined or not
+ forward(record, oldForeignKey,
DELETE_KEY_AND_PROPAGATE);
+ }
+ } else { // valid FK
+ // regular insert/update
+
+ if (needToUnsubscribe) {
+ // update case
+
+ if (foreignKeyChanged(newForeignKey, oldForeignKey)) {
+ // if FK did change, we need to explicitly delete
the old subscription,
+ // because the new subscription goes to a
different partition
+ //
+ // we don't need any response, as we only want a
response from the new subscription
+ forward(record, oldForeignKey,
DELETE_KEY_NO_PROPAGATE);
+
+ // subscribe for new FK (note, could be on a
different task/node than the old FK)
+ // additionally, propagate null if no FK is found
so we can delete the previous result (if any)
+ //
+ // this may lead to unnecessary tombstones if the
old FK did not join and the new FK key does not join either;
+ // however, we cannot avoid it as we have no means
to know if the old FK joined or not
+ forward(record, newForeignKey,
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
+ } else {
+ // if FK did not change, we only need a response
from the new FK subscription, if there is a join
+ // if there is no join, we know that the old row
did not join either (as it used the same FK)
+ // and thus we don't need to propagate an
idempotent null result
+ forward(record, newForeignKey,
PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE);
+ }
+ } else {
+ // insert case
+
+ // subscribe to new key
+ // don't propagate null if no FK is found:
+ // for inserts, we know that there is no need to
delete any previous result
+ forward(record, newForeignKey,
PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE);
+ }
}
- } else if (record.value().newValue != null) {
- final KRight newForeignKey =
foreignKeyExtractor.extract(record.key(), record.value().newValue);
- if (newForeignKey == null) {
- logSkippedRecordDueToNullForeignKey();
- } else {
- forward(record, newForeignKey,
PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE);
+ } else {
+ // left row is deleted
+ if (needToUnsubscribe) {
+ // this may lead to unnecessary tombstones if the old FK
did not join;
+ // however, we cannot avoid it as we have no means to know
if the old FK joined or not
+ forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
}
}
}
+ private boolean foreignKeyChanged(final KRight newForeignKey, final
KRight oldForeignKey) {
+ return !Arrays.equals(serialize(newForeignKey),
serialize(oldForeignKey));
+ }
+
private byte[] serialize(final KRight key) {
return foreignKeySerializer.serialize(foreignKeySerdeTopic, key);
}