This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new 8164c7c  KAFKA-9524: increase retention time for window and grace periods longer than one day (#10091)
8164c7c is described below

commit 8164c7c99653ccd72b0ce8215d25ad05c60a30b7
Author: Marco Aurelio Lotz <[email protected]>
AuthorDate: Fri Feb 19 03:18:53 2021 +0100

    KAFKA-9524: increase retention time for window and grace periods longer than one day (#10091)
    
    Reviewers: Victoria Xia <[email protected]>, Matthias J. Sax <[email protected]>
---
 .../java/org/apache/kafka/streams/kstream/TimeWindows.java    | 11 ++++++++---
 .../org/apache/kafka/streams/kstream/TimeWindowsTest.java     | 11 ++++++++++-
 2 files changed, 18 insertions(+), 4 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
index cd52dd5..9fd963a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
@@ -57,6 +57,8 @@ import static org.apache.kafka.streams.kstream.internals.WindowingDefaults.DEFAU
  */
 public final class TimeWindows extends Windows<TimeWindow> {
 
+    private static final long EMPTY_GRACE_PERIOD = -1;
+
     private final long maintainDurationMs;
 
     /** The size of the windows in milliseconds. */
@@ -111,7 +113,7 @@ public final class TimeWindows extends Windows<TimeWindow> {
             throw new IllegalArgumentException("Window size (sizeMs) must be larger than zero.");
         }
         // This is a static factory method, so we initialize grace and retention to the defaults.
-        return new TimeWindows(sizeMs, sizeMs, -1, DEFAULT_RETENTION_MS);
+        return new TimeWindows(sizeMs, sizeMs, EMPTY_GRACE_PERIOD, DEFAULT_RETENTION_MS);
     }
 
     /**
@@ -214,7 +216,10 @@ public final class TimeWindows extends Windows<TimeWindow> {
         // NOTE: in the future, when we remove maintainMs,
         // we should default the grace period to 24h to maintain the default behavior,
         // or we can default to (24h - size) if you want to be super accurate.
-        return graceMs != -1 ? graceMs : maintainMs() - size();
+        if (graceMs != EMPTY_GRACE_PERIOD) {
+            return graceMs;
+        }
+        return Math.max(maintainDurationMs - sizeMs, 0);
     }
 
     /**
@@ -245,7 +250,7 @@ public final class TimeWindows extends Windows<TimeWindow> {
     @Override
     @Deprecated
     public long maintainMs() {
-        return Math.max(maintainDurationMs, sizeMs);
+        return Math.max(maintainDurationMs, sizeMs + gracePeriodMs());
     }
 
     @SuppressWarnings("deprecation") // removing segments from Windows will fix this
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
index 69b73c8..00e2b4c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
@@ -19,8 +19,10 @@ package org.apache.kafka.streams.kstream;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.junit.Test;
 
+import java.time.Duration;
 import java.util.Map;
 
+import static java.time.Duration.ofDays;
 import static java.time.Duration.ofMillis;
 import static org.apache.kafka.streams.EqualityCheck.verifyEquality;
 import static org.apache.kafka.streams.EqualityCheck.verifyInEquality;
@@ -53,12 +55,19 @@ public class TimeWindowsTest {
 
     @SuppressWarnings("deprecation") // specifically testing deprecated APIs
     @Test
-    public void shouldUseWindowSizeAsRentitionTimeIfWindowSizeIsLargerThanDefaultRetentionTime() {
+    public void shouldUseWindowSizeAsRetentionTimeIfWindowSizeIsLargerThanDefaultRetentionTime() {
         final long windowSize = 2 * TimeWindows.of(ofMillis(1)).maintainMs();
         assertEquals(windowSize, TimeWindows.of(ofMillis(windowSize)).maintainMs());
     }
 
     @Test
+    public void shouldUseWindowSizeAndGraceAsRetentionTimeIfBothCombinedAreLargerThanDefaultRetentionTime() {
+        final Duration windowsSize = ofDays(1).minus(ofMillis(1));
+        final Duration gracePeriod = ofMillis(2);
+        assertEquals(windowsSize.toMillis() + gracePeriod.toMillis(), TimeWindows.of(windowsSize).grace(gracePeriod).maintainMs());
+    }
+
+    @Test
     public void windowSizeMustNotBeZero() {
         assertThrows(IllegalArgumentException.class, () -> TimeWindows.of(ofMillis(0)));
     }

Reply via email to