This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 3.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 0e052af3c772caf44119edf1bf189a2bb9028e98 Author: Alex Sorokoumov <[email protected]> AuthorDate: Wed Jul 27 20:58:12 2022 +0200 KAFKA-13769: Add tests for ForeignJoinSubscriptionProcessorSupplier (#12437) Reviewers: Adam Bellemare <[email protected]>, John Roesler <[email protected]> --- .../SubscriptionResponseWrapper.java | 24 ++ .../foreignkeyjoin/SubscriptionWrapper.java | 5 +- ...reignJoinSubscriptionProcessorSupplierTest.java | 377 +++++++++++++++++++++ .../SubscriptionWrapperSerdeTest.java | 22 +- 4 files changed, 416 insertions(+), 12 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapper.java index ef9a3a055b5..99556f16e46 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapper.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapper.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.kstream.internals.foreignkeyjoin; +import java.util.Objects; import org.apache.kafka.common.errors.UnsupportedVersionException; import java.util.Arrays; @@ -72,4 +73,27 @@ public class SubscriptionResponseWrapper<FV> { ", primaryPartition=" + primaryPartition + '}'; } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final SubscriptionResponseWrapper<?> that = (SubscriptionResponseWrapper<?>) o; + return version == that.version && + Arrays.equals(originalValueHash, + that.originalValueHash) && + Objects.equals(foreignValue, that.foreignValue) && + Objects.equals(primaryPartition, that.primaryPartition); + } + + @Override + public int hashCode() { + int result = Objects.hash(foreignValue, version, primaryPartition); + result = 31 * result + Arrays.hashCode(originalValueHash); + return result; + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java index a75a419cd74..41d5f1198e5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java @@ -23,7 +23,10 @@ import java.util.Objects; public class SubscriptionWrapper<K> { - static final byte CURRENT_VERSION = 1; + static final byte VERSION_0 = 0; + static final byte VERSION_1 = 1; + + static final byte CURRENT_VERSION = VERSION_1; // v0 fields: private final long[] hash; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplierTest.java new file mode 100644 index 00000000000..1bf708b8ab5 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplierTest.java @@ -0,0 +1,377 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals.foreignkeyjoin; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.apache.kafka.streams.kstream.internals.Change; +import org.apache.kafka.streams.kstream.internals.KTableValueGetter; +import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier; +import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction; +import org.apache.kafka.streams.processor.api.MockProcessorContext; +import org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForward; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.junit.Assert; +import org.junit.Test; + +public class ForeignJoinSubscriptionProcessorSupplierTest { + final Map<String, ValueAndTimestamp<String>> fks = Collections.singletonMap( + "fk1", ValueAndTimestamp.make("foo", 1L) + ); + final KTableValueGetterSupplier<String, String> valueGetterSupplier = valueGetterSupplier(fks); + final Processor<CombinedKey<String, String>, + Change<ValueAndTimestamp<SubscriptionWrapper<String>>>, + String, + SubscriptionResponseWrapper<String>> + processor = processor(valueGetterSupplier); + + @Test + public void shouldDetectVersionChange() { + // This test serves as a reminder to add new tests once we bump SubscriptionWrapper version. + Assert.assertEquals(SubscriptionWrapper.VERSION_1, SubscriptionWrapper.CURRENT_VERSION); + } + + @Test + public void shouldDeleteKeyAndPropagateFKV0() { + final MockProcessorContext<String, SubscriptionResponseWrapper<String>> context = new MockProcessorContext<>(); + processor.init(context); + + final SubscriptionWrapper<String> newValue = new SubscriptionWrapper<>( + new long[]{1L}, + Instruction.DELETE_KEY_AND_PROPAGATE, + "pk1", + SubscriptionWrapper.VERSION_0, + null + ); + final Record<CombinedKey<String, String>, Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> record = + new Record<>( + new CombinedKey<>("fk1", "pk1"), + new Change<>(ValueAndTimestamp.make(newValue, 1L), null), + 1L + ); + processor.process(record); + final List<CapturedForward<? extends String, ? extends SubscriptionResponseWrapper<String>>> forwarded = context.forwarded(); + Assert.assertEquals(1, forwarded.size()); + Assert.assertEquals( + new Record<>( + "pk1", + new SubscriptionResponseWrapper<>( + newValue.getHash(), + null, + null), + 1L + ), + forwarded.get(0).record() + ); + } + + @Test + public void shouldDeleteKeyAndPropagateFKV1() { + final MockProcessorContext<String, SubscriptionResponseWrapper<String>> context = new MockProcessorContext<>(); + processor.init(context); + + final SubscriptionWrapper<String> newValue = new SubscriptionWrapper<>( + new long[]{1L}, + Instruction.DELETE_KEY_AND_PROPAGATE, + "pk1", + SubscriptionWrapper.VERSION_1, + 12 + ); + final Record<CombinedKey<String, String>, Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> record = + new Record<>( + new CombinedKey<>("fk1", "pk1"), + new Change<>(ValueAndTimestamp.make(newValue, 1L), null), + 1L + ); + processor.process(record); + final List<CapturedForward<? extends String, ? extends SubscriptionResponseWrapper<String>>> forwarded = context.forwarded(); + Assert.assertEquals(1, forwarded.size()); + Assert.assertEquals( + new Record<>( + "pk1", + new SubscriptionResponseWrapper<>( + newValue.getHash(), + null, + 12 + ), + 1L + ), + forwarded.get(0).record() + ); + } + + @Test + public void shouldPropagateOnlyIfFKAvailableV0() { + final MockProcessorContext<String, SubscriptionResponseWrapper<String>> context = new MockProcessorContext<>(); + processor.init(context); + + final SubscriptionWrapper<String> newValue = new SubscriptionWrapper<>( + new long[]{1L}, + Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, + "pk1", + SubscriptionWrapper.VERSION_0, + null + ); + final Record<CombinedKey<String, String>, Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> record = + new Record<>( + new CombinedKey<>("fk1", "pk1"), + new Change<>(ValueAndTimestamp.make(newValue, 1L), null), + 1L + ); + processor.process(record); + final List<CapturedForward<? extends String, ? extends SubscriptionResponseWrapper<String>>> forwarded = context.forwarded(); + Assert.assertEquals(1, forwarded.size()); + Assert.assertEquals( + new Record<>( + "pk1", + new SubscriptionResponseWrapper<>( + newValue.getHash(), + "foo", + null + ), + 1L + ), + forwarded.get(0).record() + ); + } + + @Test + public void shouldPropagateOnlyIfFKAvailableV1() { + final MockProcessorContext<String, SubscriptionResponseWrapper<String>> context = new MockProcessorContext<>(); + processor.init(context); + + final SubscriptionWrapper<String> newValue = new SubscriptionWrapper<>( + new long[]{1L}, + Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, + "pk1", + SubscriptionWrapper.VERSION_1, + 12 + ); + final Record<CombinedKey<String, String>, Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> record = + new Record<>( + new CombinedKey<>("fk1", "pk1"), + new Change<>(ValueAndTimestamp.make(newValue, 1L), null), + 1L + ); + processor.process(record); + final List<CapturedForward<? extends String, ? extends SubscriptionResponseWrapper<String>>> forwarded = context.forwarded(); + Assert.assertEquals(1, forwarded.size()); + Assert.assertEquals( + new Record<>( + "pk1", + new SubscriptionResponseWrapper<>( + newValue.getHash(), + "foo", + 12 + ), + 1L + ), + forwarded.get(0).record()); + } + + @Test + public void shouldPropagateNullIfNoFKAvailableV0() { + final MockProcessorContext<String, SubscriptionResponseWrapper<String>> context = new MockProcessorContext<>(); + processor.init(context); + + final SubscriptionWrapper<String> newValue = new SubscriptionWrapper<>( + new long[]{1L}, + Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, + "pk1", + SubscriptionWrapper.VERSION_0, + null + ); + Record<CombinedKey<String, String>, Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> record = + new Record<>( + new CombinedKey<>("fk1", "pk1"), + new Change<>(ValueAndTimestamp.make(newValue, 1L), null), + 1L + ); + processor.process(record); + // propagate matched FK + List<CapturedForward<? extends String, ? extends SubscriptionResponseWrapper<String>>> forwarded = context.forwarded(); + Assert.assertEquals(1, forwarded.size()); + Assert.assertEquals( + new Record<>( + "pk1", + new SubscriptionResponseWrapper<>( + newValue.getHash(), + "foo", + null + ), + 1L + ), + forwarded.get(0).record()); + + record = new Record<>( + new CombinedKey<>("fk9000", "pk1"), + new Change<>(ValueAndTimestamp.make(newValue, 1L), null), + 1L + ); + processor.process(record); + // propagate null if there is no match + forwarded = context.forwarded(); + Assert.assertEquals(2, forwarded.size()); + Assert.assertEquals( + new Record<>( + "pk1", + new SubscriptionResponseWrapper<>( + newValue.getHash(), + null, + null + ), + 1L + ), + forwarded.get(1).record()); + } + + @Test + public void shouldPropagateNullIfNoFKAvailableV1() { + final MockProcessorContext<String, SubscriptionResponseWrapper<String>> context = new MockProcessorContext<>(); + processor.init(context); + + final SubscriptionWrapper<String> newValue = new SubscriptionWrapper<>( + new long[]{1L}, + Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, + "pk1", + SubscriptionWrapper.VERSION_1, + 12); + Record<CombinedKey<String, String>, Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> record = + new Record<>( + new CombinedKey<>("fk1", "pk1"), + new Change<>(ValueAndTimestamp.make(newValue, 1L), null), + 1L + ); + processor.process(record); + List<CapturedForward<? extends String, ? extends SubscriptionResponseWrapper<String>>> forwarded = context.forwarded(); + Assert.assertEquals(1, forwarded.size()); + Assert.assertEquals( + new Record<>( + "pk1", + new SubscriptionResponseWrapper<>( + newValue.getHash(), + "foo", + 12 + ), + 1L + ), + forwarded.get(0).record()); + + record = new Record<>( + new CombinedKey<>("fk9000", "pk1"), + new Change<>(ValueAndTimestamp.make(newValue, 1L), null), + 1L + ); + processor.process(record); + // propagate null if there is no match + forwarded = context.forwarded(); + Assert.assertEquals(2, forwarded.size()); + Assert.assertEquals( + new Record<>( + "pk1", + new SubscriptionResponseWrapper<>( + newValue.getHash(), + null, + 12 + ), + 1L + ), + forwarded.get(1).record()); + } + + @Test + public void shouldDeleteKeyNoPropagateV0() { + final MockProcessorContext<String, SubscriptionResponseWrapper<String>> context = new MockProcessorContext<>(); + processor.init(context); + + final SubscriptionWrapper<String> newValue = new SubscriptionWrapper<>( + new long[]{1L}, + Instruction.DELETE_KEY_NO_PROPAGATE, + "pk1", + SubscriptionWrapper.VERSION_0, + null); + final Record<CombinedKey<String, String>, Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> record = + new Record<>( + new CombinedKey<>("fk1", "pk1"), + new Change<>(ValueAndTimestamp.make(newValue, 1L), null), + 1L + ); + processor.process(record); + final List<CapturedForward<? extends String, ? extends SubscriptionResponseWrapper<String>>> forwarded = context.forwarded(); + Assert.assertEquals(0, forwarded.size()); + } + + @Test + public void shouldDeleteKeyNoPropagateV1() { + final MockProcessorContext<String, SubscriptionResponseWrapper<String>> context = new MockProcessorContext<>(); + processor.init(context); + + final SubscriptionWrapper<String> newValue = new SubscriptionWrapper<>( + new long[]{1L}, + Instruction.DELETE_KEY_NO_PROPAGATE, + "pk1", + SubscriptionWrapper.VERSION_1, + 12); + final Record<CombinedKey<String, String>, Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> record = + new Record<>(new CombinedKey<>("fk1", "pk1"), + new Change<>(ValueAndTimestamp.make(newValue, 1L), null), + 1L + ); + processor.process(record); + final List<CapturedForward<? extends String, ? extends SubscriptionResponseWrapper<String>>> forwarded = context.forwarded(); + Assert.assertEquals(0, forwarded.size()); + } + + private KTableValueGetterSupplier<String, String> valueGetterSupplier(final Map<String, ValueAndTimestamp<String>> map) { + final KTableValueGetter<String, String> valueGetter = new KTableValueGetter<String, String>() { + + @Override + public ValueAndTimestamp<String> get(final String key) { + return map.get(key); + } + + @Override + public void init(final ProcessorContext context) { + + } + }; + return new KTableValueGetterSupplier<String, String>() { + @Override + public KTableValueGetter<String, String> get() { + return valueGetter; + } + + @Override + public String[] storeNames() { + return new String[0]; + } + }; + } + + private Processor<CombinedKey<String, String>, + Change<ValueAndTimestamp<SubscriptionWrapper<String>>>, + String, + SubscriptionResponseWrapper<String>> processor(final KTableValueGetterSupplier<String, String> valueGetterSupplier) { + final SubscriptionJoinForeignProcessorSupplier<String, String, String> supplier = + new SubscriptionJoinForeignProcessorSupplier<>(valueGetterSupplier); + return supplier.get(); + } +} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java index 8cd26d606d7..709a94bc6db 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java @@ -34,7 +34,7 @@ public class SubscriptionWrapperSerdeTest { @Test @SuppressWarnings("unchecked") public void shouldSerdeV0Test() { - final byte version = 0; + final byte version = SubscriptionWrapper.VERSION_0; final String originalKey = "originalKey"; final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String()); final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19}); @@ -59,7 +59,7 @@ public class SubscriptionWrapperSerdeTest { @Test @SuppressWarnings("unchecked") public void shouldSerdeV1Test() { - final byte version = 1; + final byte version = SubscriptionWrapper.VERSION_1; final String originalKey = "originalKey"; final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String()); final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19}); @@ -84,7 +84,7 @@ public class SubscriptionWrapperSerdeTest { @Test @SuppressWarnings("unchecked") public void shouldSerdeWithV0IfUpgradeTest() { - final byte version = 1; + final byte version = SubscriptionWrapper.VERSION_1; final String originalKey = "originalKey"; final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String()); swSerde.configure( @@ -112,7 +112,7 @@ public class SubscriptionWrapperSerdeTest { @Test @SuppressWarnings("unchecked") public void shouldSerdeNullHashV0Test() { - final byte version = 0; + final byte version = SubscriptionWrapper.VERSION_0; final String originalKey = "originalKey"; final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String()); final long[] hashedValue = null; @@ -136,7 +136,7 @@ public class SubscriptionWrapperSerdeTest { @Test @SuppressWarnings("unchecked") public void shouldSerdeNullHashV1Test() { - final byte version = 1; + final byte version = SubscriptionWrapper.VERSION_1; final String originalKey = "originalKey"; final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String()); final long[] hashedValue = null; @@ -164,7 +164,7 @@ public class SubscriptionWrapperSerdeTest { final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String()); final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19}); final Integer primaryPartition = null; - final byte version = 0; + final byte version = SubscriptionWrapper.VERSION_0; final SubscriptionWrapper wrapper = new SubscriptionWrapper<>( hashedValue, SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, @@ -189,7 +189,7 @@ public class SubscriptionWrapperSerdeTest { assertThrows(NullPointerException.class, () -> new SubscriptionWrapper<>(hashedValue, SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, originalKey, - (byte) 0, + SubscriptionWrapper.VERSION_0, primaryPartition)); } @@ -201,7 +201,7 @@ public class SubscriptionWrapperSerdeTest { assertThrows(NullPointerException.class, () -> new SubscriptionWrapper<>(hashedValue, SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, originalKey, - (byte) 1, + SubscriptionWrapper.VERSION_1, primaryPartition)); } @@ -214,7 +214,7 @@ public class SubscriptionWrapperSerdeTest { hashedValue, null, originalKey, - (byte) 0, + SubscriptionWrapper.VERSION_0, primaryPartition)); } @@ -227,7 +227,7 @@ public class SubscriptionWrapperSerdeTest { hashedValue, null, originalKey, - (byte) 0, + SubscriptionWrapper.VERSION_0, primaryPartition)); } @@ -241,7 +241,7 @@ public class SubscriptionWrapperSerdeTest { hashedValue, SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, originalKey, - (byte) 1, + SubscriptionWrapper.VERSION_1, primaryPartition); assertThrows(NullPointerException.class, () -> swSerde.serializer().serialize(null, wrapper)); }
