This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 3.9 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push: new a78491af799 KAFKA-16407: Fix foreign key INNER join on change of FK from/to a null value (#19303) a78491af799 is described below commit a78491af79998d0c860860314cac3ed54b79f663 Author: Ayoub Omari <ayouboma...@outlook.fr> AuthorDate: Sun Apr 6 05:13:31 2025 +0200 KAFKA-16407: Fix foreign key INNER join on change of FK from/to a null value (#19303) Fixes both KAFKA-16407 and KAFKA-16434. Summary of existing issues: - We are ignoring new left record when its previous FK value is null - We do not unset foreign key join result when FK becomes null Reviewers: Matthias J. Sax <matth...@confluent.io> --- .../SubscriptionSendProcessorSupplier.java | 29 +++++----- .../SubscriptionSendProcessorSupplierTest.java | 64 +++++++++++++++++++++- 2 files changed, 74 insertions(+), 19 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java index cd39315cc79..efae7ba0b29 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java @@ -144,27 +144,24 @@ public class SubscriptionSendProcessorSupplier<K, KO, V> implements ProcessorSup private void defaultJoinInstructions(final Record<K, Change<V>> record) { if (record.value().oldValue != null) { final KO oldForeignKey = record.value().oldValue == null ? null : foreignKeyExtractor.apply(record.value().oldValue); - if (oldForeignKey == null) { + final KO newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.apply(record.value().newValue); + + if (oldForeignKey == null && newForeignKey == null) { logSkippedRecordDueToNullForeignKey(); - return; - } - if (record.value().newValue != null) { - final KO newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.apply(record.value().newValue); - if (newForeignKey == null) { - logSkippedRecordDueToNullForeignKey(); - return; - } - if (!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) { - //Different Foreign Key - delete the old key value and propagate the new one. - //Delete it from the oldKey's state store - forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE); - } + } else if (oldForeignKey == null) { + forward(record, newForeignKey, PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE); + } else if (newForeignKey == null) { + forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE); + } else if (!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) { + //Different Foreign Key - delete the old key value and propagate the new one. + //Delete it from the oldKey's state store + forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE); //Add to the newKey's state store. Additionally, propagate null if no FK is found there, //since we must "unset" any output set by the previous FK-join. This is true for both INNER //and LEFT join. forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE); - } else { - forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE); + } else { // unchanged FK + forward(record, newForeignKey, PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE); } } else if (record.value().newValue != null) { final KO newForeignKey = foreignKeyExtractor.apply(record.value().newValue); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java index 18c0ed9a0e7..255fb093a60 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java @@ -295,10 +295,57 @@ public class SubscriptionSendProcessorSupplierTest { innerJoinProcessor.process(new Record<>(pk, new Change<>(leftRecordValue, leftRecordValue), 0)); assertThat(context.forwarded(), empty()); + } - // test dropped-records sensors - assertEquals(1.0, getDroppedRecordsTotalMetric(context)); - assertNotEquals(0.0, getDroppedRecordsRateMetric(context)); + @Test + public void innerJoinShouldPropagateChangeFromNullFKToNonNullFK() { + final MockInternalNewProcessorContext<String, SubscriptionWrapper<String>> context = new MockInternalNewProcessorContext<>(); + innerJoinProcessor.init(context); + context.setRecordMetadata("topic", 0, 0); + + final LeftValue leftRecordValue = new LeftValue(fk1); + + innerJoinProcessor.process(new Record<>(pk, new Change<>(leftRecordValue, new LeftValue(null)), 0)); + + assertThat(context.forwarded().size(), is(1)); + assertThat( + context.forwarded().get(0).record(), + is(new Record<>(fk1, new SubscriptionWrapper<>(hash(leftRecordValue), PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, pk, 0), 0)) + ); + } + + @Test + public void innerJoinShouldDeleteAndPropagateChangeFromNonNullFKToNullFK() { + final MockInternalNewProcessorContext<String, SubscriptionWrapper<String>> context = new MockInternalNewProcessorContext<>(); + innerJoinProcessor.init(context); + context.setRecordMetadata("topic", 0, 0); + + final LeftValue leftRecordValue = new LeftValue(null); + + innerJoinProcessor.process(new Record<>(pk, new Change<>(leftRecordValue, new LeftValue(fk1)), 0)); + + assertThat(context.forwarded().size(), is(1)); + assertThat( + context.forwarded().get(0).record(), + is(new Record<>(fk1, new SubscriptionWrapper<>(hash(leftRecordValue), DELETE_KEY_AND_PROPAGATE, pk, 0), 0)) + ); + } + + @Test + public void innerJoinShouldPropagateUnchangedFKOnlyIfFKExistsInRightTable() { + final MockInternalNewProcessorContext<String, SubscriptionWrapper<String>> context = new MockInternalNewProcessorContext<>(); + innerJoinProcessor.init(context); + context.setRecordMetadata("topic", 0, 0); + + final LeftValue leftRecordValue = new LeftValue(fk1); + + innerJoinProcessor.process(new Record<>(pk, new Change<>(leftRecordValue, leftRecordValue), 0)); + + assertThat(context.forwarded().size(), is(1)); + assertThat( + context.forwarded().get(0).record(), + is(new Record<>(fk1, new SubscriptionWrapper<>(hash(leftRecordValue), PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, pk, 0), 0)) + ); } @Test @@ -316,6 +363,17 @@ public class SubscriptionSendProcessorSupplierTest { ); } + @Test + public void innerJoinShouldNotPropagateDeletionOfPrimaryKeyWhenPreviousFKIsNull() { + final MockInternalNewProcessorContext<String, SubscriptionWrapper<String>> context = new MockInternalNewProcessorContext<>(); + innerJoinProcessor.init(context); + context.setRecordMetadata("topic", 0, 0); + + innerJoinProcessor.process(new Record<>(pk, new Change<>(null, new LeftValue(null)), 0)); + + assertThat(context.forwarded(), empty()); + } + @Test public void innerJoinShouldPropagateNothingWhenOldAndNewLeftValueIsNull() { final MockInternalNewProcessorContext<String, SubscriptionWrapper<String>> context = new MockInternalNewProcessorContext<>();