http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SimpleTriggerTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SimpleTriggerTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SimpleTriggerTestHarness.java new file mode 100644 index 0000000..050178b --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SimpleTriggerTestHarness.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.operators.windowing; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.streaming.api.windowing.triggers.Trigger; +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +/** + * Simple {@link TriggerTestHarness} that accepts integers and takes the value as the timestamp for + * the {@link StreamRecord}. + */ +public class SimpleTriggerTestHarness<W extends Window> extends TriggerTestHarness<Integer, W> { + + public SimpleTriggerTestHarness( + Trigger<Integer, W> trigger, + TypeSerializer<W> windowSerializer) throws Exception { + super(trigger, windowSerializer); + } + + public TriggerResult processElement(Integer element, W window) throws Exception { + return super.processElement(new StreamRecord<>(element, element), window); + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java new file mode 100644 index 0000000..4599d19 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.operators.windowing; + + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; + +/** + * Tests for {@link SlidingEventTimeWindows} + */ +public class SlidingEventTimeWindowsTest extends TestLogger { + + @Test + public void testWindowAssignment() { + WindowAssigner.WindowAssignerContext mockContext = + mock(WindowAssigner.WindowAssignerContext.class); + + SlidingEventTimeWindows assigner = + SlidingEventTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(1000)); + + assertThat(assigner.assignWindows("String", 0L, mockContext), containsInAnyOrder( + timeWindow(-4000, 1000), + timeWindow(-3000, 2000), + timeWindow(-2000, 3000), + timeWindow(-1000, 4000), + timeWindow(0, 5000))); + + assertThat(assigner.assignWindows("String", 4999L, mockContext), containsInAnyOrder( + timeWindow(0, 5000), + timeWindow(1000, 6000), + timeWindow(2000, 7000), + timeWindow(3000, 8000), + timeWindow(4000, 9000))); + + assertThat(assigner.assignWindows("String", 5000L, mockContext), containsInAnyOrder( + timeWindow(1000, 6000), + timeWindow(2000, 7000), + timeWindow(3000, 8000), + timeWindow(4000, 9000), + timeWindow(5000, 10000))); + } + + @Test + public void testWindowAssignmentWithOffset() { + WindowAssigner.WindowAssignerContext mockContext = + mock(WindowAssigner.WindowAssignerContext.class); + + SlidingEventTimeWindows assigner = + SlidingEventTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(1000), Time.milliseconds(100)); + + assertThat(assigner.assignWindows("String", 100L, mockContext), containsInAnyOrder( + timeWindow(-3900, 1100), + timeWindow(-2900, 2100), + timeWindow(-1900, 3100), + timeWindow(-900, 4100), + timeWindow(100, 5100))); + + assertThat(assigner.assignWindows("String", 5099L, mockContext), containsInAnyOrder( + timeWindow(100, 5100), + timeWindow(1100, 6100), + timeWindow(2100, 7100), + timeWindow(3100, 8100), + timeWindow(4100, 9100))); + + assertThat(assigner.assignWindows("String", 5100L, mockContext), containsInAnyOrder( + timeWindow(1100, 6100), + timeWindow(2100, 7100), + timeWindow(3100, 8100), + timeWindow(4100, 9100), + timeWindow(5100, 10100))); + } + + @Test + public void testTimeUnits() { + // sanity check with one other time unit + + WindowAssigner.WindowAssignerContext mockContext = + mock(WindowAssigner.WindowAssignerContext.class); + + SlidingEventTimeWindows assigner = SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1), Time.milliseconds(500)); + + assertThat(assigner.assignWindows("String", 100L, mockContext), containsInAnyOrder( + timeWindow(-4500, 500), + timeWindow(-3500, 1500), + timeWindow(-2500, 2500), + timeWindow(-1500, 3500), + timeWindow(-500, 4500))); + + assertThat(assigner.assignWindows("String", 5499L, mockContext), containsInAnyOrder( + timeWindow(500, 5500), + timeWindow(1500, 6500), + timeWindow(2500, 7500), + timeWindow(3500, 8500), + timeWindow(4500, 9500))); + + assertThat(assigner.assignWindows("String", 5100L, mockContext), containsInAnyOrder( + timeWindow(500, 5500), + timeWindow(1500, 6500), + timeWindow(2500, 7500), + timeWindow(3500, 8500), + timeWindow(4500, 9500))); + } + + @Test + public void testInvalidParameters() { + try { + SlidingEventTimeWindows.of(Time.seconds(-2), Time.seconds(1)); + fail("should fail"); + } catch (IllegalArgumentException e) { + assertThat(e.toString(), containsString("0 <= offset < slide and size > 0")); + } + + try { + SlidingEventTimeWindows.of(Time.seconds(2), Time.seconds(-1)); + fail("should fail"); + } catch (IllegalArgumentException e) { + assertThat(e.toString(), containsString("0 <= offset < slide and size > 0")); + } + + + try { + SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(10), Time.seconds(-1)); + fail("should fail"); + } catch (IllegalArgumentException e) { + assertThat(e.toString(), containsString("0 <= offset < slide and size > 0")); + } + } + + @Test + public void testProperties() { + SlidingEventTimeWindows assigner = SlidingEventTimeWindows.of(Time.seconds(5), Time.milliseconds(100)); + + assertTrue(assigner.isEventTime()); + assertEquals(new TimeWindow.Serializer(), assigner.getWindowSerializer(new ExecutionConfig())); + assertThat(assigner.getDefaultTrigger(mock(StreamExecutionEnvironment.class)), instanceOf(EventTimeTrigger.class)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java new file mode 100644 index 0000000..20a9924 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.operators.windowing; + + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for {@link SlidingProcessingTimeWindows} + */ +public class SlidingProcessingTimeWindowsTest extends TestLogger { + + @Test + public void testWindowAssignment() { + WindowAssigner.WindowAssignerContext mockContext = + mock(WindowAssigner.WindowAssignerContext.class); + + SlidingProcessingTimeWindows assigner = + SlidingProcessingTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(1000)); + + when(mockContext.getCurrentProcessingTime()).thenReturn(0L); + assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), containsInAnyOrder( + timeWindow(-4000, 1000), + timeWindow(-3000, 2000), + timeWindow(-2000, 3000), + timeWindow(-1000, 4000), + timeWindow(0, 5000))); + + when(mockContext.getCurrentProcessingTime()).thenReturn(4999L); + assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), containsInAnyOrder( + timeWindow(0, 5000), + timeWindow(1000, 6000), + timeWindow(2000, 7000), + timeWindow(3000, 8000), + timeWindow(4000, 9000))); + + when(mockContext.getCurrentProcessingTime()).thenReturn(5000L); + assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), containsInAnyOrder( + timeWindow(1000, 6000), + timeWindow(2000, 7000), + timeWindow(3000, 8000), + timeWindow(4000, 9000), + timeWindow(5000, 10000))); + } + + @Test + public void testWindowAssignmentWithOffset() { + WindowAssigner.WindowAssignerContext mockContext = + mock(WindowAssigner.WindowAssignerContext.class); + + SlidingProcessingTimeWindows assigner = + SlidingProcessingTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(1000), Time.milliseconds(100)); + + when(mockContext.getCurrentProcessingTime()).thenReturn(100L); + assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), containsInAnyOrder( + timeWindow(-3900, 1100), + timeWindow(-2900, 2100), + timeWindow(-1900, 3100), + timeWindow(-900, 4100), + timeWindow(100, 5100))); + + when(mockContext.getCurrentProcessingTime()).thenReturn(5099L); + assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), containsInAnyOrder( + timeWindow(100, 5100), + timeWindow(1100, 6100), + timeWindow(2100, 7100), + timeWindow(3100, 8100), + timeWindow(4100, 9100))); + + when(mockContext.getCurrentProcessingTime()).thenReturn(5100L); + assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), containsInAnyOrder( + timeWindow(1100, 6100), + timeWindow(2100, 7100), + timeWindow(3100, 8100), + timeWindow(4100, 9100), + timeWindow(5100, 10100))); + } + + @Test + public void testTimeUnits() { + // sanity check with one other time unit + + WindowAssigner.WindowAssignerContext mockContext = + mock(WindowAssigner.WindowAssignerContext.class); + + SlidingProcessingTimeWindows assigner = SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1), Time.milliseconds(500)); + + when(mockContext.getCurrentProcessingTime()).thenReturn(100L); + assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), containsInAnyOrder( + timeWindow(-4500, 500), + timeWindow(-3500, 1500), + timeWindow(-2500, 2500), + timeWindow(-1500, 3500), + timeWindow(-500, 4500))); + + when(mockContext.getCurrentProcessingTime()).thenReturn(5499L); + assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), containsInAnyOrder( + timeWindow(500, 5500), + timeWindow(1500, 6500), + timeWindow(2500, 7500), + timeWindow(3500, 8500), + timeWindow(4500, 9500))); + + when(mockContext.getCurrentProcessingTime()).thenReturn(5100L); + assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), containsInAnyOrder( + timeWindow(500, 5500), + timeWindow(1500, 6500), + timeWindow(2500, 7500), + timeWindow(3500, 8500), + timeWindow(4500, 9500))); + } + + @Test + public void testInvalidParameters() { + try { + SlidingProcessingTimeWindows.of(Time.seconds(-2), Time.seconds(1)); + fail("should fail"); + } catch (IllegalArgumentException e) { + assertThat(e.toString(), containsString("0 <= offset < slide and size > 0")); + } + + try { + SlidingProcessingTimeWindows.of(Time.seconds(2), Time.seconds(-1)); + fail("should fail"); + } catch (IllegalArgumentException e) { + assertThat(e.toString(), containsString("0 <= offset < slide and size > 0")); + } + + + try { + SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(10), Time.seconds(-1)); + fail("should fail"); + } catch (IllegalArgumentException e) { + assertThat(e.toString(), containsString("0 <= offset < slide and size > 0")); + } + } + + @Test + public void testProperties() { + SlidingProcessingTimeWindows assigner = SlidingProcessingTimeWindows.of(Time.seconds(5), Time.milliseconds(100)); + + assertFalse(assigner.isEventTime()); + assertEquals(new TimeWindow.Serializer(), assigner.getWindowSerializer(new ExecutionConfig())); + assertThat(assigner.getDefaultTrigger(mock(StreamExecutionEnvironment.class)), instanceOf(ProcessingTimeTrigger.class)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/StreamRecordMatchers.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/StreamRecordMatchers.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/StreamRecordMatchers.java new file mode 100644 index 0000000..bb07996 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/StreamRecordMatchers.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.operators.windowing; + +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.Matchers; +import org.hamcrest.TypeSafeMatcher; + +/** + * Matchers that are useful for working with {@link StreamRecord StreamRecords}. This ... + */ +public class StreamRecordMatchers { + + public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord( + T value) { + + return isStreamRecord(Matchers.equalTo(value)); + } + + public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord( + T value, + long timestamp) { + + return isStreamRecord(Matchers.equalTo(value), Matchers.equalTo(timestamp)); + } + + public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord( + Matcher<? super T> valueMatcher) { + return new StreamRecordMatcher<>(valueMatcher, Matchers.anything()); + } + + public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord( + Matcher<? super T> valueMatcher, Matcher<? super Long> timestampMatcher) { + return new StreamRecordMatcher<>(valueMatcher, timestampMatcher); + } + + public static Matcher<TimeWindow> timeWindow(long start, long end) { + return Matchers.equalTo(new TimeWindow(start, end)); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + @SafeVarargs + public static <W extends Window> Matcher<Iterable<W>> ofWindows(Matcher<W>... windows) { + return (Matcher) Matchers.containsInAnyOrder(windows); + } + + public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue( + T value) { + return isWindowedValue(Matchers.equalTo(value)); + } + + public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue( + T value, + long timestamp) { + return isWindowedValue(Matchers.equalTo(value), Matchers.equalTo(timestamp)); + } + + public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue( + T value, + long timestamp, + W window) { + return isWindowedValue(Matchers.equalTo(value), Matchers.equalTo(timestamp), Matchers.equalTo(window)); + } + + + public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue( + Matcher<? super T> valueMatcher, long timestamp) { + return new WindowedValueMatcher<>(valueMatcher, Matchers.equalTo(timestamp), Matchers.anything()); + } + + public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue( + Matcher<? super T> valueMatcher, long timestamp, W window) { + return new WindowedValueMatcher<>(valueMatcher, Matchers.equalTo(timestamp), Matchers.equalTo(window)); + } + + + public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue( + Matcher<? super T> valueMatcher) { + return new WindowedValueMatcher<>(valueMatcher, Matchers.anything(), Matchers.anything()); + } + + + public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue( + Matcher<? super T> valueMatcher, Matcher<? super Long> timestampMatcher) { + return new WindowedValueMatcher<>(valueMatcher, timestampMatcher, Matchers.anything()); + } + + public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue( + Matcher<? super T> valueMatcher, long timestamp, Matcher<? super W> windowMatcher) { + return new WindowedValueMatcher<>(valueMatcher, Matchers.equalTo(timestamp), windowMatcher); + } + + public static <T, W extends Window> Matcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> isWindowedValue( + Matcher<? super T> valueMatcher, Matcher<? super Long> timestampMatcher, Matcher<? super W> windowMatcher) { + return new WindowedValueMatcher<>(valueMatcher, timestampMatcher, windowMatcher); + } + + + private StreamRecordMatchers() {} + + private static class StreamRecordMatcher<T> extends TypeSafeMatcher<StreamRecord<? extends T>> { + + private Matcher<? super T> valueMatcher; + private Matcher<? super Long> timestampMatcher; + + private StreamRecordMatcher( + Matcher<? super T> valueMatcher, + Matcher<? super Long> timestampMatcher) { + this.valueMatcher = valueMatcher; + this.timestampMatcher = timestampMatcher; + } + + @Override + public void describeTo(Description description) { + description + .appendText("a StreamRecordValue(").appendValue(valueMatcher) + .appendText(", ").appendValue(timestampMatcher) + .appendText(")"); + } + + @Override + protected boolean matchesSafely(StreamRecord<? extends T> streamRecord) { + return valueMatcher.matches(streamRecord.getValue()) + && timestampMatcher.matches(streamRecord.getTimestamp()); + } + } + + private static class WindowedValueMatcher<T, W extends Window> extends TypeSafeMatcher<StreamRecord<? extends WindowedValue<? extends T, ? extends W>>> { + + private Matcher<? super T> valueMatcher; + private Matcher<? super Long> timestampMatcher; + private Matcher<? super W> windowMatcher; + + + private WindowedValueMatcher( + Matcher<? super T> valueMatcher, + Matcher<? super Long> timestampMatcher, + Matcher<? super W> windowMatcher) { + this.valueMatcher = valueMatcher; + this.timestampMatcher = timestampMatcher; + this.windowMatcher = windowMatcher; + } + + @Override + public void describeTo(Description description) { + description + .appendText("a WindowedValue(").appendValue(valueMatcher) + .appendText(", ").appendValue(timestampMatcher) + .appendText(", ").appendValue(timestampMatcher) + .appendText(")"); + } + + @Override + protected boolean matchesSafely(StreamRecord<? extends WindowedValue<? extends T, ? extends W>> streamRecord) { + return valueMatcher.matches(streamRecord.getValue().value()) + && timestampMatcher.matches(streamRecord.getTimestamp()) + && windowMatcher.matches(streamRecord.getValue().window()); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java new file mode 100644 index 0000000..b9923f2 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java @@ -0,0 +1,381 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.operators.windowing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.MergingState; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; +import org.apache.flink.runtime.query.KvStateRegistry; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; +import org.apache.flink.runtime.state.internal.InternalMergingState; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.api.operators.KeyContext; +import org.apache.flink.streaming.api.operators.TestInternalTimerService; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.windowing.triggers.Trigger; +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; + +/** + * Utility for testing {@link Trigger} behaviour. + */ +public class TriggerTestHarness<T, W extends Window> { + + private static final Integer KEY = 1; + + private final Trigger<T, W> trigger; + private final TypeSerializer<W> windowSerializer; + + private final HeapKeyedStateBackend<Integer> stateBackend; + private final TestInternalTimerService<Integer, W> internalTimerService; + + public TriggerTestHarness( + Trigger<T, W> trigger, + TypeSerializer<W> windowSerializer) throws Exception { + this.trigger = trigger; + this.windowSerializer = windowSerializer; + + // we only ever use one key, other tests make sure that windows work across different + // keys + DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0); + MemoryStateBackend backend = new MemoryStateBackend(); + + @SuppressWarnings("unchecked") + HeapKeyedStateBackend<Integer> stateBackend = (HeapKeyedStateBackend<Integer>) backend.createKeyedStateBackend(dummyEnv, + new JobID(), + "test_op", + IntSerializer.INSTANCE, + 1, + new KeyGroupRange(0, 0), + new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID())); + this.stateBackend = stateBackend; + + this.stateBackend.setCurrentKey(0); + + this.internalTimerService = new TestInternalTimerService<>(new KeyContext() { + @Override + public void setCurrentKey(Object key) { + // ignore + } + + @Override + public Object getCurrentKey() { + return KEY; + } + }); + } + + public int numProcessingTimeTimers() { + return internalTimerService.numProcessingTimeTimers(); + } + + public int numProcessingTimeTimers(W window) { + return internalTimerService.numProcessingTimeTimers(window); + } + + public int numEventTimeTimers() { + return internalTimerService.numEventTimeTimers(); + } + + public int numEventTimeTimers(W window) { + return internalTimerService.numEventTimeTimers(window); + } + + public int numStateEntries() { + return stateBackend.numStateEntries(); + } + + public int numStateEntries(W window) { + return stateBackend.numStateEntries(window); + } + + /** + * Injects one element into the trigger for the given window and returns the result of + * {@link Trigger#onElement(Object, long, Window, Trigger.TriggerContext)} + */ + public TriggerResult processElement(StreamRecord<T> element, W window) throws Exception { + TestTriggerContext<Integer, W> triggerContext = new TestTriggerContext<>( + KEY, + window, + internalTimerService, + stateBackend, + windowSerializer); + return trigger.onElement(element.getValue(), element.getTimestamp(), window, triggerContext); + } + + /** + * Advanced processing time and checks whether we have exactly one firing for the given + * window. The result of {@link Trigger#onProcessingTime(long, Window, Trigger.TriggerContext)} + * is returned for that firing. + */ + public TriggerResult advanceProcessingTime(long time, W window) throws Exception { + Collection<Tuple2<W, TriggerResult>> firings = advanceProcessingTime(time); + + if (firings.size() != 1) { + throw new IllegalStateException("Must have exactly one timer firing. Fired timers: " + firings); + } + + Tuple2<W, TriggerResult> firing = firings.iterator().next(); + + if (!firing.f0.equals(window)) { + throw new IllegalStateException("Trigger fired for another window."); + } + + return firing.f1; + } + + /** + * Advanced the watermark and checks whether we have exactly one firing for the given + * window. The result of {@link Trigger#onEventTime(long, Window, Trigger.TriggerContext)} + * is returned for that firing. + */ + public TriggerResult advanceWatermark(long time, W window) throws Exception { + Collection<Tuple2<W, TriggerResult>> firings = advanceWatermark(time); + + if (firings.size() != 1) { + throw new IllegalStateException("Must have exactly one timer firing. Fired timers: " + firings); + } + + Tuple2<W, TriggerResult> firing = firings.iterator().next(); + + if (!firing.f0.equals(window)) { + throw new IllegalStateException("Trigger fired for another window."); + } + + return firing.f1; + } + + /** + * Advanced processing time and processes any timers that fire because of this. The + * window and {@link TriggerResult} for each firing are returned. + */ + public Collection<Tuple2<W, TriggerResult>> advanceProcessingTime(long time) throws Exception { + Collection<TestInternalTimerService.Timer<Integer, W>> firedTimers = + internalTimerService.advanceProcessingTime(time); + + Collection<Tuple2<W, TriggerResult>> result = new ArrayList<>(); + + for (TestInternalTimerService.Timer<Integer, W> timer : firedTimers) { + TestTriggerContext<Integer, W> triggerContext = new TestTriggerContext<>( + KEY, + timer.getNamespace(), + internalTimerService, + stateBackend, + windowSerializer); + + TriggerResult triggerResult = + trigger.onProcessingTime(timer.getTimestamp(), timer.getNamespace(), triggerContext); + + result.add(new Tuple2<>(timer.getNamespace(), triggerResult)); + } + + return result; + } + + /** + * Advanced the watermark and processes any timers that fire because of this. The + * window and {@link TriggerResult} for each firing are returned. + */ + public Collection<Tuple2<W, TriggerResult>> advanceWatermark(long time) throws Exception { + Collection<TestInternalTimerService.Timer<Integer, W>> firedTimers = + internalTimerService.advanceWatermark(time); + + Collection<Tuple2<W, TriggerResult>> result = new ArrayList<>(); + + for (TestInternalTimerService.Timer<Integer, W> timer : firedTimers) { + TestTriggerContext<Integer, W> triggerContext = new TestTriggerContext<>( + KEY, + timer.getNamespace(), + internalTimerService, + stateBackend, + windowSerializer); + + TriggerResult triggerResult = + trigger.onEventTime(timer.getTimestamp(), timer.getNamespace(), triggerContext); + + result.add(new Tuple2<>(timer.getNamespace(), triggerResult)); + } + + return result; + } + + /** + * Calls {@link Trigger#onMerge(Window, Trigger.OnMergeContext)} with the given + * parameters. This also calls {@link Trigger#clear(Window, Trigger.TriggerContext)} on the + * merged windows as does {@link WindowOperator}. + */ + public void mergeWindows(W targetWindow, Collection<W> mergedWindows) throws Exception { + TestOnMergeContext<Integer, W> onMergeContext = new TestOnMergeContext<>( + KEY, + targetWindow, + mergedWindows, + internalTimerService, + stateBackend, + windowSerializer); + trigger.onMerge(targetWindow, onMergeContext); + + for (W mergedWindow : mergedWindows) { + clearTriggerState(mergedWindow); + } + } + + /** + * Calls {@link Trigger#clear(Window, Trigger.TriggerContext)} for the given window. + */ + public void clearTriggerState(W window) throws Exception { + TestTriggerContext<Integer, W> triggerContext = new TestTriggerContext<>( + KEY, + window, + internalTimerService, + stateBackend, + windowSerializer); + trigger.clear(window, triggerContext); + } + + private static class TestTriggerContext<K, W extends Window> implements Trigger.TriggerContext { + + protected final InternalTimerService<W> timerService; + protected final KeyedStateBackend<Integer> stateBackend; + protected final K key; + protected final W window; + protected final TypeSerializer<W> windowSerializer; + + TestTriggerContext( + K key, + W window, + InternalTimerService<W> timerService, + KeyedStateBackend<Integer> stateBackend, + TypeSerializer<W> windowSerializer) { + this.key = key; + this.window = window; + this.timerService = timerService; + this.stateBackend = stateBackend; + this.windowSerializer = windowSerializer; + } + + @Override + public long getCurrentProcessingTime() { + return timerService.currentProcessingTime(); + } + + @Override + public MetricGroup getMetricGroup() { + return null; + } + + @Override + public long getCurrentWatermark() { + return timerService.currentWatermark(); + } + + @Override + public void registerProcessingTimeTimer(long time) { + timerService.registerProcessingTimeTimer(window, time); + } + + @Override + public void registerEventTimeTimer(long time) { + timerService.registerEventTimeTimer(window, time); + } + + @Override + public void deleteProcessingTimeTimer(long time) { + timerService.deleteProcessingTimeTimer(window, time); + } + + @Override + public void deleteEventTimeTimer(long time) { + timerService.deleteEventTimeTimer(window, time); + } + + @Override + public <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) { + try { + return stateBackend.getPartitionedState(window, windowSerializer, stateDescriptor); + } catch (Exception e) { + throw new RuntimeException("Error getting state", e); + } + } + + @Override + public <S extends Serializable> ValueState<S> getKeyValueState( + String name, Class<S> stateType, S defaultState) { + return getPartitionedState(new ValueStateDescriptor<>(name, stateType, defaultState)); + } + + @Override + public <S extends Serializable> ValueState<S> getKeyValueState( + String name, TypeInformation<S> stateType, S defaultState) { + return getPartitionedState(new ValueStateDescriptor<>(name, stateType, defaultState)); + } + } + + private static class TestOnMergeContext<K, W extends Window> extends TestTriggerContext<K, W> implements Trigger.OnMergeContext { + + private final Collection<W> mergedWindows; + + public TestOnMergeContext( + K key, + W targetWindow, + Collection<W> mergedWindows, + InternalTimerService<W> timerService, + KeyedStateBackend<Integer> stateBackend, + TypeSerializer<W> windowSerializer) { + super(key, targetWindow, timerService, stateBackend, windowSerializer); + + this.mergedWindows = mergedWindows; + } + + @Override + public <S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor) { + try { + S rawState = stateBackend.getOrCreateKeyedState(windowSerializer, stateDescriptor); + + if (rawState instanceof InternalMergingState) { + @SuppressWarnings("unchecked") + InternalMergingState<W, ?, ?> mergingState = (InternalMergingState<W, ?, ?>) rawState; + mergingState.mergeNamespaces(window, mergedWindows); + } + else { + throw new IllegalArgumentException( + "The given state descriptor does not refer to a mergeable state (MergingState)"); + } + } + catch (Exception e) { + throw new RuntimeException("Error while merging state.", e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java new file mode 100644 index 0000000..2373a86 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.operators.windowing; + + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; + +/** + * Tests for {@link TumblingEventTimeWindows} + */ +public class TumblingEventTimeWindowsTest extends TestLogger { + + @Test + public void testWindowAssignment() { + WindowAssigner.WindowAssignerContext mockContext = + mock(WindowAssigner.WindowAssignerContext.class); + + TumblingEventTimeWindows assigner = TumblingEventTimeWindows.of(Time.milliseconds(5000)); + + assertThat(assigner.assignWindows("String", 0L, mockContext), contains(timeWindow(0, 5000))); + assertThat(assigner.assignWindows("String", 4999L, mockContext), contains(timeWindow(0, 5000))); + assertThat(assigner.assignWindows("String", 5000L, mockContext), contains(timeWindow(5000, 10000))); + } + + @Test + public void testWindowAssignmentWithOffset() { + WindowAssigner.WindowAssignerContext mockContext = + mock(WindowAssigner.WindowAssignerContext.class); + + TumblingEventTimeWindows assigner = TumblingEventTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(100)); + + assertThat(assigner.assignWindows("String", 100L, mockContext), contains(timeWindow(100, 5100))); + assertThat(assigner.assignWindows("String", 5099L, mockContext), contains(timeWindow(100, 5100))); + assertThat(assigner.assignWindows("String", 5100L, mockContext), contains(timeWindow(5100, 10100))); + } + + @Test + public void testTimeUnits() { + // sanity check with one other time unit + + WindowAssigner.WindowAssignerContext mockContext = + mock(WindowAssigner.WindowAssignerContext.class); + + TumblingEventTimeWindows assigner = TumblingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)); + + assertThat(assigner.assignWindows("String", 1000L, mockContext), contains(timeWindow(1000, 6000))); + assertThat(assigner.assignWindows("String", 5999L, mockContext), contains(timeWindow(1000, 6000))); + assertThat(assigner.assignWindows("String", 6000L, mockContext), contains(timeWindow(6000, 11000))); + } + + @Test + public void testInvalidParameters() { + try { + TumblingEventTimeWindows.of(Time.seconds(-1)); + fail("should fail"); + } catch (IllegalArgumentException e) { + assertThat(e.toString(), containsString("0 <= offset < size")); + } + + try { + TumblingEventTimeWindows.of(Time.seconds(10), Time.seconds(20)); + fail("should fail"); + } catch (IllegalArgumentException e) { + assertThat(e.toString(), containsString("0 <= offset < size")); + } + + try { + TumblingEventTimeWindows.of(Time.seconds(10), Time.seconds(-1)); + fail("should fail"); + } catch (IllegalArgumentException e) { + assertThat(e.toString(), containsString("0 <= offset < size")); + } + } + + @Test + public void testProperties() { + TumblingEventTimeWindows assigner = TumblingEventTimeWindows.of(Time.seconds(5), Time.milliseconds(100)); + + assertTrue(assigner.isEventTime()); + assertEquals(new TimeWindow.Serializer(), assigner.getWindowSerializer(new ExecutionConfig())); + assertThat(assigner.getDefaultTrigger(mock(StreamExecutionEnvironment.class)), instanceOf(EventTimeTrigger.class)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java new file mode 100644 index 0000000..348b6fa --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.operators.windowing; + + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for {@link TumblingProcessingTimeWindows} + */ +public class TumblingProcessingTimeWindowsTest extends TestLogger { + + @Test + public void testWindowAssignment() { + WindowAssigner.WindowAssignerContext mockContext = + mock(WindowAssigner.WindowAssignerContext.class); + + TumblingProcessingTimeWindows assigner = TumblingProcessingTimeWindows.of(Time.milliseconds(5000)); + + when(mockContext.getCurrentProcessingTime()).thenReturn(0L); + assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(0, 5000))); + + when(mockContext.getCurrentProcessingTime()).thenReturn(4999L); + assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(0, 5000))); + + when(mockContext.getCurrentProcessingTime()).thenReturn(5000L); + assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(5000, 10000))); + } + + @Test + public void testWindowAssignmentWithOffset() { + WindowAssigner.WindowAssignerContext mockContext = + mock(WindowAssigner.WindowAssignerContext.class); + + TumblingProcessingTimeWindows assigner = TumblingProcessingTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(100)); + + when(mockContext.getCurrentProcessingTime()).thenReturn(100L); + assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(100, 5100))); + + when(mockContext.getCurrentProcessingTime()).thenReturn(5099L); + assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(100, 5100))); + + when(mockContext.getCurrentProcessingTime()).thenReturn(5100L); + assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(5100, 10100))); + } + + @Test + public void testTimeUnits() { + // sanity check with one other time unit + + WindowAssigner.WindowAssignerContext mockContext = + mock(WindowAssigner.WindowAssignerContext.class); + + TumblingProcessingTimeWindows assigner = TumblingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1)); + + when(mockContext.getCurrentProcessingTime()).thenReturn(1000L); + assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(1000, 6000))); + + when(mockContext.getCurrentProcessingTime()).thenReturn(5999L); + assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(1000, 6000))); + + when(mockContext.getCurrentProcessingTime()).thenReturn(6000L); + assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(6000, 11000))); + } + + @Test + public void testInvalidParameters() { + try { + TumblingProcessingTimeWindows.of(Time.seconds(-1)); + fail("should fail"); + } catch (IllegalArgumentException e) { + assertThat(e.toString(), containsString("0 <= offset < size")); + } + + try { + TumblingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(20)); + fail("should fail"); + } catch (IllegalArgumentException e) { + assertThat(e.toString(), containsString("0 <= offset < size")); + } + + try { + TumblingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(-1)); + fail("should fail"); + } catch (IllegalArgumentException e) { + assertThat(e.toString(), containsString("0 <= offset < size")); + } + } + + @Test + public void testProperties() { + TumblingProcessingTimeWindows assigner = TumblingProcessingTimeWindows.of(Time.seconds(5), Time.milliseconds(100)); + + assertFalse(assigner.isEventTime()); + assertEquals(new TimeWindow.Serializer(), assigner.getWindowSerializer(new ExecutionConfig())); + assertThat(assigner.getDefaultTrigger(mock(StreamExecutionEnvironment.class)), instanceOf(ProcessingTimeTrigger.class)); + } +}
