This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b8cafbfe2d0 MINOR: Session windows should accept zero as session gap (#18734)
b8cafbfe2d0 is described below
commit b8cafbfe2d018350712aaaec2486cf4d8a3414f1
Author: Matthias J. Sax <[email protected]>
AuthorDate: Mon Feb 3 17:45:27 2025 -0800
MINOR: Session windows should accept zero as session gap (#18734)
Reviewers: Almog Gavra <[email protected]>, Anna Sophie Blee-Goldman
<[email protected]>
---
.../kafka/streams/kstream/SessionWindows.java | 15 +++++-----
.../kafka/streams/kstream/SessionWindowsTest.java | 6 ++--
.../kstream/internals/KGroupedStreamImplTest.java | 35 ++++++++++++++++++++++
3 files changed, 46 insertions(+), 10 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
index 0bf5dcfebd4..c5fabdb72cb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
@@ -31,7 +31,7 @@ import static
org.apache.kafka.streams.kstream.Windows.NO_GRACE_PERIOD;
* <p>
* Sessions represent a period of activity separated by a defined gap of
inactivity.
* Any events processed that fall within the inactivity gap of any existing
sessions are merged into the existing sessions.
- * If the event falls outside of the session gap then a new session will be
created.
+ * If the event falls outside the session gap then a new session will be
created.
* <p>
* For example, if we have a session gap of 5 and the following data arrives:
* <pre>
@@ -79,12 +79,12 @@ public final class SessionWindows {
this.gapMs = gapMs;
this.graceMs = graceMs;
- if (gapMs <= 0) {
- throw new IllegalArgumentException("Gap time cannot be zero or
negative.");
+ if (gapMs < 0) {
+ throw new IllegalArgumentException("Gap time cannot be negative.");
}
if (graceMs < 0) {
- throw new IllegalArgumentException("Grace period must not be
negative.");
+ throw new IllegalArgumentException("Grace period cannot be
negative.");
}
}
@@ -100,7 +100,8 @@ public final class SessionWindows {
*
* @param inactivityGap the gap of inactivity between sessions
* @return a window definition with the window size and no grace period.
Note that this means out-of-order records arriving after the window end will be
dropped
- * @throws IllegalArgumentException if {@code inactivityGap} is zero or
negative or can't be represented as {@code long milliseconds}
+ * @throws IllegalArgumentException
+ * if {@code inactivityGap} is negative or can't be represented as
{@code long milliseconds}
*/
public static SessionWindows ofInactivityGapWithNoGrace(final Duration
inactivityGap) {
return ofInactivityGapAndGrace(inactivityGap,
ofMillis(NO_GRACE_PERIOD));
@@ -121,8 +122,8 @@ public final class SessionWindows {
* @param inactivityGap the gap of inactivity between sessions
* @param afterWindowEnd The grace period to admit out-of-order events to
a window.
* @return A SessionWindows object with the specified inactivity gap and
grace period
- * @throws IllegalArgumentException if {@code inactivityGap} is zero or
negative or can't be represented as {@code long milliseconds}
- * if {@code afterWindowEnd} is negative
or can't be represented as {@code long milliseconds}
+ * @throws IllegalArgumentException
+ * if {@code inactivityGap} or {@code afterWindowEnd} is negative
or can't be represented as {@code long milliseconds}
*/
public static SessionWindows ofInactivityGapAndGrace(final Duration
inactivityGap, final Duration afterWindowEnd) {
final String inactivityGapMsgPrefix =
prepareMillisCheckFailMsgPrefix(inactivityGap, "inactivityGap");
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
index 755ae96f1dc..9b2fd6bb547 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
@@ -66,13 +66,13 @@ public class SessionWindowsTest {
}
@Test
- public void windowSizeMustNotBeNegative() {
+ public void sessionGapCannotBeNegative() {
assertThrows(IllegalArgumentException.class, () ->
SessionWindows.ofInactivityGapWithNoGrace(ofMillis(-1)));
}
@Test
- public void windowSizeMustNotBeZero() {
- assertThrows(IllegalArgumentException.class, () ->
SessionWindows.ofInactivityGapWithNoGrace(ofMillis(0)));
+ public void sessionGapCanBeZero() {
+ SessionWindows.ofInactivityGapWithNoGrace(ofMillis(0));
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index 1481733d714..e538ebe0703 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -48,6 +48,7 @@ import org.apache.kafka.test.StreamsTestUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
@@ -371,6 +372,40 @@ public class KGroupedStreamImplTest {
doAggregateSessionWindows(supplier);
}
+ @Test
+ public void
sessionGapOfZeroShouldOnlyPutRecordsWithSameTsIntoSameSession() {
+ final MockApiProcessorSupplier<Windowed<String>, Integer, Void, Void>
supplier = new MockApiProcessorSupplier<>();
+ final KTable<Windowed<String>, Integer> table = groupedStream
+
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ZERO))
+ .aggregate(
+ () -> 0,
+ (aggKey, value, aggregate) -> aggregate + 1,
+ (aggKey, aggOne, aggTwo) -> aggOne + aggTwo,
+ Materialized.with(null, Serdes.Integer()));
+ table.toStream().process(supplier);
+
+ try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props)) {
+ final TestInputTopic<String, String> inputTopic =
+ driver.createInputTopic(TOPIC, new StringSerializer(), new
StringSerializer());
+ inputTopic.pipeInput("1", "1", 10);
+ inputTopic.pipeInput("1", "1", 11);
+ inputTopic.pipeInput("1", "1", 11);
+ inputTopic.pipeInput("1", "1", 12);
+ }
+
+ final Map<Windowed<String>, ValueAndTimestamp<Integer>> result
+ = supplier.theCapturedProcessor().lastValueAndTimestampPerKey();
+ assertEquals(
+ ValueAndTimestamp.make(1, 10),
+ result.get(new Windowed<>("1", new SessionWindow(10L, 10L))));
+ assertEquals(
+ ValueAndTimestamp.make(2, 11L),
+ result.get(new Windowed<>("1", new SessionWindow(11L, 11L))));
+ assertEquals(
+ ValueAndTimestamp.make(1, 12L),
+ result.get(new Windowed<>("1", new SessionWindow(12L, 12L))));
+ }
+
private void doCountSessionWindows(final
MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier) {
try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props)) {
final TestInputTopic<String, String> inputTopic =