http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/StreamRecordMatchers.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/StreamRecordMatchers.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/StreamRecordMatchers.java
index bb07996..ada4d6f 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/StreamRecordMatchers.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/StreamRecordMatchers.java
@@ -15,11 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
 import org.hamcrest.Description;
 import org.hamcrest.Matcher;
 import org.hamcrest.Matchers;
@@ -30,150 +32,146 @@ import org.hamcrest.TypeSafeMatcher;
  */
 public class StreamRecordMatchers {
 
-  public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord(
-          T value) {
-
-    return isStreamRecord(Matchers.equalTo(value));
-  }
-
-  public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord(
-      T value,
-      long timestamp) {
-
-    return isStreamRecord(Matchers.equalTo(value), 
Matchers.equalTo(timestamp));
-  }
-
-  public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord(
-          Matcher<? super T> valueMatcher) {
-    return new StreamRecordMatcher<>(valueMatcher, Matchers.anything());
-  }
-
-  public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord(
-      Matcher<? super T> valueMatcher, Matcher<? super Long> timestampMatcher) 
{
-    return new StreamRecordMatcher<>(valueMatcher, timestampMatcher);
-  }
-
-  public static Matcher<TimeWindow> timeWindow(long start, long end) {
-    return Matchers.equalTo(new TimeWindow(start, end));
-  }
-
-  @SuppressWarnings({"unchecked", "rawtypes"})
-  @SafeVarargs
-  public static <W extends Window> Matcher<Iterable<W>> 
ofWindows(Matcher<W>... windows) {
-    return (Matcher) Matchers.containsInAnyOrder(windows);
-  }
-
-  public static <T, W extends Window> Matcher<StreamRecord<? extends 
WindowedValue<? extends T, ? extends W>>> isWindowedValue(
-          T value) {
-    return isWindowedValue(Matchers.equalTo(value));
-  }
-
-  public static <T, W extends Window> Matcher<StreamRecord<? extends 
WindowedValue<? extends T, ? extends W>>> isWindowedValue(
-          T value,
-          long timestamp) {
-    return isWindowedValue(Matchers.equalTo(value), 
Matchers.equalTo(timestamp));
-  }
-
-  public static <T, W extends Window> Matcher<StreamRecord<? extends 
WindowedValue<? extends T, ? extends W>>> isWindowedValue(
-          T value,
-          long timestamp,
-          W window) {
-    return isWindowedValue(Matchers.equalTo(value), 
Matchers.equalTo(timestamp), Matchers.equalTo(window));
-  }
-
-
-  public static <T, W extends Window> Matcher<StreamRecord<? extends 
WindowedValue<? extends T, ? extends W>>> isWindowedValue(
-          Matcher<? super T> valueMatcher, long timestamp) {
-    return new WindowedValueMatcher<>(valueMatcher, 
Matchers.equalTo(timestamp), Matchers.anything());
-  }
-
-  public static <T, W extends Window> Matcher<StreamRecord<? extends 
WindowedValue<? extends T, ? extends W>>> isWindowedValue(
-          Matcher<? super T> valueMatcher, long timestamp, W window) {
-    return new WindowedValueMatcher<>(valueMatcher, 
Matchers.equalTo(timestamp), Matchers.equalTo(window));
-  }
-
-
-  public static <T, W extends Window> Matcher<StreamRecord<? extends 
WindowedValue<? extends T, ? extends W>>> isWindowedValue(
-          Matcher<? super T> valueMatcher) {
-    return new WindowedValueMatcher<>(valueMatcher, Matchers.anything(), 
Matchers.anything());
-  }
-
-
-  public static <T, W extends Window> Matcher<StreamRecord<? extends 
WindowedValue<? extends T, ? extends W>>> isWindowedValue(
-          Matcher<? super T> valueMatcher, Matcher<? super Long> 
timestampMatcher) {
-    return new WindowedValueMatcher<>(valueMatcher, timestampMatcher, 
Matchers.anything());
-  }
-
-  public static <T, W extends Window> Matcher<StreamRecord<? extends 
WindowedValue<? extends T, ? extends W>>> isWindowedValue(
-          Matcher<? super T> valueMatcher, long timestamp, Matcher<? super W> 
windowMatcher) {
-    return new WindowedValueMatcher<>(valueMatcher, 
Matchers.equalTo(timestamp), windowMatcher);
-  }
-
-  public static <T, W extends Window> Matcher<StreamRecord<? extends 
WindowedValue<? extends T, ? extends W>>> isWindowedValue(
-          Matcher<? super T> valueMatcher, Matcher<? super Long> 
timestampMatcher, Matcher<? super W> windowMatcher) {
-    return new WindowedValueMatcher<>(valueMatcher, timestampMatcher, 
windowMatcher);
-  }
-
-
-  private StreamRecordMatchers() {}
-
-  private static class StreamRecordMatcher<T> extends 
TypeSafeMatcher<StreamRecord<? extends T>> {
-
-    private Matcher<? super T> valueMatcher;
-    private Matcher<? super Long> timestampMatcher;
-
-    private StreamRecordMatcher(
-        Matcher<? super T> valueMatcher,
-        Matcher<? super Long> timestampMatcher) {
-      this.valueMatcher = valueMatcher;
-      this.timestampMatcher = timestampMatcher;
-    }
-
-    @Override
-    public void describeTo(Description description) {
-      description
-          .appendText("a StreamRecordValue(").appendValue(valueMatcher)
-          .appendText(", ").appendValue(timestampMatcher)
-          .appendText(")");
-    }
-
-    @Override
-    protected boolean matchesSafely(StreamRecord<? extends T> streamRecord) {
-      return valueMatcher.matches(streamRecord.getValue())
-              && timestampMatcher.matches(streamRecord.getTimestamp());
-    }
-  }
-
-  private static class WindowedValueMatcher<T, W extends Window> extends 
TypeSafeMatcher<StreamRecord<? extends WindowedValue<? extends T, ? extends 
W>>> {
-
-    private Matcher<? super T> valueMatcher;
-    private Matcher<? super Long> timestampMatcher;
-    private Matcher<? super W> windowMatcher;
-
-
-    private WindowedValueMatcher(
-            Matcher<? super T> valueMatcher,
-            Matcher<? super Long> timestampMatcher,
-            Matcher<? super W> windowMatcher) {
-      this.valueMatcher = valueMatcher;
-      this.timestampMatcher = timestampMatcher;
-      this.windowMatcher = windowMatcher;
-    }
-
-    @Override
-    public void describeTo(Description description) {
-      description
-              .appendText("a WindowedValue(").appendValue(valueMatcher)
-              .appendText(", ").appendValue(timestampMatcher)
-              .appendText(", ").appendValue(timestampMatcher)
-              .appendText(")");
-    }
-
-    @Override
-    protected boolean matchesSafely(StreamRecord<? extends WindowedValue<? 
extends T, ? extends W>> streamRecord) {
-      return valueMatcher.matches(streamRecord.getValue().value())
-              && timestampMatcher.matches(streamRecord.getTimestamp())
-              && windowMatcher.matches(streamRecord.getValue().window());
-    }
-  }
+       public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord(
+               T value) {
+
+               return isStreamRecord(Matchers.equalTo(value));
+       }
+
+       public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord(
+               T value,
+               long timestamp) {
+
+               return isStreamRecord(Matchers.equalTo(value), 
Matchers.equalTo(timestamp));
+       }
+
+       public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord(
+               Matcher<? super T> valueMatcher) {
+               return new StreamRecordMatcher<>(valueMatcher, 
Matchers.anything());
+       }
+
+       public static <T> Matcher<StreamRecord<? extends T>> isStreamRecord(
+               Matcher<? super T> valueMatcher, Matcher<? super Long> 
timestampMatcher) {
+               return new StreamRecordMatcher<>(valueMatcher, 
timestampMatcher);
+       }
+
+       public static Matcher<TimeWindow> timeWindow(long start, long end) {
+               return Matchers.equalTo(new TimeWindow(start, end));
+       }
+
+       @SuppressWarnings({"unchecked", "rawtypes"})
+       @SafeVarargs
+       public static <W extends Window> Matcher<Iterable<W>> 
ofWindows(Matcher<W>... windows) {
+               return (Matcher) Matchers.containsInAnyOrder(windows);
+       }
+
+       public static <T, W extends Window> Matcher<StreamRecord<? extends 
WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+               T value) {
+               return isWindowedValue(Matchers.equalTo(value));
+       }
+
+       public static <T, W extends Window> Matcher<StreamRecord<? extends 
WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+               T value,
+               long timestamp) {
+               return isWindowedValue(Matchers.equalTo(value), 
Matchers.equalTo(timestamp));
+       }
+
+       public static <T, W extends Window> Matcher<StreamRecord<? extends 
WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+               T value,
+               long timestamp,
+               W window) {
+               return isWindowedValue(Matchers.equalTo(value), 
Matchers.equalTo(timestamp), Matchers.equalTo(window));
+       }
+
+       public static <T, W extends Window> Matcher<StreamRecord<? extends 
WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+               Matcher<? super T> valueMatcher, long timestamp) {
+               return new WindowedValueMatcher<>(valueMatcher, 
Matchers.equalTo(timestamp), Matchers.anything());
+       }
+
+       public static <T, W extends Window> Matcher<StreamRecord<? extends 
WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+               Matcher<? super T> valueMatcher, long timestamp, W window) {
+               return new WindowedValueMatcher<>(valueMatcher, 
Matchers.equalTo(timestamp), Matchers.equalTo(window));
+       }
+
+       public static <T, W extends Window> Matcher<StreamRecord<? extends 
WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+               Matcher<? super T> valueMatcher) {
+               return new WindowedValueMatcher<>(valueMatcher, 
Matchers.anything(), Matchers.anything());
+       }
+
+       public static <T, W extends Window> Matcher<StreamRecord<? extends 
WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+               Matcher<? super T> valueMatcher, Matcher<? super Long> 
timestampMatcher) {
+               return new WindowedValueMatcher<>(valueMatcher, 
timestampMatcher, Matchers.anything());
+       }
+
+       public static <T, W extends Window> Matcher<StreamRecord<? extends 
WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+               Matcher<? super T> valueMatcher, long timestamp, Matcher<? 
super W> windowMatcher) {
+               return new WindowedValueMatcher<>(valueMatcher, 
Matchers.equalTo(timestamp), windowMatcher);
+       }
+
+       public static <T, W extends Window> Matcher<StreamRecord<? extends 
WindowedValue<? extends T, ? extends W>>> isWindowedValue(
+               Matcher<? super T> valueMatcher, Matcher<? super Long> 
timestampMatcher, Matcher<? super W> windowMatcher) {
+               return new WindowedValueMatcher<>(valueMatcher, 
timestampMatcher, windowMatcher);
+       }
+
+       private StreamRecordMatchers() {
+       }
+
+       private static class StreamRecordMatcher<T> extends 
TypeSafeMatcher<StreamRecord<? extends T>> {
+
+               private Matcher<? super T> valueMatcher;
+               private Matcher<? super Long> timestampMatcher;
+
+               private StreamRecordMatcher(
+                       Matcher<? super T> valueMatcher,
+                       Matcher<? super Long> timestampMatcher) {
+                       this.valueMatcher = valueMatcher;
+                       this.timestampMatcher = timestampMatcher;
+               }
+
+               @Override
+               public void describeTo(Description description) {
+                       description
+                               .appendText("a 
StreamRecordValue(").appendValue(valueMatcher)
+                               .appendText(", ").appendValue(timestampMatcher)
+                               .appendText(")");
+               }
+
+               @Override
+               protected boolean matchesSafely(StreamRecord<? extends T> 
streamRecord) {
+                       return valueMatcher.matches(streamRecord.getValue())
+                               && 
timestampMatcher.matches(streamRecord.getTimestamp());
+               }
+       }
+
+       private static class WindowedValueMatcher<T, W extends Window> extends 
TypeSafeMatcher<StreamRecord<? extends WindowedValue<? extends T, ? extends 
W>>> {
+
+               private Matcher<? super T> valueMatcher;
+               private Matcher<? super Long> timestampMatcher;
+               private Matcher<? super W> windowMatcher;
+
+               private WindowedValueMatcher(
+                       Matcher<? super T> valueMatcher,
+                       Matcher<? super Long> timestampMatcher,
+                       Matcher<? super W> windowMatcher) {
+                       this.valueMatcher = valueMatcher;
+                       this.timestampMatcher = timestampMatcher;
+                       this.windowMatcher = windowMatcher;
+               }
+
+               @Override
+               public void describeTo(Description description) {
+                       description
+                               .appendText("a 
WindowedValue(").appendValue(valueMatcher)
+                               .appendText(", ").appendValue(timestampMatcher)
+                               .appendText(", ").appendValue(windowMatcher) // third component is the window, as checked in matchesSafely
+                               .appendText(")");
+               }
+
+               @Override
+               protected boolean matchesSafely(StreamRecord<? extends 
WindowedValue<? extends T, ? extends W>> streamRecord) {
+                       return 
valueMatcher.matches(streamRecord.getValue().value())
+                               && 
timestampMatcher.matches(streamRecord.getTimestamp())
+                               && 
windowMatcher.matches(streamRecord.getValue().window());
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java
index 341171d..f4e0f1d 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java
@@ -19,41 +19,45 @@
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.concurrent.TimeUnit;
 
+/**
+ * Tests for {@link TimeWindow}.
+ */
 public class TimeWindowTest {
        @Test
        public void testGetWindowStartWithOffset() {
-               //[0,7),[7,14),[14,21)...
+               // [0, 7), [7, 14), [14, 21)...
                long offset = 0;
-               
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1,offset,7),0);
-               
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(6,offset,7),0);
-               
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(7,offset,7),7);
-               
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(8,offset,7),7);
+               Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1, 
offset, 7), 0);
+               Assert.assertEquals(TimeWindow.getWindowStartWithOffset(6, 
offset, 7), 0);
+               Assert.assertEquals(TimeWindow.getWindowStartWithOffset(7, 
offset, 7), 7);
+               Assert.assertEquals(TimeWindow.getWindowStartWithOffset(8, 
offset, 7), 7);
 
-               //[-4,3),[3,10),[10,17)...
+               // [-4, 3), [3, 10), [10, 17)...
                offset = 3;
-               
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1,offset,7),-4);
-               
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(2,offset,7),-4);
-               
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(3,offset,7),3);
-               
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(9,offset,7),3);
-               
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(10,offset,7),10);
+               Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1, 
offset, 7), -4);
+               Assert.assertEquals(TimeWindow.getWindowStartWithOffset(2, 
offset, 7), -4);
+               Assert.assertEquals(TimeWindow.getWindowStartWithOffset(3, 
offset, 7), 3);
+               Assert.assertEquals(TimeWindow.getWindowStartWithOffset(9, 
offset, 7), 3);
+               Assert.assertEquals(TimeWindow.getWindowStartWithOffset(10, 
offset, 7), 10);
 
-               //[-2,5),[5,12),[12,19)...
+               // [-2, 5), [5, 12), [12, 19)...
                offset = -2;
-               
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1,offset,7),-2);
-               
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-2,offset,7),-2);
-               
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(3,offset,7),-2);
-               
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(4,offset,7),-2);
-               
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(7,offset,7),5);
-               
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(12,offset,7),12);
+               Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1, 
offset, 7), -2);
+               Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-2, 
offset, 7), -2);
+               Assert.assertEquals(TimeWindow.getWindowStartWithOffset(3, 
offset, 7), -2);
+               Assert.assertEquals(TimeWindow.getWindowStartWithOffset(4, 
offset, 7), -2);
+               Assert.assertEquals(TimeWindow.getWindowStartWithOffset(7, 
offset, 7), 5);
+               Assert.assertEquals(TimeWindow.getWindowStartWithOffset(12, 
offset, 7), 12);
 
                // for GMT+8:00
-               offset = - TimeUnit.HOURS.toMillis(8);
+               offset = -TimeUnit.HOURS.toMillis(8);
                long size = TimeUnit.DAYS.toMillis(1);
-               
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1470902048450l,offset,size),1470844800000l);
+               
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1470902048450L, offset, 
size), 1470844800000L);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
index df65ca2..dc0e21c 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.functions.FoldFunction;
@@ -28,8 +29,8 @@ import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.WindowedStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import 
org.apache.flink.streaming.api.windowing.assigners.SlidingAlignedProcessingTimeWindows;
@@ -161,7 +162,7 @@ public class TimeWindowTranslationTest {
 
                DataStream<Tuple2<String, Integer>> window1 = source
                                .keyBy(0)
-                               .timeWindow(Time.of(1000, 
TimeUnit.MILLISECONDS),Time.of(100, TimeUnit.MILLISECONDS))
+                               .timeWindow(Time.of(1000, 
TimeUnit.MILLISECONDS), Time.of(100, TimeUnit.MILLISECONDS))
                                .reduce(new DummyReducer());
 
                OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
@@ -185,7 +186,7 @@ public class TimeWindowTranslationTest {
 
                DataStream<Tuple2<String, Integer>> window1 = source
                                .keyBy(0)
-                               .timeWindow(Time.of(1000, 
TimeUnit.MILLISECONDS),Time.of(100, TimeUnit.MILLISECONDS))
+                               .timeWindow(Time.of(1000, 
TimeUnit.MILLISECONDS), Time.of(100, TimeUnit.MILLISECONDS))
                                .fold(new Tuple2<>("", 1), new DummyFolder());
 
                OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
@@ -235,7 +236,7 @@ public class TimeWindowTranslationTest {
         * These tests ensure that the fast aligned time windows operator is 
used if the
         * conditions are right.
         *
-        * TODO: update once the fast aligned time windows operator is in
+        * <p>TODO: update once the fast aligned time windows operator is in
         */
        @Ignore
        @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
index 4267444..5aa47e8 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.JobID;
@@ -36,9 +37,9 @@ import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 import org.apache.flink.runtime.state.internal.InternalMergingState;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
 import org.apache.flink.streaming.api.operators.KeyContext;
 import org.apache.flink.streaming.api.operators.TestInternalTimerService;
-import org.apache.flink.streaming.api.operators.InternalTimerService;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.Window;
@@ -123,7 +124,7 @@ public class TriggerTestHarness<T, W extends Window> {
 
        /**
         * Injects one element into the trigger for the given window and 
returns the result of
-        * {@link Trigger#onElement(Object, long, Window, 
Trigger.TriggerContext)}
+        * {@link Trigger#onElement(Object, long, Window, 
Trigger.TriggerContext)}.
         */
        public TriggerResult processElement(StreamRecord<T> element, W window) 
throws Exception {
                TestTriggerContext<Integer, W> triggerContext = new 
TestTriggerContext<>(

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java
index 2373a86..9e4669f 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java
@@ -15,8 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.runtime.operators.windowing;
 
+package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -26,17 +26,21 @@ import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
 import static 
org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 
 /**
- * Tests for {@link TumblingEventTimeWindows}
+ * Tests for {@link TumblingEventTimeWindows}.
  */
 public class TumblingEventTimeWindowsTest extends TestLogger {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java
index 348b6fa..a611fc0 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java
@@ -15,8 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.runtime.operators.windowing;
 
+package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -26,18 +26,22 @@ import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
 import static 
org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 /**
- * Tests for {@link TumblingProcessingTimeWindows}
+ * Tests for {@link TumblingProcessingTimeWindows}.
  */
 public class TumblingProcessingTimeWindowsTest extends TestLogger {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
index 0d80605..8ceda45 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
@@ -15,38 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.runtime.operators.windowing;
-
 
-import static 
org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.isStreamRecord;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.anyCollection;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+package org.apache.flink.streaming.runtime.operators.windowing;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import 
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import 
org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
@@ -64,6 +38,7 @@ import 
org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -73,6 +48,31 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.mockito.verification.VerificationMode;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.isStreamRecord;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 /**
  * Base for window operator tests that verify correct interaction with the 
other windowing
  * components: {@link 
org.apache.flink.streaming.api.windowing.assigners.WindowAssigner},
@@ -128,7 +128,6 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
                return mockAssigner;
        }
 
-
        static <T> MergingWindowAssigner<T, TimeWindow> mockMergingAssigner() 
throws Exception {
                @SuppressWarnings("unchecked")
                MergingWindowAssigner<T, TimeWindow> mockAssigner = 
mock(MergingWindowAssigner.class);
@@ -139,7 +138,6 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
                return mockAssigner;
        }
 
-
        static WindowAssigner.WindowAssignerContext anyAssignerContext() {
                return Mockito.any();
        }
@@ -177,7 +175,6 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
                return Mockito.any();
        }
 
-
        static <T> void shouldRegisterEventTimeTimerOnElement(Trigger<T, 
TimeWindow> mockTrigger, final long timestamp) throws Exception {
                doAnswer(new Answer<TriggerResult>() {
                        @Override
@@ -369,7 +366,6 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
 
        }
 
-
        @Test
        public void testAssignerIsInvokedOncePerElement() throws Exception {
 
@@ -540,7 +536,6 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
                testEmittingFromWindowFunction(new ProcessingTimeAdaptor());
        }
 
-
        private void testEmittingFromWindowFunction(TimeDomainAdaptor 
timeAdaptor) throws Exception {
 
                WindowAssigner<Integer, TimeWindow> mockAssigner = 
mockTimeWindowAssigner();
@@ -1258,7 +1253,6 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
                testTimerFiring(new ProcessingTimeAdaptor());
        }
 
-
        private void testTimerFiring(TimeDomainAdaptor timeAdaptor) throws 
Exception {
 
                WindowAssigner<Integer, TimeWindow> mockAssigner = 
mockTimeWindowAssigner();
@@ -1382,8 +1376,6 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
                
verify(mockAssigner).mergeWindows(eq(Collections.singletonList(new 
TimeWindow(2, 4))), anyMergeCallback());
 
                verify(mockAssigner, times(2)).mergeWindows(anyCollection(), 
anyMergeCallback());
-
-
        }
 
        @Test
@@ -2392,7 +2384,7 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
                doAnswer(new Answer<Object>() {
                        @Override
                        public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
-                               InternalWindowFunction.InternalWindowContext 
context = 
(InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[2];
+                               InternalWindowFunction.InternalWindowContext 
context = (InternalWindowFunction.InternalWindowContext) 
invocationOnMock.getArguments()[2];
                                
context.windowState().getState(valueStateDescriptor).update("hello");
                                return null;
                        }
@@ -2401,7 +2393,7 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
                doAnswer(new Answer<Object>() {
                        @Override
                        public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
-                               InternalWindowFunction.InternalWindowContext 
context = 
(InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[1];
+                               InternalWindowFunction.InternalWindowContext 
context = (InternalWindowFunction.InternalWindowContext) 
invocationOnMock.getArguments()[1];
                                
context.windowState().getState(valueStateDescriptor).clear();
                                return null;
                        }
@@ -2441,7 +2433,7 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
                doAnswer(new Answer<Object>() {
                        @Override
                        public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
-                               InternalWindowFunction.InternalWindowContext 
context = 
(InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[2];
+                               InternalWindowFunction.InternalWindowContext 
context = (InternalWindowFunction.InternalWindowContext) 
invocationOnMock.getArguments()[2];
                                
context.windowState().getState(valueStateDescriptor).update("hello");
                                return null;
                        }
@@ -2481,7 +2473,7 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
                doAnswer(new Answer<Object>() {
                        @Override
                        public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
-                               InternalWindowFunction.InternalWindowContext 
context = 
(InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[2];
+                               InternalWindowFunction.InternalWindowContext 
context = (InternalWindowFunction.InternalWindowContext) 
invocationOnMock.getArguments()[2];
                                timeAdaptor.verifyCorrectTime(testHarness, 
context);
                                return null;
                        }
@@ -2490,7 +2482,7 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
                doAnswer(new Answer<Object>() {
                        @Override
                        public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
-                               InternalWindowFunction.InternalWindowContext 
context = 
(InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[1];
+                               InternalWindowFunction.InternalWindowContext 
context = (InternalWindowFunction.InternalWindowContext) 
invocationOnMock.getArguments()[1];
                                timeAdaptor.verifyCorrectTime(testHarness, 
context);
                                return null;
                        }
@@ -2520,7 +2512,6 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
                        long allowedLatenss,
                        InternalWindowFunction<Iterable<Integer>, OUT, Integer, 
W> windowFunction) throws Exception;
 
-
        private interface TimeDomainAdaptor {
 
                void setIsEventTime(WindowAssigner<?, ?> mockAssigner);
@@ -2535,9 +2526,9 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
 
                int numTimersOtherDomain(AbstractStreamOperatorTestHarness 
testHarness);
 
-               void shouldRegisterTimerOnElement(Trigger<?, TimeWindow> 
mockTrigger, final long timestamp) throws Exception;
+               void shouldRegisterTimerOnElement(Trigger<?, TimeWindow> 
mockTrigger, long timestamp) throws Exception;
 
-               void shouldDeleteTimerOnElement(Trigger<?, TimeWindow> 
mockTrigger, final long timestamp) throws Exception;
+               void shouldDeleteTimerOnElement(Trigger<?, TimeWindow> 
mockTrigger, long timestamp) throws Exception;
 
                void shouldContinueOnTime(Trigger<?, TimeWindow> mockTrigger) 
throws Exception;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom11MigrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom11MigrationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom11MigrationTest.java
index 9ec1923..904a8b9 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom11MigrationTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom11MigrationTest.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.ExecutionConfig;
@@ -50,6 +51,7 @@ import 
org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.util.Collector;
+
 import org.junit.Test;
 
 import java.net.URL;
@@ -63,8 +65,7 @@ import static org.junit.Assert.fail;
  * Tests for checking whether {@link WindowOperator} can restore from 
snapshots that were done
  * using the Flink 1.1 {@link WindowOperator}.
  *
- * <p>
- * This also checks whether {@link WindowOperator} can restore from a 
checkpoint of the Flink 1.1
+ * <p>This also checks whether {@link WindowOperator} can restore from a 
checkpoint of the Flink 1.1
  * aligned processing-time windows operator.
  *
  * <p>For regenerating the binary snapshot file you have to run the commented 
out portion
@@ -85,7 +86,7 @@ public class WindowOperatorFrom11MigrationTest {
        @SuppressWarnings("unchecked")
        public void testRestoreSessionWindowsWithCountTriggerFromFlink11() 
throws Exception {
 
-               final int SESSION_SIZE = 3;
+               final int sessionSize = 3;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -93,7 +94,7 @@ public class WindowOperatorFrom11MigrationTest {
                                inputType.createSerializer(new 
ExecutionConfig()));
 
                WindowOperator<String, Tuple2<String, Integer>, 
Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> 
operator = new WindowOperator<>(
-                               
EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+                               
EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -131,7 +132,6 @@ public class WindowOperatorFrom11MigrationTest {
                testHarness.close();
         */
 
-
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple3<String, Long, Long>> testHarness =
                                new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
 
@@ -146,7 +146,6 @@ public class WindowOperatorFrom11MigrationTest {
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 2), 6500));
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 3), 7000));
 
-
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple3ResultSortComparator());
 
                // add an element that merges the two "key1" sessions, they 
should now have count 6, and therefore fire
@@ -167,7 +166,7 @@ public class WindowOperatorFrom11MigrationTest {
        @SuppressWarnings("unchecked")
        public void 
testRestoreSessionWindowsWithCountTriggerInMintConditionFromFlink11() throws 
Exception {
 
-               final int SESSION_SIZE = 3;
+               final int sessionSize = 3;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -175,7 +174,7 @@ public class WindowOperatorFrom11MigrationTest {
                                inputType.createSerializer(new 
ExecutionConfig()));
 
                WindowOperator<String, Tuple2<String, Integer>, 
Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> 
operator = new WindowOperator<>(
-                               
EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+                               
EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -244,7 +243,7 @@ public class WindowOperatorFrom11MigrationTest {
        @Test
        @SuppressWarnings("unchecked")
        public void testRestoreReducingEventTimeWindowsFromFlink11() throws 
Exception {
-               final int WINDOW_SIZE = 3;
+               final int windowSize = 3;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -253,7 +252,7 @@ public class WindowOperatorFrom11MigrationTest {
                                inputType.createSerializer(new 
ExecutionConfig()));
 
                WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-                               
TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+                               TumblingEventTimeWindows.of(Time.of(windowSize, 
TimeUnit.SECONDS)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -334,7 +333,7 @@ public class WindowOperatorFrom11MigrationTest {
        @Test
        @SuppressWarnings("unchecked")
        public void testRestoreApplyEventTimeWindowsFromFlink11() throws 
Exception {
-               final int WINDOW_SIZE = 3;
+               final int windowSize = 3;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -342,7 +341,7 @@ public class WindowOperatorFrom11MigrationTest {
                                inputType.createSerializer(new 
ExecutionConfig()));
 
                WindowOperator<String, Tuple2<String, Integer>, 
Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> 
operator = new WindowOperator<>(
-                               
TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+                               TumblingEventTimeWindows.of(Time.of(windowSize, 
TimeUnit.SECONDS)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -423,7 +422,7 @@ public class WindowOperatorFrom11MigrationTest {
        @Test
        @SuppressWarnings("unchecked")
        public void testRestoreReducingProcessingTimeWindowsFromFlink11() 
throws Exception {
-               final int WINDOW_SIZE = 3;
+               final int windowSize = 3;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -432,7 +431,7 @@ public class WindowOperatorFrom11MigrationTest {
                                inputType.createSerializer(new 
ExecutionConfig()));
 
                WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-                               
TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+                               
TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -500,7 +499,7 @@ public class WindowOperatorFrom11MigrationTest {
        @Test
        @SuppressWarnings("unchecked")
        public void testRestoreApplyProcessingTimeWindowsFromFlink11() throws 
Exception {
-               final int WINDOW_SIZE = 3;
+               final int windowSize = 3;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -508,7 +507,7 @@ public class WindowOperatorFrom11MigrationTest {
                                inputType.createSerializer(new 
ExecutionConfig()));
 
                WindowOperator<String, Tuple2<String, Integer>, 
Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> 
operator = new WindowOperator<>(
-                               
TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+                               
TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -622,7 +621,7 @@ public class WindowOperatorFrom11MigrationTest {
 
                */
 
-               final int WINDOW_SIZE = 3;
+               final int windowSize = 3;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -631,7 +630,7 @@ public class WindowOperatorFrom11MigrationTest {
                                inputType.createSerializer(new 
ExecutionConfig()));
 
                WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-                               
TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+                               
TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -725,7 +724,7 @@ public class WindowOperatorFrom11MigrationTest {
                testHarness.close();
 
                */
-               final int WINDOW_SIZE = 3;
+               final int windowSize = 3;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -734,7 +733,7 @@ public class WindowOperatorFrom11MigrationTest {
                                inputType.createSerializer(new 
ExecutionConfig()));
 
                WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-                               
TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+                               
TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -774,7 +773,6 @@ public class WindowOperatorFrom11MigrationTest {
                testHarness.close();
        }
 
-
        private static class TupleKeySelector implements 
KeySelector<Tuple2<String, Integer>, String> {
                private static final long serialVersionUID = 1L;
 
@@ -832,7 +830,7 @@ public class WindowOperatorFrom11MigrationTest {
                }
        }
 
-       public static class SumReducer implements ReduceFunction<Tuple2<String, 
Integer>> {
+       private static class SumReducer implements 
ReduceFunction<Tuple2<String, Integer>> {
                private static final long serialVersionUID = 1L;
                @Override
                public Tuple2<String, Integer> reduce(Tuple2<String, Integer> 
value1,
@@ -841,7 +839,7 @@ public class WindowOperatorFrom11MigrationTest {
                }
        }
 
-       public static class RichSumReducer<W extends Window> extends 
RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> 
{
+       private static class RichSumReducer<W extends Window> extends 
RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> 
{
                private static final long serialVersionUID = 1L;
 
                private boolean openCalled = false;
@@ -877,7 +875,7 @@ public class WindowOperatorFrom11MigrationTest {
 
        }
 
-       public static class SessionWindowFunction implements 
WindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, 
TimeWindow> {
+       private static class SessionWindowFunction implements 
WindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, 
TimeWindow> {
                private static final long serialVersionUID = 1L;
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom12MigrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom12MigrationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom12MigrationTest.java
index 0d3a6dc..6e9db1a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom12MigrationTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom12MigrationTest.java
@@ -15,13 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.runtime.operators.windowing;
 
-import static org.junit.Assert.fail;
+package org.apache.flink.streaming.runtime.operators.windowing;
 
-import java.util.Comparator;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.TimeUnit;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -57,9 +53,16 @@ import 
org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OperatorSnapshotUtil;
 import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.util.Collector;
+
 import org.junit.Ignore;
 import org.junit.Test;
 
+import java.util.Comparator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.fail;
+
 /**
  * Tests for checking whether {@link WindowOperator} can restore from 
snapshots that were done
  * using the Flink 1.2 {@link WindowOperator}.
@@ -78,7 +81,7 @@ public class WindowOperatorFrom12MigrationTest {
        @Ignore
        @Test
        public void writeSessionWindowsWithCountTriggerSnapshot() throws 
Exception {
-               final int SESSION_SIZE = 3;
+               final int sessionSize = 3;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -86,7 +89,7 @@ public class WindowOperatorFrom12MigrationTest {
                                inputType.createSerializer(new 
ExecutionConfig()));
 
                WindowOperator<String, Tuple2<String, Integer>, 
Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> 
operator = new WindowOperator<>(
-                               
EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+                               
EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -122,7 +125,7 @@ public class WindowOperatorFrom12MigrationTest {
        @Test
        public void testRestoreSessionWindowsWithCountTrigger() throws 
Exception {
 
-               final int SESSION_SIZE = 3;
+               final int sessionSize = 3;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -130,7 +133,7 @@ public class WindowOperatorFrom12MigrationTest {
                                inputType.createSerializer(new 
ExecutionConfig()));
 
                WindowOperator<String, Tuple2<String, Integer>, 
Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> 
operator = new WindowOperator<>(
-                               
EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+                               
EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -157,7 +160,6 @@ public class WindowOperatorFrom12MigrationTest {
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 2), 6500));
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 3), 7000));
 
-
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple3ResultSortComparator());
 
                // add an element that merges the two "key1" sessions, they 
should now have count 6, and therefore fire
@@ -177,7 +179,7 @@ public class WindowOperatorFrom12MigrationTest {
        @Test
        public void 
writeSessionWindowsWithCountTriggerInMintConditionSnapshot() throws Exception {
 
-               final int SESSION_SIZE = 3;
+               final int sessionSize = 3;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -185,7 +187,7 @@ public class WindowOperatorFrom12MigrationTest {
                                inputType.createSerializer(new 
ExecutionConfig()));
 
                WindowOperator<String, Tuple2<String, Integer>, 
Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> 
operator = new WindowOperator<>(
-                               
EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+                               
EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -215,7 +217,7 @@ public class WindowOperatorFrom12MigrationTest {
        @Test
        public void testRestoreSessionWindowsWithCountTriggerInMintCondition() 
throws Exception {
 
-               final int SESSION_SIZE = 3;
+               final int sessionSize = 3;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -223,7 +225,7 @@ public class WindowOperatorFrom12MigrationTest {
                                inputType.createSerializer(new 
ExecutionConfig()));
 
                WindowOperator<String, Tuple2<String, Integer>, 
Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> 
operator = new WindowOperator<>(
-                               
EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+                               
EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -280,7 +282,7 @@ public class WindowOperatorFrom12MigrationTest {
        @Ignore
        @Test
        public void writeReducingEventTimeWindowsSnapshot() throws Exception {
-               final int WINDOW_SIZE = 3;
+               final int windowSize = 3;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -289,7 +291,7 @@ public class WindowOperatorFrom12MigrationTest {
                                inputType.createSerializer(new 
ExecutionConfig()));
 
                WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-                               
TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+                               TumblingEventTimeWindows.of(Time.of(windowSize, 
TimeUnit.SECONDS)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -336,7 +338,7 @@ public class WindowOperatorFrom12MigrationTest {
 
        @Test
        public void testRestoreReducingEventTimeWindows() throws Exception {
-               final int WINDOW_SIZE = 3;
+               final int windowSize = 3;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -345,7 +347,7 @@ public class WindowOperatorFrom12MigrationTest {
                                inputType.createSerializer(new 
ExecutionConfig()));
 
                WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-                               
TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+                               TumblingEventTimeWindows.of(Time.of(windowSize, 
TimeUnit.SECONDS)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -391,7 +393,7 @@ public class WindowOperatorFrom12MigrationTest {
        @Ignore
        @Test
        public void writeApplyEventTimeWindowsSnapshot() throws Exception {
-               final int WINDOW_SIZE = 3;
+               final int windowSize = 3;
 
                 TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -399,7 +401,7 @@ public class WindowOperatorFrom12MigrationTest {
                                 inputType.createSerializer(new ExecutionConfig()));
 
                 WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-                               TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+                               TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
                                 new TimeWindow.Serializer(),
                                 new TupleKeySelector(),
                                 BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -446,7 +448,7 @@ public class WindowOperatorFrom12MigrationTest {
 
        @Test
        public void testRestoreApplyEventTimeWindows() throws Exception {
-               final int WINDOW_SIZE = 3;
+               final int windowSize = 3;
 
                 TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -454,7 +456,7 @@ public class WindowOperatorFrom12MigrationTest {
                                 inputType.createSerializer(new ExecutionConfig()));
 
                 WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-                               TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+                               TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
                                 new TimeWindow.Serializer(),
                                 new TupleKeySelector(),
                                 BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -500,7 +502,7 @@ public class WindowOperatorFrom12MigrationTest {
        @Ignore
        @Test
         public void writeReducingProcessingTimeWindowsSnapshot() throws Exception {
-               final int WINDOW_SIZE = 3;
+               final int windowSize = 3;
 
                 TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -509,7 +511,7 @@ public class WindowOperatorFrom12MigrationTest {
                                 inputType.createSerializer(new ExecutionConfig()));
 
                 WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-                               TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+                               TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
                                 new TimeWindow.Serializer(),
                                 new TupleKeySelector(),
                                 BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -550,7 +552,7 @@ public class WindowOperatorFrom12MigrationTest {
 
        @Test
         public void testRestoreReducingProcessingTimeWindows() throws Exception {
-               final int WINDOW_SIZE = 3;
+               final int windowSize = 3;
 
                 TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -559,7 +561,7 @@ public class WindowOperatorFrom12MigrationTest {
                                 inputType.createSerializer(new ExecutionConfig()));
 
                 WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-                               TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+                               TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
                                 new TimeWindow.Serializer(),
                                 new TupleKeySelector(),
                                 BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -600,7 +602,7 @@ public class WindowOperatorFrom12MigrationTest {
        @Ignore
        @Test
        public void writeApplyProcessingTimeWindowsSnapshot() throws Exception {
-               final int WINDOW_SIZE = 3;
+               final int windowSize = 3;
 
                 TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -608,7 +610,7 @@ public class WindowOperatorFrom12MigrationTest {
                                 inputType.createSerializer(new ExecutionConfig()));
 
                 WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-                               TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+                               TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
                                 new TimeWindow.Serializer(),
                                 new TupleKeySelector(),
                                 BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -648,7 +650,7 @@ public class WindowOperatorFrom12MigrationTest {
 
        @Test
        public void testRestoreApplyProcessingTimeWindows() throws Exception {
-               final int WINDOW_SIZE = 3;
+               final int windowSize = 3;
 
                 TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -656,7 +658,7 @@ public class WindowOperatorFrom12MigrationTest {
                                 inputType.createSerializer(new ExecutionConfig()));
 
                 WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-                               TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+                               TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
                                 new TimeWindow.Serializer(),
                                 new TupleKeySelector(),
                                 BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -738,7 +740,7 @@ public class WindowOperatorFrom12MigrationTest {
 
        @Test
         public void testRestoreAggregatingAlignedProcessingTimeWindows() throws Exception {
-               final int WINDOW_SIZE = 3;
+               final int windowSize = 3;
 
                 TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -747,7 +749,7 @@ public class WindowOperatorFrom12MigrationTest {
                                 inputType.createSerializer(new ExecutionConfig()));
 
                 WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-                               TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+                               TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
                                 new TimeWindow.Serializer(),
                                 new TupleKeySelector(),
                                 BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -841,7 +843,7 @@ public class WindowOperatorFrom12MigrationTest {
 
        @Test
         public void testRestoreAccumulatingAlignedProcessingTimeWindows() throws Exception {
-               final int WINDOW_SIZE = 3;
+               final int windowSize = 3;
 
                 TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -850,7 +852,7 @@ public class WindowOperatorFrom12MigrationTest {
                                 inputType.createSerializer(new ExecutionConfig()));
 
                 WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-                               TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+                               TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
                                 new TimeWindow.Serializer(),
                                 new TupleKeySelector(),
                                 BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -892,7 +894,6 @@ public class WindowOperatorFrom12MigrationTest {
                testHarness.close();
        }
 
-
         private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
                private static final long serialVersionUID = 1L;
 
@@ -950,7 +951,7 @@ public class WindowOperatorFrom12MigrationTest {
                }
        }
 
-       public static class SumReducer implements ReduceFunction<Tuple2<String, Integer>> {
+       private static class SumReducer implements ReduceFunction<Tuple2<String, Integer>> {
                private static final long serialVersionUID = 1L;
                @Override
                 public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
@@ -959,7 +960,7 @@ public class WindowOperatorFrom12MigrationTest {
                }
        }
 
-       public static class RichSumReducer<W extends Window> extends RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> {
+       private static class RichSumReducer<W extends Window> extends RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> {
                private static final long serialVersionUID = 1L;
 
                private boolean openCalled = false;
@@ -995,7 +996,7 @@ public class WindowOperatorFrom12MigrationTest {
 
        }
 
-       public static class SessionWindowFunction implements WindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> {
+       private static class SessionWindowFunction implements WindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> {
                private static final long serialVersionUID = 1L;
 
                @Override

Reply via email to