This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b8cafbfe2d0 MINOR: Session windows should accept zero as session gap 
(#18734)
b8cafbfe2d0 is described below

commit b8cafbfe2d018350712aaaec2486cf4d8a3414f1
Author: Matthias J. Sax <[email protected]>
AuthorDate: Mon Feb 3 17:45:27 2025 -0800

    MINOR: Session windows should accept zero as session gap (#18734)
    
    Reviewers: Almog Gavra <[email protected]>, Anna Sophie Blee-Goldman 
<[email protected]>
---
 .../kafka/streams/kstream/SessionWindows.java      | 15 +++++-----
 .../kafka/streams/kstream/SessionWindowsTest.java  |  6 ++--
 .../kstream/internals/KGroupedStreamImplTest.java  | 35 ++++++++++++++++++++++
 3 files changed, 46 insertions(+), 10 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
index 0bf5dcfebd4..c5fabdb72cb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
@@ -31,7 +31,7 @@ import static 
org.apache.kafka.streams.kstream.Windows.NO_GRACE_PERIOD;
  * <p>
  * Sessions represent a period of activity separated by a defined gap of 
inactivity.
  * Any events processed that fall within the inactivity gap of any existing 
sessions are merged into the existing sessions.
- * If the event falls outside of the session gap then a new session will be 
created.
+ * If the event falls outside the session gap then a new session will be 
created.
  * <p>
  * For example, if we have a session gap of 5 and the following data arrives:
  * <pre>
@@ -79,12 +79,12 @@ public final class SessionWindows {
         this.gapMs = gapMs;
         this.graceMs = graceMs;
 
-        if (gapMs <= 0) {
-            throw new IllegalArgumentException("Gap time cannot be zero or 
negative.");
+        if (gapMs < 0) {
+            throw new IllegalArgumentException("Gap time cannot be negative.");
         }
 
         if (graceMs < 0) {
-            throw new IllegalArgumentException("Grace period must not be 
negative.");
+            throw new IllegalArgumentException("Grace period cannot be 
negative.");
         }
     }
 
@@ -100,7 +100,8 @@ public final class SessionWindows {
      *
      * @param inactivityGap the gap of inactivity between sessions
      * @return a window definition with the window size and no grace period. 
Note that this means out-of-order records arriving after the window end will be 
dropped
-     * @throws IllegalArgumentException if {@code inactivityGap} is zero or 
negative or can't be represented as {@code long milliseconds}
+     * @throws IllegalArgumentException
+     *         if {@code inactivityGap} is negative or can't be represented as 
{@code long milliseconds}
      */
     public static SessionWindows ofInactivityGapWithNoGrace(final Duration 
inactivityGap) {
         return ofInactivityGapAndGrace(inactivityGap, 
ofMillis(NO_GRACE_PERIOD));
@@ -121,8 +122,8 @@ public final class SessionWindows {
      * @param inactivityGap the gap of inactivity between sessions
      * @param afterWindowEnd The grace period to admit out-of-order events to 
a window.
      * @return A SessionWindows object with the specified inactivity gap and 
grace period
-     * @throws IllegalArgumentException if {@code inactivityGap} is zero or 
negative or can't be represented as {@code long milliseconds}
-     *                                  if {@code afterWindowEnd} is negative 
or can't be represented as {@code long milliseconds}
+     * @throws IllegalArgumentException
+     *         if {@code inactivityGap} or {@code afterWindowEnd} is negative 
or can't be represented as {@code long milliseconds}
      */
     public static SessionWindows ofInactivityGapAndGrace(final Duration 
inactivityGap, final Duration afterWindowEnd) {
         final String inactivityGapMsgPrefix = 
prepareMillisCheckFailMsgPrefix(inactivityGap, "inactivityGap");
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
index 755ae96f1dc..9b2fd6bb547 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
@@ -66,13 +66,13 @@ public class SessionWindowsTest {
     }
 
     @Test
-    public void windowSizeMustNotBeNegative() {
+    public void sessionGapCannotBeNegative() {
         assertThrows(IllegalArgumentException.class, () -> 
SessionWindows.ofInactivityGapWithNoGrace(ofMillis(-1)));
     }
 
     @Test
-    public void windowSizeMustNotBeZero() {
-        assertThrows(IllegalArgumentException.class, () -> 
SessionWindows.ofInactivityGapWithNoGrace(ofMillis(0)));
+    public void sessionGapCanBeZero() {
+        SessionWindows.ofInactivityGapWithNoGrace(ofMillis(0));
     }
 
     @Test
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index 1481733d714..e538ebe0703 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -48,6 +48,7 @@ import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
@@ -371,6 +372,40 @@ public class KGroupedStreamImplTest {
         doAggregateSessionWindows(supplier);
     }
 
+    @Test
+    public void 
sessionGapOfZeroShouldOnlyPutRecordsWithSameTsIntoSameSession() {
+        final MockApiProcessorSupplier<Windowed<String>, Integer, Void, Void> 
supplier = new MockApiProcessorSupplier<>();
+        final KTable<Windowed<String>, Integer> table = groupedStream
+            
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ZERO))
+            .aggregate(
+                () -> 0,
+                (aggKey, value, aggregate) -> aggregate + 1,
+                (aggKey, aggOne, aggTwo) -> aggOne + aggTwo,
+                Materialized.with(null, Serdes.Integer()));
+        table.toStream().process(supplier);
+
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic =
+                driver.createInputTopic(TOPIC, new StringSerializer(), new 
StringSerializer());
+            inputTopic.pipeInput("1", "1", 10);
+            inputTopic.pipeInput("1", "1", 11);
+            inputTopic.pipeInput("1", "1", 11);
+            inputTopic.pipeInput("1", "1", 12);
+        }
+
+        final Map<Windowed<String>, ValueAndTimestamp<Integer>> result
+            = supplier.theCapturedProcessor().lastValueAndTimestampPerKey();
+        assertEquals(
+            ValueAndTimestamp.make(1, 10),
+            result.get(new Windowed<>("1", new SessionWindow(10L, 10L))));
+        assertEquals(
+            ValueAndTimestamp.make(2, 11L),
+            result.get(new Windowed<>("1", new SessionWindow(11L, 11L))));
+        assertEquals(
+            ValueAndTimestamp.make(1, 12L),
+            result.get(new Windowed<>("1", new SessionWindow(12L, 12L))));
+    }
+
     private void doCountSessionWindows(final 
MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier) {
         try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
             final TestInputTopic<String, String> inputTopic =

Reply via email to