Repository: flink
Updated Branches:
  refs/heads/release-1.1 f263b9917 -> c5b391c5e


[FLINK-4589] [DataStream API] Fix Merging of Covering Window in MergingWindowSet

This also adds two new test cases for that problem.

This closes #2476


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c5b391c5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c5b391c5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c5b391c5

Branch: refs/heads/release-1.1
Commit: c5b391c5e3d748c93f5d9f254869214ce426aedf
Parents: f263b99
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Wed Sep 7 13:51:53 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Sep 15 19:32:42 2016 +0200

----------------------------------------------------------------------
 .../operators/windowing/MergingWindowSet.java   |  9 ++-
 .../windowing/MergingWindowSetTest.java         | 64 ++++++++++++++++++++
 2 files changed, 72 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c5b391c5/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
index c806d2d..4e19c31 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
@@ -201,7 +201,7 @@ public class MergingWindowSet<W extends Window> {
                }
 
                // the new window created a new, self-contained window without 
merging
-               if (resultWindow.equals(newWindow)) {
+               if (resultWindow.equals(newWindow) && mergeResults.isEmpty()) {
                        this.windows.put(resultWindow, resultWindow);
                }
 
@@ -225,4 +225,11 @@ public class MergingWindowSet<W extends Window> {
                 */
                void merge(W mergeResult, Collection<W> mergedWindows, W 
stateWindowResult, Collection<W> mergedStateWindows) throws Exception;
        }
+
+       @Override
+       public String toString() {
+               return "MergingWindowSet{" +
+                               "windows=" + windows +
+                               '}';
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c5b391c5/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
index 939f13f..e2cb6c8 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
@@ -205,6 +205,70 @@ public class MergingWindowSetTest {
                assertThat(windowSet.getStateWindow(new TimeWindow(0, 13)), 
anyOf(is(new TimeWindow(0, 3)), is(new TimeWindow(5, 8)), is(new TimeWindow(10, 
13))));
        }
 
+       /**
+        * Test merging of a large new window that covers one existing windows.
+        */
+       @Test
+       public void testMergeLargeWindowCoveringSingleWindow() throws Exception 
{
+               MergingWindowSet<TimeWindow> windowSet = new 
MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)));
+
+               TestingMergeFunction mergeFunction = new TestingMergeFunction();
+
+               // add an initial small window
+
+               mergeFunction.reset();
+               assertEquals(new TimeWindow(1, 2), windowSet.addWindow(new 
TimeWindow(1, 2), mergeFunction));
+               assertFalse(mergeFunction.hasMerged());
+               assertEquals(new TimeWindow(1, 2), windowSet.getStateWindow(new 
TimeWindow(1, 2)));
+
+               // add a new window that completely covers the existing window
+
+               mergeFunction.reset();
+               assertEquals(new TimeWindow(0, 3), windowSet.addWindow(new 
TimeWindow(0, 3), mergeFunction));
+               assertTrue(mergeFunction.hasMerged());
+               assertEquals(new TimeWindow(1, 2), windowSet.getStateWindow(new 
TimeWindow(0, 3)));
+       }
+
+       /**
+        * Test merging of a large new window that covers multiple existing 
windows.
+        */
+       @Test
+       public void testMergeLargeWindowCoveringMultipleWindows() throws 
Exception {
+               MergingWindowSet<TimeWindow> windowSet = new 
MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)));
+
+               TestingMergeFunction mergeFunction = new TestingMergeFunction();
+
+               // add several non-overlapping initial windoww
+
+               mergeFunction.reset();
+               assertEquals(new TimeWindow(1, 3), windowSet.addWindow(new 
TimeWindow(1, 3), mergeFunction));
+               assertFalse(mergeFunction.hasMerged());
+               assertEquals(new TimeWindow(1, 3), windowSet.getStateWindow(new 
TimeWindow(1, 3)));
+
+               mergeFunction.reset();
+               assertEquals(new TimeWindow(5, 8), windowSet.addWindow(new 
TimeWindow(5, 8), mergeFunction));
+               assertFalse(mergeFunction.hasMerged());
+               assertEquals(new TimeWindow(5, 8), windowSet.getStateWindow(new 
TimeWindow(5, 8)));
+
+               mergeFunction.reset();
+               assertEquals(new TimeWindow(10, 13), windowSet.addWindow(new 
TimeWindow(10, 13), mergeFunction));
+               assertFalse(mergeFunction.hasMerged());
+               assertEquals(new TimeWindow(10, 13), 
windowSet.getStateWindow(new TimeWindow(10, 13)));
+
+               // add a new window that completely covers the existing windows
+
+               mergeFunction.reset();
+               assertEquals(new TimeWindow(0, 13), windowSet.addWindow(new 
TimeWindow(0, 13), mergeFunction));
+               assertTrue(mergeFunction.hasMerged());
+               assertThat(mergeFunction.mergedStateWindows(), anyOf(
+                               containsInAnyOrder(new TimeWindow(0, 3), new 
TimeWindow(5, 8)),
+                               containsInAnyOrder(new TimeWindow(0, 3), new 
TimeWindow(10, 13)),
+                               containsInAnyOrder(new TimeWindow(5, 8), new 
TimeWindow(10, 13))));
+               assertThat(windowSet.getStateWindow(new TimeWindow(0, 13)), 
anyOf(is(new TimeWindow(1, 3)), is(new TimeWindow(5, 8)), is(new TimeWindow(10, 
13))));
+
+       }
+
+
        private static class TestingMergeFunction implements 
MergingWindowSet.MergeFunction<TimeWindow> {
                private TimeWindow target = null;
                private Collection<TimeWindow> sources = null;

Reply via email to