http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java index 2a6a723..d9fcc12 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java @@ -37,11 +37,11 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.StreamTask; - import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; -import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.util.CollectorOutput; + import org.junit.Assert; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; @@ -52,12 +52,15 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; -import static org.junit.Assert.*; - +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +/** + * Tests for {@link StreamSource} operators. + */ @SuppressWarnings("serial") public class StreamSourceOperatorTest { @@ -86,7 +89,6 @@ public class StreamSourceOperatorTest { final StreamSource<String, InfiniteSource<String>> operator = new StreamSource<>(new InfiniteSource<String>()); - setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0); operator.cancel(); @@ -106,7 +108,6 @@ public class StreamSourceOperatorTest { final StreamSource<String, InfiniteSource<String>> operator = new StreamSource<>(new InfiniteSource<String>()); - setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0); // trigger an async cancel in a bit @@ -139,7 +140,6 @@ public class StreamSourceOperatorTest { final StoppableStreamSource<String, InfiniteSource<String>> operator = new StoppableStreamSource<>(new InfiniteSource<String>()); - setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0); operator.stop(); @@ -158,7 +158,6 @@ public class StreamSourceOperatorTest { final StoppableStreamSource<String, InfiniteSource<String>> operator = new StoppableStreamSource<>(new InfiniteSource<String>()); - setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0); // trigger an async cancel in a bit @@ -179,7 +178,7 @@ public class StreamSourceOperatorTest { } /** - * Test that latency marks are emitted + * Test that latency marks are emitted. */ @Test public void testLatencyMarkEmission() throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java index 6772db4..6e3be03 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java @@ -24,15 +24,16 @@ import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.StreamMap; import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness; -import org.apache.flink.streaming.runtime.tasks.StreamTask; -import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.junit.Test; import java.util.concurrent.atomic.AtomicReference; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; /** * Tests for the timer service of {@link org.apache.flink.streaming.runtime.tasks.StreamTask}. @@ -65,7 +66,6 @@ public class StreamTaskTimerTest { assertEquals(1, StreamTask.TRIGGER_THREAD_GROUP.activeCount()); - testHarness.endInput(); testHarness.waitForTaskCompletion(); @@ -109,8 +109,7 @@ public class StreamTaskTimerTest { long deadline = System.currentTimeMillis() + 20000; while (errorRef.get() == null && ValidatingProcessingTimeCallback.numInSequence < 4 && - System.currentTimeMillis() < deadline) - { + System.currentTimeMillis() < deadline) { Thread.sleep(100); } @@ -170,6 +169,9 @@ public class StreamTaskTimerTest { // ------------------------------------------------------------------------ + /** + * Identity mapper. + */ public static class DummyMapFunction<T> implements MapFunction<T, T> { @Override public T map(T value) { http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java index 9897884..675ffa3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java @@ -24,8 +24,8 @@ import org.apache.flink.streaming.api.operators.StreamMap; import org.apache.flink.streaming.runtime.tasks.AsyncExceptionHandler; import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness; -import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; import org.junit.Test; @@ -33,6 +33,9 @@ import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.assertEquals; +/** + * Tests for {@link TestProcessingTimeService}. + */ public class TestProcessingTimeServiceTest { @Test @@ -90,6 +93,9 @@ public class TestProcessingTimeServiceTest { // ------------------------------------------------------------------------ + /** + * An {@link AsyncExceptionHandler} storing the handled exception. + */ public static class ReferenceSettingExceptionHandler implements AsyncExceptionHandler { private final AtomicReference<Throwable> errorReference; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java index f129c20..51af116 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java @@ -28,8 +28,13 @@ import org.junit.Test; import java.util.concurrent.ConcurrentLinkedQueue; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +/** + * Tests for {@link TimestampsAndPeriodicWatermarksOperator}. + */ public class TimestampsAndPeriodicWatermarksOperatorTest { @Test http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperatorTest.java index 0333e93..a422432 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperatorTest.java @@ -30,6 +30,9 @@ import java.util.concurrent.ConcurrentLinkedQueue; import static org.junit.Assert.assertEquals; +/** + * Tests for {@link TimestampsAndPunctuatedWatermarksOperator}. + */ public class TimestampsAndPunctuatedWatermarksOperatorTest { @Test http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java index 46d92af..d3fd585 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java @@ -27,6 +27,9 @@ import org.apache.flink.util.TestLogger; import org.junit.Test; +/** + * Test base for {@link GenericWriteAheadSink}. + */ public abstract class WriteAheadSinkTestBase<IN, S extends GenericWriteAheadSink<IN>> extends TestLogger { protected abstract S createSink() throws Exception; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java index 2f7e302..a57dcf1 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java @@ -68,6 +68,9 @@ import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +/** + * Tests for {@link AccumulatingProcessingTimeWindowOperator}. + */ @SuppressWarnings({"serial"}) @PrepareForTest(InternalIterableWindowFunction.class) @RunWith(PowerMockRunner.class) @@ -220,7 +223,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { assertTrue(op.getNextEvaluationTime() % 1000 == 0); testHarness.close(); - op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1500, 1000); @@ -278,7 +280,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { testHarness.setProcessingTime(currentTime); } - List<Integer> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords()); assertEquals(numElements, result.size()); @@ -322,7 +323,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { testHarness.setProcessingTime(currentTime); } - List<Integer> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords()); assertEquals(numElements, result.size()); @@ -476,7 +476,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { testHarness.setProcessingTime(200); - List<Integer> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords()); assertEquals(6, result.size()); @@ -524,7 +523,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { testHarness.setProcessingTime(200); - List<Integer> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords()); assertEquals(6, result.size()); @@ -837,7 +835,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { testHarness.restore(state); testHarness.open(); - // inject again the remaining elements for (int i = numElementsFirst; i < numElements; i++) { testHarness.processElement(new StreamRecord<>(i)); @@ -929,7 +926,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { testHarness.restore(state); testHarness.open(); - // inject again the remaining elements for (int i = numElementsFirst; i < numElements; i++) { testHarness.processElement(new StreamRecord<>(i)); @@ -1040,7 +1036,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { // we use a concurrent map here even though there is no concurrency, to // get "volatile" style access to entries - static final Map<Integer, Integer> globalCounts = new ConcurrentHashMap<>(); + private static final Map<Integer, Integer> globalCounts = new ConcurrentHashMap<>(); private ValueState<Integer> state; @@ -1053,9 +1049,9 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { @Override public void process(Integer key, - Context context, - Iterable<Integer> values, - Collector<Integer> out) throws Exception { + Context context, + Iterable<Integer> values, + Collector<Integer> out) throws Exception { for (Integer i : values) { // we need to update this state before emitting elements. Else, the test's main // thread will have received all output elements before the state is updated and @@ -1093,8 +1089,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { } private static StreamTask<?, ?> createMockTaskWithTimer( - final ProcessingTimeService timerService) - { + final ProcessingTimeService timerService) { StreamTask<?, ?> mockTask = createMockTask(); when(mockTask.getProcessingTimeService()).thenReturn(timerService); return mockTask; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java index 1875bbb..62f4f0b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java @@ -35,9 +35,9 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; - import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; + import org.junit.After; import org.junit.Test; @@ -56,6 +56,9 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; +/** + * Tests for aligned {@link AggregatingProcessingTimeWindowOperator}. + */ @SuppressWarnings("serial") public class AggregatingAlignedProcessingTimeWindowOperatorTest { @@ -66,9 +69,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { private final KeySelector<String, String> mockKeySelector = mock(KeySelector.class); private final KeySelector<Tuple2<Integer, Integer>, Integer> fieldOneSelector = - new KeySelector<Tuple2<Integer,Integer>, Integer>() { + new KeySelector<Tuple2<Integer, Integer>, Integer>() { @Override - public Integer getKey(Tuple2<Integer,Integer> value) { + public Integer getKey(Tuple2<Integer, Integer> value) { return value.f0; } }; @@ -585,7 +588,6 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { IntSerializer.INSTANCE, tupleSerializer, windowSize, windowSlide); - OneInputStreamOperatorTestHarness<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness = new OneInputStreamOperatorTestHarness<>(op); @@ -824,7 +826,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { private static class StatefulFunction extends RichReduceFunction<Tuple2<Integer, Integer>> { - static final Map<Integer, Integer> globalCounts = new ConcurrentHashMap<>(); + private static final Map<Integer, Integer> globalCounts = new ConcurrentHashMap<>(); private ValueState<Integer> state; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java index 81a9275..a7c6f47 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.api.common.ExecutionConfig; @@ -1244,7 +1245,6 @@ public class AllWindowTranslationTest { } - @Test @SuppressWarnings("rawtypes") public void testReduceWithCustomTrigger() throws Exception { @@ -1461,7 +1461,7 @@ public class AllWindowTranslationTest { // UDFs // ------------------------------------------------------------------------ - public static class DummyReducer implements ReduceFunction<Tuple2<String, Integer>> { + private static class DummyReducer implements ReduceFunction<Tuple2<String, Integer>> { private static final long serialVersionUID = 1L; @Override http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousEventTimeTriggerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousEventTimeTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousEventTimeTriggerTest.java index 0f65a88..9c14a9f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousEventTimeTriggerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousEventTimeTriggerTest.java @@ -15,15 +15,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.runtime.operators.windowing; -import com.google.common.collect.Lists; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger; import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import com.google.common.collect.Lists; import org.junit.Test; import java.util.Collection; @@ -62,7 +64,7 @@ public class ContinuousEventTimeTriggerTest { /** - * Verify that state <TimeWindow>of separate windows does not leak into other windows. + * Verify that state <TimeWindow>of separate windows does not leak into other windows. */ @Test public void testWindowSeparationAndFiring() throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java index 16e353b..38dd01d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java @@ -15,13 +15,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.runtime.operators.windowing; -import com.google.common.collect.Lists; import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import com.google.common.collect.Lists; import org.junit.Test; import static org.junit.Assert.assertEquals; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsTest.java index a46572b..23af838 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsTest.java @@ -15,10 +15,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.streaming.runtime.operators.windowing; +package org.apache.flink.streaming.runtime.operators.windowing; -import com.google.common.collect.Lists; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows; @@ -28,6 +27,8 @@ import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.TestLogger; + +import com.google.common.collect.Lists; import org.junit.Test; import org.mockito.Matchers; @@ -38,27 +39,35 @@ import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.instanceOf; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Matchers.anyCollection; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.argThat; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; /** - * Tests for {@link EventTimeSessionWindows} + * Tests for {@link EventTimeSessionWindows}. */ public class EventTimeSessionWindowsTest extends TestLogger { @Test public void testWindowAssignment() { - final int SESSION_GAP = 5000; + final int sessionGap = 5000; WindowAssigner.WindowAssignerContext mockContext = mock(WindowAssigner.WindowAssignerContext.class); - EventTimeSessionWindows assigner = EventTimeSessionWindows.withGap(Time.milliseconds(SESSION_GAP)); + EventTimeSessionWindows assigner = EventTimeSessionWindows.withGap(Time.milliseconds(sessionGap)); - assertThat(assigner.assignWindows("String", 0L, mockContext), contains(timeWindow(0, 0 + SESSION_GAP))); - assertThat(assigner.assignWindows("String", 4999L, mockContext), contains(timeWindow(4999, 4999 + SESSION_GAP))); - assertThat(assigner.assignWindows("String", 5000L, mockContext), contains(timeWindow(5000, 5000 + SESSION_GAP))); + assertThat(assigner.assignWindows("String", 0L, mockContext), contains(timeWindow(0, 0 + sessionGap))); + assertThat(assigner.assignWindows("String", 4999L, mockContext), contains(timeWindow(4999, 4999 + sessionGap))); + assertThat(assigner.assignWindows("String", 5000L, mockContext), contains(timeWindow(5000, 5000 + sessionGap))); } @Test @@ -138,16 +147,16 @@ public class EventTimeSessionWindowsTest extends TestLogger { public void testTimeUnits() { // sanity check with one other time unit - final int SESSION_GAP = 5000; + final int sessionGap = 5000; WindowAssigner.WindowAssignerContext mockContext = mock(WindowAssigner.WindowAssignerContext.class); - EventTimeSessionWindows assigner = EventTimeSessionWindows.withGap(Time.seconds(SESSION_GAP / 1000)); + EventTimeSessionWindows assigner = EventTimeSessionWindows.withGap(Time.seconds(sessionGap / 1000)); - assertThat(assigner.assignWindows("String", 0L, mockContext), contains(timeWindow(0, 0 + SESSION_GAP))); - assertThat(assigner.assignWindows("String", 4999L, mockContext), contains(timeWindow(4999, 4999 + SESSION_GAP))); - assertThat(assigner.assignWindows("String", 5000L, mockContext), contains(timeWindow(5000, 5000 + SESSION_GAP))); + assertThat(assigner.assignWindows("String", 0L, mockContext), contains(timeWindow(0, 0 + sessionGap))); + assertThat(assigner.assignWindows("String", 4999L, mockContext), contains(timeWindow(4999, 4999 + sessionGap))); + assertThat(assigner.assignWindows("String", 5000L, mockContext), contains(timeWindow(5000, 5000 + sessionGap))); } @Test http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java index 2d93ac0..2bcc192 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java @@ -15,13 +15,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.runtime.operators.windowing; -import com.google.common.collect.Lists; import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import com.google.common.collect.Lists; import org.junit.Test; import static org.junit.Assert.assertEquals; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorContractTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorContractTest.java index 7af4506..a89aec0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorContractTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorContractTest.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.api.common.ExecutionConfig; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java index e5d3ef0..8d65bb4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.api.common.ExecutionConfig; @@ -50,6 +51,7 @@ import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.TestHarnessUtil; import org.apache.flink.util.Collector; + import org.junit.Assert; import org.junit.Test; @@ -58,18 +60,21 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +/** + * Tests for {@link EvictingWindowOperator}. + */ public class EvictingWindowOperatorTest { /** - * Tests CountEvictor evictAfter behavior + * Tests CountEvictor evictAfter behavior. * @throws Exception */ @Test public void testCountEvictorEvictAfter() throws Exception { AtomicInteger closeCalled = new AtomicInteger(0); - final int WINDOW_SIZE = 4; - final int TRIGGER_COUNT = 2; - final boolean EVICT_AFTER = true; + final int windowSize = 4; + final int triggerCount = 2; + final boolean evictAfter = true; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -80,7 +85,6 @@ public class EvictingWindowOperatorTest { ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc = new ListStateDescriptor<>("window-contents", streamRecordSerializer); - EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>( GlobalWindows.create(), new GlobalWindow.Serializer(), @@ -88,16 +92,14 @@ public class EvictingWindowOperatorTest { BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)), - CountTrigger.of(TRIGGER_COUNT), - CountEvictor.of(WINDOW_SIZE,EVICT_AFTER), + CountTrigger.of(triggerCount), + CountEvictor.of(windowSize, evictAfter), 0, null /* late data output tag */); - OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); - long initialTime = 0L; ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); @@ -114,8 +116,6 @@ public class EvictingWindowOperatorTest { testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999)); testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000)); - - expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE)); expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE)); expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE)); @@ -139,18 +139,17 @@ public class EvictingWindowOperatorTest { testHarness.close(); Assert.assertEquals("Close was not called.", 1, closeCalled.get()); - } /** - * Tests TimeEvictor evictAfter behavior + * Tests TimeEvictor evictAfter behavior. * @throws Exception */ @Test public void testTimeEvictorEvictAfter() throws Exception { AtomicInteger closeCalled = new AtomicInteger(0); - final int TRIGGER_COUNT = 2; - final boolean EVICT_AFTER = true; + final int triggerCount = 2; + final boolean evictAfter = true; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -161,7 +160,6 @@ public class EvictingWindowOperatorTest { ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc = new ListStateDescriptor<>("window-contents", streamRecordSerializer); - EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>( GlobalWindows.create(), new GlobalWindow.Serializer(), @@ -169,12 +167,11 @@ public class EvictingWindowOperatorTest { BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)), - CountTrigger.of(TRIGGER_COUNT), - TimeEvictor.of(Time.seconds(2), EVICT_AFTER), + CountTrigger.of(triggerCount), + TimeEvictor.of(Time.seconds(2), evictAfter), 0, null /* late data output tag */); - OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); @@ -194,13 +191,10 @@ public class EvictingWindowOperatorTest { testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 2001)); testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1001)); - - expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE)); expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE)); expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), Long.MAX_VALUE)); - TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10999)); @@ -218,14 +212,14 @@ public class EvictingWindowOperatorTest { } /** - * Tests TimeEvictor evictBefore behavior + * Tests TimeEvictor evictBefore behavior. * @throws Exception */ @Test public void testTimeEvictorEvictBefore() throws Exception { AtomicInteger closeCalled = new AtomicInteger(0); - final int TRIGGER_COUNT = 2; - final int WINDOW_SIZE = 4; + final int triggerCount = 2; + final int windowSize = 4; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -236,20 +230,18 @@ public class EvictingWindowOperatorTest { ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc = new ListStateDescriptor<>("window-contents", streamRecordSerializer); - EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new EvictingWindowOperator<>( - TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>(closeCalled)), - CountTrigger.of(TRIGGER_COUNT), + CountTrigger.of(triggerCount), TimeEvictor.of(Time.seconds(2)), 0, null /* late data output tag */); - OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); @@ -270,13 +262,10 @@ public class EvictingWindowOperatorTest { testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 2001)); testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1001)); - - expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 1), 3999)); expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 3999)); expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 3999)); - TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 6500)); @@ -290,19 +279,18 @@ public class EvictingWindowOperatorTest { testHarness.close(); Assert.assertEquals("Close was not called.", 1, closeCalled.get()); - } /** - * Tests time evictor, if no timestamp information in the StreamRecord - * No element will be evicted from the window + * Tests time evictor, if no timestamp information in the StreamRecord. + * No element will be evicted from the window. * @throws Exception */ @Test public void testTimeEvictorNoTimestamp() throws Exception { AtomicInteger closeCalled = new AtomicInteger(0); - final int TRIGGER_COUNT = 2; - final boolean EVICT_AFTER = true; + final int triggerCount = 2; + final boolean evictAfter = true; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -313,7 +301,6 @@ public class EvictingWindowOperatorTest { ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc = new ListStateDescriptor<>("window-contents", streamRecordSerializer); - EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>( GlobalWindows.create(), new GlobalWindow.Serializer(), @@ -321,12 +308,11 @@ public class EvictingWindowOperatorTest { BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)), - CountTrigger.of(TRIGGER_COUNT), - TimeEvictor.of(Time.seconds(2), EVICT_AFTER), + CountTrigger.of(triggerCount), + TimeEvictor.of(Time.seconds(2), evictAfter), 0, null /* late data output tag */); - OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); @@ -345,13 +331,10 @@ public class EvictingWindowOperatorTest { testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1))); testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1))); - - expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE)); expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE)); expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE)); - TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1))); @@ -365,19 +348,18 @@ public class EvictingWindowOperatorTest { testHarness.close(); Assert.assertEquals("Close was not called.", 1, closeCalled.get()); - } /** - * Tests DeltaEvictor, evictBefore behavior + * Tests DeltaEvictor, evictBefore behavior. * @throws Exception */ @Test public void testDeltaEvictorEvictBefore() throws Exception { AtomicInteger closeCalled = new AtomicInteger(0); - final int TRIGGER_COUNT = 2; - final boolean EVICT_AFTER = false; - final int THRESHOLD = 2; + final int triggerCount = 2; + final boolean evictAfter = false; + final int threshold = 2; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -388,7 +370,6 @@ public class EvictingWindowOperatorTest { ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc = new ListStateDescriptor<>("window-contents", streamRecordSerializer); - EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>( GlobalWindows.create(), new GlobalWindow.Serializer(), @@ -396,18 +377,16 @@ public class EvictingWindowOperatorTest { BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)), - CountTrigger.of(TRIGGER_COUNT), - DeltaEvictor.of(THRESHOLD, new DeltaFunction<Tuple2<String, Integer>>() { + CountTrigger.of(triggerCount), + DeltaEvictor.of(threshold, new DeltaFunction<Tuple2<String, Integer>>() { @Override public double getDelta(Tuple2<String, Integer> oldDataPoint, Tuple2<String, Integer> newDataPoint) { return newDataPoint.f1 - oldDataPoint.f1; } - }, EVICT_AFTER), + }, evictAfter), 0, null /* late data output tag */); - - OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); @@ -447,15 +426,15 @@ public class EvictingWindowOperatorTest { } /** - * Tests DeltaEvictor, evictAfter behavior + * Tests DeltaEvictor, evictAfter behavior. * @throws Exception */ @Test public void testDeltaEvictorEvictAfter() throws Exception { AtomicInteger closeCalled = new AtomicInteger(0); - final int TRIGGER_COUNT = 2; - final boolean EVICT_AFTER = true; - final int THRESHOLD = 2; + final int triggerCount = 2; + final boolean evictAfter = true; + final int threshold = 2; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -466,7 +445,6 @@ public class EvictingWindowOperatorTest { ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc = new ListStateDescriptor<>("window-contents", streamRecordSerializer); - EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>( GlobalWindows.create(), new GlobalWindow.Serializer(), @@ -474,18 +452,16 @@ public class EvictingWindowOperatorTest { BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)), - CountTrigger.of(TRIGGER_COUNT), - DeltaEvictor.of(THRESHOLD, new DeltaFunction<Tuple2<String, Integer>>() { + CountTrigger.of(triggerCount), + DeltaEvictor.of(threshold, new DeltaFunction<Tuple2<String, Integer>>() { @Override public double getDelta(Tuple2<String, Integer> oldDataPoint, Tuple2<String, Integer> newDataPoint) { return newDataPoint.f1 - oldDataPoint.f1; } - }, EVICT_AFTER), + }, evictAfter), 0, null /* late data output tag */); - - OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); @@ -528,8 +504,8 @@ public class EvictingWindowOperatorTest { @SuppressWarnings("unchecked") public void testCountTrigger() throws Exception { - final int WINDOW_SIZE = 4; - final int WINDOW_SLIDE = 2; + final int windowSize = 4; + final int windowSlide = 2; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -540,7 +516,6 @@ public class EvictingWindowOperatorTest { ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc = new ListStateDescriptor<>("window-contents", streamRecordSerializer); - EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>( GlobalWindows.create(), new GlobalWindow.Serializer(), @@ -552,12 +527,11 @@ public class EvictingWindowOperatorTest { new SumReducer(), // on some versions of Java we seem to need the explicit type new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>())), - CountTrigger.of(WINDOW_SLIDE), - CountEvictor.of(WINDOW_SIZE), + CountTrigger.of(windowSlide), + CountEvictor.of(windowSize), 0, null /* late data output tag */); - OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); @@ -580,8 +554,6 @@ public class EvictingWindowOperatorTest { testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999)); testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000)); - - expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE)); expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE)); expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE)); @@ -604,12 +576,11 @@ public class EvictingWindowOperatorTest { public void testCountTriggerWithApply() throws Exception { AtomicInteger closeCalled = new AtomicInteger(0); - final int WINDOW_SIZE = 4; - final int WINDOW_SLIDE = 2; + final int windowSize = 4; + final int windowSlide = 2; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); - @SuppressWarnings({"unchecked", "rawtypes"}) TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer = (TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(inputType.createSerializer(new ExecutionConfig())); @@ -617,7 +588,6 @@ public class EvictingWindowOperatorTest { ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc = new ListStateDescriptor<>("window-contents", streamRecordSerializer); - EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>( GlobalWindows.create(), new GlobalWindow.Serializer(), @@ -625,12 +595,11 @@ public class EvictingWindowOperatorTest { BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)), - CountTrigger.of(WINDOW_SLIDE), - CountEvictor.of(WINDOW_SIZE), + CountTrigger.of(windowSlide), + CountEvictor.of(windowSize), 0, null /* late data output tag */); - OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); @@ -653,8 +622,6 @@ public class EvictingWindowOperatorTest { testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999)); testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000)); - - expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE)); expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE)); expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE)); @@ -679,7 +646,7 @@ public class EvictingWindowOperatorTest { public void testTumblingWindowWithApply() throws Exception { AtomicInteger closeCalled = new AtomicInteger(0); - final int WINDOW_SIZE = 4; + final int windowSize = 4; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -691,14 +658,14 @@ public class EvictingWindowOperatorTest { new ListStateDescriptor<>("window-contents", streamRecordSerializer); EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new EvictingWindowOperator<>( - TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>(closeCalled)), EventTimeTrigger.create(), - CountEvictor.of(WINDOW_SIZE), + CountEvictor.of(windowSize), 0, null /* late data output tag */); @@ -731,7 +698,6 @@ public class EvictingWindowOperatorTest { expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 3999)); expectedOutput.add(new Watermark(3999)); - TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new EvictingWindowOperatorTest.ResultSortComparator()); testHarness.close(); @@ -741,10 +707,9 @@ public class EvictingWindowOperatorTest { // UDFs // ------------------------------------------------------------------------ - public static class SumReducer implements ReduceFunction<Tuple2<String, Integer>> { + private static class SumReducer implements ReduceFunction<Tuple2<String, Integer>> { private static final long serialVersionUID = 1L; - @Override public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception { @@ -752,7 +717,7 @@ public class EvictingWindowOperatorTest { } } - public static class RichSumReducer<W extends Window> extends RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> { + private static class RichSumReducer<W extends Window> extends RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> { private static final long serialVersionUID = 1L; private boolean openCalled = false; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/GlobalWindowsTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/GlobalWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/GlobalWindowsTest.java index 37fad7e..9292bfb 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/GlobalWindowsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/GlobalWindowsTest.java @@ -15,8 +15,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.streaming.runtime.operators.windowing; +package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -24,15 +24,18 @@ import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; import org.apache.flink.util.TestLogger; + import org.junit.Test; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.instanceOf; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; import static org.mockito.Mockito.mock; /** - * Tests for {@link GlobalWindows} + * Tests for {@link GlobalWindows}. */ public class GlobalWindowsTest extends TestLogger { http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutIfAbsentTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutIfAbsentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutIfAbsentTest.java index 8786c4e..8eb26ee 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutIfAbsentTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutIfAbsentTest.java @@ -24,6 +24,9 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +/** + * Tests for {@link KeyMap}. + */ public class KeyMapPutIfAbsentTest { @Test @@ -38,7 +41,7 @@ public class KeyMapPutIfAbsentTest { factory.set(2 * i + 1); map.putIfAbsent(i, factory); - assertEquals(i+1, map.size()); + assertEquals(i + 1, map.size()); assertTrue(map.getCurrentTableCapacity() > map.size()); assertTrue(map.getCurrentTableCapacity() > map.getRehashThreshold()); assertTrue(map.size() <= map.getRehashThreshold()); http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutTest.java index 5b59bea..522c691 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutTest.java @@ -29,6 +29,9 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +/** + * Tests for {@link KeyMap}. + */ public class KeyMapPutTest { @Test @@ -41,7 +44,7 @@ public class KeyMapPutTest { for (int i = 0; i < numElements; i++) { map.put(i, 2 * i + 1); - assertEquals(i+1, map.size()); + assertEquals(i + 1, map.size()); assertTrue(map.getCurrentTableCapacity() > map.size()); assertTrue(map.getCurrentTableCapacity() > map.getRehashThreshold()); assertTrue(map.size() <= map.getRehashThreshold()); @@ -72,7 +75,6 @@ public class KeyMapPutTest { assertEquals(numElements, numContained); assertEquals(numElements, bitset.cardinality()); - assertEquals(numElements, map.size()); assertEquals(numElements, map.traverseAndCountElements()); assertEquals(1 << 21, map.getCurrentTableCapacity()); @@ -91,18 +93,18 @@ public class KeyMapPutTest { final int numElements = 1000000; for (int i = 0; i < numElements; i++) { - Integer put = map.put(i, 2*i+1); + Integer put = map.put(i, 2 * i + 1); assertNull(put); } for (int i = 0; i < numElements; i += 3) { - Integer put = map.put(i, 2*i); + Integer put = map.put(i, 2 * i); assertNotNull(put); - assertEquals(2*i+1, put.intValue()); + assertEquals(2 * i + 1, put.intValue()); } for (int i = 0; i < numElements; i++) { - int expected = (i % 3 == 0) ? (2*i) : (2*i+1); + int expected = (i % 3 == 0) ? (2 * i) : (2 * i + 1); assertEquals(expected, map.get(i).intValue()); } @@ -111,14 +113,13 @@ public class KeyMapPutTest { assertEquals(1 << 21, map.getCurrentTableCapacity()); assertTrue(map.getLongestChainLength() <= 7); - BitSet bitset = new BitSet(); int numContained = 0; for (KeyMap.Entry<Integer, Integer> entry : map) { numContained++; int key = entry.getKey(); - int expected = key % 3 == 0 ? (2*key) : (2*key+1); + int expected = key % 3 == 0 ? (2 * key) : (2 * key + 1); assertEquals(expected, entry.getValue().intValue()); assertFalse(bitset.get(key)); http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapTest.java index c7fb108..a442466 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapTest.java @@ -24,8 +24,15 @@ import java.util.Arrays; import java.util.HashMap; import java.util.Random; -import static org.junit.Assert.*; - +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for {@link KeyMap}. + */ public class KeyMapTest { @Test @@ -218,7 +225,6 @@ public class KeyMapTest { nextKeyValue += keyRnd.nextInt(maxStride) + 1; } - // check that all maps contain the total number of elements int numContained = 0; for (KeyMap<?, ?> map : maps) { http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java index aa9cb91..0c45d03 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java @@ -15,9 +15,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.runtime.operators.windowing; -import com.google.common.collect.Lists; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -25,11 +25,12 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows; import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner; -import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; + +import com.google.common.collect.Lists; import org.junit.Test; import org.mockito.Matchers; @@ -44,9 +45,16 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.core.Is.is; import static org.hamcrest.core.IsNot.not; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * Tests for verifying that {@link MergingWindowSet} correctly merges windows in various situations @@ -152,7 +160,6 @@ public class MergingWindowSetTest { assertEquals(new TimeWindow(11, 14), windowSet.addWindow(new TimeWindow(11, 14), mergeFunction)); assertFalse(mergeFunction.hasMerged()); - assertEquals(new TimeWindow(0, 4), windowSet.getStateWindow(new TimeWindow(0, 6))); assertEquals(new TimeWindow(11, 14), windowSet.getStateWindow(new TimeWindow(11, 14))); @@ -179,7 +186,6 @@ public class MergingWindowSetTest { assertEquals(new TimeWindow(10, 15), windowSet.addWindow(new TimeWindow(11, 14), mergeFunction)); assertFalse(mergeFunction.hasMerged()); - assertEquals(new TimeWindow(0, 4), windowSet.getStateWindow(new TimeWindow(0, 6))); assertEquals(new TimeWindow(11, 14), windowSet.getStateWindow(new TimeWindow(10, 15))); http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeSessionWindowsTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeSessionWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeSessionWindowsTest.java index 461b5fc..ceda3b9 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeSessionWindowsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeSessionWindowsTest.java @@ -15,19 +15,20 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.streaming.runtime.operators.windowing; +package org.apache.flink.streaming.runtime.operators.windowing; -import com.google.common.collect.Lists; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows; import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner; +import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.TestLogger; + +import com.google.common.collect.Lists; import org.junit.Test; import org.mockito.Matchers; @@ -36,13 +37,23 @@ import java.util.Collection; import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.*; -import static org.junit.Assert.*; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; import static org.mockito.Matchers.anyCollection; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.argThat; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** - * Tests for {@link ProcessingTimeSessionWindows} + * Tests for {@link ProcessingTimeSessionWindows}. */ public class ProcessingTimeSessionWindowsTest extends TestLogger { http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java index a0c2347..791eb42 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java @@ -15,13 +15,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.runtime.operators.windowing; -import com.google.common.collect.Lists; import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import com.google.common.collect.Lists; import org.junit.Test; import static org.junit.Assert.assertEquals; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PurgingTriggerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PurgingTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PurgingTriggerTest.java index 4302d4d..7139a44 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PurgingTriggerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PurgingTriggerTest.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger; @@ -22,6 +23,7 @@ import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + import org.junit.Test; import org.mockito.Matchers; import org.mockito.invocation.InvocationOnMock; @@ -35,7 +37,12 @@ import static org.apache.flink.streaming.runtime.operators.windowing.WindowOpera import static org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorContractTest.anyTriggerContext; import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.anyLong; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * Tests for {@link PurgingTrigger}. @@ -51,7 +58,7 @@ public class PurgingTriggerTest { for (Method triggerMethod : Trigger.class.getDeclaredMethods()) { // try retrieving the method, this will throw an exception if we can't find it - PurgingTrigger.class.getDeclaredMethod(triggerMethod.getName(),triggerMethod.getParameterTypes()); + PurgingTrigger.class.getDeclaredMethod(triggerMethod.getName(), triggerMethod.getParameterTypes()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java index ff1cbdf..62c484d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java @@ -15,19 +15,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.streaming.runtime.operators.windowing; -import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.anyInt; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +package org.apache.flink.streaming.runtime.operators.windowing; -import java.util.Arrays; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.ReduceFunction; @@ -50,11 +40,24 @@ import org.apache.flink.streaming.runtime.operators.windowing.functions.Internal import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.util.OutputTag; + import org.junit.Test; import org.mockito.Matchers; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import java.util.Arrays; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + /** * These tests verify that {@link WindowOperator} correctly interacts with the other windowing * components: {@link WindowAssigner}, @@ -72,7 +75,6 @@ public class RegularWindowOperatorContractTest extends WindowOperatorContractTes Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); InternalWindowFunction<Integer, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); - ReducingStateDescriptor<Integer> intReduceSumDescriptor = new ReducingStateDescriptor<>( "int-reduce", @@ -89,7 +91,6 @@ public class RegularWindowOperatorContractTest extends WindowOperatorContractTes final ValueStateDescriptor<String> valueStateDescriptor = new ValueStateDescriptor<>("string-state", StringSerializer.INSTANCE); - KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = createWindowOperator(mockAssigner, mockTrigger, 0L, intReduceSumDescriptor, mockWindowFunction); @@ -252,7 +253,6 @@ public class RegularWindowOperatorContractTest extends WindowOperatorContractTes ListStateDescriptor<Integer> intListDescriptor = new ListStateDescriptor<>("int-list", IntSerializer.INSTANCE); - @SuppressWarnings("unchecked") WindowOperator<Integer, Integer, Iterable<Integer>, OUT, W> operator = new WindowOperator<>( assigner, http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SimpleTriggerTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SimpleTriggerTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SimpleTriggerTestHarness.java index 050178b..33f4747 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SimpleTriggerTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SimpleTriggerTestHarness.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.api.common.typeutils.TypeSerializer; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java index 4599d19..95a8314 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java @@ -15,8 +15,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.streaming.runtime.operators.windowing; +package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -26,18 +26,21 @@ import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.TestLogger; + import org.junit.Test; import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow; import static org.hamcrest.CoreMatchers.containsString; -import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.instanceOf; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; /** - * Tests for {@link SlidingEventTimeWindows} + * Tests for {@link SlidingEventTimeWindows}. */ public class SlidingEventTimeWindowsTest extends TestLogger { @@ -148,7 +151,6 @@ public class SlidingEventTimeWindowsTest extends TestLogger { assertThat(e.toString(), containsString("0 <= offset < slide and size > 0")); } - try { SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(10), Time.seconds(-1)); fail("should fail"); http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java index 20a9924..69b628a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java @@ -15,8 +15,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.streaming.runtime.operators.windowing; +package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -26,18 +26,22 @@ import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.TestLogger; + import org.junit.Test; import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.instanceOf; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; /** - * Tests for {@link SlidingProcessingTimeWindows} + * Tests for {@link SlidingProcessingTimeWindows}. */ public class SlidingProcessingTimeWindowsTest extends TestLogger { @@ -157,7 +161,6 @@ public class SlidingProcessingTimeWindowsTest extends TestLogger { assertThat(e.toString(), containsString("0 <= offset < slide and size > 0")); } - try { SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(10), Time.seconds(-1)); fail("should fail");
