This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ba02e8c KAFKA-9244: Update FK reference should unsubscribe old FK (#7758)
ba02e8c is described below
commit ba02e8c6b6802262646a7d6287c7a2c237be65fd
Author: Matthias J. Sax <[email protected]>
AuthorDate: Fri Nov 29 21:21:06 2019 -0800
KAFKA-9244: Update FK reference should unsubscribe old FK (#7758)
Reviewers: Adam Bellemare <[email protected]>, John Roesler <[email protected]>
---
.../SubscriptionStoreReceiveProcessorSupplier.java | 4 +-
.../KTableKTableForeignKeyJoinIntegrationTest.java | 71 ++++++++++++++++++++++
2 files changed, 73 insertions(+), 2 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java
index 0a86980..9cbeadd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java
@@ -92,9 +92,9 @@ public class SubscriptionStoreReceiveProcessorSupplier<K, KO>
final ValueAndTimestamp<SubscriptionWrapper<K>> newValue =
ValueAndTimestamp.make(value, context().timestamp());
final ValueAndTimestamp<SubscriptionWrapper<K>> oldValue =
store.get(subscriptionKey);
- //If the subscriptionWrapper hash indicates a null, must
delete from statestore.
//This store is used by the prefix scanner in
ForeignJoinSubscriptionProcessorSupplier
- if (value.getHash() == null) {
+ if
(value.getInstruction().equals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE)
||
+
value.getInstruction().equals(SubscriptionWrapper.Instruction.DELETE_KEY_NO_PROPAGATE))
{
store.delete(subscriptionKey);
} else {
store.put(subscriptionKey, newValue);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
index 80c0f52..746d6b3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
@@ -426,6 +426,77 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
}
}
+ @Test
+ public void shouldUnsubscribeOldForeignKeyIfLeftSideIsUpdated() {
+ final Topology topology = getTopology(streamsConfig, "store",
leftJoin);
+ try (final TopologyTestDriver driver = new
TopologyTestDriver(topology, streamsConfig)) {
+ final TestInputTopic<String, String> right =
driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new
StringSerializer());
+ final TestInputTopic<String, String> left =
driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new
StringSerializer());
+ final TestOutputTopic<String, String> outputTopic =
driver.createOutputTopic(OUTPUT, new StringDeserializer(), new
StringDeserializer());
+ final KeyValueStore<String, String> store =
driver.getKeyValueStore("store");
+
+ // Pre-populate the RHS records. This test is all about what
happens when we change LHS records foreign key reference
+ // then populate update on RHS
+ right.pipeInput("rhs1", "rhsValue1");
+ right.pipeInput("rhs2", "rhsValue2");
+
+ assertThat(
+ outputTopic.readKeyValuesToMap(),
+ is(emptyMap())
+ );
+ assertThat(
+ asMap(store),
+ is(emptyMap())
+ );
+
+ left.pipeInput("lhs1", "lhsValue1|rhs1");
+ {
+ final Map<String, String> expected = mkMap(
+ mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)")
+ );
+ assertThat(
+ outputTopic.readKeyValuesToMap(),
+ is(expected)
+ );
+ assertThat(
+ asMap(store),
+ is(expected)
+ );
+ }
+
+ // Change LHS foreign key reference
+ left.pipeInput("lhs1", "lhsValue1|rhs2");
+ {
+ final Map<String, String> expected = mkMap(
+ mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)")
+ );
+ assertThat(
+ outputTopic.readKeyValuesToMap(),
+ is(expected)
+ );
+ assertThat(
+ asMap(store),
+ is(expected)
+ );
+ }
+
+ // Populate RHS update on old LHS foreign key ref
+ right.pipeInput("rhs1", "rhsValue1Delta");
+ {
+ assertThat(
+ outputTopic.readKeyValuesToMap(),
+ is(emptyMap())
+ );
+ assertThat(
+ asMap(store),
+ is(mkMap(
+ mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)")
+ ))
+ );
+ }
+ }
+ }
+
private static Map<String, String> asMap(final KeyValueStore<String,
String> store) {
final HashMap<String, String> result = new HashMap<>();
store.all().forEachRemaining(kv -> result.put(kv.key, kv.value));