This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5c77c544c6 KAFKA-13769 Fix version check in SubscriptionStoreReceiveProcessorSupplier (#12535)
5c77c544c6 is described below
commit 5c77c544c6c448f615db5852fe29e4806ab7700e
Author: Alex Sorokoumov <[email protected]>
AuthorDate: Thu Aug 18 20:20:04 2022 +0200
KAFKA-13769 Fix version check in SubscriptionStoreReceiveProcessorSupplier (#12535)
This patch fixes another incorrect version check in the FK code and adds
unit tests that would have caught this bug.
Reviewers: John Roesler <[email protected]>
---
.../internals/foreignkeyjoin/CombinedKey.java | 18 +-
.../SubscriptionStoreReceiveProcessorSupplier.java | 2 +-
.../foreignkeyjoin/SubscriptionWrapper.java | 21 +
...scriptionStoreReceiveProcessorSupplierTest.java | 517 +++++++++++++++++++++
4 files changed, 555 insertions(+), 3 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java
index f196b18368..ae5f20fe53 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java
@@ -38,8 +38,22 @@ public class CombinedKey<KF, KP> {
return primaryKey;
}
- public boolean equals(final KF foreignKey, final KP primaryKey) {
- return this.foreignKey.equals(foreignKey) && this.primaryKey.equals(primaryKey);
+ @Override
+ public int hashCode() {
+ return Objects.hash(foreignKey, primaryKey);
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final CombinedKey<?, ?> that = (CombinedKey<?, ?>) o;
+ return Objects.equals(foreignKey, that.foreignKey) && Objects.equals(
+ primaryKey, that.primaryKey);
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java
index a834e14d1d..dcbf6c0eaf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java
@@ -91,7 +91,7 @@ public class SubscriptionStoreReceiveProcessorSupplier<K, KO>
droppedRecordsSensor.record();
return;
}
- if (record.value().getVersion() != SubscriptionWrapper.CURRENT_VERSION) {
+ if (record.value().getVersion() > SubscriptionWrapper.CURRENT_VERSION) {
//Guard against modifications to SubscriptionWrapper. Need to ensure that there is compatibility
//with previous versions to enable rolling upgrades. Must develop a strategy for upgrading
//from older SubscriptionWrapper versions to newer versions.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java
index 41d5f1198e..5ba8835c1d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java
@@ -119,5 +119,26 @@ public class SubscriptionWrapper<K> {
", primaryPartition=" + primaryPartition +
'}';
}
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final SubscriptionWrapper<?> that = (SubscriptionWrapper<?>) o;
+ return version == that.version && Arrays.equals(hash, that.hash)
+ && instruction == that.instruction && Objects.equals(primaryKey, that.primaryKey)
+ && Objects.equals(primaryPartition, that.primaryPartition);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = Objects.hash(instruction, version, primaryKey, primaryPartition);
+ result = 31 * result + Arrays.hashCode(hash);
+ return result;
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplierTest.java
new file mode 100644
index 0000000000..17f34d56d3
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplierTest.java
@@ -0,0 +1,517 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+import java.util.function.Supplier;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForward;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.test.MockInternalNewProcessorContext;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SubscriptionStoreReceiveProcessorSupplierTest {
+
+ private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+ private File stateDir;
+ private MockInternalNewProcessorContext<CombinedKey<String, String>, Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> context;
+
+ private static final String FK = "fk1";
+ private static final String PK1 = "pk1";
+ private static final String PK2 = "pk2";
+
+ private static final Supplier<String> PK_SERDE_TOPIC_SUPPLIER = () -> "pk-topic";
+ private static final CombinedKeySchema<String, String> COMBINED_KEY_SCHEMA = new CombinedKeySchema<>(
+ () -> "fk-topic",
+ Serdes.String(),
+ PK_SERDE_TOPIC_SUPPLIER,
+ Serdes.String()
+ );
+
+ @Before
+ public void before() {
+ stateDir = TestUtils.tempDirectory();
+ context = new MockInternalNewProcessorContext<>(props, new TaskId(0, 0), stateDir);
+ }
+
+ @After
+ public void after() throws IOException {
+ Utils.delete(stateDir);
+ }
+
+ @Test
+ public void shouldDetectVersionChange() {
+ // This test serves as a reminder to add new tests once we bump SubscriptionWrapper version.
+ Assert.assertEquals(SubscriptionWrapper.VERSION_1, SubscriptionWrapper.CURRENT_VERSION);
+ }
+
+ @Test
+ public void shouldDeleteKeyAndPropagateV0() {
+ final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>>> storeBuilder = storeBuilder();
+ final SubscriptionStoreReceiveProcessorSupplier<String, String> supplier = supplier(storeBuilder);
+ final Processor<String,
+ SubscriptionWrapper<String>,
+ CombinedKey<String, String>,
+ Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> processor = supplier.get();
+ final TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>> stateStore = storeBuilder.build();
+ context.addStateStore(stateStore);
+ stateStore.init((StateStoreContext) context, stateStore);
+
+ final SubscriptionWrapper<String> oldWrapper = new
SubscriptionWrapper<>(
+ new long[]{1L, 2L},
+ Instruction.DELETE_KEY_AND_PROPAGATE,
+ PK2,
+ SubscriptionWrapper.VERSION_0,
+ null
+ );
+ final ValueAndTimestamp<SubscriptionWrapper<String>> oldValue =
ValueAndTimestamp.make(oldWrapper, 0);
+
+ final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
+ stateStore.put(key, oldValue);
+ processor.init(context);
+
+ final SubscriptionWrapper<String> newWrapper = new
SubscriptionWrapper<>(
+ new long[]{1L, 2L},
+ Instruction.DELETE_KEY_AND_PROPAGATE,
+ PK1,
+ SubscriptionWrapper.VERSION_0,
+ null
+ );
+ final ValueAndTimestamp<SubscriptionWrapper<String>> newValue =
ValueAndTimestamp.make(
+ newWrapper, 1L);
+ final Record<String, SubscriptionWrapper<String>> record = new
Record<>(
+ FK,
+ newWrapper,
+ 1L
+ );
+ processor.process(record);
+
+ final List<CapturedForward<? extends CombinedKey<String, String>,
+ ? extends
Change<ValueAndTimestamp<SubscriptionWrapper<String>>>>> forwarded =
context.forwarded();
+ Assert.assertNull(stateStore.get(key));
+ Assert.assertEquals(1, forwarded.size());
+ Assert.assertEquals(
+ record.withKey(new CombinedKey<>(FK, PK1))
+ .withValue(new Change<>(newValue, oldValue)),
+ forwarded.get(0).record()
+ );
+ }
+
+ @Test
+ public void shouldDeleteKeyAndPropagateV1() {
+ final StoreBuilder<TimestampedKeyValueStore<Bytes,
SubscriptionWrapper<String>>> storeBuilder = storeBuilder();
+ final SubscriptionStoreReceiveProcessorSupplier<String, String>
supplier = supplier(storeBuilder);
+ final Processor<String,
+ SubscriptionWrapper<String>,
+ CombinedKey<String, String>,
+ Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> processor
= supplier.get();
+ final TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>>
stateStore = storeBuilder.build();
+ context.addStateStore(stateStore);
+ stateStore.init((StateStoreContext) context, stateStore);
+
+ final SubscriptionWrapper<String> oldWrapper = new
SubscriptionWrapper<>(
+ new long[]{1L, 2L},
+ Instruction.DELETE_KEY_AND_PROPAGATE,
+ PK2,
+ SubscriptionWrapper.VERSION_1,
+ 1
+ );
+ final ValueAndTimestamp<SubscriptionWrapper<String>> oldValue =
ValueAndTimestamp.make(oldWrapper, 0);
+
+ final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
+ stateStore.put(key, oldValue);
+ processor.init(context);
+
+ final SubscriptionWrapper<String> newWrapper = new
SubscriptionWrapper<>(
+ new long[]{1L, 2L},
+ Instruction.DELETE_KEY_AND_PROPAGATE,
+ PK1,
+ SubscriptionWrapper.VERSION_1,
+ 1
+ );
+ final ValueAndTimestamp<SubscriptionWrapper<String>> newValue =
ValueAndTimestamp.make(
+ newWrapper, 1L);
+ final Record<String, SubscriptionWrapper<String>> record = new
Record<>(
+ FK,
+ newWrapper,
+ 1L
+ );
+ processor.process(record);
+
+ final List<CapturedForward<? extends CombinedKey<String, String>,
+ ? extends Change<ValueAndTimestamp<SubscriptionWrapper<String>>>>>
forwarded = context.forwarded();
+ Assert.assertNull(stateStore.get(key));
+ Assert.assertEquals(1, forwarded.size());
+ Assert.assertEquals(
+ record.withKey(new CombinedKey<>(FK, PK1))
+ .withValue(new Change<>(newValue, oldValue)),
+ forwarded.get(0).record()
+ );
+ }
+
+ @Test
+ public void shouldDeleteKeyNoPropagateV0() {
+ final StoreBuilder<TimestampedKeyValueStore<Bytes,
SubscriptionWrapper<String>>> storeBuilder = storeBuilder();
+ final SubscriptionStoreReceiveProcessorSupplier<String, String>
supplier = supplier(storeBuilder);
+ final Processor<String,
+ SubscriptionWrapper<String>,
+ CombinedKey<String, String>,
+ Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> processor
= supplier.get();
+ final TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>>
stateStore = storeBuilder.build();
+ context.addStateStore(stateStore);
+ stateStore.init((StateStoreContext) context, stateStore);
+
+ final SubscriptionWrapper<String> oldWrapper = new
SubscriptionWrapper<>(
+ new long[]{1L, 2L},
+ Instruction.DELETE_KEY_NO_PROPAGATE,
+ PK2,
+ SubscriptionWrapper.VERSION_0,
+ null
+ );
+ final ValueAndTimestamp<SubscriptionWrapper<String>> oldValue =
ValueAndTimestamp.make(oldWrapper, 0);
+
+ final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
+ stateStore.put(key, oldValue);
+ processor.init(context);
+
+ final SubscriptionWrapper<String> newWrapper = new
SubscriptionWrapper<>(
+ new long[]{1L, 2L},
+ Instruction.DELETE_KEY_NO_PROPAGATE,
+ PK1,
+ SubscriptionWrapper.VERSION_0,
+ null
+ );
+ final ValueAndTimestamp<SubscriptionWrapper<String>> newValue =
ValueAndTimestamp.make(
+ newWrapper, 1L);
+ final Record<String, SubscriptionWrapper<String>> record = new
Record<>(
+ FK,
+ newWrapper,
+ 1L
+ );
+ processor.process(record);
+
+ final List<CapturedForward<? extends CombinedKey<String, String>,
+ ? extends Change<ValueAndTimestamp<SubscriptionWrapper<String>>>>>
forwarded = context.forwarded();
+ Assert.assertNull(stateStore.get(key));
+ Assert.assertEquals(1, forwarded.size());
+ Assert.assertEquals(
+ record.withKey(new CombinedKey<>(FK, PK1))
+ .withValue(new Change<>(newValue, oldValue)),
+ forwarded.get(0).record()
+ );
+ }
+
+ @Test
+ public void shouldDeleteKeyNoPropagateV1() {
+ final StoreBuilder<TimestampedKeyValueStore<Bytes,
SubscriptionWrapper<String>>> storeBuilder = storeBuilder();
+ final SubscriptionStoreReceiveProcessorSupplier<String, String>
supplier = supplier(storeBuilder);
+ final Processor<String,
+ SubscriptionWrapper<String>,
+ CombinedKey<String, String>,
+ Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> processor
= supplier.get();
+ final TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>>
stateStore = storeBuilder.build();
+ context.addStateStore(stateStore);
+ stateStore.init((StateStoreContext) context, stateStore);
+
+ final SubscriptionWrapper<String> oldWrapper = new
SubscriptionWrapper<>(
+ new long[]{1L, 2L},
+ Instruction.DELETE_KEY_NO_PROPAGATE,
+ PK2,
+ SubscriptionWrapper.VERSION_1,
+ 1
+ );
+ final ValueAndTimestamp<SubscriptionWrapper<String>> oldValue =
ValueAndTimestamp.make(oldWrapper, 0);
+
+ final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
+ stateStore.put(key, oldValue);
+ processor.init(context);
+
+ final SubscriptionWrapper<String> newWrapper = new
SubscriptionWrapper<>(
+ new long[]{1L, 2L},
+ Instruction.DELETE_KEY_NO_PROPAGATE,
+ PK1,
+ SubscriptionWrapper.VERSION_1,
+ 1
+ );
+ final ValueAndTimestamp<SubscriptionWrapper<String>> newValue =
ValueAndTimestamp.make(
+ newWrapper, 1L);
+ final Record<String, SubscriptionWrapper<String>> record = new
Record<>(
+ FK,
+ newWrapper,
+ 1L
+ );
+ processor.process(record);
+
+ final List<CapturedForward<? extends CombinedKey<String, String>,
+ ? extends Change<ValueAndTimestamp<SubscriptionWrapper<String>>>>>
forwarded = context.forwarded();
+ Assert.assertNull(stateStore.get(key));
+ Assert.assertEquals(1, forwarded.size());
+ Assert.assertEquals(
+ record.withKey(new CombinedKey<>(FK, PK1))
+ .withValue(new Change<>(newValue, oldValue)),
+ forwarded.get(0).record()
+ );
+ }
+
+ @Test
+ public void shouldPropagateOnlyIfFKValAvailableV0() {
+ final StoreBuilder<TimestampedKeyValueStore<Bytes,
SubscriptionWrapper<String>>> storeBuilder = storeBuilder();
+ final SubscriptionStoreReceiveProcessorSupplier<String, String>
supplier = supplier(storeBuilder);
+ final Processor<String,
+ SubscriptionWrapper<String>,
+ CombinedKey<String, String>,
+ Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> processor
= supplier.get();
+ final TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>>
stateStore = storeBuilder.build();
+ context.addStateStore(stateStore);
+ stateStore.init((StateStoreContext) context, stateStore);
+
+ final SubscriptionWrapper<String> oldWrapper = new
SubscriptionWrapper<>(
+ new long[]{1L, 2L},
+ Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
+ PK2,
+ SubscriptionWrapper.VERSION_0,
+ null
+ );
+ final ValueAndTimestamp<SubscriptionWrapper<String>> oldValue =
ValueAndTimestamp.make(oldWrapper, 0);
+
+ final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
+ stateStore.put(key, oldValue);
+ processor.init(context);
+
+ final SubscriptionWrapper<String> newWrapper = new
SubscriptionWrapper<>(
+ new long[]{1L, 2L},
+ Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
+ PK1,
+ SubscriptionWrapper.VERSION_0,
+ null
+ );
+ final ValueAndTimestamp<SubscriptionWrapper<String>> newValue =
ValueAndTimestamp.make(
+ newWrapper, 1L);
+ final Record<String, SubscriptionWrapper<String>> record = new
Record<>(
+ FK,
+ newWrapper,
+ 1L
+ );
+ processor.process(record);
+ final List<CapturedForward<? extends CombinedKey<String, String>,
+ ? extends Change<ValueAndTimestamp<SubscriptionWrapper<String>>>>>
forwarded = context.forwarded();
+
+ Assert.assertEquals(newValue, stateStore.get(key));
+ Assert.assertEquals(1, forwarded.size());
+ Assert.assertEquals(
+ record.withKey(new CombinedKey<>(FK, PK1))
+ .withValue(new Change<>(newValue, oldValue)),
+ forwarded.get(0).record()
+ );
+ }
+
+ @Test
+ public void shouldPropagateOnlyIfFKValAvailableV1() {
+ final StoreBuilder<TimestampedKeyValueStore<Bytes,
SubscriptionWrapper<String>>> storeBuilder = storeBuilder();
+ final SubscriptionStoreReceiveProcessorSupplier<String, String>
supplier = supplier(storeBuilder);
+ final Processor<String,
+ SubscriptionWrapper<String>,
+ CombinedKey<String, String>,
+ Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> processor
= supplier.get();
+ final TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>>
stateStore = storeBuilder.build();
+ context.addStateStore(stateStore);
+ stateStore.init((StateStoreContext) context, stateStore);
+
+ final SubscriptionWrapper<String> oldWrapper = new
SubscriptionWrapper<>(
+ new long[]{1L, 2L},
+ Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
+ PK2,
+ SubscriptionWrapper.VERSION_1,
+ 1
+ );
+ final ValueAndTimestamp<SubscriptionWrapper<String>> oldValue =
ValueAndTimestamp.make(oldWrapper, 0);
+
+ final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
+ stateStore.put(key, oldValue);
+ processor.init(context);
+
+ final SubscriptionWrapper<String> newWrapper = new
SubscriptionWrapper<>(
+ new long[]{1L, 2L},
+ Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
+ PK1,
+ SubscriptionWrapper.VERSION_1,
+ 1
+ );
+ final ValueAndTimestamp<SubscriptionWrapper<String>> newValue =
ValueAndTimestamp.make(
+ newWrapper, 1L);
+ final Record<String, SubscriptionWrapper<String>> record = new
Record<>(
+ FK,
+ newWrapper,
+ 1L
+ );
+ processor.process(record);
+ final List<CapturedForward<? extends CombinedKey<String, String>,
+ ? extends Change<ValueAndTimestamp<SubscriptionWrapper<String>>>>>
forwarded = context.forwarded();
+
+ Assert.assertEquals(newValue, stateStore.get(key));
+ Assert.assertEquals(1, forwarded.size());
+ Assert.assertEquals(
+ record.withKey(new CombinedKey<>(FK, PK1))
+ .withValue(new Change<>(newValue, oldValue)),
+ forwarded.get(0).record()
+ );
+ }
+
+ @Test
+ public void shouldPropagateNullIfNoFKValAvailableV0() {
+ final StoreBuilder<TimestampedKeyValueStore<Bytes,
SubscriptionWrapper<String>>> storeBuilder = storeBuilder();
+ final SubscriptionStoreReceiveProcessorSupplier<String, String>
supplier = supplier(storeBuilder);
+ final Processor<String,
+ SubscriptionWrapper<String>,
+ CombinedKey<String, String>,
+ Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> processor
= supplier.get();
+ final TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>>
stateStore = storeBuilder.build();
+ context.addStateStore(stateStore);
+ stateStore.init((StateStoreContext) context, stateStore);
+
+ final SubscriptionWrapper<String> oldWrapper = new
SubscriptionWrapper<>(
+ new long[]{1L, 2L},
+ Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE,
+ PK2,
+ SubscriptionWrapper.VERSION_0,
+ null
+ );
+ final ValueAndTimestamp<SubscriptionWrapper<String>> oldValue =
ValueAndTimestamp.make(oldWrapper, 0);
+
+ final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
+ stateStore.put(key, oldValue);
+ processor.init(context);
+
+ final SubscriptionWrapper<String> newWrapper = new
SubscriptionWrapper<>(
+ new long[]{1L, 2L},
+ Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE,
+ PK1,
+ SubscriptionWrapper.VERSION_0,
+ null
+ );
+ final ValueAndTimestamp<SubscriptionWrapper<String>> newValue =
ValueAndTimestamp.make(
+ newWrapper, 1L);
+ final Record<String, SubscriptionWrapper<String>> record = new
Record<>(
+ FK,
+ newWrapper,
+ 1L
+ );
+ processor.process(record);
+ final List<CapturedForward<? extends CombinedKey<String, String>,
+ ? extends Change<ValueAndTimestamp<SubscriptionWrapper<String>>>>>
forwarded = context.forwarded();
+
+ Assert.assertEquals(newValue, stateStore.get(key));
+ Assert.assertEquals(1, forwarded.size());
+ Assert.assertEquals(
+ record.withKey(new CombinedKey<>(FK, PK1))
+ .withValue(new Change<>(newValue, oldValue)),
+ forwarded.get(0).record()
+ );
+ }
+
+ @Test
+ public void shouldPropagateNullIfNoFKValAvailableV1() {
+ final StoreBuilder<TimestampedKeyValueStore<Bytes,
SubscriptionWrapper<String>>> storeBuilder = storeBuilder();
+ final SubscriptionStoreReceiveProcessorSupplier<String, String>
supplier = supplier(storeBuilder);
+ final Processor<String,
+ SubscriptionWrapper<String>,
+ CombinedKey<String, String>,
+ Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> processor
= supplier.get();
+ final TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>>
stateStore = storeBuilder.build();
+ context.addStateStore(stateStore);
+ stateStore.init((StateStoreContext) context, stateStore);
+
+ final SubscriptionWrapper<String> oldWrapper = new
SubscriptionWrapper<>(
+ new long[]{1L, 2L},
+ Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE,
+ PK2,
+ SubscriptionWrapper.VERSION_1,
+ 1
+ );
+ final ValueAndTimestamp<SubscriptionWrapper<String>> oldValue =
ValueAndTimestamp.make(oldWrapper, 0);
+
+ final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
+ stateStore.put(key, oldValue);
+ processor.init(context);
+
+ final SubscriptionWrapper<String> newWrapper = new
SubscriptionWrapper<>(
+ new long[]{1L, 2L},
+ Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE,
+ PK1,
+ SubscriptionWrapper.VERSION_1,
+ 1
+ );
+ final ValueAndTimestamp<SubscriptionWrapper<String>> newValue =
ValueAndTimestamp.make(
+ newWrapper, 1L);
+ final Record<String, SubscriptionWrapper<String>> record = new
Record<>(
+ FK,
+ newWrapper,
+ 1L
+ );
+ processor.process(record);
+ final List<CapturedForward<? extends CombinedKey<String, String>,
+ ? extends Change<ValueAndTimestamp<SubscriptionWrapper<String>>>>>
forwarded = context.forwarded();
+
+ Assert.assertEquals(newValue, stateStore.get(key));
+ Assert.assertEquals(1, forwarded.size());
+ Assert.assertEquals(
+ record.withKey(new CombinedKey<>(FK, PK1))
+ .withValue(new Change<>(newValue, oldValue)),
+ forwarded.get(0).record()
+ );
+ }
+
+
+ private SubscriptionStoreReceiveProcessorSupplier<String, String> supplier(
+ final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>>> storeBuilder) {
+
+ return new SubscriptionStoreReceiveProcessorSupplier<>(storeBuilder, COMBINED_KEY_SCHEMA);
+ }
+
+ private StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>>> storeBuilder() {
+ final Serde<SubscriptionWrapper<String>> subscriptionWrapperSerde = new SubscriptionWrapperSerde<>(
+ PK_SERDE_TOPIC_SUPPLIER, Serdes.String());
+ return Stores.timestampedKeyValueStoreBuilder(
+ Stores.persistentTimestampedKeyValueStore(
+ "Store"
+ ),
+ new Serdes.BytesSerde(),
+ subscriptionWrapperSerde
+ );
+ }
+}
\ No newline at end of file