http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java index 8db7a7f..42c6c6f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java @@ -15,10 +15,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.runtime.operators.windowing; -import com.google.common.base.Joiner; -import com.google.common.collect.Iterables; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.ReduceFunction; @@ -26,7 +25,6 @@ import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.util.OutputTag; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; @@ -67,7 +65,11 @@ import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.TestHarnessUtil; import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; import org.apache.flink.util.TestLogger; + +import com.google.common.base.Joiner; +import com.google.common.collect.Iterables; import org.junit.Assert; import org.junit.Test; @@ -83,6 +85,9 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +/** + * Tests for {@link WindowOperator}. + */ @SuppressWarnings("serial") public class WindowOperatorTest extends TestLogger { @@ -108,13 +113,11 @@ public class WindowOperatorTest extends TestLogger { testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1999)); testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000)); - testHarness.processWatermark(new Watermark(999)); expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 999)); expectedOutput.add(new Watermark(999)); TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator()); - testHarness.processWatermark(new Watermark(1999)); expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 1999)); expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 1999)); @@ -149,7 +152,6 @@ public class WindowOperatorTest extends TestLogger { expectedOutput.add(new Watermark(5999)); TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator()); - // those don't have any effect... testHarness.processWatermark(new Watermark(6999)); testHarness.processWatermark(new Watermark(7999)); @@ -164,8 +166,8 @@ public class WindowOperatorTest extends TestLogger { public void testSlidingEventTimeWindowsReduce() throws Exception { closeCalled.set(0); - final int WINDOW_SIZE = 3; - final int WINDOW_SLIDE = 1; + final int windowSize = 3; + final int windowSlide = 1; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -174,7 +176,7 @@ public class WindowOperatorTest extends TestLogger { inputType.createSerializer(new ExecutionConfig())); WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( - SlidingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)), + SlidingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS), Time.of(windowSlide, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), @@ -200,8 +202,8 @@ public class WindowOperatorTest extends TestLogger { public void testSlidingEventTimeWindowsApply() throws Exception { closeCalled.set(0); - final int WINDOW_SIZE = 3; - final int WINDOW_SLIDE = 1; + final int windowSize = 3; + final int windowSlide = 1; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -209,7 +211,7 @@ public class WindowOperatorTest extends TestLogger { inputType.createSerializer(new ExecutionConfig())); WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( - SlidingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)), + SlidingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS), Time.of(windowSlide, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), @@ -249,12 +251,10 @@ public class WindowOperatorTest extends TestLogger { testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1999)); testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000)); - testHarness.processWatermark(new Watermark(999)); expectedOutput.add(new Watermark(999)); TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator()); - testHarness.processWatermark(new Watermark(1999)); expectedOutput.add(new Watermark(1999)); TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator()); @@ -285,7 +285,6 @@ public class WindowOperatorTest extends TestLogger { expectedOutput.add(new Watermark(5999)); TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator()); - // those don't have any effect... testHarness.processWatermark(new Watermark(6999)); testHarness.processWatermark(new Watermark(7999)); @@ -300,7 +299,7 @@ public class WindowOperatorTest extends TestLogger { public void testTumblingEventTimeWindowsReduce() throws Exception { closeCalled.set(0); - final int WINDOW_SIZE = 3; + final int windowSize = 3; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -309,7 +308,7 @@ public class WindowOperatorTest extends TestLogger { inputType.createSerializer(new ExecutionConfig())); WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( - TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), @@ -334,7 +333,7 @@ public class WindowOperatorTest extends TestLogger { public void testTumblingEventTimeWindowsApply() throws Exception { closeCalled.set(0); - final int WINDOW_SIZE = 3; + final int windowSize = 3; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -342,7 +341,7 @@ public class WindowOperatorTest extends TestLogger { inputType.createSerializer(new ExecutionConfig())); WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( - TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), @@ -370,7 +369,7 @@ public class WindowOperatorTest extends TestLogger { public void testSessionWindows() throws Exception { closeCalled.set(0); - final int SESSION_SIZE = 3; + final int sessionSize = 3; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -378,7 +377,7 @@ public class WindowOperatorTest extends TestLogger { inputType.createSerializer(new ExecutionConfig())); WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>( - EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)), + EventTimeSessionWindows.withGap(Time.seconds(sessionSize)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), @@ -443,7 +442,7 @@ public class WindowOperatorTest extends TestLogger { public void testSessionWindowsWithProcessFunction() throws Exception { closeCalled.set(0); - final int SESSION_SIZE = 3; + final int sessionSize = 3; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -451,7 +450,7 @@ public class WindowOperatorTest extends TestLogger { inputType.createSerializer(new ExecutionConfig())); WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>( - EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)), + EventTimeSessionWindows.withGap(Time.seconds(sessionSize)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), @@ -516,7 +515,7 @@ public class WindowOperatorTest extends TestLogger { public void testReduceSessionWindows() throws Exception { closeCalled.set(0); - final int SESSION_SIZE = 3; + final int sessionSize = 3; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -524,7 +523,7 @@ public class WindowOperatorTest extends TestLogger { "window-contents", new SumReducer(), inputType.createSerializer(new ExecutionConfig())); WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>( - EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)), + EventTimeSessionWindows.withGap(Time.seconds(sessionSize)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), @@ -587,7 +586,7 @@ public class WindowOperatorTest extends TestLogger { public void testReduceSessionWindowsWithProcessFunction() throws Exception { closeCalled.set(0); - final int SESSION_SIZE = 3; + final int sessionSize = 3; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -595,7 +594,7 @@ public class WindowOperatorTest extends TestLogger { "window-contents", new SumReducer(), inputType.createSerializer(new ExecutionConfig())); WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>( - EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)), + EventTimeSessionWindows.withGap(Time.seconds(sessionSize)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), @@ -662,7 +661,7 @@ public class WindowOperatorTest extends TestLogger { public void testSessionWindowsWithCountTrigger() throws Exception { closeCalled.set(0); - final int SESSION_SIZE = 3; + final int sessionSize = 3; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -670,7 +669,7 @@ public class WindowOperatorTest extends TestLogger { inputType.createSerializer(new ExecutionConfig())); WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>( - EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)), + EventTimeSessionWindows.withGap(Time.seconds(sessionSize)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), @@ -709,7 +708,6 @@ public class WindowOperatorTest extends TestLogger { testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 6500)); testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 7000)); - expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-10", 0L, 6500L), 6499)); TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator()); @@ -733,7 +731,7 @@ public class WindowOperatorTest extends TestLogger { public void testSessionWindowsWithContinuousEventTimeTrigger() throws Exception { closeCalled.set(0); - final int SESSION_SIZE = 3; + final int sessionSize = 3; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -741,7 +739,7 @@ public class WindowOperatorTest extends TestLogger { inputType.createSerializer(new ExecutionConfig())); WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>( - EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)), + EventTimeSessionWindows.withGap(Time.seconds(sessionSize)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), @@ -864,7 +862,7 @@ public class WindowOperatorTest extends TestLogger { public void testContinuousWatermarkTrigger() throws Exception { closeCalled.set(0); - final int WINDOW_SIZE = 3; + final int windowSize = 3; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -879,7 +877,7 @@ public class WindowOperatorTest extends TestLogger { BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>()), - ContinuousEventTimeTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + ContinuousEventTimeTrigger.of(Time.of(windowSize, TimeUnit.SECONDS)), 0, null /* late data output tag */); @@ -905,12 +903,10 @@ public class WindowOperatorTest extends TestLogger { testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1999)); testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000)); - testHarness.processWatermark(new Watermark(1000)); expectedOutput.add(new Watermark(1000)); TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator()); - testHarness.processWatermark(new Watermark(2000)); expectedOutput.add(new Watermark(2000)); TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator()); @@ -930,13 +926,12 @@ public class WindowOperatorTest extends TestLogger { testHarness.processWatermark(new Watermark(6000)); - expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), Long.MAX_VALUE)); + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), Long.MAX_VALUE)); expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 5), Long.MAX_VALUE)); expectedOutput.add(new Watermark(6000)); TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator()); - // those don't have any effect... testHarness.processWatermark(new Watermark(7000)); testHarness.processWatermark(new Watermark(8000)); @@ -953,7 +948,7 @@ public class WindowOperatorTest extends TestLogger { public void testCountTrigger() throws Exception { closeCalled.set(0); - final int WINDOW_SIZE = 4; + final int windowSize = 4; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -968,7 +963,7 @@ public class WindowOperatorTest extends TestLogger { BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>()), - PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE)), + PurgingTrigger.of(CountTrigger.of(windowSize)), 0, null /* late data output tag */); @@ -1010,7 +1005,7 @@ public class WindowOperatorTest extends TestLogger { BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>()), - PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE)), + PurgingTrigger.of(CountTrigger.of(windowSize)), 0, null /* late data output tag */); @@ -1042,7 +1037,7 @@ public class WindowOperatorTest extends TestLogger { @Test public void testProcessingTimeTumblingWindows() throws Throwable { - final int WINDOW_SIZE = 3; + final int windowSize = 3; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -1051,7 +1046,7 @@ public class WindowOperatorTest extends TestLogger { inputType.createSerializer(new ExecutionConfig())); WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( - TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), @@ -1100,8 +1095,8 @@ public class WindowOperatorTest extends TestLogger { @Test public void testProcessingTimeSlidingWindows() throws Throwable { - final int WINDOW_SIZE = 3; - final int WINDOW_SLIDE = 1; + final int windowSize = 3; + final int windowSlide = 1; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -1110,7 +1105,7 @@ public class WindowOperatorTest extends TestLogger { inputType.createSerializer(new ExecutionConfig())); WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( - SlidingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)), + SlidingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS), Time.of(windowSlide, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), @@ -1173,7 +1168,7 @@ public class WindowOperatorTest extends TestLogger { @Test public void testProcessingTimeSessionWindows() throws Throwable { - final int WINDOW_GAP = 3; + final int windowGap = 3; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -1182,7 +1177,7 @@ public class WindowOperatorTest extends TestLogger { inputType.createSerializer(new ExecutionConfig())); WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( - ProcessingTimeSessionWindows.withGap(Time.of(WINDOW_GAP, TimeUnit.SECONDS)), + ProcessingTimeSessionWindows.withGap(Time.of(windowGap, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), @@ -1201,10 +1196,10 @@ public class WindowOperatorTest extends TestLogger { // timestamp is ignored in processing time testHarness.setProcessingTime(3); - testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1));//Long.MAX_VALUE)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1)); //Long.MAX_VALUE)); testHarness.setProcessingTime(1000); - testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1002));//Long.MAX_VALUE)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1002)); //Long.MAX_VALUE)); testHarness.setProcessingTime(5000); @@ -1237,8 +1232,8 @@ public class WindowOperatorTest extends TestLogger { @Test public void testLateness() throws Exception { - final int WINDOW_SIZE = 2; - final long LATENESS = 500; + final int windowSize = 2; + final long lateness = 500; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -1248,14 +1243,14 @@ public class WindowOperatorTest extends TestLogger { WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( - TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()), PurgingTrigger.of(EventTimeTrigger.create()), - LATENESS, + lateness, lateOutputTag); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = @@ -1293,7 +1288,7 @@ public class WindowOperatorTest extends TestLogger { expected.add(new Watermark(7000)); TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator()); - + TestHarnessUtil.assertOutputEqualsSorted( "SideOutput was not correct.", lateExpected, @@ -1305,8 +1300,8 @@ public class WindowOperatorTest extends TestLogger { @Test public void testCleanupTimeOverflow() throws Exception { - final int WINDOW_SIZE = 1000; - final long LATENESS = 2000; + final int windowSize = 1000; + final long lateness = 2000; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -1314,7 +1309,7 @@ public class WindowOperatorTest extends TestLogger { new SumReducer(), inputType.createSerializer(new ExecutionConfig())); - TumblingEventTimeWindows windowAssigner = TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE)); + TumblingEventTimeWindows windowAssigner = TumblingEventTimeWindows.of(Time.milliseconds(windowSize)); final WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( @@ -1325,7 +1320,7 @@ public class WindowOperatorTest extends TestLogger { stateDesc, new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()), EventTimeTrigger.create(), - LATENESS, + lateness, null /* late data output tag */); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = @@ -1347,10 +1342,10 @@ public class WindowOperatorTest extends TestLogger { testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), timestamp)); // the garbage collection timer would wrap-around - Assert.assertTrue(window.maxTimestamp() + LATENESS < window.maxTimestamp()); + Assert.assertTrue(window.maxTimestamp() + lateness < window.maxTimestamp()); // and it would prematurely fire with watermark (Long.MAX_VALUE - 1500) - Assert.assertTrue(window.maxTimestamp() + LATENESS < Long.MAX_VALUE - 1500); + Assert.assertTrue(window.maxTimestamp() + lateness < Long.MAX_VALUE - 1500); // if we don't correctly prevent wrap-around in the garbage collection // timers this watermark will clean our window state for the just-added @@ -1374,8 +1369,8 @@ public class WindowOperatorTest extends TestLogger { @Test public void testSideOutputDueToLatenessTumbling() throws Exception { - final int WINDOW_SIZE = 2; - final long LATENESS = 0; + final int windowSize = 2; + final long lateness = 0; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -1385,14 +1380,14 @@ public class WindowOperatorTest extends TestLogger { WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( - TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()), EventTimeTrigger.create(), - LATENESS, + lateness, lateOutputTag); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = @@ -1438,9 +1433,9 @@ public class WindowOperatorTest extends TestLogger { @Test public void testSideOutputDueToLatenessSliding() throws Exception { - final int WINDOW_SIZE = 3; - final int WINDOW_SLIDE = 1; - final long LATENESS = 0; + final int windowSize = 3; + final int windowSlide = 1; + final long lateness = 0; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -1450,14 +1445,14 @@ public class WindowOperatorTest extends TestLogger { WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( - SlidingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)), + SlidingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS), Time.of(windowSlide, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()), EventTimeTrigger.create(), - LATENESS, + lateness, lateOutputTag /* late data output tag */); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = @@ -1482,7 +1477,7 @@ public class WindowOperatorTest extends TestLogger { testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 3001)); - // lateness is set to 0 and window_size = 3 sec and slide 1, the following 2 elements (2400) + // lateness is set to 0 and window size = 3 sec and slide 1, the following 2 elements (2400) // are assigned to windows ending at 2999, 3999, 4999. // The 2999 is dropped because it is already late (WM = 2999) but the rest are kept. @@ -1518,8 +1513,8 @@ public class WindowOperatorTest extends TestLogger { @Test public void testSideOutputDueToLatenessSessionZeroLatenessPurgingTrigger() throws Exception { - final int GAP_SIZE = 3; - final long LATENESS = 0; + final int gapSize = 3; + final long lateness = 0; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -1529,14 +1524,14 @@ public class WindowOperatorTest extends TestLogger { WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>( - EventTimeSessionWindows.withGap(Time.seconds(GAP_SIZE)), + EventTimeSessionWindows.withGap(Time.seconds(gapSize)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()), PurgingTrigger.of(EventTimeTrigger.create()), - LATENESS, + lateness, lateOutputTag); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness = @@ -1610,8 +1605,8 @@ public class WindowOperatorTest extends TestLogger { @Test public void testSideOutputDueToLatenessSessionZeroLateness() throws Exception { - final int GAP_SIZE = 3; - final long LATENESS = 0; + final int gapSize = 3; + final long lateness = 0; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -1621,14 +1616,14 @@ public class WindowOperatorTest extends TestLogger { WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>( - EventTimeSessionWindows.withGap(Time.seconds(GAP_SIZE)), + EventTimeSessionWindows.withGap(Time.seconds(gapSize)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()), EventTimeTrigger.create(), - LATENESS, + lateness, lateOutputTag); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness = @@ -1700,8 +1695,8 @@ public class WindowOperatorTest extends TestLogger { // this has the same output as testSideOutputDueToLatenessSessionZeroLateness() because // the allowed lateness is too small to make a difference - final int GAP_SIZE = 3; - final long LATENESS = 10; + final int gapSize = 3; + final long lateness = 10; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -1711,14 +1706,14 @@ public class WindowOperatorTest extends TestLogger { WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>( - EventTimeSessionWindows.withGap(Time.seconds(GAP_SIZE)), + EventTimeSessionWindows.withGap(Time.seconds(gapSize)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()), PurgingTrigger.of(EventTimeTrigger.create()), - LATENESS, + lateness, lateOutputTag); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness = @@ -1787,8 +1782,8 @@ public class WindowOperatorTest extends TestLogger { // one that does not return FIRE_AND_PURGE when firing but just FIRE. The expected // results are therefore slightly different. - final int GAP_SIZE = 3; - final long LATENESS = 10; + final int gapSize = 3; + final long lateness = 10; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -1798,14 +1793,14 @@ public class WindowOperatorTest extends TestLogger { WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>( - EventTimeSessionWindows.withGap(Time.seconds(GAP_SIZE)), + EventTimeSessionWindows.withGap(Time.seconds(gapSize)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()), EventTimeTrigger.create(), - LATENESS, + lateness, null /* late data output tag */); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness = @@ -1888,8 +1883,8 @@ public class WindowOperatorTest extends TestLogger { @Test public void testNotSideOutputDueToLatenessSessionWithHugeLatenessPurgingTrigger() throws Exception { - final int GAP_SIZE = 3; - final long LATENESS = 10000; + final int gapSize = 3; + final long lateness = 10000; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -1899,14 +1894,14 @@ public class WindowOperatorTest extends TestLogger { WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>( - EventTimeSessionWindows.withGap(Time.seconds(GAP_SIZE)), + EventTimeSessionWindows.withGap(Time.seconds(gapSize)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()), PurgingTrigger.of(EventTimeTrigger.create()), - LATENESS, + lateness, null /* late data output tag */); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness = @@ -1979,8 +1974,8 @@ public class WindowOperatorTest extends TestLogger { @Test public void testNotSideOutputDueToLatenessSessionWithHugeLateness() throws Exception { - final int GAP_SIZE = 3; - final long LATENESS = 10000; + final int gapSize = 3; + final long lateness = 10000; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -1990,14 +1985,14 @@ public class WindowOperatorTest extends TestLogger { WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>( - EventTimeSessionWindows.withGap(Time.seconds(GAP_SIZE)), + EventTimeSessionWindows.withGap(Time.seconds(gapSize)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()), EventTimeTrigger.create(), - LATENESS, + lateness, null /* late data output tag */); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness = @@ -2072,8 +2067,8 @@ public class WindowOperatorTest extends TestLogger { @Test public void testCleanupTimerWithEmptyListStateForTumblingWindows2() throws Exception { - final int WINDOW_SIZE = 2; - final long LATENESS = 100; + final int windowSize = 2; + final long lateness = 100; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -2082,14 +2077,14 @@ public class WindowOperatorTest extends TestLogger { WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, String, TimeWindow> operator = new WindowOperator<>( - TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), windowStateDesc, new InternalIterableWindowFunction<>(new PassThroughFunction2()), - new EventTimeTriggerAccumGC(LATENESS), - LATENESS, + new EventTimeTriggerAccumGC(lateness), + lateness, null /* late data output tag */); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, String> testHarness = @@ -2127,8 +2122,8 @@ public class WindowOperatorTest extends TestLogger { @Test public void testCleanupTimerWithEmptyListStateForTumblingWindows() throws Exception { - final int WINDOW_SIZE = 2; - final long LATENESS = 1; + final int windowSize = 2; + final long lateness = 1; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -2137,14 +2132,14 @@ public class WindowOperatorTest extends TestLogger { WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( - TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), windowStateDesc, new InternalIterableWindowFunction<>(new PassThroughFunction()), EventTimeTrigger.create(), - LATENESS, + lateness, null /* late data output tag */); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = @@ -2173,8 +2168,8 @@ public class WindowOperatorTest extends TestLogger { @Test public void testCleanupTimerWithEmptyReduceStateForTumblingWindows() throws Exception { - final int WINDOW_SIZE = 2; - final long LATENESS = 1; + final int windowSize = 2; + final long lateness = 1; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -2184,14 +2179,14 @@ public class WindowOperatorTest extends TestLogger { WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( - TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()), EventTimeTrigger.create(), - LATENESS, + lateness, null /* late data output tag */); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = @@ -2220,8 +2215,8 @@ public class WindowOperatorTest extends TestLogger { @Test public void testCleanupTimerWithEmptyFoldingStateForTumblingWindows() throws Exception { - final int WINDOW_SIZE = 2; - final long LATENESS = 1; + final int windowSize = 2; + final long lateness = 1; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -2242,14 +2237,14 @@ public class WindowOperatorTest extends TestLogger { WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( - TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), windowStateDesc, new InternalSingleValueWindowFunction<>(new PassThroughFunction()), EventTimeTrigger.create(), - LATENESS, + lateness, null /* late data output tag */); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = @@ -2278,8 +2273,8 @@ public class WindowOperatorTest extends TestLogger { @Test public void testCleanupTimerWithEmptyListStateForSessionWindows() throws Exception { - final int GAP_SIZE = 3; - final long LATENESS = 10; + final int gapSize = 3; + final long lateness = 10; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -2288,14 +2283,14 @@ public class WindowOperatorTest extends TestLogger { WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( - EventTimeSessionWindows.withGap(Time.seconds(GAP_SIZE)), + EventTimeSessionWindows.withGap(Time.seconds(gapSize)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), windowStateDesc, new InternalIterableWindowFunction<>(new PassThroughFunction()), EventTimeTrigger.create(), - LATENESS, + lateness, null /* late data output tag */); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = @@ -2322,8 +2317,8 @@ public class WindowOperatorTest extends TestLogger { @Test public void testCleanupTimerWithEmptyReduceStateForSessionWindows() throws Exception { - final int GAP_SIZE = 3; - final long LATENESS = 10; + final int gapSize = 3; + final long lateness = 10; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -2333,14 +2328,14 @@ public class WindowOperatorTest extends TestLogger { WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>( - EventTimeSessionWindows.withGap(Time.seconds(GAP_SIZE)), + EventTimeSessionWindows.withGap(Time.seconds(gapSize)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()), EventTimeTrigger.create(), - LATENESS, + lateness, null /* late data output tag */); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness = @@ -2367,8 +2362,8 @@ public class WindowOperatorTest extends TestLogger { // TODO this test seems invalid, as it uses the unsupported combination of merging windows and folding window state @Test public void testCleanupTimerWithEmptyFoldingStateForSessionWindows() throws Exception { - final int GAP_SIZE = 3; - final long LATENESS = 10; + final int gapSize = 3; + final long lateness = 10; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); @@ -2389,14 +2384,14 @@ public class WindowOperatorTest extends TestLogger { WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( - EventTimeSessionWindows.withGap(Time.seconds(GAP_SIZE)), + EventTimeSessionWindows.withGap(Time.seconds(gapSize)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), windowStateDesc, new InternalSingleValueWindowFunction<>(new PassThroughFunction()), EventTimeTrigger.create(), - LATENESS, + lateness, null /* late data output tag */); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = @@ -2435,7 +2430,7 @@ public class WindowOperatorTest extends TestLogger { } } - public static class SumReducer implements ReduceFunction<Tuple2<String, Integer>> { + private static class SumReducer implements ReduceFunction<Tuple2<String, Integer>> { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, @@ -2444,8 +2439,7 @@ public class WindowOperatorTest extends TestLogger { } } - - public static class RichSumReducer<W extends Window> extends RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> { + private static class RichSumReducer<W extends Window> extends RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> { private static final long serialVersionUID = 1L; private boolean openCalled = false; @@ -2539,7 +2533,7 @@ public class WindowOperatorTest extends TestLogger { } } - public static class SessionWindowFunction implements WindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> { + private static class SessionWindowFunction implements WindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> { private static final long serialVersionUID = 1L; @Override @@ -2556,7 +2550,7 @@ public class WindowOperatorTest extends TestLogger { } } - public static class ReducedSessionWindowFunction implements WindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> { + private static class ReducedSessionWindowFunction implements WindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> { private static final long serialVersionUID = 1L; @Override @@ -2570,7 +2564,7 @@ public class WindowOperatorTest extends TestLogger { } } - public static class SessionProcessWindowFunction extends ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> { + private static class SessionProcessWindowFunction extends ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> { private static final long serialVersionUID = 1L; @Override @@ -2588,7 +2582,7 @@ public class WindowOperatorTest extends TestLogger { } } - public static class ReducedProcessSessionWindowFunction extends ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> { + private static class ReducedProcessSessionWindowFunction extends ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> { private static final long serialVersionUID = 1L; @Override @@ -2603,10 +2597,9 @@ public class WindowOperatorTest extends TestLogger { } } - public static class PointSessionWindows extends EventTimeSessionWindows { + private static class PointSessionWindows extends EventTimeSessionWindows { private static final long serialVersionUID = 1L; - private PointSessionWindows(long sessionTimeout) { super(sessionTimeout); } @@ -2629,7 +2622,7 @@ public class WindowOperatorTest extends TestLogger { * purge the state of the fired window. This is to test the state * garbage collection mechanism. */ - public static class EventTimeTriggerAccumGC extends Trigger<Object, TimeWindow> { + private static class EventTimeTriggerAccumGC extends Trigger<Object, TimeWindow> { private static final long serialVersionUID = 1L; private long cleanupTime; @@ -2672,8 +2665,7 @@ public class WindowOperatorTest extends TestLogger { } @Override - public void onMerge(TimeWindow window, - OnMergeContext ctx) { + public void onMerge(TimeWindow window, OnMergeContext ctx) { ctx.registerEventTimeTimer(window.maxTimestamp()); }
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java index 5071c37..ced27b6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.api.common.ExecutionConfig; @@ -124,7 +125,7 @@ public class WindowTranslationTest { source .keyBy(0) .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) - .aggregate(new DummyRichAggregationFunction<Tuple2<String,Integer>>()); + .aggregate(new DummyRichAggregationFunction<Tuple2<String, Integer>>()); fail("exception was not thrown"); } @@ -998,14 +999,14 @@ public class WindowTranslationTest { Iterable<Tuple3<String, String, Integer>> values, Collector<Tuple3<String, String, Integer>> out) throws Exception { for (Tuple3<String, String, Integer> in : values) { - out.collect(new Tuple3<>(in.f0, in. f1, in.f2)); + out.collect(new Tuple3<>(in.f0, in.f1, in.f2)); } } }); OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation(); - OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String,String, Integer>> operator = transform.getOperator(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator(); Assert.assertTrue(operator instanceof WindowOperator); WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); @@ -1036,14 +1037,14 @@ public class WindowTranslationTest { Iterable<Tuple3<String, String, Integer>> values, Collector<Tuple3<String, String, Integer>> out) throws Exception { for (Tuple3<String, String, Integer> in : values) { - out.collect(new Tuple3<>(in.f0, in. f1, in.f2)); + out.collect(new Tuple3<>(in.f0, in.f1, in.f2)); } } }); OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation(); - OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String,String, Integer>> operator = transform.getOperator(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator(); Assert.assertTrue(operator instanceof WindowOperator); WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); @@ -1473,7 +1474,6 @@ public class WindowTranslationTest { winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); } - @Test @SuppressWarnings({"rawtypes", "unchecked"}) public void testFoldWithEvictor() throws Exception { @@ -1665,7 +1665,7 @@ public class WindowTranslationTest { // UDFs // ------------------------------------------------------------------------ - public static class DummyReducer implements ReduceFunction<Tuple2<String, Integer>> { + private static class DummyReducer implements ReduceFunction<Tuple2<String, Integer>> { @Override public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception { @@ -1758,7 +1758,6 @@ public class WindowTranslationTest { } } - private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> { @Override http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowedValue.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowedValue.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowedValue.java index 449d54b..8be5456 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowedValue.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowedValue.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.streaming.api.windowing.windows.Window; @@ -24,24 +25,24 @@ import org.apache.flink.streaming.api.windowing.windows.Window; * a {@link org.apache.flink.streaming.api.functions.windowing.WindowFunction}. */ public class WindowedValue<T, W extends Window> { - private final T value; - private final W window; + private final T value; + private final W window; - public WindowedValue(T value, W window) { - this.value = value; - this.window = window; - } + public WindowedValue(T value, W window) { + this.value = value; + this.window = window; + } - public T value() { - return value; - } + public T value() { + return value; + } - public W window() { - return window; - } + public W window() { + return window; + } - @Override - public String toString() { - return "WindowedValue(" + value + ", " + window + ")"; - } + @Override + public String toString() { + return "WindowedValue(" + value + ", " + window + ")"; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java index ddfb9e7..3077005 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java @@ -17,14 +17,18 @@ package org.apache.flink.streaming.runtime.partitioner; -import static org.junit.Assert.assertArrayEquals; - import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.runtime.plugable.SerializationDelegate; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.assertArrayEquals; + +/** + * Tests for {@link BroadcastPartitioner}. + */ public class BroadcastPartitionerTest { private BroadcastPartitioner<Tuple> broadcastPartitioner1; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java index f7bd739..2ecf17b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java @@ -17,14 +17,18 @@ package org.apache.flink.streaming.runtime.partitioner; -import static org.junit.Assert.*; - import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.runtime.plugable.SerializationDelegate; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link ForwardPartitioner}. + */ public class ForwardPartitionerTest { private ForwardPartitioner<Tuple> forwardPartitioner; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitionerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitionerTest.java index 6ae3730..5d023c8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitionerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitionerTest.java @@ -17,14 +17,18 @@ package org.apache.flink.streaming.runtime.partitioner; -import static org.junit.Assert.assertArrayEquals; - import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.runtime.plugable.SerializationDelegate; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.assertArrayEquals; + +/** + * Tests for {@link GlobalPartitioner}. + */ public class GlobalPartitionerTest { private GlobalPartitioner<Tuple> globalPartitioner; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitionerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitionerTest.java index 4ca7449..a57e6f4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitionerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitionerTest.java @@ -17,17 +17,21 @@ package org.apache.flink.streaming.runtime.partitioner; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; - import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.plugable.SerializationDelegate; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.TestLogger; + import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link KeyGroupStreamPartitioner}. + */ public class KeyGroupStreamPartitionerTest extends TestLogger { private KeyGroupStreamPartitioner<Tuple2<String, Integer>, String> keyGroupPartitioner; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitionerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitionerTest.java index 06a1acd..85410f3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitionerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitionerTest.java @@ -17,14 +17,18 @@ package org.apache.flink.streaming.runtime.partitioner; -import static org.junit.Assert.*; - import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.runtime.plugable.SerializationDelegate; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link RebalancePartitioner}. + */ public class RebalancePartitionerTest { private RebalancePartitioner<Tuple> distributePartitioner; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java index 4019d63..309f24d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java @@ -56,8 +56,13 @@ import java.util.List; import java.util.Map; import java.util.Set; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; +/** + * Tests for {@link RescalePartitioner}. + */ @SuppressWarnings("serial") public class RescalePartitionerTest extends TestLogger { @@ -161,10 +166,9 @@ public class RescalePartitionerTest extends TestLogger { fail("Building ExecutionGraph failed: " + e.getMessage()); } - ExecutionJobVertex execSourceVertex = eg.getJobVertex(sourceVertex.getID()); - ExecutionJobVertex execMapVertex= eg.getJobVertex(mapVertex.getID()); - ExecutionJobVertex execSinkVertex= eg.getJobVertex(sinkVertex.getID()); + ExecutionJobVertex execMapVertex = eg.getJobVertex(mapVertex.getID()); + ExecutionJobVertex execSinkVertex = eg.getJobVertex(sinkVertex.getID()); assertEquals(0, execSourceVertex.getInputs().size()); http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitionerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitionerTest.java index aff177c..238ec4b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitionerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitionerTest.java @@ -17,15 +17,19 @@ package org.apache.flink.streaming.runtime.partitioner; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.runtime.plugable.SerializationDelegate; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Tests for {@link ShufflePartitioner}. + */ public class ShufflePartitionerTest { private ShufflePartitioner<Tuple> shufflePartitioner; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java index 2012c94..79b2b75 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java @@ -32,10 +32,12 @@ import java.io.IOException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; - import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +/** + * Tests for {@link StreamElementSerializer}. + */ public class StreamElementSerializerTest { @Test http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordTest.java index 08d9644..a869e70 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordTest.java @@ -20,8 +20,15 @@ package org.apache.flink.streaming.runtime.streamrecord; import org.junit.Test; -import static org.junit.Assert.*; - +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for {@link StreamRecord}. + */ public class StreamRecordTest { @Test http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java index 564901f..4c945fe 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.runtime.streamstatus; import org.apache.flink.streaming.api.watermark.Watermark; + import org.junit.Test; import java.util.concurrent.BlockingQueue; @@ -34,8 +35,7 @@ import static org.junit.Assert.assertTrue; * the watermarks and stream statuses to forward are generated from the valve at the exact correct times and in a * deterministic behaviour. The unit tests here also test more complex stream status / watermark input cases. * - * <p> - * The tests are performed by a series of watermark and stream status inputs to the valve. On every input method call, + * <p>The tests are performed by a series of watermark and stream status inputs to the valve. On every input method call, * the output is checked to contain only the expected watermark or stream status, and nothing else. This ensures that * no redundant outputs are generated by the output logic of {@link StatusWatermarkValve}. The behaviours that a series of * input calls to the valve is trying to test is explained as inline comments within the tests. http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusTest.java index 247dc8b..152fc6f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusTest.java @@ -21,10 +21,13 @@ package org.apache.flink.streaming.runtime.streamstatus; import org.junit.Test; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +/** + * Tests for {@link StreamStatus}. + */ public class StreamStatusTest { @Test (expected = IllegalArgumentException.class) http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java index bf5be79..51328ab 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java @@ -67,6 +67,7 @@ import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.StreamFilter; import org.apache.flink.util.SerializedValue; + import org.junit.Test; import java.io.IOException; @@ -80,8 +81,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; /** - * This test checks that task checkpoints that block and do not react to thread interrupts - * are + * This test checks that task checkpoints that block and do not react to thread interrupts. */ public class BlockingCheckpointsTest { @@ -281,10 +281,9 @@ public class BlockingCheckpointsTest { } } - // ------------------------------------------------------------------------ - // stream task that simply triggers a checkpoint - // ------------------------------------------------------------------------ - + /** + * Stream task that simply triggers a checkpoint. + */ public static final class TestStreamTask extends OneInputStreamTask<Object, Object> { @Override http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java index 4435247..542e88a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java @@ -68,6 +68,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.util.SerializedValue; + import org.junit.Test; import java.io.EOFException; @@ -92,7 +93,7 @@ import static org.mockito.Mockito.when; * This test checks that task restores that get stuck in the presence of interrupts * are handled properly. * - * In practice, reading from HDFS is interrupt sensitive: The HDFS code frequently deadlocks + * <p>In practice, reading from HDFS is interrupt sensitive: The HDFS code frequently deadlocks * or livelocks if it is interrupted. */ public class InterruptSensitiveRestoreTest { @@ -185,7 +186,6 @@ public class InterruptSensitiveRestoreTest { when(networkEnvironment.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class))) .thenReturn(mock(TaskKvStateRegistry.class)); - ChainedStateHandle<StreamStateHandle> operatorState = null; List<KeyedStateHandle> keyedStateFromBackend = Collections.emptyList(); List<KeyedStateHandle> keyedStateFromStream = Collections.emptyList(); @@ -197,7 +197,7 @@ public class InterruptSensitiveRestoreTest { new OperatorStateHandle.StateMetaInfo(new long[]{0}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE); operatorStateMetadata.put(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME, metaInfo); - KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(new KeyGroupRange(0,0)); + KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(new KeyGroupRange(0, 0)); Collection<OperatorStateHandle> operatorStateHandles = Collections.singletonList(new OperatorStateHandle(operatorStateMetadata, state)); @@ -381,7 +381,6 @@ public class InterruptSensitiveRestoreTest { @Override public void cancel() {} - @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { fail("should never be called"); @@ -389,7 +388,7 @@ public class InterruptSensitiveRestoreTest { @Override public void initializeState(FunctionInitializationContext context) throws Exception { - ((StateInitializationContext)context).getRawOperatorStateInputs().iterator().next().getStream().read(); + ((StateInitializationContext) context).getRawOperatorStateInputs().iterator().next().getStream().read(); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java index 90f5619..d343eaf 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java @@ -47,8 +47,8 @@ import org.apache.flink.streaming.api.graph.StreamEdge; import org.apache.flink.streaming.api.graph.StreamNode; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.api.operators.StreamMap; import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator; +import org.apache.flink.streaming.api.operators.StreamMap; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -60,12 +60,19 @@ import org.apache.flink.util.TestLogger; import org.junit.Assert; import org.junit.Test; - import scala.concurrent.duration.Deadline; import scala.concurrent.duration.FiniteDuration; import java.io.Serializable; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; @@ -77,8 +84,7 @@ import static org.junit.Assert.fail; /** * Tests for {@link OneInputStreamTask}. * - * <p> - * Note:<br> + * <p>Note:<br> * We only use a {@link StreamMap} operator here. We also test the individual operators but Map is * used as a representative to test OneInputStreamTask, since OneInputStreamTask is used for all * OneInputStreamOperators. @@ -238,7 +244,7 @@ public class OneInputStreamTaskTest extends TestLogger { * It also verifies that when task is idle, watermarks generated in the middle of chains are also blocked and * never forwarded. * - * The tested chain will be: (HEAD: normal operator) --> (watermark generating operator) --> (normal operator). + * <p>The tested chain will be: (HEAD: normal operator) --> (watermark generating operator) --> (normal operator). * The operators will throw an exception and fail the test if either of them were forwarded watermarks when * the task is idle. */ @@ -506,7 +512,6 @@ public class OneInputStreamTaskTest extends TestLogger { TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); - // Then give the earlier barrier, these should be ignored testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 1); testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 0); @@ -523,7 +528,7 @@ public class OneInputStreamTaskTest extends TestLogger { /** * Tests that the stream operator can snapshot and restore the operator state of chained - * operators + * operators. */ @Test public void testSnapshottingAndRestoring() throws Exception { @@ -561,7 +566,7 @@ public class OneInputStreamTaskTest extends TestLogger { CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, checkpointTimestamp); - while(!streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint())); + while (!streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint())) {} // since no state was set, there shouldn't be restore calls assertEquals(0, TestingStreamOperator.numberRestoreCalls); @@ -682,7 +687,6 @@ public class OneInputStreamTaskTest extends TestLogger { super(jobConfig, taskConfig, executionConfig, memorySize, inputSplitProvider, bufferSize); } - @Override public void acknowledgeCheckpoint( long checkpointId, @@ -793,8 +797,8 @@ public class OneInputStreamTaskTest extends TestLogger { ClassLoader cl = Thread.currentThread().getContextClassLoader(); - Serializable functionState= InstantiationUtil.deserializeObject(in, cl); - Integer operatorState= InstantiationUtil.deserializeObject(in, cl); + Serializable functionState = InstantiationUtil.deserializeObject(in, cl); + Integer operatorState = InstantiationUtil.deserializeObject(in, cl); assertEquals(random.nextInt(), functionState); assertEquals(random.nextInt(), (int) operatorState); @@ -885,7 +889,7 @@ public class OneInputStreamTaskTest extends TestLogger { * An operator that can be triggered whether or not to expect watermarks forwarded to it, toggled * by letting it process special trigger marker records. * - * If it receives a watermark when it's not expecting one, it'll throw an exception and fail. + * <p>If it receives a watermark when it's not expecting one, it'll throw an exception and fail. */ private static class TriggerableFailOnWatermarkTestOperator extends AbstractStreamOperator<String> @@ -893,8 +897,8 @@ public class OneInputStreamTaskTest extends TestLogger { private static final long serialVersionUID = 2048954179291813243L; - public final static String EXPECT_FORWARDED_WATERMARKS_MARKER = "EXPECT_WATERMARKS"; - public final static String NO_FORWARDED_WATERMARKS_MARKER = "NO_WATERMARKS"; + public static final String EXPECT_FORWARDED_WATERMARKS_MARKER = "EXPECT_WATERMARKS"; + public static final String NO_FORWARDED_WATERMARKS_MARKER = "NO_WATERMARKS"; protected boolean expectForwardedWatermarks; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java index 19e27f2..4d11c97 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -30,21 +31,18 @@ import java.io.IOException; /** * Test harness for testing a {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTask}. * - * <p> - * This mock Invokable provides the task with a basic runtime context and allows pushing elements + * <p>This mock Invokable provides the task with a basic runtime context and allows pushing elements * and watermarks into the task. {@link #getOutput()} can be used to get the emitted elements * and events. You are free to modify the retrieved list. * - * <p> - * After setting up everything the Task can be invoked using {@link #invoke()}. This will start + * <p>After setting up everything the Task can be invoked using {@link #invoke()}. This will start * a new Thread to execute the Task. Use {@link #waitForTaskCompletion()} to wait for the Task * thread to finish. Use {@link #processElement} to send elements to the task. Use * {@link #processEvent(AbstractEvent)} to send events to the task. * Before waiting for the task to finish you must call {@link #endInput()} to signal to the task * that data entry is finished. * - * <p> - * When Elements or Events are offered to the Task they are put into a queue. The input gates + * <p>When Elements or Events are offered to the Task they are put into a queue. The input gates * of the Task notifyNonEmpty from this queue. Use {@link #waitForInputProcessing()} to wait until all * queues are empty. This must be used after entering some elements before checking the * desired output. http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceExternalCheckpointTriggerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceExternalCheckpointTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceExternalCheckpointTriggerTest.java index e5caff3..47a5350 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceExternalCheckpointTriggerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceExternalCheckpointTriggerTest.java @@ -45,8 +45,8 @@ import static org.junit.Assert.assertTrue; @SuppressWarnings("serial") public class SourceExternalCheckpointTriggerTest { - static final OneShotLatch ready = new OneShotLatch(); - static final MultiShotLatch sync = new MultiShotLatch(); + private static final OneShotLatch ready = new OneShotLatch(); + private static final MultiShotLatch sync = new MultiShotLatch(); @Test public void testCheckpointsTriggeredBySource() throws Exception { @@ -129,7 +129,7 @@ public class SourceExternalCheckpointTriggerTest { private final long numEvents; private final long checkpointFrequency; - + private CheckpointTrigger trigger; ExternalCheckpointsSource(long numEvents, long checkpointFrequency) { http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java index 9e26f9e..27818bc 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java @@ -78,22 +78,22 @@ public class SourceStreamTaskTest { * and element emission. This also verifies that there are no concurrent invocations * of the checkpoint method on the source operator. * - * The source emits elements and performs checkpoints. We have several checkpointer threads + * <p>The source emits elements and performs checkpoints. We have several checkpointer threads * that fire checkpoint requests at the source task. * - * If element emission and checkpointing are not in series the count of elements at the + * <p>If element emission and checkpointing are not in series the count of elements at the * beginning of a checkpoint and at the end of a checkpoint are not the same because the * source kept emitting elements while the checkpoint was ongoing. */ @Test @SuppressWarnings("unchecked") public void testCheckpointing() throws Exception { - final int NUM_ELEMENTS = 100; - final int NUM_CHECKPOINTS = 100; - final int NUM_CHECKPOINTERS = 1; - final int CHECKPOINT_INTERVAL = 5; // in ms - final int SOURCE_CHECKPOINT_DELAY = 1000; // how many random values we sum up in storeCheckpoint - final int SOURCE_READ_DELAY = 1; // in ms + final int numElements = 100; + final int numCheckpoints = 100; + final int numCheckpointers = 1; + final int checkpointInterval = 5; // in ms + final int sourceCheckpointDelay = 1000; // how many random values we sum up in storeCheckpoint + final int sourceReadDelay = 1; // in ms ExecutorService executor = Executors.newFixedThreadPool(10); try { @@ -104,25 +104,25 @@ public class SourceStreamTaskTest { testHarness.setupOutputForSingletonOperatorChain(); StreamConfig streamConfig = testHarness.getStreamConfig(); - StreamSource<Tuple2<Long, Integer>, ?> sourceOperator = new StreamSource<>(new MockSource(NUM_ELEMENTS, SOURCE_CHECKPOINT_DELAY, SOURCE_READ_DELAY)); + StreamSource<Tuple2<Long, Integer>, ?> sourceOperator = new StreamSource<>(new MockSource(numElements, sourceCheckpointDelay, sourceReadDelay)); streamConfig.setStreamOperator(sourceOperator); // prepare the - Future<Boolean>[] checkpointerResults = new Future[NUM_CHECKPOINTERS]; + Future<Boolean>[] checkpointerResults = new Future[numCheckpointers]; // invoke this first, so the tasks are actually running when the checkpoints are scheduled testHarness.invoke(); - for (int i = 0; i < NUM_CHECKPOINTERS; i++) { - checkpointerResults[i] = executor.submit(new Checkpointer(NUM_CHECKPOINTS, CHECKPOINT_INTERVAL, sourceTask)); + for (int i = 0; i < numCheckpointers; i++) { + checkpointerResults[i] = executor.submit(new Checkpointer(numCheckpoints, checkpointInterval, sourceTask)); } testHarness.waitForTaskCompletion(); // Get the result from the checkpointers, if these threw an exception it // will be rethrown here - for (int i = 0; i < NUM_CHECKPOINTERS; i++) { + for (int i = 0; i < numCheckpointers; i++) { if (!checkpointerResults[i].isDone()) { checkpointerResults[i].cancel(true); } @@ -132,7 +132,7 @@ public class SourceStreamTaskTest { } List<Tuple2<Long, Integer>> resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput()); - Assert.assertEquals(NUM_ELEMENTS, resultElements.size()); + Assert.assertEquals(numElements, resultElements.size()); } finally { executor.shutdown(); @@ -241,7 +241,7 @@ public class SourceStreamTaskTest { } } - public static class OpenCloseTestSource extends RichSourceFunction<String> { + private static class OpenCloseTestSource extends RichSourceFunction<String> { private static final long serialVersionUID = 1L; public static boolean openCalled = false;
