http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 8db7a7f..42c6c6f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -15,10 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.operators.windowing;
 
-import com.google.common.base.Joiner;
-import com.google.common.collect.Iterables;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
@@ -26,7 +25,6 @@ import 
org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.util.OutputTag;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -67,7 +65,11 @@ import 
org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.TestLogger;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -83,6 +85,9 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+/**
+ * Tests for {@link WindowOperator}.
+ */
 @SuppressWarnings("serial")
 public class WindowOperatorTest extends TestLogger {
 
@@ -108,13 +113,11 @@ public class WindowOperatorTest extends TestLogger {
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 1999));
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 1000));
 
-
                testHarness.processWatermark(new Watermark(999));
                expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 
999));
                expectedOutput.add(new Watermark(999));
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple2ResultSortComparator());
 
-
                testHarness.processWatermark(new Watermark(1999));
                expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 
1999));
                expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 
1999));
@@ -149,7 +152,6 @@ public class WindowOperatorTest extends TestLogger {
                expectedOutput.add(new Watermark(5999));
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple2ResultSortComparator());
 
-
                // those don't have any effect...
                testHarness.processWatermark(new Watermark(6999));
                testHarness.processWatermark(new Watermark(7999));
@@ -164,8 +166,8 @@ public class WindowOperatorTest extends TestLogger {
        public void testSlidingEventTimeWindowsReduce() throws Exception {
                closeCalled.set(0);
 
-               final int WINDOW_SIZE = 3;
-               final int WINDOW_SLIDE = 1;
+               final int windowSize = 3;
+               final int windowSlide = 1;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -174,7 +176,7 @@ public class WindowOperatorTest extends TestLogger {
                                inputType.createSerializer(new 
ExecutionConfig()));
 
                WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-                               SlidingEventTimeWindows.of(Time.of(WINDOW_SIZE, 
TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
+                               SlidingEventTimeWindows.of(Time.of(windowSize, 
TimeUnit.SECONDS), Time.of(windowSlide, TimeUnit.SECONDS)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -200,8 +202,8 @@ public class WindowOperatorTest extends TestLogger {
        public void testSlidingEventTimeWindowsApply() throws Exception {
                closeCalled.set(0);
 
-               final int WINDOW_SIZE = 3;
-               final int WINDOW_SLIDE = 1;
+               final int windowSize = 3;
+               final int windowSlide = 1;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -209,7 +211,7 @@ public class WindowOperatorTest extends TestLogger {
                                inputType.createSerializer(new 
ExecutionConfig()));
 
                WindowOperator<String, Tuple2<String, Integer>, 
Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> 
operator = new WindowOperator<>(
-                               SlidingEventTimeWindows.of(Time.of(WINDOW_SIZE, 
TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
+                               SlidingEventTimeWindows.of(Time.of(windowSize, 
TimeUnit.SECONDS), Time.of(windowSlide, TimeUnit.SECONDS)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -249,12 +251,10 @@ public class WindowOperatorTest extends TestLogger {
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 1999));
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 1000));
 
-
                testHarness.processWatermark(new Watermark(999));
                expectedOutput.add(new Watermark(999));
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple2ResultSortComparator());
 
-
                testHarness.processWatermark(new Watermark(1999));
                expectedOutput.add(new Watermark(1999));
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple2ResultSortComparator());
@@ -285,7 +285,6 @@ public class WindowOperatorTest extends TestLogger {
                expectedOutput.add(new Watermark(5999));
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple2ResultSortComparator());
 
-
                // those don't have any effect...
                testHarness.processWatermark(new Watermark(6999));
                testHarness.processWatermark(new Watermark(7999));
@@ -300,7 +299,7 @@ public class WindowOperatorTest extends TestLogger {
        public void testTumblingEventTimeWindowsReduce() throws Exception {
                closeCalled.set(0);
 
-               final int WINDOW_SIZE = 3;
+               final int windowSize = 3;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -309,7 +308,7 @@ public class WindowOperatorTest extends TestLogger {
                                inputType.createSerializer(new 
ExecutionConfig()));
 
                WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-                               
TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+                               TumblingEventTimeWindows.of(Time.of(windowSize, 
TimeUnit.SECONDS)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -334,7 +333,7 @@ public class WindowOperatorTest extends TestLogger {
        public void testTumblingEventTimeWindowsApply() throws Exception {
                closeCalled.set(0);
 
-               final int WINDOW_SIZE = 3;
+               final int windowSize = 3;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -342,7 +341,7 @@ public class WindowOperatorTest extends TestLogger {
                                inputType.createSerializer(new 
ExecutionConfig()));
 
                WindowOperator<String, Tuple2<String, Integer>, 
Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> 
operator = new WindowOperator<>(
-                               
TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+                               TumblingEventTimeWindows.of(Time.of(windowSize, 
TimeUnit.SECONDS)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -370,7 +369,7 @@ public class WindowOperatorTest extends TestLogger {
        public void testSessionWindows() throws Exception {
                closeCalled.set(0);
 
-               final int SESSION_SIZE = 3;
+               final int sessionSize = 3;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -378,7 +377,7 @@ public class WindowOperatorTest extends TestLogger {
                                inputType.createSerializer(new 
ExecutionConfig()));
 
                WindowOperator<String, Tuple2<String, Integer>, 
Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> 
operator = new WindowOperator<>(
-                               
EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+                               
EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -443,7 +442,7 @@ public class WindowOperatorTest extends TestLogger {
        public void testSessionWindowsWithProcessFunction() throws Exception {
                closeCalled.set(0);
 
-               final int SESSION_SIZE = 3;
+               final int sessionSize = 3;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -451,7 +450,7 @@ public class WindowOperatorTest extends TestLogger {
                                inputType.createSerializer(new 
ExecutionConfig()));
 
                WindowOperator<String, Tuple2<String, Integer>, 
Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> 
operator = new WindowOperator<>(
-                               
EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+                               
EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -516,7 +515,7 @@ public class WindowOperatorTest extends TestLogger {
        public void testReduceSessionWindows() throws Exception {
                closeCalled.set(0);
 
-               final int SESSION_SIZE = 3;
+               final int sessionSize = 3;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -524,7 +523,7 @@ public class WindowOperatorTest extends TestLogger {
                                "window-contents", new SumReducer(), 
inputType.createSerializer(new ExecutionConfig()));
 
                WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple3<String, Long, Long>, TimeWindow> operator = new 
WindowOperator<>(
-                               
EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+                               
EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -587,7 +586,7 @@ public class WindowOperatorTest extends TestLogger {
        public void testReduceSessionWindowsWithProcessFunction() throws 
Exception {
                closeCalled.set(0);
 
-               final int SESSION_SIZE = 3;
+               final int sessionSize = 3;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -595,7 +594,7 @@ public class WindowOperatorTest extends TestLogger {
                                "window-contents", new SumReducer(), 
inputType.createSerializer(new ExecutionConfig()));
 
                WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple3<String, Long, Long>, TimeWindow> operator = new 
WindowOperator<>(
-                               
EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+                               
EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -662,7 +661,7 @@ public class WindowOperatorTest extends TestLogger {
        public void testSessionWindowsWithCountTrigger() throws Exception {
                closeCalled.set(0);
 
-               final int SESSION_SIZE = 3;
+               final int sessionSize = 3;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -670,7 +669,7 @@ public class WindowOperatorTest extends TestLogger {
                                inputType.createSerializer(new 
ExecutionConfig()));
 
                WindowOperator<String, Tuple2<String, Integer>, 
Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> 
operator = new WindowOperator<>(
-                               
EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+                               
EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -709,7 +708,6 @@ public class WindowOperatorTest extends TestLogger {
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 2), 6500));
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 3), 7000));
 
-
                expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-10", 
0L, 6500L), 6499));
 
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple3ResultSortComparator());
@@ -733,7 +731,7 @@ public class WindowOperatorTest extends TestLogger {
        public void testSessionWindowsWithContinuousEventTimeTrigger() throws 
Exception {
                closeCalled.set(0);
 
-               final int SESSION_SIZE = 3;
+               final int sessionSize = 3;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -741,7 +739,7 @@ public class WindowOperatorTest extends TestLogger {
                        inputType.createSerializer(new ExecutionConfig()));
 
                WindowOperator<String, Tuple2<String, Integer>, 
Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> 
operator = new WindowOperator<>(
-                       
EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+                       
EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
                        new TimeWindow.Serializer(),
                        new TupleKeySelector(),
                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new 
ExecutionConfig()),
@@ -864,7 +862,7 @@ public class WindowOperatorTest extends TestLogger {
        public void testContinuousWatermarkTrigger() throws Exception {
                closeCalled.set(0);
 
-               final int WINDOW_SIZE = 3;
+               final int windowSize = 3;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -879,7 +877,7 @@ public class WindowOperatorTest extends TestLogger {
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                stateDesc,
                                new InternalSingleValueWindowFunction<>(new 
PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>()),
-                               
ContinuousEventTimeTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+                               
ContinuousEventTimeTrigger.of(Time.of(windowSize, TimeUnit.SECONDS)),
                                0,
                                null /* late data output tag */);
 
@@ -905,12 +903,10 @@ public class WindowOperatorTest extends TestLogger {
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 1999));
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 1000));
 
-
                testHarness.processWatermark(new Watermark(1000));
                expectedOutput.add(new Watermark(1000));
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple2ResultSortComparator());
 
-
                testHarness.processWatermark(new Watermark(2000));
                expectedOutput.add(new Watermark(2000));
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple2ResultSortComparator());
@@ -930,13 +926,12 @@ public class WindowOperatorTest extends TestLogger {
 
                testHarness.processWatermark(new Watermark(6000));
 
-               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 
Long.MAX_VALUE));
+               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 
Long.MAX_VALUE));
 
                expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 5), 
Long.MAX_VALUE));
                expectedOutput.add(new Watermark(6000));
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple2ResultSortComparator());
 
-
                // those don't have any effect...
                testHarness.processWatermark(new Watermark(7000));
                testHarness.processWatermark(new Watermark(8000));
@@ -953,7 +948,7 @@ public class WindowOperatorTest extends TestLogger {
        public void testCountTrigger() throws Exception {
                closeCalled.set(0);
 
-               final int WINDOW_SIZE = 4;
+               final int windowSize = 4;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -968,7 +963,7 @@ public class WindowOperatorTest extends TestLogger {
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                stateDesc,
                                new InternalSingleValueWindowFunction<>(new 
PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>()),
-                               PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE)),
+                               PurgingTrigger.of(CountTrigger.of(windowSize)),
                                0,
                                null /* late data output tag */);
 
@@ -1010,7 +1005,7 @@ public class WindowOperatorTest extends TestLogger {
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                stateDesc,
                                new InternalSingleValueWindowFunction<>(new 
PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>()),
-                               PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE)),
+                               PurgingTrigger.of(CountTrigger.of(windowSize)),
                                0,
                                null /* late data output tag */);
 
@@ -1042,7 +1037,7 @@ public class WindowOperatorTest extends TestLogger {
 
        @Test
        public void testProcessingTimeTumblingWindows() throws Throwable {
-               final int WINDOW_SIZE = 3;
+               final int windowSize = 3;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -1051,7 +1046,7 @@ public class WindowOperatorTest extends TestLogger {
                                inputType.createSerializer(new 
ExecutionConfig()));
 
                WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-                               
TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+                               
TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -1100,8 +1095,8 @@ public class WindowOperatorTest extends TestLogger {
 
        @Test
        public void testProcessingTimeSlidingWindows() throws Throwable {
-               final int WINDOW_SIZE = 3;
-               final int WINDOW_SLIDE = 1;
+               final int windowSize = 3;
+               final int windowSlide = 1;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -1110,7 +1105,7 @@ public class WindowOperatorTest extends TestLogger {
                                inputType.createSerializer(new 
ExecutionConfig()));
 
                WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-                               
SlidingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), 
Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
+                               
SlidingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS), 
Time.of(windowSlide, TimeUnit.SECONDS)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -1173,7 +1168,7 @@ public class WindowOperatorTest extends TestLogger {
 
        @Test
        public void testProcessingTimeSessionWindows() throws Throwable {
-               final int WINDOW_GAP = 3;
+               final int windowGap = 3;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -1182,7 +1177,7 @@ public class WindowOperatorTest extends TestLogger {
                                inputType.createSerializer(new 
ExecutionConfig()));
 
                WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-                               
ProcessingTimeSessionWindows.withGap(Time.of(WINDOW_GAP, TimeUnit.SECONDS)),
+                               
ProcessingTimeSessionWindows.withGap(Time.of(windowGap, TimeUnit.SECONDS)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -1201,10 +1196,10 @@ public class WindowOperatorTest extends TestLogger {
 
                // timestamp is ignored in processing time
                testHarness.setProcessingTime(3);
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 1));//Long.MAX_VALUE));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 1)); //Long.MAX_VALUE));
 
                testHarness.setProcessingTime(1000);
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 1002));//Long.MAX_VALUE));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 1002)); //Long.MAX_VALUE));
 
                testHarness.setProcessingTime(5000);
 
@@ -1237,8 +1232,8 @@ public class WindowOperatorTest extends TestLogger {
 
        @Test
        public void testLateness() throws Exception {
-               final int WINDOW_SIZE = 2;
-               final long LATENESS = 500;
+               final int windowSize = 2;
+               final long lateness = 500;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -1248,14 +1243,14 @@ public class WindowOperatorTest extends TestLogger {
 
                WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow> operator =
                        new WindowOperator<>(
-                               
TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+                               TumblingEventTimeWindows.of(Time.of(windowSize, 
TimeUnit.SECONDS)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                stateDesc,
                                new InternalSingleValueWindowFunction<>(new 
PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
                                PurgingTrigger.of(EventTimeTrigger.create()),
-                               LATENESS,
+                               lateness,
                                lateOutputTag);
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
@@ -1293,7 +1288,7 @@ public class WindowOperatorTest extends TestLogger {
                expected.add(new Watermark(7000));
 
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
-               
+
                TestHarnessUtil.assertOutputEqualsSorted(
                                "SideOutput was not correct.",
                                lateExpected,
@@ -1305,8 +1300,8 @@ public class WindowOperatorTest extends TestLogger {
 
        @Test
        public void testCleanupTimeOverflow() throws Exception {
-               final int WINDOW_SIZE = 1000;
-               final long LATENESS = 2000;
+               final int windowSize = 1000;
+               final long lateness = 2000;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -1314,7 +1309,7 @@ public class WindowOperatorTest extends TestLogger {
                        new SumReducer(),
                        inputType.createSerializer(new ExecutionConfig()));
 
-               TumblingEventTimeWindows windowAssigner = 
TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE));
+               TumblingEventTimeWindows windowAssigner = 
TumblingEventTimeWindows.of(Time.milliseconds(windowSize));
 
                final WindowOperator<String, Tuple2<String, Integer>, 
Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator =
                        new WindowOperator<>(
@@ -1325,7 +1320,7 @@ public class WindowOperatorTest extends TestLogger {
                                        stateDesc,
                                        new 
InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, 
TimeWindow, Tuple2<String, Integer>>()),
                                        EventTimeTrigger.create(),
-                                       LATENESS,
+                                       lateness,
                                        null /* late data output tag */);
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
@@ -1347,10 +1342,10 @@ public class WindowOperatorTest extends TestLogger {
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), timestamp));
 
                // the garbage collection timer would wrap-around
-               Assert.assertTrue(window.maxTimestamp() + LATENESS < 
window.maxTimestamp());
+               Assert.assertTrue(window.maxTimestamp() + lateness < 
window.maxTimestamp());
 
                // and it would prematurely fire with watermark (Long.MAX_VALUE 
- 1500)
-               Assert.assertTrue(window.maxTimestamp() + LATENESS < 
Long.MAX_VALUE - 1500);
+               Assert.assertTrue(window.maxTimestamp() + lateness < 
Long.MAX_VALUE - 1500);
 
                // if we don't correctly prevent wrap-around in the garbage 
collection
                // timers this watermark will clean our window state for the 
just-added
@@ -1374,8 +1369,8 @@ public class WindowOperatorTest extends TestLogger {
 
        @Test
        public void testSideOutputDueToLatenessTumbling() throws Exception {
-               final int WINDOW_SIZE = 2;
-               final long LATENESS = 0;
+               final int windowSize = 2;
+               final long lateness = 0;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -1385,14 +1380,14 @@ public class WindowOperatorTest extends TestLogger {
 
                WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow> operator =
                        new WindowOperator<>(
-                               
TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+                               TumblingEventTimeWindows.of(Time.of(windowSize, 
TimeUnit.SECONDS)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                stateDesc,
                                new InternalSingleValueWindowFunction<>(new 
PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
                                EventTimeTrigger.create(),
-                               LATENESS,
+                               lateness,
                                lateOutputTag);
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
@@ -1438,9 +1433,9 @@ public class WindowOperatorTest extends TestLogger {
 
        @Test
        public void testSideOutputDueToLatenessSliding() throws Exception {
-               final int WINDOW_SIZE = 3;
-               final int WINDOW_SLIDE = 1;
-               final long LATENESS = 0;
+               final int windowSize = 3;
+               final int windowSlide = 1;
+               final long lateness = 0;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -1450,14 +1445,14 @@ public class WindowOperatorTest extends TestLogger {
 
                WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow> operator =
                        new WindowOperator<>(
-                               SlidingEventTimeWindows.of(Time.of(WINDOW_SIZE, 
TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
+                               SlidingEventTimeWindows.of(Time.of(windowSize, 
TimeUnit.SECONDS), Time.of(windowSlide, TimeUnit.SECONDS)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                stateDesc,
                                new InternalSingleValueWindowFunction<>(new 
PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
                                EventTimeTrigger.create(),
-                               LATENESS,
+                               lateness,
                                lateOutputTag /* late data output tag */);
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
@@ -1482,7 +1477,7 @@ public class WindowOperatorTest extends TestLogger {
 
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), 3001));
 
-               // lateness is set to 0 and window_size = 3 sec and slide 1, 
the following 2 elements (2400)
+               // lateness is set to 0 and window size = 3 sec and slide 1, 
the following 2 elements (2400)
                // are assigned to windows ending at 2999, 3999, 4999.
                // The 2999 is dropped because it is already late (WM = 2999) 
but the rest are kept.
 
@@ -1518,8 +1513,8 @@ public class WindowOperatorTest extends TestLogger {
 
        @Test
        public void 
testSideOutputDueToLatenessSessionZeroLatenessPurgingTrigger() throws Exception 
{
-               final int GAP_SIZE = 3;
-               final long LATENESS = 0;
+               final int gapSize = 3;
+               final long lateness = 0;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -1529,14 +1524,14 @@ public class WindowOperatorTest extends TestLogger {
 
                WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple3<String, Long, Long>, TimeWindow> operator =
                        new WindowOperator<>(
-                               
EventTimeSessionWindows.withGap(Time.seconds(GAP_SIZE)),
+                               
EventTimeSessionWindows.withGap(Time.seconds(gapSize)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                stateDesc,
                                new InternalSingleValueWindowFunction<>(new 
ReducedSessionWindowFunction()),
                                PurgingTrigger.of(EventTimeTrigger.create()),
-                               LATENESS,
+                               lateness,
                                lateOutputTag);
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple3<String, Long, Long>> testHarness =
@@ -1610,8 +1605,8 @@ public class WindowOperatorTest extends TestLogger {
 
        @Test
        public void testSideOutputDueToLatenessSessionZeroLateness() throws 
Exception {
-               final int GAP_SIZE = 3;
-               final long LATENESS = 0;
+               final int gapSize = 3;
+               final long lateness = 0;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -1621,14 +1616,14 @@ public class WindowOperatorTest extends TestLogger {
 
                WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple3<String, Long, Long>, TimeWindow> operator =
                        new WindowOperator<>(
-                               
EventTimeSessionWindows.withGap(Time.seconds(GAP_SIZE)),
+                               
EventTimeSessionWindows.withGap(Time.seconds(gapSize)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                stateDesc,
                                new InternalSingleValueWindowFunction<>(new 
ReducedSessionWindowFunction()),
                                EventTimeTrigger.create(),
-                               LATENESS,
+                               lateness,
                                lateOutputTag);
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple3<String, Long, Long>> testHarness =
@@ -1700,8 +1695,8 @@ public class WindowOperatorTest extends TestLogger {
                // this has the same output as 
testSideOutputDueToLatenessSessionZeroLateness() because
                // the allowed lateness is too small to make a difference
 
-               final int GAP_SIZE = 3;
-               final long LATENESS = 10;
+               final int gapSize = 3;
+               final long lateness = 10;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -1711,14 +1706,14 @@ public class WindowOperatorTest extends TestLogger {
 
                WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple3<String, Long, Long>, TimeWindow> operator =
                        new WindowOperator<>(
-                               
EventTimeSessionWindows.withGap(Time.seconds(GAP_SIZE)),
+                               
EventTimeSessionWindows.withGap(Time.seconds(gapSize)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                stateDesc,
                                new InternalSingleValueWindowFunction<>(new 
ReducedSessionWindowFunction()),
                                PurgingTrigger.of(EventTimeTrigger.create()),
-                               LATENESS,
+                               lateness,
                                lateOutputTag);
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple3<String, Long, Long>> testHarness =
@@ -1787,8 +1782,8 @@ public class WindowOperatorTest extends TestLogger {
                // one that does not return FIRE_AND_PURGE when firing but just 
FIRE. The expected
                // results are therefore slightly different.
 
-               final int GAP_SIZE = 3;
-               final long LATENESS = 10;
+               final int gapSize = 3;
+               final long lateness = 10;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -1798,14 +1793,14 @@ public class WindowOperatorTest extends TestLogger {
 
                WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple3<String, Long, Long>, TimeWindow> operator =
                        new WindowOperator<>(
-                               
EventTimeSessionWindows.withGap(Time.seconds(GAP_SIZE)),
+                               
EventTimeSessionWindows.withGap(Time.seconds(gapSize)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                stateDesc,
                                new InternalSingleValueWindowFunction<>(new 
ReducedSessionWindowFunction()),
                                EventTimeTrigger.create(),
-                               LATENESS,
+                               lateness,
                                null /* late data output tag */);
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple3<String, Long, Long>> testHarness =
@@ -1888,8 +1883,8 @@ public class WindowOperatorTest extends TestLogger {
        @Test
        public void 
testNotSideOutputDueToLatenessSessionWithHugeLatenessPurgingTrigger() throws 
Exception {
 
-               final int GAP_SIZE = 3;
-               final long LATENESS = 10000;
+               final int gapSize = 3;
+               final long lateness = 10000;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -1899,14 +1894,14 @@ public class WindowOperatorTest extends TestLogger {
 
                WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple3<String, Long, Long>, TimeWindow> operator =
                        new WindowOperator<>(
-                               
EventTimeSessionWindows.withGap(Time.seconds(GAP_SIZE)),
+                               
EventTimeSessionWindows.withGap(Time.seconds(gapSize)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                stateDesc,
                                new InternalSingleValueWindowFunction<>(new 
ReducedSessionWindowFunction()),
                                PurgingTrigger.of(EventTimeTrigger.create()),
-                               LATENESS,
+                               lateness,
                                null /* late data output tag */);
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple3<String, Long, Long>> testHarness =
@@ -1979,8 +1974,8 @@ public class WindowOperatorTest extends TestLogger {
 
        @Test
        public void testNotSideOutputDueToLatenessSessionWithHugeLateness() 
throws Exception {
-               final int GAP_SIZE = 3;
-               final long LATENESS = 10000;
+               final int gapSize = 3;
+               final long lateness = 10000;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -1990,14 +1985,14 @@ public class WindowOperatorTest extends TestLogger {
 
                WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple3<String, Long, Long>, TimeWindow> operator =
                        new WindowOperator<>(
-                               
EventTimeSessionWindows.withGap(Time.seconds(GAP_SIZE)),
+                               
EventTimeSessionWindows.withGap(Time.seconds(gapSize)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                stateDesc,
                                new InternalSingleValueWindowFunction<>(new 
ReducedSessionWindowFunction()),
                                EventTimeTrigger.create(),
-                               LATENESS,
+                               lateness,
                                null /* late data output tag */);
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple3<String, Long, Long>> testHarness =
@@ -2072,8 +2067,8 @@ public class WindowOperatorTest extends TestLogger {
 
        @Test
        public void testCleanupTimerWithEmptyListStateForTumblingWindows2() 
throws Exception {
-               final int WINDOW_SIZE = 2;
-               final long LATENESS = 100;
+               final int windowSize = 2;
+               final long lateness = 100;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -2082,14 +2077,14 @@ public class WindowOperatorTest extends TestLogger {
 
                WindowOperator<String, Tuple2<String, Integer>, 
Iterable<Tuple2<String, Integer>>, String, TimeWindow> operator =
                        new WindowOperator<>(
-                               
TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+                               TumblingEventTimeWindows.of(Time.of(windowSize, 
TimeUnit.SECONDS)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                windowStateDesc,
                                new InternalIterableWindowFunction<>(new 
PassThroughFunction2()),
-                                       new EventTimeTriggerAccumGC(LATENESS),
-                               LATENESS,
+                                       new EventTimeTriggerAccumGC(lateness),
+                               lateness,
                                null /* late data output tag */);
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
String> testHarness =
@@ -2127,8 +2122,8 @@ public class WindowOperatorTest extends TestLogger {
 
        @Test
        public void testCleanupTimerWithEmptyListStateForTumblingWindows() 
throws Exception {
-               final int WINDOW_SIZE = 2;
-               final long LATENESS = 1;
+               final int windowSize = 2;
+               final long lateness = 1;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -2137,14 +2132,14 @@ public class WindowOperatorTest extends TestLogger {
 
                WindowOperator<String, Tuple2<String, Integer>, 
Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> 
operator =
                        new WindowOperator<>(
-                               
TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+                               TumblingEventTimeWindows.of(Time.of(windowSize, 
TimeUnit.SECONDS)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                windowStateDesc,
                                new InternalIterableWindowFunction<>(new 
PassThroughFunction()),
                                EventTimeTrigger.create(),
-                               LATENESS,
+                               lateness,
                                null /* late data output tag */);
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
@@ -2173,8 +2168,8 @@ public class WindowOperatorTest extends TestLogger {
 
        @Test
        public void testCleanupTimerWithEmptyReduceStateForTumblingWindows() 
throws Exception {
-               final int WINDOW_SIZE = 2;
-               final long LATENESS = 1;
+               final int windowSize = 2;
+               final long lateness = 1;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -2184,14 +2179,14 @@ public class WindowOperatorTest extends TestLogger {
 
                WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow> operator =
                        new WindowOperator<>(
-                               
TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+                               TumblingEventTimeWindows.of(Time.of(windowSize, 
TimeUnit.SECONDS)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                stateDesc,
                                new InternalSingleValueWindowFunction<>(new 
PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
                                EventTimeTrigger.create(),
-                               LATENESS,
+                               lateness,
                                null /* late data output tag */);
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
@@ -2220,8 +2215,8 @@ public class WindowOperatorTest extends TestLogger {
 
        @Test
        public void testCleanupTimerWithEmptyFoldingStateForTumblingWindows() 
throws Exception {
-               final int WINDOW_SIZE = 2;
-               final long LATENESS = 1;
+               final int windowSize = 2;
+               final long lateness = 1;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -2242,14 +2237,14 @@ public class WindowOperatorTest extends TestLogger {
 
                WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow> operator =
                        new WindowOperator<>(
-                               
TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+                               TumblingEventTimeWindows.of(Time.of(windowSize, 
TimeUnit.SECONDS)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                windowStateDesc,
                                new InternalSingleValueWindowFunction<>(new 
PassThroughFunction()),
                                EventTimeTrigger.create(),
-                               LATENESS,
+                               lateness,
                                null /* late data output tag */);
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
@@ -2278,8 +2273,8 @@ public class WindowOperatorTest extends TestLogger {
 
        @Test
        public void testCleanupTimerWithEmptyListStateForSessionWindows() 
throws Exception {
-               final int GAP_SIZE = 3;
-               final long LATENESS = 10;
+               final int gapSize = 3;
+               final long lateness = 10;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -2288,14 +2283,14 @@ public class WindowOperatorTest extends TestLogger {
 
                WindowOperator<String, Tuple2<String, Integer>, 
Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> 
operator =
                        new WindowOperator<>(
-                               
EventTimeSessionWindows.withGap(Time.seconds(GAP_SIZE)),
+                               
EventTimeSessionWindows.withGap(Time.seconds(gapSize)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                windowStateDesc,
                                new InternalIterableWindowFunction<>(new 
PassThroughFunction()),
                                EventTimeTrigger.create(),
-                               LATENESS,
+                               lateness,
                                null /* late data output tag */);
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
@@ -2322,8 +2317,8 @@ public class WindowOperatorTest extends TestLogger {
        @Test
        public void testCleanupTimerWithEmptyReduceStateForSessionWindows() 
throws Exception {
 
-               final int GAP_SIZE = 3;
-               final long LATENESS = 10;
+               final int gapSize = 3;
+               final long lateness = 10;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -2333,14 +2328,14 @@ public class WindowOperatorTest extends TestLogger {
 
                WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple3<String, Long, Long>, TimeWindow> operator =
                        new WindowOperator<>(
-                               
EventTimeSessionWindows.withGap(Time.seconds(GAP_SIZE)),
+                               
EventTimeSessionWindows.withGap(Time.seconds(gapSize)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                stateDesc,
                                new InternalSingleValueWindowFunction<>(new 
ReducedSessionWindowFunction()),
                                EventTimeTrigger.create(),
-                               LATENESS,
+                               lateness,
                                null /* late data output tag */);
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple3<String, Long, Long>> testHarness =
@@ -2367,8 +2362,8 @@ public class WindowOperatorTest extends TestLogger {
        // TODO this test seems invalid, as it uses the unsupported combination 
of merging windows and folding window state
        @Test
        public void testCleanupTimerWithEmptyFoldingStateForSessionWindows() 
throws Exception {
-               final int GAP_SIZE = 3;
-               final long LATENESS = 10;
+               final int gapSize = 3;
+               final long lateness = 10;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -2389,14 +2384,14 @@ public class WindowOperatorTest extends TestLogger {
 
                WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow> operator =
                        new WindowOperator<>(
-                               
EventTimeSessionWindows.withGap(Time.seconds(GAP_SIZE)),
+                               
EventTimeSessionWindows.withGap(Time.seconds(gapSize)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                windowStateDesc,
                                new InternalSingleValueWindowFunction<>(new 
PassThroughFunction()),
                                EventTimeTrigger.create(),
-                               LATENESS,
+                               lateness,
                                null /* late data output tag */);
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
@@ -2435,7 +2430,7 @@ public class WindowOperatorTest extends TestLogger {
                }
        }
 
-       public static class SumReducer implements ReduceFunction<Tuple2<String, 
Integer>> {
+       private static class SumReducer implements 
ReduceFunction<Tuple2<String, Integer>> {
                private static final long serialVersionUID = 1L;
                @Override
                public Tuple2<String, Integer> reduce(Tuple2<String, Integer> 
value1,
@@ -2444,8 +2439,7 @@ public class WindowOperatorTest extends TestLogger {
                }
        }
 
-
-       public static class RichSumReducer<W extends Window> extends 
RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> 
{
+       private static class RichSumReducer<W extends Window> extends 
RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> 
{
                private static final long serialVersionUID = 1L;
 
                private boolean openCalled = false;
@@ -2539,7 +2533,7 @@ public class WindowOperatorTest extends TestLogger {
                }
        }
 
-       public static class SessionWindowFunction implements 
WindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, 
TimeWindow> {
+       private static class SessionWindowFunction implements 
WindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, 
TimeWindow> {
                private static final long serialVersionUID = 1L;
 
                @Override
@@ -2556,7 +2550,7 @@ public class WindowOperatorTest extends TestLogger {
                }
        }
 
-       public static class ReducedSessionWindowFunction implements 
WindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, 
TimeWindow> {
+       private static class ReducedSessionWindowFunction implements 
WindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, 
TimeWindow> {
                private static final long serialVersionUID = 1L;
 
                @Override
@@ -2570,7 +2564,7 @@ public class WindowOperatorTest extends TestLogger {
                }
        }
 
-       public static class SessionProcessWindowFunction extends 
ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, 
String, TimeWindow> {
+       private static class SessionProcessWindowFunction extends 
ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, 
String, TimeWindow> {
                private static final long serialVersionUID = 1L;
 
                @Override
@@ -2588,7 +2582,7 @@ public class WindowOperatorTest extends TestLogger {
                }
        }
 
-       public static class ReducedProcessSessionWindowFunction extends 
ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, 
String, TimeWindow> {
+       private static class ReducedProcessSessionWindowFunction extends 
ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, 
String, TimeWindow> {
                private static final long serialVersionUID = 1L;
 
                @Override
@@ -2603,10 +2597,9 @@ public class WindowOperatorTest extends TestLogger {
                }
        }
 
-       public static class PointSessionWindows extends EventTimeSessionWindows 
{
+       private static class PointSessionWindows extends 
EventTimeSessionWindows {
                private static final long serialVersionUID = 1L;
 
-
                private PointSessionWindows(long sessionTimeout) {
                        super(sessionTimeout);
                }
@@ -2629,7 +2622,7 @@ public class WindowOperatorTest extends TestLogger {
         * purge the state of the fired window. This is to test the state
         * garbage collection mechanism.
         */
-       public static class EventTimeTriggerAccumGC extends Trigger<Object, 
TimeWindow> {
+       private static class EventTimeTriggerAccumGC extends Trigger<Object, 
TimeWindow> {
                private static final long serialVersionUID = 1L;
 
                private long cleanupTime;
@@ -2672,8 +2665,7 @@ public class WindowOperatorTest extends TestLogger {
                }
 
                @Override
-               public void onMerge(TimeWindow window,
-                                                                        
OnMergeContext ctx) {
+               public void onMerge(TimeWindow window, OnMergeContext ctx) {
                        ctx.registerEventTimeTimer(window.maxTimestamp());
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
index 5071c37..ced27b6 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.ExecutionConfig;
@@ -124,7 +125,7 @@ public class WindowTranslationTest {
                source
                                .keyBy(0)
                                .window(SlidingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
-                               .aggregate(new 
DummyRichAggregationFunction<Tuple2<String,Integer>>());
+                               .aggregate(new 
DummyRichAggregationFunction<Tuple2<String, Integer>>());
 
                fail("exception was not thrown");
        }
@@ -998,14 +999,14 @@ public class WindowTranslationTest {
                                                        Iterable<Tuple3<String, 
String, Integer>> values,
                                                        
Collector<Tuple3<String, String, Integer>> out) throws Exception {
                                                for (Tuple3<String, String, 
Integer> in : values) {
-                                                       out.collect(new 
Tuple3<>(in.f0, in. f1, in.f2));
+                                                       out.collect(new 
Tuple3<>(in.f0, in.f1, in.f2));
                                                }
                                        }
                                });
 
                OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> transform =
                                (OneInputTransformation<Tuple2<String, 
Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
-               OneInputStreamOperator<Tuple2<String, Integer>, 
Tuple3<String,String, Integer>> operator = transform.getOperator();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> operator = transform.getOperator();
                Assert.assertTrue(operator instanceof WindowOperator);
                WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
                Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
@@ -1036,14 +1037,14 @@ public class WindowTranslationTest {
                                                        Iterable<Tuple3<String, 
String, Integer>> values,
                                                        
Collector<Tuple3<String, String, Integer>> out) throws Exception {
                                                for (Tuple3<String, String, 
Integer> in : values) {
-                                                       out.collect(new 
Tuple3<>(in.f0, in. f1, in.f2));
+                                                       out.collect(new 
Tuple3<>(in.f0, in.f1, in.f2));
                                                }
                                        }
                                });
 
                OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> transform =
                                (OneInputTransformation<Tuple2<String, 
Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
-               OneInputStreamOperator<Tuple2<String, Integer>, 
Tuple3<String,String, Integer>> operator = transform.getOperator();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> operator = transform.getOperator();
                Assert.assertTrue(operator instanceof WindowOperator);
                WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
                Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
@@ -1473,7 +1474,6 @@ public class WindowTranslationTest {
                                winOperator, winOperator.getKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
        }
 
-
        @Test
        @SuppressWarnings({"rawtypes", "unchecked"})
        public void testFoldWithEvictor() throws Exception {
@@ -1665,7 +1665,7 @@ public class WindowTranslationTest {
        //  UDFs
        // 
------------------------------------------------------------------------
 
-       public static class DummyReducer implements 
ReduceFunction<Tuple2<String, Integer>> {
+       private static class DummyReducer implements 
ReduceFunction<Tuple2<String, Integer>> {
 
                @Override
                public Tuple2<String, Integer> reduce(Tuple2<String, Integer> 
value1, Tuple2<String, Integer> value2) throws Exception {
@@ -1758,7 +1758,6 @@ public class WindowTranslationTest {
                }
        }
 
-
        private static class TupleKeySelector implements 
KeySelector<Tuple2<String, Integer>, String> {
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowedValue.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowedValue.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowedValue.java
index 449d54b..8be5456 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowedValue.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowedValue.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.streaming.api.windowing.windows.Window;
@@ -24,24 +25,24 @@ import 
org.apache.flink.streaming.api.windowing.windows.Window;
  * a {@link org.apache.flink.streaming.api.functions.windowing.WindowFunction}.
  */
 public class WindowedValue<T, W extends Window> {
-  private final T value;
-  private final W window;
+       private final T value;
+       private final W window;
 
-  public WindowedValue(T value, W window) {
-    this.value = value;
-    this.window = window;
-  }
+       public WindowedValue(T value, W window) {
+               this.value = value;
+               this.window = window;
+       }
 
-  public T value() {
-    return value;
-  }
+       public T value() {
+               return value;
+       }
 
-  public W window() {
-    return window;
-  }
+       public W window() {
+               return window;
+       }
 
-  @Override
-  public String toString() {
-    return "WindowedValue(" + value + ", " + window + ")";
-  }
+       @Override
+       public String toString() {
+               return "WindowedValue(" + value + ", " + window + ")";
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java
index ddfb9e7..3077005 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java
@@ -17,14 +17,18 @@
 
 package org.apache.flink.streaming.runtime.partitioner;
 
-import static org.junit.Assert.assertArrayEquals;
-
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * Tests for {@link BroadcastPartitioner}.
+ */
 public class BroadcastPartitionerTest {
 
        private BroadcastPartitioner<Tuple> broadcastPartitioner1;

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java
index f7bd739..2ecf17b 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java
@@ -17,14 +17,18 @@
 
 package org.apache.flink.streaming.runtime.partitioner;
 
-import static org.junit.Assert.*;
-
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link ForwardPartitioner}.
+ */
 public class ForwardPartitionerTest {
 
        private ForwardPartitioner<Tuple> forwardPartitioner;

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitionerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitionerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitionerTest.java
index 6ae3730..5d023c8 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitionerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitionerTest.java
@@ -17,14 +17,18 @@
 
 package org.apache.flink.streaming.runtime.partitioner;
 
-import static org.junit.Assert.assertArrayEquals;
-
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * Tests for {@link GlobalPartitioner}.
+ */
 public class GlobalPartitionerTest {
 
        private GlobalPartitioner<Tuple> globalPartitioner;

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitionerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitionerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitionerTest.java
index 4ca7449..a57e6f4 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitionerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitionerTest.java
@@ -17,17 +17,21 @@
 
 package org.apache.flink.streaming.runtime.partitioner;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link KeyGroupStreamPartitioner}.
+ */
 public class KeyGroupStreamPartitionerTest extends TestLogger {
 
        private KeyGroupStreamPartitioner<Tuple2<String, Integer>, String> 
keyGroupPartitioner;

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitionerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitionerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitionerTest.java
index 06a1acd..85410f3 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitionerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitionerTest.java
@@ -17,14 +17,18 @@
 
 package org.apache.flink.streaming.runtime.partitioner;
 
-import static org.junit.Assert.*;
-
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link RebalancePartitioner}.
+ */
 public class RebalancePartitionerTest {
 
        private RebalancePartitioner<Tuple> distributePartitioner;

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
index 4019d63..309f24d 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
@@ -56,8 +56,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
 
+/**
+ * Tests for {@link RescalePartitioner}.
+ */
 @SuppressWarnings("serial")
 public class RescalePartitionerTest extends TestLogger {
 
@@ -161,10 +166,9 @@ public class RescalePartitionerTest extends TestLogger {
                        fail("Building ExecutionGraph failed: " + 
e.getMessage());
                }
 
-
                ExecutionJobVertex execSourceVertex = 
eg.getJobVertex(sourceVertex.getID());
-               ExecutionJobVertex execMapVertex= 
eg.getJobVertex(mapVertex.getID());
-               ExecutionJobVertex execSinkVertex= 
eg.getJobVertex(sinkVertex.getID());
+               ExecutionJobVertex execMapVertex = 
eg.getJobVertex(mapVertex.getID());
+               ExecutionJobVertex execSinkVertex = 
eg.getJobVertex(sinkVertex.getID());
 
                assertEquals(0, execSourceVertex.getInputs().size());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitionerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitionerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitionerTest.java
index aff177c..238ec4b 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitionerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitionerTest.java
@@ -17,15 +17,19 @@
 
 package org.apache.flink.streaming.runtime.partitioner;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link ShufflePartitioner}.
+ */
 public class ShufflePartitionerTest {
 
        private ShufflePartitioner<Tuple> shufflePartitioner;

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java
index 2012c94..79b2b75 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java
@@ -32,10 +32,12 @@ import java.io.IOException;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
-
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+/**
+ * Tests for {@link StreamElementSerializer}.
+ */
 public class StreamElementSerializerTest {
 
        @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordTest.java
index 08d9644..a869e70 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordTest.java
@@ -20,8 +20,15 @@ package org.apache.flink.streaming.runtime.streamrecord;
 
 import org.junit.Test;
 
-import static org.junit.Assert.*;
-
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link StreamRecord}.
+ */
 public class StreamRecordTest {
 
        @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
index 564901f..4c945fe 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.runtime.streamstatus;
 
 import org.apache.flink.streaming.api.watermark.Watermark;
+
 import org.junit.Test;
 
 import java.util.concurrent.BlockingQueue;
@@ -34,8 +35,7 @@ import static org.junit.Assert.assertTrue;
  * the watermarks and stream statuses to forward are generated from the valve 
at the exact correct times and in a
  * deterministic behaviour. The unit tests here also test more complex stream 
status / watermark input cases.
  *
- * <p>
- * The tests are performed by a series of watermark and stream status inputs 
to the valve. On every input method call,
+ * <p>The tests are performed by a series of watermark and stream status 
inputs to the valve. On every input method call,
  * the output is checked to contain only the expected watermark or stream 
status, and nothing else. This ensures that
  * no redundant outputs are generated by the output logic of {@link 
StatusWatermarkValve}. The behaviours that a series of
  * input calls to the valve is trying to test is explained as inline comments 
within the tests.

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusTest.java
index 247dc8b..152fc6f 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusTest.java
@@ -21,10 +21,13 @@ package org.apache.flink.streaming.runtime.streamstatus;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+/**
+ * Tests for {@link StreamStatus}.
+ */
 public class StreamStatusTest {
 
        @Test (expected = IllegalArgumentException.class)

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
index bf5be79..51328ab 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
@@ -67,6 +67,7 @@ import 
org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.StreamFilter;
 import org.apache.flink.util.SerializedValue;
+
 import org.junit.Test;
 
 import java.io.IOException;
@@ -80,8 +81,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 /**
- * This test checks that task checkpoints that block and do not react to 
thread interrupts
- * are
+ * This test checks that task checkpoints that block and do not react to 
thread interrupts.
  */
 public class BlockingCheckpointsTest {
 
@@ -281,10 +281,9 @@ public class BlockingCheckpointsTest {
                }
        }
 
-       // 
------------------------------------------------------------------------
-       //  stream task that simply triggers a checkpoint
-       // 
------------------------------------------------------------------------
-
+       /**
+        * Stream task that simply triggers a checkpoint.
+        */
        public static final class TestStreamTask extends 
OneInputStreamTask<Object, Object> {
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index 4435247..542e88a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -68,6 +68,7 @@ import 
org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.util.SerializedValue;
+
 import org.junit.Test;
 
 import java.io.EOFException;
@@ -92,7 +93,7 @@ import static org.mockito.Mockito.when;
  * This test checks that task restores that get stuck in the presence of 
interrupts
  * are handled properly.
  *
- * In practice, reading from HDFS is interrupt sensitive: The HDFS code 
frequently deadlocks
+ * <p>In practice, reading from HDFS is interrupt sensitive: The HDFS code 
frequently deadlocks
  * or livelocks if it is interrupted.
  */
 public class InterruptSensitiveRestoreTest {
@@ -185,7 +186,6 @@ public class InterruptSensitiveRestoreTest {
                
when(networkEnvironment.createKvStateTaskRegistry(any(JobID.class), 
any(JobVertexID.class)))
                                .thenReturn(mock(TaskKvStateRegistry.class));
 
-
                ChainedStateHandle<StreamStateHandle> operatorState = null;
                List<KeyedStateHandle> keyedStateFromBackend = 
Collections.emptyList();
                List<KeyedStateHandle> keyedStateFromStream = 
Collections.emptyList();
@@ -197,7 +197,7 @@ public class InterruptSensitiveRestoreTest {
                                new OperatorStateHandle.StateMetaInfo(new 
long[]{0}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
                
operatorStateMetadata.put(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME,
 metaInfo);
 
-               KeyGroupRangeOffsets keyGroupRangeOffsets = new 
KeyGroupRangeOffsets(new KeyGroupRange(0,0));
+               KeyGroupRangeOffsets keyGroupRangeOffsets = new 
KeyGroupRangeOffsets(new KeyGroupRange(0, 0));
 
                Collection<OperatorStateHandle> operatorStateHandles =
                                Collections.singletonList(new 
OperatorStateHandle(operatorStateMetadata, state));
@@ -381,7 +381,6 @@ public class InterruptSensitiveRestoreTest {
                @Override
                public void cancel() {}
 
-
                @Override
                public void snapshotState(FunctionSnapshotContext context) 
throws Exception {
                        fail("should never be called");
@@ -389,7 +388,7 @@ public class InterruptSensitiveRestoreTest {
 
                @Override
                public void initializeState(FunctionInitializationContext 
context) throws Exception {
-                       
((StateInitializationContext)context).getRawOperatorStateInputs().iterator().next().getStream().read();
+                       ((StateInitializationContext) 
context).getRawOperatorStateInputs().iterator().next().getStream().read();
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 90f5619..d343eaf 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -47,8 +47,8 @@ import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.graph.StreamNode;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
+import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -60,12 +60,19 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.Assert;
 import org.junit.Test;
-
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.Serializable;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 
@@ -77,8 +84,7 @@ import static org.junit.Assert.fail;
 /**
  * Tests for {@link OneInputStreamTask}.
  *
- * <p>
- * Note:<br>
+ * <p>Note:<br>
  * We only use a {@link StreamMap} operator here. We also test the individual 
operators but Map is
  * used as a representative to test OneInputStreamTask, since 
OneInputStreamTask is used for all
  * OneInputStreamOperators.
@@ -238,7 +244,7 @@ public class OneInputStreamTaskTest extends TestLogger {
         * It also verifies that when task is idle, watermarks generated in the 
middle of chains are also blocked and
         * never forwarded.
         *
-        * The tested chain will be: (HEAD: normal operator) --> (watermark 
generating operator) --> (normal operator).
+        * <p>The tested chain will be: (HEAD: normal operator) --> (watermark 
generating operator) --> (normal operator).
         * The operators will throw an exception and fail the test if either of 
them were forwarded watermarks when
         * the task is idle.
         */
@@ -506,7 +512,6 @@ public class OneInputStreamTaskTest extends TestLogger {
 
                TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
 
-
                // Then give the earlier barrier, these should be ignored
                testHarness.processEvent(new CheckpointBarrier(0, 0, 
CheckpointOptions.forFullCheckpoint()), 0, 1);
                testHarness.processEvent(new CheckpointBarrier(0, 0, 
CheckpointOptions.forFullCheckpoint()), 1, 0);
@@ -523,7 +528,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 
        /**
         * Tests that the stream operator can snapshot and restore the operator 
state of chained
-        * operators
+        * operators.
         */
        @Test
        public void testSnapshottingAndRestoring() throws Exception {
@@ -561,7 +566,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 
                CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointId, checkpointTimestamp);
 
-               while(!streamTask.triggerCheckpoint(checkpointMetaData, 
CheckpointOptions.forFullCheckpoint()));
+               while (!streamTask.triggerCheckpoint(checkpointMetaData, 
CheckpointOptions.forFullCheckpoint())) {}
 
                // since no state was set, there shouldn't be restore calls
                assertEquals(0, TestingStreamOperator.numberRestoreCalls);
@@ -682,7 +687,6 @@ public class OneInputStreamTaskTest extends TestLogger {
                        super(jobConfig, taskConfig, executionConfig, 
memorySize, inputSplitProvider, bufferSize);
                }
 
-
                @Override
                public void acknowledgeCheckpoint(
                                long checkpointId,
@@ -793,8 +797,8 @@ public class OneInputStreamTaskTest extends TestLogger {
 
                        ClassLoader cl = 
Thread.currentThread().getContextClassLoader();
 
-                       Serializable functionState= 
InstantiationUtil.deserializeObject(in, cl);
-                       Integer operatorState= 
InstantiationUtil.deserializeObject(in, cl);
+                       Serializable functionState = 
InstantiationUtil.deserializeObject(in, cl);
+                       Integer operatorState = 
InstantiationUtil.deserializeObject(in, cl);
 
                        assertEquals(random.nextInt(), functionState);
                        assertEquals(random.nextInt(), (int) operatorState);
@@ -885,7 +889,7 @@ public class OneInputStreamTaskTest extends TestLogger {
         * An operator that can be triggered whether or not to expect 
watermarks forwarded to it, toggled
         * by letting it process special trigger marker records.
         *
-        * If it receives a watermark when it's not expecting one, it'll throw 
an exception and fail.
+        * <p>If it receives a watermark when it's not expecting one, it'll 
throw an exception and fail.
         */
        private static class TriggerableFailOnWatermarkTestOperator
                        extends AbstractStreamOperator<String>
@@ -893,8 +897,8 @@ public class OneInputStreamTaskTest extends TestLogger {
 
                private static final long serialVersionUID = 
2048954179291813243L;
 
-               public final static String EXPECT_FORWARDED_WATERMARKS_MARKER = 
"EXPECT_WATERMARKS";
-               public final static String NO_FORWARDED_WATERMARKS_MARKER = 
"NO_WATERMARKS";
+               public static final String EXPECT_FORWARDED_WATERMARKS_MARKER = 
"EXPECT_WATERMARKS";
+               public static final String NO_FORWARDED_WATERMARKS_MARKER = 
"NO_WATERMARKS";
 
                protected boolean expectForwardedWatermarks;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
index 19e27f2..4d11c97 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -30,21 +31,18 @@ import java.io.IOException;
 /**
  * Test harness for testing a {@link 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask}.
  *
- * <p>
- * This mock Invokable provides the task with a basic runtime context and 
allows pushing elements
+ * <p>This mock Invokable provides the task with a basic runtime context and 
allows pushing elements
  * and watermarks into the task. {@link #getOutput()} can be used to get the 
emitted elements
  * and events. You are free to modify the retrieved list.
  *
- * <p>
- * After setting up everything the Task can be invoked using {@link 
#invoke()}. This will start
+ * <p>After setting up everything the Task can be invoked using {@link 
#invoke()}. This will start
  * a new Thread to execute the Task. Use {@link #waitForTaskCompletion()} to 
wait for the Task
  * thread to finish. Use {@link #processElement} to send elements to the task. 
Use
  * {@link #processEvent(AbstractEvent)} to send events to the task.
  * Before waiting for the task to finish you must call {@link #endInput()} to 
signal to the task
  * that data entry is finished.
  *
- * <p>
- * When Elements or Events are offered to the Task they are put into a queue. 
The input gates
+ * <p>When Elements or Events are offered to the Task they are put into a 
queue. The input gates
  * of the Task notifyNonEmpty from this queue. Use {@link 
#waitForInputProcessing()} to wait until all
  * queues are empty. This must be used after entering some elements before 
checking the
  * desired output.

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceExternalCheckpointTriggerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceExternalCheckpointTriggerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceExternalCheckpointTriggerTest.java
index e5caff3..47a5350 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceExternalCheckpointTriggerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceExternalCheckpointTriggerTest.java
@@ -45,8 +45,8 @@ import static org.junit.Assert.assertTrue;
 @SuppressWarnings("serial")
 public class SourceExternalCheckpointTriggerTest {
 
-       static final OneShotLatch ready = new OneShotLatch();
-       static final MultiShotLatch sync = new MultiShotLatch();
+       private static final OneShotLatch ready = new OneShotLatch();
+       private static final MultiShotLatch sync = new MultiShotLatch();
 
        @Test
        public void testCheckpointsTriggeredBySource() throws Exception {
@@ -129,7 +129,7 @@ public class SourceExternalCheckpointTriggerTest {
 
                private final long numEvents;
                private final long checkpointFrequency;
-               
+
                private CheckpointTrigger trigger;
 
                ExternalCheckpointsSource(long numEvents, long 
checkpointFrequency) {

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index 9e26f9e..27818bc 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -78,22 +78,22 @@ public class SourceStreamTaskTest {
         * and element emission. This also verifies that there are no 
concurrent invocations
         * of the checkpoint method on the source operator.
         *
-        * The source emits elements and performs checkpoints. We have several 
checkpointer threads
+        * <p>The source emits elements and performs checkpoints. We have 
several checkpointer threads
         * that fire checkpoint requests at the source task.
         *
-        * If element emission and checkpointing are not in series the count of 
elements at the
+        * <p>If element emission and checkpointing are not in series the count 
of elements at the
         * beginning of a checkpoint and at the end of a checkpoint are not the 
same because the
         * source kept emitting elements while the checkpoint was ongoing.
         */
        @Test
        @SuppressWarnings("unchecked")
        public void testCheckpointing() throws Exception {
-               final int NUM_ELEMENTS = 100;
-               final int NUM_CHECKPOINTS = 100;
-               final int NUM_CHECKPOINTERS = 1;
-               final int CHECKPOINT_INTERVAL = 5; // in ms
-               final int SOURCE_CHECKPOINT_DELAY = 1000; // how many random 
values we sum up in storeCheckpoint
-               final int SOURCE_READ_DELAY = 1; // in ms
+               final int numElements = 100;
+               final int numCheckpoints = 100;
+               final int numCheckpointers = 1;
+               final int checkpointInterval = 5; // in ms
+               final int sourceCheckpointDelay = 1000; // how many random 
values we sum up in storeCheckpoint
+               final int sourceReadDelay = 1; // in ms
 
                ExecutorService executor = Executors.newFixedThreadPool(10);
                try {
@@ -104,25 +104,25 @@ public class SourceStreamTaskTest {
                        testHarness.setupOutputForSingletonOperatorChain();
 
                        StreamConfig streamConfig = 
testHarness.getStreamConfig();
-                       StreamSource<Tuple2<Long, Integer>, ?> sourceOperator = 
new StreamSource<>(new MockSource(NUM_ELEMENTS, SOURCE_CHECKPOINT_DELAY, 
SOURCE_READ_DELAY));
+                       StreamSource<Tuple2<Long, Integer>, ?> sourceOperator = 
new StreamSource<>(new MockSource(numElements, sourceCheckpointDelay, 
sourceReadDelay));
                        streamConfig.setStreamOperator(sourceOperator);
 
                        // prepare the
 
-                       Future<Boolean>[] checkpointerResults = new 
Future[NUM_CHECKPOINTERS];
+                       Future<Boolean>[] checkpointerResults = new 
Future[numCheckpointers];
 
                        // invoke this first, so the tasks are actually running 
when the checkpoints are scheduled
                        testHarness.invoke();
 
-                       for (int i = 0; i < NUM_CHECKPOINTERS; i++) {
-                               checkpointerResults[i] = executor.submit(new 
Checkpointer(NUM_CHECKPOINTS, CHECKPOINT_INTERVAL, sourceTask));
+                       for (int i = 0; i < numCheckpointers; i++) {
+                               checkpointerResults[i] = executor.submit(new 
Checkpointer(numCheckpoints, checkpointInterval, sourceTask));
                        }
 
                        testHarness.waitForTaskCompletion();
 
                        // Get the result from the checkpointers, if these 
threw an exception it
                        // will be rethrown here
-                       for (int i = 0; i < NUM_CHECKPOINTERS; i++) {
+                       for (int i = 0; i < numCheckpointers; i++) {
                                if (!checkpointerResults[i].isDone()) {
                                        checkpointerResults[i].cancel(true);
                                }
@@ -132,7 +132,7 @@ public class SourceStreamTaskTest {
                        }
 
                        List<Tuple2<Long, Integer>> resultElements = 
TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
-                       Assert.assertEquals(NUM_ELEMENTS, 
resultElements.size());
+                       Assert.assertEquals(numElements, resultElements.size());
                }
                finally {
                        executor.shutdown();
@@ -241,7 +241,7 @@ public class SourceStreamTaskTest {
                }
        }
 
-       public static class OpenCloseTestSource extends 
RichSourceFunction<String> {
+       private static class OpenCloseTestSource extends 
RichSourceFunction<String> {
                private static final long serialVersionUID = 1L;
 
                public static boolean openCalled = false;

Reply via email to