This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.2 by this push:
new 90bd03a fix: make sliding window work without grace period
(#kafka-13739) (#11928)
90bd03a is described below
commit 90bd03a0a27b1595a035f575daeb9041b4b054b6
Author: Bounkong Khamphousone <[email protected]>
AuthorDate: Thu Mar 31 16:05:53 2022 +0200
fix: make sliding window work without grace period (#kafka-13739) (#11928)
Fix the upper bound for sliding windows, making them compatible with a zero grace
period (KAFKA-13739).
Added unit tests for early and "normal" sliding windows, covering both
events that fall within one time difference (small input) and events that span
more than the window's time difference (large input).
Fixing this window interval may slightly change stream behavior, but the
probability of this happening is extremely low and it should not have a
significant impact on the results.
Reviewers: Leah Thomas <[email protected]>, Bill Bejeck
<[email protected]>
---
.../internals/KStreamSlidingWindowAggregate.java | 2 +-
.../KStreamSlidingWindowAggregateTest.java | 304 +++++++++++++++++++++
2 files changed, 305 insertions(+), 1 deletion(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
index fd0198b..73c901c 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
@@ -473,7 +473,7 @@ public class KStreamSlidingWindowAggregate<KIn, VIn, VAgg>
implements KStreamAgg
final long closeTime) {
final long windowStart = window.start();
final long windowEnd = window.end();
- if (windowEnd > closeTime) {
+ if (windowEnd >= closeTime) { // inclusive bound: a window ending exactly at closeTime is still aggregated; required for sliding windows with no grace period (KAFKA-13739)
//get aggregate from existing window
final VAgg oldAgg = getValueOrNull(valueAndTime);
final VAgg newAgg = aggregator.apply(record.key(),
record.value(), oldAgg);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
index 8e8115f..b227c71 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
@@ -699,6 +699,310 @@ public class KStreamSlidingWindowAggregateTest {
}
@Test
+ public void testEarlyNoGracePeriodSmallInput() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final String topic = "topic";
+
+ final KTable<Windowed<String>, String> table2 = builder
+ .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+ .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(50)))
+ .aggregate(
+ MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
+ );
+ final MockApiProcessorSupplier<Windowed<String>, String, Void, Void>
supplier = new MockApiProcessorSupplier<>();
+ table2.toStream().process(supplier);
+
+ // all events are considered as early events since record timestamp is
less than time difference of the window
+ try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props)) {
+ final TestInputTopic<String, String> inputTopic =
+ driver.createInputTopic(topic, new StringSerializer(), new
StringSerializer());
+
+ inputTopic.pipeInput("A", "1", 0L);
+ inputTopic.pipeInput("A", "2", 5L);
+ inputTopic.pipeInput("A", "3", 6L);
+ inputTopic.pipeInput("A", "4", 3L);
+ inputTopic.pipeInput("A", "5", 13L);
+ inputTopic.pipeInput("A", "6", 10L);
+ }
+
+ final Map<Long, ValueAndTimestamp<String>> actual = new HashMap<>();
+ for (final KeyValueTimestamp<Windowed<String>, String> entry :
supplier.theCapturedProcessor().processed()) {
+ final Windowed<String> window = entry.key();
+ final Long start = window.window().start();
+ final ValueAndTimestamp<String> valueAndTimestamp =
ValueAndTimestamp.make(entry.value(), entry.timestamp());
+ // Map.put overwrites any earlier entry, so only the last update
+ // emitted for each window start is kept.
+ actual.put(start, valueAndTimestamp);
+ }
+
+ final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>();
+ expected.put(0L, ValueAndTimestamp.make("0+1+2+3+4+5+6", 13L));
+ expected.put(1L, ValueAndTimestamp.make("0+2+3+4+5+6", 13L));
+ expected.put(4L, ValueAndTimestamp.make("0+2+3+5+6", 13L));
+ expected.put(6L, ValueAndTimestamp.make("0+3+5+6", 13L));
+ expected.put(7L, ValueAndTimestamp.make("0+5+6", 13L));
+ expected.put(11L, ValueAndTimestamp.make("0+5", 13L));
+
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testNoGracePeriodSmallInput() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final String topic = "topic";
+
+ final KTable<Windowed<String>, String> table2 = builder
+ .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+ .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(50)))
+ .aggregate(
+ MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
+ );
+ final MockApiProcessorSupplier<Windowed<String>, String, Void, Void>
supplier = new MockApiProcessorSupplier<>();
+ table2.toStream().process(supplier);
+
+ try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props)) {
+ final TestInputTopic<String, String> inputTopic =
+ driver.createInputTopic(topic, new StringSerializer(), new
StringSerializer());
+
+ inputTopic.pipeInput("A", "1", 100L);
+ inputTopic.pipeInput("A", "2", 105L);
+ inputTopic.pipeInput("A", "3", 106L);
+ inputTopic.pipeInput("A", "4", 103L);
+ inputTopic.pipeInput("A", "5", 113L);
+ inputTopic.pipeInput("A", "6", 110L);
+ }
+
+ final Map<Long, ValueAndTimestamp<String>> actual = new HashMap<>();
+ for (final KeyValueTimestamp<Windowed<String>, String> entry :
supplier.theCapturedProcessor().processed()) {
+ final Windowed<String> window = entry.key();
+ final Long start = window.window().start();
+ final ValueAndTimestamp<String> valueAndTimestamp =
ValueAndTimestamp.make(entry.value(), entry.timestamp());
+ // Map.put overwrites any earlier entry, so only the last update
+ // emitted for each window start is kept.
+ actual.put(start, valueAndTimestamp);
+ }
+
+ final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>();
+ expected.put(50L, ValueAndTimestamp.make("0+1", 100L));
+ expected.put(55L, ValueAndTimestamp.make("0+1+2", 105L));
+ expected.put(56L, ValueAndTimestamp.make("0+1+2+3+4", 106L));
+ expected.put(63L, ValueAndTimestamp.make("0+1+2+3+4+5+6", 113L));
+ expected.put(101L, ValueAndTimestamp.make("0+2+3+4+5+6", 113L));
+ expected.put(104L, ValueAndTimestamp.make("0+2+3+5+6", 113L));
+ expected.put(106L, ValueAndTimestamp.make("0+3+5+6", 113L));
+ expected.put(107L, ValueAndTimestamp.make("0+5+6", 113L));
+ expected.put(111L, ValueAndTimestamp.make("0+5", 113L));
+
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testEarlyNoGracePeriodLargeInput() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final String topic = "topic";
+ final WindowBytesStoreSupplier storeSupplier =
+ inOrderIterator
+ ? new InOrderMemoryWindowStoreSupplier("InOrder",
500L, 10L, false)
+ : Stores.inMemoryWindowStore("Reverse",
Duration.ofMillis(500), Duration.ofMillis(10), false);
+
+ final KTable<Windowed<String>, String> table2 = builder
+ .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+ .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(10)))
+ .aggregate(
+ MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ Materialized.as(storeSupplier)
+ );
+
+ final MockApiProcessorSupplier<Windowed<String>, String, Void, Void>
supplier = new MockApiProcessorSupplier<>();
+ table2.toStream().process(supplier);
+
+ try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props)) {
+ final TestInputTopic<String, String> inputTopic1 =
+ driver.createInputTopic(topic, new StringSerializer(), new
StringSerializer());
+
+ inputTopic1.pipeInput("E", "1", 0L);
+ inputTopic1.pipeInput("E", "3", 5L);
+ inputTopic1.pipeInput("E", "4", 6L);
+ inputTopic1.pipeInput("E", "2", 3L);
+ inputTopic1.pipeInput("E", "6", 13L);
+ inputTopic1.pipeInput("E", "5", 10L);
+ inputTopic1.pipeInput("E", "7", 4L);
+ inputTopic1.pipeInput("E", "8", 2L);
+ inputTopic1.pipeInput("E", "9", 15L);
+ }
+ final Comparator<KeyValueTimestamp<Windowed<String>, String>>
comparator =
+ Comparator.comparing((KeyValueTimestamp<Windowed<String>,
String> o) -> o.key().key())
+ .thenComparing((KeyValueTimestamp<Windowed<String>,
String> o) -> o.key().window().start());
+
+ final ArrayList<KeyValueTimestamp<Windowed<String>, String>> actual =
supplier.theCapturedProcessor().processed();
+ actual.sort(comparator); // List.sort is stable, so updates to the same window keep their emission order
+ assertEquals(
+ asList( // each "E@t" comment names the input record (timestamp t) whose processing produced the entry below it
+ // E@0
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0,
10)), "0+1", 0),
+ // E@5
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0,
10)), "0+1+3", 5),
+ // E@6
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0,
10)), "0+1+3+4", 6),
+ // E@3
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0,
10)), "0+1+3+4+2", 6),
+ // E@5
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1,
11)), "0+3", 5),
+ // E@6
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1,
11)), "0+3+4", 6),
+ // E@3
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1,
11)), "0+3+4+2", 6),
+ // E@13
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3,
13)), "0+3+4+2+6", 13),
+ // E@10
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3,
13)), "0+3+4+2+6+5", 13),
+ // E@4
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3,
13)), "0+3+4+2+6+5+7", 13),
+ // E@3
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4,
14)), "0+3+4", 6),
+ // E@13
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4,
14)), "0+3+4+6", 13),
+ // E@10
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4,
14)), "0+3+4+6+5", 13),
+ // E@4
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4,
14)), "0+3+4+6+5+7", 13),
+ // E@4
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(5,
15)), "0+3+4+6+5", 13),
+ // E@15
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(5,
15)), "0+3+4+6+5+9", 15),
+ // E@6
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6,
16)), "0+4", 6),
+ // E@13
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6,
16)), "0+4+6", 13),
+ // E@10
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6,
16)), "0+4+6+5", 13),
+ // E@15
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6,
16)), "0+4+6+5+9", 15),
+ // E@13
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7,
17)), "0+6", 13),
+ // E@10
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7,
17)), "0+6+5", 13),
+ // E@15
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7,
17)), "0+6+5+9", 15),
+ // E@10
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(11,
21)), "0+6", 13),
+ // E@15
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(11,
21)), "0+6+9", 15),
+ // E@15
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(14,
24)), "0+9", 15)),
+ actual
+ );
+ }
+
+ @Test
+ public void testNoGracePeriodLargeInput() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final String topic = "topic";
+ final WindowBytesStoreSupplier storeSupplier =
+ inOrderIterator
+ ? new InOrderMemoryWindowStoreSupplier("InOrder",
500L, 10L, false)
+ : Stores.inMemoryWindowStore("Reverse",
Duration.ofMillis(500), Duration.ofMillis(10), false);
+
+ final KTable<Windowed<String>, String> table2 = builder
+ .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+ .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(10)))
+ .aggregate(
+ MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ Materialized.as(storeSupplier)
+ );
+
+ final MockApiProcessorSupplier<Windowed<String>, String, Void, Void>
supplier = new MockApiProcessorSupplier<>();
+ table2.toStream().process(supplier);
+
+ try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props)) {
+ final TestInputTopic<String, String> inputTopic1 =
+ driver.createInputTopic(topic, new StringSerializer(), new
StringSerializer());
+
+ inputTopic1.pipeInput("E", "1", 100L);
+ inputTopic1.pipeInput("E", "3", 105L);
+ inputTopic1.pipeInput("E", "4", 106L);
+ inputTopic1.pipeInput("E", "2", 103L);
+ inputTopic1.pipeInput("E", "6", 113L);
+ inputTopic1.pipeInput("E", "5", 110L);
+ inputTopic1.pipeInput("E", "7", 104L);
+ inputTopic1.pipeInput("E", "8", 102L);
+ inputTopic1.pipeInput("E", "9", 115L);
+ }
+ final Comparator<KeyValueTimestamp<Windowed<String>, String>>
comparator =
+ Comparator.comparing((KeyValueTimestamp<Windowed<String>,
String> o) -> o.key().key())
+ .thenComparing((KeyValueTimestamp<Windowed<String>,
String> o) -> o.key().window().start());
+
+ final ArrayList<KeyValueTimestamp<Windowed<String>, String>> actual =
supplier.theCapturedProcessor().processed();
+ actual.sort(comparator); // List.sort is stable, so updates to the same window keep their emission order
+ assertEquals(
+ asList( // each "E@t" comment names the input record (timestamp t) whose processing produced the entry below it
+ // E@100
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(90,
100)), "0+1", 100),
+ // E@105
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(95,
105)), "0+1+3", 105),
+ // E@106
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(96,
106)), "0+1+3+4", 106),
+ // E@103
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(96,
106)), "0+1+3+4+2", 106),
+ // E@105
+ new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(101, 111)), "0+3", 105),
+ // E@106
+ new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(101, 111)), "0+3+4", 106),
+ // E@103
+ new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(101, 111)), "0+3+4+2", 106),
+ // E@113
+ new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(103, 113)), "0+3+4+2+6", 113),
+ // E@110
+ new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(103, 113)), "0+3+4+2+6+5", 113),
+ // E@104
+ new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(103, 113)), "0+3+4+2+6+5+7", 113),
+ // E@103
+ new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(104, 114)), "0+3+4", 106),
+ // E@113
+ new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(104, 114)), "0+3+4+6", 113),
+ // E@110
+ new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(104, 114)), "0+3+4+6+5", 113),
+ // E@104
+ new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(104, 114)), "0+3+4+6+5+7", 113),
+ // E@104
+ new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(105, 115)), "0+3+4+6+5", 113),
+ // E@115
+ new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(105, 115)), "0+3+4+6+5+9", 115),
+ // E@106
+ new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(106, 116)), "0+4", 106),
+ // E@113
+ new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(106, 116)), "0+4+6", 113),
+ // E@110
+ new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(106, 116)), "0+4+6+5", 113),
+ // E@115
+ new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(106, 116)), "0+4+6+5+9", 115),
+ // E@113
+ new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(107, 117)), "0+6", 113),
+ // E@110
+ new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(107, 117)), "0+6+5", 113),
+ // E@115
+ new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(107, 117)), "0+6+5+9", 115),
+ // E@110
+ new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(111, 121)), "0+6", 113),
+ // E@115
+ new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(111, 121)), "0+6+9", 115),
+ // E@115
+ new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(114, 124)), "0+9", 115)),
+ actual
+ );
+ }
+
+ @Test
public void shouldLogAndMeterWhenSkippingNullKey() {
final String builtInMetricsVersion = StreamsConfig.METRICS_LATEST;
final StreamsBuilder builder = new StreamsBuilder();