This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d35ab4d27a9 KAFKA-18713: Fix FK Left-Join result race condition (#19005)
d35ab4d27a9 is described below
commit d35ab4d27a9feb95d0716ed3d333237a41b7701a
Author: nilmadhab mondal <[email protected]>
AuthorDate: Fri Apr 4 01:22:47 2025 +0200
KAFKA-18713: Fix FK Left-Join result race condition (#19005)
When a row in a FK-join left table is updated, we should send a "delete
subscription with no response" for the old FK to the right hand side, to
avoid getting two responses from the right hand side. Only the "new
subscription" for the new FK should request a response. If two responses
are requested, there is a race condition in which both responses could
be processed in the wrong order, leading to an incorrect join result.
This PR fixes the "delete subscription" case accordingly, to not request
a response.
Reviewers: Matthias J. Sax <[email protected]>
---
.../KTableKTableForeignKeyJoinIntegrationTest.java | 135 +++++++++++++++++----
.../SubscriptionSendProcessorSupplier.java | 2 +-
.../SubscriptionSendProcessorSupplierTest.java | 8 +-
3 files changed, 114 insertions(+), 31 deletions(-)
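
For context, here is a minimal, self-contained Java sketch of the subscription behavior the commit message describes. It is purely illustrative and not the actual Kafka Streams implementation: the Instruction constant names mirror those appearing in the diff below, while the Subscription record, the onLeftUpdate() helper, and the FkSubscriptionSketch class are hypothetical, introduced only to show which subscriptions an FK update should produce after the fix.

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Simplified stand-in for the FK-join subscription instructions; only the three
// values relevant to this change are modeled.
public class FkSubscriptionSketch {

    enum Instruction {
        DELETE_KEY_NO_PROPAGATE,               // drop the old-FK subscription, no response requested
        DELETE_KEY_AND_PROPAGATE,              // drop the subscription and request a response (old behavior)
        PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE  // subscribe under the new FK and request a response
    }

    record Subscription(String foreignKey, Instruction instruction) { }

    // When a left-table row's FK changes, send a "delete" for the old FK that does
    // NOT request a response, plus a single response-requesting subscription for the
    // new FK. With only one requested response, the right-hand side's replies can no
    // longer race each other and be applied in the wrong order.
    static List<Subscription> onLeftUpdate(final String oldFk, final String newFk) {
        final List<Subscription> subscriptions = new ArrayList<>();
        if (oldFk != null && !Objects.equals(oldFk, newFk)) {
            subscriptions.add(new Subscription(oldFk, Instruction.DELETE_KEY_NO_PROPAGATE));
        }
        subscriptions.add(new Subscription(newFk, Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE));
        return subscriptions;
    }

    public static void main(final String[] args) {
        // lhs1 moves its FK from rhs2 back to rhs1, as in the new integration test:
        // one non-propagating delete for rhs2, one response-requesting subscription for rhs1.
        onLeftUpdate("rhs2", "rhs1").forEach(System.out::println);
    }
}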
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
index 4bd0f74da1d..256eddd6f74 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
@@ -60,10 +61,13 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -183,13 +187,13 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
right.pipeInput("rhs3", "rhsValue3", baseTimestamp + 2); // this
unreferenced FK won't show up in any results
assertThat(
- outputTopic.readKeyValuesToMap(),
- is(emptyMap())
+ outputTopic.readKeyValuesToList(),
+ is(emptyList())
);
if (rejoin) {
assertThat(
- rejoinOutputTopic.readKeyValuesToMap(),
- is(emptyMap())
+ rejoinOutputTopic.readKeyValuesToList(),
+ is(emptyList())
);
}
if (materialized) {
@@ -203,27 +207,27 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
left.pipeInput("lhs2", "lhsValue2|rhs2", baseTimestamp + 4);
{
- final Map<String, String> expected = mkMap(
- mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
- mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)")
+ final List<KeyValue<String, String>> expected = Arrays.asList(
+ KeyValue.pair("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
+ KeyValue.pair("lhs2", "(lhsValue2|rhs2,rhsValue2)")
);
assertThat(
- outputTopic.readKeyValuesToMap(),
+ outputTopic.readKeyValuesToList(),
is(expected)
);
if (rejoin) {
assertThat(
- rejoinOutputTopic.readKeyValuesToMap(),
- is(mkMap(
- mkEntry("lhs1",
"rejoin((lhsValue1|rhs1,rhsValue1),lhsValue1|rhs1)"),
- mkEntry("lhs2",
"rejoin((lhsValue2|rhs2,rhsValue2),lhsValue2|rhs2)")
+ rejoinOutputTopic.readKeyValuesToList(),
+ is(asList(
+ KeyValue.pair("lhs1",
"rejoin((lhsValue1|rhs1,rhsValue1),lhsValue1|rhs1)"),
+ KeyValue.pair("lhs2",
"rejoin((lhsValue2|rhs2,rhsValue2),lhsValue2|rhs2)")
))
);
}
if (materialized) {
assertThat(
asMap(store),
- is(expected)
+ is(expected.stream().collect(Collectors.toMap(kv -> kv.key, kv -> kv.value)))
);
}
}
@@ -232,16 +236,16 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
left.pipeInput("lhs3", "lhsValue3|rhs1", baseTimestamp + 5);
{
assertThat(
- outputTopic.readKeyValuesToMap(),
- is(mkMap(
- mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")
+ outputTopic.readKeyValuesToList(),
+ is(List.of(
+ new KeyValue<>("lhs3", "(lhsValue3|rhs1,rhsValue1)")
))
);
if (rejoin) {
assertThat(
- rejoinOutputTopic.readKeyValuesToMap(),
- is(mkMap(
- mkEntry("lhs3",
"rejoin((lhsValue3|rhs1,rhsValue1),lhsValue3|rhs1)")
+ rejoinOutputTopic.readKeyValuesToList(),
+ is(List.of(
+ new KeyValue<>("lhs3",
"rejoin((lhsValue3|rhs1,rhsValue1),lhsValue3|rhs1)")
))
);
}
@@ -256,21 +260,21 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
);
}
}
+
// Now delete one LHS entity such that one delete is propagated down to the output.
left.pipeInput("lhs1", null, baseTimestamp + 6);
assertThat(
- outputTopic.readKeyValuesToMap(),
- is(mkMap(
- mkEntry("lhs1", null)
+ outputTopic.readKeyValuesToList(),
+ is(List.of(
+ new KeyValue<>("lhs1", null)
))
);
if (rejoin) {
assertThat(
- rejoinOutputTopic.readKeyValuesToMap(),
- is(mkMap(
- mkEntry("lhs1", null)
- ))
+ rejoinOutputTopic.readKeyValuesToList(),
+ hasItem(
+ KeyValue.pair("lhs1", null))
);
}
if (materialized) {
@@ -285,6 +289,79 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
}
}
+ @ParameterizedTest
+ @MethodSource("testCases")
+ public void doJoinFromLeftThenUpdateFkThenRevertBack(final boolean leftJoin,
+ final String optimization,
+ final boolean materialized,
+ final boolean rejoin,
+ final boolean leftVersioned,
+ final boolean rightVersioned) {
+ final Properties streamsConfig = getStreamsProperties(optimization);
+ final Topology topology = getTopology(streamsConfig, materialized ?
"store" : null, leftJoin, rejoin, leftVersioned, rightVersioned);
+ try (final TopologyTestDriver driver = new
TopologyTestDriver(topology, streamsConfig)) {
+ final TestInputTopic<String, String> right =
driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new
StringSerializer());
+ final TestInputTopic<String, String> left =
driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new
StringSerializer());
+ final TestOutputTopic<String, String> outputTopic =
driver.createOutputTopic(OUTPUT, new StringDeserializer(), new
StringDeserializer());
+ final TestOutputTopic<String, String> rejoinOutputTopic = rejoin ?
driver.createOutputTopic(REJOIN_OUTPUT, new StringDeserializer(), new
StringDeserializer()) : null;
+ final KeyValueStore<String, ValueAndTimestamp<String>> store =
driver.getTimestampedKeyValueStore("store");
+
+ // Pre-populate the RHS records. This test is all about what happens when we add/remove LHS records
+ right.pipeInput("rhs1", "rhsValue1", baseTimestamp);
+ right.pipeInput("rhs2", "rhsValue2", baseTimestamp + 1);
+
+ assertThat(
+ outputTopic.readKeyValuesToList(),
+ is(emptyList())
+ );
+ if (rejoin) {
+ assertThat(
+ rejoinOutputTopic.readKeyValuesToList(),
+ is(emptyList())
+ );
+ }
+ if (materialized) {
+ assertThat(
+ asMap(store),
+ is(emptyMap())
+ );
+ }
+
+ left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp + 3);
+
+ {
+ final List<KeyValue<String, String>> expected = asList(
+ KeyValue.pair("lhs1", "(lhsValue1|rhs1,rhsValue1)")
+ );
+ assertThat(
+ outputTopic.readKeyValuesToList(),
+ is(expected)
+ );
+ }
+
+ // Add another reference to an existing FK
+ left.pipeInput("lhs1", "lhsValue1|rhs2", baseTimestamp + 5);
+ {
+ assertThat(
+ outputTopic.readKeyValuesToList(),
+ is(List.of(
+ new KeyValue<>("lhs1", "(lhsValue1|rhs2,rhsValue2)")
+ ))
+ );
+ }
+
+ // Now revert back the foreign key to earlier reference
+
+ left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp + 6);
+ assertThat(
+ outputTopic.readKeyValuesToList(),
+ is(List.of(
+ new KeyValue<>("lhs1", "(lhsValue1|rhs1,rhsValue1)")
+ ))
+ );
+ }
+ }
+
@ParameterizedTest
@MethodSource("testCases")
public void doJoinFromRightThenDeleteRightEntity(final boolean leftJoin,
@@ -795,6 +872,12 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
return result;
}
+ protected static List<KeyValue<String, String>> makeList(final KeyValueStore<String, ValueAndTimestamp<String>> store) {
+ final List<KeyValue<String, String>> result = new LinkedList<>();
+ store.all().forEachRemaining(ele -> result.add(new KeyValue<>(ele.key, ele.value.value())));
+ return result;
+ }
+
protected static Topology getTopology(final Properties streamsConfig,
final String queryableStoreName,
final boolean leftJoin,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
index 40e404bf301..d0abb86d7db 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
@@ -132,7 +132,7 @@ public class SubscriptionSendProcessorSupplier<KLeft, VLeft, KRight>
final KRight oldForeignKey = foreignKeyExtractor.extract(record.key(), record.value().oldValue);
final KRight newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.extract(record.key(), record.value().newValue);
if (oldForeignKey != null && !Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
- forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
+ forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);
}
forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
} else if (record.value().newValue != null) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java
index fe262840a43..5f20466b873 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java
@@ -149,7 +149,7 @@ public class SubscriptionSendProcessorSupplierTest {
assertThat(context.forwarded().size(), greaterThan(0));
assertThat(
context.forwarded().get(0).record(),
- is(new Record<>(fk1, new SubscriptionWrapper<>(hash(leftRecordValue), DELETE_KEY_AND_PROPAGATE, pk, 0), 0))
+ is(new Record<>(fk1, new SubscriptionWrapper<>(hash(leftRecordValue), DELETE_KEY_NO_PROPAGATE, pk, 0), 0))
);
}
@@ -198,7 +198,7 @@ public class SubscriptionSendProcessorSupplierTest {
assertThat(context.forwarded().size(), greaterThan(0));
assertThat(
context.forwarded().get(0).record(),
- is(new Record<>(fk1, new SubscriptionWrapper<>(null, DELETE_KEY_AND_PROPAGATE, pk, 0), 0))
+ is(new Record<>(fk1, new SubscriptionWrapper<>(null, DELETE_KEY_NO_PROPAGATE, pk, 0), 0))
);
}
@@ -438,7 +438,7 @@ public class SubscriptionSendProcessorSupplierTest {
assertThat(context.forwarded().size(), greaterThan(0));
assertThat(
context.forwarded().get(0).record(),
- is(new Record<>(compositeKey, new SubscriptionWrapper<>(hash(leftRecordValue), DELETE_KEY_AND_PROPAGATE, pk, 0), 0))
+ is(new Record<>(compositeKey, new SubscriptionWrapper<>(hash(leftRecordValue), DELETE_KEY_NO_PROPAGATE, pk, 0), 0))
);
}
@@ -491,7 +491,7 @@ public class SubscriptionSendProcessorSupplierTest {
assertThat(context.forwarded().size(), greaterThan(0));
assertThat(
context.forwarded().get(0).record(),
- is(new Record<>(compositeKey, new SubscriptionWrapper<>(null, DELETE_KEY_AND_PROPAGATE, pk, 0), 0))
+ is(new Record<>(compositeKey, new SubscriptionWrapper<>(null, DELETE_KEY_NO_PROPAGATE, pk, 0), 0))
);
}