This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7fbc6b7 KAFKA-13021: clarify KIP-633 javadocs and address remaining
feedback (#11114)
7fbc6b7 is described below
commit 7fbc6b73aa5d194948b30a00074cd0f79365694d
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Fri Jul 23 16:14:37 2021 -0700
KAFKA-13021: clarify KIP-633 javadocs and address remaining feedback
(#11114)
There were a few followup things to address from #10926, most importantly a
number of updates to the javadocs. Also includes a few missing verification
checks.
Reviewers: Guozhang Wang <[email protected]>, Matthias J. Sax
<[email protected]>, Israel Ekpo
---
docs/streams/upgrade-guide.html | 33 ++++++++------
.../apache/kafka/streams/kstream/JoinWindows.java | 50 +++++++++++++---------
.../kafka/streams/kstream/SessionWindows.java | 39 ++++++++++-------
.../kafka/streams/kstream/SlidingWindows.java | 24 ++++++-----
.../apache/kafka/streams/kstream/TimeWindows.java | 43 ++++++++++---------
.../org/apache/kafka/streams/kstream/Windows.java | 2 +-
.../kafka/streams/kstream/JoinWindowsTest.java | 10 ++---
.../kafka/streams/kstream/SessionWindowsTest.java | 10 ++---
.../kafka/streams/kstream/TimeWindowsTest.java | 10 ++---
9 files changed, 126 insertions(+), 95 deletions(-)
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 448de93..eb3b125 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -135,25 +135,32 @@
We removed the default implementation of
<code>RocksDBConfigSetter#close()</code>.
</p>
<p>
- We dropped the default 24 hours grace period for windowed operations
such as Window or Session aggregates, or stream-stream joins.
- This period determines how long after a window ends any out-of-order
records will still be processed.
- Records coming in after the grace period has elapsed will be dropped
from those windows.
- With a long grace period, though Kafka Streams can handle out-of-order
data up to that amount of time, it will also incur a high and confusing latency
for users,
- e.g. suppression operators with the default won't emit results up for
24 hours, while in practice out-of-order data usually has a much smaller
time-skewness.
- Instead of abstracting this config from users with a long default
value, we introduced new constructs such as
<code>TimeWindows#ofSizeAndGrace</code> to let callers always set it upon
constructing the windows;
- the other setters such as <code>TimeWindows#grace</code> are
deprecated and will be removed in the future.
+ We dropped the default 24 hours grace period for windowed operations
such as Window or Session aggregates, or
+ stream-stream joins. This period determines how long after a window
ends any out-of-order records will still
+ be processed. Records coming in after the grace period has elapsed are
considered late and will be dropped.
+ But in operators such as suppression, a large grace period has the
drawback of incurring an equally large
+ output latency. The current API made it all too easy to miss the grace
period config completely, leading you
+ to wonder why your application seems to produce no output -- it
actually is, but not for 24 hours.
+ <p>
+ To prevent accidentally or unknowingly falling back to the default
24hr grace period, we deprecated all of the
+ existing static constructors for the <code>Windows</code> classes
(such as <code>TimeWindows#of</code>). These
+ are replaced by new static constructors of two flavors:
<code>#ofSizeAndGrace</code> and <code>#ofSizeWithNoGrace</code>
+ (these are for the <code>TimeWindows</code> class; analogous APIs
exist for the <code>JoinWindows</code>,
+ <code>SessionWindows</code>, and SlidingWindows classes). With these
new APIs you are forced to set the grace
+ period explicitly, or else consciously choose to opt out by selecting
the <code>WithNoGrace</code> flavor which
+ sets it to 0 for situations where you really don't care about the
grace period, for example during testing or
+ when playing around with Kafka Streams for the first time. Note that
using the new APIs for the
+ <code>JoinWindows</code> class will also enable a fix for spurious
left/outer join results, as described in
+ the following paragraph. For more details on the grace period and new
static constructors, see
+ <a
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-633%3A+Drop+24+hour+default+of+grace+period+in+Streams">KIP-633</a>
</p>
<p>
Additionally, in older versions Kafka Streams emitted stream-stream
left/outer join results eagerly. This behavior may lead to spurious left/outer
join result records.
In this release, we changed the behavior to avoid spurious results and
left/outer join result are only emitted after the join window is closed, i.e.,
after the grace period elapsed.
To maintain backward compatibility, the old API
<code>JoinWindows#of(timeDifference)</code> preserves the old eager-emit
behavior and only the new
- APIs <code>JoinWindows#ofTimeDifferenceAndGrace()</code> and
<code>JoinsWindows#ofTimeDifferenceNoGrace</code> enable the new behavior.
+ APIs <code>JoinWindows#ofTimeDifferenceAndGrace()</code> and
<code>JoinsWindows#ofTimeDifferenceNoGrace</code> enable the new behavior.
Check out
+ <a
href="https://issues.apache.org/jira/browse/KAFKA-10847">KAFKA-10847</a> for
more information.
</p>
- <ul>
- <li><a
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-633%3A+Drop+24+hour+default+of+grace+period+in+Streams">KIP-633</a>:
Drop 24 hour default of grace period in Streams</li>
- <li><a
href="https://issues.apache.org/jira/browse/KAFKA-10847">KAFKA-10847</a>: Avoid
spurious left/outer join results in stream-stream join</li>
- </ul>
-
<p>
The public <code>topicGroupId</code> and <code>partition</code> fields
on TaskId have been deprecated and replaced with getters. Please migrate to
using the new <code>TaskId.subtopology()</code>
(which replaces <code>topicGroupId</code>) and
<code>TaskId.partition()</code> APIs instead. Also, the
<code>TaskId#readFrom</code> and <code>TaskId#writeTo</code> methods have been
deprecated
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index 9a7c7b5..1b26bcc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -102,49 +102,61 @@ public class JoinWindows extends Windows<Window> {
/**
* Specifies that records of the same key are joinable if their timestamps
are within {@code timeDifference},
- * i.e., the timestamp of a record from the secondary stream is max {@code
timeDifference} earlier or later than
- * the timestamp of the record from the primary stream. Using the method
explicitly sets the grace period to
- * the duration specified by {@code afterWindowEnd} which means that out
of order records arriving
- * after the window end will be dropped. The delay is defined as
(stream_time - record_timestamp).
+ * i.e., the timestamp of a record from the secondary stream is max {@code
timeDifference} before or after
+ * the timestamp of the record from the primary stream.
+ * <p>
+ * Using this method explicitly sets the grace period to the duration
specified by {@code afterWindowEnd}, which
+ * means that only out-of-order records arriving more than the grace
period after the window end will be dropped.
+ * The window close, after which any incoming records are considered late
and will be rejected, is defined as
+ * {@code windowEnd + afterWindowEnd}
*
* @param timeDifference join window interval
* @param afterWindowEnd The grace period to admit out-of-order events to
a window.
- * @throws IllegalArgumentException if the {@code afterWindowEnd} is
negative of can't be represented as {@code long milliseconds}
* @return A new JoinWindows object with the specified window definition
and grace period
+ * @throws IllegalArgumentException if {@code timeDifference} is negative
or can't be represented as {@code long milliseconds}
+ * if the {@code afterWindowEnd} is
negative or can't be represented as {@code long milliseconds}
*/
public static JoinWindows ofTimeDifferenceAndGrace(final Duration
timeDifference, final Duration afterWindowEnd) {
- return new JoinWindows(timeDifference.toMillis(),
timeDifference.toMillis(), afterWindowEnd.toMillis(), true);
+ final String timeDifferenceMsgPrefix =
prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
+ final long timeDifferenceMs =
validateMillisecondDuration(timeDifference, timeDifferenceMsgPrefix);
+
+ final String afterWindowEndMsgPrefix =
prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
+ final long afterWindowEndMs =
validateMillisecondDuration(afterWindowEnd, afterWindowEndMsgPrefix);
+
+ return new JoinWindows(timeDifferenceMs, timeDifferenceMs,
afterWindowEndMs, true);
}
/**
* Specifies that records of the same key are joinable if their timestamps
are within {@code timeDifference},
- * i.e., the timestamp of a record from the secondary stream is max {@code
timeDifference} earlier or later than
- * the timestamp of the record from the primary stream. Using the method
implicitly sets the grace period to zero
- * which means that out of order records arriving after the window end
will be dropped.
+ * i.e., the timestamp of a record from the secondary stream is max {@code
timeDifference} before or after
+ * the timestamp of the record from the primary stream.
+ * <p>
+ * CAUTION: Using this method implicitly sets the grace period to zero,
which means that any out-of-order
+ * records arriving after the window ends are considered late and will be
dropped.
*
* @param timeDifference join window interval
+ * @return a new JoinWindows object with the window definition and no
grace period. Note that this means out-of-order records arriving after the
window end will be dropped
* @throws IllegalArgumentException if {@code timeDifference} is negative
or can't be represented as {@code long milliseconds}
- * @return a new JoinWindows object with the window definition and no
grace period. Note that this means out of order records arriving after the
window end will be dropped
*/
public static JoinWindows ofTimeDifferenceWithNoGrace(final Duration
timeDifference) {
- return new JoinWindows(timeDifference.toMillis(),
timeDifference.toMillis(), NO_GRACE_PERIOD, true);
+ return ofTimeDifferenceAndGrace(timeDifference,
Duration.ofMillis(NO_GRACE_PERIOD));
}
/**
* Specifies that records of the same key are joinable if their timestamps
are within {@code timeDifference},
- * i.e., the timestamp of a record from the secondary stream is max {@code
timeDifference} earlier or later than
+ * i.e., the timestamp of a record from the secondary stream is max {@code
timeDifference} before or after
* the timestamp of the record from the primary stream.
*
- * @param timeDifference
+ * @param timeDifference join window interval
* @return a new JoinWindows object with the window definition with and
grace period (default to 24 hours minus {@code timeDifference})
* @throws IllegalArgumentException if {@code timeDifference} is negative
or can't be represented as {@code long milliseconds}
- * @deprecated since 3.0 Use {@link
#ofTimeDifferenceWithNoGrace(Duration)}} instead
+ * @deprecated since 3.0. Use {@link
#ofTimeDifferenceWithNoGrace(Duration)}} instead
*/
@Deprecated
public static JoinWindows of(final Duration timeDifference) throws
IllegalArgumentException {
final String msgPrefix =
prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
final long timeDifferenceMs =
validateMillisecondDuration(timeDifference, msgPrefix);
- return new JoinWindows(timeDifferenceMs, timeDifferenceMs,
Math.max(DEPRECATED_OLD_24_HR_GRACE_PERIOD - timeDifferenceMs * 2, 0), false);
+ return new JoinWindows(timeDifferenceMs, timeDifferenceMs,
Math.max(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD - timeDifferenceMs * 2, 0),
false);
}
/**
@@ -203,16 +215,14 @@ public class JoinWindows extends Windows<Window> {
*
* @param afterWindowEnd The grace period to admit out-of-order events to
a window.
* @return this updated builder
- * @throws IllegalArgumentException if the {@code afterWindowEnd} is
negative of can't be represented as {@code long milliseconds}
- * @deprecated since 3.0 Use {@link #ofTimeDifferenceAndGrace(Duration,
Duration)} instead
+ * @throws IllegalArgumentException if the {@code afterWindowEnd} is
negative or can't be represented as {@code long milliseconds}
+ * @deprecated since 3.0. Use {@link #ofTimeDifferenceAndGrace(Duration,
Duration)} instead
*/
@Deprecated
public JoinWindows grace(final Duration afterWindowEnd) throws
IllegalArgumentException {
+ //TODO KAFKA-13021: disallow calling grace() if it was already set via
ofTimeDifferenceAndGrace/WithNoGrace()
final String msgPrefix =
prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
final long afterWindowEndMs =
validateMillisecondDuration(afterWindowEnd, msgPrefix);
- if (afterWindowEndMs < 0) {
- throw new IllegalArgumentException("Grace period must not be
negative.");
- }
return new JoinWindows(beforeMs, afterMs, afterWindowEndMs, false);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
index 6956905..85451c6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
@@ -23,7 +23,7 @@ import java.util.Objects;
import static
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
import static
org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
-import static
org.apache.kafka.streams.kstream.Windows.DEPRECATED_OLD_24_HR_GRACE_PERIOD;
+import static
org.apache.kafka.streams.kstream.Windows.DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD;
import static org.apache.kafka.streams.kstream.Windows.NO_GRACE_PERIOD;
import static java.time.Duration.ofMillis;
@@ -91,16 +91,16 @@ public final class SessionWindows {
/**
* Creates a new window specification with the specified inactivity gap.
- * Using the method implicitly sets the grace period to zero which
- * means that out of order records arriving after the window end will be
dropped
- *
* <p>
* Note that new events may change the boundaries of session windows, so
aggressive
* close times can lead to surprising results in which an out-of-order
event is rejected and then
* a subsequent event moves the window boundary forward.
+ * <p>
+ * CAUTION: Using this method implicitly sets the grace period to zero,
which means that any out-of-order
+ * records arriving after the window ends are considered late and will be
dropped.
*
* @param inactivityGap the gap of inactivity between sessions
- * @return a window definition with the window size and no grace period.
Note that this means out of order records arriving after the window end will be
dropped
+ * @return a window definition with the window size and no grace period.
Note that this means out-of-order records arriving after the window end will be
dropped
* @throws IllegalArgumentException if {@code inactivityGap} is zero or
negative or can't be represented as {@code long milliseconds}
*/
public static SessionWindows ofInactivityGapWithNoGrace(final Duration
inactivityGap) {
@@ -109,23 +109,31 @@ public final class SessionWindows {
/**
* Creates a new window specification with the specified inactivity gap.
- * Using the method explicitly sets the grace period to the duration
specified by {@code afterWindowEnd} which
- * means that out of order records arriving after the window end will be
dropped
- *
* <p>
* Note that new events may change the boundaries of session windows, so
aggressive
* close times can lead to surprising results in which an out-of-order
event is rejected and then
* a subsequent event moves the window boundary forward.
+ * <p>
+ * Using this method explicitly sets the grace period to the duration
specified by {@code afterWindowEnd}, which
+ * means that only out-of-order records arriving more than the grace
period after the window end will be dropped.
+ * The window close, after which any incoming records are considered late
and will be rejected, is defined as
+ * {@code windowEnd + afterWindowEnd}
*
* @param inactivityGap the gap of inactivity between sessions
* @param afterWindowEnd The grace period to admit out-of-order events to
a window.
* @return A SessionWindows object with the specified inactivity gap and
grace period
- * @throws IllegalArgumentException if the {@code afterWindowEnd} is
negative of can't be represented as {@code long milliseconds}
+ * @throws IllegalArgumentException if {@code inactivityGap} is zero or
negative or can't be represented as {@code long milliseconds}
+ * if the {@code afterWindowEnd} is
negative or can't be represented as {@code long milliseconds}
*/
public static SessionWindows ofInactivityGapAndGrace(final Duration
inactivityGap, final Duration afterWindowEnd) {
- return new SessionWindows(inactivityGap.toMillis(),
afterWindowEnd.toMillis());
- }
+ final String inactivityGapMsgPrefix =
prepareMillisCheckFailMsgPrefix(inactivityGap, "inactivityGap");
+ final long inactivityGapMs =
validateMillisecondDuration(inactivityGap, inactivityGapMsgPrefix);
+ final String afterWindowEndMsgPrefix =
prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
+ final long afterWindowEndMs =
validateMillisecondDuration(afterWindowEnd, afterWindowEndMsgPrefix);
+
+ return new SessionWindows(inactivityGapMs, afterWindowEndMs);
+ }
/**
* Create a new window specification with the specified inactivity gap.
@@ -133,14 +141,14 @@ public final class SessionWindows {
* @param inactivityGap the gap of inactivity between sessions
* @return a new window specification without specifying a grace period
(default to 24 hours minus {@code inactivityGap})
* @throws IllegalArgumentException if {@code inactivityGap} is zero or
negative or can't be represented as {@code long milliseconds}
- * @deprecated since 3.0 Use {@link #ofInactivityGapWithNoGrace(Duration)}
instead
+ * @deprecated since 3.0. Use {@link
#ofInactivityGapWithNoGrace(Duration)} instead
*/
@Deprecated
public static SessionWindows with(final Duration inactivityGap) {
final String msgPrefix =
prepareMillisCheckFailMsgPrefix(inactivityGap, "inactivityGap");
final long inactivityGapMs =
validateMillisecondDuration(inactivityGap, msgPrefix);
- return new SessionWindows(inactivityGapMs,
Math.max(DEPRECATED_OLD_24_HR_GRACE_PERIOD - inactivityGapMs, 0));
+ return new SessionWindows(inactivityGapMs,
Math.max(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD - inactivityGapMs, 0));
}
/**
@@ -153,11 +161,12 @@ public final class SessionWindows {
*
* @param afterWindowEnd The grace period to admit out-of-order events to
a window.
* @return this updated builder
- * @throws IllegalArgumentException if the {@code afterWindowEnd} is
negative of can't be represented as {@code long milliseconds}
- * @deprecated since 3.0 Use {@link #ofInactivityGapAndGrace(Duration,
Duration)} instead
+ * @throws IllegalArgumentException if the {@code afterWindowEnd} is
negative or can't be represented as {@code long milliseconds}
+ * @deprecated since 3.0. Use {@link #ofInactivityGapAndGrace(Duration,
Duration)} instead
*/
@Deprecated
public SessionWindows grace(final Duration afterWindowEnd) throws
IllegalArgumentException {
+ //TODO KAFKA-13021: disallow calling grace() if it was already set via
ofTimeDifferenceAndGrace/WithNoGrace()
final String msgPrefix =
prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
final long afterWindowEndMs =
validateMillisecondDuration(afterWindowEnd, msgPrefix);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
index 2cbda6d..159fea7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
@@ -93,13 +93,16 @@ public final class SlidingWindows {
}
/**
- * Return a window definition with the window size
- * Using the method implicitly sets the grace period to zero which means
that
- * out of order records arriving after the window end will be dropped
+ * Return a window definition with the window size based on the given
maximum time difference (inclusive) between
+ * records in the same window and given window grace period. Reject
out-of-order events that arrive after {@code grace}.
+ * A window is closed when {@code stream-time > window-end + grace-period}.
+ * <p>
+ * CAUTION: Using this method implicitly sets the grace period to zero,
which means that any out-of-order
+ * records arriving after the window ends are considered late and will be
dropped.
*
* @param timeDifference the max time difference (inclusive) between two
records in a window
- * @return a new window definition with no grace period. Note that this
means out of order records arriving after the window end will be dropped
- * @throws IllegalArgumentException if the timeDifference is negative
+ * @return a new window definition with no grace period. Note that this
means out-of-order records arriving after the window end will be dropped
+ * @throws IllegalArgumentException if the timeDifference is negative or
can't be represented as {@code long milliseconds}
*/
public static SlidingWindows ofTimeDifferenceWithNoGrace(final Duration
timeDifference) throws IllegalArgumentException {
return ofTimeDifferenceAndGrace(timeDifference,
ofMillis(NO_GRACE_PERIOD));
@@ -113,12 +116,13 @@ public final class SlidingWindows {
* @param timeDifference the max time difference (inclusive) between two
records in a window
* @param afterWindowEnd the grace period to admit out-of-order events to
a window
* @return a new window definition with the specified grace period
- * @throws IllegalArgumentException if the timeDifference or
afterWindowEnd (grace period) is negative
+ * @throws IllegalArgumentException if the timeDifference or
afterWindowEnd (grace period) is negative or can't be represented as {@code
long milliseconds}
*/
public static SlidingWindows ofTimeDifferenceAndGrace(final Duration
timeDifference, final Duration afterWindowEnd) throws IllegalArgumentException {
-
- final long timeDifferenceMs = timeDifference.toMillis();
- final long afterWindowEndMs = afterWindowEnd.toMillis();
+ final String timeDifferenceMsgPrefix =
prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
+ final long timeDifferenceMs =
validateMillisecondDuration(timeDifference, timeDifferenceMsgPrefix);
+ final String afterWindowEndMsgPrefix =
prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
+ final long afterWindowEndMs =
validateMillisecondDuration(afterWindowEnd, afterWindowEndMsgPrefix);
return new SlidingWindows(timeDifferenceMs, afterWindowEndMs);
}
@@ -132,7 +136,7 @@ public final class SlidingWindows {
* @param grace the grace period to admit out-of-order events to a window
* @return a new window definition
* @throws IllegalArgumentException if the specified window size is < 0
or grace < 0, or either can't be represented as {@code long milliseconds}
- * @deprecated since 3.0 Use {@link
#ofTimeDifferenceWithNoGrace(Duration)} or {@link
#ofTimeDifferenceAndGrace(Duration, Duration)} instead
+ * @deprecated since 3.0. Use {@link
#ofTimeDifferenceWithNoGrace(Duration)} or {@link
#ofTimeDifferenceAndGrace(Duration, Duration)} instead
*/
@Deprecated
public static SlidingWindows withTimeDifferenceAndGrace(final Duration
timeDifference, final Duration grace) throws IllegalArgumentException {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
index 220c3f5..cc1bd90 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
@@ -78,6 +78,11 @@ public final class TimeWindows extends Windows<TimeWindow> {
throw new IllegalArgumentException("Window size (sizeMs) must be
larger than zero.");
}
+ if (advanceMs <= 0 || advanceMs > sizeMs) {
+ throw new IllegalArgumentException(String.format("Window
advancement interval should be more than zero " +
+ "and less than window duration which is %d ms, but given
advancement interval is: %d ms", sizeMs, advanceMs));
+ }
+
if (graceMs < 0) {
throw new IllegalArgumentException("Grace period must not be
negative.");
}
@@ -90,11 +95,12 @@ public final class TimeWindows extends Windows<TimeWindow> {
* <p>
* This provides the semantics of tumbling windows, which are fixed-sized,
gap-less, non-overlapping windows.
* Tumbling windows are a special case of hopping windows with {@code
advance == size}.
- * Using the method implicitly sets the grace period to zero which means
- * that out of order records arriving after the window end will be dropped
+ * <p>
+ * CAUTION: Using this method implicitly sets the grace period to zero,
which means that any out-of-order
+ * records arriving after the window ends are considered late and will be
dropped.
*
* @param size The size of the window
- * @return a new window definition with default no grace period. Note that
this means out of order records arriving after the window end will be dropped
+ * @return a new window definition with default no grace period. Note that
this means out-of-order records arriving after the window end will be dropped
* @throws IllegalArgumentException if the specified window size is zero
or negative or can't be represented as {@code long milliseconds}
*/
public static TimeWindows ofSizeWithNoGrace(final Duration size) throws
IllegalArgumentException {
@@ -108,22 +114,23 @@ public final class TimeWindows extends
Windows<TimeWindow> {
* <p>
* This provides the semantics of tumbling windows, which are fixed-sized,
gap-less, non-overlapping windows.
* Tumbling windows are a special case of hopping windows with {@code
advance == size}.
- * Using the method explicitly sets the grace period to the duration
specified by {@code afterWindowEnd} which means
- * that out of order records arriving after the window end will be dropped.
- *
* <p>
- * Delay is defined as (stream_time - record_timestamp).
+ * Using this method explicitly sets the grace period to the duration
specified by {@code afterWindowEnd}, which
+ * means that only out-of-order records arriving more than the grace
period after the window end will be dropped.
+ * The window close, after which any incoming records are considered late
and will be rejected, is defined as
+ * {@code windowEnd + afterWindowEnd}
*
* @param size The size of the window. Must be larger than zero
* @param afterWindowEnd The grace period to admit out-of-order events to
a window. Must be non-negative.
* @return a TimeWindows object with the specified size and the specified
grace period
* @throws IllegalArgumentException if {@code afterWindowEnd} is negative
or can't be represented as {@code long milliseconds}
*/
- public static TimeWindows ofSizeAndGrace(final Duration size, final
Duration afterWindowEnd)
- throws IllegalArgumentException {
+ public static TimeWindows ofSizeAndGrace(final Duration size, final
Duration afterWindowEnd) throws IllegalArgumentException {
+ final String sizeMsgPrefix = prepareMillisCheckFailMsgPrefix(size,
"size");
+ final long sizeMs = validateMillisecondDuration(size, sizeMsgPrefix);
- final long sizeMs = size.toMillis();
- final long afterWindowEndMs = afterWindowEnd.toMillis();
+ final String afterWindowEndMsgPrefix =
prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
+ final long afterWindowEndMs =
validateMillisecondDuration(afterWindowEnd, afterWindowEndMsgPrefix);
return new TimeWindows(sizeMs, sizeMs, afterWindowEndMs);
}
@@ -139,14 +146,14 @@ public final class TimeWindows extends
Windows<TimeWindow> {
* @param size The size of the window
* @return a new window definition without specifying the grace period
(default to 24 hours minus window {@code size})
* @throws IllegalArgumentException if the specified window size is zero
or negative or can't be represented as {@code long milliseconds}
- * @deprecated since 3.0 Use {@link #ofSizeWithNoGrace(Duration)} } instead
+ * @deprecated since 3.0. Use {@link #ofSizeWithNoGrace(Duration)} }
instead
*/
@Deprecated
public static TimeWindows of(final Duration size) throws
IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(size, "size");
final long sizeMs = validateMillisecondDuration(size, msgPrefix);
- return new TimeWindows(sizeMs, sizeMs,
Math.max(DEPRECATED_OLD_24_HR_GRACE_PERIOD - sizeMs, 0));
+ return new TimeWindows(sizeMs, sizeMs,
Math.max(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD - sizeMs, 0));
}
/**
@@ -163,10 +170,6 @@ public final class TimeWindows extends Windows<TimeWindow>
{
public TimeWindows advanceBy(final Duration advance) {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(advance,
"advance");
final long advanceMs = validateMillisecondDuration(advance, msgPrefix);
- if (advanceMs <= 0 || advanceMs > sizeMs) {
- throw new IllegalArgumentException(String.format("Window
advancement interval should be more than zero " +
- "and less than window duration which is %d ms, but given
advancement interval is: %d ms", sizeMs, advanceMs));
- }
return new TimeWindows(sizeMs, advanceMs, graceMs);
}
@@ -196,15 +199,13 @@ public final class TimeWindows extends
Windows<TimeWindow> {
* @param afterWindowEnd The grace period to admit out-of-order events to
a window.
* @return this updated builder
* @throws IllegalArgumentException if {@code afterWindowEnd} is negative
or can't be represented as {@code long milliseconds}
- * @deprecated since 3.0 Use {@link #ofSizeAndGrace(Duration, Duration)}
instead
+ * @deprecated since 3.0. Use {@link #ofSizeAndGrace(Duration, Duration)}
instead
*/
@Deprecated
public TimeWindows grace(final Duration afterWindowEnd) throws
IllegalArgumentException {
+ //TODO KAFKA-13021: disallow calling grace() if it was already set via
ofTimeDifferenceAndGrace/WithNoGrace()
final String msgPrefix =
prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
final long afterWindowEndMs =
validateMillisecondDuration(afterWindowEnd, msgPrefix);
- if (afterWindowEndMs < 0) {
- throw new IllegalArgumentException("Grace period must not be
negative.");
- }
return new TimeWindows(sizeMs, advanceMs, afterWindowEndMs);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
index f0204d0..cd8a286 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
@@ -43,7 +43,7 @@ public abstract class Windows<W extends Window> {
* This behavior is now deprecated and additional details are available in
the motivation for the KIP
* Check out <a
href="https://cwiki.apache.org/confluence/x/Ho2NCg">KIP-633</a> for more details
*/
- protected static final long DEPRECATED_OLD_24_HR_GRACE_PERIOD = 24 * 60 *
60 * 1000L;
+ protected static final long DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD = 24 *
60 * 60 * 1000L;
/**
* This constant is used as the specified grace period where we do not
have any grace periods instead of magic constants
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
index 1a471ef..3d38ada 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
@@ -24,7 +24,7 @@ import static java.time.Duration.ofMillis;
import static java.time.Duration.ofSeconds;
import static org.apache.kafka.streams.EqualityCheck.verifyEquality;
import static org.apache.kafka.streams.EqualityCheck.verifyInEquality;
-import static
org.apache.kafka.streams.kstream.Windows.DEPRECATED_OLD_24_HR_GRACE_PERIOD;
+import static
org.apache.kafka.streams.kstream.Windows.DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
@@ -122,10 +122,10 @@ public class JoinWindowsTest {
@Test
public void oldAPIShouldSetDefaultGracePeriod() {
- assertEquals(Duration.ofDays(1).toMillis(),
DEPRECATED_OLD_24_HR_GRACE_PERIOD);
- assertEquals(DEPRECATED_OLD_24_HR_GRACE_PERIOD - 6L,
JoinWindows.of(ofMillis(3L)).gracePeriodMs());
- assertEquals(0L,
JoinWindows.of(ofMillis(DEPRECATED_OLD_24_HR_GRACE_PERIOD)).gracePeriodMs());
- assertEquals(0L,
JoinWindows.of(ofMillis(DEPRECATED_OLD_24_HR_GRACE_PERIOD +
1L)).gracePeriodMs());
+ assertEquals(Duration.ofDays(1).toMillis(),
DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD);
+ assertEquals(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD - 6L,
JoinWindows.of(ofMillis(3L)).gracePeriodMs());
+ assertEquals(0L,
JoinWindows.of(ofMillis(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD)).gracePeriodMs());
+ assertEquals(0L,
JoinWindows.of(ofMillis(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD +
1L)).gracePeriodMs());
}
@Test
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
index 5821014..219c64c81 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
@@ -23,7 +23,7 @@ import java.time.Duration;
import static java.time.Duration.ofMillis;
import static org.apache.kafka.streams.EqualityCheck.verifyEquality;
import static org.apache.kafka.streams.EqualityCheck.verifyInEquality;
-import static
org.apache.kafka.streams.kstream.Windows.DEPRECATED_OLD_24_HR_GRACE_PERIOD;
+import static
org.apache.kafka.streams.kstream.Windows.DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;
@@ -61,10 +61,10 @@ public class SessionWindowsTest {
@Test
public void oldAPIShouldSetDefaultGracePeriod() {
- assertEquals(Duration.ofDays(1).toMillis(),
DEPRECATED_OLD_24_HR_GRACE_PERIOD);
- assertEquals(DEPRECATED_OLD_24_HR_GRACE_PERIOD - 3L,
SessionWindows.with(ofMillis(3L)).gracePeriodMs());
- assertEquals(0L,
SessionWindows.with(ofMillis(DEPRECATED_OLD_24_HR_GRACE_PERIOD)).gracePeriodMs());
- assertEquals(0L,
SessionWindows.with(ofMillis(DEPRECATED_OLD_24_HR_GRACE_PERIOD +
1L)).gracePeriodMs());
+ assertEquals(Duration.ofDays(1).toMillis(),
DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD);
+ assertEquals(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD - 3L,
SessionWindows.with(ofMillis(3L)).gracePeriodMs());
+ assertEquals(0L,
SessionWindows.with(ofMillis(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD)).gracePeriodMs());
+ assertEquals(0L,
SessionWindows.with(ofMillis(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD +
1L)).gracePeriodMs());
}
@Test
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
index 5bebd30..6a11df0 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
@@ -25,7 +25,7 @@ import java.util.Map;
import static java.time.Duration.ofMillis;
import static org.apache.kafka.streams.EqualityCheck.verifyEquality;
import static org.apache.kafka.streams.EqualityCheck.verifyInEquality;
-import static
org.apache.kafka.streams.kstream.Windows.DEPRECATED_OLD_24_HR_GRACE_PERIOD;
+import static
org.apache.kafka.streams.kstream.Windows.DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThrows;
@@ -107,10 +107,10 @@ public class TimeWindowsTest {
@Test
public void oldAPIShouldSetDefaultGracePeriod() {
- assertEquals(Duration.ofDays(1).toMillis(),
DEPRECATED_OLD_24_HR_GRACE_PERIOD);
- assertEquals(DEPRECATED_OLD_24_HR_GRACE_PERIOD - 3L,
TimeWindows.of(ofMillis(3L)).gracePeriodMs());
- assertEquals(0L,
TimeWindows.of(ofMillis(DEPRECATED_OLD_24_HR_GRACE_PERIOD)).gracePeriodMs());
- assertEquals(0L,
TimeWindows.of(ofMillis(DEPRECATED_OLD_24_HR_GRACE_PERIOD +
1L)).gracePeriodMs());
+ assertEquals(Duration.ofDays(1).toMillis(),
DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD);
+ assertEquals(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD - 3L,
TimeWindows.of(ofMillis(3L)).gracePeriodMs());
+ assertEquals(0L,
TimeWindows.of(ofMillis(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD)).gracePeriodMs());
+ assertEquals(0L,
TimeWindows.of(ofMillis(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD +
1L)).gracePeriodMs());
}
@Test