This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b3905d9 KAFKA-8613: New APIs for Controlling Grace Period for
Windowed Operations (#10926)
b3905d9 is described below
commit b3905d9f71d48a60f2a9ee38014582d7ec7bc3c2
Author: Israel Ekpo <[email protected]>
AuthorDate: Wed Jun 30 20:09:19 2021 -0400
KAFKA-8613: New APIs for Controlling Grace Period for Windowed Operations
(#10926)
Implements KIP-633.
Grace period is an important parameter and it's best to make it the user's
responsibility to set it explicitly. Thus, we move away from providing a default and
make it a mandatory parameter when creating a window.
Reviewers: A. Sophie Blee-Goldman <[email protected]>, Luke Chen
<[email protected]>, Matthias J. Sax <[email protected]>
---
.../examples/pageview/PageViewTypedDemo.java | 4 +-
.../examples/pageview/PageViewUntypedDemo.java | 4 +-
.../examples/temperature/TemperatureDemo.java | 4 +-
.../apache/kafka/streams/kstream/JoinWindows.java | 45 ++++++++++++---
.../kafka/streams/kstream/SessionWindows.java | 67 ++++++++++++++++++----
.../kafka/streams/kstream/SlidingWindows.java | 53 +++++++++++++++--
.../apache/kafka/streams/kstream/TimeWindows.java | 66 +++++++++++++++++++--
.../org/apache/kafka/streams/kstream/Windows.java | 14 ++++-
.../org/apache/kafka/streams/TopologyTest.java | 1 +
.../integration/AbstractResetIntegrationTest.java | 1 +
.../integration/InternalTopicIntegrationTest.java | 1 +
.../integration/JoinStoreIntegrationTest.java | 1 +
.../KStreamAggregationDedupIntegrationTest.java | 1 +
.../KStreamAggregationIntegrationTest.java | 16 +++++-
.../KStreamRepartitionIntegrationTest.java | 1 +
.../integration/MetricsIntegrationTest.java | 1 +
.../integration/QueryableStateIntegrationTest.java | 1 +
.../integration/RocksDBMetricsIntegrationTest.java | 1 +
.../kafka/streams/kstream/JoinWindowsTest.java | 25 ++++++++
.../kstream/RepartitionTopicNamingTest.java | 1 +
.../kafka/streams/kstream/SessionWindowsTest.java | 23 ++++++++
.../kafka/streams/kstream/SlidingWindowsTest.java | 14 +++++
.../kafka/streams/kstream/TimeWindowsTest.java | 21 +++++++
.../kstream/internals/KGroupedStreamImplTest.java | 1 -
.../streams/kstream/internals/KStreamImplTest.java | 24 ++++++++
.../kstream/internals/KStreamKStreamJoinTest.java | 1 +
.../internals/KStreamKStreamLeftJoinTest.java | 2 +
.../internals/KStreamKStreamOuterJoinTest.java | 1 +
.../kstream/internals/KStreamRepartitionTest.java | 1 +
...KStreamSessionWindowAggregateProcessorTest.java | 1 +
.../KStreamSlidingWindowAggregateTest.java | 6 +-
.../internals/KStreamWindowAggregateTest.java | 1 +
.../SessionWindowedCogroupedKStreamImplTest.java | 2 +-
.../SlidingWindowedCogroupedKStreamImplTest.java | 1 +
.../kstream/internals/SuppressScenarioTest.java | 1 +
.../kstream/internals/SuppressTopologyTest.java | 1 +
.../streams/kstream/internals/TimeWindowTest.java | 1 +
.../TimeWindowedCogroupedKStreamImplTest.java | 1 +
.../kstream/internals/graph/StreamsGraphTest.java | 1 +
.../internals/RepartitionOptimizingTest.java | 1 +
.../internals/StreamsPartitionAssignorTest.java | 1 +
.../kafka/streams/tests/SmokeTestClient.java | 1 +
.../kafka/streams/tests/StreamsOptimizedTest.java | 1 +
.../kafka/test/GenericInMemoryKeyValueStore.java | 1 +
.../GenericInMemoryTimestampedKeyValueStore.java | 1 +
.../kafka/streams/scala/kstream/KStream.scala | 1 +
.../apache/kafka/streams/scala/TopologyTest.scala | 27 +++++----
.../kafka/streams/scala/kstream/KStreamTest.scala | 28 ++-------
.../kafka/streams/scala/kstream/KTableTest.scala | 9 +--
49 files changed, 404 insertions(+), 79 deletions(-)
diff --git
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
index 1b467eb..a5086de 100644
---
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
+++
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
@@ -191,6 +191,8 @@ public class PageViewTypedDemo {
final KTable<String, UserProfile> users =
builder.table("streams-userprofile-input", Consumed.with(Serdes.String(), new
JSONSerde<>()));
+ final Duration duration24Hours = Duration.ofHours(24);
+
final KStream<WindowedPageViewByRegion, RegionCount> regionCount =
views
.leftJoin(users, (view, profile) -> {
final PageViewByRegion viewByRegion = new PageViewByRegion();
@@ -206,7 +208,7 @@ public class PageViewTypedDemo {
})
.map((user, viewRegion) -> new KeyValue<>(viewRegion.region,
viewRegion))
.groupByKey(Grouped.with(Serdes.String(), new JSONSerde<>()))
-
.windowedBy(TimeWindows.of(Duration.ofDays(7)).advanceBy(Duration.ofSeconds(1)))
+ .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofDays(7),
duration24Hours).advanceBy(Duration.ofSeconds(1)))
.count()
.toStream()
.map((key, value) -> {
diff --git
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
index 2a9972b..cdb3639 100644
---
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
+++
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
@@ -78,6 +78,8 @@ public class PageViewUntypedDemo {
final KTable<String, String> userRegions = users.mapValues(record ->
record.get("region").textValue());
+ final Duration duration24Hours = Duration.ofHours(24);
+
final KStream<JsonNode, JsonNode> regionCount = views
.leftJoin(userRegions, (view, region) -> {
final ObjectNode jNode = JsonNodeFactory.instance.objectNode();
@@ -88,7 +90,7 @@ public class PageViewUntypedDemo {
})
.map((user, viewRegion) -> new
KeyValue<>(viewRegion.get("region").textValue(), viewRegion))
.groupByKey(Grouped.with(Serdes.String(), jsonSerde))
-
.windowedBy(TimeWindows.of(Duration.ofDays(7)).advanceBy(Duration.ofSeconds(1)))
+ .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofDays(7),
duration24Hours).advanceBy(Duration.ofSeconds(1)))
.count()
.toStream()
.map((key, value) -> {
diff --git
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
index 4d63d30..6384466 100644
---
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
+++
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
@@ -79,6 +79,8 @@ public class TemperatureDemo {
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ final Duration duration24Hours = Duration.ofHours(24);
+
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> source =
builder.stream("iot-temperature");
@@ -88,7 +90,7 @@ public class TemperatureDemo {
// to group and reduce them, a key is needed ("temp" has been
chosen)
.selectKey((key, value) -> "temp")
.groupByKey()
-
.windowedBy(TimeWindows.of(Duration.ofSeconds(TEMPERATURE_WINDOW_SIZE)))
+
.windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofSeconds(TEMPERATURE_WINDOW_SIZE),
duration24Hours))
.reduce((value1, value2) -> {
if (Integer.parseInt(value1) > Integer.parseInt(value2)) {
return value1;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index 2641286..84e3f7f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -79,10 +79,7 @@ public class JoinWindows extends Windows<Window> {
protected final boolean enableSpuriousResultFix;
protected JoinWindows(final JoinWindows joinWindows) {
- beforeMs = joinWindows.beforeMs;
- afterMs = joinWindows.afterMs;
- graceMs = joinWindows.graceMs;
- enableSpuriousResultFix = joinWindows.enableSpuriousResultFix;
+ this(joinWindows.beforeMs, joinWindows.afterMs, joinWindows.graceMs,
joinWindows.enableSpuriousResultFix);
}
private JoinWindows(final long beforeMs,
@@ -92,32 +89,62 @@ public class JoinWindows extends Windows<Window> {
if (beforeMs + afterMs < 0) {
throw new IllegalArgumentException("Window interval (ie,
beforeMs+afterMs) must not be negative.");
}
+
+ if (graceMs < 0) {
+ throw new IllegalArgumentException("Grace period must not be
negative.");
+ }
+
this.afterMs = afterMs;
this.beforeMs = beforeMs;
this.graceMs = graceMs;
this.enableSpuriousResultFix = enableSpuriousResultFix;
}
+ /**
+ * Specifies that records of the same key are joinable if their timestamps
are within {@code timeDifference},
+ * i.e., the timestamp of a record from the secondary stream is max {@code
timeDifference} earlier or later than
+ * the timestamp of the record from the primary stream. Using the method
explicitly sets the grace period to
+ * the duration specified by {@code afterWindowEnd}, which means that only
out-of-order records arriving
+ * more than the grace period after the window end will be dropped. The delay is defined as
(stream_time - record_timestamp).
+ *
+ * @param timeDifference join window interval
+ * @param afterWindowEnd The grace period to admit out-of-order events to
a window.
+ * @throws IllegalArgumentException if the {@code afterWindowEnd} is
negative or can't be represented as {@code long milliseconds}
+ * @return A new JoinWindows object with the specified window definition
and grace period
+ */
public static JoinWindows ofTimeDifferenceAndGrace(final Duration
timeDifference, final Duration afterWindowEnd) {
return new JoinWindows(timeDifference.toMillis(),
timeDifference.toMillis(), afterWindowEnd.toMillis(), true);
}
+ /**
+ * Specifies that records of the same key are joinable if their timestamps
are within {@code timeDifference},
+ * i.e., the timestamp of a record from the secondary stream is max {@code
timeDifference} earlier or later than
+ * the timestamp of the record from the primary stream. Using the method
implicitly sets the grace period to zero
+ * which means that out of order records arriving after the window end
will be dropped.
+ *
+ * @param timeDifference join window interval
+ * @throws IllegalArgumentException if {@code timeDifference} is negative
or can't be represented as {@code long milliseconds}
+ * @return a new JoinWindows object with the window definition and no
grace period. Note that this means out of order records arriving after the
window end will be dropped
+ */
public static JoinWindows ofTimeDifferenceWithNoGrace(final Duration
timeDifference) {
- return new JoinWindows(timeDifference.toMillis(),
timeDifference.toMillis(), 0L, true);
+ return new JoinWindows(timeDifference.toMillis(),
timeDifference.toMillis(), NO_GRACE_PERIOD, true);
}
- /**
+ /**
* Specifies that records of the same key are joinable if their timestamps
are within {@code timeDifference},
* i.e., the timestamp of a record from the secondary stream is max {@code
timeDifference} earlier or later than
* the timestamp of the record from the primary stream.
*
- * @param timeDifference join window interval
+ * @param timeDifference
+ * @return a new JoinWindows object with the window definition and
grace period (uses old default of 24 hours)
* @throws IllegalArgumentException if {@code timeDifference} is negative
or can't be represented as {@code long milliseconds}
+ * @deprecated since 3.0 Use {@link #ofTimeDifferenceAndGrace(Duration,
Duration)} instead
*/
+ @Deprecated
public static JoinWindows of(final Duration timeDifference) throws
IllegalArgumentException {
final String msgPrefix =
prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
final long timeDifferenceMs =
validateMillisecondDuration(timeDifference, msgPrefix);
- return new JoinWindows(timeDifferenceMs, timeDifferenceMs,
DEFAULT_GRACE_PERIOD_MS, false);
+ return new JoinWindows(timeDifferenceMs, timeDifferenceMs,
DEPRECATED_OLD_24_HR_GRACE_PERIOD, false);
}
/**
@@ -177,7 +204,9 @@ public class JoinWindows extends Windows<Window> {
* @param afterWindowEnd The grace period to admit out-of-order events to
a window.
* @return this updated builder
* @throws IllegalArgumentException if the {@code afterWindowEnd} is
negative of can't be represented as {@code long milliseconds}
+ * @deprecated since 3.0 Use {@link
#ofTimeDifferenceAndGrace(Duration, Duration)} instead
*/
+ @Deprecated
public JoinWindows grace(final Duration afterWindowEnd) throws
IllegalArgumentException {
final String msgPrefix =
prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
final long afterWindowEndMs =
validateMillisecondDuration(afterWindowEnd, msgPrefix);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
index f41dd67..65bcfd0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
@@ -23,8 +23,9 @@ import java.util.Objects;
import static
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
import static
org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
-import static org.apache.kafka.streams.kstream.Windows.DEFAULT_GRACE_PERIOD_MS;
-
+import static
org.apache.kafka.streams.kstream.Windows.DEPRECATED_OLD_24_HR_GRACE_PERIOD;
+import static org.apache.kafka.streams.kstream.Windows.NO_GRACE_PERIOD;
+import static java.time.Duration.ofMillis;
/**
* A session based window specification used for aggregating events into
sessions.
@@ -78,23 +79,68 @@ public final class SessionWindows {
private SessionWindows(final long gapMs, final long graceMs) {
this.gapMs = gapMs;
this.graceMs = graceMs;
+
+ if (gapMs <= 0) {
+ throw new IllegalArgumentException("Gap time cannot be zero or
negative.");
+ }
+
+ if (graceMs < 0) {
+ throw new IllegalArgumentException("Grace period must not be
negative.");
+ }
}
/**
- * Create a new window specification with the specified inactivity gap.
+ * Creates a new window specification with the specified inactivity gap.
+ * Using the method implicitly sets the grace period to zero which
+ * means that out of order records arriving after the window end will be
dropped
+ *
+ * <p>
+ * Note that new events may change the boundaries of session windows, so
aggressive
+ * close times can lead to surprising results in which an out-of-order
event is rejected and then
+ * a subsequent event moves the window boundary forward.
*
* @param inactivityGap the gap of inactivity between sessions
- * @return a new window specification with default maintain duration of 1
day
+ * @return a window definition with the window size and no grace period.
Note that this means out of order records arriving after the window end will be
dropped
+ * @throws IllegalArgumentException if {@code inactivityGap} is zero or
negative or can't be represented as {@code long milliseconds}
+ */
+ public static SessionWindows ofInactivityGapWithNoGrace(final Duration
inactivityGap) {
+ return ofInactivityGapAndGrace(inactivityGap,
ofMillis(NO_GRACE_PERIOD));
+ }
+
+ /**
+ * Creates a new window specification with the specified inactivity gap.
+ * Using the method explicitly sets the grace period to the duration
specified by {@code afterWindowEnd}, which
+ * means that only out-of-order records arriving more than the grace period after the window end will be
dropped.
*
+ * <p>
+ * Note that new events may change the boundaries of session windows, so
aggressive
+ * close times can lead to surprising results in which an out-of-order
event is rejected and then
+ * a subsequent event moves the window boundary forward.
+ *
+ * @param inactivityGap the gap of inactivity between sessions
+ * @param afterWindowEnd The grace period to admit out-of-order events to
a window.
+ * @return A SessionWindows object with the specified inactivity gap and
grace period
+ * @throws IllegalArgumentException if the {@code afterWindowEnd} is
negative or can't be represented as {@code long milliseconds}
+ */
+ public static SessionWindows ofInactivityGapAndGrace(final Duration
inactivityGap, final Duration afterWindowEnd) {
+ return new SessionWindows(inactivityGap.toMillis(),
afterWindowEnd.toMillis());
+ }
+
+
+ /**
+ * Create a new window specification with the specified inactivity gap.
+ *
+ * @param inactivityGap the gap of inactivity between sessions
+ * @return a new window specification without specifying a grace period
(uses old default of 24 hours)
* @throws IllegalArgumentException if {@code inactivityGap} is zero or
negative or can't be represented as {@code long milliseconds}
+ * @deprecated since 3.0 Use {@link #ofInactivityGapWithNoGrace(Duration)}
instead
*/
+ @Deprecated
public static SessionWindows with(final Duration inactivityGap) {
final String msgPrefix =
prepareMillisCheckFailMsgPrefix(inactivityGap, "inactivityGap");
final long inactivityGapMs =
validateMillisecondDuration(inactivityGap, msgPrefix);
- if (inactivityGapMs <= 0) {
- throw new IllegalArgumentException("Gap time (inactivityGapMs)
cannot be zero or negative.");
- }
- return new SessionWindows(inactivityGapMs, DEFAULT_GRACE_PERIOD_MS);
+
+ return new SessionWindows(inactivityGapMs,
DEPRECATED_OLD_24_HR_GRACE_PERIOD);
}
/**
@@ -108,13 +154,12 @@ public final class SessionWindows {
* @param afterWindowEnd The grace period to admit out-of-order events to
a window.
* @return this updated builder
* @throws IllegalArgumentException if the {@code afterWindowEnd} is
negative of can't be represented as {@code long milliseconds}
+ * @deprecated since 3.0 Use {@link #ofInactivityGapAndGrace(Duration,
Duration)} instead
*/
+ @Deprecated
public SessionWindows grace(final Duration afterWindowEnd) throws
IllegalArgumentException {
final String msgPrefix =
prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
final long afterWindowEndMs =
validateMillisecondDuration(afterWindowEnd, msgPrefix);
- if (afterWindowEndMs < 0) {
- throw new IllegalArgumentException("Grace period must not be
negative.");
- }
return new SessionWindows(gapMs, afterWindowEndMs);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
index 189770f..2cbda6d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
@@ -17,10 +17,14 @@
package org.apache.kafka.streams.kstream;
import org.apache.kafka.streams.processor.TimestampExtractor;
+
import java.time.Duration;
import java.util.Objects;
+
+import static java.time.Duration.ofMillis;
import static
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
import static
org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
+import static org.apache.kafka.streams.kstream.Windows.NO_GRACE_PERIOD;
/**
* A sliding window used for aggregating events.
@@ -78,6 +82,45 @@ public final class SlidingWindows {
private SlidingWindows(final long timeDifferenceMs, final long graceMs) {
this.timeDifferenceMs = timeDifferenceMs;
this.graceMs = graceMs;
+
+ if (timeDifferenceMs < 0) {
+ throw new IllegalArgumentException("Window time difference must
not be negative.");
+ }
+
+ if (graceMs < 0) {
+ throw new IllegalArgumentException("Window grace period must not
be negative.");
+ }
+ }
+
+ /**
+ * Return a window definition with the window size
+ * Using the method implicitly sets the grace period to zero which means
that
+ * out of order records arriving after the window end will be dropped
+ *
+ * @param timeDifference the max time difference (inclusive) between two
records in a window
+ * @return a new window definition with no grace period. Note that this
means out of order records arriving after the window end will be dropped
+ * @throws IllegalArgumentException if the timeDifference is negative
+ */
+ public static SlidingWindows ofTimeDifferenceWithNoGrace(final Duration
timeDifference) throws IllegalArgumentException {
+ return ofTimeDifferenceAndGrace(timeDifference,
ofMillis(NO_GRACE_PERIOD));
+ }
+
+ /**
+ * Return a window definition with the window size based on the given
maximum time difference (inclusive) between
+ * records in the same window and given window grace period. Reject
out-of-order events that arrive after {@code afterWindowEnd}.
+ * A window is closed when {@code stream-time > window-end + grace-period}.
+ *
+ * @param timeDifference the max time difference (inclusive) between two
records in a window
+ * @param afterWindowEnd the grace period to admit out-of-order events to
a window
+ * @return a new window definition with the specified grace period
+ * @throws IllegalArgumentException if the timeDifference or
afterWindowEnd (grace period) is negative
+ */
+ public static SlidingWindows ofTimeDifferenceAndGrace(final Duration
timeDifference, final Duration afterWindowEnd) throws IllegalArgumentException {
+
+ final long timeDifferenceMs = timeDifference.toMillis();
+ final long afterWindowEndMs = afterWindowEnd.toMillis();
+
+ return new SlidingWindows(timeDifferenceMs, afterWindowEndMs);
}
/**
@@ -89,18 +132,16 @@ public final class SlidingWindows {
* @param grace the grace period to admit out-of-order events to a window
* @return a new window definition
* @throws IllegalArgumentException if the specified window size is < 0
or grace < 0, or either can't be represented as {@code long milliseconds}
+ * @deprecated since 3.0 Use {@link
#ofTimeDifferenceWithNoGrace(Duration)} or {@link
#ofTimeDifferenceAndGrace(Duration, Duration)} instead
*/
+ @Deprecated
public static SlidingWindows withTimeDifferenceAndGrace(final Duration
timeDifference, final Duration grace) throws IllegalArgumentException {
final String msgPrefixSize =
prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
final long timeDifferenceMs =
validateMillisecondDuration(timeDifference, msgPrefixSize);
- if (timeDifferenceMs < 0) {
- throw new IllegalArgumentException("Window time difference must
not be negative.");
- }
+
final String msgPrefixGrace = prepareMillisCheckFailMsgPrefix(grace,
"grace");
final long graceMs = validateMillisecondDuration(grace,
msgPrefixGrace);
- if (graceMs < 0) {
- throw new IllegalArgumentException("Window grace period must not
be negative.");
- }
+
return new SlidingWindows(timeDifferenceMs, graceMs);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
index 2149556..7970085 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
@@ -24,6 +24,7 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
+import static java.time.Duration.ofMillis;
import static
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
import static
org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
@@ -72,6 +73,14 @@ public final class TimeWindows extends Windows<TimeWindow> {
this.sizeMs = sizeMs;
this.advanceMs = advanceMs;
this.graceMs = graceMs;
+
+ if (sizeMs <= 0) {
+ throw new IllegalArgumentException("Window size (sizeMs) must be
larger than zero.");
+ }
+
+ if (graceMs < 0) {
+ throw new IllegalArgumentException("Grace period must not be
negative.");
+ }
}
/**
@@ -81,18 +90,63 @@ public final class TimeWindows extends Windows<TimeWindow> {
* <p>
* This provides the semantics of tumbling windows, which are fixed-sized,
gap-less, non-overlapping windows.
* Tumbling windows are a special case of hopping windows with {@code
advance == size}.
+ * Using the method implicitly sets the grace period to zero which means
+ * that out of order records arriving after the window end will be dropped
*
* @param size The size of the window
- * @return a new window definition with default maintain duration of 1 day
+ * @return a new window definition with no grace period. Note that
this means out of order records arriving after the window end will be dropped
+ * @throws IllegalArgumentException if the specified window size is zero
or negative or can't be represented as {@code long milliseconds}
+ */
+ public static TimeWindows ofSizeWithNoGrace(final Duration size) throws
IllegalArgumentException {
+ return ofSizeAndGrace(size, ofMillis(NO_GRACE_PERIOD));
+ }
+
+ /**
+ * Return a window definition with the given window size, and with the
advance interval being equal to the window
+ * size.
+ * The time interval represented by the N-th window is: {@code [N * size,
N * size + size)}.
+ * <p>
+ * This provides the semantics of tumbling windows, which are fixed-sized,
gap-less, non-overlapping windows.
+ * Tumbling windows are a special case of hopping windows with {@code
advance == size}.
+ * Using the method explicitly sets the grace period to the duration
specified by {@code afterWindowEnd}, which means
+ * that only out-of-order records arriving more than the grace period after the window end will be dropped.
+ *
+ * <p>
+ * Delay is defined as (stream_time - record_timestamp).
+ *
+ * @param size The size of the window. Must be larger than zero
+ * @param afterWindowEnd The grace period to admit out-of-order events to
a window. Must be non-negative.
+ * @return a TimeWindows object with the specified size and the specified
grace period
+ * @throws IllegalArgumentException if {@code afterWindowEnd} is negative
or can't be represented as {@code long milliseconds}
+ */
+ public static TimeWindows ofSizeAndGrace(final Duration size, final
Duration afterWindowEnd)
+ throws IllegalArgumentException {
+
+ final long sizeMs = size.toMillis();
+ final long afterWindowEndMs = afterWindowEnd.toMillis();
+
+ return new TimeWindows(sizeMs, sizeMs, afterWindowEndMs);
+ }
+
+ /**
+ * Return a window definition with the given window size, and with the
advance interval being equal to the window
+ * size.
+ * The time interval represented by the N-th window is: {@code [N * size,
N * size + size)}.
+ * <p>
+ * This provides the semantics of tumbling windows, which are fixed-sized,
gap-less, non-overlapping windows.
+ * Tumbling windows are a special case of hopping windows with {@code
advance == size}.
+ *
+ * @param size The size of the window
+ * @return a new window definition without specifying the grace period
(uses old default of 24 hours)
* @throws IllegalArgumentException if the specified window size is zero
or negative or can't be represented as {@code long milliseconds}
+ * @deprecated since 3.0 Use {@link #ofSizeWithNoGrace(Duration)} instead
*/
+ @Deprecated
public static TimeWindows of(final Duration size) throws
IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(size, "size");
final long sizeMs = validateMillisecondDuration(size, msgPrefix);
- if (sizeMs <= 0) {
- throw new IllegalArgumentException("Window size (sizeMs) must be
larger than zero.");
- }
- return new TimeWindows(sizeMs, sizeMs, DEFAULT_GRACE_PERIOD_MS);
+
+ return new TimeWindows(sizeMs, sizeMs,
DEPRECATED_OLD_24_HR_GRACE_PERIOD);
}
/**
@@ -142,7 +196,9 @@ public final class TimeWindows extends Windows<TimeWindow> {
* @param afterWindowEnd The grace period to admit out-of-order events to
a window.
* @return this updated builder
* @throws IllegalArgumentException if {@code afterWindowEnd} is negative
or can't be represented as {@code long milliseconds}
+ * @deprecated since 3.0 Use {@link #ofSizeAndGrace(Duration, Duration)}
instead
*/
+ @Deprecated
public TimeWindows grace(final Duration afterWindowEnd) throws
IllegalArgumentException {
final String msgPrefix =
prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
final long afterWindowEndMs =
validateMillisecondDuration(afterWindowEnd, msgPrefix);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
index bece9e0..f0204d0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
@@ -38,9 +38,17 @@ import java.util.Map;
*/
public abstract class Windows<W extends Window> {
- // By default grace period is 24 hours for all windows,
- // in other words we allow out-of-order data for up to a day
- protected static final long DEFAULT_GRACE_PERIOD_MS = 24 * 60 * 60 * 1000L;
+ /**
+ * By default, the grace period is 24 hours for all windows; in other words, we
allow out-of-order data for up to a day.
+ * This behavior is now deprecated; additional details are available in
the motivation for the KIP.
+ * Check out <a
href="https://cwiki.apache.org/confluence/x/Ho2NCg">KIP-633</a> for more details
+ */
+ protected static final long DEPRECATED_OLD_24_HR_GRACE_PERIOD = 24 * 60 *
60 * 1000L;
+
+ /**
+ * This constant is used as the grace period wherever no grace period is
desired, instead of a magic constant.
+ */
+ protected static final long NO_GRACE_PERIOD = 0L;
protected Windows() {}
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 048a55b..9a178d7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -62,6 +62,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;
+@SuppressWarnings("deprecation")
public class TopologyTest {
private final StoreBuilder<MockKeyValueStore> storeBuilder =
EasyMock.createNiceMock(StoreBuilder.class);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
index c2b39b5..fd5da12 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
@@ -341,6 +341,7 @@ public abstract class AbstractResetIntegrationTest {
}
}
+ @SuppressWarnings("deprecation")
private Topology setupTopologyWithIntermediateTopic(final boolean
useRepartitioned,
final String
outputTopic2) {
final StreamsBuilder builder = new StreamsBuilder();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index eb241c5..504d9f6 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -71,6 +71,7 @@ import static org.junit.Assert.assertTrue;
/**
* Tests related to internal topics in streams
*/
+@SuppressWarnings("deprecation")
@Category({IntegrationTest.class})
public class InternalTopicIntegrationTest {
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(1);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
index f5ed891..59d8603 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
@@ -49,6 +49,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThrows;
+@SuppressWarnings("deprecation")
@Category({IntegrationTest.class})
public class JoinStoreIntegrationTest {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index 8eba6de..4fe35a6 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -65,6 +65,7 @@ import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.sa
* by virtue of having a large commit interval
*/
@Category({IntegrationTest.class})
+@SuppressWarnings("deprecation")
public class KStreamAggregationDedupIntegrationTest {
private static final int NUM_BROKERS = 1;
private static final long COMMIT_INTERVAL_MS = 300L;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index b7b1f4e..1b92ab5 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -98,7 +98,7 @@ import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-@SuppressWarnings("unchecked")
+@SuppressWarnings({"unchecked", "deprecation"})
@Category({IntegrationTest.class})
public class KStreamAggregationIntegrationTest {
private static final int NUM_BROKERS = 1;
@@ -209,6 +209,7 @@ public class KStreamAggregationIntegrationTest {
return keyComparison;
}
+ @SuppressWarnings("deprecation")
@Test
public void shouldReduceWindowed() throws Exception {
final long firstBatchTimestamp = mockTime.milliseconds();
@@ -219,6 +220,7 @@ public class KStreamAggregationIntegrationTest {
produceMessages(secondBatchTimestamp);
final Serde<Windowed<String>> windowedSerde =
WindowedSerdes.timeWindowedSerdeFrom(String.class, 500L);
+ //noinspection deprecation
groupedStream
.windowedBy(TimeWindows.of(ofMillis(500L)))
.reduce(reducer)
@@ -318,6 +320,7 @@ public class KStreamAggregationIntegrationTest {
)));
}
+ @SuppressWarnings("deprecation")
@Test
public void shouldAggregateWindowed() throws Exception {
final long firstTimestamp = mockTime.milliseconds();
@@ -328,6 +331,7 @@ public class KStreamAggregationIntegrationTest {
produceMessages(secondTimestamp);
final Serde<Windowed<String>> windowedSerde =
WindowedSerdes.timeWindowedSerdeFrom(String.class, 500L);
+ //noinspection deprecation
groupedStream.windowedBy(TimeWindows.of(ofMillis(500L)))
.aggregate(
initializer,
@@ -442,12 +446,14 @@ public class KStreamAggregationIntegrationTest {
shouldCountHelper();
}
+ @SuppressWarnings("deprecation")
@Test
public void shouldGroupByKey() throws Exception {
final long timestamp = mockTime.milliseconds();
produceMessages(timestamp);
produceMessages(timestamp);
+ //noinspection deprecation
stream.groupByKey(Grouped.with(Serdes.Integer(), Serdes.String()))
.windowedBy(TimeWindows.of(ofMillis(500L)))
.count()
@@ -476,6 +482,7 @@ public class KStreamAggregationIntegrationTest {
)));
}
+ @SuppressWarnings("deprecation")
@Test
public void shouldReduceSlidingWindows() throws Exception {
final long firstBatchTimestamp = mockTime.milliseconds();
@@ -487,6 +494,7 @@ public class KStreamAggregationIntegrationTest {
produceMessages(thirdBatchTimestamp);
final Serde<Windowed<String>> windowedSerde =
WindowedSerdes.timeWindowedSerdeFrom(String.class, timeDifference);
+ //noinspection deprecation
groupedStream
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifference),
ofMillis(2000L)))
.reduce(reducer)
@@ -580,6 +588,7 @@ public class KStreamAggregationIntegrationTest {
}
}
+ @SuppressWarnings("deprecation")
@Test
public void shouldAggregateSlidingWindows() throws Exception {
final long firstBatchTimestamp = mockTime.milliseconds();
@@ -591,6 +600,7 @@ public class KStreamAggregationIntegrationTest {
produceMessages(thirdBatchTimestamp);
final Serde<Windowed<String>> windowedSerde =
WindowedSerdes.timeWindowedSerdeFrom(String.class, timeDifference);
+ //noinspection deprecation
groupedStream.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(500L),
ofMinutes(5)))
.aggregate(
initializer,
@@ -689,6 +699,7 @@ public class KStreamAggregationIntegrationTest {
}
+ @SuppressWarnings("deprecation")
@Test
public void shouldCountSessionWindows() throws Exception {
final long sessionGap = 5 * 60 * 1000L;
@@ -761,6 +772,7 @@ public class KStreamAggregationIntegrationTest {
final Map<Windowed<String>, KeyValue<Long, Long>> results = new
HashMap<>();
final CountDownLatch latch = new CountDownLatch(13);
+ //noinspection deprecation
builder.stream(userSessionsStream, Consumed.with(Serdes.String(),
Serdes.String()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(SessionWindows.with(ofMillis(sessionGap)))
@@ -797,6 +809,7 @@ public class KStreamAggregationIntegrationTest {
assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3,
t3))), equalTo(KeyValue.pair(1L, t3)));
}
+ @SuppressWarnings("deprecation")
@Test
public void shouldReduceSessionWindows() throws Exception {
final long sessionGap = 1000L; // something to do with time
@@ -869,6 +882,7 @@ public class KStreamAggregationIntegrationTest {
final Map<Windowed<String>, KeyValue<String, Long>> results = new
HashMap<>();
final CountDownLatch latch = new CountDownLatch(13);
final String userSessionsStore = "UserSessionsStore";
+ //noinspection deprecation
builder.stream(userSessionsStream, Consumed.with(Serdes.String(),
Serdes.String()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(SessionWindows.with(ofMillis(sessionGap)))
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
index 4ae4fdb..c2dee61 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
@@ -81,6 +81,7 @@ import static org.junit.Assert.assertTrue;
@RunWith(value = Parameterized.class)
@Category({IntegrationTest.class})
+@SuppressWarnings("deprecation")
public class KStreamRepartitionIntegrationTest {
private static final int NUM_BROKERS = 1;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
index b8ee31b..84f5cfc 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
@@ -66,6 +66,7 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@Category({IntegrationTest.class})
+@SuppressWarnings("deprecation")
public class MetricsIntegrationTest {
private static final int NUM_BROKERS = 1;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 4e9b2b5..d07648b 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -120,6 +120,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
@Category({IntegrationTest.class})
+@SuppressWarnings("deprecation")
public class QueryableStateIntegrationTest {
private static final Logger log =
LoggerFactory.getLogger(QueryableStateIntegrationTest.class);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
index 648cfda..c698d06 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
@@ -69,6 +69,7 @@ import static org.hamcrest.Matchers.notNullValue;
@Category({IntegrationTest.class})
@RunWith(Parameterized.class)
+@SuppressWarnings("deprecation")
public class RocksDBMetricsIntegrationTest {
private static final int NUM_BROKERS = 3;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
index cf9b6d6c..ca1512f 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
@@ -28,11 +28,14 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;
+@SuppressWarnings("deprecation")
public class JoinWindowsTest {
private static final long ANY_SIZE = 123L;
private static final long ANY_OTHER_SIZE = 456L; // should be larger than anySize
+ private static final long ANY_GRACE = 1024L;
+ @SuppressWarnings("deprecation")
@Test
public void validWindows() {
JoinWindows.of(ofMillis(ANY_OTHER_SIZE)) // [ -anyOtherSize ; anyOtherSize ]
@@ -69,6 +72,8 @@ public class JoinWindowsTest {
@Test
public void timeDifferenceMustNotBeNegative() {
assertThrows(IllegalArgumentException.class, () ->
JoinWindows.of(ofMillis(-1)));
+ assertThrows(IllegalArgumentException.class, () ->
JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(-1)));
+ assertThrows(IllegalArgumentException.class, () ->
JoinWindows.ofTimeDifferenceAndGrace(ofMillis(-1), ofMillis(ANY_GRACE)));
}
@Test
@@ -133,6 +138,16 @@ public class JoinWindowsTest {
JoinWindows.of(ofMillis(9)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).grace(ofMillis(60)),
JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).grace(ofMillis(60))
);
+
+ verifyEquality(
+ JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(3)),
+ JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(3))
+ );
+
+ verifyEquality(
+ JoinWindows.ofTimeDifferenceAndGrace(ofMillis(3), ofMillis(4)),
+ JoinWindows.ofTimeDifferenceAndGrace(ofMillis(3), ofMillis(4))
+ );
}
@Test
@@ -162,5 +177,15 @@ public class JoinWindowsTest {
JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(9)),
JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3))
);
+
+ verifyInEquality(
+ JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(9)),
+ JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(3))
+ );
+
+ verifyInEquality(
+ JoinWindows.ofTimeDifferenceAndGrace(ofMillis(9), ofMillis(9)),
+ JoinWindows.ofTimeDifferenceAndGrace(ofMillis(3), ofMillis(9))
+ );
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
index 5545fb6..cad978c 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
@@ -38,6 +38,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+@SuppressWarnings("deprecation")
public class RepartitionTopicNamingTest {
private final KeyValueMapper<String, String, String> kvMapper = (k, v) ->
k + v;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
index 14104d69..f38be3c 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
@@ -25,12 +25,17 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;
+@SuppressWarnings("deprecation")
public class SessionWindowsTest {
@Test
public void shouldSetWindowGap() {
final long anyGap = 42L;
+ final long anyGrace = 1024L;
+
assertEquals(anyGap,
SessionWindows.with(ofMillis(anyGap)).inactivityGap());
+ assertEquals(anyGap,
SessionWindows.ofInactivityGapWithNoGrace(ofMillis(anyGap)).inactivityGap());
+ assertEquals(anyGap,
SessionWindows.ofInactivityGapAndGrace(ofMillis(anyGap),
ofMillis(anyGrace)).inactivityGap());
}
@Test
@@ -66,6 +71,15 @@ public class SessionWindowsTest {
public void equalsAndHashcodeShouldBeValidForPositiveCases() {
verifyEquality(SessionWindows.with(ofMillis(1)),
SessionWindows.with(ofMillis(1)));
+ verifyEquality(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(1)),
+ SessionWindows.ofInactivityGapWithNoGrace(ofMillis(1))
+ );
+
+ verifyEquality(
+ SessionWindows.ofInactivityGapAndGrace(ofMillis(1),
ofMillis(11)),
+ SessionWindows.ofInactivityGapAndGrace(ofMillis(1),
ofMillis(11))
+ );
+
verifyEquality(SessionWindows.with(ofMillis(1)).grace(ofMillis(6)),
SessionWindows.with(ofMillis(1)).grace(ofMillis(6)));
verifyEquality(SessionWindows.with(ofMillis(1)).grace(ofMillis(7)),
SessionWindows.with(ofMillis(1)).grace(ofMillis(7)));
@@ -75,6 +89,15 @@ public class SessionWindowsTest {
@Test
public void equalsAndHashcodeShouldBeValidForNegativeCases() {
+
+ verifyInEquality(
+ SessionWindows.ofInactivityGapWithNoGrace(ofMillis(9)),
+ SessionWindows.ofInactivityGapWithNoGrace(ofMillis(1)));
+
+ verifyInEquality(
+ SessionWindows.ofInactivityGapAndGrace(ofMillis(9),
ofMillis(9)),
+ SessionWindows.ofInactivityGapAndGrace(ofMillis(1),
ofMillis(9)));
+
verifyInEquality(SessionWindows.with(ofMillis(9)),
SessionWindows.with(ofMillis(1)));
verifyInEquality(SessionWindows.with(ofMillis(1)).grace(ofMillis(9)),
SessionWindows.with(ofMillis(1)).grace(ofMillis(6)));
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/SlidingWindowsTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/SlidingWindowsTest.java
index f6c63a3..dd06984 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/SlidingWindowsTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/SlidingWindowsTest.java
@@ -24,13 +24,17 @@ import static
org.apache.kafka.streams.EqualityCheck.verifyInEquality;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
+@SuppressWarnings("deprecation")
public class SlidingWindowsTest {
private static final long ANY_SIZE = 123L;
+ private static final long ANY_GRACE = 1024L;
@Test
public void shouldSetTimeDifference() {
assertEquals(ANY_SIZE,
SlidingWindows.withTimeDifferenceAndGrace(ofMillis(ANY_SIZE),
ofMillis(3)).timeDifferenceMs());
+ assertEquals(ANY_SIZE,
SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(ANY_SIZE),
ofMillis(ANY_GRACE)).timeDifferenceMs());
+ assertEquals(ANY_SIZE,
SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(ANY_SIZE)).timeDifferenceMs());
}
@Test
@@ -56,6 +60,16 @@ public class SlidingWindowsTest {
SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifference),
ofMillis(grace)),
SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifference),
ofMillis(grace))
);
+
+ verifyEquality(
+
SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(timeDifference),
ofMillis(grace)),
+
SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(timeDifference),
ofMillis(grace))
+ );
+
+ verifyEquality(
+
SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(timeDifference)),
+
SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(timeDifference))
+ );
}
@Test
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
index 765bad5..25a607d 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
@@ -29,13 +29,17 @@ import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;
+@SuppressWarnings("deprecation")
public class TimeWindowsTest {
private static final long ANY_SIZE = 123L;
+ private static final long ANY_GRACE = 1024L;
@Test
public void shouldSetWindowSize() {
assertEquals(ANY_SIZE, TimeWindows.of(ofMillis(ANY_SIZE)).sizeMs);
+ assertEquals(ANY_SIZE,
TimeWindows.ofSizeWithNoGrace(ofMillis(ANY_SIZE)).sizeMs);
+ assertEquals(ANY_SIZE, TimeWindows.ofSizeAndGrace(ofMillis(ANY_SIZE),
ofMillis(ANY_GRACE)).sizeMs);
}
@Test
@@ -140,10 +144,27 @@ public class TimeWindowsTest {
TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(1)).grace(ofMillis(1)).grace(ofMillis(4)),
TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(1)).grace(ofMillis(1)).grace(ofMillis(4))
);
+
+ verifyEquality(TimeWindows.ofSizeWithNoGrace(ofMillis(3)),
TimeWindows.ofSizeWithNoGrace(ofMillis(3)));
+
+ verifyEquality(TimeWindows.ofSizeAndGrace(ofMillis(3), ofMillis(33)),
+ TimeWindows.ofSizeAndGrace(ofMillis(3), ofMillis(33))
+ );
}
@Test
public void equalsAndHashcodeShouldBeValidForNegativeCases() {
+
+ verifyInEquality(
+ TimeWindows.ofSizeWithNoGrace(ofMillis(9)),
+ TimeWindows.ofSizeWithNoGrace(ofMillis(3))
+ );
+
+ verifyInEquality(
+ TimeWindows.ofSizeAndGrace(ofMillis(9), ofMillis(9)),
+ TimeWindows.ofSizeAndGrace(ofMillis(3), ofMillis(9))
+ );
+
verifyInEquality(TimeWindows.of(ofMillis(9)),
TimeWindows.of(ofMillis(3)));
verifyInEquality(TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(2)),
TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(1)));
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index b710e24..eba39a7 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -61,7 +61,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
-
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KGroupedStreamImplTest {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 8464ab9..9753453 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -100,6 +100,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
+
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KStreamImplTest {
@@ -704,6 +705,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("materialized can't be
null"));
}
+ @SuppressWarnings("deprecation")
@Test
public void shouldNotAllowNullOtherStreamOnJoin() {
final NullPointerException exception = assertThrows(
@@ -712,6 +714,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("otherStream can't be
null"));
}
+ @SuppressWarnings("deprecation")
@Test
public void shouldNotAllowNullOtherStreamOnJoinWithStreamJoined() {
final NullPointerException exception = assertThrows(
@@ -724,6 +727,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("otherStream can't be
null"));
}
+ @SuppressWarnings("deprecation")
@Test
public void shouldNotAllowNullValueJoinerOnJoin() {
final NullPointerException exception = assertThrows(
@@ -732,6 +736,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("joiner can't be null"));
}
+ @SuppressWarnings("deprecation")
@Test
public void shouldNotAllowNullValueJoinerWithKeyOnJoin() {
final NullPointerException exception = assertThrows(
@@ -740,6 +745,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("joiner can't be null"));
}
+ @SuppressWarnings("deprecation")
@Test
public void shouldNotAllowNullValueJoinerOnJoinWithStreamJoined() {
final NullPointerException exception = assertThrows(
@@ -752,6 +758,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("joiner can't be null"));
}
+ @SuppressWarnings("deprecation")
@Test
public void shouldNotAllowNullValueJoinerWithKeyOnJoinWithStreamJoined() {
final NullPointerException exception = assertThrows(
@@ -784,6 +791,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("windows can't be null"));
}
+ @SuppressWarnings("deprecation")
@Test
public void shouldNotAllowNullStreamJoinedOnJoin() {
final NullPointerException exception = assertThrows(
@@ -796,6 +804,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("streamJoined can't be
null"));
}
+ @SuppressWarnings("deprecation")
@Test
public void shouldNotAllowNullOtherStreamOnLeftJoin() {
final NullPointerException exception = assertThrows(
@@ -804,6 +813,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("otherStream can't be
null"));
}
+ @SuppressWarnings("deprecation")
@Test
public void shouldNotAllowNullOtherStreamOnLeftJoinWithStreamJoined() {
final NullPointerException exception = assertThrows(
@@ -816,6 +826,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("otherStream can't be
null"));
}
+ @SuppressWarnings("deprecation")
@Test
public void shouldNotAllowNullValueJoinerOnLeftJoin() {
final NullPointerException exception = assertThrows(
@@ -824,6 +835,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("joiner can't be null"));
}
+ @SuppressWarnings("deprecation")
@Test
public void shouldNotAllowNullValueJoinerWithKeyOnLeftJoin() {
final NullPointerException exception = assertThrows(
@@ -832,6 +844,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("joiner can't be null"));
}
+ @SuppressWarnings("deprecation")
@Test
public void shouldNotAllowNullValueJoinerOnLeftJoinWithStreamJoined() {
final NullPointerException exception = assertThrows(
@@ -844,6 +857,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("joiner can't be null"));
}
+ @SuppressWarnings("deprecation")
@Test
public void
shouldNotAllowNullValueJoinerWithKeyOnLeftJoinWithStreamJoined() {
final NullPointerException exception = assertThrows(
@@ -877,6 +891,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("windows can't be null"));
}
+ @SuppressWarnings("deprecation")
@Test
public void shouldNotAllowNullStreamJoinedOnLeftJoin() {
final NullPointerException exception = assertThrows(
@@ -889,6 +904,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("streamJoined can't be
null"));
}
+ @SuppressWarnings("deprecation")
@Test
public void shouldNotAllowNullOtherStreamOnOuterJoin() {
final NullPointerException exception = assertThrows(
@@ -897,6 +913,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("otherStream can't be
null"));
}
+ @SuppressWarnings("deprecation")
@Test
public void shouldNotAllowNullOtherStreamOnOuterJoinWithStreamJoined() {
final NullPointerException exception = assertThrows(
@@ -909,6 +926,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("otherStream can't be
null"));
}
+ @SuppressWarnings("deprecation")
@Test
public void shouldNotAllowNullValueJoinerOnOuterJoin() {
final NullPointerException exception = assertThrows(
@@ -917,6 +935,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("joiner can't be null"));
}
+ @SuppressWarnings("deprecation")
@Test
public void shouldNotAllowNullValueJoinerWithKeyOnOuterJoin() {
final NullPointerException exception = assertThrows(
@@ -925,6 +944,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("joiner can't be null"));
}
+ @SuppressWarnings("deprecation")
@Test
public void shouldNotAllowNullValueJoinerOnOuterJoinWithStreamJoined() {
final NullPointerException exception = assertThrows(
@@ -937,6 +957,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("joiner can't be null"));
}
+ @SuppressWarnings("deprecation")
@Test
public void
shouldNotAllowNullValueJoinerWithKeyOnOuterJoinWithStreamJoined() {
final NullPointerException exception = assertThrows(
@@ -969,6 +990,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("windows can't be null"));
}
+ @SuppressWarnings("deprecation")
@Test
public void shouldNotAllowNullStreamJoinedOnOuterJoin() {
final NullPointerException exception = assertThrows(
@@ -1511,6 +1533,7 @@ public class KStreamImplTest {
assertThat(mockProcessors.get(1).processed(),
equalTo(Collections.singletonList(new KeyValueTimestamp<>("b", "v1", 0))));
}
+ @SuppressWarnings("deprecation")
@Test
public void
shouldUseRecordMetadataTimestampExtractorWhenInternalRepartitioningTopicCreatedWithRetention()
{
final StreamsBuilder builder = new StreamsBuilder();
@@ -1538,6 +1561,7 @@ public class KStreamImplTest {
}
}
+ @SuppressWarnings("deprecation")
@Test
public void
shouldUseRecordMetadataTimestampExtractorWhenInternalRepartitioningTopicCreated()
{
final StreamsBuilder builder = new StreamsBuilder();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index 1757584..1d50a37 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -63,6 +63,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
+
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KStreamKStreamJoinTest {
private final String topic1 = "topic1";
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index eb705f1..bc20312 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -53,6 +53,7 @@ import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
+
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KStreamKStreamLeftJoinTest {
private final static KeyValueTimestamp[] EMPTY = new KeyValueTimestamp[0];
@@ -97,6 +98,7 @@ public class KStreamKStreamLeftJoinTest {
false
);
}
+
@Test
public void testLeftJoinWithSpuriousResultFixDisabledOldApi() {
runLeftJoinWithoutSpuriousResultFix(
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
index b4c0827..39ed039 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
@@ -52,6 +52,7 @@ import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
+
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KStreamKStreamOuterJoinTest {
private final String topic1 = "topic1";
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamRepartitionTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamRepartitionTest.java
index 04bbda8..0344f46 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamRepartitionTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamRepartitionTest.java
@@ -59,6 +59,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertThrows;
+@SuppressWarnings("deprecation")
@RunWith(EasyMockRunner.class)
public class KStreamRepartitionTest {
private final String inputTopic = "input-topic";
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 3022a36..f4ebfdd 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -69,6 +69,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KStreamSessionWindowAggregateProcessorTest {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
index cf6efec..798159d 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
@@ -711,7 +711,7 @@ public class KStreamSlidingWindowAggregateTest {
builder
.stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10),
ofMillis(100)))
+
.windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(10),
ofMillis(100)))
.aggregate(MockInitializer.STRING_INIT,
MockAggregator.toStringInstance("+"), Materialized.<String, String,
WindowStore<Bytes,
byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String()));
props.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG,
builtInMetricsVersion);
@@ -743,7 +743,7 @@ public class KStreamSlidingWindowAggregateTest {
final KStream<String, String> stream1 = builder.stream(topic,
Consumed.with(Serdes.String(), Serdes.String()));
stream1.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10),
ofMillis(90)))
+ .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(10),
ofMillis(90)))
.aggregate(
MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER,
@@ -807,7 +807,7 @@ public class KStreamSlidingWindowAggregateTest {
final KTable<Windowed<String>, String> table = builder
.stream(topic1, Consumed.with(Serdes.String(), Serdes.String()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10),
ofMillis(10000)))
+ .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(10),
ofMillis(10000)))
// The aggregator needs to sort the strings so the window value is
the same for the final windows even when
// records are processed in a different order. Here, we sort
alphabetically.
.aggregate(
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 39a7444..b7759bb 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -62,6 +62,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+@SuppressWarnings("deprecation")
public class KStreamWindowAggregateTest {
private final Properties props =
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
private final String threadId = Thread.currentThread().getName();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImplTest.java
index 2946417..eee7cc5 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImplTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImplTest.java
@@ -50,7 +50,7 @@ import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Before;
import org.junit.Test;
-
+@SuppressWarnings("deprecation")
public class SessionWindowedCogroupedKStreamImplTest {
private final StreamsBuilder builder = new StreamsBuilder();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
index 96d301d..52ff858 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
@@ -54,6 +54,7 @@ import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Before;
import org.junit.Test;
+@SuppressWarnings("deprecation")
public class SlidingWindowedCogroupedKStreamImplTest {
private static final String TOPIC = "topic";
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
index e0b7957..7b521ab 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
@@ -70,6 +70,7 @@ import static
org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
+@SuppressWarnings("deprecation")
public class SuppressScenarioTest {
private static final StringDeserializer STRING_DESERIALIZER = new
StringDeserializer();
private static final StringSerializer STRING_SERIALIZER = new
StringSerializer();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressTopologyTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressTopologyTest.java
index d775c89..d775796 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressTopologyTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressTopologyTest.java
@@ -39,6 +39,7 @@ import static
org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
+@SuppressWarnings("deprecation")
public class SuppressTopologyTest {
private static final Serde<String> STRING_SERDE = Serdes.String();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java
index f905d32..c558eab 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
+@SuppressWarnings("deprecation")
public class TimeWindowTest {
private long start = 50;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImplTest.java
index d052429..cd9ca19 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImplTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImplTest.java
@@ -51,6 +51,7 @@ import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Before;
import org.junit.Test;
+@SuppressWarnings("deprecation")
public class TimeWindowedCogroupedKStreamImplTest {
private static final Long WINDOW_SIZE = 500L;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
index 8926a1e..b912a96 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
@@ -53,6 +53,7 @@ import java.util.regex.Pattern;
import static java.time.Duration.ofMillis;
import static org.junit.Assert.assertEquals;
+@SuppressWarnings("deprecation")
public class StreamsGraphTest {
private final Pattern repartitionTopicPattern = Pattern.compile("Sink:
.*-repartition");
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
index 3b7eb78..d1eb5b5 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
@@ -66,6 +66,7 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
+@SuppressWarnings("deprecation")
public class RepartitionOptimizingTest {
private final Logger log =
LoggerFactory.getLogger(RepartitionOptimizingTest.class);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 860ed73..7244bd6 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -137,6 +137,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@RunWith(value = Parameterized.class)
+@SuppressWarnings("deprecation")
public class StreamsPartitionAssignorTest {
private static final String CONSUMER_1 = "consumer1";
private static final String CONSUMER_2 = "consumer2";
diff --git
a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index 2bd4437..86f7583 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -50,6 +50,7 @@ import java.util.concurrent.TimeUnit;
import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
+@SuppressWarnings("deprecation")
public class SmokeTestClient extends SmokeTestUtil {
private final String name;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java
b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java
index da063c0..714aa11 100644
---
a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java
@@ -47,6 +47,7 @@ import java.util.regex.Pattern;
import static java.time.Duration.ofMillis;
+@SuppressWarnings("deprecation")
public class StreamsOptimizedTest {
public static void main(final String[] args) throws Exception {
diff --git
a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java
b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java
index d9a2afe..7c3af25 100644
---
a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java
+++
b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java
@@ -58,6 +58,7 @@ public class GenericInMemoryKeyValueStore<K extends
Comparable, V>
return this.name;
}
+ @SuppressWarnings("deprecation")
@Deprecated
@Override
/* This is a "dummy" store used for testing;
diff --git
a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java
b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java
index 2198d18..114ea06 100644
---
a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java
+++
b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java
@@ -37,6 +37,7 @@ import java.util.TreeMap;
* This class is a generic version of the in-memory key-value store that is
useful for testing when you
* need a basic KeyValueStore for arbitrary types and don't have/want to
write a serde
*/
+@SuppressWarnings("deprecation")
public class GenericInMemoryTimestampedKeyValueStore<K extends Comparable, V>
extends WrappedStateStore<StateStore, K, ValueAndTimestamp<V>>
implements TimestampedKeyValueStore<K, V> {
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index d3f83e0..d097b85 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -54,6 +54,7 @@ import scala.jdk.CollectionConverters._
* @param inner The underlying Java abstraction for KStream
* @see `org.apache.kafka.streams.kstream.KStream`
*/
+//noinspection ScalaDeprecation
class KStream[K, V](val inner: KStreamJ[K, V]) {
/**
diff --git
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index 9653ddb..92ca5bd 100644
---
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -51,6 +51,7 @@ import scala.jdk.CollectionConverters._
/**
* Test suite that verifies that the topology built by the Java and Scala APIs
match.
*/
+//noinspection ScalaDeprecation
class TopologyTest {
private val inputTopic = "input-topic"
@@ -377,14 +378,16 @@ class TopologyTest {
mappedStream
.filter((k: String, _: String) => k == "A")
- .join(stream2)((v1: String, v2: Int) => v1 + ":" + v2.toString, JoinWindows.of(Duration.ofMillis(5000)))(
+ .join(stream2)((v1: String, v2: Int) => v1 + ":" + v2.toString,
+ JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000), Duration.ofHours(24)))(
StreamJoined.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde,
NewSerdes.intSerde)
)
.to(JOINED_TOPIC)
mappedStream
.filter((k: String, _: String) => k == "A")
- .join(stream3)((v1: String, v2: String) => v1 + ":" + v2.toString, JoinWindows.of(Duration.ofMillis(5000)))(
+ .join(stream3)((v1: String, v2: String) => v1 + ":" + v2.toString,
+ JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000), Duration.ofHours(24)))(
StreamJoined.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde,
NewSerdes.stringSerde)
)
.to(JOINED_TOPIC)
@@ -433,18 +436,22 @@ class TopologyTest {
mappedStream
.filter((key, _) => key == "A")
- .join[Integer, String](stream2,
- valueJoiner2,
- JoinWindows.of(Duration.ofMillis(5000)),
- StreamJoinedJ.`with`(NewSerdes.stringSerde,
NewSerdes.stringSerde, SerdesJ.Integer))
+ .join[Integer, String](
+ stream2,
+ valueJoiner2,
+ JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000),
Duration.ofHours(24)),
+ StreamJoinedJ.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde,
SerdesJ.Integer)
+ )
.to(JOINED_TOPIC)
mappedStream
.filter((key, _) => key == "A")
- .join(stream3,
- valueJoiner3,
- JoinWindows.of(Duration.ofMillis(5000)),
- StreamJoinedJ.`with`(NewSerdes.stringSerde,
NewSerdes.stringSerde, SerdesJ.String))
+ .join(
+ stream3,
+ valueJoiner3,
+ JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000),
Duration.ofHours(24)),
+ StreamJoinedJ.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde,
SerdesJ.String)
+ )
.to(JOINED_TOPIC)
builder
diff --git
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
index 1d8a1f1..0ec7b0e 100644
---
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
+++
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
@@ -17,9 +17,7 @@
package org.apache.kafka.streams.scala.kstream
import java.time.Duration.ofSeconds
-import java.time.Instant
-import java.util.regex.Pattern
-
+import java.time.{Duration, Instant}
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{
JoinWindows,
@@ -192,6 +190,7 @@ class KStreamTest extends TestDriver {
testDriver.close()
}
+ //noinspection ScalaDeprecation
@Test
def testJoinCorrectlyRecords(): Unit = {
val builder = new StreamsBuilder()
@@ -201,7 +200,9 @@ class KStreamTest extends TestDriver {
val stream1 = builder.stream[String, String](sourceTopic1)
val stream2 = builder.stream[String, String](sourceTopic2)
- stream1.join(stream2)((a, b) => s"$a-$b",
JoinWindows.of(ofSeconds(1))).to(sinkTopic)
+ stream1
+ .join(stream2)((a, b) => s"$a-$b",
JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(1), Duration.ofHours(24)))
+ .to(sinkTopic)
val now = Instant.now()
@@ -464,23 +465,4 @@ class KStreamTest extends TestDriver {
val transformNode =
builder.build().describe().subtopologies().asScala.head.nodes().asScala.toList(1)
assertEquals("my-name", transformNode.name())
}
-
- @Test
- def testSettingNameOnStream(): Unit = {
- val builder = new StreamsBuilder()
- val topicsPattern = "t-[A-Za-z0-9-].suffix"
- val sinkTopic = "sink"
-
- builder
- .stream[String, String](Pattern.compile(topicsPattern))(
- Consumed.`with`[String, String].withName("my-fancy-name")
- )
- .to(sinkTopic)
-
- import scala.jdk.CollectionConverters._
-
- val streamNode =
builder.build().describe().subtopologies().asScala.head.nodes().asScala.head
- assertEquals("my-fancy-name", streamNode.name())
- }
-
}
diff --git
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
index 15e090d..09a3a7d 100644
---
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
+++
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
@@ -36,6 +36,7 @@ import java.time.Duration.ofMillis
import scala.jdk.CollectionConverters._
+//noinspection ScalaDeprecation
class KTableTest extends TestDriver {
@Test
@@ -166,7 +167,7 @@ class KTableTest extends TestDriver {
val builder = new StreamsBuilder()
val sourceTopic = "source"
val sinkTopic = "sink"
- val window = TimeWindows.of(Duration.ofSeconds(1L))
+ val window = TimeWindows.ofSizeAndGrace(Duration.ofSeconds(1L),
Duration.ofHours(24))
val suppression =
JSuppressed.untilTimeLimit[Windowed[String]](Duration.ofSeconds(2L),
BufferConfig.unbounded())
val table: KTable[Windowed[String], Long] = builder
@@ -224,7 +225,7 @@ class KTableTest extends TestDriver {
val builder = new StreamsBuilder()
val sourceTopic = "source"
val sinkTopic = "sink"
- val window = SlidingWindows.withTimeDifferenceAndGrace(ofMillis(1000L),
ofMillis(1000L))
+ val window = SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(1000L),
ofMillis(1000L))
val suppression = JSuppressed.untilWindowCloses(BufferConfig.unbounded())
val table: KTable[Windowed[String], Long] = builder
@@ -262,7 +263,7 @@ class KTableTest extends TestDriver {
val builder = new StreamsBuilder()
val sourceTopic = "source"
val sinkTopic = "sink"
- val window =
TimeWindows.of(Duration.ofSeconds(1L)).grace(Duration.ofSeconds(1L))
+ val window = TimeWindows.ofSizeAndGrace(Duration.ofSeconds(1L),
Duration.ofSeconds(1L))
val suppression = JSuppressed.untilWindowCloses(BufferConfig.unbounded())
val table: KTable[Windowed[String], Long] = builder
@@ -321,7 +322,7 @@ class KTableTest extends TestDriver {
val sourceTopic = "source"
val sinkTopic = "sink"
// Very similar to
SuppressScenarioTest.shouldSupportFinalResultsForSessionWindows
- val window =
SessionWindows.`with`(Duration.ofMillis(5L)).grace(Duration.ofMillis(10L))
+ val window = SessionWindows.ofInactivityGapAndGrace(Duration.ofMillis(5L),
Duration.ofMillis(10L))
val suppression = JSuppressed.untilWindowCloses(BufferConfig.unbounded())
val table: KTable[Windowed[String], Long] = builder