This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 3.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push: new 8a81b17 HOTIFX: Disable spurious left/outer stream-stream join fix (#11233) 8a81b17 is described below commit 8a81b175d9dd694e82804fc77dacff86531af1e7 Author: Matthias J. Sax <matth...@confluent.io> AuthorDate: Mon Aug 23 09:45:49 2021 -0700 HOTIFX: Disable spurious left/outer stream-stream join fix (#11233) KAFKA-10847 improves stream-stream left/outer joins to avoid spurious left/outer join results. However, it introduces regression bug KAFKA-13216. This PR disables KAFKA-10847 by partially rolling back KIP-633 changes. --- .../apache/kafka/streams/kstream/JoinWindows.java | 46 ----- .../apache/kafka/streams/StreamsBuilderTest.java | 20 +- .../org/apache/kafka/streams/TopologyTest.java | 36 ++-- .../StreamStreamJoinIntegrationTest.java | 36 ++-- .../kafka/streams/kstream/JoinWindowsTest.java | 22 -- .../KStreamImplValueJoinerWithKeyTest.java | 9 +- .../kstream/internals/KStreamKStreamJoinTest.java | 15 +- .../internals/KStreamKStreamLeftJoinTest.java | 151 ++++++-------- .../internals/KStreamKStreamOuterJoinTest.java | 223 ++++++++------------- .../apache/kafka/streams/scala/TopologyTest.scala | 8 +- .../kafka/streams/scala/kstream/KStreamTest.scala | 2 +- 11 files changed, 214 insertions(+), 354 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java index 1b26bcc..f353739 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java @@ -104,55 +104,11 @@ public class JoinWindows extends Windows<Window> { * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference}, * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} before or after * the timestamp of the record from the primary stream. 
- * <p> - * Using this method explicitly sets the grace period to the duration specified by {@code afterWindowEnd}, which - * means that only out-of-order records arriving more than the grace period after the window end will be dropped. - * The window close, after which any incoming records are considered late and will be rejected, is defined as - * {@code windowEnd + afterWindowEnd} - * - * @param timeDifference join window interval - * @param afterWindowEnd The grace period to admit out-of-order events to a window. - * @return A new JoinWindows object with the specified window definition and grace period - * @throws IllegalArgumentException if {@code timeDifference} is negative or can't be represented as {@code long milliseconds} - * if the {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds} - */ - public static JoinWindows ofTimeDifferenceAndGrace(final Duration timeDifference, final Duration afterWindowEnd) { - final String timeDifferenceMsgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference"); - final long timeDifferenceMs = validateMillisecondDuration(timeDifference, timeDifferenceMsgPrefix); - - final String afterWindowEndMsgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd"); - final long afterWindowEndMs = validateMillisecondDuration(afterWindowEnd, afterWindowEndMsgPrefix); - - return new JoinWindows(timeDifferenceMs, timeDifferenceMs, afterWindowEndMs, true); - } - - /** - * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference}, - * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} before or after - * the timestamp of the record from the primary stream. - * <p> - * CAUTION: Using this method implicitly sets the grace period to zero, which means that any out-of-order - * records arriving after the window ends are considered late and will be dropped. 
- * - * @param timeDifference join window interval - * @return a new JoinWindows object with the window definition and no grace period. Note that this means out-of-order records arriving after the window end will be dropped - * @throws IllegalArgumentException if {@code timeDifference} is negative or can't be represented as {@code long milliseconds} - */ - public static JoinWindows ofTimeDifferenceWithNoGrace(final Duration timeDifference) { - return ofTimeDifferenceAndGrace(timeDifference, Duration.ofMillis(NO_GRACE_PERIOD)); - } - - /** - * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference}, - * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} before or after - * the timestamp of the record from the primary stream. * * @param timeDifference join window interval * @return a new JoinWindows object with the window definition with and grace period (default to 24 hours minus {@code timeDifference}) * @throws IllegalArgumentException if {@code timeDifference} is negative or can't be represented as {@code long milliseconds} - * @deprecated since 3.0. Use {@link #ofTimeDifferenceWithNoGrace(Duration)}} instead */ - @Deprecated public static JoinWindows of(final Duration timeDifference) throws IllegalArgumentException { final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference"); final long timeDifferenceMs = validateMillisecondDuration(timeDifference, msgPrefix); @@ -216,9 +172,7 @@ public class JoinWindows extends Windows<Window> { * @param afterWindowEnd The grace period to admit out-of-order events to a window. * @return this updated builder * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds} - * @deprecated since 3.0. 
Use {@link #ofTimeDifferenceAndGrace(Duration, Duration)} instead */ - @Deprecated public JoinWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException { //TODO KAFKA-13021: disallow calling grace() if it was already set via ofTimeDifferenceAndGrace/WithNoGrace() final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd"); diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java index e18b972..2f0dedd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java @@ -745,7 +745,7 @@ public class StreamsBuilderTest { streamOne.leftJoin( streamTwo, (value1, value2) -> value1, - JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1)), + JoinWindows.of(Duration.ofHours(1)).grace(Duration.ZERO), StreamJoined.<String, String, String>as(STREAM_OPERATION_NAME) .withName(STREAM_OPERATION_NAME) ); @@ -754,8 +754,7 @@ public class StreamsBuilderTest { final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology(); assertNamesForStateStore(topology.stateStores(), STREAM_OPERATION_NAME + "-this-join-store", - STREAM_OPERATION_NAME + "-outer-other-join-store", - STREAM_OPERATION_NAME + "-left-shared-join-store" + STREAM_OPERATION_NAME + "-outer-other-join-store" ); assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", @@ -775,7 +774,7 @@ public class StreamsBuilderTest { streamOne.leftJoin( streamTwo, (value1, value2) -> value1, - JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1)), + JoinWindows.of(Duration.ofHours(1)).grace(Duration.ZERO), StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()) .withName(STREAM_OPERATION_NAME) ); @@ -784,8 +783,7 @@ public class StreamsBuilderTest { final ProcessorTopology topology = 
builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology(); assertNamesForStateStore(topology.stateStores(), "KSTREAM-JOINTHIS-0000000004-store", - "KSTREAM-OUTEROTHER-0000000005-store", - "KSTREAM-OUTERSHARED-0000000004-store" + "KSTREAM-OUTEROTHER-0000000005-store" ); assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", @@ -851,7 +849,7 @@ public class StreamsBuilderTest { streamOne.outerJoin( streamTwo, (value1, value2) -> value1, - JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1)), + JoinWindows.of(Duration.ofHours(1)).grace(Duration.ZERO), StreamJoined.<String, String, String>as(STREAM_OPERATION_NAME) .withName(STREAM_OPERATION_NAME) ); @@ -859,8 +857,7 @@ public class StreamsBuilderTest { final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology(); assertNamesForStateStore(topology.stateStores(), STREAM_OPERATION_NAME + "-outer-this-join-store", - STREAM_OPERATION_NAME + "-outer-other-join-store", - STREAM_OPERATION_NAME + "-outer-shared-join-store"); + STREAM_OPERATION_NAME + "-outer-other-join-store"); assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", "KSTREAM-SOURCE-0000000001", @@ -880,7 +877,7 @@ public class StreamsBuilderTest { streamOne.outerJoin( streamTwo, (value1, value2) -> value1, - JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1)), + JoinWindows.of(Duration.ofHours(1)).grace(Duration.ZERO), StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()) .withName(STREAM_OPERATION_NAME) ); @@ -889,8 +886,7 @@ public class StreamsBuilderTest { final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology(); assertNamesForStateStore(topology.stateStores(), "KSTREAM-OUTERTHIS-0000000004-store", - "KSTREAM-OUTEROTHER-0000000005-store", - "KSTREAM-OUTERSHARED-0000000004-store" + "KSTREAM-OUTEROTHER-0000000005-store" ); 
assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java index b332f6c..45f7c62 100644 --- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java @@ -853,7 +853,7 @@ public class TopologyTest { stream1.leftJoin( stream2, MockValueJoiner.TOSTRING_JOINER, - JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)), + JoinWindows.of(ofMillis(100)).grace(Duration.ZERO), StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())); final TopologyDescription describe = builder.build().describe(); @@ -871,10 +871,10 @@ public class TopologyTest { " Processor: KSTREAM-WINDOWED-0000000003 (stores: [KSTREAM-OUTEROTHER-0000000005-store])\n" + " --> KSTREAM-OUTEROTHER-0000000005\n" + " <-- KSTREAM-SOURCE-0000000001\n" + - " Processor: KSTREAM-JOINTHIS-0000000004 (stores: [KSTREAM-OUTEROTHER-0000000005-store, KSTREAM-OUTERSHARED-0000000004-store])\n" + + " Processor: KSTREAM-JOINTHIS-0000000004 (stores: [KSTREAM-OUTEROTHER-0000000005-store])\n" + " --> KSTREAM-MERGE-0000000006\n" + " <-- KSTREAM-WINDOWED-0000000002\n" + - " Processor: KSTREAM-OUTEROTHER-0000000005 (stores: [KSTREAM-JOINTHIS-0000000004-store, KSTREAM-OUTERSHARED-0000000004-store])\n" + + " Processor: KSTREAM-OUTEROTHER-0000000005 (stores: [KSTREAM-JOINTHIS-0000000004-store])\n" + " --> KSTREAM-MERGE-0000000006\n" + " <-- KSTREAM-WINDOWED-0000000003\n" + " Processor: KSTREAM-MERGE-0000000006 (stores: [])\n" + @@ -895,7 +895,7 @@ public class TopologyTest { stream1.leftJoin( stream2, MockValueJoiner.TOSTRING_JOINER, - JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)), + JoinWindows.of(ofMillis(100)).grace(Duration.ZERO), StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) .withStoreName("custom-name")); @@ -914,10 +914,10 @@ public class TopologyTest { " 
Processor: KSTREAM-WINDOWED-0000000003 (stores: [custom-name-outer-other-join-store])\n" + " --> KSTREAM-OUTEROTHER-0000000005\n" + " <-- KSTREAM-SOURCE-0000000001\n" + - " Processor: KSTREAM-JOINTHIS-0000000004 (stores: [custom-name-outer-other-join-store, custom-name-left-shared-join-store])\n" + + " Processor: KSTREAM-JOINTHIS-0000000004 (stores: [custom-name-outer-other-join-store])\n" + " --> KSTREAM-MERGE-0000000006\n" + " <-- KSTREAM-WINDOWED-0000000002\n" + - " Processor: KSTREAM-OUTEROTHER-0000000005 (stores: [custom-name-this-join-store, custom-name-left-shared-join-store])\n" + + " Processor: KSTREAM-OUTEROTHER-0000000005 (stores: [custom-name-this-join-store])\n" + " --> KSTREAM-MERGE-0000000006\n" + " <-- KSTREAM-WINDOWED-0000000003\n" + " Processor: KSTREAM-MERGE-0000000006 (stores: [])\n" + @@ -935,7 +935,7 @@ public class TopologyTest { stream1 = builder.stream("input-topic1"); stream2 = builder.stream("input-topic2"); - final JoinWindows joinWindows = JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)); + final JoinWindows joinWindows = JoinWindows.of(ofMillis(100)).grace(Duration.ZERO); final WindowBytesStoreSupplier thisStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store", Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()), @@ -968,10 +968,10 @@ public class TopologyTest { " Processor: KSTREAM-WINDOWED-0000000003 (stores: [in-memory-join-store-other])\n" + " --> KSTREAM-OUTEROTHER-0000000005\n" + " <-- KSTREAM-SOURCE-0000000001\n" + - " Processor: KSTREAM-JOINTHIS-0000000004 (stores: [in-memory-join-store-other, in-memory-join-store-left-shared-join-store])\n" + + " Processor: KSTREAM-JOINTHIS-0000000004 (stores: [in-memory-join-store-other])\n" + " --> KSTREAM-MERGE-0000000006\n" + " <-- KSTREAM-WINDOWED-0000000002\n" + - " Processor: KSTREAM-OUTEROTHER-0000000005 (stores: [in-memory-join-store, in-memory-join-store-left-shared-join-store])\n" + + " Processor: KSTREAM-OUTEROTHER-0000000005 (stores: 
[in-memory-join-store])\n" + " --> KSTREAM-MERGE-0000000006\n" + " <-- KSTREAM-WINDOWED-0000000003\n" + " Processor: KSTREAM-MERGE-0000000006 (stores: [])\n" + @@ -992,7 +992,7 @@ public class TopologyTest { stream1.outerJoin( stream2, MockValueJoiner.TOSTRING_JOINER, - JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)), + JoinWindows.of(ofMillis(100)).grace(Duration.ZERO), StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())); final TopologyDescription describe = builder.build().describe(); @@ -1010,10 +1010,10 @@ public class TopologyTest { " Processor: KSTREAM-WINDOWED-0000000003 (stores: [KSTREAM-OUTEROTHER-0000000005-store])\n" + " --> KSTREAM-OUTEROTHER-0000000005\n" + " <-- KSTREAM-SOURCE-0000000001\n" + - " Processor: KSTREAM-OUTEROTHER-0000000005 (stores: [KSTREAM-OUTERTHIS-0000000004-store, KSTREAM-OUTERSHARED-0000000004-store])\n" + + " Processor: KSTREAM-OUTEROTHER-0000000005 (stores: [KSTREAM-OUTERTHIS-0000000004-store])\n" + " --> KSTREAM-MERGE-0000000006\n" + " <-- KSTREAM-WINDOWED-0000000003\n" + - " Processor: KSTREAM-OUTERTHIS-0000000004 (stores: [KSTREAM-OUTEROTHER-0000000005-store, KSTREAM-OUTERSHARED-0000000004-store])\n" + + " Processor: KSTREAM-OUTERTHIS-0000000004 (stores: [KSTREAM-OUTEROTHER-0000000005-store])\n" + " --> KSTREAM-MERGE-0000000006\n" + " <-- KSTREAM-WINDOWED-0000000002\n" + " Processor: KSTREAM-MERGE-0000000006 (stores: [])\n" + @@ -1034,7 +1034,7 @@ public class TopologyTest { stream1.outerJoin( stream2, MockValueJoiner.TOSTRING_JOINER, - JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)), + JoinWindows.of(ofMillis(100)).grace(Duration.ZERO), StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) .withStoreName("custom-name")); @@ -1053,10 +1053,10 @@ public class TopologyTest { " Processor: KSTREAM-WINDOWED-0000000003 (stores: [custom-name-outer-other-join-store])\n" + " --> KSTREAM-OUTEROTHER-0000000005\n" + " <-- KSTREAM-SOURCE-0000000001\n" + - " Processor: 
KSTREAM-OUTEROTHER-0000000005 (stores: [custom-name-outer-this-join-store, custom-name-outer-shared-join-store])\n" + + " Processor: KSTREAM-OUTEROTHER-0000000005 (stores: [custom-name-outer-this-join-store])\n" + " --> KSTREAM-MERGE-0000000006\n" + " <-- KSTREAM-WINDOWED-0000000003\n" + - " Processor: KSTREAM-OUTERTHIS-0000000004 (stores: [custom-name-outer-other-join-store, custom-name-outer-shared-join-store])\n" + + " Processor: KSTREAM-OUTERTHIS-0000000004 (stores: [custom-name-outer-other-join-store])\n" + " --> KSTREAM-MERGE-0000000006\n" + " <-- KSTREAM-WINDOWED-0000000002\n" + " Processor: KSTREAM-MERGE-0000000006 (stores: [])\n" + @@ -1074,7 +1074,7 @@ public class TopologyTest { stream1 = builder.stream("input-topic1"); stream2 = builder.stream("input-topic2"); - final JoinWindows joinWindows = JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)); + final JoinWindows joinWindows = JoinWindows.of(ofMillis(100)).grace(Duration.ZERO); final WindowBytesStoreSupplier thisStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store", Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()), @@ -1107,10 +1107,10 @@ public class TopologyTest { " Processor: KSTREAM-WINDOWED-0000000003 (stores: [in-memory-join-store-other])\n" + " --> KSTREAM-OUTEROTHER-0000000005\n" + " <-- KSTREAM-SOURCE-0000000001\n" + - " Processor: KSTREAM-OUTEROTHER-0000000005 (stores: [in-memory-join-store-outer-shared-join-store, in-memory-join-store])\n" + + " Processor: KSTREAM-OUTEROTHER-0000000005 (stores: [in-memory-join-store])\n" + " --> KSTREAM-MERGE-0000000006\n" + " <-- KSTREAM-WINDOWED-0000000003\n" + - " Processor: KSTREAM-OUTERTHIS-0000000004 (stores: [in-memory-join-store-other, in-memory-join-store-outer-shared-join-store])\n" + + " Processor: KSTREAM-OUTERTHIS-0000000004 (stores: [in-memory-join-store-other])\n" + " --> KSTREAM-MERGE-0000000006\n" + " <-- KSTREAM-WINDOWED-0000000002\n" + " Processor: KSTREAM-MERGE-0000000006 (stores: [])\n" + diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java index 9d2bd1e..ef00743 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java @@ -100,7 +100,7 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest leftStream.join( rightStream, valueJoiner, - JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(10), ofHours(24)) + JoinWindows.of(ofSeconds(10)).grace(ofHours(24)) ).to(OUTPUT_TOPIC); runTestWithDriver(expectedResult); @@ -147,7 +147,7 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest rightStream.flatMap(MockMapper.noOpFlatKeyValueMapper()) .selectKey(MockMapper.selectKeyKeyValueMapper()), valueJoiner, - JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(10), ofHours(24)) + JoinWindows.of(ofSeconds(10)).grace(ofHours(24)) ).to(OUTPUT_TOPIC); runTestWithDriver(expectedResult); @@ -161,7 +161,10 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest null, null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "A-null", null, 3L), + new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L) + ), Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)), Arrays.asList( new TestRecord<>(ANY_UNIQUE_KEY, "A-b", null, 6L), @@ -192,7 +195,7 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest leftStream.leftJoin( rightStream, valueJoiner, - JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(10), ofHours(24)) + JoinWindows.of(ofSeconds(10)).grace(ofHours(24)) ).to(OUTPUT_TOPIC); runTestWithDriver(expectedResult); @@ -206,7 +209,10 @@ public class 
StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest null, null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "A-null", null, 3L), + new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L) + ), Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)), Arrays.asList( new TestRecord<>(ANY_UNIQUE_KEY, "A-b", null, 6L), @@ -239,7 +245,7 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest rightStream.flatMap(MockMapper.noOpFlatKeyValueMapper()) .selectKey(MockMapper.selectKeyKeyValueMapper()), valueJoiner, - JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(10), ofHours(24)) + JoinWindows.of(ofSeconds(10)).grace(ofHours(24)) ).to(OUTPUT_TOPIC); runTestWithDriver(expectedResult); @@ -253,7 +259,10 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest null, null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "A-null", null, 3L), + new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L) + ), Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)), Arrays.asList( new TestRecord<>(ANY_UNIQUE_KEY, "A-b", null, 6L), @@ -284,7 +293,7 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest leftStream.outerJoin( rightStream, valueJoiner, - JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(10), ofHours(24)) + JoinWindows.of(ofSeconds(10)).grace(ofHours(24)) ).to(OUTPUT_TOPIC); runTestWithDriver(expectedResult); @@ -298,7 +307,10 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest null, null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "A-null", null, 3L), + new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L) + ), Collections.singletonList(new 
TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)), Arrays.asList( new TestRecord<>(ANY_UNIQUE_KEY, "A-b", null, 6L), @@ -331,7 +343,7 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest rightStream.flatMap(MockMapper.noOpFlatKeyValueMapper()) .selectKey(MockMapper.selectKeyKeyValueMapper()), valueJoiner, - JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(10), ofHours(24)) + JoinWindows.of(ofSeconds(10)).grace(ofHours(24)) ).to(OUTPUT_TOPIC); runTestWithDriver(expectedResult); @@ -424,11 +436,11 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest leftStream.join( rightStream, valueJoiner, - JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(10), ofHours(24)) + JoinWindows.of(ofSeconds(10)).grace(ofHours(24)) ).join( rightStream, valueJoiner, - JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(10), ofHours(24)) + JoinWindows.of(ofSeconds(10)).grace(ofHours(24)) ).to(OUTPUT_TOPIC); runTestWithDriver(expectedResult); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java index 3d38ada..7fb692e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java @@ -75,8 +75,6 @@ public class JoinWindowsTest { @Test public void timeDifferenceMustNotBeNegative() { assertThrows(IllegalArgumentException.class, () -> JoinWindows.of(ofMillis(-1))); - assertThrows(IllegalArgumentException.class, () -> JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(-1))); - assertThrows(IllegalArgumentException.class, () -> JoinWindows.ofTimeDifferenceAndGrace(ofMillis(-1), ofMillis(ANY_GRACE))); } @Test @@ -149,16 +147,6 @@ public class JoinWindowsTest { JoinWindows.of(ofMillis(9)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).grace(ofMillis(60)), 
JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).grace(ofMillis(60)) ); - - verifyEquality( - JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(3)), - JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(3)) - ); - - verifyEquality( - JoinWindows.ofTimeDifferenceAndGrace(ofMillis(3), ofMillis(4)), - JoinWindows.ofTimeDifferenceAndGrace(ofMillis(3), ofMillis(4)) - ); } @Test @@ -188,15 +176,5 @@ public class JoinWindowsTest { JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(9)), JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)) ); - - verifyInEquality( - JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(9)), - JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(3)) - ); - - verifyInEquality( - JoinWindows.ofTimeDifferenceAndGrace(ofMillis(9), ofMillis(9)), - JoinWindows.ofTimeDifferenceAndGrace(ofMillis(3), ofMillis(9)) - ); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplValueJoinerWithKeyTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplValueJoinerWithKeyTest.java index b35a7b2..dcd519d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplValueJoinerWithKeyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplValueJoinerWithKeyTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.kstream.internals; +import java.util.Arrays; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringDeserializer; @@ -64,7 +65,7 @@ public class KStreamImplValueJoinerWithKeyTest { private final ValueJoinerWithKey<String, Integer, Integer, String> valueJoinerWithKey = (key, lv, rv) -> key + ":" + (lv + (rv == null ? 
0 : rv)); - private final JoinWindows joinWindows = JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100), ofHours(24L)); + private final JoinWindows joinWindows = JoinWindows.of(ofMillis(100)).grace(ofHours(24L)); private final StreamJoined<String, Integer, Integer> streamJoined = StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer()); private final Joined<String, Integer, Integer> joined = @@ -108,7 +109,8 @@ public class KStreamImplValueJoinerWithKeyTest { ).to(outputTopic, Produced.with(Serdes.String(), Serdes.String())); // Left KV A, 3, Right KV A, 5 // TTD pipes records to left stream first, then right - final List<KeyValue<String, String>> expectedResults = Collections.singletonList(KeyValue.pair("A", "A:5")); + final List<KeyValue<String, String>> expectedResults = + Arrays.asList(KeyValue.pair("A", "A:3"), KeyValue.pair("A", "A:5")); runJoinTopology( builder, expectedResults, @@ -128,7 +130,8 @@ public class KStreamImplValueJoinerWithKeyTest { // Left KV A, 3, Right KV A, 5 // TTD pipes records to left stream first, then right - final List<KeyValue<String, String>> expectedResults = Collections.singletonList(KeyValue.pair("A", "A:5")); + final List<KeyValue<String, String>> expectedResults = + Arrays.asList(KeyValue.pair("A", "A:3"), KeyValue.pair("A", "A:5")); runJoinTopology( builder, expectedResults, diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java index 1d50a37..517e0eb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java @@ -515,7 +515,7 @@ public class KStreamKStreamJoinTest { joined = stream1.outerJoin( stream2, MockValueJoiner.TOSTRING_JOINER, - JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofHours(24L)), + 
JoinWindows.of(ofMillis(100L)).grace(ofHours(24L)), StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) ); joined.process(supplier); @@ -532,7 +532,7 @@ public class KStreamKStreamJoinTest { driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor(); - // push two items to the primary stream; the other window is empty; this should not produce items yet + // push two items to the primary stream; the other window is empty; this should produce 2 spurious items // w1 = {} // w2 = {} // --> w1 = { 0:A0, 1:A1 } @@ -540,7 +540,10 @@ public class KStreamKStreamJoinTest { for (int i = 0; i < 2; i++) { inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i]); } - processor.checkAndClearProcessResult(); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(0, "A0+null", 0L), + new KeyValueTimestamp<>(1, "A1+null", 0L) + ); // push two items to the other stream; this should produce two items // w1 = { 0:A0, 1:A1 } @@ -555,7 +558,7 @@ public class KStreamKStreamJoinTest { new KeyValueTimestamp<>(1, "A1+a1", 0L) ); - // push all four items to the primary stream; this should produce two items + // push all four items to the primary stream; this should produce four items // w1 = { 0:A0, 1:A1 } // w2 = { 0:a0, 1:a1 } // --> w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2, 3:B3 } @@ -565,7 +568,9 @@ public class KStreamKStreamJoinTest { } processor.checkAndClearProcessResult( new KeyValueTimestamp<>(0, "B0+a0", 0L), - new KeyValueTimestamp<>(1, "B1+a1", 0L) + new KeyValueTimestamp<>(1, "B1+a1", 0L), + new KeyValueTimestamp<>(2, "B2+null", 0L), + new KeyValueTimestamp<>(3, "B3+null", 0L) ); // push all items to the other stream; this should produce six items diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java index 4e2b6d8..5c6b3f0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java @@ -172,7 +172,7 @@ public class KStreamKStreamLeftJoinTest { joined = stream1.leftJoin( stream2, MockValueJoiner.TOSTRING_JOINER, - JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofMillis(10L)), + JoinWindows.of(ofMillis(100L)).grace(ofMillis(10L)), StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) ); joined.process(supplier); @@ -202,6 +202,8 @@ public class KStreamKStreamLeftJoinTest { inputTopic2.pipeInput(2, "a2", 1001L); processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(2, "A2+null", 1000L), + new KeyValueTimestamp<>(2, "A2-0+null", 1000L), new KeyValueTimestamp<>(2, "A2+a2", 1001L), new KeyValueTimestamp<>(2, "A2-0+a2", 1001L) ); @@ -228,7 +230,7 @@ public class KStreamKStreamLeftJoinTest { joined = stream1.leftJoin( stream2, MockValueJoiner.TOSTRING_JOINER, - JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofMillis(0L)), + JoinWindows.of(ofMillis(100L)).grace(ofMillis(0L)), StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) ); joined.process(supplier); @@ -246,25 +248,16 @@ public class KStreamKStreamLeftJoinTest { inputTopic1.pipeInput(0, "A0", windowStart + 1L); inputTopic1.pipeInput(1, "A1", windowStart + 2L); inputTopic1.pipeInput(0, "A0-0", windowStart + 3L); - processor.checkAndClearProcessResult(); - - // Join detected; No null-joins emitted - inputTopic2.pipeInput(1, "a1", windowStart + 3L); - processor.checkAndClearProcessResult( - new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3L) - ); - - // Dummy record in left topic will emit expired non-joined records from the left topic - inputTopic1.pipeInput(2, "dummy", windowStart + 401L); 
processor.checkAndClearProcessResult( new KeyValueTimestamp<>(0, "A0+null", windowStart + 1L), + new KeyValueTimestamp<>(1, "A1+null", windowStart + 2L), new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3L) ); - // Flush internal non-joined state store by joining the dummy record - inputTopic2.pipeInput(2, "dummy", windowStart + 401L); + // Join detected; No null-joins emitted + inputTopic2.pipeInput(1, "a1", windowStart + 3L); processor.checkAndClearProcessResult( - new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 401L) + new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3L) ); } } @@ -283,7 +276,7 @@ public class KStreamKStreamLeftJoinTest { joined = stream1.leftJoin( stream2, MockValueJoiner.TOSTRING_JOINER, - JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofMillis(0L)), + JoinWindows.of(ofMillis(100L)).grace(ofMillis(0L)), StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) ); joined.process(supplier); @@ -297,29 +290,20 @@ public class KStreamKStreamLeftJoinTest { final long windowStart = 0L; - // No joins detected; No null-joins emitted + // No joins detected; spurious null-joins emitted inputTopic1.pipeInput(0, "A0", windowStart + 1L); inputTopic1.pipeInput(1, "A1", windowStart + 2L); inputTopic1.pipeInput(0, "A0-0", windowStart + 3L); - processor.checkAndClearProcessResult(); - - // Join detected; No null-joins emitted - inputTopic2.pipeInput(1, "a1", windowStart + 3L); - processor.checkAndClearProcessResult( - new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3L) - ); - - // Dummy record in right topic will emit expired non-joined records from the left topic - inputTopic2.pipeInput(2, "dummy", windowStart + 401L); processor.checkAndClearProcessResult( new KeyValueTimestamp<>(0, "A0+null", windowStart + 1L), + new KeyValueTimestamp<>(1, "A1+null", windowStart + 2L), new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3L) ); - // Flush internal non-joined state store by joining the dummy record - inputTopic1.pipeInput(2, 
"dummy", windowStart + 402L); + // Join detected; No null-joins emitted + inputTopic2.pipeInput(1, "a1", windowStart + 3L); processor.checkAndClearProcessResult( - new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402L) + new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3L) ); } } @@ -338,7 +322,7 @@ public class KStreamKStreamLeftJoinTest { joined = stream1.leftJoin( stream2, MockValueJoiner.TOSTRING_JOINER, - JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofMillis(0L)), + JoinWindows.of(ofMillis(100L)).grace(ofMillis(0L)), StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) ); joined.process(supplier); @@ -363,16 +347,6 @@ public class KStreamKStreamLeftJoinTest { processor.checkAndClearProcessResult( new KeyValueTimestamp<>(1, "a1+A1", windowStart + 3L) ); - - // Dummy record in left topic will not emit records - inputTopic1.pipeInput(2, "dummy", windowStart + 401L); - processor.checkAndClearProcessResult(); - - // Process the dummy joined record - inputTopic2.pipeInput(2, "dummy", windowStart + 402L); - processor.checkAndClearProcessResult( - new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402L) - ); } } @@ -390,7 +364,7 @@ public class KStreamKStreamLeftJoinTest { joined = stream1.leftJoin( stream2, MockValueJoiner.TOSTRING_JOINER, - JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L)), + JoinWindows.of(ofMillis(100L)).grace(Duration.ZERO), StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) ); joined.process(supplier); @@ -430,7 +404,7 @@ public class KStreamKStreamLeftJoinTest { @Test public void testLeftJoinWithInMemoryCustomSuppliers() { - final JoinWindows joinWindows = JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofMillis(0L)); + final JoinWindows joinWindows = JoinWindows.of(ofMillis(100L)).grace(ofMillis(0L)); final WindowBytesStoreSupplier thisStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store", Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()), @@ 
-447,7 +421,7 @@ public class KStreamKStreamLeftJoinTest { @Test public void testLeftJoinWithDefaultSuppliers() { - final JoinWindows joinWindows = JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L)); + final JoinWindows joinWindows = JoinWindows.of(ofMillis(100L)).grace(Duration.ZERO); final StreamJoined<Integer, String, String> streamJoined = StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()); runLeftJoin(streamJoined, joinWindows); @@ -487,8 +461,8 @@ public class KStreamKStreamLeftJoinTest { driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor(); - // 2 window stores + 1 shared window store should be available - assertEquals(3, driver.getAllStateStores().size()); + // 2 window stores should be available + assertEquals(2, driver.getAllStateStores().size()); // push two items to the primary stream; the other window is empty // w1 {} @@ -498,7 +472,10 @@ public class KStreamKStreamLeftJoinTest { for (int i = 0; i < 2; i++) { inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i]); } - processor.checkAndClearProcessResult(); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(0, "A0+null", 0L), + new KeyValueTimestamp<>(1, "A1+null", 0L) + ); // push two items to the other stream; this should produce two items // w1 = { 0:A0, 1:A1 } @@ -513,7 +490,7 @@ public class KStreamKStreamLeftJoinTest { new KeyValueTimestamp<>(1, "A1+a1", 0L) ); - // push three items to the primary stream; this should produce two joined items + // push three items to the primary stream; this should produce three joined items // w1 = { 0:A0, 1:A1 } // w2 = { 0:a0, 1:a1 } // --> w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2 } @@ -523,7 +500,9 @@ public class KStreamKStreamLeftJoinTest { } processor.checkAndClearProcessResult( new KeyValueTimestamp<>(0, "B0+a0", 0L), - new KeyValueTimestamp<>(1, "B1+a1", 0L) + new 
KeyValueTimestamp<>(1, "B1+a1", 0L), + new KeyValueTimestamp<>(2, "B2+null", 0L) + ); // push all items to the other stream; this should produce five items @@ -558,11 +537,6 @@ public class KStreamKStreamLeftJoinTest { new KeyValueTimestamp<>(2, "C2+b2", 0L), new KeyValueTimestamp<>(3, "C3+b3", 0L) ); - - // push a dummy record that should expire non-joined items; it should not produce any items because - // all of them are joined - inputTopic1.pipeInput(0, "dummy", 1000L); - processor.checkAndClearProcessResult(); } } @@ -580,7 +554,7 @@ public class KStreamKStreamLeftJoinTest { joined = stream1.leftJoin( stream2, MockValueJoiner.TOSTRING_JOINER, - JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L)), + JoinWindows.of(ofMillis(100L)).grace(Duration.ZERO), StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) ); joined.process(supplier); @@ -592,17 +566,19 @@ public class KStreamKStreamLeftJoinTest { driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor(); - // push two items to the primary stream; the other window is empty; this should not produce any item yet + // push two items to the primary stream; the other window is empty; this should produce two spurious items // w1 = {} // w2 = {} // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) } // --> w2 = {} inputTopic1.pipeInput(0, "A0", 0L); inputTopic1.pipeInput(1, "A1", 100L); - processor.checkAndClearProcessResult(); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(0, "A0+null", 0L), + new KeyValueTimestamp<>(1, "A1+null", 100L) + ); - // push one item to the other window that has a join; this should produce non-joined records with a closed window first, then - // the joined records + // push one item to the other window that has a join; this should produce the joined records // by the time they were produced before // w1 = { 0:A0 (ts: 0), 
1:A1 (ts: 100) } // w2 = { } @@ -610,7 +586,6 @@ public class KStreamKStreamLeftJoinTest { // --> w2 = { 1:a1 (ts: 110) } inputTopic2.pipeInput(1, "a1", 110L); processor.checkAndClearProcessResult( - new KeyValueTimestamp<>(0, "A0+null", 0L), new KeyValueTimestamp<>(1, "A1+a1", 110L) ); } @@ -631,7 +606,7 @@ public class KStreamKStreamLeftJoinTest { joined = stream1.leftJoin( stream2, MockValueJoiner.TOSTRING_JOINER, - JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofMillis(10L)), + JoinWindows.of(ofMillis(100L)).grace(ofMillis(10L)), StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) ); joined.process(supplier); @@ -658,7 +633,10 @@ public class KStreamKStreamLeftJoinTest { for (int i = 0; i < 2; i++) { inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i], time); } - processor.checkAndClearProcessResult(); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(0, "A0+null", 0L), + new KeyValueTimestamp<>(1, "A1+null", 0L) + ); // push two items to the other stream with a window time after the previous window ended (not closed); this should not produce // joined records because the window has ended, but not closed. 
@@ -671,20 +649,6 @@ public class KStreamKStreamLeftJoinTest { inputTopic2.pipeInput(expectedKey, "a" + expectedKey, time); } processor.checkAndClearProcessResult(); - - // push a dummy item to the other stream after the window is closed; this should only produced the expired non-joined records, but - // not the joined records because the window has closed - // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) } - // w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101) } - // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) } - // --> w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101), - // 0:dummy (ts: 211)} - time += 1100L; - inputTopic2.pipeInput(0, "dummy", time); - processor.checkAndClearProcessResult( - new KeyValueTimestamp<>(0, "A0+null", 0L), - new KeyValueTimestamp<>(1, "A1+null", 0L) - ); } } @@ -703,7 +667,7 @@ public class KStreamKStreamLeftJoinTest { joined = stream1.leftJoin( stream2, MockValueJoiner.TOSTRING_JOINER, - JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)), + JoinWindows.of(ofMillis(100)).grace(Duration.ZERO), StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) ); joined.process(supplier); @@ -722,7 +686,7 @@ public class KStreamKStreamLeftJoinTest { final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor(); final long time = 0L; - // push two items to the primary stream; the other window is empty; this should not produce any items + // push two items to the primary stream; the other window is empty; this should produce two spurious items // w1 = {} // w2 = {} // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) } @@ -730,7 +694,10 @@ public class KStreamKStreamLeftJoinTest { for (int i = 0; i < 2; i++) { inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i], time); } - processor.checkAndClearProcessResult(); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(0, "A0+null", 0L), + new KeyValueTimestamp<>(1, "A1+null", 0L) + ); // push four items to the other stream; this should produce two full-join items // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 
0) } @@ -790,7 +757,7 @@ public class KStreamKStreamLeftJoinTest { new KeyValueTimestamp<>(3, "B3+b3", 1100L) ); - // push four items with increased timestamp to the primary stream; this should produce one left-join and three full-join items (non-joined item is not produced yet) + // push four items with increased timestamp to the primary stream; this should produce one left-join and three full-join items plus one spurious item // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0), // 0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100) } // w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0), @@ -805,12 +772,13 @@ public class KStreamKStreamLeftJoinTest { inputTopic1.pipeInput(expectedKey, "C" + expectedKey, time); } processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(0, "C0+null", 1101L), new KeyValueTimestamp<>(1, "C1+b1", 1101L), new KeyValueTimestamp<>(2, "C2+b2", 1101L), new KeyValueTimestamp<>(3, "C3+b3", 1101L) ); - // push four items with increased timestamp to the primary stream; this should produce two left-join and two full-join items (non-joined items are not produced yet) + // push four items with increased timestamp to the primary stream; this should produce two spurious and two inner items // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0), // 0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100), // 0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101) } @@ -827,11 +795,13 @@ public class KStreamKStreamLeftJoinTest { inputTopic1.pipeInput(expectedKey, "D" + expectedKey, time); } processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(0, "D0+null", 1102L), + new KeyValueTimestamp<>(1, "D1+null", 1102L), new KeyValueTimestamp<>(2, "D2+b2", 1102L), new KeyValueTimestamp<>(3, "D3+b3", 1102L) ); - // push four items with increased timestamp to the primary stream; this should produce one full-join items (three non-joined left-join are not produced yet) + // push four items with increased timestamp to the primary 
stream; this should produce one full-join item plus three spurious left-join items // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0), // 0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100), // 0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101), @@ -850,10 +820,13 @@ public class KStreamKStreamLeftJoinTest { inputTopic1.pipeInput(expectedKey, "E" + expectedKey, time); } processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(0, "E0+null", 1103L), + new KeyValueTimestamp<>(1, "E1+null", 1103L), + new KeyValueTimestamp<>(2, "E2+null", 1103L), new KeyValueTimestamp<>(3, "E3+b3", 1103L) ); - // push four items with increased timestamp to the primary stream; this should produce no full-join items (four non-joined left-join are not produced yet) + // push four items with increased timestamp to the primary stream; this should produce four spurious left-join items // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0), // 0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100), // 0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101), @@ -873,19 +846,7 @@ public class KStreamKStreamLeftJoinTest { for (final int expectedKey : expectedKeys) { inputTopic1.pipeInput(expectedKey, "F" + expectedKey, time); } - processor.checkAndClearProcessResult(); - - // push a dummy record to produce all left-join non-joined items - time += 301L; - driver.advanceWallClockTime(Duration.ofMillis(1000L)); - inputTopic1.pipeInput(0, "dummy", time); processor.checkAndClearProcessResult( - new KeyValueTimestamp<>(0, "C0+null", 1101L), - new KeyValueTimestamp<>(0, "D0+null", 1102L), - new KeyValueTimestamp<>(1, "D1+null", 1102L), - new KeyValueTimestamp<>(0, "E0+null", 1103L), - new KeyValueTimestamp<>(1, "E1+null", 1103L), - new KeyValueTimestamp<>(2, "E2+null", 1103L), new KeyValueTimestamp<>(0, "F0+null", 1104L), new KeyValueTimestamp<>(1, "F1+null", 1104L), new KeyValueTimestamp<>(2, "F2+null", 1104L), diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java index 2d9e320..42b81f7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java @@ -113,7 +113,7 @@ public class KStreamKStreamOuterJoinTest { joined = stream1.outerJoin( stream2, MockValueJoiner.TOSTRING_JOINER, - JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofMillis(10L)), + JoinWindows.of(ofMillis(100L)).grace(ofMillis(10L)), StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) ); joined.process(supplier); @@ -125,20 +125,19 @@ public class KStreamKStreamOuterJoinTest { driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor(); - // verifies non-joined duplicates are emitted when window has closed + // verifies non-joined duplicates are emitted inputTopic1.pipeInput(0, "A0", 0L); inputTopic1.pipeInput(0, "A0-0", 0L); inputTopic2.pipeInput(1, "a1", 0L); inputTopic2.pipeInput(1, "a1-0", 0L); inputTopic2.pipeInput(1, "a0", 111L); - // bump stream-time to trigger outer-join results - inputTopic2.pipeInput(3, "dummy", 211); processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(0, "A0+null", 0L), + new KeyValueTimestamp<>(0, "A0-0+null", 0L), new KeyValueTimestamp<>(1, "null+a1", 0L), new KeyValueTimestamp<>(1, "null+a1-0", 0L), - new KeyValueTimestamp<>(0, "A0+null", 0L), - new KeyValueTimestamp<>(0, "A0-0+null", 0L) + new KeyValueTimestamp<>(1, "null+a0", 111L) ); // verifies joined duplicates are emitted @@ -148,21 +147,13 @@ public class KStreamKStreamOuterJoinTest { inputTopic2.pipeInput(2, "a2-0", 201L); 
processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(2, "A2+null", 200L), + new KeyValueTimestamp<>(2, "A2-0+null", 200L), new KeyValueTimestamp<>(2, "A2+a2", 201L), new KeyValueTimestamp<>(2, "A2-0+a2", 201L), new KeyValueTimestamp<>(2, "A2+a2-0", 201L), new KeyValueTimestamp<>(2, "A2-0+a2-0", 201L) ); - - // this record should expired non-joined records; only null+a0 will be emitted because - // it did not have a join - driver.advanceWallClockTime(Duration.ofMillis(1000L)); - inputTopic2.pipeInput(3, "dummy", 1500L); - - processor.checkAndClearProcessResult( - new KeyValueTimestamp<>(1, "null+a0", 111L), - new KeyValueTimestamp<>(3, "null+dummy", 211) - ); } } @@ -180,7 +171,7 @@ public class KStreamKStreamOuterJoinTest { joined = stream1.outerJoin( stream2, MockValueJoiner.TOSTRING_JOINER, - JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L)), + JoinWindows.of(ofMillis(100L)).grace(Duration.ZERO), StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) ); joined.process(supplier); @@ -194,29 +185,20 @@ public class KStreamKStreamOuterJoinTest { final long windowStart = 0L; - // No joins detected; No null-joins emitted + // No joins detected; three spurious items emitted inputTopic1.pipeInput(0, "A0", windowStart + 1L); inputTopic1.pipeInput(1, "A1", windowStart + 2L); inputTopic1.pipeInput(0, "A0-0", windowStart + 3L); - processor.checkAndClearProcessResult(); - - // Join detected; No null-joins emitted - inputTopic2.pipeInput(1, "a1", windowStart + 3L); - processor.checkAndClearProcessResult( - new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3L) - ); - - // Dummy record in left topic will emit expired non-joined records from the left topic - inputTopic1.pipeInput(2, "dummy", windowStart + 401L); processor.checkAndClearProcessResult( new KeyValueTimestamp<>(0, "A0+null", windowStart + 1L), + new KeyValueTimestamp<>(1, "A1+null", windowStart + 2L), new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3L) ); - // Flush internal
non-joined state store by joining the dummy record - inputTopic2.pipeInput(2, "dummy", windowStart + 401L); + // Join detected; No null-joins emitted + inputTopic2.pipeInput(1, "a1", windowStart + 3L); processor.checkAndClearProcessResult( - new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 401L) + new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3L) ); } } @@ -235,7 +217,7 @@ public class KStreamKStreamOuterJoinTest { joined = stream1.outerJoin( stream2, MockValueJoiner.TOSTRING_JOINER, - JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofMillis(0L)), + JoinWindows.of(ofMillis(100L)).grace(ofMillis(0L)), StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) ); joined.process(supplier); @@ -253,25 +235,16 @@ public class KStreamKStreamOuterJoinTest { inputTopic1.pipeInput(0, "A0", windowStart + 1L); inputTopic1.pipeInput(1, "A1", windowStart + 2L); inputTopic1.pipeInput(0, "A0-0", windowStart + 3L); - processor.checkAndClearProcessResult(); - - // Join detected; No null-joins emitted - inputTopic2.pipeInput(1, "a1", windowStart + 3L); - processor.checkAndClearProcessResult( - new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3L) - ); - - // Dummy record in right topic will emit expired non-joined records from the left topic - inputTopic2.pipeInput(2, "dummy", windowStart + 401L); processor.checkAndClearProcessResult( new KeyValueTimestamp<>(0, "A0+null", windowStart + 1L), + new KeyValueTimestamp<>(1, "A1+null", windowStart + 2L), new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3L) ); - // Flush internal non-joined state store by joining the dummy record - inputTopic1.pipeInput(2, "dummy", windowStart + 402L); + // Join detected; No null-joins emitted + inputTopic2.pipeInput(1, "a1", windowStart + 3L); processor.checkAndClearProcessResult( - new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402L) + new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3L) ); } } @@ -290,7 +263,7 @@ public class KStreamKStreamOuterJoinTest { joined 
= stream1.outerJoin( stream2, MockValueJoiner.TOSTRING_JOINER, - JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L)), + JoinWindows.of(ofMillis(100L)).grace(Duration.ZERO), StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) ); joined.process(supplier); @@ -304,29 +277,20 @@ public class KStreamKStreamOuterJoinTest { final long windowStart = 0L; - // No joins detected; No null-joins emitted + // No joins detected; only spurious results inputTopic2.pipeInput(0, "A0", windowStart + 1L); inputTopic2.pipeInput(1, "A1", windowStart + 2L); inputTopic2.pipeInput(0, "A0-0", windowStart + 3L); - processor.checkAndClearProcessResult(); - - // Join detected; No null-joins emitted - inputTopic1.pipeInput(1, "a1", windowStart + 3L); - processor.checkAndClearProcessResult( - new KeyValueTimestamp<>(1, "a1+A1", windowStart + 3L) - ); - - // Dummy record in left topic will emit expired non-joined records from the right topic - inputTopic1.pipeInput(2, "dummy", windowStart + 401L); processor.checkAndClearProcessResult( new KeyValueTimestamp<>(0, "null+A0", windowStart + 1L), + new KeyValueTimestamp<>(1, "null+A1", windowStart + 2L), new KeyValueTimestamp<>(0, "null+A0-0", windowStart + 3L) ); - // Process the dummy joined record - inputTopic2.pipeInput(2, "dummy", windowStart + 402L); + // Join detected; No null-joins emitted + inputTopic1.pipeInput(1, "a1", windowStart + 3L); processor.checkAndClearProcessResult( - new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402L) + new KeyValueTimestamp<>(1, "a1+A1", windowStart + 3L) ); } } @@ -345,7 +309,7 @@ public class KStreamKStreamOuterJoinTest { joined = stream1.outerJoin( stream2, MockValueJoiner.TOSTRING_JOINER, - JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofMillis(0L)), + JoinWindows.of(ofMillis(100L)).grace(ofMillis(0L)), StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) ); joined.process(supplier); @@ -363,25 +327,16 @@ public class KStreamKStreamOuterJoinTest {
inputTopic2.pipeInput(0, "A0", windowStart + 1L); inputTopic2.pipeInput(1, "A1", windowStart + 2L); inputTopic2.pipeInput(0, "A0-0", windowStart + 3L); - processor.checkAndClearProcessResult(); - - // Join detected; No null-joins emitted - inputTopic1.pipeInput(1, "a1", windowStart + 3L); - processor.checkAndClearProcessResult( - new KeyValueTimestamp<>(1, "a1+A1", windowStart + 3L) - ); - - // Dummy record in right topic will emit expired non-joined records from the right topic - inputTopic2.pipeInput(2, "dummy", windowStart + 401L); processor.checkAndClearProcessResult( new KeyValueTimestamp<>(0, "null+A0", windowStart + 1L), + new KeyValueTimestamp<>(1, "null+A1", windowStart + 2L), new KeyValueTimestamp<>(0, "null+A0-0", windowStart + 3L) ); - // Process the dummy joined record - inputTopic1.pipeInput(2, "dummy", windowStart + 402L); + // Join detected; No null-joins emitted + inputTopic1.pipeInput(1, "a1", windowStart + 3L); processor.checkAndClearProcessResult( - new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402L) + new KeyValueTimestamp<>(1, "a1+A1", windowStart + 3L) ); } } @@ -400,7 +355,7 @@ public class KStreamKStreamOuterJoinTest { joined = stream1.outerJoin( stream2, MockValueJoiner.TOSTRING_JOINER, - JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)), + JoinWindows.of(ofMillis(100)).grace(Duration.ZERO), StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) ); joined.process(supplier); @@ -412,17 +367,19 @@ public class KStreamKStreamOuterJoinTest { driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor(); - // push two items to the primary stream; the other window is empty; this should not produce any item yet + // push two items to the primary stream; the other window is empty; this should produce two spurious results // w1 = {} // w2 = {} // --> w1 = { 0:A0 (ts: 0), 1:A1 
(ts: 100) } // --> w2 = {} inputTopic1.pipeInput(0, "A0", 0L); inputTopic1.pipeInput(1, "A1", 100L); - processor.checkAndClearProcessResult(); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(0, "A0+null", 0L), + new KeyValueTimestamp<>(1, "A1+null", 100L) + ); - // push one item to the other window that has a join; this should produce non-joined records with a closed window first, then - // the joined records + // push one item to the other window that has a join; this should produce the joined records // by the time they were produced before // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) } // w2 = { } @@ -430,7 +387,6 @@ public class KStreamKStreamOuterJoinTest { // --> w2 = { 0:a0 (ts: 110) } inputTopic2.pipeInput(1, "a1", 110L); processor.checkAndClearProcessResult( - new KeyValueTimestamp<>(0, "A0+null", 0L), new KeyValueTimestamp<>(1, "A1+a1", 110L) ); } @@ -450,7 +406,7 @@ public class KStreamKStreamOuterJoinTest { joined = stream1.outerJoin( stream2, MockValueJoiner.TOSTRING_JOINER, - JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100), ofMillis(10)), + JoinWindows.of(ofMillis(100)).grace(ofMillis(10)), StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) ); joined.process(supplier); @@ -468,43 +424,36 @@ public class KStreamKStreamOuterJoinTest { driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor(); - // push one item to the primary stream; and one item in other stream; this should not produce items because there are no joins - // and window has not ended + // push one item to the primary stream; and one item in other stream; this should produce two spurious items // w1 = {} // w2 = {} // --> w1 = { 0:A0 (ts: 0) } // --> w2 = { 1:a1 (ts: 0) } inputTopic1.pipeInput(0, "A0", 0L); inputTopic2.pipeInput(1, "a1", 0L); - processor.checkAndClearProcessResult(); + 
processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(0, "A0+null", 0L), + new KeyValueTimestamp<>(1, "null+a1", 0L) + ); - // push one item on each stream with a window time after the previous window ended (not closed); this should not produce - // joined records because the window has ended, but will not produce non-joined records because the window has not closed. + // push one item on each stream with a window time after the previous window ended; + // this should produce two spurious items // w1 = { 0:A0 (ts: 0) } // w2 = { 1:a1 (ts: 0) } // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) } // --> w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101) } inputTopic2.pipeInput(0, "a0", 101L); inputTopic1.pipeInput(1, "A1", 101L); - processor.checkAndClearProcessResult(); - - // push a dummy item to the any stream after the window is closed; this should produced all expired non-joined records because - // the window has closed - // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) } - // w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101) } - // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) } - // --> w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101), 0:dummy (ts: 112) } - inputTopic2.pipeInput(0, "dummy", 211); processor.checkAndClearProcessResult( - new KeyValueTimestamp<>(1, "null+a1", 0L), - new KeyValueTimestamp<>(0, "A0+null", 0L) + new KeyValueTimestamp<>(0, "null+a0", 101L), + new KeyValueTimestamp<>(1, "A1+null", 101L) ); } } @Test public void testOuterJoinWithInMemoryCustomSuppliers() { - final JoinWindows joinWindows = JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L)); + final JoinWindows joinWindows = JoinWindows.of(ofMillis(100L)).grace(Duration.ZERO); final WindowBytesStoreSupplier thisStoreSupplier = Stores.inMemoryWindowStore( "in-memory-join-store", @@ -527,7 +476,7 @@ public class KStreamKStreamOuterJoinTest { @Test public void testOuterJoinWithDefaultSuppliers() { - final JoinWindows joinWindows = JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L)); + final JoinWindows joinWindows = 
JoinWindows.of(ofMillis(100L)).grace(Duration.ZERO); final StreamJoined<Integer, String, String> streamJoined = StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()); runOuterJoin(streamJoined, joinWindows); @@ -567,11 +516,11 @@ public class KStreamKStreamOuterJoinTest { driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor(); - // 2 window stores + 1 shared window store should be available - assertEquals(3, driver.getAllStateStores().size()); + // 2 window stores should be available + assertEquals(2, driver.getAllStateStores().size()); - // push two items to the primary stream; the other window is empty; this should not - // produce any items because window has not expired + // push two items to the primary stream; the other window is empty; this should + // produce two spurious items // w1 {} // w2 {} // --> w1 = { 0:A0, 1:A1 } @@ -579,7 +528,10 @@ public class KStreamKStreamOuterJoinTest { for (int i = 0; i < 2; i++) { inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i]); } - processor.checkAndClearProcessResult(); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(0, "A0+null", 0L), + new KeyValueTimestamp<>(1, "A1+null", 0L) + ); // push two items to the other stream; this should produce two full-joined items // w1 = { 0:A0, 1:A1 } @@ -594,7 +546,7 @@ public class KStreamKStreamOuterJoinTest { new KeyValueTimestamp<>(1, "A1+a1", 0L) ); - // push three items to the primary stream; this should produce two full-joined items + // push three items to the primary stream; this should produce three items // w1 = { 0:A0, 1:A1 } // w2 = { 0:a0, 1:a1 } // --> w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2 } @@ -604,10 +556,11 @@ public class KStreamKStreamOuterJoinTest { } processor.checkAndClearProcessResult( new KeyValueTimestamp<>(0, "B0+a0", 0L), - new KeyValueTimestamp<>(1, "B1+a1", 0L) + new 
KeyValueTimestamp<>(1, "B1+a1", 0L), + new KeyValueTimestamp<>(2, "B2+null", 0L) ); - // push all items to the other stream; this should produce five full-joined items + // push all items to the other stream; this should produce six items // w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2 } // w2 = { 0:a0, 1:a1 } // --> w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2 } @@ -620,7 +573,8 @@ public class KStreamKStreamOuterJoinTest { new KeyValueTimestamp<>(0, "B0+b0", 0L), new KeyValueTimestamp<>(1, "A1+b1", 0L), new KeyValueTimestamp<>(1, "B1+b1", 0L), - new KeyValueTimestamp<>(2, "B2+b2", 0L) + new KeyValueTimestamp<>(2, "B2+b2", 0L), + new KeyValueTimestamp<>(3, "null+b3", 0L) ); // push all four items to the primary stream; this should produce six full-joined items @@ -639,11 +593,6 @@ public class KStreamKStreamOuterJoinTest { new KeyValueTimestamp<>(2, "C2+b2", 0L), new KeyValueTimestamp<>(3, "C3+b3", 0L) ); - - // push a dummy record that should expire non-joined items; it should not produce any items because - // all of them are joined - inputTopic1.pipeInput(0, "dummy", 400L); - processor.checkAndClearProcessResult(); } } @@ -662,7 +611,7 @@ public class KStreamKStreamOuterJoinTest { joined = stream1.outerJoin( stream2, MockValueJoiner.TOSTRING_JOINER, - JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)), + JoinWindows.of(ofMillis(100)).grace(Duration.ZERO), StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) ); joined.process(supplier); @@ -681,7 +630,7 @@ public class KStreamKStreamOuterJoinTest { final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor(); final long time = 0L; - // push two items to the primary stream; the other window is empty; this should not produce items because window has not closed + // push two items to the primary stream; the other window is empty; this should produce two spurious items // w1 = {} // w2 = {} // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) } @@ -689,9 +638,12 @@ public class KStreamKStreamOuterJoinTest 
{ for (int i = 0; i < 2; i++) { inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i], time); } - processor.checkAndClearProcessResult(); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(0, "A0+null", 0L), + new KeyValueTimestamp<>(1, "A1+null", 0L) + ); - // push four items to the other stream; this should produce two full-join items + // push four items to the other stream; this should produce two full-join items plus two spurious items // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) } // w2 = {} // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) } @@ -701,7 +653,9 @@ public class KStreamKStreamOuterJoinTest { } processor.checkAndClearProcessResult( new KeyValueTimestamp<>(0, "A0+a0", 0L), - new KeyValueTimestamp<>(1, "A1+a1", 0L) + new KeyValueTimestamp<>(1, "A1+a1", 0L), + new KeyValueTimestamp<>(2, "null+a2", 0L), + new KeyValueTimestamp<>(3, "null+a3", 0L) ); testUpperWindowBound(expectedKeys, driver, processor); @@ -730,8 +684,10 @@ public class KStreamKStreamOuterJoinTest { inputTopic2.pipeInput(expectedKeys[i], "b" + expectedKeys[i], time + i); } processor.checkAndClearProcessResult( - new KeyValueTimestamp<>(2, "null+a2", 0L), - new KeyValueTimestamp<>(3, "null+a3", 0L) + new KeyValueTimestamp<>(0, "null+b0", 1000L), + new KeyValueTimestamp<>(1, "null+b1", 1001L), + new KeyValueTimestamp<>(2, "null+b2", 1002L), + new KeyValueTimestamp<>(3, "null+b3", 1003L) ); // push four items with larger timestamp to the primary stream; this should produce four full-join items @@ -753,7 +709,7 @@ public class KStreamKStreamOuterJoinTest { new KeyValueTimestamp<>(3, "B3+b3", 1100L) ); - // push four items with increased timestamp to the primary stream; this should produce three full-join items (non-joined item is not produced yet) + // push four items with increased timestamp to the primary stream; this should produce three full-join items plus one spurious item // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0), // 0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100) } // w2 = { 0:a0 (ts: 
0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0), @@ -768,12 +724,13 @@ public class KStreamKStreamOuterJoinTest { inputTopic1.pipeInput(expectedKey, "C" + expectedKey, time); } processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(0, "C0+null", 1101L), new KeyValueTimestamp<>(1, "C1+b1", 1101L), new KeyValueTimestamp<>(2, "C2+b2", 1101L), new KeyValueTimestamp<>(3, "C3+b3", 1101L) ); - // push four items with increased timestamp to the primary stream; this should produce two full-join items (non-joined items are not produced yet) + // push four items with increased timestamp to the primary stream; this should produce two full-join items plus two spurious items // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0), // 0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100), // 0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101) } @@ -790,11 +747,13 @@ public class KStreamKStreamOuterJoinTest { inputTopic1.pipeInput(expectedKey, "D" + expectedKey, time); } processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(0, "D0+null", 1102L), + new KeyValueTimestamp<>(1, "D1+null", 1102L), new KeyValueTimestamp<>(2, "D2+b2", 1102L), new KeyValueTimestamp<>(3, "D3+b3", 1102L) ); - // push four items with increased timestamp to the primary stream; this should produce one full-join items (three non-joined left-join are not produced yet) + // push four items with increased timestamp to the primary stream; this should produce one full-join item plus three spurious items // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0), // 0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100), // 0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101), @@ -813,10 +772,13 @@ public class KStreamKStreamOuterJoinTest { inputTopic1.pipeInput(expectedKey, "E" + expectedKey, time); } processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(0, "E0+null", 1103L), + new KeyValueTimestamp<>(1, "E1+null", 1103L), + new KeyValueTimestamp<>(2, "E2+null", 1103L), new 
KeyValueTimestamp<>(3, "E3+b3", 1103L) ); - // push four items with increased timestamp to the primary stream; this should produce no full-join items (four non-joined left-join are not produced yet) + // push four items with increased timestamp to the primary stream; this should produce four spurious items // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0), // 0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100), // 0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101), @@ -836,23 +798,12 @@ public class KStreamKStreamOuterJoinTest { for (final int expectedKey : expectedKeys) { inputTopic1.pipeInput(expectedKey, "F" + expectedKey, time); } - processor.checkAndClearProcessResult(); - - // push a dummy record to produce all left-join non-joined items - time += 301L; - driver.advanceWallClockTime(Duration.ofMillis(1000L)); - inputTopic1.pipeInput(0, "dummy", time); processor.checkAndClearProcessResult( - new KeyValueTimestamp<>(0, "C0+null", 1101L), - new KeyValueTimestamp<>(0, "D0+null", 1102L), - new KeyValueTimestamp<>(1, "D1+null", 1102L), - new KeyValueTimestamp<>(0, "E0+null", 1103L), - new KeyValueTimestamp<>(1, "E1+null", 1103L), - new KeyValueTimestamp<>(2, "E2+null", 1103L), new KeyValueTimestamp<>(0, "F0+null", 1104L), new KeyValueTimestamp<>(1, "F1+null", 1104L), new KeyValueTimestamp<>(2, "F2+null", 1104L), new KeyValueTimestamp<>(3, "F3+null", 1104L) + ); } diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala index 92ca5bd..ad02d04 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala @@ -379,7 +379,7 @@ class TopologyTest { mappedStream .filter((k: String, _: String) => k == "A") .join(stream2)((v1: String, v2: Int) => v1 + ":" + v2.toString, - 
JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000), Duration.ofHours(24)))( + JoinWindows.of(Duration.ofMillis(5000)).grace(Duration.ofHours(24)))( StreamJoined.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, NewSerdes.intSerde) ) .to(JOINED_TOPIC) @@ -387,7 +387,7 @@ class TopologyTest { mappedStream .filter((k: String, _: String) => k == "A") .join(stream3)((v1: String, v2: String) => v1 + ":" + v2.toString, - JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000), Duration.ofHours(24)))( + JoinWindows.of(Duration.ofMillis(5000)).grace(Duration.ofHours(24)))( StreamJoined.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, NewSerdes.stringSerde) ) .to(JOINED_TOPIC) @@ -439,7 +439,7 @@ class TopologyTest { .join[Integer, String]( stream2, valueJoiner2, - JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000), Duration.ofHours(24)), + JoinWindows.of(Duration.ofMillis(5000)).grace(Duration.ofHours(24)), StreamJoinedJ.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, SerdesJ.Integer) ) .to(JOINED_TOPIC) @@ -449,7 +449,7 @@ class TopologyTest { .join( stream3, valueJoiner3, - JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000), Duration.ofHours(24)), + JoinWindows.of(Duration.ofMillis(5000)).grace(Duration.ofHours(24)), StreamJoinedJ.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, SerdesJ.String) ) .to(JOINED_TOPIC) diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala index 0ec7b0e..1ba1238 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala @@ -201,7 +201,7 @@ class KStreamTest extends TestDriver { val stream1 = builder.stream[String, String](sourceTopic1) val stream2 = builder.stream[String, String](sourceTopic2) stream1 - 
.join(stream2)((a, b) => s"$a-$b", JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(1), Duration.ofHours(24))) + .join(stream2)((a, b) => s"$a-$b", JoinWindows.of(ofSeconds(1)).grace(Duration.ofHours(24))) .to(sinkTopic) val now = Instant.now()