This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 8a81b17 HOTIFX: Disable spurious left/outer stream-stream join fix
(#11233)
8a81b17 is described below
commit 8a81b175d9dd694e82804fc77dacff86531af1e7
Author: Matthias J. Sax <[email protected]>
AuthorDate: Mon Aug 23 09:45:49 2021 -0700
HOTIFX: Disable spurious left/outer stream-stream join fix (#11233)
KAFKA-10847 improves stream-stream left/outer joins to avoid spurious
left/outer join results. However, it introduces regression bug
KAFKA-13216.
This PR disables KAFKA-10847 by partially rolling back KIP-633 changes.
---
.../apache/kafka/streams/kstream/JoinWindows.java | 46 -----
.../apache/kafka/streams/StreamsBuilderTest.java | 20 +-
.../org/apache/kafka/streams/TopologyTest.java | 36 ++--
.../StreamStreamJoinIntegrationTest.java | 36 ++--
.../kafka/streams/kstream/JoinWindowsTest.java | 22 --
.../KStreamImplValueJoinerWithKeyTest.java | 9 +-
.../kstream/internals/KStreamKStreamJoinTest.java | 15 +-
.../internals/KStreamKStreamLeftJoinTest.java | 151 ++++++--------
.../internals/KStreamKStreamOuterJoinTest.java | 223 ++++++++-------------
.../apache/kafka/streams/scala/TopologyTest.scala | 8 +-
.../kafka/streams/scala/kstream/KStreamTest.scala | 2 +-
11 files changed, 214 insertions(+), 354 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index 1b26bcc..f353739 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -104,55 +104,11 @@ public class JoinWindows extends Windows<Window> {
* Specifies that records of the same key are joinable if their timestamps
are within {@code timeDifference},
* i.e., the timestamp of a record from the secondary stream is max {@code
timeDifference} before or after
* the timestamp of the record from the primary stream.
- * <p>
- * Using this method explicitly sets the grace period to the duration
specified by {@code afterWindowEnd}, which
- * means that only out-of-order records arriving more than the grace
period after the window end will be dropped.
- * The window close, after which any incoming records are considered late
and will be rejected, is defined as
- * {@code windowEnd + afterWindowEnd}
- *
- * @param timeDifference join window interval
- * @param afterWindowEnd The grace period to admit out-of-order events to
a window.
- * @return A new JoinWindows object with the specified window definition
and grace period
- * @throws IllegalArgumentException if {@code timeDifference} is negative
or can't be represented as {@code long milliseconds}
- * if the {@code afterWindowEnd} is
negative or can't be represented as {@code long milliseconds}
- */
- public static JoinWindows ofTimeDifferenceAndGrace(final Duration
timeDifference, final Duration afterWindowEnd) {
- final String timeDifferenceMsgPrefix =
prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
- final long timeDifferenceMs =
validateMillisecondDuration(timeDifference, timeDifferenceMsgPrefix);
-
- final String afterWindowEndMsgPrefix =
prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
- final long afterWindowEndMs =
validateMillisecondDuration(afterWindowEnd, afterWindowEndMsgPrefix);
-
- return new JoinWindows(timeDifferenceMs, timeDifferenceMs,
afterWindowEndMs, true);
- }
-
- /**
- * Specifies that records of the same key are joinable if their timestamps
are within {@code timeDifference},
- * i.e., the timestamp of a record from the secondary stream is max {@code
timeDifference} before or after
- * the timestamp of the record from the primary stream.
- * <p>
- * CAUTION: Using this method implicitly sets the grace period to zero,
which means that any out-of-order
- * records arriving after the window ends are considered late and will be
dropped.
- *
- * @param timeDifference join window interval
- * @return a new JoinWindows object with the window definition and no
grace period. Note that this means out-of-order records arriving after the
window end will be dropped
- * @throws IllegalArgumentException if {@code timeDifference} is negative
or can't be represented as {@code long milliseconds}
- */
- public static JoinWindows ofTimeDifferenceWithNoGrace(final Duration
timeDifference) {
- return ofTimeDifferenceAndGrace(timeDifference,
Duration.ofMillis(NO_GRACE_PERIOD));
- }
-
- /**
- * Specifies that records of the same key are joinable if their timestamps
are within {@code timeDifference},
- * i.e., the timestamp of a record from the secondary stream is max {@code
timeDifference} before or after
- * the timestamp of the record from the primary stream.
*
* @param timeDifference join window interval
* @return a new JoinWindows object with the window definition with and
grace period (default to 24 hours minus {@code timeDifference})
* @throws IllegalArgumentException if {@code timeDifference} is negative
or can't be represented as {@code long milliseconds}
- * @deprecated since 3.0. Use {@link
#ofTimeDifferenceWithNoGrace(Duration)}} instead
*/
- @Deprecated
public static JoinWindows of(final Duration timeDifference) throws
IllegalArgumentException {
final String msgPrefix =
prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
final long timeDifferenceMs =
validateMillisecondDuration(timeDifference, msgPrefix);
@@ -216,9 +172,7 @@ public class JoinWindows extends Windows<Window> {
* @param afterWindowEnd The grace period to admit out-of-order events to
a window.
* @return this updated builder
* @throws IllegalArgumentException if the {@code afterWindowEnd} is
negative or can't be represented as {@code long milliseconds}
- * @deprecated since 3.0. Use {@link #ofTimeDifferenceAndGrace(Duration,
Duration)} instead
*/
- @Deprecated
public JoinWindows grace(final Duration afterWindowEnd) throws
IllegalArgumentException {
//TODO KAFKA-13021: disallow calling grace() if it was already set via
ofTimeDifferenceAndGrace/WithNoGrace()
final String msgPrefix =
prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
diff --git
a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index e18b972..2f0dedd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -745,7 +745,7 @@ public class StreamsBuilderTest {
streamOne.leftJoin(
streamTwo,
(value1, value2) -> value1,
- JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1)),
+ JoinWindows.of(Duration.ofHours(1)).grace(Duration.ZERO),
StreamJoined.<String, String, String>as(STREAM_OPERATION_NAME)
.withName(STREAM_OPERATION_NAME)
);
@@ -754,8 +754,7 @@ public class StreamsBuilderTest {
final ProcessorTopology topology =
builder.internalTopologyBuilder.rewriteTopology(new
StreamsConfig(props)).buildTopology();
assertNamesForStateStore(topology.stateStores(),
STREAM_OPERATION_NAME + "-this-join-store",
- STREAM_OPERATION_NAME + "-outer-other-join-store",
- STREAM_OPERATION_NAME + "-left-shared-join-store"
+ STREAM_OPERATION_NAME + "-outer-other-join-store"
);
assertNamesForOperation(topology,
"KSTREAM-SOURCE-0000000000",
@@ -775,7 +774,7 @@ public class StreamsBuilderTest {
streamOne.leftJoin(
streamTwo,
(value1, value2) -> value1,
- JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1)),
+ JoinWindows.of(Duration.ofHours(1)).grace(Duration.ZERO),
StreamJoined.with(Serdes.String(), Serdes.String(),
Serdes.String())
.withName(STREAM_OPERATION_NAME)
);
@@ -784,8 +783,7 @@ public class StreamsBuilderTest {
final ProcessorTopology topology =
builder.internalTopologyBuilder.rewriteTopology(new
StreamsConfig(props)).buildTopology();
assertNamesForStateStore(topology.stateStores(),
"KSTREAM-JOINTHIS-0000000004-store",
- "KSTREAM-OUTEROTHER-0000000005-store",
- "KSTREAM-OUTERSHARED-0000000004-store"
+ "KSTREAM-OUTEROTHER-0000000005-store"
);
assertNamesForOperation(topology,
"KSTREAM-SOURCE-0000000000",
@@ -851,7 +849,7 @@ public class StreamsBuilderTest {
streamOne.outerJoin(
streamTwo,
(value1, value2) -> value1,
- JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1)),
+ JoinWindows.of(Duration.ofHours(1)).grace(Duration.ZERO),
StreamJoined.<String, String, String>as(STREAM_OPERATION_NAME)
.withName(STREAM_OPERATION_NAME)
);
@@ -859,8 +857,7 @@ public class StreamsBuilderTest {
final ProcessorTopology topology =
builder.internalTopologyBuilder.rewriteTopology(new
StreamsConfig(props)).buildTopology();
assertNamesForStateStore(topology.stateStores(),
STREAM_OPERATION_NAME +
"-outer-this-join-store",
- STREAM_OPERATION_NAME +
"-outer-other-join-store",
- STREAM_OPERATION_NAME +
"-outer-shared-join-store");
+ STREAM_OPERATION_NAME +
"-outer-other-join-store");
assertNamesForOperation(topology,
"KSTREAM-SOURCE-0000000000",
"KSTREAM-SOURCE-0000000001",
@@ -880,7 +877,7 @@ public class StreamsBuilderTest {
streamOne.outerJoin(
streamTwo,
(value1, value2) -> value1,
- JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1)),
+ JoinWindows.of(Duration.ofHours(1)).grace(Duration.ZERO),
StreamJoined.with(Serdes.String(), Serdes.String(),
Serdes.String())
.withName(STREAM_OPERATION_NAME)
);
@@ -889,8 +886,7 @@ public class StreamsBuilderTest {
final ProcessorTopology topology =
builder.internalTopologyBuilder.rewriteTopology(new
StreamsConfig(props)).buildTopology();
assertNamesForStateStore(topology.stateStores(),
"KSTREAM-OUTERTHIS-0000000004-store",
- "KSTREAM-OUTEROTHER-0000000005-store",
- "KSTREAM-OUTERSHARED-0000000004-store"
+ "KSTREAM-OUTEROTHER-0000000005-store"
);
assertNamesForOperation(topology,
"KSTREAM-SOURCE-0000000000",
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index b332f6c..45f7c62 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -853,7 +853,7 @@ public class TopologyTest {
stream1.leftJoin(
stream2,
MockValueJoiner.TOSTRING_JOINER,
- JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)),
+ JoinWindows.of(ofMillis(100)).grace(Duration.ZERO),
StreamJoined.with(Serdes.Integer(), Serdes.String(),
Serdes.String()));
final TopologyDescription describe = builder.build().describe();
@@ -871,10 +871,10 @@ public class TopologyTest {
" Processor: KSTREAM-WINDOWED-0000000003 (stores:
[KSTREAM-OUTEROTHER-0000000005-store])\n" +
" --> KSTREAM-OUTEROTHER-0000000005\n" +
" <-- KSTREAM-SOURCE-0000000001\n" +
- " Processor: KSTREAM-JOINTHIS-0000000004 (stores:
[KSTREAM-OUTEROTHER-0000000005-store, KSTREAM-OUTERSHARED-0000000004-store])\n"
+
+ " Processor: KSTREAM-JOINTHIS-0000000004 (stores:
[KSTREAM-OUTEROTHER-0000000005-store])\n" +
" --> KSTREAM-MERGE-0000000006\n" +
" <-- KSTREAM-WINDOWED-0000000002\n" +
- " Processor: KSTREAM-OUTEROTHER-0000000005 (stores:
[KSTREAM-JOINTHIS-0000000004-store, KSTREAM-OUTERSHARED-0000000004-store])\n" +
+ " Processor: KSTREAM-OUTEROTHER-0000000005 (stores:
[KSTREAM-JOINTHIS-0000000004-store])\n" +
" --> KSTREAM-MERGE-0000000006\n" +
" <-- KSTREAM-WINDOWED-0000000003\n" +
" Processor: KSTREAM-MERGE-0000000006 (stores: [])\n" +
@@ -895,7 +895,7 @@ public class TopologyTest {
stream1.leftJoin(
stream2,
MockValueJoiner.TOSTRING_JOINER,
- JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)),
+ JoinWindows.of(ofMillis(100)).grace(Duration.ZERO),
StreamJoined.with(Serdes.Integer(), Serdes.String(),
Serdes.String())
.withStoreName("custom-name"));
@@ -914,10 +914,10 @@ public class TopologyTest {
" Processor: KSTREAM-WINDOWED-0000000003 (stores:
[custom-name-outer-other-join-store])\n" +
" --> KSTREAM-OUTEROTHER-0000000005\n" +
" <-- KSTREAM-SOURCE-0000000001\n" +
- " Processor: KSTREAM-JOINTHIS-0000000004 (stores:
[custom-name-outer-other-join-store, custom-name-left-shared-join-store])\n" +
+ " Processor: KSTREAM-JOINTHIS-0000000004 (stores:
[custom-name-outer-other-join-store])\n" +
" --> KSTREAM-MERGE-0000000006\n" +
" <-- KSTREAM-WINDOWED-0000000002\n" +
- " Processor: KSTREAM-OUTEROTHER-0000000005 (stores:
[custom-name-this-join-store, custom-name-left-shared-join-store])\n" +
+ " Processor: KSTREAM-OUTEROTHER-0000000005 (stores:
[custom-name-this-join-store])\n" +
" --> KSTREAM-MERGE-0000000006\n" +
" <-- KSTREAM-WINDOWED-0000000003\n" +
" Processor: KSTREAM-MERGE-0000000006 (stores: [])\n" +
@@ -935,7 +935,7 @@ public class TopologyTest {
stream1 = builder.stream("input-topic1");
stream2 = builder.stream("input-topic2");
- final JoinWindows joinWindows =
JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100));
+ final JoinWindows joinWindows =
JoinWindows.of(ofMillis(100)).grace(Duration.ZERO);
final WindowBytesStoreSupplier thisStoreSupplier =
Stores.inMemoryWindowStore("in-memory-join-store",
Duration.ofMillis(joinWindows.size() +
joinWindows.gracePeriodMs()),
@@ -968,10 +968,10 @@ public class TopologyTest {
" Processor: KSTREAM-WINDOWED-0000000003 (stores:
[in-memory-join-store-other])\n" +
" --> KSTREAM-OUTEROTHER-0000000005\n" +
" <-- KSTREAM-SOURCE-0000000001\n" +
- " Processor: KSTREAM-JOINTHIS-0000000004 (stores:
[in-memory-join-store-other, in-memory-join-store-left-shared-join-store])\n" +
+ " Processor: KSTREAM-JOINTHIS-0000000004 (stores:
[in-memory-join-store-other])\n" +
" --> KSTREAM-MERGE-0000000006\n" +
" <-- KSTREAM-WINDOWED-0000000002\n" +
- " Processor: KSTREAM-OUTEROTHER-0000000005 (stores:
[in-memory-join-store, in-memory-join-store-left-shared-join-store])\n" +
+ " Processor: KSTREAM-OUTEROTHER-0000000005 (stores:
[in-memory-join-store])\n" +
" --> KSTREAM-MERGE-0000000006\n" +
" <-- KSTREAM-WINDOWED-0000000003\n" +
" Processor: KSTREAM-MERGE-0000000006 (stores: [])\n" +
@@ -992,7 +992,7 @@ public class TopologyTest {
stream1.outerJoin(
stream2,
MockValueJoiner.TOSTRING_JOINER,
- JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)),
+ JoinWindows.of(ofMillis(100)).grace(Duration.ZERO),
StreamJoined.with(Serdes.Integer(), Serdes.String(),
Serdes.String()));
final TopologyDescription describe = builder.build().describe();
@@ -1010,10 +1010,10 @@ public class TopologyTest {
" Processor: KSTREAM-WINDOWED-0000000003 (stores:
[KSTREAM-OUTEROTHER-0000000005-store])\n" +
" --> KSTREAM-OUTEROTHER-0000000005\n" +
" <-- KSTREAM-SOURCE-0000000001\n" +
- " Processor: KSTREAM-OUTEROTHER-0000000005 (stores:
[KSTREAM-OUTERTHIS-0000000004-store, KSTREAM-OUTERSHARED-0000000004-store])\n" +
+ " Processor: KSTREAM-OUTEROTHER-0000000005 (stores:
[KSTREAM-OUTERTHIS-0000000004-store])\n" +
" --> KSTREAM-MERGE-0000000006\n" +
" <-- KSTREAM-WINDOWED-0000000003\n" +
- " Processor: KSTREAM-OUTERTHIS-0000000004 (stores:
[KSTREAM-OUTEROTHER-0000000005-store, KSTREAM-OUTERSHARED-0000000004-store])\n"
+
+ " Processor: KSTREAM-OUTERTHIS-0000000004 (stores:
[KSTREAM-OUTEROTHER-0000000005-store])\n" +
" --> KSTREAM-MERGE-0000000006\n" +
" <-- KSTREAM-WINDOWED-0000000002\n" +
" Processor: KSTREAM-MERGE-0000000006 (stores: [])\n" +
@@ -1034,7 +1034,7 @@ public class TopologyTest {
stream1.outerJoin(
stream2,
MockValueJoiner.TOSTRING_JOINER,
- JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)),
+ JoinWindows.of(ofMillis(100)).grace(Duration.ZERO),
StreamJoined.with(Serdes.Integer(), Serdes.String(),
Serdes.String())
.withStoreName("custom-name"));
@@ -1053,10 +1053,10 @@ public class TopologyTest {
" Processor: KSTREAM-WINDOWED-0000000003 (stores:
[custom-name-outer-other-join-store])\n" +
" --> KSTREAM-OUTEROTHER-0000000005\n" +
" <-- KSTREAM-SOURCE-0000000001\n" +
- " Processor: KSTREAM-OUTEROTHER-0000000005 (stores:
[custom-name-outer-this-join-store, custom-name-outer-shared-join-store])\n" +
+ " Processor: KSTREAM-OUTEROTHER-0000000005 (stores:
[custom-name-outer-this-join-store])\n" +
" --> KSTREAM-MERGE-0000000006\n" +
" <-- KSTREAM-WINDOWED-0000000003\n" +
- " Processor: KSTREAM-OUTERTHIS-0000000004 (stores:
[custom-name-outer-other-join-store, custom-name-outer-shared-join-store])\n" +
+ " Processor: KSTREAM-OUTERTHIS-0000000004 (stores:
[custom-name-outer-other-join-store])\n" +
" --> KSTREAM-MERGE-0000000006\n" +
" <-- KSTREAM-WINDOWED-0000000002\n" +
" Processor: KSTREAM-MERGE-0000000006 (stores: [])\n" +
@@ -1074,7 +1074,7 @@ public class TopologyTest {
stream1 = builder.stream("input-topic1");
stream2 = builder.stream("input-topic2");
- final JoinWindows joinWindows =
JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100));
+ final JoinWindows joinWindows =
JoinWindows.of(ofMillis(100)).grace(Duration.ZERO);
final WindowBytesStoreSupplier thisStoreSupplier =
Stores.inMemoryWindowStore("in-memory-join-store",
Duration.ofMillis(joinWindows.size() +
joinWindows.gracePeriodMs()),
@@ -1107,10 +1107,10 @@ public class TopologyTest {
" Processor: KSTREAM-WINDOWED-0000000003 (stores:
[in-memory-join-store-other])\n" +
" --> KSTREAM-OUTEROTHER-0000000005\n" +
" <-- KSTREAM-SOURCE-0000000001\n" +
- " Processor: KSTREAM-OUTEROTHER-0000000005 (stores:
[in-memory-join-store-outer-shared-join-store, in-memory-join-store])\n" +
+ " Processor: KSTREAM-OUTEROTHER-0000000005 (stores:
[in-memory-join-store])\n" +
" --> KSTREAM-MERGE-0000000006\n" +
" <-- KSTREAM-WINDOWED-0000000003\n" +
- " Processor: KSTREAM-OUTERTHIS-0000000004 (stores:
[in-memory-join-store-other, in-memory-join-store-outer-shared-join-store])\n" +
+ " Processor: KSTREAM-OUTERTHIS-0000000004 (stores:
[in-memory-join-store-other])\n" +
" --> KSTREAM-MERGE-0000000006\n" +
" <-- KSTREAM-WINDOWED-0000000002\n" +
" Processor: KSTREAM-MERGE-0000000006 (stores: [])\n" +
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
index 9d2bd1e..ef00743 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
@@ -100,7 +100,7 @@ public class StreamStreamJoinIntegrationTest extends
AbstractJoinIntegrationTest
leftStream.join(
rightStream,
valueJoiner,
- JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(10), ofHours(24))
+ JoinWindows.of(ofSeconds(10)).grace(ofHours(24))
).to(OUTPUT_TOPIC);
runTestWithDriver(expectedResult);
@@ -147,7 +147,7 @@ public class StreamStreamJoinIntegrationTest extends
AbstractJoinIntegrationTest
rightStream.flatMap(MockMapper.noOpFlatKeyValueMapper())
.selectKey(MockMapper.selectKeyKeyValueMapper()),
valueJoiner,
- JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(10),
ofHours(24))
+ JoinWindows.of(ofSeconds(10)).grace(ofHours(24))
).to(OUTPUT_TOPIC);
runTestWithDriver(expectedResult);
@@ -161,7 +161,10 @@ public class StreamStreamJoinIntegrationTest extends
AbstractJoinIntegrationTest
null,
null,
null,
- Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a",
null, 4L)),
+ Arrays.asList(
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-null", null, 3L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)
+ ),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a",
null, 5L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "A-b", null, 6L),
@@ -192,7 +195,7 @@ public class StreamStreamJoinIntegrationTest extends
AbstractJoinIntegrationTest
leftStream.leftJoin(
rightStream,
valueJoiner,
- JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(10), ofHours(24))
+ JoinWindows.of(ofSeconds(10)).grace(ofHours(24))
).to(OUTPUT_TOPIC);
runTestWithDriver(expectedResult);
@@ -206,7 +209,10 @@ public class StreamStreamJoinIntegrationTest extends
AbstractJoinIntegrationTest
null,
null,
null,
- Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a",
null, 4L)),
+ Arrays.asList(
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-null", null, 3L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)
+ ),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a",
null, 5L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "A-b", null, 6L),
@@ -239,7 +245,7 @@ public class StreamStreamJoinIntegrationTest extends
AbstractJoinIntegrationTest
rightStream.flatMap(MockMapper.noOpFlatKeyValueMapper())
.selectKey(MockMapper.selectKeyKeyValueMapper()),
valueJoiner,
- JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(10),
ofHours(24))
+ JoinWindows.of(ofSeconds(10)).grace(ofHours(24))
).to(OUTPUT_TOPIC);
runTestWithDriver(expectedResult);
@@ -253,7 +259,10 @@ public class StreamStreamJoinIntegrationTest extends
AbstractJoinIntegrationTest
null,
null,
null,
- Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a",
null, 4L)),
+ Arrays.asList(
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-null", null, 3L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)
+ ),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a",
null, 5L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "A-b", null, 6L),
@@ -284,7 +293,7 @@ public class StreamStreamJoinIntegrationTest extends
AbstractJoinIntegrationTest
leftStream.outerJoin(
rightStream,
valueJoiner,
- JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(10), ofHours(24))
+ JoinWindows.of(ofSeconds(10)).grace(ofHours(24))
).to(OUTPUT_TOPIC);
runTestWithDriver(expectedResult);
@@ -298,7 +307,10 @@ public class StreamStreamJoinIntegrationTest extends
AbstractJoinIntegrationTest
null,
null,
null,
- Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a",
null, 4L)),
+ Arrays.asList(
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-null", null, 3L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)
+ ),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a",
null, 5L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "A-b", null, 6L),
@@ -331,7 +343,7 @@ public class StreamStreamJoinIntegrationTest extends
AbstractJoinIntegrationTest
rightStream.flatMap(MockMapper.noOpFlatKeyValueMapper())
.selectKey(MockMapper.selectKeyKeyValueMapper()),
valueJoiner,
- JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(10),
ofHours(24))
+ JoinWindows.of(ofSeconds(10)).grace(ofHours(24))
).to(OUTPUT_TOPIC);
runTestWithDriver(expectedResult);
@@ -424,11 +436,11 @@ public class StreamStreamJoinIntegrationTest extends
AbstractJoinIntegrationTest
leftStream.join(
rightStream,
valueJoiner,
- JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(10), ofHours(24))
+ JoinWindows.of(ofSeconds(10)).grace(ofHours(24))
).join(
rightStream,
valueJoiner,
- JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(10), ofHours(24))
+ JoinWindows.of(ofSeconds(10)).grace(ofHours(24))
).to(OUTPUT_TOPIC);
runTestWithDriver(expectedResult);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
index 3d38ada..7fb692e 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
@@ -75,8 +75,6 @@ public class JoinWindowsTest {
@Test
public void timeDifferenceMustNotBeNegative() {
assertThrows(IllegalArgumentException.class, () ->
JoinWindows.of(ofMillis(-1)));
- assertThrows(IllegalArgumentException.class, () ->
JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(-1)));
- assertThrows(IllegalArgumentException.class, () ->
JoinWindows.ofTimeDifferenceAndGrace(ofMillis(-1), ofMillis(ANY_GRACE)));
}
@Test
@@ -149,16 +147,6 @@ public class JoinWindowsTest {
JoinWindows.of(ofMillis(9)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).grace(ofMillis(60)),
JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).grace(ofMillis(60))
);
-
- verifyEquality(
- JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(3)),
- JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(3))
- );
-
- verifyEquality(
- JoinWindows.ofTimeDifferenceAndGrace(ofMillis(3), ofMillis(4)),
- JoinWindows.ofTimeDifferenceAndGrace(ofMillis(3), ofMillis(4))
- );
}
@Test
@@ -188,15 +176,5 @@ public class JoinWindowsTest {
JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(9)),
JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3))
);
-
- verifyInEquality(
- JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(9)),
- JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(3))
- );
-
- verifyInEquality(
- JoinWindows.ofTimeDifferenceAndGrace(ofMillis(9), ofMillis(9)),
- JoinWindows.ofTimeDifferenceAndGrace(ofMillis(3), ofMillis(9))
- );
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplValueJoinerWithKeyTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplValueJoinerWithKeyTest.java
index b35a7b2..dcd519d 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplValueJoinerWithKeyTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplValueJoinerWithKeyTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import java.util.Arrays;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -64,7 +65,7 @@ public class KStreamImplValueJoinerWithKeyTest {
private final ValueJoinerWithKey<String, Integer, Integer, String>
valueJoinerWithKey =
(key, lv, rv) -> key + ":" + (lv + (rv == null ? 0 : rv));
- private final JoinWindows joinWindows =
JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100), ofHours(24L));
+ private final JoinWindows joinWindows =
JoinWindows.of(ofMillis(100)).grace(ofHours(24L));
private final StreamJoined<String, Integer, Integer> streamJoined =
StreamJoined.with(Serdes.String(), Serdes.Integer(),
Serdes.Integer());
private final Joined<String, Integer, Integer> joined =
@@ -108,7 +109,8 @@ public class KStreamImplValueJoinerWithKeyTest {
).to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
// Left KV A, 3, Right KV A, 5
// TTD pipes records to left stream first, then right
- final List<KeyValue<String, String>> expectedResults =
Collections.singletonList(KeyValue.pair("A", "A:5"));
+ final List<KeyValue<String, String>> expectedResults =
+ Arrays.asList(KeyValue.pair("A", "A:3"), KeyValue.pair("A",
"A:5"));
runJoinTopology(
builder,
expectedResults,
@@ -128,7 +130,8 @@ public class KStreamImplValueJoinerWithKeyTest {
// Left KV A, 3, Right KV A, 5
// TTD pipes records to left stream first, then right
- final List<KeyValue<String, String>> expectedResults =
Collections.singletonList(KeyValue.pair("A", "A:5"));
+ final List<KeyValue<String, String>> expectedResults =
+ Arrays.asList(KeyValue.pair("A", "A:3"), KeyValue.pair("A",
"A:5"));
runJoinTopology(
builder,
expectedResults,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index 1d50a37..517e0eb 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -515,7 +515,7 @@ public class KStreamKStreamJoinTest {
joined = stream1.outerJoin(
stream2,
MockValueJoiner.TOSTRING_JOINER,
- JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofHours(24L)),
+ JoinWindows.of(ofMillis(100L)).grace(ofHours(24L)),
StreamJoined.with(Serdes.Integer(), Serdes.String(),
Serdes.String())
);
joined.process(supplier);
@@ -532,7 +532,7 @@ public class KStreamKStreamJoinTest {
driver.createInputTopic(topic2, new IntegerSerializer(),
new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final MockProcessor<Integer, String> processor =
supplier.theCapturedProcessor();
- // push two items to the primary stream; the other window is
empty; this should not produce items yet
+ // push two items to the primary stream; the other window is
empty; this should produce 2 spurious items
// w1 = {}
// w2 = {}
// --> w1 = { 0:A0, 1:A1 }
@@ -540,7 +540,10 @@ public class KStreamKStreamJoinTest {
for (int i = 0; i < 2; i++) {
inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i]);
}
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(0, "A0+null", 0L),
+ new KeyValueTimestamp<>(1, "A1+null", 0L)
+ );
// push two items to the other stream; this should produce two
items
// w1 = { 0:A0, 1:A1 }
@@ -555,7 +558,7 @@ public class KStreamKStreamJoinTest {
new KeyValueTimestamp<>(1, "A1+a1", 0L)
);
- // push all four items to the primary stream; this should produce
two items
+ // push all four items to the primary stream; this should produce
four items
// w1 = { 0:A0, 1:A1 }
// w2 = { 0:a0, 1:a1 }
// --> w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2, 3:B3 }
@@ -565,7 +568,9 @@ public class KStreamKStreamJoinTest {
}
processor.checkAndClearProcessResult(
new KeyValueTimestamp<>(0, "B0+a0", 0L),
- new KeyValueTimestamp<>(1, "B1+a1", 0L)
+ new KeyValueTimestamp<>(1, "B1+a1", 0L),
+ new KeyValueTimestamp<>(2, "B2+null", 0L),
+ new KeyValueTimestamp<>(3, "B3+null", 0L)
);
// push all items to the other stream; this should produce six
items
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 4e2b6d8..5c6b3f0 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -172,7 +172,7 @@ public class KStreamKStreamLeftJoinTest {
joined = stream1.leftJoin(
stream2,
MockValueJoiner.TOSTRING_JOINER,
- JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L),
ofMillis(10L)),
+ JoinWindows.of(ofMillis(100L)).grace(ofMillis(10L)),
StreamJoined.with(Serdes.Integer(), Serdes.String(),
Serdes.String())
);
joined.process(supplier);
@@ -202,6 +202,8 @@ public class KStreamKStreamLeftJoinTest {
inputTopic2.pipeInput(2, "a2", 1001L);
processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(2, "A2+null", 1000L),
+ new KeyValueTimestamp<>(2, "A2-0+null", 1000L),
new KeyValueTimestamp<>(2, "A2+a2", 1001L),
new KeyValueTimestamp<>(2, "A2-0+a2", 1001L)
);
@@ -228,7 +230,7 @@ public class KStreamKStreamLeftJoinTest {
joined = stream1.leftJoin(
stream2,
MockValueJoiner.TOSTRING_JOINER,
- JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofMillis(0L)),
+ JoinWindows.of(ofMillis(100L)).grace(ofMillis(0L)),
StreamJoined.with(Serdes.Integer(), Serdes.String(),
Serdes.String())
);
joined.process(supplier);
@@ -246,25 +248,16 @@ public class KStreamKStreamLeftJoinTest {
inputTopic1.pipeInput(0, "A0", windowStart + 1L);
inputTopic1.pipeInput(1, "A1", windowStart + 2L);
inputTopic1.pipeInput(0, "A0-0", windowStart + 3L);
- processor.checkAndClearProcessResult();
-
- // Join detected; No null-joins emitted
- inputTopic2.pipeInput(1, "a1", windowStart + 3L);
- processor.checkAndClearProcessResult(
- new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3L)
- );
-
- // Dummy record in left topic will emit expired non-joined records
from the left topic
- inputTopic1.pipeInput(2, "dummy", windowStart + 401L);
processor.checkAndClearProcessResult(
new KeyValueTimestamp<>(0, "A0+null", windowStart + 1L),
+ new KeyValueTimestamp<>(1, "A1+null", windowStart + 2L),
new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3L)
);
- // Flush internal non-joined state store by joining the dummy
record
- inputTopic2.pipeInput(2, "dummy", windowStart + 401L);
+ // Join detected; No null-joins emitted
+ inputTopic2.pipeInput(1, "a1", windowStart + 3L);
processor.checkAndClearProcessResult(
- new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 401L)
+ new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3L)
);
}
}
@@ -283,7 +276,7 @@ public class KStreamKStreamLeftJoinTest {
joined = stream1.leftJoin(
stream2,
MockValueJoiner.TOSTRING_JOINER,
- JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofMillis(0L)),
+ JoinWindows.of(ofMillis(100L)).grace(ofMillis(0L)),
StreamJoined.with(Serdes.Integer(), Serdes.String(),
Serdes.String())
);
joined.process(supplier);
@@ -297,29 +290,20 @@ public class KStreamKStreamLeftJoinTest {
final long windowStart = 0L;
- // No joins detected; No null-joins emitted
+ // No joins detected; spurious null-joins emitted
inputTopic1.pipeInput(0, "A0", windowStart + 1L);
inputTopic1.pipeInput(1, "A1", windowStart + 2L);
inputTopic1.pipeInput(0, "A0-0", windowStart + 3L);
- processor.checkAndClearProcessResult();
-
- // Join detected; No null-joins emitted
- inputTopic2.pipeInput(1, "a1", windowStart + 3L);
- processor.checkAndClearProcessResult(
- new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3L)
- );
-
- // Dummy record in right topic will emit expired non-joined
records from the left topic
- inputTopic2.pipeInput(2, "dummy", windowStart + 401L);
processor.checkAndClearProcessResult(
new KeyValueTimestamp<>(0, "A0+null", windowStart + 1L),
+ new KeyValueTimestamp<>(1, "A1+null", windowStart + 2L),
new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3L)
);
- // Flush internal non-joined state store by joining the dummy
record
- inputTopic1.pipeInput(2, "dummy", windowStart + 402L);
+ // Join detected; No null-joins emitted
+ inputTopic2.pipeInput(1, "a1", windowStart + 3L);
processor.checkAndClearProcessResult(
- new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402L)
+ new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3L)
);
}
}
@@ -338,7 +322,7 @@ public class KStreamKStreamLeftJoinTest {
joined = stream1.leftJoin(
stream2,
MockValueJoiner.TOSTRING_JOINER,
- JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofMillis(0L)),
+ JoinWindows.of(ofMillis(100L)).grace(ofMillis(0L)),
StreamJoined.with(Serdes.Integer(), Serdes.String(),
Serdes.String())
);
joined.process(supplier);
@@ -363,16 +347,6 @@ public class KStreamKStreamLeftJoinTest {
processor.checkAndClearProcessResult(
new KeyValueTimestamp<>(1, "a1+A1", windowStart + 3L)
);
-
- // Dummy record in left topic will not emit records
- inputTopic1.pipeInput(2, "dummy", windowStart + 401L);
- processor.checkAndClearProcessResult();
-
- // Process the dummy joined record
- inputTopic2.pipeInput(2, "dummy", windowStart + 402L);
- processor.checkAndClearProcessResult(
- new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402L)
- );
}
}
@@ -390,7 +364,7 @@ public class KStreamKStreamLeftJoinTest {
joined = stream1.leftJoin(
stream2,
MockValueJoiner.TOSTRING_JOINER,
- JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L)),
+ JoinWindows.of(ofMillis(100L)).grace(Duration.ZERO),
StreamJoined.with(Serdes.Integer(), Serdes.String(),
Serdes.String())
);
joined.process(supplier);
@@ -430,7 +404,7 @@ public class KStreamKStreamLeftJoinTest {
@Test
public void testLeftJoinWithInMemoryCustomSuppliers() {
- final JoinWindows joinWindows =
JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofMillis(0L));
+ final JoinWindows joinWindows =
JoinWindows.of(ofMillis(100L)).grace(ofMillis(0L));
final WindowBytesStoreSupplier thisStoreSupplier =
Stores.inMemoryWindowStore("in-memory-join-store",
Duration.ofMillis(joinWindows.size() +
joinWindows.gracePeriodMs()),
@@ -447,7 +421,7 @@ public class KStreamKStreamLeftJoinTest {
@Test
public void testLeftJoinWithDefaultSuppliers() {
- final JoinWindows joinWindows =
JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L));
+ final JoinWindows joinWindows =
JoinWindows.of(ofMillis(100L)).grace(Duration.ZERO);
final StreamJoined<Integer, String, String> streamJoined =
StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String());
runLeftJoin(streamJoined, joinWindows);
@@ -487,8 +461,8 @@ public class KStreamKStreamLeftJoinTest {
driver.createInputTopic(topic2, new IntegerSerializer(),
new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final MockProcessor<Integer, String> processor =
supplier.theCapturedProcessor();
- // 2 window stores + 1 shared window store should be available
- assertEquals(3, driver.getAllStateStores().size());
+ // 2 window stores should be available
+ assertEquals(2, driver.getAllStateStores().size());
// push two items to the primary stream; the other window is empty
// w1 {}
@@ -498,7 +472,10 @@ public class KStreamKStreamLeftJoinTest {
for (int i = 0; i < 2; i++) {
inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i]);
}
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(0, "A0+null", 0L),
+ new KeyValueTimestamp<>(1, "A1+null", 0L)
+ );
// push two items to the other stream; this should produce two
items
// w1 = { 0:A0, 1:A1 }
@@ -513,7 +490,7 @@ public class KStreamKStreamLeftJoinTest {
new KeyValueTimestamp<>(1, "A1+a1", 0L)
);
- // push three items to the primary stream; this should produce two
joined items
+ // push three items to the primary stream; this should produce
three joined items
// w1 = { 0:A0, 1:A1 }
// w2 = { 0:a0, 1:a1 }
// --> w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2 }
@@ -523,7 +500,9 @@ public class KStreamKStreamLeftJoinTest {
}
processor.checkAndClearProcessResult(
new KeyValueTimestamp<>(0, "B0+a0", 0L),
- new KeyValueTimestamp<>(1, "B1+a1", 0L)
+ new KeyValueTimestamp<>(1, "B1+a1", 0L),
+ new KeyValueTimestamp<>(2, "B2+null", 0L)
+
);
// push all items to the other stream; this should produce five
items
@@ -558,11 +537,6 @@ public class KStreamKStreamLeftJoinTest {
new KeyValueTimestamp<>(2, "C2+b2", 0L),
new KeyValueTimestamp<>(3, "C3+b3", 0L)
);
-
- // push a dummy record that should expire non-joined items; it
should not produce any items because
- // all of them are joined
- inputTopic1.pipeInput(0, "dummy", 1000L);
- processor.checkAndClearProcessResult();
}
}
@@ -580,7 +554,7 @@ public class KStreamKStreamLeftJoinTest {
joined = stream1.leftJoin(
stream2,
MockValueJoiner.TOSTRING_JOINER,
- JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L)),
+ JoinWindows.of(ofMillis(100L)).grace(Duration.ZERO),
StreamJoined.with(Serdes.Integer(), Serdes.String(),
Serdes.String())
);
joined.process(supplier);
@@ -592,17 +566,19 @@ public class KStreamKStreamLeftJoinTest {
driver.createInputTopic(topic2, new IntegerSerializer(), new
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final MockProcessor<Integer, String> processor =
supplier.theCapturedProcessor();
- // push two items to the primary stream; the other window is
empty; this should not produce any item yet
+ // push two items to the primary stream; the other window is
empty; this should produce two spurious items
// w1 = {}
// w2 = {}
// --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
// --> w2 = {}
inputTopic1.pipeInput(0, "A0", 0L);
inputTopic1.pipeInput(1, "A1", 100L);
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(0, "A0+null", 0L),
+ new KeyValueTimestamp<>(1, "A1+null", 100L)
+ );
- // push one item to the other window that has a join; this should
produce non-joined records with a closed window first, then
- // the joined records
+ // push one item to the other window that has a join; this should
produce the joined records
// by the time they were produced before
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
// w2 = { }
@@ -610,7 +586,6 @@ public class KStreamKStreamLeftJoinTest {
// --> w2 = { 1:a1 (ts: 110) }
inputTopic2.pipeInput(1, "a1", 110L);
processor.checkAndClearProcessResult(
- new KeyValueTimestamp<>(0, "A0+null", 0L),
new KeyValueTimestamp<>(1, "A1+a1", 110L)
);
}
@@ -631,7 +606,7 @@ public class KStreamKStreamLeftJoinTest {
joined = stream1.leftJoin(
stream2,
MockValueJoiner.TOSTRING_JOINER,
- JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L),
ofMillis(10L)),
+ JoinWindows.of(ofMillis(100L)).grace(ofMillis(10L)),
StreamJoined.with(Serdes.Integer(), Serdes.String(),
Serdes.String())
);
joined.process(supplier);
@@ -658,7 +633,10 @@ public class KStreamKStreamLeftJoinTest {
for (int i = 0; i < 2; i++) {
inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i],
time);
}
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(0, "A0+null", 0L),
+ new KeyValueTimestamp<>(1, "A1+null", 0L)
+ );
// push two items to the other stream with a window time after the
previous window ended (not closed); this should not produce
// joined records because the window has ended, but not closed.
@@ -671,20 +649,6 @@ public class KStreamKStreamLeftJoinTest {
inputTopic2.pipeInput(expectedKey, "a" + expectedKey, time);
}
processor.checkAndClearProcessResult();
-
- // push a dummy item to the other stream after the window is
closed; this should only produced the expired non-joined records, but
- // not the joined records because the window has closed
- // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
- // w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101) }
- // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
- // --> w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101),
- // 0:dummy (ts: 211)}
- time += 1100L;
- inputTopic2.pipeInput(0, "dummy", time);
- processor.checkAndClearProcessResult(
- new KeyValueTimestamp<>(0, "A0+null", 0L),
- new KeyValueTimestamp<>(1, "A1+null", 0L)
- );
}
}
@@ -703,7 +667,7 @@ public class KStreamKStreamLeftJoinTest {
joined = stream1.leftJoin(
stream2,
MockValueJoiner.TOSTRING_JOINER,
- JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)),
+ JoinWindows.of(ofMillis(100)).grace(Duration.ZERO),
StreamJoined.with(Serdes.Integer(), Serdes.String(),
Serdes.String())
);
joined.process(supplier);
@@ -722,7 +686,7 @@ public class KStreamKStreamLeftJoinTest {
final MockProcessor<Integer, String> processor =
supplier.theCapturedProcessor();
final long time = 0L;
- // push two items to the primary stream; the other window is
empty; this should not produce any items
+ // push two items to the primary stream; the other window is
empty; this should produce two spurious items
// w1 = {}
// w2 = {}
// --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
@@ -730,7 +694,10 @@ public class KStreamKStreamLeftJoinTest {
for (int i = 0; i < 2; i++) {
inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i],
time);
}
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(0, "A0+null", 0L),
+ new KeyValueTimestamp<>(1, "A1+null", 0L)
+ );
// push four items to the other stream; this should produce two
full-join items
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
@@ -790,7 +757,7 @@ public class KStreamKStreamLeftJoinTest {
new KeyValueTimestamp<>(3, "B3+b3", 1100L)
);
- // push four items with increased timestamp to the primary stream;
this should produce one left-join and three full-join items (non-joined item is
not produced yet)
+ // push four items with increased timestamp to the primary stream;
this should produce one left-join and three full-join items plus one spurious
item
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
// 0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts:
1100) }
// w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
@@ -805,12 +772,13 @@ public class KStreamKStreamLeftJoinTest {
inputTopic1.pipeInput(expectedKey, "C" + expectedKey, time);
}
processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(0, "C0+null", 1101L),
new KeyValueTimestamp<>(1, "C1+b1", 1101L),
new KeyValueTimestamp<>(2, "C2+b2", 1101L),
new KeyValueTimestamp<>(3, "C3+b3", 1101L)
);
- // push four items with increased timestamp to the primary stream;
this should produce two left-join and two full-join items (non-joined items are
not produced yet)
+ // push four items with increased timestamp to the primary stream;
this should produce two spurious and two inner items
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
// 0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts:
1100),
// 0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts:
1101) }
@@ -827,11 +795,13 @@ public class KStreamKStreamLeftJoinTest {
inputTopic1.pipeInput(expectedKey, "D" + expectedKey, time);
}
processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(0, "D0+null", 1102L),
+ new KeyValueTimestamp<>(1, "D1+null", 1102L),
new KeyValueTimestamp<>(2, "D2+b2", 1102L),
new KeyValueTimestamp<>(3, "D3+b3", 1102L)
);
- // push four items with increased timestamp to the primary stream;
this should produce one full-join items (three non-joined left-join are not
produced yet)
+ // push four items with increased timestamp to the primary stream;
this should produce one full-join items plus three spurious left-join items
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
// 0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts:
1100),
// 0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts:
1101),
@@ -850,10 +820,13 @@ public class KStreamKStreamLeftJoinTest {
inputTopic1.pipeInput(expectedKey, "E" + expectedKey, time);
}
processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(0, "E0+null", 1103L),
+ new KeyValueTimestamp<>(1, "E1+null", 1103L),
+ new KeyValueTimestamp<>(2, "E2+null", 1103L),
new KeyValueTimestamp<>(3, "E3+b3", 1103L)
);
- // push four items with increased timestamp to the primary stream;
this should produce no full-join items (four non-joined left-join are not
produced yet)
+ // push four items with increased timestamp to the primary stream;
this should produce four spurious left-join itmes
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
// 0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts:
1100),
// 0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts:
1101),
@@ -873,19 +846,7 @@ public class KStreamKStreamLeftJoinTest {
for (final int expectedKey : expectedKeys) {
inputTopic1.pipeInput(expectedKey, "F" + expectedKey, time);
}
- processor.checkAndClearProcessResult();
-
- // push a dummy record to produce all left-join non-joined items
- time += 301L;
- driver.advanceWallClockTime(Duration.ofMillis(1000L));
- inputTopic1.pipeInput(0, "dummy", time);
processor.checkAndClearProcessResult(
- new KeyValueTimestamp<>(0, "C0+null", 1101L),
- new KeyValueTimestamp<>(0, "D0+null", 1102L),
- new KeyValueTimestamp<>(1, "D1+null", 1102L),
- new KeyValueTimestamp<>(0, "E0+null", 1103L),
- new KeyValueTimestamp<>(1, "E1+null", 1103L),
- new KeyValueTimestamp<>(2, "E2+null", 1103L),
new KeyValueTimestamp<>(0, "F0+null", 1104L),
new KeyValueTimestamp<>(1, "F1+null", 1104L),
new KeyValueTimestamp<>(2, "F2+null", 1104L),
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
index 2d9e320..42b81f7 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
@@ -113,7 +113,7 @@ public class KStreamKStreamOuterJoinTest {
joined = stream1.outerJoin(
stream2,
MockValueJoiner.TOSTRING_JOINER,
- JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L),
ofMillis(10L)),
+ JoinWindows.of(ofMillis(100L)).grace(ofMillis(10L)),
StreamJoined.with(Serdes.Integer(), Serdes.String(),
Serdes.String())
);
joined.process(supplier);
@@ -125,20 +125,19 @@ public class KStreamKStreamOuterJoinTest {
driver.createInputTopic(topic2, new IntegerSerializer(), new
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final MockProcessor<Integer, String> processor =
supplier.theCapturedProcessor();
- // verifies non-joined duplicates are emitted when window has
closed
+ // verifies non-joined duplicates are emitted
inputTopic1.pipeInput(0, "A0", 0L);
inputTopic1.pipeInput(0, "A0-0", 0L);
inputTopic2.pipeInput(1, "a1", 0L);
inputTopic2.pipeInput(1, "a1-0", 0L);
inputTopic2.pipeInput(1, "a0", 111L);
- // bump stream-time to trigger outer-join results
- inputTopic2.pipeInput(3, "dummy", 211);
processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(0, "A0+null", 0L),
+ new KeyValueTimestamp<>(0, "A0-0+null", 0L),
new KeyValueTimestamp<>(1, "null+a1", 0L),
new KeyValueTimestamp<>(1, "null+a1-0", 0L),
- new KeyValueTimestamp<>(0, "A0+null", 0L),
- new KeyValueTimestamp<>(0, "A0-0+null", 0L)
+ new KeyValueTimestamp<>(1, "null+a0", 111L)
);
// verifies joined duplicates are emitted
@@ -148,21 +147,13 @@ public class KStreamKStreamOuterJoinTest {
inputTopic2.pipeInput(2, "a2-0", 201L);
processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(2, "A2+null", 200L),
+ new KeyValueTimestamp<>(2, "A2-0+null", 200L),
new KeyValueTimestamp<>(2, "A2+a2", 201L),
new KeyValueTimestamp<>(2, "A2-0+a2", 201L),
new KeyValueTimestamp<>(2, "A2+a2-0", 201L),
new KeyValueTimestamp<>(2, "A2-0+a2-0", 201L)
);
-
- // this record should expired non-joined records; only null+a0
will be emitted because
- // it did not have a join
- driver.advanceWallClockTime(Duration.ofMillis(1000L));
- inputTopic2.pipeInput(3, "dummy", 1500L);
-
- processor.checkAndClearProcessResult(
- new KeyValueTimestamp<>(1, "null+a0", 111L),
- new KeyValueTimestamp<>(3, "null+dummy", 211)
- );
}
}
@@ -180,7 +171,7 @@ public class KStreamKStreamOuterJoinTest {
joined = stream1.outerJoin(
stream2,
MockValueJoiner.TOSTRING_JOINER,
- JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L)),
+ JoinWindows.of(ofMillis(100L)).grace(Duration.ZERO),
StreamJoined.with(Serdes.Integer(), Serdes.String(),
Serdes.String())
);
joined.process(supplier);
@@ -194,29 +185,20 @@ public class KStreamKStreamOuterJoinTest {
final long windowStart = 0L;
- // No joins detected; No null-joins emitted
+ // No joins detected; three spurious item emitted
inputTopic1.pipeInput(0, "A0", windowStart + 1L);
inputTopic1.pipeInput(1, "A1", windowStart + 2L);
inputTopic1.pipeInput(0, "A0-0", windowStart + 3L);
- processor.checkAndClearProcessResult();
-
- // Join detected; No null-joins emitted
- inputTopic2.pipeInput(1, "a1", windowStart + 3L);
- processor.checkAndClearProcessResult(
- new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3L)
- );
-
- // Dummy record in left topic will emit expired non-joined records
from the left topic
- inputTopic1.pipeInput(2, "dummy", windowStart + 401L);
processor.checkAndClearProcessResult(
new KeyValueTimestamp<>(0, "A0+null", windowStart + 1L),
+ new KeyValueTimestamp<>(1, "A1+null", windowStart + 2L),
new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3L)
);
- // Flush internal non-joined state store by joining the dummy
record
- inputTopic2.pipeInput(2, "dummy", windowStart + 401L);
+ // Join detected; No null-joins emitted
+ inputTopic2.pipeInput(1, "a1", windowStart + 3L);
processor.checkAndClearProcessResult(
- new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 401L)
+ new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3L)
);
}
}
@@ -235,7 +217,7 @@ public class KStreamKStreamOuterJoinTest {
joined = stream1.outerJoin(
stream2,
MockValueJoiner.TOSTRING_JOINER,
- JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofMillis(0L)),
+ JoinWindows.of(ofMillis(100L)).grace(ofMillis(0L)),
StreamJoined.with(Serdes.Integer(), Serdes.String(),
Serdes.String())
);
joined.process(supplier);
@@ -253,25 +235,16 @@ public class KStreamKStreamOuterJoinTest {
inputTopic1.pipeInput(0, "A0", windowStart + 1L);
inputTopic1.pipeInput(1, "A1", windowStart + 2L);
inputTopic1.pipeInput(0, "A0-0", windowStart + 3L);
- processor.checkAndClearProcessResult();
-
- // Join detected; No null-joins emitted
- inputTopic2.pipeInput(1, "a1", windowStart + 3L);
- processor.checkAndClearProcessResult(
- new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3L)
- );
-
- // Dummy record in right topic will emit expired non-joined
records from the left topic
- inputTopic2.pipeInput(2, "dummy", windowStart + 401L);
processor.checkAndClearProcessResult(
new KeyValueTimestamp<>(0, "A0+null", windowStart + 1L),
+ new KeyValueTimestamp<>(1, "A1+null", windowStart + 2L),
new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3L)
);
- // Flush internal non-joined state store by joining the dummy
record
- inputTopic1.pipeInput(2, "dummy", windowStart + 402L);
+ // Join detected; No null-joins emitted
+ inputTopic2.pipeInput(1, "a1", windowStart + 3L);
processor.checkAndClearProcessResult(
- new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402L)
+ new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3L)
);
}
}
@@ -290,7 +263,7 @@ public class KStreamKStreamOuterJoinTest {
joined = stream1.outerJoin(
stream2,
MockValueJoiner.TOSTRING_JOINER,
- JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L)),
+ JoinWindows.of(ofMillis(100L)).grace(Duration.ZERO),
StreamJoined.with(Serdes.Integer(), Serdes.String(),
Serdes.String())
);
joined.process(supplier);
@@ -304,29 +277,20 @@ public class KStreamKStreamOuterJoinTest {
final long windowStart = 0L;
- // No joins detected; No null-joins emitted
+ // No joins detectedl; only spurious resulst
inputTopic2.pipeInput(0, "A0", windowStart + 1L);
inputTopic2.pipeInput(1, "A1", windowStart + 2L);
inputTopic2.pipeInput(0, "A0-0", windowStart + 3L);
- processor.checkAndClearProcessResult();
-
- // Join detected; No null-joins emitted
- inputTopic1.pipeInput(1, "a1", windowStart + 3L);
- processor.checkAndClearProcessResult(
- new KeyValueTimestamp<>(1, "a1+A1", windowStart + 3L)
- );
-
- // Dummy record in left topic will emit expired non-joined records
from the right topic
- inputTopic1.pipeInput(2, "dummy", windowStart + 401L);
processor.checkAndClearProcessResult(
new KeyValueTimestamp<>(0, "null+A0", windowStart + 1L),
+ new KeyValueTimestamp<>(1, "null+A1", windowStart + 2L),
new KeyValueTimestamp<>(0, "null+A0-0", windowStart + 3L)
);
- // Process the dummy joined record
- inputTopic2.pipeInput(2, "dummy", windowStart + 402L);
+ // Join detected; No null-joins emitted
+ inputTopic1.pipeInput(1, "a1", windowStart + 3L);
processor.checkAndClearProcessResult(
- new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402L)
+ new KeyValueTimestamp<>(1, "a1+A1", windowStart + 3L)
);
}
}
@@ -345,7 +309,7 @@ public class KStreamKStreamOuterJoinTest {
joined = stream1.outerJoin(
stream2,
MockValueJoiner.TOSTRING_JOINER,
- JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofMillis(0L)),
+ JoinWindows.of(ofMillis(100L)).grace(ofMillis(0L)),
StreamJoined.with(Serdes.Integer(), Serdes.String(),
Serdes.String())
);
joined.process(supplier);
@@ -363,25 +327,16 @@ public class KStreamKStreamOuterJoinTest {
inputTopic2.pipeInput(0, "A0", windowStart + 1L);
inputTopic2.pipeInput(1, "A1", windowStart + 2L);
inputTopic2.pipeInput(0, "A0-0", windowStart + 3L);
- processor.checkAndClearProcessResult();
-
- // Join detected; No null-joins emitted
- inputTopic1.pipeInput(1, "a1", windowStart + 3L);
- processor.checkAndClearProcessResult(
- new KeyValueTimestamp<>(1, "a1+A1", windowStart + 3L)
- );
-
- // Dummy record in right topic will emit expired non-joined
records from the right topic
- inputTopic2.pipeInput(2, "dummy", windowStart + 401L);
processor.checkAndClearProcessResult(
new KeyValueTimestamp<>(0, "null+A0", windowStart + 1L),
+ new KeyValueTimestamp<>(1, "null+A1", windowStart + 2L),
new KeyValueTimestamp<>(0, "null+A0-0", windowStart + 3L)
);
- // Process the dummy joined record
- inputTopic1.pipeInput(2, "dummy", windowStart + 402L);
+ // Join detected; No null-joins emitted
+ inputTopic1.pipeInput(1, "a1", windowStart + 3L);
processor.checkAndClearProcessResult(
- new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402L)
+ new KeyValueTimestamp<>(1, "a1+A1", windowStart + 3L)
);
}
}
@@ -400,7 +355,7 @@ public class KStreamKStreamOuterJoinTest {
joined = stream1.outerJoin(
stream2,
MockValueJoiner.TOSTRING_JOINER,
- JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)),
+ JoinWindows.of(ofMillis(100)).grace(Duration.ZERO),
StreamJoined.with(Serdes.Integer(), Serdes.String(),
Serdes.String())
);
joined.process(supplier);
@@ -412,17 +367,19 @@ public class KStreamKStreamOuterJoinTest {
driver.createInputTopic(topic2, new IntegerSerializer(), new
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final MockProcessor<Integer, String> processor =
supplier.theCapturedProcessor();
- // push two items to the primary stream; the other window is
empty; this should not produce any item yet
+ // push two items to the primary stream; the other window is
empty; this should produce two spurious resulst
// w1 = {}
// w2 = {}
// --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
// --> w2 = {}
inputTopic1.pipeInput(0, "A0", 0L);
inputTopic1.pipeInput(1, "A1", 100L);
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(0, "A0+null", 0L),
+ new KeyValueTimestamp<>(1, "A1+null", 100L)
+ );
- // push one item to the other window that has a join; this should
produce non-joined records with a closed window first, then
- // the joined records
+ // push one item to the other window that has a join; this should
produce the joined records
// by the time they were produced before
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
// w2 = { }
@@ -430,7 +387,6 @@ public class KStreamKStreamOuterJoinTest {
// --> w2 = { 0:a0 (ts: 110) }
inputTopic2.pipeInput(1, "a1", 110L);
processor.checkAndClearProcessResult(
- new KeyValueTimestamp<>(0, "A0+null", 0L),
new KeyValueTimestamp<>(1, "A1+a1", 110L)
);
}
@@ -450,7 +406,7 @@ public class KStreamKStreamOuterJoinTest {
joined = stream1.outerJoin(
stream2,
MockValueJoiner.TOSTRING_JOINER,
- JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100), ofMillis(10)),
+ JoinWindows.of(ofMillis(100)).grace(ofMillis(10)),
StreamJoined.with(Serdes.Integer(), Serdes.String(),
Serdes.String())
);
joined.process(supplier);
@@ -468,43 +424,36 @@ public class KStreamKStreamOuterJoinTest {
driver.createInputTopic(topic2, new IntegerSerializer(), new
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final MockProcessor<Integer, String> processor =
supplier.theCapturedProcessor();
- // push one item to the primary stream; and one item in other
stream; this should not produce items because there are no joins
- // and window has not ended
+ // push one item to the primary stream; and one item in other
stream; this should produce two spurious items
// w1 = {}
// w2 = {}
// --> w1 = { 0:A0 (ts: 0) }
// --> w2 = { 1:a1 (ts: 0) }
inputTopic1.pipeInput(0, "A0", 0L);
inputTopic2.pipeInput(1, "a1", 0L);
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(0, "A0+null", 0L),
+ new KeyValueTimestamp<>(1, "null+a1", 0L)
+ );
- // push one item on each stream with a window time after the
previous window ended (not closed); this should not produce
- // joined records because the window has ended, but will not
produce non-joined records because the window has not closed.
+ // push one item on each stream with a window time after the
previous window ended;
+ // this should produce two spurious items
// w1 = { 0:A0 (ts: 0) }
// w2 = { 1:a1 (ts: 0) }
// --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
// --> w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101) }
inputTopic2.pipeInput(0, "a0", 101L);
inputTopic1.pipeInput(1, "A1", 101L);
- processor.checkAndClearProcessResult();
-
- // push a dummy item to the any stream after the window is closed;
this should produced all expired non-joined records because
- // the window has closed
- // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
- // w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101) }
- // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
- // --> w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101), 0:dummy (ts: 112) }
- inputTopic2.pipeInput(0, "dummy", 211);
processor.checkAndClearProcessResult(
- new KeyValueTimestamp<>(1, "null+a1", 0L),
- new KeyValueTimestamp<>(0, "A0+null", 0L)
+ new KeyValueTimestamp<>(0, "null+a0", 101L),
+ new KeyValueTimestamp<>(1, "A1+null", 101L)
);
}
}
@Test
public void testOuterJoinWithInMemoryCustomSuppliers() {
- final JoinWindows joinWindows =
JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L));
+ final JoinWindows joinWindows =
JoinWindows.of(ofMillis(100L)).grace(Duration.ZERO);
final WindowBytesStoreSupplier thisStoreSupplier =
Stores.inMemoryWindowStore(
"in-memory-join-store",
@@ -527,7 +476,7 @@ public class KStreamKStreamOuterJoinTest {
@Test
public void testOuterJoinWithDefaultSuppliers() {
- final JoinWindows joinWindows =
JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L));
+ final JoinWindows joinWindows =
JoinWindows.of(ofMillis(100L)).grace(Duration.ZERO);
final StreamJoined<Integer, String, String> streamJoined =
StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String());
runOuterJoin(streamJoined, joinWindows);
@@ -567,11 +516,11 @@ public class KStreamKStreamOuterJoinTest {
driver.createInputTopic(topic2, new IntegerSerializer(), new
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final MockProcessor<Integer, String> processor =
supplier.theCapturedProcessor();
- // 2 window stores + 1 shared window store should be available
- assertEquals(3, driver.getAllStateStores().size());
+ // 2 window stores should be available
+ assertEquals(2, driver.getAllStateStores().size());
- // push two items to the primary stream; the other window is
empty; this should not
- // produce any items because window has not expired
+ // push two items to the primary stream; the other window is
empty; this should
+ // produce two spurious items
// w1 {}
// w2 {}
// --> w1 = { 0:A0, 1:A1 }
@@ -579,7 +528,10 @@ public class KStreamKStreamOuterJoinTest {
for (int i = 0; i < 2; i++) {
inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i]);
}
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(0, "A0+null", 0L),
+ new KeyValueTimestamp<>(1, "A1+null", 0L)
+ );
// push two items to the other stream; this should produce two
full-joined items
// w1 = { 0:A0, 1:A1 }
@@ -594,7 +546,7 @@ public class KStreamKStreamOuterJoinTest {
new KeyValueTimestamp<>(1, "A1+a1", 0L)
);
- // push three items to the primary stream; this should produce two
full-joined items
+ // push three items to the primary stream; this should produce
threeitems
// w1 = { 0:A0, 1:A1 }
// w2 = { 0:a0, 1:a1 }
// --> w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2 }
@@ -604,10 +556,11 @@ public class KStreamKStreamOuterJoinTest {
}
processor.checkAndClearProcessResult(
new KeyValueTimestamp<>(0, "B0+a0", 0L),
- new KeyValueTimestamp<>(1, "B1+a1", 0L)
+ new KeyValueTimestamp<>(1, "B1+a1", 0L),
+ new KeyValueTimestamp<>(2, "B2+null", 0L)
);
- // push all items to the other stream; this should produce five
full-joined items
+ // push all items to the other stream; this should produce six
items
// w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2 }
// w2 = { 0:a0, 1:a1 }
// --> w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2 }
@@ -620,7 +573,8 @@ public class KStreamKStreamOuterJoinTest {
new KeyValueTimestamp<>(0, "B0+b0", 0L),
new KeyValueTimestamp<>(1, "A1+b1", 0L),
new KeyValueTimestamp<>(1, "B1+b1", 0L),
- new KeyValueTimestamp<>(2, "B2+b2", 0L)
+ new KeyValueTimestamp<>(2, "B2+b2", 0L),
+ new KeyValueTimestamp<>(3, "null+b3", 0L)
);
// push all four items to the primary stream; this should produce
six full-joined items
@@ -639,11 +593,6 @@ public class KStreamKStreamOuterJoinTest {
new KeyValueTimestamp<>(2, "C2+b2", 0L),
new KeyValueTimestamp<>(3, "C3+b3", 0L)
);
-
- // push a dummy record that should expire non-joined items; it
should not produce any items because
- // all of them are joined
- inputTopic1.pipeInput(0, "dummy", 400L);
- processor.checkAndClearProcessResult();
}
}
@@ -662,7 +611,7 @@ public class KStreamKStreamOuterJoinTest {
joined = stream1.outerJoin(
stream2,
MockValueJoiner.TOSTRING_JOINER,
- JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)),
+ JoinWindows.of(ofMillis(100)).grace(Duration.ZERO),
StreamJoined.with(Serdes.Integer(), Serdes.String(),
Serdes.String())
);
joined.process(supplier);
@@ -681,7 +630,7 @@ public class KStreamKStreamOuterJoinTest {
final MockProcessor<Integer, String> processor =
supplier.theCapturedProcessor();
final long time = 0L;
- // push two items to the primary stream; the other window is
empty; this should not produce items because window has not closed
+ // push two items to the primary stream; the other window is
empty; this should produce two spurious items
// w1 = {}
// w2 = {}
// --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
@@ -689,9 +638,12 @@ public class KStreamKStreamOuterJoinTest {
for (int i = 0; i < 2; i++) {
inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i],
time);
}
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(0, "A0+null", 0L),
+ new KeyValueTimestamp<>(1, "A1+null", 0L)
+ );
- // push four items to the other stream; this should produce two
full-join items
+ // push four items to the other stream; this should produce four
full-join items
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
// w2 = {}
// --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
@@ -701,7 +653,9 @@ public class KStreamKStreamOuterJoinTest {
}
processor.checkAndClearProcessResult(
new KeyValueTimestamp<>(0, "A0+a0", 0L),
- new KeyValueTimestamp<>(1, "A1+a1", 0L)
+ new KeyValueTimestamp<>(1, "A1+a1", 0L),
+ new KeyValueTimestamp<>(2, "null+a2", 0L),
+ new KeyValueTimestamp<>(3, "null+a3", 0L)
);
testUpperWindowBound(expectedKeys, driver, processor);
@@ -730,8 +684,10 @@ public class KStreamKStreamOuterJoinTest {
inputTopic2.pipeInput(expectedKeys[i], "b" + expectedKeys[i], time
+ i);
}
processor.checkAndClearProcessResult(
- new KeyValueTimestamp<>(2, "null+a2", 0L),
- new KeyValueTimestamp<>(3, "null+a3", 0L)
+ new KeyValueTimestamp<>(0, "null+b0", 1000L),
+ new KeyValueTimestamp<>(1, "null+b1", 1001L),
+ new KeyValueTimestamp<>(2, "null+b2", 1002L),
+ new KeyValueTimestamp<>(3, "null+b3", 1003L)
);
// push four items with larger timestamp to the primary stream; this
should produce four full-join items
@@ -753,7 +709,7 @@ public class KStreamKStreamOuterJoinTest {
new KeyValueTimestamp<>(3, "B3+b3", 1100L)
);
- // push four items with increased timestamp to the primary stream;
this should produce three full-join items (non-joined item is not produced yet)
+ // push four items with increased timestamp to the primary stream;
this should produce three full-join items plus one spurious item
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
// 0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts:
1100) }
// w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
@@ -768,12 +724,13 @@ public class KStreamKStreamOuterJoinTest {
inputTopic1.pipeInput(expectedKey, "C" + expectedKey, time);
}
processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(0, "C0+null", 1101L),
new KeyValueTimestamp<>(1, "C1+b1", 1101L),
new KeyValueTimestamp<>(2, "C2+b2", 1101L),
new KeyValueTimestamp<>(3, "C3+b3", 1101L)
);
- // push four items with increased timestamp to the primary stream;
this should produce two full-join items (non-joined items are not produced yet)
+ // push four items with increased timestamp to the primary stream;
this should produce two full-join items plus two spurious items
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
// 0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts:
1100),
// 0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts:
1101) }
@@ -790,11 +747,13 @@ public class KStreamKStreamOuterJoinTest {
inputTopic1.pipeInput(expectedKey, "D" + expectedKey, time);
}
processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(0, "D0+null", 1102L),
+ new KeyValueTimestamp<>(1, "D1+null", 1102L),
new KeyValueTimestamp<>(2, "D2+b2", 1102L),
new KeyValueTimestamp<>(3, "D3+b3", 1102L)
);
- // push four items with increased timestamp to the primary stream;
this should produce one full-join items (three non-joined left-join are not
produced yet)
+ // push four items with increased timestamp to the primary stream;
this should produce one full-join items plus three spurious itmes
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
// 0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts:
1100),
// 0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts:
1101),
@@ -813,10 +772,13 @@ public class KStreamKStreamOuterJoinTest {
inputTopic1.pipeInput(expectedKey, "E" + expectedKey, time);
}
processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(0, "E0+null", 1103L),
+ new KeyValueTimestamp<>(1, "E1+null", 1103L),
+ new KeyValueTimestamp<>(2, "E2+null", 1103L),
new KeyValueTimestamp<>(3, "E3+b3", 1103L)
);
- // push four items with increased timestamp to the primary stream;
this should produce no full-join items (four non-joined left-join are not
produced yet)
+ // push four items with increased timestamp to the primary stream;
this should produce four spurious items
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
// 0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts:
1100),
// 0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts:
1101),
@@ -836,23 +798,12 @@ public class KStreamKStreamOuterJoinTest {
for (final int expectedKey : expectedKeys) {
inputTopic1.pipeInput(expectedKey, "F" + expectedKey, time);
}
- processor.checkAndClearProcessResult();
-
- // push a dummy record to produce all left-join non-joined items
- time += 301L;
- driver.advanceWallClockTime(Duration.ofMillis(1000L));
- inputTopic1.pipeInput(0, "dummy", time);
processor.checkAndClearProcessResult(
- new KeyValueTimestamp<>(0, "C0+null", 1101L),
- new KeyValueTimestamp<>(0, "D0+null", 1102L),
- new KeyValueTimestamp<>(1, "D1+null", 1102L),
- new KeyValueTimestamp<>(0, "E0+null", 1103L),
- new KeyValueTimestamp<>(1, "E1+null", 1103L),
- new KeyValueTimestamp<>(2, "E2+null", 1103L),
new KeyValueTimestamp<>(0, "F0+null", 1104L),
new KeyValueTimestamp<>(1, "F1+null", 1104L),
new KeyValueTimestamp<>(2, "F2+null", 1104L),
new KeyValueTimestamp<>(3, "F3+null", 1104L)
+
);
}
diff --git
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index 92ca5bd..ad02d04 100644
---
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -379,7 +379,7 @@ class TopologyTest {
mappedStream
.filter((k: String, _: String) => k == "A")
.join(stream2)((v1: String, v2: Int) => v1 + ":" + v2.toString,
-
JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000),
Duration.ofHours(24)))(
+
JoinWindows.of(Duration.ofMillis(5000)).grace(Duration.ofHours(24)))(
StreamJoined.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde,
NewSerdes.intSerde)
)
.to(JOINED_TOPIC)
@@ -387,7 +387,7 @@ class TopologyTest {
mappedStream
.filter((k: String, _: String) => k == "A")
.join(stream3)((v1: String, v2: String) => v1 + ":" + v2.toString,
-
JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000),
Duration.ofHours(24)))(
+
JoinWindows.of(Duration.ofMillis(5000)).grace(Duration.ofHours(24)))(
StreamJoined.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde,
NewSerdes.stringSerde)
)
.to(JOINED_TOPIC)
@@ -439,7 +439,7 @@ class TopologyTest {
.join[Integer, String](
stream2,
valueJoiner2,
- JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000),
Duration.ofHours(24)),
+ JoinWindows.of(Duration.ofMillis(5000)).grace(Duration.ofHours(24)),
StreamJoinedJ.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde,
SerdesJ.Integer)
)
.to(JOINED_TOPIC)
@@ -449,7 +449,7 @@ class TopologyTest {
.join(
stream3,
valueJoiner3,
- JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000),
Duration.ofHours(24)),
+ JoinWindows.of(Duration.ofMillis(5000)).grace(Duration.ofHours(24)),
StreamJoinedJ.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde,
SerdesJ.String)
)
.to(JOINED_TOPIC)
diff --git
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
index 0ec7b0e..1ba1238 100644
---
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
+++
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
@@ -201,7 +201,7 @@ class KStreamTest extends TestDriver {
val stream1 = builder.stream[String, String](sourceTopic1)
val stream2 = builder.stream[String, String](sourceTopic2)
stream1
- .join(stream2)((a, b) => s"$a-$b",
JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(1), Duration.ofHours(24)))
+ .join(stream2)((a, b) => s"$a-$b",
JoinWindows.of(ofSeconds(1)).grace(Duration.ofHours(24)))
.to(sinkTopic)
val now = Instant.now()