This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 53044a0  [FLINK-11326] Allow negative offsets in window assigners
53044a0 is described below

commit 53044a08236a6b3a34199f5bd65187fad9014e20
Author: Kezhu Wang <[email protected]>
AuthorDate: Tue Jan 22 09:04:58 2019 +0800

    [FLINK-11326] Allow negative offsets in window assigners
---
 .../assigners/SlidingEventTimeWindows.java         |  8 ++--
 .../assigners/SlidingProcessingTimeWindows.java    |  8 ++--
 .../assigners/TumblingEventTimeWindows.java        |  4 +-
 .../assigners/TumblingProcessingTimeWindows.java   |  4 +-
 .../windowing/SlidingEventTimeWindowsTest.java     | 52 ++++++++++++++++++--
 .../SlidingProcessingTimeWindowsTest.java          | 55 ++++++++++++++++++++--
 .../windowing/TumblingEventTimeWindowsTest.java    | 20 ++++++--
 .../TumblingProcessingTimeWindowsTest.java         | 25 ++++++++--
 8 files changed, 148 insertions(+), 28 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
index b574c17..5944181 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
@@ -54,8 +54,9 @@ public class SlidingEventTimeWindows extends 
WindowAssigner<Object, TimeWindow>
        private final long offset;
 
        protected SlidingEventTimeWindows(long size, long slide, long offset) {
-               if (offset < 0 || offset >= slide || size <= 0) {
-                       throw new 
IllegalArgumentException("SlidingEventTimeWindows parameters must satisfy 0 <= 
offset < slide and size > 0");
+               if (Math.abs(offset) >= slide || size <= 0) {
+                       throw new 
IllegalArgumentException("SlidingEventTimeWindows parameters must satisfy " +
+                               "abs(offset) < slide and size > 0");
                }
 
                this.size = size;
@@ -130,8 +131,7 @@ public class SlidingEventTimeWindows extends 
WindowAssigner<Object, TimeWindow>
         * @return The time policy.
         */
        public static SlidingEventTimeWindows of(Time size, Time slide, Time 
offset) {
-               return new SlidingEventTimeWindows(size.toMilliseconds(), 
slide.toMilliseconds(),
-                       offset.toMilliseconds() % slide.toMilliseconds());
+               return new SlidingEventTimeWindows(size.toMilliseconds(), 
slide.toMilliseconds(), offset.toMilliseconds());
        }
 
        @Override
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
index 78cc8b2..3aeb258 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
@@ -52,8 +52,9 @@ public class SlidingProcessingTimeWindows extends 
WindowAssigner<Object, TimeWin
        private final long slide;
 
        private SlidingProcessingTimeWindows(long size, long slide, long 
offset) {
-               if (offset < 0 || offset >= slide || size <= 0) {
-                       throw new 
IllegalArgumentException("SlidingProcessingTimeWindows parameters must satisfy 
0 <= offset < slide and size > 0");
+               if (Math.abs(offset) >= slide || size <= 0) {
+                       throw new 
IllegalArgumentException("SlidingProcessingTimeWindows parameters must satisfy 
" +
+                               "abs(offset) < slide and size > 0");
                }
 
                this.size = size;
@@ -123,8 +124,7 @@ public class SlidingProcessingTimeWindows extends 
WindowAssigner<Object, TimeWin
         * @return The time policy.
         */
        public static SlidingProcessingTimeWindows of(Time size, Time slide, 
Time offset) {
-               return new SlidingProcessingTimeWindows(size.toMilliseconds(), 
slide.toMilliseconds(),
-                       offset.toMilliseconds() % slide.toMilliseconds());
+               return new SlidingProcessingTimeWindows(size.toMilliseconds(), 
slide.toMilliseconds(), offset.toMilliseconds());
        }
 
        @Override
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
index 88710c2..30a49de 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
@@ -51,8 +51,8 @@ public class TumblingEventTimeWindows extends 
WindowAssigner<Object, TimeWindow>
        private final long offset;
 
        protected TumblingEventTimeWindows(long size, long offset) {
-               if (offset < 0 || offset >= size) {
-                       throw new 
IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy 0 <= 
offset < size");
+               if (Math.abs(offset) >= size) {
+                       throw new 
IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy 
abs(offset) < size");
                }
 
                this.size = size;
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
index 6a401ef..11f6caa 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
@@ -49,8 +49,8 @@ public class TumblingProcessingTimeWindows extends 
WindowAssigner<Object, TimeWi
        private final long offset;
 
        private TumblingProcessingTimeWindows(long size, long offset) {
-               if (offset < 0 || offset >= size) {
-                       throw new 
IllegalArgumentException("TumblingProcessingTimeWindows parameters must satisfy 
 0 <= offset < size");
+               if (Math.abs(offset) >= size) {
+                       throw new 
IllegalArgumentException("TumblingProcessingTimeWindows parameters must satisfy 
abs(offset) < size");
                }
 
                this.size = size;
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java
index 95a8314..6cf63af 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java
@@ -105,6 +105,36 @@ public class SlidingEventTimeWindowsTest extends 
TestLogger {
        }
 
        @Test
+       public void testWindowAssignmentWithNegativeOffset() {
+               WindowAssigner.WindowAssignerContext mockContext =
+                       mock(WindowAssigner.WindowAssignerContext.class);
+
+               SlidingEventTimeWindows assigner =
+                       SlidingEventTimeWindows.of(Time.milliseconds(5000), 
Time.milliseconds(1000), Time.milliseconds(-100));
+
+               assertThat(assigner.assignWindows("String", 0L, mockContext), 
containsInAnyOrder(
+                       timeWindow(-4100, 900),
+                       timeWindow(-3100, 1900),
+                       timeWindow(-2100, 2900),
+                       timeWindow(-1100, 3900),
+                       timeWindow(-100, 4900)));
+
+               assertThat(assigner.assignWindows("String", 4899L, 
mockContext), containsInAnyOrder(
+                       timeWindow(-100, 4900),
+                       timeWindow(900, 5900),
+                       timeWindow(1900, 6900),
+                       timeWindow(2900, 7900),
+                       timeWindow(3900, 8900)));
+
+               assertThat(assigner.assignWindows("String", 4900L, 
mockContext), containsInAnyOrder(
+                       timeWindow(900, 5900),
+                       timeWindow(1900, 6900),
+                       timeWindow(2900, 7900),
+                       timeWindow(3900, 8900),
+                       timeWindow(4900, 9900)));
+       }
+
+       @Test
        public void testTimeUnits() {
                // sanity check with one other time unit
 
@@ -141,21 +171,35 @@ public class SlidingEventTimeWindowsTest extends 
TestLogger {
                        SlidingEventTimeWindows.of(Time.seconds(-2), 
Time.seconds(1));
                        fail("should fail");
                } catch (IllegalArgumentException e) {
-                       assertThat(e.toString(), containsString("0 <= offset < 
slide and size > 0"));
+                       assertThat(e.toString(), containsString("abs(offset) < 
slide and size > 0"));
                }
 
                try {
                        SlidingEventTimeWindows.of(Time.seconds(2), 
Time.seconds(-1));
                        fail("should fail");
                } catch (IllegalArgumentException e) {
-                       assertThat(e.toString(), containsString("0 <= offset < 
slide and size > 0"));
+                       assertThat(e.toString(), containsString("abs(offset) < 
slide and size > 0"));
+               }
+
+               try {
+                       SlidingEventTimeWindows.of(Time.seconds(-20), 
Time.seconds(10), Time.seconds(-1));
+                       fail("should fail");
+               } catch (IllegalArgumentException e) {
+                       assertThat(e.toString(), containsString("abs(offset) < 
slide and size > 0"));
+               }
+
+               try {
+                       SlidingEventTimeWindows.of(Time.seconds(20), 
Time.seconds(10), Time.seconds(-11));
+                       fail("should fail");
+               } catch (IllegalArgumentException e) {
+                       assertThat(e.toString(), containsString("abs(offset) < 
slide and size > 0"));
                }
 
                try {
-                       SlidingEventTimeWindows.of(Time.seconds(20), 
Time.seconds(10), Time.seconds(-1));
+                       SlidingEventTimeWindows.of(Time.seconds(20), 
Time.seconds(10), Time.seconds(11));
                        fail("should fail");
                } catch (IllegalArgumentException e) {
-                       assertThat(e.toString(), containsString("0 <= offset < 
slide and size > 0"));
+                       assertThat(e.toString(), containsString("abs(offset) < 
slide and size > 0"));
                }
        }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java
index 69b628a..5567a17 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java
@@ -112,6 +112,39 @@ public class SlidingProcessingTimeWindowsTest extends 
TestLogger {
        }
 
        @Test
+       public void testWindowAssignmentWithNegativeOffset() {
+               WindowAssigner.WindowAssignerContext mockContext =
+                       mock(WindowAssigner.WindowAssignerContext.class);
+
+               SlidingProcessingTimeWindows assigner =
+                       
SlidingProcessingTimeWindows.of(Time.milliseconds(5000), 
Time.milliseconds(1000), Time.milliseconds(-100));
+
+               when(mockContext.getCurrentProcessingTime()).thenReturn(0L);
+               assertThat(assigner.assignWindows("String", Long.MIN_VALUE, 
mockContext), containsInAnyOrder(
+                       timeWindow(-4100, 900),
+                       timeWindow(-3100, 1900),
+                       timeWindow(-2100, 2900),
+                       timeWindow(-1100, 3900),
+                       timeWindow(-100, 4900)));
+
+               when(mockContext.getCurrentProcessingTime()).thenReturn(4899L);
+               assertThat(assigner.assignWindows("String", Long.MIN_VALUE, 
mockContext), containsInAnyOrder(
+                       timeWindow(-100, 4900),
+                       timeWindow(900, 5900),
+                       timeWindow(1900, 6900),
+                       timeWindow(2900, 7900),
+                       timeWindow(3900, 8900)));
+
+               when(mockContext.getCurrentProcessingTime()).thenReturn(4900L);
+               assertThat(assigner.assignWindows("String", Long.MIN_VALUE, 
mockContext), containsInAnyOrder(
+                       timeWindow(900, 5900),
+                       timeWindow(1900, 6900),
+                       timeWindow(2900, 7900),
+                       timeWindow(3900, 8900),
+                       timeWindow(4900, 9900)));
+       }
+
+       @Test
        public void testTimeUnits() {
                // sanity check with one other time unit
 
@@ -151,21 +184,35 @@ public class SlidingProcessingTimeWindowsTest extends 
TestLogger {
                        SlidingProcessingTimeWindows.of(Time.seconds(-2), 
Time.seconds(1));
                        fail("should fail");
                } catch (IllegalArgumentException e) {
-                       assertThat(e.toString(), containsString("0 <= offset < 
slide and size > 0"));
+                       assertThat(e.toString(), containsString("abs(offset) < 
slide and size > 0"));
                }
 
                try {
                        SlidingProcessingTimeWindows.of(Time.seconds(2), 
Time.seconds(-1));
                        fail("should fail");
                } catch (IllegalArgumentException e) {
-                       assertThat(e.toString(), containsString("0 <= offset < 
slide and size > 0"));
+                       assertThat(e.toString(), containsString("abs(offset) < 
slide and size > 0"));
+               }
+
+               try {
+                       SlidingProcessingTimeWindows.of(Time.seconds(-20), 
Time.seconds(10), Time.seconds(-1));
+                       fail("should fail");
+               } catch (IllegalArgumentException e) {
+                       assertThat(e.toString(), containsString("abs(offset) < 
slide and size > 0"));
+               }
+
+               try {
+                       SlidingProcessingTimeWindows.of(Time.seconds(20), 
Time.seconds(10), Time.seconds(-11));
+                       fail("should fail");
+               } catch (IllegalArgumentException e) {
+                       assertThat(e.toString(), containsString("abs(offset) < 
slide and size > 0"));
                }
 
                try {
-                       SlidingProcessingTimeWindows.of(Time.seconds(20), 
Time.seconds(10), Time.seconds(-1));
+                       SlidingProcessingTimeWindows.of(Time.seconds(20), 
Time.seconds(10), Time.seconds(11));
                        fail("should fail");
                } catch (IllegalArgumentException e) {
-                       assertThat(e.toString(), containsString("0 <= offset < 
slide and size > 0"));
+                       assertThat(e.toString(), containsString("abs(offset) < 
slide and size > 0"));
                }
        }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java
index 9e4669f..0451c15 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java
@@ -69,6 +69,18 @@ public class TumblingEventTimeWindowsTest extends TestLogger 
{
        }
 
        @Test
+       public void testWindowAssignmentWithNegativeOffset() {
+               WindowAssigner.WindowAssignerContext mockContext =
+                       mock(WindowAssigner.WindowAssignerContext.class);
+
+               TumblingEventTimeWindows assigner = 
TumblingEventTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(-100));
+
+               assertThat(assigner.assignWindows("String", 0L, mockContext), 
contains(timeWindow(-100, 4900)));
+               assertThat(assigner.assignWindows("String", 4899L, 
mockContext), contains(timeWindow(-100, 4900)));
+               assertThat(assigner.assignWindows("String", 4900L, 
mockContext), contains(timeWindow(4900, 9900)));
+       }
+
+       @Test
        public void testTimeUnits() {
                // sanity check with one other time unit
 
@@ -88,21 +100,21 @@ public class TumblingEventTimeWindowsTest extends 
TestLogger {
                        TumblingEventTimeWindows.of(Time.seconds(-1));
                        fail("should fail");
                } catch (IllegalArgumentException e) {
-                       assertThat(e.toString(), containsString("0 <= offset < 
size"));
+                       assertThat(e.toString(), containsString("abs(offset) < 
size"));
                }
 
                try {
                        TumblingEventTimeWindows.of(Time.seconds(10), 
Time.seconds(20));
                        fail("should fail");
                } catch (IllegalArgumentException e) {
-                       assertThat(e.toString(), containsString("0 <= offset < 
size"));
+                       assertThat(e.toString(), containsString("abs(offset) < 
size"));
                }
 
                try {
-                       TumblingEventTimeWindows.of(Time.seconds(10), 
Time.seconds(-1));
+                       TumblingEventTimeWindows.of(Time.seconds(10), 
Time.seconds(-11));
                        fail("should fail");
                } catch (IllegalArgumentException e) {
-                       assertThat(e.toString(), containsString("0 <= offset < 
size"));
+                       assertThat(e.toString(), containsString("abs(offset) < 
size"));
                }
        }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java
index a611fc0..1306252 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java
@@ -80,6 +80,23 @@ public class TumblingProcessingTimeWindowsTest extends 
TestLogger {
        }
 
        @Test
+       public void testWindowAssignmentWithNegativeOffset() {
+               WindowAssigner.WindowAssignerContext mockContext =
+                       mock(WindowAssigner.WindowAssignerContext.class);
+
+               TumblingProcessingTimeWindows assigner = 
TumblingProcessingTimeWindows.of(Time.milliseconds(5000), 
Time.milliseconds(-100));
+
+               when(mockContext.getCurrentProcessingTime()).thenReturn(100L);
+               assertThat(assigner.assignWindows("String", Long.MIN_VALUE, 
mockContext), contains(timeWindow(-100, 4900)));
+
+               when(mockContext.getCurrentProcessingTime()).thenReturn(4899L);
+               assertThat(assigner.assignWindows("String", Long.MIN_VALUE, 
mockContext), contains(timeWindow(-100, 4900)));
+
+               when(mockContext.getCurrentProcessingTime()).thenReturn(4900L);
+               assertThat(assigner.assignWindows("String", Long.MIN_VALUE, 
mockContext), contains(timeWindow(4900, 9900)));
+       }
+
+       @Test
        public void testTimeUnits() {
                // sanity check with one other time unit
 
@@ -104,21 +121,21 @@ public class TumblingProcessingTimeWindowsTest extends 
TestLogger {
                        TumblingProcessingTimeWindows.of(Time.seconds(-1));
                        fail("should fail");
                } catch (IllegalArgumentException e) {
-                       assertThat(e.toString(), containsString("0 <= offset < 
size"));
+                       assertThat(e.toString(), containsString("abs(offset) < 
size"));
                }
 
                try {
                        TumblingProcessingTimeWindows.of(Time.seconds(10), 
Time.seconds(20));
                        fail("should fail");
                } catch (IllegalArgumentException e) {
-                       assertThat(e.toString(), containsString("0 <= offset < 
size"));
+                       assertThat(e.toString(), containsString("abs(offset) < 
size"));
                }
 
                try {
-                       TumblingProcessingTimeWindows.of(Time.seconds(10), 
Time.seconds(-1));
+                       TumblingProcessingTimeWindows.of(Time.seconds(10), 
Time.seconds(-11));
                        fail("should fail");
                } catch (IllegalArgumentException e) {
-                       assertThat(e.toString(), containsString("0 <= offset < 
size"));
+                       assertThat(e.toString(), containsString("abs(offset) < 
size"));
                }
        }
 

Reply via email to