vcrfxia commented on code in PR #13855: URL: https://github.com/apache/kafka/pull/13855#discussion_r1239143225
########## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java: ########## @@ -112,6 +125,74 @@ private void pushNullValueToTable() { } } + + private void makeJoin(final Duration grace) { + final KStream<Integer, String> stream; + final KTable<Integer, String> table; + final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>(); + builder = new StreamsBuilder(); + + final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String()); + stream = builder.stream(streamTopic, consumed); + table = builder.table("tableTopic2", consumed, Materialized.as( + Stores.persistentVersionedKeyValueStore("V-grace", Duration.ofMinutes(5)))); + stream.join(table, + MockValueJoiner.TOSTRING_JOINER, + Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String(), "Grace", grace) + ).process(supplier); + final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String()); + driver = new TopologyTestDriver(builder.build(), props); + inputStreamTopic = driver.createInputTopic(streamTopic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); + inputTableTopic = driver.createInputTopic("tableTopic2", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); + + processor = supplier.theCapturedProcessor(); + } + + @Test + public void shouldFailIfTableIsNotVersioned() { + final StreamsBuilder builder = new StreamsBuilder(); + final Properties props = new Properties(); + props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION); + final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String())); + final KTable<String, String> tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String())); + + final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, + () -> streamA.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", Duration.ofMillis(6))).to("out-one")); + assertThat( + exception.getMessage(), + is("KTable must be versioned to use a grace period in a stream table join.") + ); + } + + @Test + public void shouldDelayJoinByGracePeriod() { + makeJoin(Duration.ofMillis(2)); + + // push four items to the table. this should not produce any item. + pushToTableNonRandom(4, "Y"); + processor.checkAndClearProcessResult(EMPTY); + + // push all four items to the primary stream. this should produce two items. + pushToStream(4, "X"); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(0, "X0+Y0", 0), + new KeyValueTimestamp<>(1, "X1+Y1", 1)); + + // push all items to the table. this should not produce any item + pushToTableNonRandom(4, "YY"); + processor.checkAndClearProcessResult(EMPTY); + + // push all four items to the primary stream. this should produce two items. + pushToStream(4, "X"); + processor.checkAndClearProcessResult( Review Comment: The reason this produces two output records is because the max timestamp seen so far is still 3, which means only records with timestamp 0 and 1 are emitted (timestamps 2 and 3 are still in the buffer). Can you add another step to this test which now produces a record with a larger timestamp and verifies that the records with timestamps 2 and 3 are emitted? There should be four of them, and they should be emitted in timestamp order which is different from the offset order that they arrived in. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org