http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/StreamRecordMatchers.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/StreamRecordMatchers.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/StreamRecordMatchers.java index bb07996..ada4d6f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/StreamRecordMatchers.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/StreamRecordMatchers.java @@ -15,11 +15,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + import org.hamcrest.Description; import org.hamcrest.Matcher; import org.hamcrest.Matchers; @@ -30,150 +32,146 @@ import org.hamcrest.TypeSafeMatcher; */ public class StreamRecordMatchers { - public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord( - T value) { - - return isStreamRecord(Matchers.equalTo(value)); - } - - public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord( - T value, - long timestamp) { - - return isStreamRecord(Matchers.equalTo(value), Matchers.equalTo(timestamp)); - } - - public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord( - Matcher<? super T> valueMatcher) { - return new StreamRecordMatcher<>(valueMatcher, Matchers.anything()); - } - - public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord( - Matcher<? super T> valueMatcher, Matcher<? super Long> timestampMatcher) { - return new StreamRecordMatcher<>(valueMatcher, timestampMatcher); - } - - public static Matcher<TimeWindow> timeWindow(long start, long end) { - return Matchers.equalTo(new TimeWindow(start, end)); - } - - @SuppressWarnings({"unchecked", "rawtypes"}) - @SafeVarargs - public static <W extends Window> Matcher<Iterable<W>> ofWindows(Matcher<W>... windows) { - return (Matcher) Matchers.containsInAnyOrder(windows); - } - - public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue( - T value) { - return isWindowedValue(Matchers.equalTo(value)); - } - - public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue( - T value, - long timestamp) { - return isWindowedValue(Matchers.equalTo(value), Matchers.equalTo(timestamp)); - } - - public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue( - T value, - long timestamp, - W window) { - return isWindowedValue(Matchers.equalTo(value), Matchers.equalTo(timestamp), Matchers.equalTo(window)); - } - - - public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue( - Matcher<? super T> valueMatcher, long timestamp) { - return new WindowedValueMatcher<>(valueMatcher, Matchers.equalTo(timestamp), Matchers.anything()); - } - - public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue( - Matcher<? super T> valueMatcher, long timestamp, W window) { - return new WindowedValueMatcher<>(valueMatcher, Matchers.equalTo(timestamp), Matchers.equalTo(window)); - } - - - public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue( - Matcher<? super T> valueMatcher) { - return new WindowedValueMatcher<>(valueMatcher, Matchers.anything(), Matchers.anything()); - } - - - public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue( - Matcher<? super T> valueMatcher, Matcher<? super Long> timestampMatcher) { - return new WindowedValueMatcher<>(valueMatcher, timestampMatcher, Matchers.anything()); - } - - public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue( - Matcher<? super T> valueMatcher, long timestamp, Matcher<? super W> windowMatcher) { - return new WindowedValueMatcher<>(valueMatcher, Matchers.equalTo(timestamp), windowMatcher); - } - - public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue( - Matcher<? super T> valueMatcher, Matcher<? super Long> timestampMatcher, Matcher<? super W> windowMatcher) { - return new WindowedValueMatcher<>(valueMatcher, timestampMatcher, windowMatcher); - } - - - private StreamRecordMatchers() {} - - private static class StreamRecordMatcher<T> extends TypeSafeMatcher<StreamRecord<? extends T>> { - - private Matcher<? super T> valueMatcher; - private Matcher<? super Long> timestampMatcher; - - private StreamRecordMatcher( - Matcher<? super T> valueMatcher, - Matcher<? super Long> timestampMatcher) { - this.valueMatcher = valueMatcher; - this.timestampMatcher = timestampMatcher; - } - - @Override - public void describeTo(Description description) { - description - .appendText("a StreamRecordValue(").appendValue(valueMatcher) - .appendText(", ").appendValue(timestampMatcher) - .appendText(")"); - } - - @Override - protected boolean matchesSafely(StreamRecord<? extends T> streamRecord) { - return valueMatcher.matches(streamRecord.getValue()) - && timestampMatcher.matches(streamRecord.getTimestamp()); - } - } - - private static class WindowedValueMatcher<T, W extends Window> extends TypeSafeMatcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> { - - private Matcher<? super T> valueMatcher; - private Matcher<? super Long> timestampMatcher; - private Matcher<? super W> windowMatcher; - - - private WindowedValueMatcher( - Matcher<? super T> valueMatcher, - Matcher<? super Long> timestampMatcher, - Matcher<? super W> windowMatcher) { - this.valueMatcher = valueMatcher; - this.timestampMatcher = timestampMatcher; - this.windowMatcher = windowMatcher; - } - - @Override - public void describeTo(Description description) { - description - .appendText("a WindowedValue(").appendValue(valueMatcher) - .appendText(", ").appendValue(timestampMatcher) - .appendText(", ").appendValue(timestampMatcher) - .appendText(")"); - } - - @Override - protected boolean matchesSafely(StreamRecord<? extends WindowedValue<? extends T, ? extends W>> streamRecord) { - return valueMatcher.matches(streamRecord.getValue().value()) - && timestampMatcher.matches(streamRecord.getTimestamp()) - && windowMatcher.matches(streamRecord.getValue().window()); - } - } + public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord( + T value) { + + return isStreamRecord(Matchers.equalTo(value)); + } + + public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord( + T value, + long timestamp) { + + return isStreamRecord(Matchers.equalTo(value), Matchers.equalTo(timestamp)); + } + + public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord( + Matcher<? super T> valueMatcher) { + return new StreamRecordMatcher<>(valueMatcher, Matchers.anything()); + } + + public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord( + Matcher<? super T> valueMatcher, Matcher<? super Long> timestampMatcher) { + return new StreamRecordMatcher<>(valueMatcher, timestampMatcher); + } + + public static Matcher<TimeWindow> timeWindow(long start, long end) { + return Matchers.equalTo(new TimeWindow(start, end)); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + @SafeVarargs + public static <W extends Window> Matcher<Iterable<W>> ofWindows(Matcher<W>... windows) { + return (Matcher) Matchers.containsInAnyOrder(windows); + } + + public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue( + T value) { + return isWindowedValue(Matchers.equalTo(value)); + } + + public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue( + T value, + long timestamp) { + return isWindowedValue(Matchers.equalTo(value), Matchers.equalTo(timestamp)); + } + + public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue( + T value, + long timestamp, + W window) { + return isWindowedValue(Matchers.equalTo(value), Matchers.equalTo(timestamp), Matchers.equalTo(window)); + } + + public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue( + Matcher<? super T> valueMatcher, long timestamp) { + return new WindowedValueMatcher<>(valueMatcher, Matchers.equalTo(timestamp), Matchers.anything()); + } + + public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue( + Matcher<? super T> valueMatcher, long timestamp, W window) { + return new WindowedValueMatcher<>(valueMatcher, Matchers.equalTo(timestamp), Matchers.equalTo(window)); + } + + public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue( + Matcher<? super T> valueMatcher) { + return new WindowedValueMatcher<>(valueMatcher, Matchers.anything(), Matchers.anything()); + } + + public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue( + Matcher<? super T> valueMatcher, Matcher<? super Long> timestampMatcher) { + return new WindowedValueMatcher<>(valueMatcher, timestampMatcher, Matchers.anything()); + } + + public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue( + Matcher<? super T> valueMatcher, long timestamp, Matcher<? super W> windowMatcher) { + return new WindowedValueMatcher<>(valueMatcher, Matchers.equalTo(timestamp), windowMatcher); + } + + public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue( + Matcher<? super T> valueMatcher, Matcher<? super Long> timestampMatcher, Matcher<? super W> windowMatcher) { + return new WindowedValueMatcher<>(valueMatcher, timestampMatcher, windowMatcher); + } + + private StreamRecordMatchers() { + } + + private static class StreamRecordMatcher<T> extends TypeSafeMatcher<StreamRecord<? extends T>> { + + private Matcher<? super T> valueMatcher; + private Matcher<? super Long> timestampMatcher; + + private StreamRecordMatcher( + Matcher<? super T> valueMatcher, + Matcher<? super Long> timestampMatcher) { + this.valueMatcher = valueMatcher; + this.timestampMatcher = timestampMatcher; + } + + @Override + public void describeTo(Description description) { + description + .appendText("a StreamRecordValue(").appendValue(valueMatcher) + .appendText(", ").appendValue(timestampMatcher) + .appendText(")"); + } + + @Override + protected boolean matchesSafely(StreamRecord<? extends T> streamRecord) { + return valueMatcher.matches(streamRecord.getValue()) + && timestampMatcher.matches(streamRecord.getTimestamp()); + } + } + + private static class WindowedValueMatcher<T, W extends Window> extends TypeSafeMatcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> { + + private Matcher<? super T> valueMatcher; + private Matcher<? super Long> timestampMatcher; + private Matcher<? super W> windowMatcher; + + private WindowedValueMatcher( + Matcher<? super T> valueMatcher, + Matcher<? super Long> timestampMatcher, + Matcher<? super W> windowMatcher) { + this.valueMatcher = valueMatcher; + this.timestampMatcher = timestampMatcher; + this.windowMatcher = windowMatcher; + } + + @Override + public void describeTo(Description description) { + description + .appendText("a WindowedValue(").appendValue(valueMatcher) + .appendText(", ").appendValue(timestampMatcher) + .appendText(", ").appendValue(timestampMatcher) + .appendText(")"); + } + + @Override + protected boolean matchesSafely(StreamRecord<? extends WindowedValue<? extends T, ? extends W>> streamRecord) { + return valueMatcher.matches(streamRecord.getValue().value()) + && timestampMatcher.matches(streamRecord.getTimestamp()) + && windowMatcher.matches(streamRecord.getValue().window()); + } + } }
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java index 341171d..f4e0f1d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java @@ -19,41 +19,45 @@ package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; + import org.junit.Assert; import org.junit.Test; import java.util.concurrent.TimeUnit; +/** + * Tests for {@link TimeWindow}. + */ public class TimeWindowTest { @Test public void testGetWindowStartWithOffset() { - //[0,7),[7,14),[14,21)... + // [0, 7), [7, 14), [14, 21)... long offset = 0; - Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1,offset,7),0); - Assert.assertEquals(TimeWindow.getWindowStartWithOffset(6,offset,7),0); - Assert.assertEquals(TimeWindow.getWindowStartWithOffset(7,offset,7),7); - Assert.assertEquals(TimeWindow.getWindowStartWithOffset(8,offset,7),7); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1, offset, 7), 0); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(6, offset, 7), 0); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(7, offset, 7), 7); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(8, offset, 7), 7); - //[-4,3),[3,10),[10,17)... + // [-4, 3), [3, 10), [10, 17)... offset = 3; - Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1,offset,7),-4); - Assert.assertEquals(TimeWindow.getWindowStartWithOffset(2,offset,7),-4); - Assert.assertEquals(TimeWindow.getWindowStartWithOffset(3,offset,7),3); - Assert.assertEquals(TimeWindow.getWindowStartWithOffset(9,offset,7),3); - Assert.assertEquals(TimeWindow.getWindowStartWithOffset(10,offset,7),10); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1, offset, 7), -4); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(2, offset, 7), -4); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(3, offset, 7), 3); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(9, offset, 7), 3); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(10, offset, 7), 10); - //[-2,5),[5,12),[12,19)... + // [-2, 5), [5, 12), [12, 19)... offset = -2; - Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1,offset,7),-2); - Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-2,offset,7),-2); - Assert.assertEquals(TimeWindow.getWindowStartWithOffset(3,offset,7),-2); - Assert.assertEquals(TimeWindow.getWindowStartWithOffset(4,offset,7),-2); - Assert.assertEquals(TimeWindow.getWindowStartWithOffset(7,offset,7),5); - Assert.assertEquals(TimeWindow.getWindowStartWithOffset(12,offset,7),12); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1, offset, 7), -2); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-2, offset, 7), -2); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(3, offset, 7), -2); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(4, offset, 7), -2); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(7, offset, 7), 5); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(12, offset, 7), 12); // for GMT+8:00 - offset = - TimeUnit.HOURS.toMillis(8); + offset = -TimeUnit.HOURS.toMillis(8); long size = TimeUnit.DAYS.toMillis(1); - Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1470902048450l,offset,size),1470844800000l); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1470902048450L, offset, size), 1470844800000L); } } http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java index df65ca2..dc0e21c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.api.common.functions.FoldFunction; @@ -28,8 +29,8 @@ import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.transformations.OneInputTransformation; import org.apache.flink.streaming.api.windowing.assigners.SlidingAlignedProcessingTimeWindows; @@ -161,7 +162,7 @@ public class TimeWindowTranslationTest { DataStream<Tuple2<String, Integer>> window1 = source .keyBy(0) - .timeWindow(Time.of(1000, TimeUnit.MILLISECONDS),Time.of(100, TimeUnit.MILLISECONDS)) + .timeWindow(Time.of(1000, TimeUnit.MILLISECONDS), Time.of(100, TimeUnit.MILLISECONDS)) .reduce(new DummyReducer()); OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); @@ -185,7 +186,7 @@ public class TimeWindowTranslationTest { DataStream<Tuple2<String, Integer>> window1 = source .keyBy(0) - .timeWindow(Time.of(1000, TimeUnit.MILLISECONDS),Time.of(100, TimeUnit.MILLISECONDS)) + .timeWindow(Time.of(1000, TimeUnit.MILLISECONDS), Time.of(100, TimeUnit.MILLISECONDS)) .fold(new Tuple2<>("", 1), new DummyFolder()); OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); @@ -235,7 +236,7 @@ public class TimeWindowTranslationTest { * These tests ensure that the fast aligned time windows operator is used if the * conditions are right. * - * TODO: update once the fast aligned time windows operator is in + * <p>TODO: update once the fast aligned time windows operator is in */ @Ignore @Test http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java index 4267444..5aa47e8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.api.common.JobID; @@ -36,9 +37,9 @@ import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; import org.apache.flink.runtime.state.internal.InternalMergingState; import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.streaming.api.operators.KeyContext; import org.apache.flink.streaming.api.operators.TestInternalTimerService; -import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; import org.apache.flink.streaming.api.windowing.windows.Window; @@ -123,7 +124,7 @@ public class TriggerTestHarness<T, W extends Window> { /** * Injects one element into the trigger for the given window and returns the result of - * {@link Trigger#onElement(Object, long, Window, Trigger.TriggerContext)} + * {@link Trigger#onElement(Object, long, Window, Trigger.TriggerContext)}. */ public TriggerResult processElement(StreamRecord<T> element, W window) throws Exception { TestTriggerContext<Integer, W> triggerContext = new TestTriggerContext<>( http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java index 2373a86..9e4669f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java @@ -15,8 +15,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.streaming.runtime.operators.windowing; +package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -26,17 +26,21 @@ import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.TestLogger; + import org.junit.Test; import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.instanceOf; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; /** - * Tests for {@link TumblingEventTimeWindows} + * Tests for {@link TumblingEventTimeWindows}. */ public class TumblingEventTimeWindowsTest extends TestLogger { http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java index 348b6fa..a611fc0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java @@ -15,8 +15,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.streaming.runtime.operators.windowing; +package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -26,18 +26,22 @@ import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.TestLogger; + import org.junit.Test; import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.instanceOf; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; /** - * Tests for {@link TumblingProcessingTimeWindows} + * Tests for {@link TumblingProcessingTimeWindows}. */ public class TumblingProcessingTimeWindowsTest extends TestLogger { http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java index 0d80605..8ceda45 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java @@ -15,38 +15,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.streaming.runtime.operators.windowing; - -import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.isStreamRecord; -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.anyCollection; -import static org.mockito.Matchers.anyInt; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.argThat; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.atLeastOnce; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +package org.apache.flink.streaming.runtime.operators.windowing; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.List; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.streaming.api.functions.ProcessFunction; -import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; @@ -64,6 +38,7 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; import org.apache.flink.util.TestLogger; + import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -73,6 +48,31 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.mockito.verification.VerificationMode; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.isStreamRecord; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.anyCollection; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + /** * Base for window operator tests that verify correct interaction with the other windowing * components: {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}, @@ -128,7 +128,6 @@ public abstract class WindowOperatorContractTest extends TestLogger { return mockAssigner; } - static <T> MergingWindowAssigner<T, TimeWindow> mockMergingAssigner() throws Exception { @SuppressWarnings("unchecked") MergingWindowAssigner<T, TimeWindow> mockAssigner = mock(MergingWindowAssigner.class); @@ -139,7 +138,6 @@ public abstract class WindowOperatorContractTest extends TestLogger { return mockAssigner; } - static WindowAssigner.WindowAssignerContext anyAssignerContext() { return Mockito.any(); } @@ -177,7 +175,6 @@ public abstract class WindowOperatorContractTest extends TestLogger { return Mockito.any(); } - static <T> void shouldRegisterEventTimeTimerOnElement(Trigger<T, TimeWindow> mockTrigger, final long timestamp) throws Exception { doAnswer(new Answer<TriggerResult>() { @Override @@ -369,7 +366,6 @@ public abstract class WindowOperatorContractTest extends TestLogger { } - @Test public void testAssignerIsInvokedOncePerElement() throws Exception { @@ -540,7 +536,6 @@ public abstract class WindowOperatorContractTest extends TestLogger { testEmittingFromWindowFunction(new ProcessingTimeAdaptor()); } - private void testEmittingFromWindowFunction(TimeDomainAdaptor timeAdaptor) throws Exception { WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); @@ -1258,7 +1253,6 @@ public abstract class WindowOperatorContractTest extends TestLogger { testTimerFiring(new ProcessingTimeAdaptor()); } - private void testTimerFiring(TimeDomainAdaptor timeAdaptor) throws Exception { WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); @@ -1382,8 +1376,6 @@ public abstract class WindowOperatorContractTest extends TestLogger { verify(mockAssigner).mergeWindows(eq(Collections.singletonList(new TimeWindow(2, 4))), anyMergeCallback()); verify(mockAssigner, times(2)).mergeWindows(anyCollection(), anyMergeCallback()); - - } @Test @@ -2392,7 +2384,7 @@ public abstract class WindowOperatorContractTest extends TestLogger { doAnswer(new Answer<Object>() { @Override public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[2]; + InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext) invocationOnMock.getArguments()[2]; context.windowState().getState(valueStateDescriptor).update("hello"); return null; } @@ -2401,7 +2393,7 @@ public abstract class WindowOperatorContractTest extends TestLogger { doAnswer(new Answer<Object>() { @Override public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[1]; + InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext) invocationOnMock.getArguments()[1]; context.windowState().getState(valueStateDescriptor).clear(); return null; } @@ -2441,7 +2433,7 @@ public abstract class WindowOperatorContractTest extends TestLogger { doAnswer(new Answer<Object>() { @Override public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[2]; + InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext) invocationOnMock.getArguments()[2]; context.windowState().getState(valueStateDescriptor).update("hello"); return null; } @@ -2481,7 +2473,7 @@ public abstract class WindowOperatorContractTest extends TestLogger { doAnswer(new Answer<Object>() { @Override public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[2]; + InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext) invocationOnMock.getArguments()[2]; timeAdaptor.verifyCorrectTime(testHarness, context); return null; } @@ -2490,7 +2482,7 @@ public abstract class WindowOperatorContractTest extends TestLogger { doAnswer(new Answer<Object>() { @Override public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[1]; + InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext) invocationOnMock.getArguments()[1]; timeAdaptor.verifyCorrectTime(testHarness, context); return null; } @@ -2520,7 +2512,6 @@ public abstract class WindowOperatorContractTest extends TestLogger { long allowedLatenss, InternalWindowFunction<Iterable<Integer>, OUT, Integer, W> windowFunction) throws Exception; - private interface TimeDomainAdaptor { void setIsEventTime(WindowAssigner<?, ?> mockAssigner); @@ -2535,9 +2526,9 @@ public abstract class WindowOperatorContractTest extends TestLogger { int numTimersOtherDomain(AbstractStreamOperatorTestHarness testHarness); - void shouldRegisterTimerOnElement(Trigger<?, TimeWindow> mockTrigger, final long timestamp) throws Exception; + void shouldRegisterTimerOnElement(Trigger<?, TimeWindow> mockTrigger, long timestamp) throws Exception; - void shouldDeleteTimerOnElement(Trigger<?, TimeWindow> mockTrigger, final long timestamp) throws Exception; + void shouldDeleteTimerOnElement(Trigger<?, TimeWindow> mockTrigger, long timestamp) throws Exception; void shouldContinueOnTime(Trigger<?, TimeWindow> mockTrigger) throws Exception; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom11MigrationTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom11MigrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom11MigrationTest.java index 9ec1923..904a8b9 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom11MigrationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom11MigrationTest.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.api.common.ExecutionConfig; @@ -50,6 +51,7 @@ import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.TestHarnessUtil; import org.apache.flink.util.Collector; + import org.junit.Test; import java.net.URL; @@ -63,8 +65,7 @@ import static org.junit.Assert.fail; * Tests for checking whether {@link WindowOperator} can restore from snapshots that were done * using the Flink 1.1 {@link WindowOperator}. * - * <p> - * This also checks whether {@link WindowOperator} can restore from a checkpoint of the Flink 1.1 + * <p>This also checks whether {@link WindowOperator} can restore from a checkpoint of the Flink 1.1 * aligned processing-time windows operator. * * <p>For regenerating the binary snapshot file you have to run the commented out portion @@ -85,7 +86,7 @@ public class WindowOperatorFrom11MigrationTest { @SuppressWarnings("unchecked") public void testRestoreSessionWindowsWithCountTriggerFromFlink11() throws Exception { - final int SESSION_SIZE = 3; + final int sessionSize = 3; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -93,7 +94,7 @@ public class WindowOperatorFrom11MigrationTest { inputType.createSerializer(new ExecutionConfig())); WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>( - EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)), + EventTimeSessionWindows.withGap(Time.seconds(sessionSize)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), @@ -131,7 +132,6 @@ public class WindowOperatorFrom11MigrationTest { testHarness.close(); */ - OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); @@ -146,7 +146,6 @@ public class WindowOperatorFrom11MigrationTest { testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 6500)); testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 7000)); - TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator()); // add an element that merges the two "key1" sessions, they should now have count 6, and therfore fire @@ -167,7 +166,7 @@ public class WindowOperatorFrom11MigrationTest { @SuppressWarnings("unchecked") public void testRestoreSessionWindowsWithCountTriggerInMintConditionFromFlink11() throws Exception { - final int SESSION_SIZE = 3; + final int sessionSize = 3; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -175,7 +174,7 @@ public class WindowOperatorFrom11MigrationTest { inputType.createSerializer(new ExecutionConfig())); WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>( - EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)), + EventTimeSessionWindows.withGap(Time.seconds(sessionSize)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), @@ -244,7 +243,7 @@ public class WindowOperatorFrom11MigrationTest { @Test @SuppressWarnings("unchecked") public void testRestoreReducingEventTimeWindowsFromFlink11() throws Exception { - final int WINDOW_SIZE = 3; + final int windowSize = 3; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -253,7 +252,7 @@ public class WindowOperatorFrom11MigrationTest { inputType.createSerializer(new ExecutionConfig())); WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( - TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), @@ -334,7 +333,7 @@ public class WindowOperatorFrom11MigrationTest { @Test @SuppressWarnings("unchecked") public void testRestoreApplyEventTimeWindowsFromFlink11() throws Exception { - final int WINDOW_SIZE = 3; + final int windowSize = 3; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -342,7 +341,7 @@ public class WindowOperatorFrom11MigrationTest { inputType.createSerializer(new ExecutionConfig())); WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( - TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), @@ -423,7 +422,7 @@ public class WindowOperatorFrom11MigrationTest { @Test @SuppressWarnings("unchecked") public void testRestoreReducingProcessingTimeWindowsFromFlink11() throws Exception { - final int WINDOW_SIZE = 3; + final int windowSize = 3; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -432,7 +431,7 @@ public class WindowOperatorFrom11MigrationTest { inputType.createSerializer(new ExecutionConfig())); WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( - TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), @@ -500,7 +499,7 @@ public class WindowOperatorFrom11MigrationTest { @Test @SuppressWarnings("unchecked") public void testRestoreApplyProcessingTimeWindowsFromFlink11() throws Exception { - final int WINDOW_SIZE = 3; + final int windowSize = 3; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -508,7 +507,7 @@ public class WindowOperatorFrom11MigrationTest { inputType.createSerializer(new ExecutionConfig())); WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( - TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), @@ -622,7 +621,7 @@ public class WindowOperatorFrom11MigrationTest { */ - final int WINDOW_SIZE = 3; + final int windowSize = 3; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -631,7 +630,7 @@ public class WindowOperatorFrom11MigrationTest { inputType.createSerializer(new ExecutionConfig())); WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( - TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), @@ -725,7 +724,7 @@ public class WindowOperatorFrom11MigrationTest { testHarness.close(); */ - final int WINDOW_SIZE = 3; + final int windowSize = 3; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -734,7 +733,7 @@ public class WindowOperatorFrom11MigrationTest { inputType.createSerializer(new ExecutionConfig())); WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( - TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), @@ -774,7 +773,6 @@ public class WindowOperatorFrom11MigrationTest { testHarness.close(); } - private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> { private static final long serialVersionUID = 1L; @@ -832,7 +830,7 @@ public class WindowOperatorFrom11MigrationTest { } } - public static class SumReducer implements ReduceFunction<Tuple2<String, Integer>> { + private static class SumReducer implements ReduceFunction<Tuple2<String, Integer>> { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, @@ -841,7 +839,7 @@ public class WindowOperatorFrom11MigrationTest { } } - public static class RichSumReducer<W extends Window> extends RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> { + private static class RichSumReducer<W extends Window> extends RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> { private static final long serialVersionUID = 1L; private boolean openCalled = false; @@ -877,7 +875,7 @@ public class WindowOperatorFrom11MigrationTest { } - public static class SessionWindowFunction implements WindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> { + private static class SessionWindowFunction implements WindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> { private static final long serialVersionUID = 1L; @Override http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom12MigrationTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom12MigrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom12MigrationTest.java index 0d3a6dc..6e9db1a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom12MigrationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom12MigrationTest.java @@ -15,13 +15,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.streaming.runtime.operators.windowing; -import static org.junit.Assert.fail; +package org.apache.flink.streaming.runtime.operators.windowing; -import java.util.Comparator; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.TimeUnit; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.state.ListStateDescriptor; @@ -57,9 +53,16 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OperatorSnapshotUtil; import org.apache.flink.streaming.util.TestHarnessUtil; import org.apache.flink.util.Collector; + import org.junit.Ignore; import org.junit.Test; +import java.util.Comparator; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.fail; + /** * Tests for checking whether {@link WindowOperator} can restore from snapshots that were done * using the Flink 1.2 {@link WindowOperator}. @@ -78,7 +81,7 @@ public class WindowOperatorFrom12MigrationTest { @Ignore @Test public void writeSessionWindowsWithCountTriggerSnapshot() throws Exception { - final int SESSION_SIZE = 3; + final int sessionSize = 3; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -86,7 +89,7 @@ public class WindowOperatorFrom12MigrationTest { inputType.createSerializer(new ExecutionConfig())); WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>( - EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)), + EventTimeSessionWindows.withGap(Time.seconds(sessionSize)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), @@ -122,7 +125,7 @@ public class WindowOperatorFrom12MigrationTest { @Test public void testRestoreSessionWindowsWithCountTrigger() throws Exception { - final int SESSION_SIZE = 3; + final int sessionSize = 3; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -130,7 +133,7 @@ public class WindowOperatorFrom12MigrationTest { inputType.createSerializer(new ExecutionConfig())); WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>( - EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)), + EventTimeSessionWindows.withGap(Time.seconds(sessionSize)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), @@ -157,7 +160,6 @@ public class WindowOperatorFrom12MigrationTest { testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 6500)); testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 7000)); - TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator()); // add an element that merges the two "key1" sessions, they should now have count 6, and therfore fire @@ -177,7 +179,7 @@ public class WindowOperatorFrom12MigrationTest { @Test public void writeSessionWindowsWithCountTriggerInMintConditionSnapshot() throws Exception { - final int SESSION_SIZE = 3; + final int sessionSize = 3; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -185,7 +187,7 @@ public class WindowOperatorFrom12MigrationTest { inputType.createSerializer(new ExecutionConfig())); WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>( - EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)), + EventTimeSessionWindows.withGap(Time.seconds(sessionSize)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), @@ -215,7 +217,7 @@ public class WindowOperatorFrom12MigrationTest { @Test public void testRestoreSessionWindowsWithCountTriggerInMintCondition() throws Exception { - final int SESSION_SIZE = 3; + final int sessionSize = 3; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -223,7 +225,7 @@ public class WindowOperatorFrom12MigrationTest { inputType.createSerializer(new ExecutionConfig())); WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>( - EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)), + EventTimeSessionWindows.withGap(Time.seconds(sessionSize)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), @@ -280,7 +282,7 @@ public class WindowOperatorFrom12MigrationTest { @Ignore @Test public void writeReducingEventTimeWindowsSnapshot() throws Exception { - final int WINDOW_SIZE = 3; + final int windowSize = 3; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -289,7 +291,7 @@ public class WindowOperatorFrom12MigrationTest { inputType.createSerializer(new ExecutionConfig())); WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( - TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), @@ -336,7 +338,7 @@ public class WindowOperatorFrom12MigrationTest { @Test public void testRestoreReducingEventTimeWindows() throws Exception { - final int WINDOW_SIZE = 3; + final int windowSize = 3; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -345,7 +347,7 @@ public class WindowOperatorFrom12MigrationTest { inputType.createSerializer(new ExecutionConfig())); WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( - TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), @@ -391,7 +393,7 @@ public class WindowOperatorFrom12MigrationTest { @Ignore @Test public void writeApplyEventTimeWindowsSnapshot() throws Exception { - final int WINDOW_SIZE = 3; + final int windowSize = 3; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -399,7 +401,7 @@ public class WindowOperatorFrom12MigrationTest { inputType.createSerializer(new ExecutionConfig())); WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( - TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), @@ -446,7 +448,7 @@ public class WindowOperatorFrom12MigrationTest { @Test public void testRestoreApplyEventTimeWindows() throws Exception { - final int WINDOW_SIZE = 3; + final int windowSize = 3; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -454,7 +456,7 @@ public class WindowOperatorFrom12MigrationTest { inputType.createSerializer(new ExecutionConfig())); WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( - TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), @@ -500,7 +502,7 @@ public class WindowOperatorFrom12MigrationTest { @Ignore @Test public void writeReducingProcessingTimeWindowsSnapshot() throws Exception { - final int WINDOW_SIZE = 3; + final int windowSize = 3; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -509,7 +511,7 @@ public class WindowOperatorFrom12MigrationTest { inputType.createSerializer(new ExecutionConfig())); WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( - TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), @@ -550,7 +552,7 @@ public class WindowOperatorFrom12MigrationTest { @Test public void testRestoreReducingProcessingTimeWindows() throws Exception { - final int WINDOW_SIZE = 3; + final int windowSize = 3; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -559,7 +561,7 @@ public class WindowOperatorFrom12MigrationTest { inputType.createSerializer(new ExecutionConfig())); WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( - TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), @@ -600,7 +602,7 @@ public class WindowOperatorFrom12MigrationTest { @Ignore @Test public void writeApplyProcessingTimeWindowsSnapshot() throws Exception { - final int WINDOW_SIZE = 3; + final int windowSize = 3; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -608,7 +610,7 @@ public class WindowOperatorFrom12MigrationTest { inputType.createSerializer(new ExecutionConfig())); WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( - TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), @@ -648,7 +650,7 @@ public class WindowOperatorFrom12MigrationTest { @Test public void testRestoreApplyProcessingTimeWindows() throws Exception { - final int WINDOW_SIZE = 3; + final int windowSize = 3; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -656,7 +658,7 @@ public class WindowOperatorFrom12MigrationTest { inputType.createSerializer(new ExecutionConfig())); WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( - TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), @@ -738,7 +740,7 @@ public class WindowOperatorFrom12MigrationTest { @Test public void testRestoreAggregatingAlignedProcessingTimeWindows() throws Exception { - final int WINDOW_SIZE = 3; + final int windowSize = 3; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -747,7 +749,7 @@ public class WindowOperatorFrom12MigrationTest { inputType.createSerializer(new ExecutionConfig())); WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( - TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), @@ -841,7 +843,7 @@ public class WindowOperatorFrom12MigrationTest { @Test public void testRestoreAccumulatingAlignedProcessingTimeWindows() throws Exception { - final int WINDOW_SIZE = 3; + final int windowSize = 3; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -850,7 +852,7 @@ public class WindowOperatorFrom12MigrationTest { inputType.createSerializer(new ExecutionConfig())); WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( - TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), @@ -892,7 +894,6 @@ public class WindowOperatorFrom12MigrationTest { testHarness.close(); } - private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> { private static final long serialVersionUID = 1L; @@ -950,7 +951,7 @@ public class WindowOperatorFrom12MigrationTest { } } - public static class SumReducer implements ReduceFunction<Tuple2<String, Integer>> { + private static class SumReducer implements ReduceFunction<Tuple2<String, Integer>> { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, @@ -959,7 +960,7 @@ public class WindowOperatorFrom12MigrationTest { } } - public static class RichSumReducer<W extends Window> extends RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> { + private static class RichSumReducer<W extends Window> extends RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> { private static final long serialVersionUID = 1L; private boolean openCalled = false; @@ -995,7 +996,7 @@ public class WindowOperatorFrom12MigrationTest { } - public static class SessionWindowFunction implements WindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> { + private static class SessionWindowFunction implements WindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> { private static final long serialVersionUID = 1L; @Override
