This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new 90bd03a  fix: make sliding window works without grace period 
(#kafka-13739) (#11928)
90bd03a is described below

commit 90bd03a0a27b1595a035f575daeb9041b4b054b6
Author: Bounkong Khamphousone <[email protected]>
AuthorDate: Thu Mar 31 16:05:53 2022 +0200

    fix: make sliding window works without grace period (#kafka-13739) (#11928)
    
    Fix the upper bound check for sliding windows, making it compatible with 
no grace period (kafka-13739)
    
    Added unit tests for early sliding windows and "normal" sliding windows, 
covering both events that all fall within one time difference (small input) 
and events that span more than the window time difference (large input).
    
    Fixing this window interval may slightly change stream behavior, but the 
probability of this happening is extremely low and it is unlikely to have a 
large impact on the results.
    
    Reviewers: Leah Thomas <[email protected]>, Bill Bejeck 
<[email protected]>
---
 .../internals/KStreamSlidingWindowAggregate.java   |   2 +-
 .../KStreamSlidingWindowAggregateTest.java         | 304 +++++++++++++++++++++
 2 files changed, 305 insertions(+), 1 deletion(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
index fd0198b..73c901c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
@@ -473,7 +473,7 @@ public class KStreamSlidingWindowAggregate<KIn, VIn, VAgg> 
implements KStreamAgg
                                             final long closeTime) {
             final long windowStart = window.start();
             final long windowEnd = window.end();
-            if (windowEnd > closeTime) {
+            if (windowEnd >= closeTime) {
                 //get aggregate from existing window
                 final VAgg oldAgg = getValueOrNull(valueAndTime);
                 final VAgg newAgg = aggregator.apply(record.key(), 
record.value(), oldAgg);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
index 8e8115f..b227c71 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
@@ -699,6 +699,310 @@ public class KStreamSlidingWindowAggregateTest {
     }
 
     @Test
+    public void testEarlyNoGracePeriodSmallInput() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic = "topic";
+
+        final KTable<Windowed<String>, String> table2 = builder
+            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+            
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(50)))
+            .aggregate(
+                MockInitializer.STRING_INIT,
+                MockAggregator.TOSTRING_ADDER,
+                Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
+            );
+        final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> 
supplier = new MockApiProcessorSupplier<>();
+        table2.toStream().process(supplier);
+
+        // all events are considered as early events since record timestamp is 
less than time difference of the window
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic =
+                driver.createInputTopic(topic, new StringSerializer(), new 
StringSerializer());
+
+            inputTopic.pipeInput("A", "1", 0L);
+            inputTopic.pipeInput("A", "2", 5L);
+            inputTopic.pipeInput("A", "3", 6L);
+            inputTopic.pipeInput("A", "4", 3L);
+            inputTopic.pipeInput("A", "5", 13L);
+            inputTopic.pipeInput("A", "6", 10L);
+        }
+
+        final Map<Long, ValueAndTimestamp<String>> actual = new HashMap<>();
+        for (final KeyValueTimestamp<Windowed<String>, String> entry : 
supplier.theCapturedProcessor().processed()) {
+            final Windowed<String> window = entry.key();
+            final Long start = window.window().start();
+            final ValueAndTimestamp<String> valueAndTimestamp = 
ValueAndTimestamp.make(entry.value(), entry.timestamp());
+            if (actual.putIfAbsent(start, valueAndTimestamp) != null) {
+                actual.replace(start, valueAndTimestamp);
+            }
+        }
+
+        final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>();
+        expected.put(0L, ValueAndTimestamp.make("0+1+2+3+4+5+6", 13L));
+        expected.put(1L, ValueAndTimestamp.make("0+2+3+4+5+6", 13L));
+        expected.put(4L, ValueAndTimestamp.make("0+2+3+5+6", 13L));
+        expected.put(6L, ValueAndTimestamp.make("0+3+5+6", 13L));
+        expected.put(7L, ValueAndTimestamp.make("0+5+6", 13L));
+        expected.put(11L, ValueAndTimestamp.make("0+5", 13L));
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testNoGracePeriodSmallInput() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic = "topic";
+
+        final KTable<Windowed<String>, String> table2 = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(50)))
+                .aggregate(
+                        MockInitializer.STRING_INIT,
+                        MockAggregator.TOSTRING_ADDER,
+                        Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
+                );
+        final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> 
supplier = new MockApiProcessorSupplier<>();
+        table2.toStream().process(supplier);
+
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic =
+                    driver.createInputTopic(topic, new StringSerializer(), new 
StringSerializer());
+
+            inputTopic.pipeInput("A", "1", 100L);
+            inputTopic.pipeInput("A", "2", 105L);
+            inputTopic.pipeInput("A", "3", 106L);
+            inputTopic.pipeInput("A", "4", 103L);
+            inputTopic.pipeInput("A", "5", 113L);
+            inputTopic.pipeInput("A", "6", 110L);
+        }
+
+        final Map<Long, ValueAndTimestamp<String>> actual = new HashMap<>();
+        for (final KeyValueTimestamp<Windowed<String>, String> entry : 
supplier.theCapturedProcessor().processed()) {
+            final Windowed<String> window = entry.key();
+            final Long start = window.window().start();
+            final ValueAndTimestamp<String> valueAndTimestamp = 
ValueAndTimestamp.make(entry.value(), entry.timestamp());
+            if (actual.putIfAbsent(start, valueAndTimestamp) != null) {
+                actual.replace(start, valueAndTimestamp);
+            }
+        }
+
+        final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>();
+        expected.put(50L, ValueAndTimestamp.make("0+1", 100L));
+        expected.put(55L, ValueAndTimestamp.make("0+1+2", 105L));
+        expected.put(56L, ValueAndTimestamp.make("0+1+2+3+4", 106L));
+        expected.put(63L, ValueAndTimestamp.make("0+1+2+3+4+5+6", 113L));
+        expected.put(101L, ValueAndTimestamp.make("0+2+3+4+5+6", 113L));
+        expected.put(104L, ValueAndTimestamp.make("0+2+3+5+6", 113L));
+        expected.put(106L, ValueAndTimestamp.make("0+3+5+6", 113L));
+        expected.put(107L, ValueAndTimestamp.make("0+5+6", 113L));
+        expected.put(111L, ValueAndTimestamp.make("0+5", 113L));
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testEarlyNoGracePeriodLargeInput() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic = "topic";
+        final WindowBytesStoreSupplier storeSupplier =
+                inOrderIterator
+                        ? new InOrderMemoryWindowStoreSupplier("InOrder", 
500L, 10L, false)
+                        : Stores.inMemoryWindowStore("Reverse", 
Duration.ofMillis(500), Duration.ofMillis(10), false);
+
+        final KTable<Windowed<String>, String> table2 = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(10)))
+                .aggregate(
+                        MockInitializer.STRING_INIT,
+                        MockAggregator.TOSTRING_ADDER,
+                        Materialized.as(storeSupplier)
+                );
+
+        final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> 
supplier = new MockApiProcessorSupplier<>();
+        table2.toStream().process(supplier);
+
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic1 =
+                    driver.createInputTopic(topic, new StringSerializer(), new 
StringSerializer());
+
+            inputTopic1.pipeInput("E", "1", 0L);
+            inputTopic1.pipeInput("E", "3", 5L);
+            inputTopic1.pipeInput("E", "4", 6L);
+            inputTopic1.pipeInput("E", "2", 3L);
+            inputTopic1.pipeInput("E", "6", 13L);
+            inputTopic1.pipeInput("E", "5", 10L);
+            inputTopic1.pipeInput("E", "7", 4L);
+            inputTopic1.pipeInput("E", "8", 2L);
+            inputTopic1.pipeInput("E", "9", 15L);
+        }
+        final Comparator<KeyValueTimestamp<Windowed<String>, String>> 
comparator =
+                Comparator.comparing((KeyValueTimestamp<Windowed<String>, 
String> o) -> o.key().key())
+                        .thenComparing((KeyValueTimestamp<Windowed<String>, 
String> o) -> o.key().window().start());
+
+        final ArrayList<KeyValueTimestamp<Windowed<String>, String>> actual = 
supplier.theCapturedProcessor().processed();
+        actual.sort(comparator);
+        assertEquals(
+            asList(
+                // E@0
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 
10)), "0+1", 0),
+                // E@5
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 
10)), "0+1+3", 5),
+                // E@6
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 
10)), "0+1+3+4", 6),
+                // E@3
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 
10)), "0+1+3+4+2", 6),
+                // E@5
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1, 
11)), "0+3", 5),
+                // E@6
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1, 
11)), "0+3+4", 6),
+                // E@3
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1, 
11)), "0+3+4+2", 6),
+                //E@13
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3, 
13)), "0+3+4+2+6", 13),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3, 
13)), "0+3+4+2+6+5", 13),
+                //E@4
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3, 
13)), "0+3+4+2+6+5+7", 13),
+                // E@3
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4, 
14)), "0+3+4", 6),
+                //E@13
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4, 
14)), "0+3+4+6", 13),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4, 
14)), "0+3+4+6+5", 13),
+                //E@4
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4, 
14)), "0+3+4+6+5+7", 13),
+                //E@4
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(5, 
15)), "0+3+4+6+5", 13),
+                //E@15
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(5, 
15)), "0+3+4+6+5+9", 15),
+                // E@6
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6, 
16)), "0+4", 6),
+                //E@13
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6, 
16)), "0+4+6", 13),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6, 
16)), "0+4+6+5", 13),
+                //E@15
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6, 
16)), "0+4+6+5+9", 15),
+                //E@13
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7, 
17)), "0+6", 13),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7, 
17)), "0+6+5", 13),
+                //E@15
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7, 
17)), "0+6+5+9", 15),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(11, 
21)), "0+6", 13),
+                //E@15
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(11, 
21)), "0+6+9", 15),
+                //E@15
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(14, 
24)), "0+9", 15)),
+            actual
+        );
+    }
+
+    @Test
+    public void testNoGracePeriodLargeInput() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic = "topic";
+        final WindowBytesStoreSupplier storeSupplier =
+                inOrderIterator
+                        ? new InOrderMemoryWindowStoreSupplier("InOrder", 
500L, 10L, false)
+                        : Stores.inMemoryWindowStore("Reverse", 
Duration.ofMillis(500), Duration.ofMillis(10), false);
+
+        final KTable<Windowed<String>, String> table2 = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(10)))
+                .aggregate(
+                        MockInitializer.STRING_INIT,
+                        MockAggregator.TOSTRING_ADDER,
+                        Materialized.as(storeSupplier)
+                );
+
+        final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> 
supplier = new MockApiProcessorSupplier<>();
+        table2.toStream().process(supplier);
+
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic1 =
+                    driver.createInputTopic(topic, new StringSerializer(), new 
StringSerializer());
+
+            inputTopic1.pipeInput("E", "1", 100L);
+            inputTopic1.pipeInput("E", "3", 105L);
+            inputTopic1.pipeInput("E", "4", 106L);
+            inputTopic1.pipeInput("E", "2", 103L);
+            inputTopic1.pipeInput("E", "6", 113L);
+            inputTopic1.pipeInput("E", "5", 110L);
+            inputTopic1.pipeInput("E", "7", 104L);
+            inputTopic1.pipeInput("E", "8", 102L);
+            inputTopic1.pipeInput("E", "9", 115L);
+        }
+        final Comparator<KeyValueTimestamp<Windowed<String>, String>> 
comparator =
+                Comparator.comparing((KeyValueTimestamp<Windowed<String>, 
String> o) -> o.key().key())
+                        .thenComparing((KeyValueTimestamp<Windowed<String>, 
String> o) -> o.key().window().start());
+
+        final ArrayList<KeyValueTimestamp<Windowed<String>, String>> actual = 
supplier.theCapturedProcessor().processed();
+        actual.sort(comparator);
+        assertEquals(
+            asList(
+                // E@0
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(90, 
100)), "0+1", 100),
+                // E@5
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(95, 
105)), "0+1+3", 105),
+                // E@6
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(96, 
106)), "0+1+3+4", 106),
+                // E@3
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(96, 
106)), "0+1+3+4+2", 106),
+                // E@5
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(101, 111)), "0+3", 105),
+                // E@6
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(101, 111)), "0+3+4", 106),
+                // E@3
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(101, 111)), "0+3+4+2", 106),
+                //E@13
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(103, 113)), "0+3+4+2+6", 113),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(103, 113)), "0+3+4+2+6+5", 113),
+                //E@4
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(103, 113)), "0+3+4+2+6+5+7", 113),
+                // E@3
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(104, 114)), "0+3+4", 106),
+                //E@13
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(104, 114)), "0+3+4+6", 113),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(104, 114)), "0+3+4+6+5", 113),
+                //E@4
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(104, 114)), "0+3+4+6+5+7", 113),
+                //E@4
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(105, 115)), "0+3+4+6+5", 113),
+                //E@15
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(105, 115)), "0+3+4+6+5+9", 115),
+                // E@6
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(106, 116)), "0+4", 106),
+                //E@13
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(106, 116)), "0+4+6", 113),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(106, 116)), "0+4+6+5", 113),
+                //E@15
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(106, 116)), "0+4+6+5+9", 115),
+                //E@13
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(107, 117)), "0+6", 113),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(107, 117)), "0+6+5", 113),
+                //E@15
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(107, 117)), "0+6+5+9", 115),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(111, 121)), "0+6", 113),
+                //E@15
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(111, 121)), "0+6+9", 115),
+                //E@15
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(114, 124)), "0+9", 115)),
+            actual
+        );
+    }
+
+    @Test
     public void shouldLogAndMeterWhenSkippingNullKey() {
         final String builtInMetricsVersion = StreamsConfig.METRICS_LATEST;
         final StreamsBuilder builder = new StreamsBuilder();

Reply via email to