http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
index 2a6a723..d9fcc12 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
@@ -37,11 +37,11 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
-
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.util.CollectorOutput;
+
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
@@ -52,12 +52,15 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
-import static org.junit.Assert.*;
-
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+/**
+ * Tests for {@link StreamSource} operators.
+ */
 @SuppressWarnings("serial")
 public class StreamSourceOperatorTest {
 
@@ -86,7 +89,6 @@ public class StreamSourceOperatorTest {
                final StreamSource<String, InfiniteSource<String>> operator =
                                new StreamSource<>(new 
InfiniteSource<String>());
 
-
                setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 
0);
                operator.cancel();
 
@@ -106,7 +108,6 @@ public class StreamSourceOperatorTest {
                final StreamSource<String, InfiniteSource<String>> operator =
                                new StreamSource<>(new 
InfiniteSource<String>());
 
-
                setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 
0);
 
                // trigger an async cancel in a bit
@@ -139,7 +140,6 @@ public class StreamSourceOperatorTest {
                final StoppableStreamSource<String, InfiniteSource<String>> 
operator =
                                new StoppableStreamSource<>(new 
InfiniteSource<String>());
 
-
                setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 
0);
                operator.stop();
 
@@ -158,7 +158,6 @@ public class StreamSourceOperatorTest {
                final StoppableStreamSource<String, InfiniteSource<String>> 
operator =
                                new StoppableStreamSource<>(new 
InfiniteSource<String>());
 
-
                setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 
0);
 
                // trigger an async cancel in a bit
@@ -179,7 +178,7 @@ public class StreamSourceOperatorTest {
        }
 
        /**
-        * Test that latency marks are emitted
+        * Test that latency marks are emitted.
         */
        @Test
        public void testLatencyMarkEmission() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
index 6772db4..6e3be03 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
@@ -24,15 +24,16 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
 import org.junit.Test;
 
 import java.util.concurrent.atomic.AtomicReference;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for the timer service of {@link 
org.apache.flink.streaming.runtime.tasks.StreamTask}.
@@ -65,7 +66,6 @@ public class StreamTaskTimerTest {
 
                assertEquals(1, StreamTask.TRIGGER_THREAD_GROUP.activeCount());
 
-
                testHarness.endInput();
                testHarness.waitForTaskCompletion();
 
@@ -109,8 +109,7 @@ public class StreamTaskTimerTest {
                        long deadline = System.currentTimeMillis() + 20000;
                        while (errorRef.get() == null &&
                                        
ValidatingProcessingTimeCallback.numInSequence < 4 &&
-                                       System.currentTimeMillis() < deadline)
-                       {
+                                       System.currentTimeMillis() < deadline) {
                                Thread.sleep(100);
                        }
 
@@ -170,6 +169,9 @@ public class StreamTaskTimerTest {
 
        // 
------------------------------------------------------------------------
 
+       /**
+        * Identity mapper.
+        */
        public static class DummyMapFunction<T> implements MapFunction<T, T> {
                @Override
                public T map(T value) {

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
index 9897884..675ffa3 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
@@ -24,8 +24,8 @@ import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.runtime.tasks.AsyncExceptionHandler;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 
 import org.junit.Test;
 
@@ -33,6 +33,9 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
 
+/**
+ * Tests for {@link TestProcessingTimeService}.
+ */
 public class TestProcessingTimeServiceTest {
 
        @Test
@@ -90,6 +93,9 @@ public class TestProcessingTimeServiceTest {
 
        // 
------------------------------------------------------------------------
 
+       /**
+        * An {@link AsyncExceptionHandler} storing the handled exception.
+        */
        public static class ReferenceSettingExceptionHandler implements 
AsyncExceptionHandler {
 
                private final AtomicReference<Throwable> errorReference;

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
index f129c20..51af116 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
@@ -28,8 +28,13 @@ import org.junit.Test;
 
 import java.util.concurrent.ConcurrentLinkedQueue;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for {@link TimestampsAndPeriodicWatermarksOperator}.
+ */
 public class TimestampsAndPeriodicWatermarksOperatorTest {
 
        @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperatorTest.java
index 0333e93..a422432 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperatorTest.java
@@ -30,6 +30,9 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 
 import static org.junit.Assert.assertEquals;
 
+/**
+ * Tests for {@link TimestampsAndPunctuatedWatermarksOperator}.
+ */
 public class TimestampsAndPunctuatedWatermarksOperatorTest {
 
        @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
index 46d92af..d3fd585 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
@@ -27,6 +27,9 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+/**
+ * Test base for {@link GenericWriteAheadSink}.
+ */
 public abstract class WriteAheadSinkTestBase<IN, S extends 
GenericWriteAheadSink<IN>> extends TestLogger {
 
        protected abstract S createSink() throws Exception;

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 2f7e302..a57dcf1 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -68,6 +68,9 @@ import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+/**
+ * Tests for {@link AccumulatingProcessingTimeWindowOperator}.
+ */
 @SuppressWarnings({"serial"})
 @PrepareForTest(InternalIterableWindowFunction.class)
 @RunWith(PowerMockRunner.class)
@@ -220,7 +223,6 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                        assertTrue(op.getNextEvaluationTime() % 1000 == 0);
                        testHarness.close();
 
-
                        op = new 
AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
                                        StringSerializer.INSTANCE, 
StringSerializer.INSTANCE, 1500, 1000);
 
@@ -278,7 +280,6 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                                testHarness.setProcessingTime(currentTime);
                        }
 
-
                        List<Integer> result = 
extractFromStreamRecords(testHarness.extractOutputStreamRecords());
                        assertEquals(numElements, result.size());
 
@@ -322,7 +323,6 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                                testHarness.setProcessingTime(currentTime);
                        }
 
-
                        List<Integer> result = 
extractFromStreamRecords(testHarness.extractOutputStreamRecords());
                        assertEquals(numElements, result.size());
 
@@ -476,7 +476,6 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
                        testHarness.setProcessingTime(200);
 
-
                        List<Integer> result = 
extractFromStreamRecords(testHarness.extractOutputStreamRecords());
                        assertEquals(6, result.size());
 
@@ -524,7 +523,6 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
                        testHarness.setProcessingTime(200);
 
-
                        List<Integer> result = 
extractFromStreamRecords(testHarness.extractOutputStreamRecords());
                        assertEquals(6, result.size());
 
@@ -837,7 +835,6 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                        testHarness.restore(state);
                        testHarness.open();
 
-
                        // inject again the remaining elements
                        for (int i = numElementsFirst; i < numElements; i++) {
                                testHarness.processElement(new 
StreamRecord<>(i));
@@ -929,7 +926,6 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                        testHarness.restore(state);
                        testHarness.open();
 
-
                        // inject again the remaining elements
                        for (int i = numElementsFirst; i < numElements; i++) {
                                testHarness.processElement(new 
StreamRecord<>(i));
@@ -1040,7 +1036,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
                // we use a concurrent map here even though there is no 
concurrency, to
                // get "volatile" style access to entries
-               static final Map<Integer, Integer> globalCounts = new 
ConcurrentHashMap<>();
+               private static final Map<Integer, Integer> globalCounts = new 
ConcurrentHashMap<>();
 
                private ValueState<Integer> state;
 
@@ -1053,9 +1049,9 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
                @Override
                public void process(Integer key,
-                                                 Context context,
-                                                 Iterable<Integer> values,
-                                                 Collector<Integer> out) 
throws Exception {
+                                               Context context,
+                                               Iterable<Integer> values,
+                                               Collector<Integer> out) throws 
Exception {
                        for (Integer i : values) {
                                // we need to update this state before emitting 
elements. Else, the test's main
                                // thread will have received all output 
elements before the state is updated and
@@ -1093,8 +1089,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
        }
 
        private static StreamTask<?, ?> createMockTaskWithTimer(
-               final ProcessingTimeService timerService)
-       {
+               final ProcessingTimeService timerService) {
                StreamTask<?, ?> mockTask = createMockTask();
                
when(mockTask.getProcessingTimeService()).thenReturn(timerService);
                return mockTask;

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index 1875bbb..62f4f0b 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -35,9 +35,9 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
-
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
 import org.junit.After;
 import org.junit.Test;
 
@@ -56,6 +56,9 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 
+/**
+ * Tests for aligned {@link AggregatingProcessingTimeWindowOperator}.
+ */
 @SuppressWarnings("serial")
 public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 
@@ -66,9 +69,9 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
        private final KeySelector<String, String> mockKeySelector = 
mock(KeySelector.class);
 
        private final KeySelector<Tuple2<Integer, Integer>, Integer> 
fieldOneSelector =
-                       new KeySelector<Tuple2<Integer,Integer>, Integer>() {
+                       new KeySelector<Tuple2<Integer, Integer>, Integer>() {
                                @Override
-                               public Integer getKey(Tuple2<Integer,Integer> 
value) {
+                               public Integer getKey(Tuple2<Integer, Integer> 
value) {
                                        return value.f0;
                                }
        };
@@ -585,7 +588,6 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                                                        IntSerializer.INSTANCE, 
tupleSerializer,
                                                        windowSize, 
windowSlide);
 
-
                        OneInputStreamOperatorTestHarness<Tuple2<Integer, 
Integer>, Tuple2<Integer, Integer>> testHarness =
                                        new 
OneInputStreamOperatorTestHarness<>(op);
 
@@ -824,7 +826,7 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
 
        private static class StatefulFunction extends 
RichReduceFunction<Tuple2<Integer, Integer>> {
 
-               static final Map<Integer, Integer> globalCounts = new 
ConcurrentHashMap<>();
+               private static final Map<Integer, Integer> globalCounts = new 
ConcurrentHashMap<>();
 
                private ValueState<Integer> state;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
index 81a9275..a7c6f47 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.ExecutionConfig;
@@ -1244,7 +1245,6 @@ public class AllWindowTranslationTest {
 
        }
 
-
        @Test
        @SuppressWarnings("rawtypes")
        public void testReduceWithCustomTrigger() throws Exception {
@@ -1461,7 +1461,7 @@ public class AllWindowTranslationTest {
        //  UDFs
        // 
------------------------------------------------------------------------
 
-       public static class DummyReducer implements 
ReduceFunction<Tuple2<String, Integer>> {
+       private static class DummyReducer implements 
ReduceFunction<Tuple2<String, Integer>> {
                private static final long serialVersionUID = 1L;
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousEventTimeTriggerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousEventTimeTriggerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousEventTimeTriggerTest.java
index 0f65a88..9c14a9f 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousEventTimeTriggerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousEventTimeTriggerTest.java
@@ -15,15 +15,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.operators.windowing;
 
-import com.google.common.collect.Lists;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import 
org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import com.google.common.collect.Lists;
 import org.junit.Test;
 
 import java.util.Collection;
@@ -62,7 +64,7 @@ public class ContinuousEventTimeTriggerTest {
 
 
        /**
-        * Verify that state <TimeWindow>of separate windows does not leak into 
other windows.
+        * Verify that state &lt;TimeWindow&gt;of separate windows does not 
leak into other windows.
         */
        @Test
        public void testWindowSeparationAndFiring() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java
index 16e353b..38dd01d 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java
@@ -15,13 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.operators.windowing;
 
-import com.google.common.collect.Lists;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import com.google.common.collect.Lists;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsTest.java
index a46572b..23af838 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsTest.java
@@ -15,10 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.runtime.operators.windowing;
 
+package org.apache.flink.streaming.runtime.operators.windowing;
 
-import com.google.common.collect.Lists;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
@@ -28,6 +27,8 @@ import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
 import org.junit.Test;
 import org.mockito.Matchers;
 
@@ -38,27 +39,35 @@ import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.anyCollection;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 /**
- * Tests for {@link EventTimeSessionWindows}
+ * Tests for {@link EventTimeSessionWindows}.
  */
 public class EventTimeSessionWindowsTest extends TestLogger {
 
        @Test
        public void testWindowAssignment() {
-               final int SESSION_GAP = 5000;
+               final int sessionGap = 5000;
 
                WindowAssigner.WindowAssignerContext mockContext =
                                
mock(WindowAssigner.WindowAssignerContext.class);
 
-               EventTimeSessionWindows assigner = 
EventTimeSessionWindows.withGap(Time.milliseconds(SESSION_GAP));
+               EventTimeSessionWindows assigner = 
EventTimeSessionWindows.withGap(Time.milliseconds(sessionGap));
 
-               assertThat(assigner.assignWindows("String", 0L, mockContext), 
contains(timeWindow(0, 0 + SESSION_GAP)));
-               assertThat(assigner.assignWindows("String", 4999L, 
mockContext), contains(timeWindow(4999, 4999 + SESSION_GAP)));
-               assertThat(assigner.assignWindows("String", 5000L, 
mockContext), contains(timeWindow(5000, 5000 + SESSION_GAP)));
+               assertThat(assigner.assignWindows("String", 0L, mockContext), 
contains(timeWindow(0, 0 + sessionGap)));
+               assertThat(assigner.assignWindows("String", 4999L, 
mockContext), contains(timeWindow(4999, 4999 + sessionGap)));
+               assertThat(assigner.assignWindows("String", 5000L, 
mockContext), contains(timeWindow(5000, 5000 + sessionGap)));
        }
 
        @Test
@@ -138,16 +147,16 @@ public class EventTimeSessionWindowsTest extends 
TestLogger {
        public void testTimeUnits() {
                // sanity check with one other time unit
 
-               final int SESSION_GAP = 5000;
+               final int sessionGap = 5000;
 
                WindowAssigner.WindowAssignerContext mockContext =
                                
mock(WindowAssigner.WindowAssignerContext.class);
 
-               EventTimeSessionWindows assigner = 
EventTimeSessionWindows.withGap(Time.seconds(SESSION_GAP / 1000));
+               EventTimeSessionWindows assigner = 
EventTimeSessionWindows.withGap(Time.seconds(sessionGap / 1000));
 
-               assertThat(assigner.assignWindows("String", 0L, mockContext), 
contains(timeWindow(0, 0 + SESSION_GAP)));
-               assertThat(assigner.assignWindows("String", 4999L, 
mockContext), contains(timeWindow(4999, 4999 + SESSION_GAP)));
-               assertThat(assigner.assignWindows("String", 5000L, 
mockContext), contains(timeWindow(5000, 5000 + SESSION_GAP)));
+               assertThat(assigner.assignWindows("String", 0L, mockContext), 
contains(timeWindow(0, 0 + sessionGap)));
+               assertThat(assigner.assignWindows("String", 4999L, 
mockContext), contains(timeWindow(4999, 4999 + sessionGap)));
+               assertThat(assigner.assignWindows("String", 5000L, 
mockContext), contains(timeWindow(5000, 5000 + sessionGap)));
        }
 
        @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java
index 2d93ac0..2bcc192 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java
@@ -15,13 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.operators.windowing;
 
-import com.google.common.collect.Lists;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import com.google.common.collect.Lists;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorContractTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorContractTest.java
index 7af4506..a89aec0 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorContractTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorContractTest.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.ExecutionConfig;

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
index e5d3ef0..8d65bb4 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.ExecutionConfig;
@@ -50,6 +51,7 @@ import 
org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.util.Collector;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -58,18 +60,21 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+/**
+ * Tests for {@link EvictingWindowOperator}.
+ */
 public class EvictingWindowOperatorTest {
 
        /**
-        * Tests CountEvictor evictAfter behavior
+        * Tests CountEvictor evictAfter behavior.
         * @throws Exception
      */
        @Test
        public void testCountEvictorEvictAfter() throws Exception {
                AtomicInteger closeCalled = new AtomicInteger(0);
-               final int WINDOW_SIZE = 4;
-               final int TRIGGER_COUNT = 2;
-               final boolean EVICT_AFTER = true;
+               final int windowSize = 4;
+               final int triggerCount = 2;
+               final boolean evictAfter = true;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -80,7 +85,6 @@ public class EvictingWindowOperatorTest {
                ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> 
stateDesc =
                        new ListStateDescriptor<>("window-contents", 
streamRecordSerializer);
 
-
                EvictingWindowOperator<String, Tuple2<String, Integer>, 
Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
                        GlobalWindows.create(),
                        new GlobalWindow.Serializer(),
@@ -88,16 +92,14 @@ public class EvictingWindowOperatorTest {
                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new 
ExecutionConfig()),
                        stateDesc,
                        new InternalIterableWindowFunction<>(new 
RichSumReducer<GlobalWindow>(closeCalled)),
-                       CountTrigger.of(TRIGGER_COUNT),
-                       CountEvictor.of(WINDOW_SIZE,EVICT_AFTER),
+                       CountTrigger.of(triggerCount),
+                       CountEvictor.of(windowSize, evictAfter),
                        0,
                        null /* late data output tag */);
 
-
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
                        new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
-
                long initialTime = 0L;
                ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
 
@@ -114,8 +116,6 @@ public class EvictingWindowOperatorTest {
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 1999));
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 1000));
 
-
-
                expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 
Long.MAX_VALUE));
                expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), 
Long.MAX_VALUE));
                expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 
Long.MAX_VALUE));
@@ -139,18 +139,17 @@ public class EvictingWindowOperatorTest {
                testHarness.close();
 
                Assert.assertEquals("Close was not called.", 1, 
closeCalled.get());
-
        }
 
        /**
-        * Tests TimeEvictor evictAfter behavior
+        * Tests TimeEvictor evictAfter behavior.
         * @throws Exception
         */
        @Test
        public void testTimeEvictorEvictAfter() throws Exception {
                AtomicInteger closeCalled = new AtomicInteger(0);
-               final int TRIGGER_COUNT = 2;
-               final boolean EVICT_AFTER = true;
+               final int triggerCount = 2;
+               final boolean evictAfter = true;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -161,7 +160,6 @@ public class EvictingWindowOperatorTest {
                ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> 
stateDesc =
                        new ListStateDescriptor<>("window-contents", 
streamRecordSerializer);
 
-
                EvictingWindowOperator<String, Tuple2<String, Integer>, 
Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
                        GlobalWindows.create(),
                        new GlobalWindow.Serializer(),
@@ -169,12 +167,11 @@ public class EvictingWindowOperatorTest {
                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new 
ExecutionConfig()),
                        stateDesc,
                        new InternalIterableWindowFunction<>(new 
RichSumReducer<GlobalWindow>(closeCalled)),
-                       CountTrigger.of(TRIGGER_COUNT),
-                       TimeEvictor.of(Time.seconds(2), EVICT_AFTER),
+                       CountTrigger.of(triggerCount),
+                       TimeEvictor.of(Time.seconds(2), evictAfter),
                        0,
                        null /* late data output tag */);
 
-
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
                        new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
@@ -194,13 +191,10 @@ public class EvictingWindowOperatorTest {
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 2001));
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 1001));
 
-
-
                expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 
Long.MAX_VALUE));
                expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 
Long.MAX_VALUE));
                expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 
Long.MAX_VALUE));
 
-
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
 
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), initialTime + 10999));
@@ -218,14 +212,14 @@ public class EvictingWindowOperatorTest {
        }
 
        /**
-        * Tests TimeEvictor evictBefore behavior
+        * Tests TimeEvictor evictBefore behavior.
         * @throws Exception
         */
        @Test
        public void testTimeEvictorEvictBefore() throws Exception {
                AtomicInteger closeCalled = new AtomicInteger(0);
-               final int TRIGGER_COUNT = 2;
-               final int WINDOW_SIZE = 4;
+               final int triggerCount = 2;
+               final int windowSize = 4;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -236,20 +230,18 @@ public class EvictingWindowOperatorTest {
                ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> 
stateDesc =
                        new ListStateDescriptor<>("window-contents", 
streamRecordSerializer);
 
-
                EvictingWindowOperator<String, Tuple2<String, Integer>, 
Tuple2<String, Integer>, TimeWindow> operator = new EvictingWindowOperator<>(
-                       TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, 
TimeUnit.SECONDS)),
+                       TumblingEventTimeWindows.of(Time.of(windowSize, 
TimeUnit.SECONDS)),
                        new TimeWindow.Serializer(),
                        new TupleKeySelector(),
                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new 
ExecutionConfig()),
                        stateDesc,
                        new InternalIterableWindowFunction<>(new 
RichSumReducer<TimeWindow>(closeCalled)),
-                       CountTrigger.of(TRIGGER_COUNT),
+                       CountTrigger.of(triggerCount),
                        TimeEvictor.of(Time.seconds(2)),
                        0,
                        null /* late data output tag */);
 
-
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
                        new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
@@ -270,13 +262,10 @@ public class EvictingWindowOperatorTest {
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 2001));
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 1001));
 
-
-
                expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 1), 
3999));
                expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 
3999));
                expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 
3999));
 
-
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
 
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), initialTime + 6500));
@@ -290,19 +279,18 @@ public class EvictingWindowOperatorTest {
                testHarness.close();
 
                Assert.assertEquals("Close was not called.", 1, 
closeCalled.get());
-
        }
 
        /**
-        * Tests time evictor, if no timestamp information in the StreamRecord
-        * No element will be evicted from the window
+        * Tests time evictor, if no timestamp information in the StreamRecord.
+        * No element will be evicted from the window.
         * @throws Exception
         */
        @Test
        public void testTimeEvictorNoTimestamp() throws Exception {
                AtomicInteger closeCalled = new AtomicInteger(0);
-               final int TRIGGER_COUNT = 2;
-               final boolean EVICT_AFTER = true;
+               final int triggerCount = 2;
+               final boolean evictAfter = true;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -313,7 +301,6 @@ public class EvictingWindowOperatorTest {
                ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> 
stateDesc =
                        new ListStateDescriptor<>("window-contents", 
streamRecordSerializer);
 
-
                EvictingWindowOperator<String, Tuple2<String, Integer>, 
Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
                        GlobalWindows.create(),
                        new GlobalWindow.Serializer(),
@@ -321,12 +308,11 @@ public class EvictingWindowOperatorTest {
                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new 
ExecutionConfig()),
                        stateDesc,
                        new InternalIterableWindowFunction<>(new 
RichSumReducer<GlobalWindow>(closeCalled)),
-                       CountTrigger.of(TRIGGER_COUNT),
-                       TimeEvictor.of(Time.seconds(2), EVICT_AFTER),
+                       CountTrigger.of(triggerCount),
+                       TimeEvictor.of(Time.seconds(2), evictAfter),
                        0,
                        null /* late data output tag */);
 
-
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
                        new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
@@ -345,13 +331,10 @@ public class EvictingWindowOperatorTest {
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1)));
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1)));
 
-
-
                expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 
Long.MAX_VALUE));
                expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 
Long.MAX_VALUE));
                expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), 
Long.MAX_VALUE));
 
-
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
 
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1)));
@@ -365,19 +348,18 @@ public class EvictingWindowOperatorTest {
                testHarness.close();
 
                Assert.assertEquals("Close was not called.", 1, 
closeCalled.get());
-
        }
 
        /**
-        * Tests DeltaEvictor, evictBefore behavior
+        * Tests DeltaEvictor, evictBefore behavior.
         * @throws Exception
         */
        @Test
        public void testDeltaEvictorEvictBefore() throws Exception {
                AtomicInteger closeCalled = new AtomicInteger(0);
-               final int TRIGGER_COUNT = 2;
-               final boolean EVICT_AFTER = false;
-               final int THRESHOLD = 2;
+               final int triggerCount = 2;
+               final boolean evictAfter = false;
+               final int threshold = 2;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -388,7 +370,6 @@ public class EvictingWindowOperatorTest {
                ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> 
stateDesc =
                        new ListStateDescriptor<>("window-contents", 
streamRecordSerializer);
 
-
                EvictingWindowOperator<String, Tuple2<String, Integer>, 
Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
                        GlobalWindows.create(),
                        new GlobalWindow.Serializer(),
@@ -396,18 +377,16 @@ public class EvictingWindowOperatorTest {
                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new 
ExecutionConfig()),
                        stateDesc,
                        new InternalIterableWindowFunction<>(new 
RichSumReducer<GlobalWindow>(closeCalled)),
-                       CountTrigger.of(TRIGGER_COUNT),
-                       DeltaEvictor.of(THRESHOLD, new 
DeltaFunction<Tuple2<String, Integer>>() {
+                       CountTrigger.of(triggerCount),
+                       DeltaEvictor.of(threshold, new 
DeltaFunction<Tuple2<String, Integer>>() {
                                @Override
                                public double getDelta(Tuple2<String, Integer> 
oldDataPoint, Tuple2<String, Integer> newDataPoint) {
                                        return newDataPoint.f1 - 
oldDataPoint.f1;
                                }
-                       }, EVICT_AFTER),
+                       }, evictAfter),
                        0,
                        null /* late data output tag */);
 
-
-
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
                        new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
@@ -447,15 +426,15 @@ public class EvictingWindowOperatorTest {
        }
 
        /**
-        * Tests DeltaEvictor, evictAfter behavior
+        * Tests DeltaEvictor, evictAfter behavior.
         * @throws Exception
         */
        @Test
        public void testDeltaEvictorEvictAfter() throws Exception {
                AtomicInteger closeCalled = new AtomicInteger(0);
-               final int TRIGGER_COUNT = 2;
-               final boolean EVICT_AFTER = true;
-               final int THRESHOLD = 2;
+               final int triggerCount = 2;
+               final boolean evictAfter = true;
+               final int threshold = 2;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -466,7 +445,6 @@ public class EvictingWindowOperatorTest {
                ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> 
stateDesc =
                        new ListStateDescriptor<>("window-contents", 
streamRecordSerializer);
 
-
                EvictingWindowOperator<String, Tuple2<String, Integer>, 
Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
                        GlobalWindows.create(),
                        new GlobalWindow.Serializer(),
@@ -474,18 +452,16 @@ public class EvictingWindowOperatorTest {
                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new 
ExecutionConfig()),
                        stateDesc,
                        new InternalIterableWindowFunction<>(new 
RichSumReducer<GlobalWindow>(closeCalled)),
-                       CountTrigger.of(TRIGGER_COUNT),
-                       DeltaEvictor.of(THRESHOLD, new 
DeltaFunction<Tuple2<String, Integer>>() {
+                       CountTrigger.of(triggerCount),
+                       DeltaEvictor.of(threshold, new 
DeltaFunction<Tuple2<String, Integer>>() {
                                @Override
                                public double getDelta(Tuple2<String, Integer> 
oldDataPoint, Tuple2<String, Integer> newDataPoint) {
                                        return newDataPoint.f1 - 
oldDataPoint.f1;
                                }
-                       }, EVICT_AFTER),
+                       }, evictAfter),
                        0,
                        null /* late data output tag */);
 
-
-
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
                        new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
@@ -528,8 +504,8 @@ public class EvictingWindowOperatorTest {
        @SuppressWarnings("unchecked")
        public void testCountTrigger() throws Exception {
 
-               final int WINDOW_SIZE = 4;
-               final int WINDOW_SLIDE = 2;
+               final int windowSize = 4;
+               final int windowSlide = 2;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -540,7 +516,6 @@ public class EvictingWindowOperatorTest {
                ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> 
stateDesc =
                                new ListStateDescriptor<>("window-contents", 
streamRecordSerializer);
 
-
                EvictingWindowOperator<String, Tuple2<String, Integer>, 
Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
                                GlobalWindows.create(),
                                new GlobalWindow.Serializer(),
@@ -552,12 +527,11 @@ public class EvictingWindowOperatorTest {
                                                                new 
SumReducer(),
                                                                // on some 
versions of Java we seem to need the explicit type
                                                                new 
PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>())),
-                               CountTrigger.of(WINDOW_SLIDE),
-                               CountEvictor.of(WINDOW_SIZE),
+                               CountTrigger.of(windowSlide),
+                               CountEvictor.of(windowSize),
                                0,
                                null /* late data output tag */);
 
-
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
                                new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
 
@@ -580,8 +554,6 @@ public class EvictingWindowOperatorTest {
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 1999));
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 1000));
 
-
-
                expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 
Long.MAX_VALUE));
                expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), 
Long.MAX_VALUE));
                expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 
Long.MAX_VALUE));
@@ -604,12 +576,11 @@ public class EvictingWindowOperatorTest {
        public void testCountTriggerWithApply() throws Exception {
                AtomicInteger closeCalled = new AtomicInteger(0);
 
-               final int WINDOW_SIZE = 4;
-               final int WINDOW_SLIDE = 2;
+               final int windowSize = 4;
+               final int windowSlide = 2;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
-
                @SuppressWarnings({"unchecked", "rawtypes"})
                TypeSerializer<StreamRecord<Tuple2<String, Integer>>> 
streamRecordSerializer =
                                (TypeSerializer<StreamRecord<Tuple2<String, 
Integer>>>) new StreamElementSerializer(inputType.createSerializer(new 
ExecutionConfig()));
@@ -617,7 +588,6 @@ public class EvictingWindowOperatorTest {
                ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> 
stateDesc =
                                new ListStateDescriptor<>("window-contents", 
streamRecordSerializer);
 
-
                EvictingWindowOperator<String, Tuple2<String, Integer>, 
Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
                        GlobalWindows.create(),
                        new GlobalWindow.Serializer(),
@@ -625,12 +595,11 @@ public class EvictingWindowOperatorTest {
                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new 
ExecutionConfig()),
                        stateDesc,
                        new InternalIterableWindowFunction<>(new 
RichSumReducer<GlobalWindow>(closeCalled)),
-                       CountTrigger.of(WINDOW_SLIDE),
-                       CountEvictor.of(WINDOW_SIZE),
+                       CountTrigger.of(windowSlide),
+                       CountEvictor.of(windowSize),
                        0,
                        null /* late data output tag */);
 
-
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
                                new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
 
@@ -653,8 +622,6 @@ public class EvictingWindowOperatorTest {
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 1999));
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 1000));
 
-
-
                expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 
Long.MAX_VALUE));
                expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), 
Long.MAX_VALUE));
                expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 
Long.MAX_VALUE));
@@ -679,7 +646,7 @@ public class EvictingWindowOperatorTest {
        public void testTumblingWindowWithApply() throws Exception {
                AtomicInteger closeCalled = new AtomicInteger(0);
 
-               final int WINDOW_SIZE = 4;
+               final int windowSize = 4;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -691,14 +658,14 @@ public class EvictingWindowOperatorTest {
                                new ListStateDescriptor<>("window-contents", 
streamRecordSerializer);
 
                EvictingWindowOperator<String, Tuple2<String, Integer>, 
Tuple2<String, Integer>, TimeWindow> operator = new EvictingWindowOperator<>(
-                       TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, 
TimeUnit.SECONDS)),
+                       TumblingEventTimeWindows.of(Time.of(windowSize, 
TimeUnit.SECONDS)),
                        new TimeWindow.Serializer(),
                        new TupleKeySelector(),
                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new 
ExecutionConfig()),
                        stateDesc,
                        new InternalIterableWindowFunction<>(new 
RichSumReducer<TimeWindow>(closeCalled)),
                        EventTimeTrigger.create(),
-                       CountEvictor.of(WINDOW_SIZE),
+                       CountEvictor.of(windowSize),
                        0,
                        null /* late data output tag */);
 
@@ -731,7 +698,6 @@ public class EvictingWindowOperatorTest {
                expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 
3999));
                expectedOutput.add(new Watermark(3999));
 
-
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(),
                        new EvictingWindowOperatorTest.ResultSortComparator());
                testHarness.close();
@@ -741,10 +707,9 @@ public class EvictingWindowOperatorTest {
        //  UDFs
        // 
------------------------------------------------------------------------
 
-       public static class SumReducer implements ReduceFunction<Tuple2<String, 
Integer>> {
+       private static class SumReducer implements 
ReduceFunction<Tuple2<String, Integer>> {
                private static final long serialVersionUID = 1L;
 
-
                @Override
                public Tuple2<String, Integer> reduce(Tuple2<String, Integer> 
value1,
                                Tuple2<String, Integer> value2) throws 
Exception {
@@ -752,7 +717,7 @@ public class EvictingWindowOperatorTest {
                }
        }
 
-       public static class RichSumReducer<W extends Window> extends 
RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> 
{
+       private static class RichSumReducer<W extends Window> extends 
RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> 
{
                private static final long serialVersionUID = 1L;
 
                private boolean openCalled = false;

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/GlobalWindowsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/GlobalWindowsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/GlobalWindowsTest.java
index 37fad7e..9292bfb 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/GlobalWindowsTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/GlobalWindowsTest.java
@@ -15,8 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.runtime.operators.windowing;
 
+package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -24,15 +24,18 @@ import 
org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
 
 /**
- * Tests for {@link GlobalWindows}
+ * Tests for {@link GlobalWindows}.
  */
 public class GlobalWindowsTest extends TestLogger {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutIfAbsentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutIfAbsentTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutIfAbsentTest.java
index 8786c4e..8eb26ee 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutIfAbsentTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutIfAbsentTest.java
@@ -24,6 +24,9 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+/**
+ * Tests for {@link KeyMap}.
+ */
 public class KeyMapPutIfAbsentTest {
 
        @Test
@@ -38,7 +41,7 @@ public class KeyMapPutIfAbsentTest {
                                factory.set(2 * i + 1);
                                map.putIfAbsent(i, factory);
 
-                               assertEquals(i+1, map.size());
+                               assertEquals(i + 1, map.size());
                                assertTrue(map.getCurrentTableCapacity() > 
map.size());
                                assertTrue(map.getCurrentTableCapacity() > 
map.getRehashThreshold());
                                assertTrue(map.size() <= 
map.getRehashThreshold());

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutTest.java
index 5b59bea..522c691 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutTest.java
@@ -29,6 +29,9 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+/**
+ * Tests for {@link KeyMap}.
+ */
 public class KeyMapPutTest {
 
        @Test
@@ -41,7 +44,7 @@ public class KeyMapPutTest {
                        for (int i = 0; i < numElements; i++) {
                                map.put(i, 2 * i + 1);
 
-                               assertEquals(i+1, map.size());
+                               assertEquals(i + 1, map.size());
                                assertTrue(map.getCurrentTableCapacity() > 
map.size());
                                assertTrue(map.getCurrentTableCapacity() > 
map.getRehashThreshold());
                                assertTrue(map.size() <= 
map.getRehashThreshold());
@@ -72,7 +75,6 @@ public class KeyMapPutTest {
                        assertEquals(numElements, numContained);
                        assertEquals(numElements, bitset.cardinality());
 
-
                        assertEquals(numElements, map.size());
                        assertEquals(numElements, 
map.traverseAndCountElements());
                        assertEquals(1 << 21, map.getCurrentTableCapacity());
@@ -91,18 +93,18 @@ public class KeyMapPutTest {
                        final int numElements = 1000000;
 
                        for (int i = 0; i < numElements; i++) {
-                               Integer put = map.put(i, 2*i+1);
+                               Integer put = map.put(i, 2 * i + 1);
                                assertNull(put);
                        }
 
                        for (int i = 0; i < numElements; i += 3) {
-                               Integer put = map.put(i, 2*i);
+                               Integer put = map.put(i, 2 * i);
                                assertNotNull(put);
-                               assertEquals(2*i+1, put.intValue());
+                               assertEquals(2 * i + 1, put.intValue());
                        }
 
                        for (int i = 0; i < numElements; i++) {
-                               int expected = (i % 3 == 0) ? (2*i) : (2*i+1);
+                               int expected = (i % 3 == 0) ? (2 * i) : (2 * i 
+ 1);
                                assertEquals(expected, map.get(i).intValue());
                        }
 
@@ -111,14 +113,13 @@ public class KeyMapPutTest {
                        assertEquals(1 << 21, map.getCurrentTableCapacity());
                        assertTrue(map.getLongestChainLength() <= 7);
 
-
                        BitSet bitset = new BitSet();
                        int numContained = 0;
                        for (KeyMap.Entry<Integer, Integer> entry : map) {
                                numContained++;
 
                                int key = entry.getKey();
-                               int expected = key % 3 == 0 ? (2*key) : 
(2*key+1);
+                               int expected = key % 3 == 0 ? (2 * key) : (2 * 
key + 1);
 
                                assertEquals(expected, 
entry.getValue().intValue());
                                assertFalse(bitset.get(key));

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapTest.java
index c7fb108..a442466 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapTest.java
@@ -24,8 +24,15 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Random;
 
-import static org.junit.Assert.*;
-
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link KeyMap}.
+ */
 public class KeyMapTest {
 
        @Test
@@ -218,7 +225,6 @@ public class KeyMapTest {
                                nextKeyValue += keyRnd.nextInt(maxStride) + 1;
                        }
 
-
                        // check that all maps contain the total number of 
elements
                        int numContained = 0;
                        for (KeyMap<?, ?> map : maps) {

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
index aa9cb91..0c45d03 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
@@ -15,9 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.operators.windowing;
 
-import com.google.common.collect.Lists;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -25,11 +25,12 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
 import 
org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
-import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import com.google.common.collect.Lists;
 import org.junit.Test;
 import org.mockito.Matchers;
 
@@ -44,9 +45,16 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.core.Is.is;
 import static org.hamcrest.core.IsNot.not;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for verifying that {@link MergingWindowSet} correctly merges windows 
in various situations
@@ -152,7 +160,6 @@ public class MergingWindowSetTest {
                assertEquals(new TimeWindow(11, 14), windowSet.addWindow(new 
TimeWindow(11, 14), mergeFunction));
                assertFalse(mergeFunction.hasMerged());
 
-
                assertEquals(new TimeWindow(0, 4), windowSet.getStateWindow(new 
TimeWindow(0, 6)));
 
                assertEquals(new TimeWindow(11, 14), 
windowSet.getStateWindow(new TimeWindow(11, 14)));
@@ -179,7 +186,6 @@ public class MergingWindowSetTest {
                assertEquals(new TimeWindow(10, 15), windowSet.addWindow(new 
TimeWindow(11, 14), mergeFunction));
                assertFalse(mergeFunction.hasMerged());
 
-
                assertEquals(new TimeWindow(0, 4), windowSet.getStateWindow(new 
TimeWindow(0, 6)));
 
                assertEquals(new TimeWindow(11, 14), 
windowSet.getStateWindow(new TimeWindow(10, 15)));

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeSessionWindowsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeSessionWindowsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeSessionWindowsTest.java
index 461b5fc..ceda3b9 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeSessionWindowsTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeSessionWindowsTest.java
@@ -15,19 +15,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.runtime.operators.windowing;
 
+package org.apache.flink.streaming.runtime.operators.windowing;
 
-import com.google.common.collect.Lists;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import 
org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
 import 
org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
+import 
org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
 import org.junit.Test;
 import org.mockito.Matchers;
 
@@ -36,13 +37,23 @@ import java.util.Collection;
 import static 
org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.*;
-import static org.junit.Assert.*;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.anyCollection;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
- * Tests for {@link ProcessingTimeSessionWindows}
+ * Tests for {@link ProcessingTimeSessionWindows}.
  */
 public class ProcessingTimeSessionWindowsTest extends TestLogger {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java
index a0c2347..791eb42 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java
@@ -15,13 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.operators.windowing;
 
-import com.google.common.collect.Lists;
 import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import com.google.common.collect.Lists;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PurgingTriggerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PurgingTriggerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PurgingTriggerTest.java
index 4302d4d..7139a44 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PurgingTriggerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PurgingTriggerTest.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
@@ -22,6 +23,7 @@ import 
org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
 import org.junit.Test;
 import org.mockito.Matchers;
 import org.mockito.invocation.InvocationOnMock;
@@ -35,7 +37,12 @@ import static 
org.apache.flink.streaming.runtime.operators.windowing.WindowOpera
 import static 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorContractTest.anyTriggerContext;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for {@link PurgingTrigger}.
@@ -51,7 +58,7 @@ public class PurgingTriggerTest {
                for (Method triggerMethod : Trigger.class.getDeclaredMethods()) 
{
 
                        // try retrieving the method, this will throw an 
exception if we can't find it
-                       
PurgingTrigger.class.getDeclaredMethod(triggerMethod.getName(),triggerMethod.getParameterTypes());
+                       
PurgingTrigger.class.getDeclaredMethod(triggerMethod.getName(), 
triggerMethod.getParameterTypes());
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java
index ff1cbdf..62c484d 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java
@@ -15,19 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.runtime.operators.windowing;
 
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+package org.apache.flink.streaming.runtime.operators.windowing;
 
-import java.util.Arrays;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
@@ -50,11 +40,24 @@ import 
org.apache.flink.streaming.runtime.operators.windowing.functions.Internal
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.util.OutputTag;
+
 import org.junit.Test;
 import org.mockito.Matchers;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 /**
  * These tests verify that {@link WindowOperator} correctly interacts with the 
other windowing
  * components: {@link WindowAssigner},
@@ -72,7 +75,6 @@ public class RegularWindowOperatorContractTest extends 
WindowOperatorContractTes
                Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
                InternalWindowFunction<Integer, Void, Integer, TimeWindow> 
mockWindowFunction = mockWindowFunction();
 
-
                ReducingStateDescriptor<Integer> intReduceSumDescriptor =
                                new ReducingStateDescriptor<>(
                                                "int-reduce",
@@ -89,7 +91,6 @@ public class RegularWindowOperatorContractTest extends 
WindowOperatorContractTes
                final ValueStateDescriptor<String> valueStateDescriptor =
                                new ValueStateDescriptor<>("string-state", 
StringSerializer.INSTANCE);
 
-
                KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> 
testHarness =
                                createWindowOperator(mockAssigner, mockTrigger, 
0L, intReduceSumDescriptor, mockWindowFunction);
 
@@ -252,7 +253,6 @@ public class RegularWindowOperatorContractTest extends 
WindowOperatorContractTes
                ListStateDescriptor<Integer> intListDescriptor =
                                new ListStateDescriptor<>("int-list", 
IntSerializer.INSTANCE);
 
-
                @SuppressWarnings("unchecked")
                WindowOperator<Integer, Integer, Iterable<Integer>, OUT, W> 
operator = new WindowOperator<>(
                                assigner,

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SimpleTriggerTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SimpleTriggerTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SimpleTriggerTestHarness.java
index 050178b..33f4747 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SimpleTriggerTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SimpleTriggerTestHarness.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java
index 4599d19..95a8314 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java
@@ -15,8 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.runtime.operators.windowing;
 
+package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -26,18 +26,21 @@ import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
 import static 
org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow;
 import static org.hamcrest.CoreMatchers.containsString;
-import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 
 /**
- * Tests for {@link SlidingEventTimeWindows}
+ * Tests for {@link SlidingEventTimeWindows}.
  */
 public class SlidingEventTimeWindowsTest extends TestLogger {
 
@@ -148,7 +151,6 @@ public class SlidingEventTimeWindowsTest extends TestLogger 
{
                        assertThat(e.toString(), containsString("0 <= offset < 
slide and size > 0"));
                }
 
-
                try {
                        SlidingEventTimeWindows.of(Time.seconds(20), 
Time.seconds(10), Time.seconds(-1));
                        fail("should fail");

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java
index 20a9924..69b628a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java
@@ -15,8 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.runtime.operators.windowing;
 
+package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -26,18 +26,22 @@ import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
 import static 
org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 /**
- * Tests for {@link SlidingProcessingTimeWindows}
+ * Tests for {@link SlidingProcessingTimeWindows}.
  */
 public class SlidingProcessingTimeWindowsTest extends TestLogger {
 
@@ -157,7 +161,6 @@ public class SlidingProcessingTimeWindowsTest extends 
TestLogger {
                        assertThat(e.toString(), containsString("0 <= offset < 
slide and size > 0"));
                }
 
-
                try {
                        SlidingProcessingTimeWindows.of(Time.seconds(20), 
Time.seconds(10), Time.seconds(-1));
                        fail("should fail");

Reply via email to