This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d35ab4d27a9 KAFKA-18713: Fix FK Left-Join result race condition 
(#19005)
d35ab4d27a9 is described below

commit d35ab4d27a9feb95d0716ed3d333237a41b7701a
Author: nilmadhab mondal <[email protected]>
AuthorDate: Fri Apr 4 01:22:47 2025 +0200

    KAFKA-18713: Fix FK Left-Join result race condition (#19005)
    
    When a row in a FK-join left table is updated, we should send a "delete
    subscription with no response" for the old FK to the right hand side, to
    avoid getting two responses from the right hand side. Only the "new
    subscription" for the new FK should request a response. If two responses
    are requested, there is a race condition for which both responses could
    be processed in the wrong order, leading to an incorrect join result.
    
    This PR fixes the "delete subscription" case accordingly, to no request
    a response.
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../KTableKTableForeignKeyJoinIntegrationTest.java | 135 +++++++++++++++++----
 .../SubscriptionSendProcessorSupplier.java         |   2 +-
 .../SubscriptionSendProcessorSupplierTest.java     |   8 +-
 3 files changed, 114 insertions(+), 31 deletions(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
index 4bd0f74da1d..256eddd6f74 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
@@ -21,6 +21,7 @@ import 
org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TestInputTopic;
@@ -60,10 +61,13 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
 import static java.util.Collections.emptyMap;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
@@ -183,13 +187,13 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             right.pipeInput("rhs3", "rhsValue3", baseTimestamp + 2); // this 
unreferenced FK won't show up in any results
 
             assertThat(
-                outputTopic.readKeyValuesToMap(),
-                is(emptyMap())
+                outputTopic.readKeyValuesToList(),
+                is(emptyList())
             );
             if (rejoin) {
                 assertThat(
-                    rejoinOutputTopic.readKeyValuesToMap(),
-                    is(emptyMap())
+                    rejoinOutputTopic.readKeyValuesToList(),
+                    is(emptyList())
                 );
             }
             if (materialized) {
@@ -203,27 +207,27 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             left.pipeInput("lhs2", "lhsValue2|rhs2", baseTimestamp + 4);
 
             {
-                final Map<String, String> expected = mkMap(
-                    mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
-                    mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)")
+                final List<KeyValue<String, String>> expected = Arrays.asList(
+                    KeyValue.pair("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
+                    KeyValue.pair("lhs2", "(lhsValue2|rhs2,rhsValue2)")
                 );
                 assertThat(
-                    outputTopic.readKeyValuesToMap(),
+                    outputTopic.readKeyValuesToList(),
                     is(expected)
                 );
                 if (rejoin) {
                     assertThat(
-                        rejoinOutputTopic.readKeyValuesToMap(),
-                        is(mkMap(
-                            mkEntry("lhs1", 
"rejoin((lhsValue1|rhs1,rhsValue1),lhsValue1|rhs1)"),
-                            mkEntry("lhs2", 
"rejoin((lhsValue2|rhs2,rhsValue2),lhsValue2|rhs2)")
+                        rejoinOutputTopic.readKeyValuesToList(),
+                        is(asList(
+                            KeyValue.pair("lhs1", 
"rejoin((lhsValue1|rhs1,rhsValue1),lhsValue1|rhs1)"),
+                            KeyValue.pair("lhs2", 
"rejoin((lhsValue2|rhs2,rhsValue2),lhsValue2|rhs2)")
                         ))
                     );
                 }
                 if (materialized) {
                     assertThat(
                         asMap(store),
-                        is(expected)
+                        is(expected.stream().collect(Collectors.toMap(kv -> 
kv.key, kv -> kv.value)))
                     );
                 }
             }
@@ -232,16 +236,16 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             left.pipeInput("lhs3", "lhsValue3|rhs1", baseTimestamp + 5);
             {
                 assertThat(
-                    outputTopic.readKeyValuesToMap(),
-                    is(mkMap(
-                        mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")
+                    outputTopic.readKeyValuesToList(),
+                    is(List.of(
+                        new KeyValue<>("lhs3", "(lhsValue3|rhs1,rhsValue1)")
                     ))
                 );
                 if (rejoin) {
                     assertThat(
-                        rejoinOutputTopic.readKeyValuesToMap(),
-                        is(mkMap(
-                            mkEntry("lhs3", 
"rejoin((lhsValue3|rhs1,rhsValue1),lhsValue3|rhs1)")
+                        rejoinOutputTopic.readKeyValuesToList(),
+                        is(List.of(
+                            new KeyValue<>("lhs3", 
"rejoin((lhsValue3|rhs1,rhsValue1),lhsValue3|rhs1)")
                         ))
                     );
                 }
@@ -256,21 +260,21 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                     );
                 }
             }
+
             // Now delete one LHS entity such that one delete is propagated 
down to the output.
 
             left.pipeInput("lhs1", null, baseTimestamp + 6);
             assertThat(
-                outputTopic.readKeyValuesToMap(),
-                is(mkMap(
-                    mkEntry("lhs1", null)
+                outputTopic.readKeyValuesToList(),
+                is(List.of(
+                    new KeyValue<>("lhs1", null)
                 ))
             );
             if (rejoin) {
                 assertThat(
-                    rejoinOutputTopic.readKeyValuesToMap(),
-                    is(mkMap(
-                        mkEntry("lhs1", null)
-                    ))
+                    rejoinOutputTopic.readKeyValuesToList(),
+                    hasItem(
+                        KeyValue.pair("lhs1", null))
                 );
             }
             if (materialized) {
@@ -285,6 +289,79 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
         }
     }
 
+    @ParameterizedTest
+    @MethodSource("testCases")
+    public void doJoinFromLeftThenUpdateFkThenRevertBack(final boolean 
leftJoin,
+                                                         final String 
optimization,
+                                                         final boolean 
materialized,
+                                                         final boolean rejoin,
+                                                         final boolean 
leftVersioned,
+                                                         final boolean 
rightVersioned) {
+        final Properties streamsConfig = getStreamsProperties(optimization);
+        final Topology topology = getTopology(streamsConfig, materialized ? 
"store" : null, leftJoin, rejoin, leftVersioned, rightVersioned);
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(topology, streamsConfig)) {
+            final TestInputTopic<String, String> right = 
driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new 
StringSerializer());
+            final TestInputTopic<String, String> left = 
driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new 
StringSerializer());
+            final TestOutputTopic<String, String> outputTopic = 
driver.createOutputTopic(OUTPUT, new StringDeserializer(), new 
StringDeserializer());
+            final TestOutputTopic<String, String> rejoinOutputTopic = rejoin ? 
driver.createOutputTopic(REJOIN_OUTPUT, new StringDeserializer(), new 
StringDeserializer()) : null;
+            final KeyValueStore<String, ValueAndTimestamp<String>> store = 
driver.getTimestampedKeyValueStore("store");
+
+            // Pre-populate the RHS records. This test is all about what 
happens when we add/remove LHS records
+            right.pipeInput("rhs1", "rhsValue1", baseTimestamp);
+            right.pipeInput("rhs2", "rhsValue2", baseTimestamp + 1);
+
+            assertThat(
+                outputTopic.readKeyValuesToList(),
+                is(emptyList())
+            );
+            if (rejoin) {
+                assertThat(
+                    rejoinOutputTopic.readKeyValuesToList(),
+                    is(emptyList())
+                );
+            }
+            if (materialized) {
+                assertThat(
+                    asMap(store),
+                    is(emptyMap())
+                );
+            }
+
+            left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp + 3);
+
+            {
+                final List<KeyValue<String, String>> expected = asList(
+                    KeyValue.pair("lhs1", "(lhsValue1|rhs1,rhsValue1)")
+                );
+                assertThat(
+                    outputTopic.readKeyValuesToList(),
+                    is(expected)
+                );
+            }
+
+            // Add another reference to an existing FK
+            left.pipeInput("lhs1", "lhsValue1|rhs2", baseTimestamp + 5);
+            {
+                assertThat(
+                    outputTopic.readKeyValuesToList(),
+                    is(List.of(
+                        new KeyValue<>("lhs1", "(lhsValue1|rhs2,rhsValue2)")
+                    ))
+                );
+            }
+
+            // Now revert back the foreign key to earlier reference
+
+            left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp + 6);
+            assertThat(
+                outputTopic.readKeyValuesToList(),
+                is(List.of(
+                    new KeyValue<>("lhs1", "(lhsValue1|rhs1,rhsValue1)")
+                ))
+            );
+        }
+    }
+
     @ParameterizedTest
     @MethodSource("testCases")
     public void doJoinFromRightThenDeleteRightEntity(final boolean leftJoin,
@@ -795,6 +872,12 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
         return result;
     }
 
+    protected static List<KeyValue<String, String>> makeList(final 
KeyValueStore<String, ValueAndTimestamp<String>> store) {
+        final List<KeyValue<String, String>> result = new LinkedList<>();
+        store.all().forEachRemaining(ele -> result.add(new KeyValue<>(ele.key, 
ele.value.value())));
+        return result;
+    }
+
     protected static Topology getTopology(final Properties streamsConfig,
                                           final String queryableStoreName,
                                           final boolean leftJoin,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
index 40e404bf301..d0abb86d7db 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
@@ -132,7 +132,7 @@ public class SubscriptionSendProcessorSupplier<KLeft, 
VLeft, KRight>
                 final KRight oldForeignKey = 
foreignKeyExtractor.extract(record.key(), record.value().oldValue);
                 final KRight newForeignKey = record.value().newValue == null ? 
null : foreignKeyExtractor.extract(record.key(), record.value().newValue);
                 if (oldForeignKey != null && 
!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
-                    forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
+                    forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);
                 }
                 forward(record, newForeignKey, 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
             } else if (record.value().newValue != null) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java
index fe262840a43..5f20466b873 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java
@@ -149,7 +149,7 @@ public class SubscriptionSendProcessorSupplierTest {
         assertThat(context.forwarded().size(), greaterThan(0));
         assertThat(
             context.forwarded().get(0).record(),
-            is(new Record<>(fk1, new 
SubscriptionWrapper<>(hash(leftRecordValue), DELETE_KEY_AND_PROPAGATE, pk, 0), 
0))
+            is(new Record<>(fk1, new 
SubscriptionWrapper<>(hash(leftRecordValue), DELETE_KEY_NO_PROPAGATE, pk, 0), 
0))
         );
     }
 
@@ -198,7 +198,7 @@ public class SubscriptionSendProcessorSupplierTest {
         assertThat(context.forwarded().size(), greaterThan(0));
         assertThat(
             context.forwarded().get(0).record(),
-            is(new Record<>(fk1, new SubscriptionWrapper<>(null, 
DELETE_KEY_AND_PROPAGATE, pk, 0), 0))
+            is(new Record<>(fk1, new SubscriptionWrapper<>(null, 
DELETE_KEY_NO_PROPAGATE, pk, 0), 0))
         );
     }
 
@@ -438,7 +438,7 @@ public class SubscriptionSendProcessorSupplierTest {
         assertThat(context.forwarded().size(), greaterThan(0));
         assertThat(
             context.forwarded().get(0).record(),
-            is(new Record<>(compositeKey, new 
SubscriptionWrapper<>(hash(leftRecordValue), DELETE_KEY_AND_PROPAGATE, pk, 0), 
0))
+            is(new Record<>(compositeKey, new 
SubscriptionWrapper<>(hash(leftRecordValue), DELETE_KEY_NO_PROPAGATE, pk, 0), 
0))
         );
     }
 
@@ -491,7 +491,7 @@ public class SubscriptionSendProcessorSupplierTest {
         assertThat(context.forwarded().size(), greaterThan(0));
         assertThat(
             context.forwarded().get(0).record(),
-            is(new Record<>(compositeKey, new SubscriptionWrapper<>(null, 
DELETE_KEY_AND_PROPAGATE, pk, 0), 0))
+            is(new Record<>(compositeKey, new SubscriptionWrapper<>(null, 
DELETE_KEY_NO_PROPAGATE, pk, 0), 0))
         );
     }
 

Reply via email to