This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 1d5d003ff48 KAFKA-14834: [5/N] Drop out-of-order records from FK join with versioned tables (#13522) 1d5d003ff48 is described below commit 1d5d003ff48097c17464ebadee58182114ee1a7f Author: Victoria Xia <victoria....@confluent.io> AuthorDate: Wed Apr 12 22:05:10 2023 -0400 KAFKA-14834: [5/N] Drop out-of-order records from FK join with versioned tables (#13522) This PR updates foreign-key table-table join processors to ignore out-of-order records from versioned tables, as specified in KIP-914. Reviewers: Matthias J. Sax <matth...@confluent.io> --- checkstyle/suppressions.xml | 6 +- .../streams/kstream/internals/KTableImpl.java | 22 +- .../ForeignJoinSubscriptionProcessorSupplier.java | 36 ++- ...reignJoinSubscriptionSendProcessorSupplier.java | 93 ++++--- .../KTableKTableForeignKeyJoinIntegrationTest.java | 219 +++++++++------ ...ableForeignKeyVersionedJoinIntegrationTest.java | 301 +++++++++++++++++++++ .../streams/kstream/internals/KStreamImplTest.java | 4 +- 7 files changed, 534 insertions(+), 147 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index d3e458082f4..28cefdd30c2 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -230,19 +230,19 @@ files=".*[/\\]streams[/\\].*test[/\\].*.java"/> <suppress checks="CyclomaticComplexity" - files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest|RelationalSmokeTest|MockProcessorContextStateStoreTest).java"/> + files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|KTableKTableForeignKeyVersionedJoinIntegrationTest|RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest|RelationalSmokeTest|MockProcessorContextStateStoreTest).java"/> <suppress checks="JavaNCSS" files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|StreamThreadTest|TaskManagerTest).java"/> <suppress checks="NPathComplexity" - files="(EosV2UpgradeIntegrationTest|EosTestDriver|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RelationalSmokeTest|MockProcessorContextStateStoreTest|TopologyTestDriverTest).java"/> + files="(EosV2UpgradeIntegrationTest|EosTestDriver|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|KTableKTableForeignKeyVersionedJoinIntegrationTest|RelationalSmokeTest|MockProcessorContextStateStoreTest|TopologyTestDriverTest).java"/> <suppress checks="(FinalLocalVariable|WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)" files="Murmur3Test.java"/> <suppress checks="MethodLength" - files="(KStreamSlidingWindowAggregateTest|KStreamKStreamLeftJoinTest|KStreamKStreamOuterJoinTest).java"/> + files="(KStreamSlidingWindowAggregateTest|KStreamKStreamLeftJoinTest|KStreamKStreamOuterJoinTest|KTableKTableForeignKeyVersionedJoinIntegrationTest).java"/> <suppress checks="ClassFanOutComplexity" files="StreamTaskTest.java"/> diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index cd8c5abc280..82438ff59a6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -1116,7 +1116,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< keySerde ); - final ProcessorGraphNode<K, Change<V>> subscriptionNode = new ProcessorGraphNode<>( + final KTableValueGetterSupplier<K, V> primaryKeyValueGetter = valueGetterSupplier(); + final StatefulProcessorNode<K, Change<V>> subscriptionNode = new StatefulProcessorNode<>( new ProcessorParameters<>( new ForeignJoinSubscriptionSendProcessorSupplier<>( foreignKeyExtractor, @@ -1124,10 +1125,13 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< valueHashSerdePseudoTopic, foreignKeySerde, valueSerde == null ? null : valueSerde.serializer(), - leftJoin + leftJoin, + primaryKeyValueGetter ), renamed.suffixWithOrElseGet("-subscription-registration-processor", builder, SUBSCRIPTION_REGISTRATION) - ) + ), + Collections.emptySet(), + Collections.singleton(primaryKeyValueGetter) ); builder.addGraphNode(graphNode, subscriptionNode); @@ -1179,26 +1183,27 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< ); builder.addGraphNode(subscriptionSource, subscriptionReceiveNode); + final KTableValueGetterSupplier<KO, VO> foreignKeyValueGetter = ((KTableImpl<KO, VO, VO>) foreignKeyTable).valueGetterSupplier(); final StatefulProcessorNode<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> subscriptionJoinForeignNode = new StatefulProcessorNode<>( new ProcessorParameters<>( new SubscriptionJoinForeignProcessorSupplier<>( - ((KTableImpl<KO, VO, VO>) foreignKeyTable).valueGetterSupplier() + foreignKeyValueGetter ), renamed.suffixWithOrElseGet("-subscription-join-foreign", builder, SUBSCRIPTION_PROCESSOR) ), Collections.emptySet(), - Collections.singleton(((KTableImpl<KO, VO, VO>) foreignKeyTable).valueGetterSupplier()) + Collections.singleton(foreignKeyValueGetter) ); builder.addGraphNode(subscriptionReceiveNode, subscriptionJoinForeignNode); - final StatefulProcessorNode<KO, Change<Object>> foreignJoinSubscriptionNode = new StatefulProcessorNode<>( + final StatefulProcessorNode<KO, Change<VO>> foreignJoinSubscriptionNode = new StatefulProcessorNode<>( new ProcessorParameters<>( - new ForeignJoinSubscriptionProcessorSupplier<>(subscriptionStore, combinedKeySchema), + new ForeignJoinSubscriptionProcessorSupplier<>(subscriptionStore, combinedKeySchema, foreignKeyValueGetter), renamed.suffixWithOrElseGet("-foreign-join-subscription", builder, SUBSCRIPTION_PROCESSOR) ), Collections.singleton(subscriptionStore), - Collections.emptySet() + Collections.singleton(foreignKeyValueGetter) ); builder.addGraphNode(((KTableImpl<KO, VO, ?>) foreignKeyTable).graphNode, foreignJoinSubscriptionNode); @@ -1232,7 +1237,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< resultSourceNodes.add(foreignResponseSource.nodeName()); builder.internalTopologyBuilder.copartitionSources(resultSourceNodes); - final KTableValueGetterSupplier<K, V> primaryKeyValueGetter = valueGetterSupplier(); final SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> resolverProcessorSupplier = new SubscriptionResolverJoinProcessorSupplier<>( primaryKeyValueGetter, valueSerde == null ? null : valueSerde.serializer(), diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java index 55e40fce64f..46e2bd24c25 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java @@ -21,6 +21,8 @@ import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.internals.Change; +import org.apache.kafka.streams.kstream.internals.KTableValueGetter; +import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier; import org.apache.kafka.streams.processor.api.ContextualProcessor; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; @@ -43,24 +45,32 @@ public class ForeignJoinSubscriptionProcessorSupplier<K, KO, VO> implements private static final Logger LOG = LoggerFactory.getLogger(ForeignJoinSubscriptionProcessorSupplier.class); private final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>>> storeBuilder; private final CombinedKeySchema<KO, K> keySchema; + private final KTableValueGetterSupplier<KO, VO> foreignKeyValueGetterSupplier; public ForeignJoinSubscriptionProcessorSupplier( final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>>> storeBuilder, - final CombinedKeySchema<KO, K> keySchema) { + final CombinedKeySchema<KO, K> keySchema, + final KTableValueGetterSupplier<KO, VO> foreignKeyValueGetterSupplier) { this.storeBuilder = storeBuilder; this.keySchema = keySchema; + this.foreignKeyValueGetterSupplier = foreignKeyValueGetterSupplier; } @Override public Processor<KO, Change<VO>, K, SubscriptionResponseWrapper<VO>> get() { - return new KTableKTableJoinProcessor(); + return new KTableKTableJoinProcessor(foreignKeyValueGetterSupplier.get()); } private final class KTableKTableJoinProcessor extends ContextualProcessor<KO, Change<VO>, K, SubscriptionResponseWrapper<VO>> { private Sensor droppedRecordsSensor; - private TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>> store; + private TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>> subscriptionStore; + private final KTableValueGetter<KO, VO> foreignKeyValueGetter; + + private KTableKTableJoinProcessor(final KTableValueGetter<KO, VO> foreignKeyValueGetter) { + this.foreignKeyValueGetter = foreignKeyValueGetter; + } @Override public void init(final ProcessorContext<K, SubscriptionResponseWrapper<VO>> context) { @@ -71,7 +81,8 @@ public class ForeignJoinSubscriptionProcessorSupplier<K, KO, VO> implements internalProcessorContext.taskId().toString(), internalProcessorContext.metrics() ); - store = internalProcessorContext.getStateStore(storeBuilder); + subscriptionStore = internalProcessorContext.getStateStore(storeBuilder); + foreignKeyValueGetter.init(context); } @Override @@ -95,11 +106,21 @@ public class ForeignJoinSubscriptionProcessorSupplier<K, KO, VO> implements return; } + // drop out-of-order records from versioned tables (cf. KIP-914) + if (foreignKeyValueGetter.isVersioned()) { + final ValueAndTimestamp<VO> latestValueAndTimestamp = foreignKeyValueGetter.get(record.key()); + if (latestValueAndTimestamp != null && latestValueAndTimestamp.timestamp() > record.timestamp()) { + LOG.info("Skipping out-of-order record from versioned table while performing table-table join."); + droppedRecordsSensor.record(); + return; + } + } + final Bytes prefixBytes = keySchema.prefixBytes(record.key()); //Perform the prefixScan and propagate the results try (final KeyValueIterator<Bytes, ValueAndTimestamp<SubscriptionWrapper<K>>> prefixScanResults = - store.range(prefixBytes, Bytes.increment(prefixBytes))) { + subscriptionStore.range(prefixBytes, Bytes.increment(prefixBytes))) { while (prefixScanResults.hasNext()) { final KeyValue<Bytes, ValueAndTimestamp<SubscriptionWrapper<K>>> next = prefixScanResults.next(); @@ -118,6 +139,11 @@ public class ForeignJoinSubscriptionProcessorSupplier<K, KO, VO> implements } } + @Override + public void close() { + foreignKeyValueGetter.close(); + } + private boolean prefixEquals(final byte[] x, final byte[] y) { final int min = Math.min(x.length, y.length); final ByteBuffer xSlice = ByteBuffer.wrap(x, 0, min); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java index 0efe4da2bcb..3d8b5dd222e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java @@ -21,6 +21,8 @@ import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.kstream.internals.Change; +import org.apache.kafka.streams.kstream.internals.KTableValueGetter; +import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier; import org.apache.kafka.streams.processor.api.ContextualProcessor; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; @@ -29,6 +31,7 @@ import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; +import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.internals.Murmur3; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,6 +52,7 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P private final Supplier<String> foreignKeySerdeTopicSupplier; private final Supplier<String> valueSerdeTopicSupplier; private final boolean leftJoin; + private final KTableValueGetterSupplier<K, V> primaryKeyValueGetterSupplier; private Serializer<KO> foreignKeySerializer; private Serializer<V> valueSerializer; @@ -57,18 +61,20 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P final Supplier<String> valueSerdeTopicSupplier, final Serde<KO> foreignKeySerde, final Serializer<V> valueSerializer, - final boolean leftJoin) { + final boolean leftJoin, + final KTableValueGetterSupplier<K, V> primaryKeyValueGetterSupplier) { this.foreignKeyExtractor = foreignKeyExtractor; this.foreignKeySerdeTopicSupplier = foreignKeySerdeTopicSupplier; this.valueSerdeTopicSupplier = valueSerdeTopicSupplier; this.valueSerializer = valueSerializer; this.leftJoin = leftJoin; + this.primaryKeyValueGetterSupplier = primaryKeyValueGetterSupplier; foreignKeySerializer = foreignKeySerde == null ? null : foreignKeySerde.serializer(); } @Override public Processor<K, Change<V>, KO, SubscriptionWrapper<K>> get() { - return new UnbindChangeProcessor(); + return new UnbindChangeProcessor(primaryKeyValueGetterSupplier.get()); } private class UnbindChangeProcessor extends ContextualProcessor<K, Change<V>, KO, SubscriptionWrapper<K>> { @@ -76,6 +82,11 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P private Sensor droppedRecordsSensor; private String foreignKeySerdeTopic; private String valueSerdeTopic; + private final KTableValueGetter<K, V> primaryKeyValueGetter; + + private UnbindChangeProcessor(final KTableValueGetter<K, V> primaryKeyValueGetter) { + this.primaryKeyValueGetter = primaryKeyValueGetter; + } @SuppressWarnings("unchecked") @Override @@ -95,10 +106,25 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P context.taskId().toString(), (StreamsMetricsImpl) context.metrics() ); + primaryKeyValueGetter.init(context); } @Override public void process(final Record<K, Change<V>> record) { + // drop out-of-order records from versioned tables (cf. KIP-914) + if (primaryKeyValueGetter.isVersioned()) { + // key-value stores do not contain data for null keys, so skip the check + // if the key is null + if (record.key() != null) { + final ValueAndTimestamp<V> latestValueAndTimestamp = primaryKeyValueGetter.get(record.key()); + if (latestValueAndTimestamp != null && latestValueAndTimestamp.timestamp() > record.timestamp()) { + LOG.info("Skipping out-of-order record from versioned table while performing table-table join."); + droppedRecordsSensor.record(); + return; + } + } + } + final long[] currentHash = record.value().newValue == null ? null : Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic, record.value().newValue)); @@ -107,37 +133,13 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P if (record.value().oldValue != null) { final KO oldForeignKey = foreignKeyExtractor.apply(record.value().oldValue); if (oldForeignKey == null) { - if (context().recordMetadata().isPresent()) { - final RecordMetadata recordMetadata = context().recordMetadata().get(); - LOG.warn( - "Skipping record due to null foreign key. " - + "topic=[{}] partition=[{}] offset=[{}]", - recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset() - ); - } else { - LOG.warn( - "Skipping record due to null foreign key. Topic, partition, and offset not known." - ); - } - droppedRecordsSensor.record(); + logSkippedRecordDueToNullForeignKey(); return; } if (record.value().newValue != null) { final KO newForeignKey = foreignKeyExtractor.apply(record.value().newValue); if (newForeignKey == null) { - if (context().recordMetadata().isPresent()) { - final RecordMetadata recordMetadata = context().recordMetadata().get(); - LOG.warn( - "Skipping record due to null foreign key. " - + "topic=[{}] partition=[{}] offset=[{}]", - recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset() - ); - } else { - LOG.warn( - "Skipping record due to null foreign key. Topic, partition, and offset not known." - ); - } - droppedRecordsSensor.record(); + logSkippedRecordDueToNullForeignKey(); return; } @@ -193,19 +195,7 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P } final KO newForeignKey = foreignKeyExtractor.apply(record.value().newValue); if (newForeignKey == null) { - if (context().recordMetadata().isPresent()) { - final RecordMetadata recordMetadata = context().recordMetadata().get(); - LOG.warn( - "Skipping record due to null foreign key. " - + "topic=[{}] partition=[{}] offset=[{}]", - recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset() - ); - } else { - LOG.warn( - "Skipping record due to null foreign key. Topic, partition, and offset not known." - ); - } - droppedRecordsSensor.record(); + logSkippedRecordDueToNullForeignKey(); } else { context().forward( record.withKey(newForeignKey) @@ -217,5 +207,26 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P } } } + + @Override + public void close() { + primaryKeyValueGetter.close(); + } + + private void logSkippedRecordDueToNullForeignKey() { + if (context().recordMetadata().isPresent()) { + final RecordMetadata recordMetadata = context().recordMetadata().get(); + LOG.warn( + "Skipping record due to null foreign key. " + + "topic=[{}] partition=[{}] offset=[{}]", + recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset() + ); + } else { + LOG.warn( + "Skipping record due to null foreign key. Topic, partition, and offset not known." + ); + } + droppedRecordsSensor.record(); + } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java index 931aaf8e53e..04b124a726b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java @@ -16,10 +16,12 @@ */ package org.apache.kafka.streams.integration; +import java.time.Duration; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.TestInputTopic; @@ -66,25 +68,45 @@ import static org.hamcrest.MatcherAssert.assertThat; public class KTableKTableForeignKeyJoinIntegrationTest { @Rule public Timeout globalTimeout = Timeout.seconds(600); - private static final String LEFT_TABLE = "left_table"; - private static final String RIGHT_TABLE = "right_table"; - private static final String OUTPUT = "output-topic"; + protected static final String LEFT_TABLE = "left_table"; + protected static final String RIGHT_TABLE = "right_table"; + protected static final String OUTPUT = "output-topic"; private static final String REJOIN_OUTPUT = "rejoin-output-topic"; - private final boolean leftJoin; - private final boolean materialized; + + private final MockTime time = new MockTime(); + + protected final boolean leftJoin; + protected final boolean materialized; private final String optimization; - private final boolean rejoin; + protected final boolean rejoin; + protected final boolean leftVersioned; + protected final boolean rightVersioned; - private Properties streamsConfig; + protected Properties streamsConfig; + protected long baseTimestamp; public KTableKTableForeignKeyJoinIntegrationTest(final boolean leftJoin, final String optimization, final boolean materialized, final boolean rejoin) { + // versioning is disabled for these tests, even though the code supports building a + // topology with versioned tables, since KTableKTableForeignKeyVersionedJoinIntegrationTest + // extends this test class. + this(leftJoin, optimization, materialized, rejoin, false, false); + } + + protected KTableKTableForeignKeyJoinIntegrationTest(final boolean leftJoin, + final String optimization, + final boolean materialized, + final boolean rejoin, + final boolean leftVersioned, + final boolean rightVersioned) { this.rejoin = rejoin; this.leftJoin = leftJoin; this.materialized = materialized; this.optimization = optimization; + this.leftVersioned = leftVersioned; + this.rightVersioned = rightVersioned; } @Rule @@ -96,6 +118,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest { mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()), mkEntry(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, optimization) )); + baseTimestamp = time.milliseconds(); } @Parameterized.Parameters(name = "leftJoin={0}, optimization={1}, materialized={2}, rejoin={3}") @@ -105,7 +128,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest { return buildParameters(booleans, optimizations, booleans, booleans); } - private static Collection<Object[]> buildParameters(final List<?>... argOptions) { + protected static Collection<Object[]> buildParameters(final List<?>... argOptions) { List<Object[]> result = new LinkedList<>(); result.add(new Object[0]); @@ -131,7 +154,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest { @Test public void doJoinFromLeftThenDeleteLeftEntity() { - final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin); + final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin, leftVersioned, rightVersioned); try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) { final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer()); final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer()); @@ -140,9 +163,9 @@ public class KTableKTableForeignKeyJoinIntegrationTest { final KeyValueStore<String, String> store = driver.getKeyValueStore("store"); // Pre-populate the RHS records. This test is all about what happens when we add/remove LHS records - right.pipeInput("rhs1", "rhsValue1"); - right.pipeInput("rhs2", "rhsValue2"); - right.pipeInput("rhs3", "rhsValue3"); // this unreferenced FK won't show up in any results + right.pipeInput("rhs1", "rhsValue1", baseTimestamp); + right.pipeInput("rhs2", "rhsValue2", baseTimestamp + 1); + right.pipeInput("rhs3", "rhsValue3", baseTimestamp + 2); // this unreferenced FK won't show up in any results assertThat( outputTopic.readKeyValuesToMap(), @@ -161,8 +184,8 @@ public class KTableKTableForeignKeyJoinIntegrationTest { ); } - left.pipeInput("lhs1", "lhsValue1|rhs1"); - left.pipeInput("lhs2", "lhsValue2|rhs2"); + left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp + 3); + left.pipeInput("lhs2", "lhsValue2|rhs2", baseTimestamp + 4); { final Map<String, String> expected = mkMap( @@ -191,7 +214,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest { } // Add another reference to an existing FK - left.pipeInput("lhs3", "lhsValue3|rhs1"); + left.pipeInput("lhs3", "lhsValue3|rhs1", baseTimestamp + 5); { assertThat( outputTopic.readKeyValuesToMap(), @@ -220,7 +243,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest { } // Now delete one LHS entity such that one delete is propagated down to the output. - left.pipeInput("lhs1", (String) null); + left.pipeInput("lhs1", (String) null, baseTimestamp + 6); assertThat( outputTopic.readKeyValuesToMap(), is(mkMap( @@ -249,7 +272,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest { @Test public void doJoinFromRightThenDeleteRightEntity() { - final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin); + final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin, leftVersioned, rightVersioned); try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) { final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer()); final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer()); @@ -257,54 +280,54 @@ public class KTableKTableForeignKeyJoinIntegrationTest { final KeyValueStore<String, String> store = driver.getKeyValueStore("store"); // Pre-populate the LHS records. This test is all about what happens when we add/remove RHS records - left.pipeInput("lhs1", "lhsValue1|rhs1"); - left.pipeInput("lhs2", "lhsValue2|rhs2"); - left.pipeInput("lhs3", "lhsValue3|rhs1"); + left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp); + left.pipeInput("lhs2", "lhsValue2|rhs2", baseTimestamp + 1); + left.pipeInput("lhs3", "lhsValue3|rhs1", baseTimestamp + 2); assertThat( outputTopic.readKeyValuesToMap(), is(leftJoin - ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)"), - mkEntry("lhs2", "(lhsValue2|rhs2,null)"), - mkEntry("lhs3", "(lhsValue3|rhs1,null)")) - : emptyMap() + ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)"), + mkEntry("lhs2", "(lhsValue2|rhs2,null)"), + mkEntry("lhs3", "(lhsValue3|rhs1,null)")) + : emptyMap() ) ); if (materialized) { assertThat( asMap(store), is(leftJoin - ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)"), - mkEntry("lhs2", "(lhsValue2|rhs2,null)"), - mkEntry("lhs3", "(lhsValue3|rhs1,null)")) - : emptyMap() + ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)"), + mkEntry("lhs2", "(lhsValue2|rhs2,null)"), + mkEntry("lhs3", "(lhsValue3|rhs1,null)")) + : emptyMap() ) ); } - right.pipeInput("rhs1", "rhsValue1"); + right.pipeInput("rhs1", "rhsValue1", baseTimestamp + 3); assertThat( outputTopic.readKeyValuesToMap(), is(mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"), - mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")) + mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")) ) ); if (materialized) { assertThat( asMap(store), is(leftJoin - ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"), - mkEntry("lhs2", "(lhsValue2|rhs2,null)"), - mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")) + ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"), + mkEntry("lhs2", "(lhsValue2|rhs2,null)"), + mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")) - : mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"), - mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")) + : mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"), + mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")) ) ); } - right.pipeInput("rhs2", "rhsValue2"); + right.pipeInput("rhs2", "rhsValue2", baseTimestamp + 4); assertThat( outputTopic.readKeyValuesToMap(), @@ -314,13 +337,13 @@ public class KTableKTableForeignKeyJoinIntegrationTest { assertThat( asMap(store), is(mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"), - mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"), - mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")) + mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"), + mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")) ) ); } - right.pipeInput("rhs3", "rhsValue3"); // this unreferenced FK won't show up in any results + right.pipeInput("rhs3", "rhsValue3", baseTimestamp + 5); // this unreferenced FK won't show up in any results assertThat( outputTopic.readKeyValuesToMap(), @@ -330,30 +353,30 @@ public class KTableKTableForeignKeyJoinIntegrationTest { assertThat( asMap(store), is(mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"), - mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"), - mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")) + mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"), + mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")) ) ); } // Now delete the RHS entity such that all matching keys have deletes propagated. - right.pipeInput("rhs1", (String) null); + right.pipeInput("rhs1", (String) null, baseTimestamp + 6); assertThat( outputTopic.readKeyValuesToMap(), is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs1,null)" : null), - mkEntry("lhs3", leftJoin ? "(lhsValue3|rhs1,null)" : null)) + mkEntry("lhs3", leftJoin ? "(lhsValue3|rhs1,null)" : null)) ) ); if (materialized) { assertThat( asMap(store), is(leftJoin - ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)"), - mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"), - mkEntry("lhs3", "(lhsValue3|rhs1,null)")) + ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)"), + mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"), + mkEntry("lhs3", "(lhsValue3|rhs1,null)")) - : mkMap(mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)")) + : mkMap(mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)")) ) ); } @@ -362,13 +385,13 @@ public class KTableKTableForeignKeyJoinIntegrationTest { @Test public void shouldEmitTombstoneWhenDeletingNonJoiningRecords() { - final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin); + final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin, leftVersioned, rightVersioned); try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) { final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer()); final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer()); final KeyValueStore<String, String> store = driver.getKeyValueStore("store"); - left.pipeInput("lhs1", "lhsValue1|rhs1"); + left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp); { final Map<String, String> expected = @@ -388,7 +411,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest { // Deleting a non-joining record produces an unnecessary tombstone for inner joins, because // it's not possible to know whether a result was previously emitted. // For the left join, the tombstone is necessary. - left.pipeInput("lhs1", (String) null); + left.pipeInput("lhs1", (String) null, baseTimestamp + 1); { assertThat( outputTopic.readKeyValuesToMap(), @@ -403,7 +426,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest { } // Deleting a non-existing record is idempotent - left.pipeInput("lhs1", (String) null); + left.pipeInput("lhs1", (String) null, baseTimestamp + 2); { assertThat( outputTopic.readKeyValuesToMap(), @@ -421,14 +444,14 @@ public class KTableKTableForeignKeyJoinIntegrationTest { @Test public void shouldNotEmitTombstonesWhenDeletingNonExistingRecords() { - final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin); + final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin, leftVersioned, rightVersioned); try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) { final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer()); final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer()); final KeyValueStore<String, String> store = driver.getKeyValueStore("store"); // Deleting a record that never existed doesn't need to emit tombstones. - left.pipeInput("lhs1", (String) null); + left.pipeInput("lhs1", (String) null, baseTimestamp); { assertThat( outputTopic.readKeyValuesToMap(), @@ -446,14 +469,14 @@ public class KTableKTableForeignKeyJoinIntegrationTest { @Test public void joinShouldProduceNullsWhenValueHasNonMatchingForeignKey() { - final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin); + final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin, leftVersioned, rightVersioned); try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) { final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer()); final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer()); final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer()); final KeyValueStore<String, String> store = driver.getKeyValueStore("store"); - left.pipeInput("lhs1", "lhsValue1|rhs1"); + left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp); // no output for a new inner join on a non-existent FK // the left join of course emits the half-joined output assertThat( @@ -470,7 +493,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest { // since it impossible to know whether the prior FK existed or not (and thus whether any results have // previously been emitted) // The left join emits a _necessary_ update (since the lhs record has actually changed) - left.pipeInput("lhs1", "lhsValue1|rhs2"); + left.pipeInput("lhs1", "lhsValue1|rhs2", baseTimestamp + 1); assertThat( outputTopic.readKeyValuesToMap(), is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs2,null)" : null))) @@ -482,7 +505,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest { ); } // of course, moving it again to yet another non-existent FK has the same effect - left.pipeInput("lhs1", "lhsValue1|rhs3"); + left.pipeInput("lhs1", "lhsValue1|rhs3", baseTimestamp + 2); assertThat( outputTopic.readKeyValuesToMap(), is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs3,null)" : null))) @@ -497,7 +520,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest { // Adding an RHS record now, so that we can demonstrate "moving" from a non-existent FK to an existent one // This RHS key was previously referenced, but it's not referenced now, so adding this record should // result in no changes whatsoever. - right.pipeInput("rhs1", "rhsValue1"); + right.pipeInput("rhs1", "rhsValue1", baseTimestamp + 3); assertThat( outputTopic.readKeyValuesToMap(), is(emptyMap()) @@ -510,7 +533,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest { } // now, we change to a FK that exists, and see the join completes - left.pipeInput("lhs1", "lhsValue1|rhs1"); + left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp + 4); assertThat( outputTopic.readKeyValuesToMap(), is(mkMap( @@ -528,7 +551,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest { // but if we update it again to a non-existent one, we'll get a tombstone for the inner join, and the // left join updates appropriately. - left.pipeInput("lhs1", "lhsValue1|rhs2"); + left.pipeInput("lhs1", "lhsValue1|rhs2", baseTimestamp + 5); assertThat( outputTopic.readKeyValuesToMap(), is(mkMap( @@ -546,7 +569,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest { @Test public void shouldUnsubscribeOldForeignKeyIfLeftSideIsUpdated() { - final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin); + final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin, leftVersioned, rightVersioned); try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) { final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer()); final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer()); @@ -555,8 +578,8 @@ public class KTableKTableForeignKeyJoinIntegrationTest { // Pre-populate the RHS records. This test is all about what happens when we change LHS records foreign key reference // then populate update on RHS - right.pipeInput("rhs1", "rhsValue1"); - right.pipeInput("rhs2", "rhsValue2"); + right.pipeInput("rhs1", "rhsValue1", baseTimestamp); + right.pipeInput("rhs2", "rhsValue2", baseTimestamp + 1); assertThat( outputTopic.readKeyValuesToMap(), @@ -569,7 +592,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest { ); } - left.pipeInput("lhs1", "lhsValue1|rhs1"); + left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp + 2); { final Map<String, String> expected = mkMap( mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)") @@ -587,7 +610,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest { } // Change LHS foreign key reference - left.pipeInput("lhs1", "lhsValue1|rhs2"); + left.pipeInput("lhs1", "lhsValue1|rhs2", baseTimestamp + 3); { final Map<String, String> expected = mkMap( mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") @@ -605,7 +628,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest { } // Populate RHS update on old LHS foreign key ref - right.pipeInput("rhs1", "rhsValue1Delta"); + right.pipeInput("rhs1", "rhsValue1Delta", baseTimestamp + 4); { assertThat( outputTopic.readKeyValuesToMap(), @@ -623,29 +646,52 @@ public class KTableKTableForeignKeyJoinIntegrationTest { } } - private static Map<String, String> asMap(final KeyValueStore<String, String> store) { + protected static Map<String, String> asMap(final KeyValueStore<String, String> store) { final HashMap<String, String> result = new HashMap<>(); store.all().forEachRemaining(kv -> result.put(kv.key, kv.value)); return result; } - private static Topology getTopology(final Properties streamsConfig, - final String queryableStoreName, - final boolean leftJoin, - final boolean rejoin) { + protected static Topology getTopology(final Properties streamsConfig, + final String queryableStoreName, + final boolean leftJoin, + final boolean rejoin, + final boolean leftVersioned, + final boolean rightVersioned) { final UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope(); final StreamsBuilder builder = new StreamsBuilder(); - final KTable<String, String> left = builder.table( - LEFT_TABLE, - Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true), - serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)) - ); - final KTable<String, String> right = builder.table( - RIGHT_TABLE, - Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true), - serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)) - ); + final KTable<String, String> left; + if (leftVersioned) { + left = builder.table( + LEFT_TABLE, + Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true), + serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)), + Materialized.as(Stores.persistentVersionedKeyValueStore("left", Duration.ofMinutes(5))) + ); + } else { + left = builder.table( + LEFT_TABLE, + Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true), + serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)) + ); + } + + final KTable<String, String> right; + if (rightVersioned) { + right = builder.table( + RIGHT_TABLE, + Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true), + serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)), + Materialized.as(Stores.persistentVersionedKeyValueStore("right", Duration.ofMinutes(5))) + ); + } else { + right = builder.table( + RIGHT_TABLE, + Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true), + serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)) + ); + } final Function<String, String> extractor = value -> value.split("\\|")[1]; final ValueJoiner<String, String, String> joiner = (value1, value2) -> "(" + value1 + "," + value2 + ")"; @@ -678,13 +724,13 @@ public class KTableKTableForeignKeyJoinIntegrationTest { left.leftJoin(right, extractor, joiner, mainMaterialized); fkJoin.toStream() - .to(OUTPUT); + .to(OUTPUT); // also make sure the FK join is set up right for downstream operations that require materialization if (rejoin) { fkJoin.leftJoin(left, rejoiner, rejoinMaterialized) - .toStream() - .to(REJOIN_OUTPUT); + .toStream() + .to(REJOIN_OUTPUT); } } else { final KTable<String, String> fkJoin = left.join(right, extractor, joiner, mainMaterialized); @@ -696,12 +742,11 @@ public class KTableKTableForeignKeyJoinIntegrationTest { // also make sure the FK join is set up right for downstream operations that require materialization if (rejoin) { fkJoin.join(left, rejoiner, rejoinMaterialized) - .toStream() - .to(REJOIN_OUTPUT); + .toStream() + .to(REJOIN_OUTPUT); } } - return builder.build(streamsConfig); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyVersionedJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyVersionedJoinIntegrationTest.java new file mode 100644 index 00000000000..10f632ffdea --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyVersionedJoinIntegrationTest.java @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import static java.util.Collections.emptyMap; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.TestOutputTopic; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.test.IntegrationTest; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +@Category(IntegrationTest.class) +public class KTableKTableForeignKeyVersionedJoinIntegrationTest extends KTableKTableForeignKeyJoinIntegrationTest { + + public KTableKTableForeignKeyVersionedJoinIntegrationTest(final boolean leftJoin, + final boolean materialized, + final boolean leftVersioned, + final boolean rightVersioned) { + // optimizations and rejoin are disabled for these tests, as these tests focus on versioning. + // see KTableKTableForeignKeyJoinIntegrationTest for test coverage for optimizations and rejoin + super(leftJoin, StreamsConfig.NO_OPTIMIZATION, materialized, false, leftVersioned, rightVersioned); + } + + @Parameterized.Parameters(name = "leftJoin={0}, materialized={1}, leftVersioned={2}, rightVersioned={3}") + public static Collection<Object[]> data() { + final List<Boolean> booleans = Arrays.asList(true, false); + return buildParameters(booleans, booleans, booleans, booleans); + } + + @Test + public void shouldIgnoreOutOfOrderRecordsIffVersioned() { + final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin, leftVersioned, rightVersioned); + try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) { + final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer()); + final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer()); + final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer()); + final KeyValueStore<String, String> store = driver.getKeyValueStore("store"); + + // RHS record + right.pipeInput("rhs1", "rhsValue1", baseTimestamp + 4); + + assertThat( + outputTopic.readKeyValuesToMap(), + is(emptyMap()) + ); + if (materialized) { + assertThat( + asMap(store), + is(emptyMap()) + ); + } + + // LHS records with match to existing RHS record + left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp + 3); + left.pipeInput("lhs2", "lhsValue2|rhs1", baseTimestamp + 5); + + { + final Map<String, String> expected = mkMap( + mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"), + mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1)") + ); + assertThat( + outputTopic.readKeyValuesToMap(), + is(expected) + ); + if (materialized) { + assertThat( + asMap(store), + is(expected) + ); + } + } + + // out-of-order LHS record (for existing key) does not produce a new result iff LHS is versioned + left.pipeInput("lhs1", "lhsValue1_ooo|rhs1", baseTimestamp + 2); + if (leftVersioned) { + assertThat( + outputTopic.readKeyValuesToMap(), + is(emptyMap()) + ); + if (materialized) { + assertThat( + asMap(store), + is(mkMap( + mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"), + mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1)") + )) + ); + } + } else { + assertThat( + outputTopic.readKeyValuesToMap(), + is(mkMap( + mkEntry("lhs1", "(lhsValue1_ooo|rhs1,rhsValue1)") + )) + ); + if (materialized) { + assertThat( + asMap(store), + is(mkMap( + mkEntry("lhs1", "(lhsValue1_ooo|rhs1,rhsValue1)"), + mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1)") + )) + ); + } + } + + // out-of-order LHS tombstone (for existing key) is similarly ignored (iff LHS is versioned) + left.pipeInput("lhs1", null, baseTimestamp + 2); + if (leftVersioned) { + assertThat( + outputTopic.readKeyValuesToMap(), + is(emptyMap()) + ); + if (materialized) { + assertThat( + asMap(store), + is(mkMap( + mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"), + mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1)") + )) + ); + } + } else { + assertThat( + outputTopic.readKeyValuesToMap(), + is(mkMap( + mkEntry("lhs1", null) + )) + ); + if (materialized) { + assertThat( + asMap(store), + is(mkMap( + mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1)") + )) + ); + } + } + + // LHS record with larger timestamp always produces a new result + left.pipeInput("lhs1", "lhsValue1_new|rhs1", baseTimestamp + 8); + { + assertThat( + outputTopic.readKeyValuesToMap(), + is(mkMap( + mkEntry("lhs1", "(lhsValue1_new|rhs1,rhsValue1)") + )) + ); + if (materialized) { + assertThat( + asMap(store), + is(mkMap( + mkEntry("lhs1", "(lhsValue1_new|rhs1,rhsValue1)"), + mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1)") + )) + ); + } + } + + // out-of-order RHS record (for existing key) does not produce a new result iff RHS is versioned + right.pipeInput("rhs1", "rhsValue1_ooo", baseTimestamp + 1); + if (rightVersioned) { + assertThat( + outputTopic.readKeyValuesToMap(), + is(emptyMap()) + ); + if (materialized) { + assertThat( + asMap(store), + is(mkMap( + mkEntry("lhs1", "(lhsValue1_new|rhs1,rhsValue1)"), + mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1)") + )) + ); + } + } else { + assertThat( + outputTopic.readKeyValuesToMap(), + is(mkMap( + mkEntry("lhs1", "(lhsValue1_new|rhs1,rhsValue1_ooo)"), + mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1_ooo)") + )) + ); + if (materialized) { + assertThat( + asMap(store), + is(mkMap( + mkEntry("lhs1", "(lhsValue1_new|rhs1,rhsValue1_ooo)"), + mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1_ooo)") + )) + ); + } + } + + // out-of-order RHS tombstone (for existing key) is similarly ignored (iff RHS is versioned) + right.pipeInput("rhs1", null, baseTimestamp + 1); + if (rightVersioned) { + assertThat( + outputTopic.readKeyValuesToMap(), + is(emptyMap()) + ); + if (materialized) { + assertThat( + asMap(store), + is(mkMap( + mkEntry("lhs1", "(lhsValue1_new|rhs1,rhsValue1)"), + mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1)") + )) + ); + } + } else { + if (leftJoin) { + assertThat( + outputTopic.readKeyValuesToMap(), + is(mkMap( + mkEntry("lhs1", "(lhsValue1_new|rhs1,null)"), + mkEntry("lhs2", "(lhsValue2|rhs1,null)") + )) + ); + if (materialized) { + assertThat( + asMap(store), + is(mkMap( + mkEntry("lhs1", "(lhsValue1_new|rhs1,null)"), + mkEntry("lhs2", "(lhsValue2|rhs1,null)") + )) + ); + } + } else { + assertThat( + outputTopic.readKeyValuesToMap(), + is(mkMap( + mkEntry("lhs1", null), + mkEntry("lhs2", null) + )) + ); + if (materialized) { + assertThat( + asMap(store), + is(emptyMap()) + ); + } + } + } + + // RHS record with larger timestamps always produces new results + right.pipeInput("rhs1", "rhsValue1_new", baseTimestamp + 6); + { + assertThat( + outputTopic.readKeyValuesToMap(), + is(mkMap( + mkEntry("lhs1", "(lhsValue1_new|rhs1,rhsValue1_new)"), + mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1_new)") + )) + ); + if (materialized) { + assertThat( + asMap(store), + is(mkMap( + mkEntry("lhs1", "(lhsValue1_new|rhs1,rhsValue1_new)"), + mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1_new)") + )) + ); + } + } + } + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index 76ec17c63b7..54bb14aa0ec 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -3149,7 +3149,7 @@ public class KStreamImplTest { " Processor: KTABLE-FK-JOIN-OUTPUT-0000000018 (stores: [])\n" + " --> KTABLE-TOSTREAM-0000000020\n" + " <-- KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-0000000017\n" + - " Processor: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000007 (stores: [])\n" + + " Processor: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000007 (stores: [KSTREAM-TOTABLE-STATE-STORE-0000000002])\n" + " --> KTABLE-SINK-0000000008\n" + " <-- KSTREAM-TOTABLE-0000000001\n" + " Processor: KTABLE-TOSTREAM-0000000020 (stores: [])\n" + @@ -3174,7 +3174,7 @@ public class KStreamImplTest { " Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000012 (stores: [KSTREAM-TOTABLE-STATE-STORE-0000000005])\n" + " --> KTABLE-SINK-0000000015\n" + " <-- KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000011\n" + - " Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000013 (stores: [KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-0000000010])\n" + + " Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000013 (stores: [KSTREAM-TOTABLE-STATE-STORE-0000000005, KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-0000000010])\n" + " --> KTABLE-SINK-0000000015\n" + " <-- KSTREAM-TOTABLE-0000000004\n" + " Sink: KTABLE-SINK-0000000015 (topic: KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000014-topic)\n" +