This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new ace3bbb  KAFKA-13021: clarify KIP-633 javadocs and address remaining feedback (#11114)
ace3bbb is described below

commit ace3bbbc7304e867f2398118418fb8de01127757
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Fri Jul 23 16:14:37 2021 -0700

    KAFKA-13021: clarify KIP-633 javadocs and address remaining feedback (#11114)
    
    There were a few followup things to address from #10926, most importantly a number of updates to the javadocs. Also includes a few missing verification checks.
    
    Reviewers: Guozhang Wang <[email protected]>, Matthias J. Sax <[email protected]>, Israel Ekpo
---
 docs/streams/upgrade-guide.html                    | 33 ++++++++------
 .../apache/kafka/streams/kstream/JoinWindows.java  | 50 +++++++++++++---------
 .../kafka/streams/kstream/SessionWindows.java      | 39 ++++++++++-------
 .../kafka/streams/kstream/SlidingWindows.java      | 24 ++++++-----
 .../apache/kafka/streams/kstream/TimeWindows.java  | 43 ++++++++++---------
 .../org/apache/kafka/streams/kstream/Windows.java  |  2 +-
 .../kafka/streams/kstream/JoinWindowsTest.java     | 10 ++---
 .../kafka/streams/kstream/SessionWindowsTest.java  | 10 ++---
 .../kafka/streams/kstream/TimeWindowsTest.java     | 10 ++---
 9 files changed, 126 insertions(+), 95 deletions(-)

diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 448de93..eb3b125 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -135,25 +135,32 @@
         We removed the default implementation of 
<code>RocksDBConfigSetter#close()</code>.
     </p>
     <p>
-        We dropped the default 24 hours grace period for windowed operations 
such as Window or Session aggregates, or stream-stream joins.
-        This period determines how long after a window ends any out-of-order 
records will still be processed.
-        Records coming in after the grace period has elapsed will be dropped 
from those windows.
-        With a long grace period, though Kafka Streams can handle out-of-order 
data up to that amount of time, it will also incur a high and confusing latency 
for users,
-        e.g. suppression operators with the default won't emit results up for 
24 hours, while in practice out-of-order data usually has a much smaller 
time-skewness.
-        Instead of abstracting this config from users with a long default 
value, we introduced new constructs such as 
<code>TimeWindows#ofSizeAndGrace</code> to let callers always set it upon 
constructing the windows;
-        the other setters such as <code>TimeWindows#grace</code> are 
deprecated and will be removed in the future.
+        We dropped the default 24 hours grace period for windowed operations 
such as Window or Session aggregates, or
+        stream-stream joins. This period determines how long after a window 
ends any out-of-order records will still
+        be processed. Records coming in after the grace period has elapsed are 
considered late and will be dropped.
+        But in operators such as suppression, a large grace period has the 
drawback of incurring an equally large
+        output latency. The current API made it all too easy to miss the grace 
period config completely, leading you
+        to wonder why your application seems to produce no output -- it 
actually is, but not for 24 hours.
+        <p>
+        To prevent accidentally or unknowingly falling back to the default 
24hr grace period, we deprecated all of the
+        existing static constructors for the <code>Windows</code> classes 
(such as <code>TimeWindows#of</code>). These
+        are replaced by new static constructors of two flavors: 
<code>#ofSizeAndGrace</code> and <code>#ofSizeWithNoGrace</code>
+        (these are for the <code>TimeWindows</code> class; analogous APIs 
exist for the <code>JoinWindows</code>,
+        <code>SessionWindows</code>, and SlidingWindows classes). With these 
new APIs you are forced to set the grace
+        period explicitly, or else consciously choose to opt out by selecting 
the <code>WithNoGrace</code> flavor which
+        sets it to 0 for situations where you really don't care about the 
grace period, for example during testing or
+        when playing around with Kafka Streams for the first time. Note that 
using the new APIs for the
+        <code>JoinWindows</code> class will also enable a fix for spurious 
left/outer join results, as described in
+        the following paragraph. For more details on the grace period and new 
static constructors, see
+        <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-633%3A+Drop+24+hour+default+of+grace+period+in+Streams">KIP-633</a>
     </p>
     <p>
         Additionally, in older versions Kafka Streams emitted stream-stream 
left/outer join results eagerly. This behavior may lead to spurious left/outer 
join result records.
         In this release, we changed the behavior to avoid spurious results and 
left/outer join result are only emitted after the join window is closed, i.e., 
after the grace period elapsed.
         To maintain backward compatibility, the old API 
<code>JoinWindows#of(timeDifference)</code> preserves the old eager-emit 
behavior and only the new
-        APIs <code>JoinWindows#ofTimeDifferenceAndGrace()</code> and 
<code>JoinsWindows#ofTimeDifferenceNoGrace</code> enable the new behavior.
+        APIs <code>JoinWindows#ofTimeDifferenceAndGrace()</code> and 
<code>JoinsWindows#ofTimeDifferenceNoGrace</code> enable the new behavior. 
Check out
+        <a 
href="https://issues.apache.org/jira/browse/KAFKA-10847">KAFKA-10847</a> for 
more information.
     </p>
-    <ul>
-        <li><a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-633%3A+Drop+24+hour+default+of+grace+period+in+Streams">KIP-633</a>:
 Drop 24 hour default of grace period in Streams</li>
-        <li><a 
href="https://issues.apache.org/jira/browse/KAFKA-10847">KAFKA-10847</a>: Avoid 
spurious left/outer join results in stream-stream join</li>
-    </ul>
-
     <p>
         The public <code>topicGroupId</code> and <code>partition</code> fields 
on TaskId have been deprecated and replaced with getters. Please migrate to 
using the new <code>TaskId.subtopology()</code>
         (which replaces <code>topicGroupId</code>) and 
<code>TaskId.partition()</code> APIs instead. Also, the 
<code>TaskId#readFrom</code> and <code>TaskId#writeTo</code> methods have been 
deprecated
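
The grace-period arithmetic described in the upgrade-guide text above can be illustrated with a minimal, self-contained sketch. It does not use the Kafka Streams API: GraceSketch, windowCloseMs, and isClosed are hypothetical names. The comparison follows the wording of the SlidingWindows javadoc in this commit (stream-time > window-end + grace-period); the exact boundary handling inside Kafka Streams may differ by window type.

```java
import java.time.Duration;

// Hypothetical illustration only; not part of Kafka Streams.
final class GraceSketch {

    // Window close = window end + grace period, as stated in the updated javadocs.
    static long windowCloseMs(final long windowEndMs, final Duration grace) {
        return Math.addExact(windowEndMs, grace.toMillis());
    }

    // Assumption: a window counts as closed once stream time has passed the window close.
    static boolean isClosed(final long streamTimeMs, final long windowEndMs, final Duration grace) {
        return streamTimeMs > windowCloseMs(windowEndMs, grace);
    }
}
```

With a window ending at 60,000 ms and no grace, the window is closed as soon as stream time moves past 60,000 ms. With a 24-hour grace period the same window stays open until stream time exceeds 60,000 ms plus 86,400,000 ms, which is why a suppressed result would not be emitted for roughly a day.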
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index 9a7c7b5..1b26bcc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -102,49 +102,61 @@ public class JoinWindows extends Windows<Window> {
 
     /**
      * Specifies that records of the same key are joinable if their timestamps 
are within {@code timeDifference},
-     * i.e., the timestamp of a record from the secondary stream is max {@code 
timeDifference} earlier or later than
-     * the timestamp of the record from the primary stream. Using the method 
explicitly sets the grace period to
-     * the duration specified by {@code afterWindowEnd} which means that out 
of order records arriving
-     * after the window end will be dropped. The delay is defined as 
(stream_time - record_timestamp).
+     * i.e., the timestamp of a record from the secondary stream is max {@code 
timeDifference} before or after
+     * the timestamp of the record from the primary stream.
+     * <p>
+     * Using this method explicitly sets the grace period to the duration 
specified by {@code afterWindowEnd}, which
+     * means that only out-of-order records arriving more than the grace 
period after the window end will be dropped.
+     * The window close, after which any incoming records are considered late 
and will be rejected, is defined as
+     * {@code windowEnd + afterWindowEnd}
      *
      * @param timeDifference join window interval
      * @param afterWindowEnd The grace period to admit out-of-order events to 
a window.
-     * @throws IllegalArgumentException if the {@code afterWindowEnd} is 
negative of can't be represented as {@code long milliseconds}
      * @return A new JoinWindows object with the specified window definition 
and grace period
+     * @throws IllegalArgumentException if {@code timeDifference} is negative 
or can't be represented as {@code long milliseconds}
+     *                                  if the {@code afterWindowEnd} is 
negative or can't be represented as {@code long milliseconds}
      */
     public static JoinWindows ofTimeDifferenceAndGrace(final Duration 
timeDifference, final Duration afterWindowEnd) {
-        return new JoinWindows(timeDifference.toMillis(), 
timeDifference.toMillis(), afterWindowEnd.toMillis(), true);
+        final String timeDifferenceMsgPrefix = 
prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
+        final long timeDifferenceMs = 
validateMillisecondDuration(timeDifference, timeDifferenceMsgPrefix);
+
+        final String afterWindowEndMsgPrefix = 
prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
+        final long afterWindowEndMs = 
validateMillisecondDuration(afterWindowEnd, afterWindowEndMsgPrefix);
+
+        return new JoinWindows(timeDifferenceMs, timeDifferenceMs, 
afterWindowEndMs, true);
     }
 
     /**
      * Specifies that records of the same key are joinable if their timestamps 
are within {@code timeDifference},
-     * i.e., the timestamp of a record from the secondary stream is max {@code 
timeDifference} earlier or later than
-     * the timestamp of the record from the primary stream. Using the method 
implicitly sets the grace period to zero
-     * which means that out of order records arriving after the window end 
will be dropped.
+     * i.e., the timestamp of a record from the secondary stream is max {@code 
timeDifference} before or after
+     * the timestamp of the record from the primary stream.
+     * <p>
+     * CAUTION: Using this method implicitly sets the grace period to zero, 
which means that any out-of-order
+     * records arriving after the window ends are considered late and will be 
dropped.
      *
      * @param timeDifference join window interval
+     * @return a new JoinWindows object with the window definition and no 
grace period. Note that this means out-of-order records arriving after the 
window end will be dropped
      * @throws IllegalArgumentException if {@code timeDifference} is negative 
or can't be represented as {@code long milliseconds}
-     * @return a new JoinWindows object with the window definition and no 
grace period. Note that this means out of order records arriving after the 
window end will be dropped
      */
     public static JoinWindows ofTimeDifferenceWithNoGrace(final Duration 
timeDifference) {
-        return new JoinWindows(timeDifference.toMillis(), 
timeDifference.toMillis(), NO_GRACE_PERIOD, true);
+        return ofTimeDifferenceAndGrace(timeDifference, 
Duration.ofMillis(NO_GRACE_PERIOD));
     }
 
     /**
      * Specifies that records of the same key are joinable if their timestamps 
are within {@code timeDifference},
-     * i.e., the timestamp of a record from the secondary stream is max {@code 
timeDifference} earlier or later than
+     * i.e., the timestamp of a record from the secondary stream is max {@code 
timeDifference} before or after
      * the timestamp of the record from the primary stream.
      *
-     * @param timeDifference
+     * @param timeDifference join window interval
      * @return a new JoinWindows object with the window definition with and 
grace period (default to 24 hours minus {@code timeDifference})
      * @throws IllegalArgumentException if {@code timeDifference} is negative 
or can't be represented as {@code long milliseconds}
-     * @deprecated since 3.0 Use {@link 
#ofTimeDifferenceWithNoGrace(Duration)}} instead
+     * @deprecated since 3.0. Use {@link 
#ofTimeDifferenceWithNoGrace(Duration)}} instead
      */
     @Deprecated
     public static JoinWindows of(final Duration timeDifference) throws 
IllegalArgumentException {
         final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
         final long timeDifferenceMs = 
validateMillisecondDuration(timeDifference, msgPrefix);
-        return new JoinWindows(timeDifferenceMs, timeDifferenceMs, 
Math.max(DEPRECATED_OLD_24_HR_GRACE_PERIOD - timeDifferenceMs * 2, 0), false);
+        return new JoinWindows(timeDifferenceMs, timeDifferenceMs, 
Math.max(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD - timeDifferenceMs * 2, 0), 
false);
     }
 
     /**
@@ -203,16 +215,14 @@ public class JoinWindows extends Windows<Window> {
      *
      * @param afterWindowEnd The grace period to admit out-of-order events to 
a window.
      * @return this updated builder
-     * @throws IllegalArgumentException if the {@code afterWindowEnd} is 
negative of can't be represented as {@code long milliseconds}
-     * @deprecated since 3.0 Use {@link #ofTimeDifferenceAndGrace(Duration, 
Duration)} instead
+     * @throws IllegalArgumentException if the {@code afterWindowEnd} is 
negative or can't be represented as {@code long milliseconds}
+     * @deprecated since 3.0. Use {@link #ofTimeDifferenceAndGrace(Duration, 
Duration)} instead
      */
     @Deprecated
     public JoinWindows grace(final Duration afterWindowEnd) throws 
IllegalArgumentException {
+        //TODO KAFKA-13021: disallow calling grace() if it was already set via 
ofTimeDifferenceAndGrace/WithNoGrace()
         final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
         final long afterWindowEndMs = 
validateMillisecondDuration(afterWindowEnd, msgPrefix);
-        if (afterWindowEndMs < 0) {
-            throw new IllegalArgumentException("Grace period must not be 
negative.");
-        }
         return new JoinWindows(beforeMs, afterMs, afterWindowEndMs, false);
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
index 6956905..85451c6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
@@ -23,7 +23,7 @@ import java.util.Objects;
 
 import static 
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
 import static 
org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
-import static 
org.apache.kafka.streams.kstream.Windows.DEPRECATED_OLD_24_HR_GRACE_PERIOD;
+import static 
org.apache.kafka.streams.kstream.Windows.DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD;
 import static org.apache.kafka.streams.kstream.Windows.NO_GRACE_PERIOD;
 import static java.time.Duration.ofMillis;
 
@@ -91,16 +91,16 @@ public final class SessionWindows {
 
     /**
      * Creates a new window specification with the specified inactivity gap.
-     * Using the method implicitly sets the grace period to zero which
-     * means that out of order records arriving after the window end will be 
dropped
-     *
      * <p>
      * Note that new events may change the boundaries of session windows, so 
aggressive
      * close times can lead to surprising results in which an out-of-order 
event is rejected and then
      * a subsequent event moves the window boundary forward.
+     * <p>
+     * CAUTION: Using this method implicitly sets the grace period to zero, 
which means that any out-of-order
+     * records arriving after the window ends are considered late and will be 
dropped.
      *
      * @param inactivityGap the gap of inactivity between sessions
-     * @return a window definition with the window size and no grace period. 
Note that this means out of order records arriving after the window end will be 
dropped
+     * @return a window definition with the window size and no grace period. 
Note that this means out-of-order records arriving after the window end will be 
dropped
      * @throws IllegalArgumentException if {@code inactivityGap} is zero or 
negative or can't be represented as {@code long milliseconds}
      */
     public static SessionWindows ofInactivityGapWithNoGrace(final Duration 
inactivityGap) {
@@ -109,23 +109,31 @@ public final class SessionWindows {
 
     /**
      * Creates a new window specification with the specified inactivity gap.
-     * Using the method explicitly sets the grace period to the duration 
specified by {@code afterWindowEnd} which
-     * means that out of order records arriving after the window end will be 
dropped
-     *
      * <p>
      * Note that new events may change the boundaries of session windows, so 
aggressive
      * close times can lead to surprising results in which an out-of-order 
event is rejected and then
      * a subsequent event moves the window boundary forward.
+     * <p>
+     * Using this method explicitly sets the grace period to the duration 
specified by {@code afterWindowEnd}, which
+     * means that only out-of-order records arriving more than the grace 
period after the window end will be dropped.
+     * The window close, after which any incoming records are considered late 
and will be rejected, is defined as
+     * {@code windowEnd + afterWindowEnd}
      *
      * @param inactivityGap the gap of inactivity between sessions
      * @param afterWindowEnd The grace period to admit out-of-order events to 
a window.
      * @return A SessionWindows object with the specified inactivity gap and 
grace period
-     * @throws IllegalArgumentException if the {@code afterWindowEnd} is 
negative of can't be represented as {@code long milliseconds}
+     * @throws IllegalArgumentException if {@code inactivityGap} is zero or 
negative or can't be represented as {@code long milliseconds}
+     *                                  if the {@code afterWindowEnd} is 
negative or can't be represented as {@code long milliseconds}
      */
     public static SessionWindows ofInactivityGapAndGrace(final Duration 
inactivityGap, final Duration afterWindowEnd) {
-        return new SessionWindows(inactivityGap.toMillis(), 
afterWindowEnd.toMillis());
-    }
+        final String inactivityGapMsgPrefix = 
prepareMillisCheckFailMsgPrefix(inactivityGap, "inactivityGap");
+        final long inactivityGapMs = 
validateMillisecondDuration(inactivityGap, inactivityGapMsgPrefix);
 
+        final String afterWindowEndMsgPrefix = 
prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
+        final long afterWindowEndMs = 
validateMillisecondDuration(afterWindowEnd, afterWindowEndMsgPrefix);
+
+        return new SessionWindows(inactivityGapMs, afterWindowEndMs);
+    }
 
     /**
      * Create a new window specification with the specified inactivity gap.
@@ -133,14 +141,14 @@ public final class SessionWindows {
      * @param inactivityGap the gap of inactivity between sessions
      * @return a new window specification without specifying a grace period 
(default to 24 hours minus {@code inactivityGap})
      * @throws IllegalArgumentException if {@code inactivityGap} is zero or 
negative or can't be represented as {@code long milliseconds}
-     * @deprecated since 3.0 Use {@link #ofInactivityGapWithNoGrace(Duration)} 
instead
+     * @deprecated since 3.0. Use {@link 
#ofInactivityGapWithNoGrace(Duration)} instead
      */
     @Deprecated
     public static SessionWindows with(final Duration inactivityGap) {
         final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(inactivityGap, "inactivityGap");
         final long inactivityGapMs = 
validateMillisecondDuration(inactivityGap, msgPrefix);
 
-        return new SessionWindows(inactivityGapMs, 
Math.max(DEPRECATED_OLD_24_HR_GRACE_PERIOD - inactivityGapMs, 0));
+        return new SessionWindows(inactivityGapMs, 
Math.max(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD - inactivityGapMs, 0));
     }
 
     /**
@@ -153,11 +161,12 @@ public final class SessionWindows {
      *
      * @param afterWindowEnd The grace period to admit out-of-order events to 
a window.
      * @return this updated builder
-     * @throws IllegalArgumentException if the {@code afterWindowEnd} is 
negative of can't be represented as {@code long milliseconds}
-     * @deprecated since 3.0 Use {@link #ofInactivityGapAndGrace(Duration, 
Duration)} instead
+     * @throws IllegalArgumentException if the {@code afterWindowEnd} is 
negative or can't be represented as {@code long milliseconds}
+     * @deprecated since 3.0. Use {@link #ofInactivityGapAndGrace(Duration, 
Duration)} instead
      */
     @Deprecated
     public SessionWindows grace(final Duration afterWindowEnd) throws 
IllegalArgumentException {
+        //TODO KAFKA-13021: disallow calling grace() if it was already set via 
ofTimeDifferenceAndGrace/WithNoGrace()
         final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
         final long afterWindowEndMs = 
validateMillisecondDuration(afterWindowEnd, msgPrefix);
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
index 2cbda6d..159fea7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
@@ -93,13 +93,16 @@ public final class SlidingWindows {
     }
 
     /**
-     * Return a window definition with the window size
-     * Using the method implicitly sets the grace period to zero which means 
that
-     * out of order records arriving after the window end will be dropped
+     * Return a window definition with the window size based on the given 
maximum time difference (inclusive) between
+     * records in the same window and given window grace period. Reject 
out-of-order events that arrive after {@code grace}.
+     * A window is closed when {@code stream-time > window-end + grace-period}.
+     * <p>
+     * CAUTION: Using this method implicitly sets the grace period to zero, 
which means that any out-of-order
+     * records arriving after the window ends are considered late and will be 
dropped.
      *
      * @param timeDifference the max time difference (inclusive) between two 
records in a window
-     * @return a new window definition with no grace period. Note that this 
means out of order records arriving after the window end will be dropped
-     * @throws IllegalArgumentException if the timeDifference is negative
+     * @return a new window definition with no grace period. Note that this 
means out-of-order records arriving after the window end will be dropped
+     * @throws IllegalArgumentException if the timeDifference is negative or 
can't be represented as {@code long milliseconds}
      */
     public static SlidingWindows ofTimeDifferenceWithNoGrace(final Duration 
timeDifference) throws IllegalArgumentException {
         return ofTimeDifferenceAndGrace(timeDifference, 
ofMillis(NO_GRACE_PERIOD));
@@ -113,12 +116,13 @@ public final class SlidingWindows {
      * @param timeDifference the max time difference (inclusive) between two 
records in a window
      * @param afterWindowEnd  the grace period to admit out-of-order events to 
a window
      * @return a new window definition with the specified grace period
-     * @throws IllegalArgumentException if the timeDifference or 
afterWindowEnd (grace period) is negative
+     * @throws IllegalArgumentException if the timeDifference or 
afterWindowEnd (grace period) is negative or can't be represented as {@code 
long milliseconds}
      */
     public static SlidingWindows ofTimeDifferenceAndGrace(final Duration 
timeDifference, final Duration afterWindowEnd) throws IllegalArgumentException {
-
-        final long timeDifferenceMs = timeDifference.toMillis();
-        final long afterWindowEndMs = afterWindowEnd.toMillis();
+        final String timeDifferenceMsgPrefix = 
prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
+        final long timeDifferenceMs = 
validateMillisecondDuration(timeDifference, timeDifferenceMsgPrefix);
+        final String afterWindowEndMsgPrefix = 
prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
+        final long afterWindowEndMs = 
validateMillisecondDuration(afterWindowEnd, afterWindowEndMsgPrefix);
 
         return new SlidingWindows(timeDifferenceMs, afterWindowEndMs);
     }
@@ -132,7 +136,7 @@ public final class SlidingWindows {
      * @param grace the grace period to admit out-of-order events to a window
      * @return a new window definition
      * @throws IllegalArgumentException if the specified window size is &lt; 0 
or grace &lt; 0, or either can't be represented as {@code long milliseconds}
-     * @deprecated since 3.0 Use {@link 
#ofTimeDifferenceWithNoGrace(Duration)} or {@link 
#ofTimeDifferenceAndGrace(Duration, Duration)} instead
+     * @deprecated since 3.0. Use {@link 
#ofTimeDifferenceWithNoGrace(Duration)} or {@link 
#ofTimeDifferenceAndGrace(Duration, Duration)} instead
      */
     @Deprecated
     public static SlidingWindows withTimeDifferenceAndGrace(final Duration 
timeDifference, final Duration grace) throws IllegalArgumentException {
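
The static constructors in this commit now convert both Duration arguments through a validating helper instead of calling toMillis() directly. The sketch below shows that pattern using only the JDK. DurationCheckSketch, toValidMillis, and validatedMillis are hypothetical stand-ins, not the real ApiUtils methods, and the negativity checks here are an assumption based on the javadoc wording (in the diff, those checks appear to live in the window constructors).

```java
import java.time.Duration;

// Hypothetical stand-in for the validation pattern in this diff; not the real ApiUtils code.
final class DurationCheckSketch {

    // Converts to milliseconds, rejecting null or unrepresentable durations.
    static long toValidMillis(final Duration duration, final String name) {
        if (duration == null) {
            throw new IllegalArgumentException(name + " must not be null");
        }
        try {
            return duration.toMillis(); // throws ArithmeticException on long overflow
        } catch (final ArithmeticException e) {
            throw new IllegalArgumentException(name + " can't be represented as long milliseconds", e);
        }
    }

    // Validates both arguments and returns {timeDifferenceMs, afterWindowEndMs}.
    static long[] validatedMillis(final Duration timeDifference, final Duration afterWindowEnd) {
        final long timeDifferenceMs = toValidMillis(timeDifference, "timeDifference");
        final long afterWindowEndMs = toValidMillis(afterWindowEnd, "afterWindowEnd");
        if (timeDifferenceMs < 0) {
            throw new IllegalArgumentException("timeDifference must not be negative");
        }
        if (afterWindowEndMs < 0) {
            throw new IllegalArgumentException("afterWindowEnd must not be negative");
        }
        return new long[] {timeDifferenceMs, afterWindowEndMs};
    }
}
```

Wrapping the overflow as an IllegalArgumentException gives callers a single exception type for every invalid argument, which matches the @throws clauses in the javadocs above.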
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
index 220c3f5..cc1bd90 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
@@ -78,6 +78,11 @@ public final class TimeWindows extends Windows<TimeWindow> {
             throw new IllegalArgumentException("Window size (sizeMs) must be 
larger than zero.");
         }
 
+        if (advanceMs <= 0 || advanceMs > sizeMs) {
+            throw new IllegalArgumentException(String.format("Window 
advancement interval should be more than zero " +
+                "and less than window duration which is %d ms, but given 
advancement interval is: %d ms", sizeMs, advanceMs));
+        }
+
         if (graceMs < 0) {
             throw new IllegalArgumentException("Grace period must not be 
negative.");
         }
@@ -90,11 +95,12 @@ public final class TimeWindows extends Windows<TimeWindow> {
      * <p>
      * This provides the semantics of tumbling windows, which are fixed-sized, 
gap-less, non-overlapping windows.
      * Tumbling windows are a special case of hopping windows with {@code 
advance == size}.
-     * Using the method implicitly sets the grace period to zero which means
-     * that out of order records arriving after the window end will be dropped
+     * <p>
+     * CAUTION: Using this method implicitly sets the grace period to zero, 
which means that any out-of-order
+     * records arriving after the window ends are considered late and will be 
dropped.
      *
      * @param size The size of the window
-     * @return a new window definition with default no grace period. Note that 
this means out of order records arriving after the window end will be dropped
+     * @return a new window definition with default no grace period. Note that 
this means out-of-order records arriving after the window end will be dropped
      * @throws IllegalArgumentException if the specified window size is zero 
or negative or can't be represented as {@code long milliseconds}
      */
     public static TimeWindows ofSizeWithNoGrace(final Duration size) throws 
IllegalArgumentException {
@@ -108,22 +114,23 @@ public final class TimeWindows extends 
Windows<TimeWindow> {
      * <p>
      * This provides the semantics of tumbling windows, which are fixed-sized, 
gap-less, non-overlapping windows.
      * Tumbling windows are a special case of hopping windows with {@code 
advance == size}.
-     * Using the method explicitly sets the grace period to the duration 
specified by {@code afterWindowEnd} which means
-     * that out of order records arriving after the window end will be dropped.
-     *
      * <p>
-     * Delay is defined as (stream_time - record_timestamp).
+     * Using this method explicitly sets the grace period to the duration 
specified by {@code afterWindowEnd}, which
+     * means that only out-of-order records arriving more than the grace 
period after the window end will be dropped.
+     * The window close, after which any incoming records are considered late 
and will be rejected, is defined as
+     * {@code windowEnd + afterWindowEnd}
      *
      * @param size The size of the window. Must be larger than zero
      * @param afterWindowEnd The grace period to admit out-of-order events to 
a window. Must be non-negative.
      * @return a TimeWindows object with the specified size and the specified 
grace period
      * @throws IllegalArgumentException if {@code afterWindowEnd} is negative 
or can't be represented as {@code long milliseconds}
      */
-    public static TimeWindows ofSizeAndGrace(final Duration size, final 
Duration afterWindowEnd)
-            throws IllegalArgumentException {
+    public static TimeWindows ofSizeAndGrace(final Duration size, final 
Duration afterWindowEnd) throws IllegalArgumentException {
+        final String sizeMsgPrefix = prepareMillisCheckFailMsgPrefix(size, 
"size");
+        final long sizeMs = validateMillisecondDuration(size, sizeMsgPrefix);
 
-        final long sizeMs = size.toMillis();
-        final long afterWindowEndMs = afterWindowEnd.toMillis();
+        final String afterWindowEndMsgPrefix = 
prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
+        final long afterWindowEndMs = 
validateMillisecondDuration(afterWindowEnd, afterWindowEndMsgPrefix);
 
         return new TimeWindows(sizeMs, sizeMs, afterWindowEndMs);
     }
@@ -139,14 +146,14 @@ public final class TimeWindows extends 
Windows<TimeWindow> {
      * @param size The size of the window
      * @return a new window definition without specifying the grace period 
(default to 24 hours minus window {@code size})
      * @throws IllegalArgumentException if the specified window size is zero 
or negative or can't be represented as {@code long milliseconds}
-     * @deprecated since 3.0 Use {@link #ofSizeWithNoGrace(Duration)} } instead
+     * @deprecated since 3.0. Use {@link #ofSizeWithNoGrace(Duration)} } 
instead
      */
     @Deprecated
     public static TimeWindows of(final Duration size) throws 
IllegalArgumentException {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(size, "size");
         final long sizeMs = validateMillisecondDuration(size, msgPrefix);
 
-        return new TimeWindows(sizeMs, sizeMs, 
Math.max(DEPRECATED_OLD_24_HR_GRACE_PERIOD - sizeMs, 0));
+        return new TimeWindows(sizeMs, sizeMs, 
Math.max(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD - sizeMs, 0));
     }
 
     /**
@@ -163,10 +170,6 @@ public final class TimeWindows extends Windows<TimeWindow> 
{
     public TimeWindows advanceBy(final Duration advance) {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(advance, 
"advance");
         final long advanceMs = validateMillisecondDuration(advance, msgPrefix);
-        if (advanceMs <= 0 || advanceMs > sizeMs) {
-            throw new IllegalArgumentException(String.format("Window 
advancement interval should be more than zero " +
-                    "and less than window duration which is %d ms, but given 
advancement interval is: %d ms", sizeMs, advanceMs));
-        }
         return new TimeWindows(sizeMs, advanceMs, graceMs);
     }
 
@@ -196,15 +199,13 @@ public final class TimeWindows extends 
Windows<TimeWindow> {
      * @param afterWindowEnd The grace period to admit out-of-order events to 
a window.
      * @return this updated builder
      * @throws IllegalArgumentException if {@code afterWindowEnd} is negative 
or can't be represented as {@code long milliseconds}
-     * @deprecated since 3.0 Use {@link #ofSizeAndGrace(Duration, Duration)} 
instead
+     * @deprecated since 3.0. Use {@link #ofSizeAndGrace(Duration, Duration)} 
instead
      */
     @Deprecated
     public TimeWindows grace(final Duration afterWindowEnd) throws 
IllegalArgumentException {
+        //TODO KAFKA-13021: disallow calling grace() if it was already set via 
ofTimeDifferenceAndGrace/WithNoGrace()
         final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
         final long afterWindowEndMs = 
validateMillisecondDuration(afterWindowEnd, msgPrefix);
-        if (afterWindowEndMs < 0) {
-            throw new IllegalArgumentException("Grace period must not be 
negative.");
-        }
 
         return new TimeWindows(sizeMs, advanceMs, afterWindowEndMs);
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
index f0204d0..cd8a286 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
@@ -43,7 +43,7 @@ public abstract class Windows<W extends Window> {
      * This behavior is now deprecated and additional details are available in 
the motivation for the KIP
      * Check out <a 
href="https://cwiki.apache.org/confluence/x/Ho2NCg">KIP-633</a> for more details
      */
-    protected static final long DEPRECATED_OLD_24_HR_GRACE_PERIOD = 24 * 60 * 
60 * 1000L;
+    protected static final long DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD = 24 * 
60 * 60 * 1000L;
 
     /**
      * This constant is used as the specified grace period where we do not 
have any grace periods instead of magic constants
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java 
b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
index 1a471ef..3d38ada 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
@@ -24,7 +24,7 @@ import static java.time.Duration.ofMillis;
 import static java.time.Duration.ofSeconds;
 import static org.apache.kafka.streams.EqualityCheck.verifyEquality;
 import static org.apache.kafka.streams.EqualityCheck.verifyInEquality;
-import static 
org.apache.kafka.streams.kstream.Windows.DEPRECATED_OLD_24_HR_GRACE_PERIOD;
+import static 
org.apache.kafka.streams.kstream.Windows.DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
@@ -122,10 +122,10 @@ public class JoinWindowsTest {
 
     @Test
     public void oldAPIShouldSetDefaultGracePeriod() {
-        assertEquals(Duration.ofDays(1).toMillis(), 
DEPRECATED_OLD_24_HR_GRACE_PERIOD);
-        assertEquals(DEPRECATED_OLD_24_HR_GRACE_PERIOD - 6L, 
JoinWindows.of(ofMillis(3L)).gracePeriodMs());
-        assertEquals(0L, 
JoinWindows.of(ofMillis(DEPRECATED_OLD_24_HR_GRACE_PERIOD)).gracePeriodMs());
-        assertEquals(0L, 
JoinWindows.of(ofMillis(DEPRECATED_OLD_24_HR_GRACE_PERIOD + 
1L)).gracePeriodMs());
+        assertEquals(Duration.ofDays(1).toMillis(), 
DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD);
+        assertEquals(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD - 6L, 
JoinWindows.of(ofMillis(3L)).gracePeriodMs());
+        assertEquals(0L, 
JoinWindows.of(ofMillis(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD)).gracePeriodMs());
+        assertEquals(0L, 
JoinWindows.of(ofMillis(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD + 
1L)).gracePeriodMs());
     }
 
     @Test
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
index 5821014..219c64c81 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
@@ -23,7 +23,7 @@ import java.time.Duration;
 import static java.time.Duration.ofMillis;
 import static org.apache.kafka.streams.EqualityCheck.verifyEquality;
 import static org.apache.kafka.streams.EqualityCheck.verifyInEquality;
-import static 
org.apache.kafka.streams.kstream.Windows.DEPRECATED_OLD_24_HR_GRACE_PERIOD;
+import static 
org.apache.kafka.streams.kstream.Windows.DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.fail;
@@ -61,10 +61,10 @@ public class SessionWindowsTest {
 
     @Test
     public void oldAPIShouldSetDefaultGracePeriod() {
-        assertEquals(Duration.ofDays(1).toMillis(), 
DEPRECATED_OLD_24_HR_GRACE_PERIOD);
-        assertEquals(DEPRECATED_OLD_24_HR_GRACE_PERIOD - 3L, 
SessionWindows.with(ofMillis(3L)).gracePeriodMs());
-        assertEquals(0L, 
SessionWindows.with(ofMillis(DEPRECATED_OLD_24_HR_GRACE_PERIOD)).gracePeriodMs());
-        assertEquals(0L, 
SessionWindows.with(ofMillis(DEPRECATED_OLD_24_HR_GRACE_PERIOD + 
1L)).gracePeriodMs());
+        assertEquals(Duration.ofDays(1).toMillis(), 
DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD);
+        assertEquals(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD - 3L, 
SessionWindows.with(ofMillis(3L)).gracePeriodMs());
+        assertEquals(0L, 
SessionWindows.with(ofMillis(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD)).gracePeriodMs());
+        assertEquals(0L, 
SessionWindows.with(ofMillis(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD + 
1L)).gracePeriodMs());
     }
 
     @Test
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java 
b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
index 5bebd30..6a11df0 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
@@ -25,7 +25,7 @@ import java.util.Map;
 import static java.time.Duration.ofMillis;
 import static org.apache.kafka.streams.EqualityCheck.verifyEquality;
 import static org.apache.kafka.streams.EqualityCheck.verifyInEquality;
-import static 
org.apache.kafka.streams.kstream.Windows.DEPRECATED_OLD_24_HR_GRACE_PERIOD;
+import static 
org.apache.kafka.streams.kstream.Windows.DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertThrows;
@@ -107,10 +107,10 @@ public class TimeWindowsTest {
 
     @Test
     public void oldAPIShouldSetDefaultGracePeriod() {
-        assertEquals(Duration.ofDays(1).toMillis(), 
DEPRECATED_OLD_24_HR_GRACE_PERIOD);
-        assertEquals(DEPRECATED_OLD_24_HR_GRACE_PERIOD - 3L, 
TimeWindows.of(ofMillis(3L)).gracePeriodMs());
-        assertEquals(0L, 
TimeWindows.of(ofMillis(DEPRECATED_OLD_24_HR_GRACE_PERIOD)).gracePeriodMs());
-        assertEquals(0L, 
TimeWindows.of(ofMillis(DEPRECATED_OLD_24_HR_GRACE_PERIOD + 
1L)).gracePeriodMs());
+        assertEquals(Duration.ofDays(1).toMillis(), 
DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD);
+        assertEquals(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD - 3L, 
TimeWindows.of(ofMillis(3L)).gracePeriodMs());
+        assertEquals(0L, 
TimeWindows.of(ofMillis(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD)).gracePeriodMs());
+        assertEquals(0L, 
TimeWindows.of(ofMillis(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD + 
1L)).gracePeriodMs());
     }
 
     @Test
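
The three oldAPIShouldSetDefaultGracePeriod tests above all check the same rule for the deprecated factories: the implicit grace period is 24 hours minus the window size, clamped at zero. A minimal standalone sketch of that arithmetic follows. It does not use the Kafka classes; the class name, the constant name, and both helper methods are hypothetical, and the doubling for join windows is inferred only from the "- 6L" assertion for JoinWindows.of(ofMillis(3L)).

```java
import java.time.Duration;

// Hypothetical illustration only: not part of Kafka Streams.
final class DeprecatedGraceSketch {

    // Mirrors the value asserted in the tests: Duration.ofDays(1).toMillis().
    static final long DEFAULT_24_HR_GRACE_MS = Duration.ofDays(1).toMillis();

    // Default grace for the deprecated time/session factories:
    // 24 hours minus the window size (or inactivity gap), never negative.
    static long defaultGraceMs(final long windowSizeMs) {
        return Math.max(DEFAULT_24_HR_GRACE_MS - windowSizeMs, 0);
    }

    // Assumption: a join window spans timeDifference before and after,
    // so its size is twice timeDifference (consistent with the "- 6L" assertion).
    static long defaultJoinGraceMs(final long timeDifferenceMs) {
        return defaultGraceMs(2 * timeDifferenceMs);
    }

    public static void main(final String[] args) {
        System.out.println(defaultGraceMs(3L));
        System.out.println(defaultGraceMs(DEFAULT_24_HR_GRACE_MS + 1L));
        System.out.println(defaultJoinGraceMs(3L));
    }
}
```

With the non-deprecated factories named in the javadoc (ofSizeWithNoGrace, ofSizeAndGrace) the grace period is stated by the caller, so no such implicit computation applies.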
