cadonna commented on code in PR #13942: URL: https://github.com/apache/kafka/pull/13942#discussion_r1262545184
########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamTableJoinNode.java: ########## @@ -31,17 +32,22 @@ public class StreamTableJoinNode<K, V> extends GraphNode { private final String[] storeNames; private final ProcessorParameters<K, V, ?, ?> processorParameters; private final String otherJoinSideNodeName; + private final Duration gracePeriod; + public StreamTableJoinNode(final String nodeName, final ProcessorParameters<K, V, ?, ?> processorParameters, final String[] storeNames, - final String otherJoinSideNodeName) { + final String otherJoinSideNodeName, + final Duration gracePeriod) { super(nodeName); // in the case of Stream-Table join the state stores associated with the KTable this.storeNames = storeNames; this.processorParameters = processorParameters; this.otherJoinSideNodeName = otherJoinSideNodeName; + this.gracePeriod = gracePeriod; + Review Comment: nit: ```suggestion ``` ########## streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBytesStoreTest.java: ########## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.test.InternalMockProcessorContext; +import org.apache.kafka.test.MockRecordCollector; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.rocksdb.WriteBatch; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + + +class RocksDBTimeOrderedKeyValueBytesStoreTest { + + private InternalMockProcessorContext context; + private RocksDBTimeOrderedKeyValueBytesStore bytesStore; + private File stateDir; + final String storeName = "bytes-store"; + private final static String METRICS_SCOPE = "metrics-scope"; + private final String topic = "changelog"; + + + @BeforeEach + public void before() { + bytesStore = new RocksDBTimeOrderedKeyValueBytesStore(storeName, METRICS_SCOPE); + + stateDir = TestUtils.tempDirectory(); + context = new InternalMockProcessorContext<>( + stateDir, + Serdes.String(), + Serdes.Long(), + new MockRecordCollector(), + new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())) + ); + bytesStore.init((StateStoreContext) context, bytesStore); + } + + @AfterEach + public void close() { + bytesStore.close(); + } + + @Test + public void 
shouldCreateWriteBatches() { + final String key = "a"; + final Collection<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>(); + records.add(new ConsumerRecord<>("", 0, 0L, serializeKey(key, 0, 0L).get(), serializeValue(50L))); + records.add(new ConsumerRecord<>("", 0, 0L, serializeKey(key, 1, 1L).get(), serializeValue(100L))); + final Map<KeyValueSegment, WriteBatch> writeBatchMap = bytesStore.getWriteBatches(records); + assertEquals(1, writeBatchMap.size()); + + for (final WriteBatch batch : writeBatchMap.values()) { + // 2 includes base and index record Review Comment: This comment lies. There is no index record here. Please remove. ########## streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBytesStoreTest.java: ########## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.test.InternalMockProcessorContext; +import org.apache.kafka.test.MockRecordCollector; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.rocksdb.WriteBatch; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + + +class RocksDBTimeOrderedKeyValueBytesStoreTest { + + private InternalMockProcessorContext context; + private RocksDBTimeOrderedKeyValueBytesStore bytesStore; + private File stateDir; + final String storeName = "bytes-store"; + private final static String METRICS_SCOPE = "metrics-scope"; + private final String topic = "changelog"; + + + @BeforeEach + public void before() { + bytesStore = new RocksDBTimeOrderedKeyValueBytesStore(storeName, METRICS_SCOPE); + + stateDir = TestUtils.tempDirectory(); + context = new InternalMockProcessorContext<>( + stateDir, + Serdes.String(), + Serdes.Long(), + new MockRecordCollector(), + new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())) + ); + bytesStore.init((StateStoreContext) context, bytesStore); + } + + @AfterEach + public void close() { + bytesStore.close(); + } + + @Test + public void 
shouldCreateWriteBatches() { Review Comment: Could you please add a test for the edge case when the collection of records is empty? ########## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java: ########## @@ -164,6 +164,75 @@ public void shouldFailIfTableIsNotVersioned() { ); } + @Test + public void shouldFailIfTableIsNotVersionedButMaterializationIsInherited() { + final StreamsBuilder builder = new StreamsBuilder(); + final Properties props = new Properties(); + props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION); + final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String())); + final KTable<String, String> source = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()), + Materialized.as(Stores.inMemoryKeyValueStore("tableB"))); + final KTable<String, String> tableB = source.filter((k, v) -> true); + // the filter operation forces the table materialization to be inherited + + streamA.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", Duration.ofMillis(6))).to("out-one"); + + final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, builder::build); + assertThat( + exception.getMessage(), + is("KTable must be versioned to use a grace period in a stream table join.") + ); + } + + @Test + public void shouldNotFailIfTableIsVersionedButMaterializationIsInherited() { + final StreamsBuilder builder = new StreamsBuilder(); + final Properties props = new Properties(); + props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION); + final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String())); + final KTable<String, String> source = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()), +
Materialized.as(Stores.persistentVersionedKeyValueStore("tableB", Duration.ofMinutes(5)))); + final KTable<String, String> tableB = source.filter((k, v) -> true); + // the filter operation forces the table materialization to be inherited + + streamA.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", Duration.ofMillis(6))).to("out-one"); + + //should not throw an error + builder.build(); + } + + @Test + public void shouldFailIfGracePeriodIsLongerThanHistoryRetention() { + final StreamsBuilder builder = new StreamsBuilder(); + final Properties props = new Properties(); + props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION); + final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String())); + final KTable<String, String> tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()), + Materialized.as(Stores.persistentVersionedKeyValueStore("tableB", Duration.ofMinutes(5)))); + + streamA.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", Duration.ofMinutes(6))).to("out-one"); + final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> builder.build(props)); Review Comment: Could you please structure your tests so that it is clearer what the setup, the actual call under test, and the verification are? I usually separate these three parts with an empty line. A clear separation is not always possible, though.
########## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java: ########## @@ -164,6 +164,75 @@ public void shouldFailIfTableIsNotVersioned() { ); } + @Test + public void shouldFailIfTableIsNotVersionedButMaterializationIsInherited() { + final StreamsBuilder builder = new StreamsBuilder(); + final Properties props = new Properties(); + props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION); + final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String())); + final KTable<String, String> source = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()), + Materialized.as(Stores.inMemoryKeyValueStore("tableB"))); + final KTable<String, String> tableB = source.filter((k, v) -> true); + // the filter operation forces the table materialization to be inherited + + streamA.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", Duration.ofMillis(6))).to("out-one"); + + final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, builder::build); + assertThat( + exception.getMessage(), + is("KTable must be versioned to use a grace period in a stream table join.") + ); + } + + @Test + public void shouldNotFailIfTableIsVersionedButMaterializationIsInherited() { + final StreamsBuilder builder = new StreamsBuilder(); + final Properties props = new Properties(); + props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION); + final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String())); + final KTable<String, String> source = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()), + Materialized.as(Stores.persistentVersionedKeyValueStore("tableB", Duration.ofMinutes(5)))); + final KTable<String, String> tableB = source.filter((k, v) -> true); + // the filter 
operation forces the table materialization to be inherited + + streamA.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", Duration.ofMillis(6))).to("out-one"); + + //should not throw an error + builder.build(); + } + + @Test + public void shouldFailIfGracePeriodIsLongerThanHistoryRetention() { + final StreamsBuilder builder = new StreamsBuilder(); + final Properties props = new Properties(); + props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION); + final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String())); + final KTable<String, String> tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()), + Materialized.as(Stores.persistentVersionedKeyValueStore("tableB", Duration.ofMinutes(5)))); + + streamA.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", Duration.ofMinutes(6))).to("out-one"); + final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> builder.build(props)); + assertThat(exception.getMessage(), is("History retention must be at least grace period.")); + } + + @Test + public void shouldFailIfGracePeriodIsLongerThanHistoryRetentionAndInheritedStore() { + final StreamsBuilder builder = new StreamsBuilder(); + final Properties props = new Properties(); + props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION); + final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String())); + final KTable<String, String> source = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()), + Materialized.as(Stores.persistentVersionedKeyValueStore("V-grace", Duration.ofMinutes(0)))); + final KTable<String, String> tableB = source.filter((k, v) -> true); + // the filter operation forces the table 
materialization to be inherited + + streamA.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", Duration.ofMillis(6))).to("out-one"); + + final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> builder.build(props)); Review Comment: I was wondering whether it would be better to test these exceptions in `StreamTableJoinNodeTest` (which does not exist yet). That way, the tests might become simpler, because we could verify the exceptions just for the case where a versioned state store is connected to the node in `StreamTableJoinNodeTest`. Here we would merely test whether the materializations are correctly inherited (maybe such tests already exist somewhere). You do not need to do that for this PR. But we should try to avoid writing more tests than necessary while still covering all cases. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org