cadonna commented on code in PR #13942:
URL: https://github.com/apache/kafka/pull/13942#discussion_r1262545184


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamTableJoinNode.java:
##########
@@ -31,17 +32,22 @@ public class StreamTableJoinNode<K, V> extends GraphNode {
     private final String[] storeNames;
     private final ProcessorParameters<K, V, ?, ?> processorParameters;
     private final String otherJoinSideNodeName;
+    private final Duration gracePeriod;
+
 
     public StreamTableJoinNode(final String nodeName,
                                final ProcessorParameters<K, V, ?, ?> 
processorParameters,
                                final String[] storeNames,
-                               final String otherJoinSideNodeName) {
+                               final String otherJoinSideNodeName,
+                               final Duration gracePeriod) {
         super(nodeName);
 
         // in the case of Stream-Table join the state stores associated with 
the KTable
         this.storeNames = storeNames;
         this.processorParameters = processorParameters;
         this.otherJoinSideNodeName = otherJoinSideNodeName;
+        this.gracePeriod = gracePeriod;
+

Review Comment:
   nit:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBytesStoreTest.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.MockRecordCollector;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.rocksdb.WriteBatch;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+
+class RocksDBTimeOrderedKeyValueBytesStoreTest {
+
+    private InternalMockProcessorContext context;
+    private RocksDBTimeOrderedKeyValueBytesStore bytesStore;
+    private File stateDir;
+    final String storeName = "bytes-store";
+    private final static String METRICS_SCOPE = "metrics-scope";
+    private final String topic = "changelog";
+
+
+    @BeforeEach
+    public void before() {
+        bytesStore = new RocksDBTimeOrderedKeyValueBytesStore(storeName, 
METRICS_SCOPE);
+
+        stateDir = TestUtils.tempDirectory();
+        context = new InternalMockProcessorContext<>(
+            stateDir,
+            Serdes.String(),
+            Serdes.Long(),
+            new MockRecordCollector(),
+            new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics()))
+        );
+        bytesStore.init((StateStoreContext) context, bytesStore);
+    }
+
+    @AfterEach
+    public void close() {
+        bytesStore.close();
+    }
+
+    @Test
+    public void shouldCreateWriteBatches() {
+        final String key = "a";
+        final Collection<ConsumerRecord<byte[], byte[]>> records = new 
ArrayList<>();
+        records.add(new ConsumerRecord<>("", 0, 0L, serializeKey(key, 0, 
0L).get(), serializeValue(50L)));
+        records.add(new ConsumerRecord<>("", 0, 0L, serializeKey(key, 1, 
1L).get(), serializeValue(100L)));
+        final Map<KeyValueSegment, WriteBatch> writeBatchMap = 
bytesStore.getWriteBatches(records);
+        assertEquals(1, writeBatchMap.size());
+
+        for (final WriteBatch batch : writeBatchMap.values()) {
+            // 2 includes base and index record

Review Comment:
   This comment lies. There is no index record here. Please remove.



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBytesStoreTest.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.MockRecordCollector;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.rocksdb.WriteBatch;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+
+class RocksDBTimeOrderedKeyValueBytesStoreTest {
+
+    private InternalMockProcessorContext context;
+    private RocksDBTimeOrderedKeyValueBytesStore bytesStore;
+    private File stateDir;
+    final String storeName = "bytes-store";
+    private final static String METRICS_SCOPE = "metrics-scope";
+    private final String topic = "changelog";
+
+
+    @BeforeEach
+    public void before() {
+        bytesStore = new RocksDBTimeOrderedKeyValueBytesStore(storeName, 
METRICS_SCOPE);
+
+        stateDir = TestUtils.tempDirectory();
+        context = new InternalMockProcessorContext<>(
+            stateDir,
+            Serdes.String(),
+            Serdes.Long(),
+            new MockRecordCollector(),
+            new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics()))
+        );
+        bytesStore.init((StateStoreContext) context, bytesStore);
+    }
+
+    @AfterEach
+    public void close() {
+        bytesStore.close();
+    }
+
+    @Test
+    public void shouldCreateWriteBatches() {

Review Comment:
   Could you please add a test for the edge case when the the collection of 
records is empty?



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java:
##########
@@ -164,6 +164,75 @@ public void shouldFailIfTableIsNotVersioned() {
         );
     }
 
+    @Test
+    public void shouldFailIfTableIsNotVersionedButMaterializationIsInherited() 
{
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> streamA = builder.stream("topic", 
Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> source = builder.table("topic2", 
Consumed.with(Serdes.String(), Serdes.String()),
+            Materialized.as(Stores.inMemoryKeyValueStore("tableB")));
+        final KTable<String, String> tableB = source.filter((k, v) -> true);
+        // the filter operation forces the table materialization to be 
inherited
+
+        streamA.join(tableB, (value1, value2) -> value1 + value2, 
Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", 
Duration.ofMillis(6))).to("out-one");
+
+        final IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, builder::build);
+        assertThat(
+            exception.getMessage(),
+            is("KTable must be versioned to use a grace period in a stream 
table join.")
+        );
+    }
+
+    @Test
+    public void shouldNotFailIfTableIsVersionedButMaterializationIsInherited() 
{
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> streamA = builder.stream("topic", 
Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> source = builder.table("topic2", 
Consumed.with(Serdes.String(), Serdes.String()),
+            Materialized.as(Stores.persistentVersionedKeyValueStore("tableB", 
Duration.ofMinutes(5))));
+        final KTable<String, String> tableB = source.filter((k, v) -> true);
+        // the filter operation forces the table materialization to be 
inherited
+
+        streamA.join(tableB, (value1, value2) -> value1 + value2, 
Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", 
Duration.ofMillis(6))).to("out-one");
+
+        //should not throw an error
+        builder.build();
+    }
+
+    @Test
+    public void shouldFailIfGracePeriodIsLongerThanHistoryRetention() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> streamA = builder.stream("topic", 
Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> tableB = builder.table("topic2", 
Consumed.with(Serdes.String(), Serdes.String()),
+            Materialized.as(Stores.persistentVersionedKeyValueStore("tableB", 
Duration.ofMinutes(5))));
+
+        streamA.join(tableB, (value1, value2) -> value1 + value2, 
Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", 
Duration.ofMinutes(6))).to("out-one");
+        final IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, () -> builder.build(props));

Review Comment:
   Could you please structure your tests in such a way that it is clearer what 
the setup, the actual call under test and the verification is. I usually 
separate these three parts by an empty line. A clear separation is not always 
possible, though.  



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java:
##########
@@ -164,6 +164,75 @@ public void shouldFailIfTableIsNotVersioned() {
         );
     }
 
+    @Test
+    public void shouldFailIfTableIsNotVersionedButMaterializationIsInherited() 
{
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> streamA = builder.stream("topic", 
Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> source = builder.table("topic2", 
Consumed.with(Serdes.String(), Serdes.String()),
+            Materialized.as(Stores.inMemoryKeyValueStore("tableB")));
+        final KTable<String, String> tableB = source.filter((k, v) -> true);
+        // the filter operation forces the table materialization to be 
inherited
+
+        streamA.join(tableB, (value1, value2) -> value1 + value2, 
Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", 
Duration.ofMillis(6))).to("out-one");
+
+        final IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, builder::build);
+        assertThat(
+            exception.getMessage(),
+            is("KTable must be versioned to use a grace period in a stream 
table join.")
+        );
+    }
+
+    @Test
+    public void shouldNotFailIfTableIsVersionedButMaterializationIsInherited() 
{
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> streamA = builder.stream("topic", 
Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> source = builder.table("topic2", 
Consumed.with(Serdes.String(), Serdes.String()),
+            Materialized.as(Stores.persistentVersionedKeyValueStore("tableB", 
Duration.ofMinutes(5))));
+        final KTable<String, String> tableB = source.filter((k, v) -> true);
+        // the filter operation forces the table materialization to be 
inherited
+
+        streamA.join(tableB, (value1, value2) -> value1 + value2, 
Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", 
Duration.ofMillis(6))).to("out-one");
+
+        //should not throw an error
+        builder.build();
+    }
+
+    @Test
+    public void shouldFailIfGracePeriodIsLongerThanHistoryRetention() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> streamA = builder.stream("topic", 
Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> tableB = builder.table("topic2", 
Consumed.with(Serdes.String(), Serdes.String()),
+            Materialized.as(Stores.persistentVersionedKeyValueStore("tableB", 
Duration.ofMinutes(5))));
+
+        streamA.join(tableB, (value1, value2) -> value1 + value2, 
Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", 
Duration.ofMinutes(6))).to("out-one");
+        final IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, () -> builder.build(props));
+        assertThat(exception.getMessage(), is("History retention must be at 
least grace period."));
+    }
+
+    @Test
+    public void 
shouldFailIfGracePeriodIsLongerThanHistoryRetentionAndInheritedStore() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> streamA = builder.stream("topic", 
Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> source = builder.table("topic2", 
Consumed.with(Serdes.String(), Serdes.String()),
+            Materialized.as(Stores.persistentVersionedKeyValueStore("V-grace", 
Duration.ofMinutes(0))));
+        final KTable<String, String> tableB = source.filter((k, v) -> true);
+        // the filter operation forces the table materialization to be 
inherited
+
+        streamA.join(tableB, (value1, value2) -> value1 + value2, 
Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", 
Duration.ofMillis(6))).to("out-one");
+
+        final IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, () -> builder.build(props));

Review Comment:
   I was wondering if it would not be better testing this exceptions in 
`StreamTableJoinNodeTest` (does not yet exist). In such a way, the tests might 
become simpler, because we could verify the exceptions just for the case that a 
versioned state store is connected to the node in `StreamTableJoinNodeTest`. 
Here we would merely test if the materializations are correctly inherited 
(maybe there are already such tests somewhere).
   You do not need to do that for this PR. 
   But we should try to avoid writing more tests than necessary and at the same 
time we should cover all cases.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to