This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3c279b6  fix: make sliding window works without grace period 
(#kafka-13739) (#11928)
3c279b6 is described below

commit 3c279b63fa862671330b568f5f58df69f9352c35
Author: Bounkong Khamphousone <[email protected]>
AuthorDate: Thu Mar 31 16:05:53 2022 +0200

    fix: make sliding window works without grace period (#kafka-13739) (#11928)
    
    Fix the upper bound for sliding windows, making them compatible with no 
grace period (kafka-13739).
    
    Added unit tests for early sliding windows and "normal" sliding windows, 
covering both events that fall within one time difference (small input) and 
events that span more than the window time difference (large input).
    
    Fixing this window interval may slightly change stream behavior, but the 
probability of that happening is extremely low and it is unlikely to have a 
significant impact on the results.
    
    Reviewers: Leah Thomas <[email protected]>, Bill Bejeck 
<[email protected]>
---
 .../internals/KStreamSlidingWindowAggregate.java   |   2 +-
 .../KStreamSlidingWindowAggregateTest.java         | 304 +++++++++++++++++++++
 2 files changed, 305 insertions(+), 1 deletion(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
index fd0198b..73c901c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
@@ -473,7 +473,7 @@ public class KStreamSlidingWindowAggregate<KIn, VIn, VAgg> 
implements KStreamAgg
                                             final long closeTime) {
             final long windowStart = window.start();
             final long windowEnd = window.end();
-            if (windowEnd > closeTime) {
+            if (windowEnd >= closeTime) {
                 //get aggregate from existing window
                 final VAgg oldAgg = getValueOrNull(valueAndTime);
                 final VAgg newAgg = aggregator.apply(record.key(), 
record.value(), oldAgg);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
index 8e8115f..b227c71 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
@@ -699,6 +699,310 @@ public class KStreamSlidingWindowAggregateTest {
     }
 
     @Test
+    public void testEarlyNoGracePeriodSmallInput() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic = "topic";
+
+        final KTable<Windowed<String>, String> table2 = builder
+            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+            
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(50)))
+            .aggregate(
+                MockInitializer.STRING_INIT,
+                MockAggregator.TOSTRING_ADDER,
+                Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
+            );
+        final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> 
supplier = new MockApiProcessorSupplier<>();
+        table2.toStream().process(supplier);
+
+        // all events are considered as early events since record timestamp is 
less than time difference of the window
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic =
+                driver.createInputTopic(topic, new StringSerializer(), new 
StringSerializer());
+
+            inputTopic.pipeInput("A", "1", 0L);
+            inputTopic.pipeInput("A", "2", 5L);
+            inputTopic.pipeInput("A", "3", 6L);
+            inputTopic.pipeInput("A", "4", 3L);
+            inputTopic.pipeInput("A", "5", 13L);
+            inputTopic.pipeInput("A", "6", 10L);
+        }
+
+        final Map<Long, ValueAndTimestamp<String>> actual = new HashMap<>();
+        for (final KeyValueTimestamp<Windowed<String>, String> entry : 
supplier.theCapturedProcessor().processed()) {
+            final Windowed<String> window = entry.key();
+            final Long start = window.window().start();
+            final ValueAndTimestamp<String> valueAndTimestamp = 
ValueAndTimestamp.make(entry.value(), entry.timestamp());
+            if (actual.putIfAbsent(start, valueAndTimestamp) != null) {
+                actual.replace(start, valueAndTimestamp);
+            }
+        }
+
+        final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>();
+        expected.put(0L, ValueAndTimestamp.make("0+1+2+3+4+5+6", 13L));
+        expected.put(1L, ValueAndTimestamp.make("0+2+3+4+5+6", 13L));
+        expected.put(4L, ValueAndTimestamp.make("0+2+3+5+6", 13L));
+        expected.put(6L, ValueAndTimestamp.make("0+3+5+6", 13L));
+        expected.put(7L, ValueAndTimestamp.make("0+5+6", 13L));
+        expected.put(11L, ValueAndTimestamp.make("0+5", 13L));
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testNoGracePeriodSmallInput() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic = "topic";
+
+        final KTable<Windowed<String>, String> table2 = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(50)))
+                .aggregate(
+                        MockInitializer.STRING_INIT,
+                        MockAggregator.TOSTRING_ADDER,
+                        Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
+                );
+        final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> 
supplier = new MockApiProcessorSupplier<>();
+        table2.toStream().process(supplier);
+
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic =
+                    driver.createInputTopic(topic, new StringSerializer(), new 
StringSerializer());
+
+            inputTopic.pipeInput("A", "1", 100L);
+            inputTopic.pipeInput("A", "2", 105L);
+            inputTopic.pipeInput("A", "3", 106L);
+            inputTopic.pipeInput("A", "4", 103L);
+            inputTopic.pipeInput("A", "5", 113L);
+            inputTopic.pipeInput("A", "6", 110L);
+        }
+
+        final Map<Long, ValueAndTimestamp<String>> actual = new HashMap<>();
+        for (final KeyValueTimestamp<Windowed<String>, String> entry : 
supplier.theCapturedProcessor().processed()) {
+            final Windowed<String> window = entry.key();
+            final Long start = window.window().start();
+            final ValueAndTimestamp<String> valueAndTimestamp = 
ValueAndTimestamp.make(entry.value(), entry.timestamp());
+            if (actual.putIfAbsent(start, valueAndTimestamp) != null) {
+                actual.replace(start, valueAndTimestamp);
+            }
+        }
+
+        final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>();
+        expected.put(50L, ValueAndTimestamp.make("0+1", 100L));
+        expected.put(55L, ValueAndTimestamp.make("0+1+2", 105L));
+        expected.put(56L, ValueAndTimestamp.make("0+1+2+3+4", 106L));
+        expected.put(63L, ValueAndTimestamp.make("0+1+2+3+4+5+6", 113L));
+        expected.put(101L, ValueAndTimestamp.make("0+2+3+4+5+6", 113L));
+        expected.put(104L, ValueAndTimestamp.make("0+2+3+5+6", 113L));
+        expected.put(106L, ValueAndTimestamp.make("0+3+5+6", 113L));
+        expected.put(107L, ValueAndTimestamp.make("0+5+6", 113L));
+        expected.put(111L, ValueAndTimestamp.make("0+5", 113L));
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testEarlyNoGracePeriodLargeInput() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic = "topic";
+        final WindowBytesStoreSupplier storeSupplier =
+                inOrderIterator
+                        ? new InOrderMemoryWindowStoreSupplier("InOrder", 
500L, 10L, false)
+                        : Stores.inMemoryWindowStore("Reverse", 
Duration.ofMillis(500), Duration.ofMillis(10), false);
+
+        final KTable<Windowed<String>, String> table2 = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(10)))
+                .aggregate(
+                        MockInitializer.STRING_INIT,
+                        MockAggregator.TOSTRING_ADDER,
+                        Materialized.as(storeSupplier)
+                );
+
+        final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> 
supplier = new MockApiProcessorSupplier<>();
+        table2.toStream().process(supplier);
+
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic1 =
+                    driver.createInputTopic(topic, new StringSerializer(), new 
StringSerializer());
+
+            inputTopic1.pipeInput("E", "1", 0L);
+            inputTopic1.pipeInput("E", "3", 5L);
+            inputTopic1.pipeInput("E", "4", 6L);
+            inputTopic1.pipeInput("E", "2", 3L);
+            inputTopic1.pipeInput("E", "6", 13L);
+            inputTopic1.pipeInput("E", "5", 10L);
+            inputTopic1.pipeInput("E", "7", 4L);
+            inputTopic1.pipeInput("E", "8", 2L);
+            inputTopic1.pipeInput("E", "9", 15L);
+        }
+        final Comparator<KeyValueTimestamp<Windowed<String>, String>> 
comparator =
+                Comparator.comparing((KeyValueTimestamp<Windowed<String>, 
String> o) -> o.key().key())
+                        .thenComparing((KeyValueTimestamp<Windowed<String>, 
String> o) -> o.key().window().start());
+
+        final ArrayList<KeyValueTimestamp<Windowed<String>, String>> actual = 
supplier.theCapturedProcessor().processed();
+        actual.sort(comparator);
+        assertEquals(
+            asList(
+                // E@0
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 
10)), "0+1", 0),
+                // E@5
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 
10)), "0+1+3", 5),
+                // E@6
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 
10)), "0+1+3+4", 6),
+                // E@3
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 
10)), "0+1+3+4+2", 6),
+                // E@5
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1, 
11)), "0+3", 5),
+                // E@6
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1, 
11)), "0+3+4", 6),
+                // E@3
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1, 
11)), "0+3+4+2", 6),
+                //E@13
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3, 
13)), "0+3+4+2+6", 13),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3, 
13)), "0+3+4+2+6+5", 13),
+                //E@4
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3, 
13)), "0+3+4+2+6+5+7", 13),
+                // E@3
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4, 
14)), "0+3+4", 6),
+                //E@13
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4, 
14)), "0+3+4+6", 13),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4, 
14)), "0+3+4+6+5", 13),
+                //E@4
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4, 
14)), "0+3+4+6+5+7", 13),
+                //E@4
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(5, 
15)), "0+3+4+6+5", 13),
+                //E@15
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(5, 
15)), "0+3+4+6+5+9", 15),
+                // E@6
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6, 
16)), "0+4", 6),
+                //E@13
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6, 
16)), "0+4+6", 13),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6, 
16)), "0+4+6+5", 13),
+                //E@15
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6, 
16)), "0+4+6+5+9", 15),
+                //E@13
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7, 
17)), "0+6", 13),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7, 
17)), "0+6+5", 13),
+                //E@15
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7, 
17)), "0+6+5+9", 15),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(11, 
21)), "0+6", 13),
+                //E@15
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(11, 
21)), "0+6+9", 15),
+                //E@15
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(14, 
24)), "0+9", 15)),
+            actual
+        );
+    }
+
+    @Test
+    public void testNoGracePeriodLargeInput() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic = "topic";
+        final WindowBytesStoreSupplier storeSupplier =
+                inOrderIterator
+                        ? new InOrderMemoryWindowStoreSupplier("InOrder", 
500L, 10L, false)
+                        : Stores.inMemoryWindowStore("Reverse", 
Duration.ofMillis(500), Duration.ofMillis(10), false);
+
+        final KTable<Windowed<String>, String> table2 = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(10)))
+                .aggregate(
+                        MockInitializer.STRING_INIT,
+                        MockAggregator.TOSTRING_ADDER,
+                        Materialized.as(storeSupplier)
+                );
+
+        final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> 
supplier = new MockApiProcessorSupplier<>();
+        table2.toStream().process(supplier);
+
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic1 =
+                    driver.createInputTopic(topic, new StringSerializer(), new 
StringSerializer());
+
+            inputTopic1.pipeInput("E", "1", 100L);
+            inputTopic1.pipeInput("E", "3", 105L);
+            inputTopic1.pipeInput("E", "4", 106L);
+            inputTopic1.pipeInput("E", "2", 103L);
+            inputTopic1.pipeInput("E", "6", 113L);
+            inputTopic1.pipeInput("E", "5", 110L);
+            inputTopic1.pipeInput("E", "7", 104L);
+            inputTopic1.pipeInput("E", "8", 102L);
+            inputTopic1.pipeInput("E", "9", 115L);
+        }
+        final Comparator<KeyValueTimestamp<Windowed<String>, String>> 
comparator =
+                Comparator.comparing((KeyValueTimestamp<Windowed<String>, 
String> o) -> o.key().key())
+                        .thenComparing((KeyValueTimestamp<Windowed<String>, 
String> o) -> o.key().window().start());
+
+        final ArrayList<KeyValueTimestamp<Windowed<String>, String>> actual = 
supplier.theCapturedProcessor().processed();
+        actual.sort(comparator);
+        assertEquals(
+            asList(
+                // E@0
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(90, 
100)), "0+1", 100),
+                // E@5
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(95, 
105)), "0+1+3", 105),
+                // E@6
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(96, 
106)), "0+1+3+4", 106),
+                // E@3
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(96, 
106)), "0+1+3+4+2", 106),
+                // E@5
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(101, 111)), "0+3", 105),
+                // E@6
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(101, 111)), "0+3+4", 106),
+                // E@3
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(101, 111)), "0+3+4+2", 106),
+                //E@13
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(103, 113)), "0+3+4+2+6", 113),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(103, 113)), "0+3+4+2+6+5", 113),
+                //E@4
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(103, 113)), "0+3+4+2+6+5+7", 113),
+                // E@3
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(104, 114)), "0+3+4", 106),
+                //E@13
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(104, 114)), "0+3+4+6", 113),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(104, 114)), "0+3+4+6+5", 113),
+                //E@4
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(104, 114)), "0+3+4+6+5+7", 113),
+                //E@4
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(105, 115)), "0+3+4+6+5", 113),
+                //E@15
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(105, 115)), "0+3+4+6+5+9", 115),
+                // E@6
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(106, 116)), "0+4", 106),
+                //E@13
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(106, 116)), "0+4+6", 113),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(106, 116)), "0+4+6+5", 113),
+                //E@15
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(106, 116)), "0+4+6+5+9", 115),
+                //E@13
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(107, 117)), "0+6", 113),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(107, 117)), "0+6+5", 113),
+                //E@15
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(107, 117)), "0+6+5+9", 115),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(111, 121)), "0+6", 113),
+                //E@15
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(111, 121)), "0+6+9", 115),
+                //E@15
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(114, 124)), "0+9", 115)),
+            actual
+        );
+    }
+
+    @Test
     public void shouldLogAndMeterWhenSkippingNullKey() {
         final String builtInMetricsVersion = StreamsConfig.METRICS_LATEST;
         final StreamsBuilder builder = new StreamsBuilder();

Reply via email to