This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 0ef910a235c2636afaac679b240c29b9cf335d87
Author: Alex Sorokoumov <[email protected]>
AuthorDate: Thu Aug 18 20:20:04 2022 +0200

    KAFKA-13769 Fix version check in SubscriptionStoreReceiveProcessorSupplier (#12535)
    
    This patch fixes another incorrect version check in the foreign-key (FK) join code and adds unit tests that would have caught this bug.
    
    Reviewers: John Roesler <[email protected]>
---
 .../internals/foreignkeyjoin/CombinedKey.java      |  18 +-
 .../SubscriptionStoreReceiveProcessorSupplier.java |   2 +-
 .../foreignkeyjoin/SubscriptionWrapper.java        |  21 +
 ...scriptionStoreReceiveProcessorSupplierTest.java | 517 +++++++++++++++++++++
 4 files changed, 555 insertions(+), 3 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java
index f196b18368f..ae5f20fe533 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java
@@ -38,8 +38,22 @@ public class CombinedKey<KF, KP> {
         return primaryKey;
     }
 
-    public boolean equals(final KF foreignKey, final KP primaryKey) {
-        return this.foreignKey.equals(foreignKey) && 
this.primaryKey.equals(primaryKey);
+    @Override
+    public int hashCode() {
+        return Objects.hash(foreignKey, primaryKey);
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final CombinedKey<?, ?> that = (CombinedKey<?, ?>) o;
+        return Objects.equals(foreignKey, that.foreignKey) && Objects.equals(
+            primaryKey, that.primaryKey);
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java
index a834e14d1db..dcbf6c0eaf7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java
@@ -91,7 +91,7 @@ public class SubscriptionStoreReceiveProcessorSupplier<K, KO>
                     droppedRecordsSensor.record();
                     return;
                 }
-                if (record.value().getVersion() != 
SubscriptionWrapper.CURRENT_VERSION) {
+                if (record.value().getVersion() > 
SubscriptionWrapper.CURRENT_VERSION) {
                     //Guard against modifications to SubscriptionWrapper. Need to ensure that there is compatibility
                     //with previous versions to enable rolling upgrades. Must develop a strategy for upgrading
                     //from older SubscriptionWrapper versions to newer versions.
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java
index 41d5f1198e5..5ba8835c1d1 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java
@@ -119,5 +119,26 @@ public class SubscriptionWrapper<K> {
             ", primaryPartition=" + primaryPartition +
             '}';
     }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final SubscriptionWrapper<?> that = (SubscriptionWrapper<?>) o;
+        return version == that.version && Arrays.equals(hash, that.hash)
+            && instruction == that.instruction && Objects.equals(primaryKey, 
that.primaryKey)
+            && Objects.equals(primaryPartition, that.primaryPartition);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = Objects.hash(instruction, version, primaryKey, 
primaryPartition);
+        result = 31 * result + Arrays.hashCode(hash);
+        return result;
+    }
 }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplierTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplierTest.java
new file mode 100644
index 00000000000..17f34d56d3f
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplierTest.java
@@ -0,0 +1,517 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+import java.util.function.Supplier;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.internals.Change;
+import 
org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForward;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.test.MockInternalNewProcessorContext;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SubscriptionStoreReceiveProcessorSupplierTest {
+
+    private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+    private File stateDir;
+    private MockInternalNewProcessorContext<CombinedKey<String, String>, 
Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> context;
+
+    private static final String FK = "fk1";
+    private static final String PK1 = "pk1";
+    private static final String PK2 = "pk2";
+
+    private static final Supplier<String> PK_SERDE_TOPIC_SUPPLIER = () -> 
"pk-topic";
+    private static final CombinedKeySchema<String, String> COMBINED_KEY_SCHEMA 
= new CombinedKeySchema<>(
+        () -> "fk-topic",
+        Serdes.String(),
+        PK_SERDE_TOPIC_SUPPLIER,
+        Serdes.String()
+    );
+
+    @Before
+    public void before() {
+        stateDir = TestUtils.tempDirectory();
+        context = new MockInternalNewProcessorContext<>(props, new TaskId(0, 
0), stateDir);
+    }
+
+    @After
+    public void after() throws IOException {
+        Utils.delete(stateDir);
+    }
+
+    @Test
+    public void shouldDetectVersionChange() {
+        // This test serves as a reminder to add new tests once we bump 
SubscriptionWrapper version.
+        Assert.assertEquals(SubscriptionWrapper.VERSION_1, 
SubscriptionWrapper.CURRENT_VERSION);
+    }
+
+    @Test
+    public void shouldDeleteKeyAndPropagateV0() {
+        final StoreBuilder<TimestampedKeyValueStore<Bytes, 
SubscriptionWrapper<String>>> storeBuilder = storeBuilder();
+        final SubscriptionStoreReceiveProcessorSupplier<String, String> 
supplier = supplier(storeBuilder);
+        final Processor<String,
+                        SubscriptionWrapper<String>,
+                        CombinedKey<String, String>,
+                        
Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> processor = 
supplier.get();
+        final TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>> 
stateStore = storeBuilder.build();
+        context.addStateStore(stateStore);
+        stateStore.init((StateStoreContext) context, stateStore);
+
+        final SubscriptionWrapper<String> oldWrapper = new 
SubscriptionWrapper<>(
+            new long[]{1L, 2L},
+            Instruction.DELETE_KEY_AND_PROPAGATE,
+            PK2,
+            SubscriptionWrapper.VERSION_0,
+            null
+        );
+        final ValueAndTimestamp<SubscriptionWrapper<String>> oldValue = 
ValueAndTimestamp.make(oldWrapper, 0);
+
+        final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
+        stateStore.put(key, oldValue);
+        processor.init(context);
+
+        final SubscriptionWrapper<String> newWrapper = new 
SubscriptionWrapper<>(
+            new long[]{1L, 2L},
+            Instruction.DELETE_KEY_AND_PROPAGATE,
+            PK1,
+            SubscriptionWrapper.VERSION_0,
+            null
+        );
+        final ValueAndTimestamp<SubscriptionWrapper<String>> newValue = 
ValueAndTimestamp.make(
+            newWrapper, 1L);
+        final Record<String, SubscriptionWrapper<String>> record = new 
Record<>(
+            FK,
+            newWrapper,
+            1L
+        );
+        processor.process(record);
+
+        final List<CapturedForward<? extends CombinedKey<String, String>,
+                                   ? extends 
Change<ValueAndTimestamp<SubscriptionWrapper<String>>>>> forwarded = 
context.forwarded();
+        Assert.assertNull(stateStore.get(key));
+        Assert.assertEquals(1, forwarded.size());
+        Assert.assertEquals(
+            record.withKey(new CombinedKey<>(FK, PK1))
+                  .withValue(new Change<>(newValue, oldValue)),
+            forwarded.get(0).record()
+        );
+    }
+
+    @Test
+    public void shouldDeleteKeyAndPropagateV1() {
+        final StoreBuilder<TimestampedKeyValueStore<Bytes, 
SubscriptionWrapper<String>>> storeBuilder = storeBuilder();
+        final SubscriptionStoreReceiveProcessorSupplier<String, String> 
supplier = supplier(storeBuilder);
+        final Processor<String,
+            SubscriptionWrapper<String>,
+            CombinedKey<String, String>,
+            Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> processor 
= supplier.get();
+        final TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>> 
stateStore = storeBuilder.build();
+        context.addStateStore(stateStore);
+        stateStore.init((StateStoreContext) context, stateStore);
+
+        final SubscriptionWrapper<String> oldWrapper = new 
SubscriptionWrapper<>(
+            new long[]{1L, 2L},
+            Instruction.DELETE_KEY_AND_PROPAGATE,
+            PK2,
+            SubscriptionWrapper.VERSION_1,
+            1
+        );
+        final ValueAndTimestamp<SubscriptionWrapper<String>> oldValue = 
ValueAndTimestamp.make(oldWrapper, 0);
+
+        final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
+        stateStore.put(key, oldValue);
+        processor.init(context);
+
+        final SubscriptionWrapper<String> newWrapper = new 
SubscriptionWrapper<>(
+            new long[]{1L, 2L},
+            Instruction.DELETE_KEY_AND_PROPAGATE,
+            PK1,
+            SubscriptionWrapper.VERSION_1,
+            1
+        );
+        final ValueAndTimestamp<SubscriptionWrapper<String>> newValue = 
ValueAndTimestamp.make(
+            newWrapper, 1L);
+        final Record<String, SubscriptionWrapper<String>> record = new 
Record<>(
+            FK,
+            newWrapper,
+            1L
+        );
+        processor.process(record);
+
+        final List<CapturedForward<? extends CombinedKey<String, String>,
+            ? extends Change<ValueAndTimestamp<SubscriptionWrapper<String>>>>> 
forwarded = context.forwarded();
+        Assert.assertNull(stateStore.get(key));
+        Assert.assertEquals(1, forwarded.size());
+        Assert.assertEquals(
+            record.withKey(new CombinedKey<>(FK, PK1))
+                .withValue(new Change<>(newValue, oldValue)),
+            forwarded.get(0).record()
+        );
+    }
+
+    @Test
+    public void shouldDeleteKeyNoPropagateV0() {
+        final StoreBuilder<TimestampedKeyValueStore<Bytes, 
SubscriptionWrapper<String>>> storeBuilder = storeBuilder();
+        final SubscriptionStoreReceiveProcessorSupplier<String, String> 
supplier = supplier(storeBuilder);
+        final Processor<String,
+            SubscriptionWrapper<String>,
+            CombinedKey<String, String>,
+            Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> processor 
= supplier.get();
+        final TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>> 
stateStore = storeBuilder.build();
+        context.addStateStore(stateStore);
+        stateStore.init((StateStoreContext) context, stateStore);
+
+        final SubscriptionWrapper<String> oldWrapper = new 
SubscriptionWrapper<>(
+            new long[]{1L, 2L},
+            Instruction.DELETE_KEY_NO_PROPAGATE,
+            PK2,
+            SubscriptionWrapper.VERSION_0,
+            null
+        );
+        final ValueAndTimestamp<SubscriptionWrapper<String>> oldValue = 
ValueAndTimestamp.make(oldWrapper, 0);
+
+        final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
+        stateStore.put(key, oldValue);
+        processor.init(context);
+
+        final SubscriptionWrapper<String> newWrapper = new 
SubscriptionWrapper<>(
+            new long[]{1L, 2L},
+            Instruction.DELETE_KEY_NO_PROPAGATE,
+            PK1,
+            SubscriptionWrapper.VERSION_0,
+            null
+        );
+        final ValueAndTimestamp<SubscriptionWrapper<String>> newValue = 
ValueAndTimestamp.make(
+            newWrapper, 1L);
+        final Record<String, SubscriptionWrapper<String>> record = new 
Record<>(
+            FK,
+            newWrapper,
+            1L
+        );
+        processor.process(record);
+
+        final List<CapturedForward<? extends CombinedKey<String, String>,
+            ? extends Change<ValueAndTimestamp<SubscriptionWrapper<String>>>>> 
forwarded = context.forwarded();
+        Assert.assertNull(stateStore.get(key));
+        Assert.assertEquals(1, forwarded.size());
+        Assert.assertEquals(
+            record.withKey(new CombinedKey<>(FK, PK1))
+                .withValue(new Change<>(newValue, oldValue)),
+            forwarded.get(0).record()
+        );
+    }
+
+    @Test
+    public void shouldDeleteKeyNoPropagateV1() {
+        final StoreBuilder<TimestampedKeyValueStore<Bytes, 
SubscriptionWrapper<String>>> storeBuilder = storeBuilder();
+        final SubscriptionStoreReceiveProcessorSupplier<String, String> 
supplier = supplier(storeBuilder);
+        final Processor<String,
+            SubscriptionWrapper<String>,
+            CombinedKey<String, String>,
+            Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> processor 
= supplier.get();
+        final TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>> 
stateStore = storeBuilder.build();
+        context.addStateStore(stateStore);
+        stateStore.init((StateStoreContext) context, stateStore);
+
+        final SubscriptionWrapper<String> oldWrapper = new 
SubscriptionWrapper<>(
+            new long[]{1L, 2L},
+            Instruction.DELETE_KEY_NO_PROPAGATE,
+            PK2,
+            SubscriptionWrapper.VERSION_1,
+            1
+        );
+        final ValueAndTimestamp<SubscriptionWrapper<String>> oldValue = 
ValueAndTimestamp.make(oldWrapper, 0);
+
+        final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
+        stateStore.put(key, oldValue);
+        processor.init(context);
+
+        final SubscriptionWrapper<String> newWrapper = new 
SubscriptionWrapper<>(
+            new long[]{1L, 2L},
+            Instruction.DELETE_KEY_NO_PROPAGATE,
+            PK1,
+            SubscriptionWrapper.VERSION_1,
+            1
+        );
+        final ValueAndTimestamp<SubscriptionWrapper<String>> newValue = 
ValueAndTimestamp.make(
+            newWrapper, 1L);
+        final Record<String, SubscriptionWrapper<String>> record = new 
Record<>(
+            FK,
+            newWrapper,
+            1L
+        );
+        processor.process(record);
+
+        final List<CapturedForward<? extends CombinedKey<String, String>,
+            ? extends Change<ValueAndTimestamp<SubscriptionWrapper<String>>>>> 
forwarded = context.forwarded();
+        Assert.assertNull(stateStore.get(key));
+        Assert.assertEquals(1, forwarded.size());
+        Assert.assertEquals(
+            record.withKey(new CombinedKey<>(FK, PK1))
+                .withValue(new Change<>(newValue, oldValue)),
+            forwarded.get(0).record()
+        );
+    }
+
+    @Test
+    public void shouldPropagateOnlyIfFKValAvailableV0() {
+        final StoreBuilder<TimestampedKeyValueStore<Bytes, 
SubscriptionWrapper<String>>> storeBuilder = storeBuilder();
+        final SubscriptionStoreReceiveProcessorSupplier<String, String> 
supplier = supplier(storeBuilder);
+        final Processor<String,
+            SubscriptionWrapper<String>,
+            CombinedKey<String, String>,
+            Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> processor 
= supplier.get();
+        final TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>> 
stateStore = storeBuilder.build();
+        context.addStateStore(stateStore);
+        stateStore.init((StateStoreContext) context, stateStore);
+
+        final SubscriptionWrapper<String> oldWrapper = new 
SubscriptionWrapper<>(
+            new long[]{1L, 2L},
+            Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
+            PK2,
+            SubscriptionWrapper.VERSION_0,
+            null
+        );
+        final ValueAndTimestamp<SubscriptionWrapper<String>> oldValue = 
ValueAndTimestamp.make(oldWrapper, 0);
+
+        final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
+        stateStore.put(key, oldValue);
+        processor.init(context);
+
+        final SubscriptionWrapper<String> newWrapper = new 
SubscriptionWrapper<>(
+            new long[]{1L, 2L},
+            Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
+            PK1,
+            SubscriptionWrapper.VERSION_0,
+            null
+        );
+        final ValueAndTimestamp<SubscriptionWrapper<String>> newValue = 
ValueAndTimestamp.make(
+            newWrapper, 1L);
+        final Record<String, SubscriptionWrapper<String>> record = new 
Record<>(
+            FK,
+            newWrapper,
+            1L
+        );
+        processor.process(record);
+        final List<CapturedForward<? extends CombinedKey<String, String>,
+            ? extends Change<ValueAndTimestamp<SubscriptionWrapper<String>>>>> 
forwarded = context.forwarded();
+
+        Assert.assertEquals(newValue, stateStore.get(key));
+        Assert.assertEquals(1, forwarded.size());
+        Assert.assertEquals(
+            record.withKey(new CombinedKey<>(FK, PK1))
+                .withValue(new Change<>(newValue, oldValue)),
+            forwarded.get(0).record()
+        );
+    }
+
+    @Test
+    public void shouldPropagateOnlyIfFKValAvailableV1() {
+        final StoreBuilder<TimestampedKeyValueStore<Bytes, 
SubscriptionWrapper<String>>> storeBuilder = storeBuilder();
+        final SubscriptionStoreReceiveProcessorSupplier<String, String> 
supplier = supplier(storeBuilder);
+        final Processor<String,
+            SubscriptionWrapper<String>,
+            CombinedKey<String, String>,
+            Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> processor 
= supplier.get();
+        final TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>> 
stateStore = storeBuilder.build();
+        context.addStateStore(stateStore);
+        stateStore.init((StateStoreContext) context, stateStore);
+
+        final SubscriptionWrapper<String> oldWrapper = new 
SubscriptionWrapper<>(
+            new long[]{1L, 2L},
+            Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
+            PK2,
+            SubscriptionWrapper.VERSION_1,
+            1
+        );
+        final ValueAndTimestamp<SubscriptionWrapper<String>> oldValue = 
ValueAndTimestamp.make(oldWrapper, 0);
+
+        final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
+        stateStore.put(key, oldValue);
+        processor.init(context);
+
+        final SubscriptionWrapper<String> newWrapper = new 
SubscriptionWrapper<>(
+            new long[]{1L, 2L},
+            Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
+            PK1,
+            SubscriptionWrapper.VERSION_1,
+            1
+        );
+        final ValueAndTimestamp<SubscriptionWrapper<String>> newValue = 
ValueAndTimestamp.make(
+            newWrapper, 1L);
+        final Record<String, SubscriptionWrapper<String>> record = new 
Record<>(
+            FK,
+            newWrapper,
+            1L
+        );
+        processor.process(record);
+        final List<CapturedForward<? extends CombinedKey<String, String>,
+            ? extends Change<ValueAndTimestamp<SubscriptionWrapper<String>>>>> 
forwarded = context.forwarded();
+
+        Assert.assertEquals(newValue, stateStore.get(key));
+        Assert.assertEquals(1, forwarded.size());
+        Assert.assertEquals(
+            record.withKey(new CombinedKey<>(FK, PK1))
+                .withValue(new Change<>(newValue, oldValue)),
+            forwarded.get(0).record()
+        );
+    }
+
+    @Test
+    public void shouldPropagateNullIfNoFKValAvailableV0() {
+        final StoreBuilder<TimestampedKeyValueStore<Bytes, 
SubscriptionWrapper<String>>> storeBuilder = storeBuilder();
+        final SubscriptionStoreReceiveProcessorSupplier<String, String> 
supplier = supplier(storeBuilder);
+        final Processor<String,
+            SubscriptionWrapper<String>,
+            CombinedKey<String, String>,
+            Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> processor 
= supplier.get();
+        final TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>> 
stateStore = storeBuilder.build();
+        context.addStateStore(stateStore);
+        stateStore.init((StateStoreContext) context, stateStore);
+
+        final SubscriptionWrapper<String> oldWrapper = new 
SubscriptionWrapper<>(
+            new long[]{1L, 2L},
+            Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE,
+            PK2,
+            SubscriptionWrapper.VERSION_0,
+            null
+        );
+        final ValueAndTimestamp<SubscriptionWrapper<String>> oldValue = 
ValueAndTimestamp.make(oldWrapper, 0);
+
+        final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
+        stateStore.put(key, oldValue);
+        processor.init(context);
+
+        final SubscriptionWrapper<String> newWrapper = new 
SubscriptionWrapper<>(
+            new long[]{1L, 2L},
+            Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE,
+            PK1,
+            SubscriptionWrapper.VERSION_0,
+            null
+        );
+        final ValueAndTimestamp<SubscriptionWrapper<String>> newValue = 
ValueAndTimestamp.make(
+            newWrapper, 1L);
+        final Record<String, SubscriptionWrapper<String>> record = new 
Record<>(
+            FK,
+            newWrapper,
+            1L
+        );
+        processor.process(record);
+        final List<CapturedForward<? extends CombinedKey<String, String>,
+            ? extends Change<ValueAndTimestamp<SubscriptionWrapper<String>>>>> 
forwarded = context.forwarded();
+
+        Assert.assertEquals(newValue, stateStore.get(key));
+        Assert.assertEquals(1, forwarded.size());
+        Assert.assertEquals(
+            record.withKey(new CombinedKey<>(FK, PK1))
+                .withValue(new Change<>(newValue, oldValue)),
+            forwarded.get(0).record()
+        );
+    }
+
+    @Test
+    public void shouldPropagateNullIfNoFKValAvailableV1() {
+        final StoreBuilder<TimestampedKeyValueStore<Bytes, 
SubscriptionWrapper<String>>> storeBuilder = storeBuilder();
+        final SubscriptionStoreReceiveProcessorSupplier<String, String> 
supplier = supplier(storeBuilder);
+        final Processor<String,
+            SubscriptionWrapper<String>,
+            CombinedKey<String, String>,
+            Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> processor 
= supplier.get();
+        final TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>> 
stateStore = storeBuilder.build();
+        context.addStateStore(stateStore);
+        stateStore.init((StateStoreContext) context, stateStore);
+
+        final SubscriptionWrapper<String> oldWrapper = new 
SubscriptionWrapper<>(
+            new long[]{1L, 2L},
+            Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE,
+            PK2,
+            SubscriptionWrapper.VERSION_1,
+            1
+        );
+        final ValueAndTimestamp<SubscriptionWrapper<String>> oldValue = 
ValueAndTimestamp.make(oldWrapper, 0);
+
+        final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
+        stateStore.put(key, oldValue);
+        processor.init(context);
+
+        final SubscriptionWrapper<String> newWrapper = new 
SubscriptionWrapper<>(
+            new long[]{1L, 2L},
+            Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE,
+            PK1,
+            SubscriptionWrapper.VERSION_1,
+            1
+        );
+        final ValueAndTimestamp<SubscriptionWrapper<String>> newValue = 
ValueAndTimestamp.make(
+            newWrapper, 1L);
+        final Record<String, SubscriptionWrapper<String>> record = new 
Record<>(
+            FK,
+            newWrapper,
+            1L
+        );
+        processor.process(record);
+        final List<CapturedForward<? extends CombinedKey<String, String>,
+            ? extends Change<ValueAndTimestamp<SubscriptionWrapper<String>>>>> 
forwarded = context.forwarded();
+
+        Assert.assertEquals(newValue, stateStore.get(key));
+        Assert.assertEquals(1, forwarded.size());
+        Assert.assertEquals(
+            record.withKey(new CombinedKey<>(FK, PK1))
+                .withValue(new Change<>(newValue, oldValue)),
+            forwarded.get(0).record()
+        );
+    }
+
+
+    private SubscriptionStoreReceiveProcessorSupplier<String, String> supplier(
+        final StoreBuilder<TimestampedKeyValueStore<Bytes, 
SubscriptionWrapper<String>>> storeBuilder) {
+
+        return new SubscriptionStoreReceiveProcessorSupplier<>(storeBuilder, 
COMBINED_KEY_SCHEMA);
+    }
+
+    private StoreBuilder<TimestampedKeyValueStore<Bytes, 
SubscriptionWrapper<String>>> storeBuilder() {
+        final Serde<SubscriptionWrapper<String>> subscriptionWrapperSerde = 
new SubscriptionWrapperSerde<>(
+            PK_SERDE_TOPIC_SUPPLIER, Serdes.String());
+        return Stores.timestampedKeyValueStoreBuilder(
+            Stores.persistentTimestampedKeyValueStore(
+                "Store"
+            ),
+            new Serdes.BytesSerde(),
+            subscriptionWrapperSerde
+        );
+    }
+}
\ No newline at end of file

Reply via email to