Repository: kafka Updated Branches: refs/heads/0.10.0 419e6517c -> c0537b5f0
KAFKA-3784: TimeWindows#windowsFor calculation is incorrect - Fixed the logic calculating the windows that are affected by a new â¦event in the case of hopping windows and a small overlap. - Added a unit test that tests for the issue Author: Tom Rybak <[email protected]> Reviewers: Michael G. Noll, Matthias J. Sax, Guozhang Wang Closes #1462 from trybak/bugfix/KAFKA-3784-TimeWindows#windowsFor-false-positives (cherry picked from commit 234fa5a6949c9a5bfb4f543989c2ece84fcce033) Signed-off-by: Guozhang Wang <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c0537b5f Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c0537b5f Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c0537b5f Branch: refs/heads/0.10.0 Commit: c0537b5f059e025fa268d92a58c27d98540b7c5a Parents: 419e651 Author: Tom Rybak <[email protected]> Authored: Fri Jun 3 13:21:40 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Fri Jun 3 13:22:00 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/kafka/streams/kstream/TimeWindows.java | 4 +--- .../org/apache/kafka/streams/kstream/TimeWindowsTest.java | 8 ++++++++ 2 files changed, 9 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/c0537b5f/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java index e4ce883..001e92e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java @@ -99,9 +99,7 @@ public class TimeWindows extends Windows<TimeWindow> { @Override public Map<Long, TimeWindow> windowsFor(long timestamp) { - long enclosed = (size - 1) / advance; - long windowStart = Math.max(0, timestamp - timestamp % advance - enclosed * advance); - + long windowStart = (Math.max(0, timestamp - this.size + this.advance) / this.advance) * this.advance; Map<Long, TimeWindow> windows = new HashMap<>(); while (windowStart <= timestamp) { TimeWindow window = new TimeWindow(windowStart, windowStart + this.size); http://git-wip-us.apache.org/repos/asf/kafka/blob/c0537b5f/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java index e9ff235..62b12a9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java @@ -113,6 +113,14 @@ public class TimeWindowsTest { } @Test + public void windowsForBarelyOverlappingHoppingWindows() { + TimeWindows windows = TimeWindows.of(anyName, 6L).advanceBy(5L); + Map<Long, TimeWindow> matched = windows.windowsFor(7L); + assertEquals(1, matched.size()); + assertEquals(new TimeWindow(5L, 11L), matched.get(5L)); + } + + @Test public void windowsForTumblingWindows() { TimeWindows windows = TimeWindows.of(anyName, 12L); Map<Long, TimeWindow> matched = windows.windowsFor(21L);
