This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 3.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 0ef910a235c2636afaac679b240c29b9cf335d87 Author: Alex Sorokoumov <[email protected]> AuthorDate: Thu Aug 18 20:20:04 2022 +0200 KAFKA-13769 Fix version check in SubscriptionStoreReceiveProcessorSupplier (#12535) This patch fixes another incorrect version check in the FK code and adds unit tests that would have caught this bug. Reviewers: John Roesler <[email protected]> --- .../internals/foreignkeyjoin/CombinedKey.java | 18 +- .../SubscriptionStoreReceiveProcessorSupplier.java | 2 +- .../foreignkeyjoin/SubscriptionWrapper.java | 21 + ...scriptionStoreReceiveProcessorSupplierTest.java | 517 +++++++++++++++++++++ 4 files changed, 555 insertions(+), 3 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java index f196b18368f..ae5f20fe533 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java @@ -38,8 +38,22 @@ public class CombinedKey<KF, KP> { return primaryKey; } - public boolean equals(final KF foreignKey, final KP primaryKey) { - return this.foreignKey.equals(foreignKey) && this.primaryKey.equals(primaryKey); + @Override + public int hashCode() { + return Objects.hash(foreignKey, primaryKey); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final CombinedKey<?, ?> that = (CombinedKey<?, ?>) o; + return Objects.equals(foreignKey, that.foreignKey) && Objects.equals( + primaryKey, that.primaryKey); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java index a834e14d1db..dcbf6c0eaf7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java @@ -91,7 +91,7 @@ public class SubscriptionStoreReceiveProcessorSupplier<K, KO> droppedRecordsSensor.record(); return; } - if (record.value().getVersion() != SubscriptionWrapper.CURRENT_VERSION) { + if (record.value().getVersion() > SubscriptionWrapper.CURRENT_VERSION) { //Guard against modifications to SubscriptionWrapper. Need to ensure that there is compatibility //with previous versions to enable rolling upgrades. Must develop a strategy for upgrading //from older SubscriptionWrapper versions to newer versions. diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java index 41d5f1198e5..5ba8835c1d1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java @@ -119,5 +119,26 @@ public class SubscriptionWrapper<K> { ", primaryPartition=" + primaryPartition + '}'; } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final SubscriptionWrapper<?> that = (SubscriptionWrapper<?>) o; + return version == that.version && Arrays.equals(hash, that.hash) + && instruction == that.instruction && Objects.equals(primaryKey, that.primaryKey) + && Objects.equals(primaryPartition, that.primaryPartition); + } + + @Override + public int hashCode() { + int result = Objects.hash(instruction, version, primaryKey, primaryPartition); + result = 31 * result + Arrays.hashCode(hash); + return result; + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplierTest.java new file mode 100644 index 00000000000..17f34d56d3f --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplierTest.java @@ -0,0 +1,517 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals.foreignkeyjoin; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Properties; +import java.util.function.Supplier; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.kstream.internals.Change; +import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForward; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.test.MockInternalNewProcessorContext; +import org.apache.kafka.test.StreamsTestUtils; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class SubscriptionStoreReceiveProcessorSupplierTest { + + private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); + private File stateDir; + private MockInternalNewProcessorContext<CombinedKey<String, String>, Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> context; + + private static final String FK = "fk1"; + private static final String PK1 = "pk1"; + private static final String PK2 = "pk2"; + + private static final Supplier<String> PK_SERDE_TOPIC_SUPPLIER = () -> "pk-topic"; + private static final CombinedKeySchema<String, String> COMBINED_KEY_SCHEMA = new CombinedKeySchema<>( + () -> "fk-topic", + Serdes.String(), + PK_SERDE_TOPIC_SUPPLIER, + Serdes.String() + ); + + @Before + public void before() { + stateDir = TestUtils.tempDirectory(); + context = new MockInternalNewProcessorContext<>(props, new TaskId(0, 0), stateDir); + } + + @After + public void after() throws IOException { + Utils.delete(stateDir); + } + + @Test + public void shouldDetectVersionChange() { + // This test serves as a reminder to add new tests once we bump SubscriptionWrapper version. + Assert.assertEquals(SubscriptionWrapper.VERSION_1, SubscriptionWrapper.CURRENT_VERSION); + } + + @Test + public void shouldDeleteKeyAndPropagateV0() { + final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>>> storeBuilder = storeBuilder(); + final SubscriptionStoreReceiveProcessorSupplier<String, String> supplier = supplier(storeBuilder); + final Processor<String, + SubscriptionWrapper<String>, + CombinedKey<String, String>, + Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> processor = supplier.get(); + final TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>> stateStore = storeBuilder.build(); + context.addStateStore(stateStore); + stateStore.init((StateStoreContext) context, stateStore); + + final SubscriptionWrapper<String> oldWrapper = new SubscriptionWrapper<>( + new long[]{1L, 2L}, + Instruction.DELETE_KEY_AND_PROPAGATE, + PK2, + SubscriptionWrapper.VERSION_0, + null + ); + final ValueAndTimestamp<SubscriptionWrapper<String>> oldValue = ValueAndTimestamp.make(oldWrapper, 0); + + final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1); + stateStore.put(key, oldValue); + processor.init(context); + + final SubscriptionWrapper<String> newWrapper = new SubscriptionWrapper<>( + new long[]{1L, 2L}, + Instruction.DELETE_KEY_AND_PROPAGATE, + PK1, + SubscriptionWrapper.VERSION_0, + null + ); + final ValueAndTimestamp<SubscriptionWrapper<String>> newValue = ValueAndTimestamp.make( + newWrapper, 1L); + final Record<String, SubscriptionWrapper<String>> record = new Record<>( + FK, + newWrapper, + 1L + ); + processor.process(record); + + final List<CapturedForward<? extends CombinedKey<String, String>, + ? extends Change<ValueAndTimestamp<SubscriptionWrapper<String>>>>> forwarded = context.forwarded(); + Assert.assertNull(stateStore.get(key)); + Assert.assertEquals(1, forwarded.size()); + Assert.assertEquals( + record.withKey(new CombinedKey<>(FK, PK1)) + .withValue(new Change<>(newValue, oldValue)), + forwarded.get(0).record() + ); + } + + @Test + public void shouldDeleteKeyAndPropagateV1() { + final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>>> storeBuilder = storeBuilder(); + final SubscriptionStoreReceiveProcessorSupplier<String, String> supplier = supplier(storeBuilder); + final Processor<String, + SubscriptionWrapper<String>, + CombinedKey<String, String>, + Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> processor = supplier.get(); + final TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>> stateStore = storeBuilder.build(); + context.addStateStore(stateStore); + stateStore.init((StateStoreContext) context, stateStore); + + final SubscriptionWrapper<String> oldWrapper = new SubscriptionWrapper<>( + new long[]{1L, 2L}, + Instruction.DELETE_KEY_AND_PROPAGATE, + PK2, + SubscriptionWrapper.VERSION_1, + 1 + ); + final ValueAndTimestamp<SubscriptionWrapper<String>> oldValue = ValueAndTimestamp.make(oldWrapper, 0); + + final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1); + stateStore.put(key, oldValue); + processor.init(context); + + final SubscriptionWrapper<String> newWrapper = new SubscriptionWrapper<>( + new long[]{1L, 2L}, + Instruction.DELETE_KEY_AND_PROPAGATE, + PK1, + SubscriptionWrapper.VERSION_1, + 1 + ); + final ValueAndTimestamp<SubscriptionWrapper<String>> newValue = ValueAndTimestamp.make( + newWrapper, 1L); + final Record<String, SubscriptionWrapper<String>> record = new Record<>( + FK, + newWrapper, + 1L + ); + processor.process(record); + + final List<CapturedForward<? extends CombinedKey<String, String>, + ? extends Change<ValueAndTimestamp<SubscriptionWrapper<String>>>>> forwarded = context.forwarded(); + Assert.assertNull(stateStore.get(key)); + Assert.assertEquals(1, forwarded.size()); + Assert.assertEquals( + record.withKey(new CombinedKey<>(FK, PK1)) + .withValue(new Change<>(newValue, oldValue)), + forwarded.get(0).record() + ); + } + + @Test + public void shouldDeleteKeyNoPropagateV0() { + final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>>> storeBuilder = storeBuilder(); + final SubscriptionStoreReceiveProcessorSupplier<String, String> supplier = supplier(storeBuilder); + final Processor<String, + SubscriptionWrapper<String>, + CombinedKey<String, String>, + Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> processor = supplier.get(); + final TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>> stateStore = storeBuilder.build(); + context.addStateStore(stateStore); + stateStore.init((StateStoreContext) context, stateStore); + + final SubscriptionWrapper<String> oldWrapper = new SubscriptionWrapper<>( + new long[]{1L, 2L}, + Instruction.DELETE_KEY_NO_PROPAGATE, + PK2, + SubscriptionWrapper.VERSION_0, + null + ); + final ValueAndTimestamp<SubscriptionWrapper<String>> oldValue = ValueAndTimestamp.make(oldWrapper, 0); + + final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1); + stateStore.put(key, oldValue); + processor.init(context); + + final SubscriptionWrapper<String> newWrapper = new SubscriptionWrapper<>( + new long[]{1L, 2L}, + Instruction.DELETE_KEY_NO_PROPAGATE, + PK1, + SubscriptionWrapper.VERSION_0, + null + ); + final ValueAndTimestamp<SubscriptionWrapper<String>> newValue = ValueAndTimestamp.make( + newWrapper, 1L); + final Record<String, SubscriptionWrapper<String>> record = new Record<>( + FK, + newWrapper, + 1L + ); + processor.process(record); + + final List<CapturedForward<? extends CombinedKey<String, String>, + ? extends Change<ValueAndTimestamp<SubscriptionWrapper<String>>>>> forwarded = context.forwarded(); + Assert.assertNull(stateStore.get(key)); + Assert.assertEquals(1, forwarded.size()); + Assert.assertEquals( + record.withKey(new CombinedKey<>(FK, PK1)) + .withValue(new Change<>(newValue, oldValue)), + forwarded.get(0).record() + ); + } + + @Test + public void shouldDeleteKeyNoPropagateV1() { + final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>>> storeBuilder = storeBuilder(); + final SubscriptionStoreReceiveProcessorSupplier<String, String> supplier = supplier(storeBuilder); + final Processor<String, + SubscriptionWrapper<String>, + CombinedKey<String, String>, + Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> processor = supplier.get(); + final TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>> stateStore = storeBuilder.build(); + context.addStateStore(stateStore); + stateStore.init((StateStoreContext) context, stateStore); + + final SubscriptionWrapper<String> oldWrapper = new SubscriptionWrapper<>( + new long[]{1L, 2L}, + Instruction.DELETE_KEY_NO_PROPAGATE, + PK2, + SubscriptionWrapper.VERSION_1, + 1 + ); + final ValueAndTimestamp<SubscriptionWrapper<String>> oldValue = ValueAndTimestamp.make(oldWrapper, 0); + + final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1); + stateStore.put(key, oldValue); + processor.init(context); + + final SubscriptionWrapper<String> newWrapper = new SubscriptionWrapper<>( + new long[]{1L, 2L}, + Instruction.DELETE_KEY_NO_PROPAGATE, + PK1, + SubscriptionWrapper.VERSION_1, + 1 + ); + final ValueAndTimestamp<SubscriptionWrapper<String>> newValue = ValueAndTimestamp.make( + newWrapper, 1L); + final Record<String, SubscriptionWrapper<String>> record = new Record<>( + FK, + newWrapper, + 1L + ); + processor.process(record); + + final List<CapturedForward<? extends CombinedKey<String, String>, + ? extends Change<ValueAndTimestamp<SubscriptionWrapper<String>>>>> forwarded = context.forwarded(); + Assert.assertNull(stateStore.get(key)); + Assert.assertEquals(1, forwarded.size()); + Assert.assertEquals( + record.withKey(new CombinedKey<>(FK, PK1)) + .withValue(new Change<>(newValue, oldValue)), + forwarded.get(0).record() + ); + } + + @Test + public void shouldPropagateOnlyIfFKValAvailableV0() { + final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>>> storeBuilder = storeBuilder(); + final SubscriptionStoreReceiveProcessorSupplier<String, String> supplier = supplier(storeBuilder); + final Processor<String, + SubscriptionWrapper<String>, + CombinedKey<String, String>, + Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> processor = supplier.get(); + final TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>> stateStore = storeBuilder.build(); + context.addStateStore(stateStore); + stateStore.init((StateStoreContext) context, stateStore); + + final SubscriptionWrapper<String> oldWrapper = new SubscriptionWrapper<>( + new long[]{1L, 2L}, + Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, + PK2, + SubscriptionWrapper.VERSION_0, + null + ); + final ValueAndTimestamp<SubscriptionWrapper<String>> oldValue = ValueAndTimestamp.make(oldWrapper, 0); + + final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1); + stateStore.put(key, oldValue); + processor.init(context); + + final SubscriptionWrapper<String> newWrapper = new SubscriptionWrapper<>( + new long[]{1L, 2L}, + Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, + PK1, + SubscriptionWrapper.VERSION_0, + null + ); + final ValueAndTimestamp<SubscriptionWrapper<String>> newValue = ValueAndTimestamp.make( + newWrapper, 1L); + final Record<String, SubscriptionWrapper<String>> record = new Record<>( + FK, + newWrapper, + 1L + ); + processor.process(record); + final List<CapturedForward<? extends CombinedKey<String, String>, + ? extends Change<ValueAndTimestamp<SubscriptionWrapper<String>>>>> forwarded = context.forwarded(); + + Assert.assertEquals(newValue, stateStore.get(key)); + Assert.assertEquals(1, forwarded.size()); + Assert.assertEquals( + record.withKey(new CombinedKey<>(FK, PK1)) + .withValue(new Change<>(newValue, oldValue)), + forwarded.get(0).record() + ); + } + + @Test + public void shouldPropagateOnlyIfFKValAvailableV1() { + final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>>> storeBuilder = storeBuilder(); + final SubscriptionStoreReceiveProcessorSupplier<String, String> supplier = supplier(storeBuilder); + final Processor<String, + SubscriptionWrapper<String>, + CombinedKey<String, String>, + Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> processor = supplier.get(); + final TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>> stateStore = storeBuilder.build(); + context.addStateStore(stateStore); + stateStore.init((StateStoreContext) context, stateStore); + + final SubscriptionWrapper<String> oldWrapper = new SubscriptionWrapper<>( + new long[]{1L, 2L}, + Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, + PK2, + SubscriptionWrapper.VERSION_1, + 1 + ); + final ValueAndTimestamp<SubscriptionWrapper<String>> oldValue = ValueAndTimestamp.make(oldWrapper, 0); + + final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1); + stateStore.put(key, oldValue); + processor.init(context); + + final SubscriptionWrapper<String> newWrapper = new SubscriptionWrapper<>( + new long[]{1L, 2L}, + Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, + PK1, + SubscriptionWrapper.VERSION_1, + 1 + ); + final ValueAndTimestamp<SubscriptionWrapper<String>> newValue = ValueAndTimestamp.make( + newWrapper, 1L); + final Record<String, SubscriptionWrapper<String>> record = new Record<>( + FK, + newWrapper, + 1L + ); + processor.process(record); + final List<CapturedForward<? extends CombinedKey<String, String>, + ? extends Change<ValueAndTimestamp<SubscriptionWrapper<String>>>>> forwarded = context.forwarded(); + + Assert.assertEquals(newValue, stateStore.get(key)); + Assert.assertEquals(1, forwarded.size()); + Assert.assertEquals( + record.withKey(new CombinedKey<>(FK, PK1)) + .withValue(new Change<>(newValue, oldValue)), + forwarded.get(0).record() + ); + } + + @Test + public void shouldPropagateNullIfNoFKValAvailableV0() { + final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>>> storeBuilder = storeBuilder(); + final SubscriptionStoreReceiveProcessorSupplier<String, String> supplier = supplier(storeBuilder); + final Processor<String, + SubscriptionWrapper<String>, + CombinedKey<String, String>, + Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> processor = supplier.get(); + final TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>> stateStore = storeBuilder.build(); + context.addStateStore(stateStore); + stateStore.init((StateStoreContext) context, stateStore); + + final SubscriptionWrapper<String> oldWrapper = new SubscriptionWrapper<>( + new long[]{1L, 2L}, + Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, + PK2, + SubscriptionWrapper.VERSION_0, + null + ); + final ValueAndTimestamp<SubscriptionWrapper<String>> oldValue = ValueAndTimestamp.make(oldWrapper, 0); + + final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1); + stateStore.put(key, oldValue); + processor.init(context); + + final SubscriptionWrapper<String> newWrapper = new SubscriptionWrapper<>( + new long[]{1L, 2L}, + Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, + PK1, + SubscriptionWrapper.VERSION_0, + null + ); + final ValueAndTimestamp<SubscriptionWrapper<String>> newValue = ValueAndTimestamp.make( + newWrapper, 1L); + final Record<String, SubscriptionWrapper<String>> record = new Record<>( + FK, + newWrapper, + 1L + ); + processor.process(record); + final List<CapturedForward<? extends CombinedKey<String, String>, + ? extends Change<ValueAndTimestamp<SubscriptionWrapper<String>>>>> forwarded = context.forwarded(); + + Assert.assertEquals(newValue, stateStore.get(key)); + Assert.assertEquals(1, forwarded.size()); + Assert.assertEquals( + record.withKey(new CombinedKey<>(FK, PK1)) + .withValue(new Change<>(newValue, oldValue)), + forwarded.get(0).record() + ); + } + + @Test + public void shouldPropagateNullIfNoFKValAvailableV1() { + final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>>> storeBuilder = storeBuilder(); + final SubscriptionStoreReceiveProcessorSupplier<String, String> supplier = supplier(storeBuilder); + final Processor<String, + SubscriptionWrapper<String>, + CombinedKey<String, String>, + Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> processor = supplier.get(); + final TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>> stateStore = storeBuilder.build(); + context.addStateStore(stateStore); + stateStore.init((StateStoreContext) context, stateStore); + + final SubscriptionWrapper<String> oldWrapper = new SubscriptionWrapper<>( + new long[]{1L, 2L}, + Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, + PK2, + SubscriptionWrapper.VERSION_1, + 1 + ); + final ValueAndTimestamp<SubscriptionWrapper<String>> oldValue = ValueAndTimestamp.make(oldWrapper, 0); + + final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1); + stateStore.put(key, oldValue); + processor.init(context); + + final SubscriptionWrapper<String> newWrapper = new SubscriptionWrapper<>( + new long[]{1L, 2L}, + Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, + PK1, + SubscriptionWrapper.VERSION_1, + 1 + ); + final ValueAndTimestamp<SubscriptionWrapper<String>> newValue = ValueAndTimestamp.make( + newWrapper, 1L); + final Record<String, SubscriptionWrapper<String>> record = new Record<>( + FK, + newWrapper, + 1L + ); + processor.process(record); + final List<CapturedForward<? extends CombinedKey<String, String>, + ? extends Change<ValueAndTimestamp<SubscriptionWrapper<String>>>>> forwarded = context.forwarded(); + + Assert.assertEquals(newValue, stateStore.get(key)); + Assert.assertEquals(1, forwarded.size()); + Assert.assertEquals( + record.withKey(new CombinedKey<>(FK, PK1)) + .withValue(new Change<>(newValue, oldValue)), + forwarded.get(0).record() + ); + } + + + private SubscriptionStoreReceiveProcessorSupplier<String, String> supplier( + final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>>> storeBuilder) { + + return new SubscriptionStoreReceiveProcessorSupplier<>(storeBuilder, COMBINED_KEY_SCHEMA); + } + + private StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>>> storeBuilder() { + final Serde<SubscriptionWrapper<String>> subscriptionWrapperSerde = new SubscriptionWrapperSerde<>( + PK_SERDE_TOPIC_SUPPLIER, Serdes.String()); + return Stores.timestampedKeyValueStoreBuilder( + Stores.persistentTimestampedKeyValueStore( + "Store" + ), + new Serdes.BytesSerde(), + subscriptionWrapperSerde + ); + } +} \ No newline at end of file
