This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c8112b5 KAFKA-9524: increase retention time for window and grace periods longer than one day (#10091)
c8112b5 is described below
commit c8112b5ecdda6b62d34ad97fcebbf5c7fec3de53
Author: Marco Aurelio Lotz <[email protected]>
AuthorDate: Fri Feb 19 03:18:53 2021 +0100
KAFKA-9524: increase retention time for window and grace periods longer than one day (#10091)
Reviewers: Victoria Xia <[email protected]>, Matthias J. Sax <[email protected]>
---
.../java/org/apache/kafka/streams/kstream/TimeWindows.java | 11 ++++++++---
.../org/apache/kafka/streams/kstream/TimeWindowsTest.java | 11 ++++++++++-
2 files changed, 18 insertions(+), 4 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
index cd52dd5..9fd963a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
@@ -57,6 +57,8 @@ import static
org.apache.kafka.streams.kstream.internals.WindowingDefaults.DEFAU
*/
public final class TimeWindows extends Windows<TimeWindow> {
+ private static final long EMPTY_GRACE_PERIOD = -1;
+
private final long maintainDurationMs;
/** The size of the windows in milliseconds. */
@@ -111,7 +113,7 @@ public final class TimeWindows extends Windows<TimeWindow> {
throw new IllegalArgumentException("Window size (sizeMs) must be
larger than zero.");
}
// This is a static factory method, so we initialize grace and
retention to the defaults.
- return new TimeWindows(sizeMs, sizeMs, -1, DEFAULT_RETENTION_MS);
+ return new TimeWindows(sizeMs, sizeMs, EMPTY_GRACE_PERIOD,
DEFAULT_RETENTION_MS);
}
/**
@@ -214,7 +216,10 @@ public final class TimeWindows extends Windows<TimeWindow>
{
// NOTE: in the future, when we remove maintainMs,
// we should default the grace period to 24h to maintain the default
behavior,
// or we can default to (24h - size) if you want to be super accurate.
- return graceMs != -1 ? graceMs : maintainMs() - size();
+ if (graceMs != EMPTY_GRACE_PERIOD) {
+ return graceMs;
+ }
+ return Math.max(maintainDurationMs - sizeMs, 0);
}
/**
@@ -245,7 +250,7 @@ public final class TimeWindows extends Windows<TimeWindow> {
@Override
@Deprecated
public long maintainMs() {
- return Math.max(maintainDurationMs, sizeMs);
+ return Math.max(maintainDurationMs, sizeMs + gracePeriodMs());
}
@SuppressWarnings("deprecation") // removing segments from Windows will
fix this
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
index 69b73c8..00e2b4c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
@@ -19,8 +19,10 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.junit.Test;
+import java.time.Duration;
import java.util.Map;
+import static java.time.Duration.ofDays;
import static java.time.Duration.ofMillis;
import static org.apache.kafka.streams.EqualityCheck.verifyEquality;
import static org.apache.kafka.streams.EqualityCheck.verifyInEquality;
@@ -53,12 +55,19 @@ public class TimeWindowsTest {
@SuppressWarnings("deprecation") // specifically testing deprecated APIs
@Test
- public void
shouldUseWindowSizeAsRentitionTimeIfWindowSizeIsLargerThanDefaultRetentionTime()
{
+ public void
shouldUseWindowSizeAsRetentionTimeIfWindowSizeIsLargerThanDefaultRetentionTime()
{
final long windowSize = 2 * TimeWindows.of(ofMillis(1)).maintainMs();
assertEquals(windowSize,
TimeWindows.of(ofMillis(windowSize)).maintainMs());
}
@Test
+ public void
shouldUseWindowSizeAndGraceAsRetentionTimeIfBothCombinedAreLargerThanDefaultRetentionTime()
{
+ final Duration windowsSize = ofDays(1).minus(ofMillis(1));
+ final Duration gracePeriod = ofMillis(2);
+ assertEquals(windowsSize.toMillis() + gracePeriod.toMillis(),
TimeWindows.of(windowsSize).grace(gracePeriod).maintainMs());
+ }
+
+ @Test
public void windowSizeMustNotBeZero() {
assertThrows(IllegalArgumentException.class, () ->
TimeWindows.of(ofMillis(0)));
}