This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e81379d3fea KAFKA-15417: flip joinSpuriousLookBackTimeMs and emit
non-joined items (#14426)
e81379d3fea is described below
commit e81379d3fea956dd8900b7f4b68e0c1328401871
Author: Victor van den Hoven <[email protected]>
AuthorDate: Wed Mar 6 02:06:20 2024 +0100
KAFKA-15417: flip joinSpuriousLookBackTimeMs and emit non-joined items
(#14426)
Kafka Streams support asymmetric join windows. Depending on the window
configuration
we need to compute window close time etc differently.
This PR flips `joinSpuriousLookBackTimeMs`, because they were not correct,
and
introduced the `windowsAfterIntervalMs`-field that is used to find if
emitting records can be skipped.
Reviewers: Hao Li <[email protected]>, Guozhang Wang
<[email protected]>, Matthias J. Sax <[email protected]>
---
.../kstream/internals/KStreamKStreamJoin.java | 38 ++++-
.../integration/KStreamKStreamIntegrationTest.java | 2 +
.../internals/KStreamKStreamLeftJoinTest.java | 183 ++++++++++++++++++++-
.../internals/KStreamKStreamOuterJoinTest.java | 41 ++---
4 files changed, 236 insertions(+), 28 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index 603e1e82550..124386b9bc3 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -51,7 +51,8 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements
ProcessorSupplier<K, V1, K,
private final long joinAfterMs;
private final long joinGraceMs;
private final boolean enableSpuriousResultFix;
- private final long joinSpuriousLookBackTimeMs;
+ private final long windowsBeforeMs;
+ private final long windowsAfterMs;
private final boolean outer;
private final boolean isLeftSide;
@@ -72,12 +73,12 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements
ProcessorSupplier<K, V1, K,
if (isLeftSide) {
this.joinBeforeMs = windows.beforeMs;
this.joinAfterMs = windows.afterMs;
- this.joinSpuriousLookBackTimeMs = windows.beforeMs;
} else {
this.joinBeforeMs = windows.afterMs;
this.joinAfterMs = windows.beforeMs;
- this.joinSpuriousLookBackTimeMs = windows.afterMs;
}
+ this.windowsAfterMs = windows.afterMs;
+ this.windowsBeforeMs = windows.beforeMs;
this.joinGraceMs = windows.gracePeriodMs();
this.enableSpuriousResultFix = windows.spuriousResultFixEnabled();
this.joiner = joiner;
@@ -136,11 +137,12 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements
ProcessorSupplier<K, V1, K,
return;
}
- boolean needOuterJoin = outer;
// Emit all non-joined records which window has closed
if (inputRecordTimestamp == sharedTimeTracker.streamTime) {
outerJoinStore.ifPresent(store ->
emitNonJoinedOuterRecords(store, record));
}
+
+ boolean needOuterJoin = outer;
try (final WindowStoreIterator<V2> iter =
otherWindowStore.fetch(record.key(), timeFrom, timeTo)) {
while (iter.hasNext()) {
needOuterJoin = false;
@@ -200,7 +202,7 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements
ProcessorSupplier<K, V1, K,
// to reduce runtime cost, we try to avoid paying those cost
// only try to emit left/outer join results if there _might_ be
any result records
- if (sharedTimeTracker.minTime >= sharedTimeTracker.streamTime -
joinSpuriousLookBackTimeMs - joinGraceMs) {
+ if (sharedTimeTracker.minTime + joinAfterMs + joinGraceMs >=
sharedTimeTracker.streamTime) {
return;
}
// throttle the emit frequency to a (configurable) interval;
@@ -222,6 +224,8 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements
ProcessorSupplier<K, V1, K,
TimestampedKeyAndJoinSide<K> prevKey = null;
while (it.hasNext()) {
+ boolean outerJoinLeftBreak = false;
+ boolean outerJoinRightBreak = false;
final KeyValue<TimestampedKeyAndJoinSide<K>,
LeftOrRightValue<V1, V2>> next = it.next();
final TimestampedKeyAndJoinSide<K>
timestampedKeyAndJoinSide = next.key;
final LeftOrRightValue<V1, V2> value = next.value;
@@ -230,8 +234,19 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements
ProcessorSupplier<K, V1, K,
sharedTimeTracker.minTime = timestamp;
// Skip next records if window has not closed
- if (timestamp + joinAfterMs + joinGraceMs >=
sharedTimeTracker.streamTime) {
- break;
+ final long outerJoinLookBackTimeMs =
getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide);
+ if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs +
joinGraceMs >= sharedTimeTracker.streamTime) {
+ if (timestampedKeyAndJoinSide.isLeftSide()) {
+ outerJoinLeftBreak = true; // there are no more
candidates to emit on left-outerJoin-side
+ } else {
+ outerJoinRightBreak = true; // there are no more
candidates to emit on right-outerJoin-side
+ }
+ if (outerJoinLeftBreak && outerJoinRightBreak) {
+ break; // there are no more candidates to emit on
left-outerJoin-side and
+ // right-outerJoin-side
+ } else {
+ continue; // there are possibly candidates left on
the other outerJoin-side
+ }
}
final VOut nullJoinedValue;
@@ -268,6 +283,15 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements
ProcessorSupplier<K, V1, K,
}
}
+ private long getOuterJoinLookBackTimeMs(final
TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide) {
+ // depending on the JoinSide we fill in the outerJoinLookBackTimeMs
+ if (timestampedKeyAndJoinSide.isLeftSide()) {
+ return windowsAfterMs; // On the left-JoinSide we look back in
time
+ } else {
+ return windowsBeforeMs; // On the right-JoinSide we look
forward in time
+ }
+ }
+
@Override
public void close() {
sharedTimeTrackerSupplier.remove(context().taskId());
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java
index 1d9a77b5bf4..10ab37cee07 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.JoinWindows;
@@ -99,6 +100,7 @@ public class KStreamKStreamIntegrationTest {
final String safeTestName = safeUniqueTestName(testInfo);
streamsConfig = getStreamsConfig(safeTestName);
streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath);
+
streamsConfig.put(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX,
0L);
}
@AfterEach
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 156b553455d..fd36b241b27 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -436,6 +436,184 @@ public class KStreamKStreamLeftJoinTest {
}
}
+ @Test
+ public void testLeftJoinedRecordsWithZeroAfterAreEmitted() {
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ final int[] expectedKeys = new int[] {0, 1, 2, 3};
+
+ final KStream<Integer, String> stream1;
+ final KStream<Integer, String> stream2;
+ final KStream<Integer, String> joined;
+ final MockApiProcessorSupplier<Integer, String, Void, Void> supplier =
new MockApiProcessorSupplier<>();
+ stream1 = builder.stream(topic1, consumed);
+ stream2 = builder.stream(topic2, consumed);
+
+ joined = stream1.leftJoin(
+ stream2,
+ MockValueJoiner.TOSTRING_JOINER,
+ JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).after(ZERO),
+ StreamJoined.with(Serdes.Integer(),
+ Serdes.String(),
+ Serdes.String())
+ );
+ joined.process(supplier);
+
+ final Collection<Set<String>> copartitionGroups =
+
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+
+ assertEquals(1, copartitionGroups.size());
+ assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)),
copartitionGroups.iterator().next());
+
+ try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), PROPS)) {
+ final TestInputTopic<Integer, String> inputTopic1 =
+ driver.createInputTopic(topic1, new IntegerSerializer(),
new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+ final TestInputTopic<Integer, String> inputTopic2 =
+ driver.createInputTopic(topic2, new IntegerSerializer(),
new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+ final MockApiProcessor<Integer, String, Void, Void> processor =
supplier.theCapturedProcessor();
+
+ processor.init(null);
+
+ // push four items with increasing timestamps to the primary
stream; the other window is empty;
+ // this should emit the first three left-joined items;
+ // A3 is not triggered yet
+ // w1 = {}
+ // w2 = {}
+ // --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002),
3:A3 (ts: 1003) }
+ // w2 = {}
+ long time = 1000L;
+ for (int i = 0; i < expectedKeys.length; i++) {
+ inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i],
time + i);
+ }
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(0, "A0+null", 1000L),
+ new KeyValueTimestamp<>(1, "A1+null", 1001L),
+ new KeyValueTimestamp<>(2, "A2+null", 1002L)
+ );
+
+ // push four items smaller timestamps (out of window) to the
secondary stream;
+ // this should produce four joined items
+ // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3
(ts: 1003) }
+ // w2 = {}
+ // --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002),
3:A3 (ts: 1003) }
+ // w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3
(ts: 999) }
+ time = 1000L - 1L;
+ for (final int expectedKey : expectedKeys) {
+ inputTopic2.pipeInput(expectedKey, "a" + expectedKey, time);
+ }
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(0, "A0+a0", 1000L),
+ new KeyValueTimestamp<>(1, "A1+a1", 1001L),
+ new KeyValueTimestamp<>(2, "A2+a2", 1002L),
+ new KeyValueTimestamp<>(3, "A3+a3", 1003L)
+ );
+
+ // push four items with increased timestamps to the secondary
stream;
+ // this should produce four joined item
+ // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3
(ts: 1003) }
+ // w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3
(ts: 999) }
+ // --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002),
3:A3 (ts: 1003) }
+ // w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3
(ts: 999),
+ // 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000),
3:b3 (ts: 1000) }
+ time += 1L;
+ for (final int expectedKey : expectedKeys) {
+ inputTopic2.pipeInput(expectedKey, "b" + expectedKey, time);
+ }
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(0, "A0+b0", 1000L),
+ new KeyValueTimestamp<>(1, "A1+b1", 1001L),
+ new KeyValueTimestamp<>(2, "A2+b2", 1002L),
+ new KeyValueTimestamp<>(3, "A3+b3", 1003L)
+ );
+
+ // push four items with increased timestamps to the secondary
stream;
+ // this should produce only three joined items;
+ // c0 arrives too late to be joined with A0
+ // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3
(ts: 1003) }
+ // w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3
(ts: 999),
+ // 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3
(ts: 1000) }
+ // --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002),
3:A3 (ts: 1003) }
+ // w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3
(ts: 999),
+ // 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000),
3:b3 (ts: 1000),
+ // 0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001),
3:c3 (ts: 1001) }
+ time += 1L;
+ for (final int expectedKey : expectedKeys) {
+ inputTopic2.pipeInput(expectedKey, "c" + expectedKey, time);
+ }
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(1, "A1+c1", 1001L),
+ new KeyValueTimestamp<>(2, "A2+c2", 1002L),
+ new KeyValueTimestamp<>(3, "A3+c3", 1003L)
+ );
+
+ // push four items with increased timestamps to the secondary
stream;
+ // this should produce only two joined items;
+ // d0 and d1 arrive too late to be joined with A0 and A1
+ // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3
(ts: 1003) }
+ // w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3
(ts: 999),
+ // 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3
(ts: 1000),
+ // 0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3
(ts: 1001) }
+ // --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002),
3:A3 (ts: 1003) }
+ // w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3
(ts: 999),
+ // 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000),
3:b3 (ts: 1000),
+ // 0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001),
3:c3 (ts: 1001),
+ // 0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002),
3:d3 (ts: 1002) }
+ time += 1L;
+ for (final int expectedKey : expectedKeys) {
+ inputTopic2.pipeInput(expectedKey, "d" + expectedKey, time);
+ }
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(2, "A2+d2", 1002L),
+ new KeyValueTimestamp<>(3, "A3+d3", 1003L)
+ );
+
+ // push four items with increased timestamps to the secondary
stream;
+ // this should produce one joined item;
+ // only e3 can be joined with A3;
+ // e0, e1 and e2 arrive too late to be joined with A0, A1 and A2
+ // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3
(ts: 1003) }
+ // w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3
(ts: 999),
+ // 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3
(ts: 1000),
+ // 0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3
(ts: 1001),
+ // 0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 3:d3
(ts: 1002) }
+ // --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002),
3:A3 (ts: 1003) }
+ // w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3
(ts: 999),
+ // 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000),
3:b3 (ts: 1000),
+ // 0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001),
3:c3 (ts: 1001),
+ // 0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002),
3:d3 (ts: 1002),
+ // 0:e0 (ts: 1003), 1:e1 (ts: 1003), 2:e2 (ts: 1003),
3:e3 (ts: 1003) }
+ time += 1L;
+ for (final int expectedKey : expectedKeys) {
+ inputTopic2.pipeInput(expectedKey, "e" + expectedKey, time);
+ }
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(3, "A3+e3", 1003L)
+ );
+
+ // push four items with larger timestamps to the secondary stream;
+ // no (non-)joined items can be produced
+ //
+ // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3
(ts: 1003) }
+ // w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3
(ts: 999),
+ // 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3
(ts: 1000),
+ // 0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3
(ts: 1001),
+ // 0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 3:d3
(ts: 1002),
+ // 0:e0 (ts: 1003), 1:e1 (ts: 1003), 2:e2 (ts: 1003), 3:e3
(ts: 1003) }
+ // --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002),
3:A3 (ts: 1003) }
+ // w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3
(ts: 999),
+ // 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000),
3:b3 (ts: 1000),
+ // 0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001),
3:c3 (ts: 1001),
+ // 0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002),
3:d3 (ts: 1002),
+ // 0:e0 (ts: 1003), 1:e1 (ts: 1003), 2:e2 (ts: 1003),
3:e3 (ts: 1003),
+ // 0:f0 (ts: 1100), 1:f1 (ts: 1100), 2:f2 (ts: 1100),
3:f3 (ts: 1100) }
+ time = 1000 + 100L;
+ for (final int expectedKey : expectedKeys) {
+ inputTopic2.pipeInput(expectedKey, "f" + expectedKey, time);
+ }
+ processor.checkAndClearProcessResult();
+ }
+ }
+
@Test
public void testLeftJoinWithInMemoryCustomSuppliers() {
final JoinWindows joinWindows =
JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofMillis(0L));
@@ -609,8 +787,9 @@ public class KStreamKStreamLeftJoinTest {
inputTopic1.pipeInput(1, "A1", 100L);
processor.checkAndClearProcessResult();
- // push one item to the other window that has a join; this should
produce non-joined records with a closed window first, then
- // the joined records
+ // push one item to the other window that has a join;
+ // this should produce the joined record first;
+ // then non-joined record with a closed window
// by the time they were produced before
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
// w2 = { }
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
index 099dc5b0c83..28a5f1488fb 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
@@ -108,11 +108,11 @@ public class KStreamKStreamOuterJoinTest {
inputTopic2.pipeInput(1, "b1", 0L);
processor.checkAndClearProcessResult(
- new KeyValueTimestamp<>(0, "A0+null", 0L),
- new KeyValueTimestamp<>(0, "A0-0+null", 0L),
- new KeyValueTimestamp<>(0, "A0+a0", 0L),
- new KeyValueTimestamp<>(0, "A0-0+a0", 0L),
- new KeyValueTimestamp<>(1, "null+b1", 0L)
+ new KeyValueTimestamp<>(0, "A0+null", 0L),
+ new KeyValueTimestamp<>(0, "A0-0+null", 0L),
+ new KeyValueTimestamp<>(0, "A0+a0", 0L),
+ new KeyValueTimestamp<>(0, "A0-0+a0", 0L),
+ new KeyValueTimestamp<>(1, "null+b1", 0L)
);
}
}
@@ -438,13 +438,13 @@ public class KStreamKStreamOuterJoinTest {
inputTopic1.pipeInput(1, "A1", 100L);
processor.checkAndClearProcessResult();
- // push one item to the other window that has a join; this should
produce non-joined records with a closed window first, then
- // the joined records
- // by the time they were produced before
+ // push one item to the other window that has a join;
+ // this should produce the not-joined record first;
+ // then the joined record
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
// w2 = { }
- // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
- // --> w2 = { 0:a0 (ts: 110) }
+ // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
+ // --> w2 = { 1:a1 (ts: 110) }
inputTopic2.pipeInput(1, "a1", 110L);
processor.checkAndClearProcessResult(
new KeyValueTimestamp<>(0, "A0+null", 0L),
@@ -788,7 +788,7 @@ public class KStreamKStreamOuterJoinTest {
new KeyValueTimestamp<>(1, "A1+null", 1L)
);
- // push one item to the other stream; this should not produce any
items
+ // push one item to the other stream; this should produce one
right-join item
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
// w2 = { 0:a0 (ts: 100), 1:a1 (ts: 102) }
// --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
@@ -841,7 +841,8 @@ public class KStreamKStreamOuterJoinTest {
final MockApiProcessor<Integer, String, Void, Void> processor =
supplier.theCapturedProcessor();
long time = 0L;
- // push two items to the primary stream; the other window is
empty; this should not produce any item
+ // push two items to the primary stream; the other window is
empty;
+ // this should produce one left-joined item
// w1 = {}
// w2 = {}
// --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
@@ -849,7 +850,9 @@ public class KStreamKStreamOuterJoinTest {
for (int i = 0; i < 2; i++) {
inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i],
time + i);
}
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(0, "A0+null", 0L)
+ );
// push one item to the other stream; this should produce one
full-join item
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
@@ -863,7 +866,8 @@ public class KStreamKStreamOuterJoinTest {
new KeyValueTimestamp<>(1, "A1+a1", 1L)
);
- // push one item to the other stream; this should produce one
left-join item
+ // push one item to the other stream;
+ // this should not produce any item
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
// w2 = { 1:a1 (ts: 1) }
// --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
@@ -871,9 +875,7 @@ public class KStreamKStreamOuterJoinTest {
time += 100;
inputTopic2.pipeInput(expectedKeys[2], "a" + expectedKeys[2],
time);
- processor.checkAndClearProcessResult(
- new KeyValueTimestamp<>(0, "A0+null", 0L)
- );
+ processor.checkAndClearProcessResult();
// push one item to the other stream; this should not produce any
item
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
@@ -884,11 +886,12 @@ public class KStreamKStreamOuterJoinTest {
processor.checkAndClearProcessResult();
- // push one item to the first stream; this should produce one
full-join item
+ // push one item to the first stream;
+ // this should produce one inner-join item;
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
// w2 = { 1:a1 (ts: 1), 2:a2 (ts: 101), 3:a3 (ts: 101) }
// --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1), 2:A2 (ts: 201) }
- // --> w2 = { 1:a1 (ts: 1), 2:a2 (ts: 101), 3:a3 (ts: 101 }
+ // --> w2 = { 1:a1 (ts: 1), 2:a2 (ts: 101), 3:a3 (ts: 101) }
time += 100;
inputTopic1.pipeInput(expectedKeys[2], "A" + expectedKeys[2],
time);