[FLINK-4552] Refactor WindowOperator/Trigger Tests

Before, tests for WindowOperator, WindowAssigner, Trigger and
WindowFunction were all conflated in WindowOperatorTest. All of these
test that a certain combination of a Trigger, WindowAssigner and
WindowFunction produce the expected output.

This change modularizes these tests and spreads them out across multiple
files. For example, one per trigger/window assigner.

The new WindowOperatorContractTest verifies that the interaction between
WindowOperator and the various other parts works as expected, that the
correct methods on Trigger and WindowFunction are called at the expected
time and that snapshotting, timers, cleanup etc. work correctly. These
tests also verify that the different state types and WindowFunctions
work correctly.

For trigger tests this introduces TriggerTestHarness. This can be used
to inject elements into Triggers they fire at the correct times. The
actual output of the WindowFunction is not important for these tests.
The new tests also make sure that triggers correctly clean up state and
timers.

WindowAssigner tests verify the behaviour of window assigners in
isolation.  They also test, for example, whether offset parameter of
time-based windows work correctly.

We keep the old WindowOperatorTest because it still provides some level
of coverage and doesn't take long to run.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d1475ee8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d1475ee8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d1475ee8

Branch: refs/heads/master
Commit: d1475ee86fb58ab70a6dc785d08f190189ede43d
Parents: 0b331a4
Author: Aljoscha Krettek <[email protected]>
Authored: Mon Sep 5 12:01:11 2016 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Tue Jan 24 10:42:34 2017 +0100

----------------------------------------------------------------------
 .../state/heap/HeapKeyedStateBackend.java       |   45 +
 .../runtime/state/MemoryStateBackendTest.java   |   43 +
 .../assigners/EventTimeSessionWindows.java      |    4 +
 .../api/windowing/assigners/GlobalWindows.java  |    4 +-
 .../assigners/ProcessingTimeSessionWindows.java |    4 +
 .../assigners/SlidingEventTimeWindows.java      |   28 +-
 .../assigners/SlidingProcessingTimeWindows.java |   30 +-
 .../assigners/TumblingEventTimeWindows.java     |   36 +-
 .../TumblingProcessingTimeWindows.java          |   32 +-
 .../tasks/TestProcessingTimeService.java        |    2 +-
 .../api/operators/TestInternalTimerService.java |  238 ++
 .../TestProcessingTimeServiceTest.java          |    2 +-
 .../operators/windowing/CountTriggerTest.java   |  166 ++
 .../windowing/EventTimeSessionWindowsTest.java  |  179 ++
 .../windowing/EventTimeTriggerTest.java         |  153 ++
 .../operators/windowing/GlobalWindowsTest.java  |   59 +
 .../windowing/MergingWindowSetTest.java         |   25 +
 .../ProcessingTimeSessionWindowsTest.java       |  184 ++
 .../windowing/ProcessingTimeTriggerTest.java    |  134 +
 .../operators/windowing/PurgingTriggerTest.java |  149 +
 .../windowing/SimpleTriggerTestHarness.java     |   41 +
 .../windowing/SlidingEventTimeWindowsTest.java  |  168 ++
 .../SlidingProcessingTimeWindowsTest.java       |  177 ++
 .../windowing/StreamRecordMatchers.java         |  179 ++
 .../operators/windowing/TriggerTestHarness.java |  381 +++
 .../windowing/TumblingEventTimeWindowsTest.java |  113 +
 .../TumblingProcessingTimeWindowsTest.java      |  129 +
 .../windowing/WindowOperatorContractTest.java   | 2572 ++++++++++++++++++
 .../operators/windowing/WindowOperatorTest.java |  171 --
 .../operators/windowing/WindowedValue.java      |   47 +
 .../windowing/WindowingTestHarnessTest.java     |  230 --
 .../util/AbstractStreamOperatorTestHarness.java |   20 +
 .../KeyedOneInputStreamOperatorTestHarness.java |   17 +
 .../flink/streaming/util/TestHarnessUtil.java   |   15 +-
 .../streaming/util/WindowingTestHarness.java    |  221 --
 35 files changed, 5298 insertions(+), 700 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index b4c2b8b..0fe92e7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.state.heap;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.flink.annotation.VisibleForTesting;
 
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
@@ -534,4 +535,48 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        return stateSerializer;
                }
        }
+
+       /**
+        * Returns the total number of state entries across all keys/namespaces.
+        */
+       @VisibleForTesting
+       @SuppressWarnings("unchecked")
+       public int numStateEntries() {
+               int sum = 0;
+               for (StateTable<K, ?, ?> stateTable : stateTables.values()) {
+                       for (Map namespaceMap : stateTable.getState()) {
+                               if (namespaceMap == null) {
+                                       continue;
+                               }
+                               Map<?, Map> typedMap = (Map<?, Map>) 
namespaceMap;
+                               for (Map entriesMap : typedMap.values()) {
+                                       sum += entriesMap.size();
+                               }
+                       }
+               }
+               return sum;
+       }
+
+       /**
+        * Returns the total number of state entries across all keys for the 
given namespace.
+        */
+       @VisibleForTesting
+       @SuppressWarnings("unchecked")
+       public <N> int numStateEntries(N namespace) {
+               int sum = 0;
+               for (StateTable<K, ?, ?> stateTable : stateTables.values()) {
+                       for (Map namespaceMap : stateTable.getState()) {
+                               if (namespaceMap == null) {
+                                       continue;
+                               }
+                               Map<?, Map> typedMap = (Map<?, Map>) 
namespaceMap;
+                               Map singleNamespace = typedMap.get(namespace);
+                               if (singleNamespace != null) {
+                                       sum += singleNamespace.size();
+                               }
+                       }
+               }
+               return sum;
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
index ac6adff..c267afc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
@@ -18,7 +18,12 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.junit.Test;
 
@@ -56,6 +61,44 @@ public class MemoryStateBackendTest extends 
StateBackendTestBase<MemoryStateBack
        public void testReducingStateRestoreWithWrongSerializers() {}
 
        @Test
+       @SuppressWarnings("unchecked")
+       public void testNumStateEntries() throws Exception {
+               KeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
+
+               ValueStateDescriptor<String> kvId = new 
ValueStateDescriptor<>("id", String.class, null);
+               kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+
+               HeapKeyedStateBackend<Integer> heapBackend = 
(HeapKeyedStateBackend<Integer>) backend;
+
+               assertEquals(0, heapBackend.numStateEntries());
+
+               ValueState<String> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
+
+               backend.setCurrentKey(0);
+               state.update("hello");
+               state.update("ciao");
+
+               assertEquals(1, heapBackend.numStateEntries());
+
+               backend.setCurrentKey(42);
+               state.update("foo");
+
+               assertEquals(2, heapBackend.numStateEntries());
+
+               backend.setCurrentKey(0);
+               state.clear();
+
+               assertEquals(1, heapBackend.numStateEntries());
+
+               backend.setCurrentKey(42);
+               state.clear();
+
+               assertEquals(0, heapBackend.numStateEntries());
+
+               backend.dispose();
+       }
+
+       @Test
        public void testOversizedState() {
                try {
                        MemoryStateBackend backend = new MemoryStateBackend(10);

http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java
index e38f617..1703f6c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java
@@ -47,6 +47,10 @@ public class EventTimeSessionWindows extends 
MergingWindowAssigner<Object, TimeW
        protected long sessionTimeout;
 
        protected EventTimeSessionWindows(long sessionTimeout) {
+               if (sessionTimeout <= 0) {
+                       throw new 
IllegalArgumentException("EventTimeSessionWindows parameters must satisfy 0 < 
size");
+               }
+
                this.sessionTimeout = sessionTimeout;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
index 7ea3158..9e3846d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.api.windowing.assigners;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -70,7 +71,8 @@ public class GlobalWindows extends WindowAssigner<Object, 
GlobalWindow> {
        /**
         * A trigger that never fires, as default Trigger for GlobalWindows.
         */
-       private static class NeverTrigger extends Trigger<Object, GlobalWindow> 
{
+       @Internal
+       public static class NeverTrigger extends Trigger<Object, GlobalWindow> {
                private static final long serialVersionUID = 1L;
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java
index 52d1c03..02c680e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java
@@ -47,6 +47,10 @@ public class ProcessingTimeSessionWindows extends 
MergingWindowAssigner<Object,
        protected long sessionTimeout;
 
        protected ProcessingTimeSessionWindows(long sessionTimeout) {
+               if (sessionTimeout <= 0) {
+                       throw new 
IllegalArgumentException("ProcessingTimeSessionWindows parameters must satisfy 
0 < size");
+               }
+
                this.sessionTimeout = sessionTimeout;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
index 16171a0..ef6ed56 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
@@ -55,6 +55,10 @@ public class SlidingEventTimeWindows extends 
WindowAssigner<Object, TimeWindow>
        private final long offset;
 
        protected SlidingEventTimeWindows(long size, long slide, long offset) {
+               if (offset < 0 || offset >= slide || size <= 0) {
+                       throw new 
IllegalArgumentException("SlidingEventTimeWindows parameters must satisfy 0 <= 
offset < slide and size > 0");
+               }
+
                this.size = size;
                this.slide = slide;
                this.offset = offset;
@@ -109,20 +113,18 @@ public class SlidingEventTimeWindows extends 
WindowAssigner<Object, TimeWindow>
        }
 
        /**
-        *  Creates a new {@code SlidingEventTimeWindows} {@link 
WindowAssigner} that assigns
-        *  elements to time windows based on the element timestamp and offset.
-        *<p>
-        *     For example, if you want window a stream by hour,but window 
begins at the 15th minutes
-        *     of each hour, you can use {@code 
of(Time.hours(1),Time.minutes(15))},then you will get
-        *     time windows start at 0:15:00,1:15:00,2:15:00,etc.
-        *</p>
+        * Creates a new {@code SlidingEventTimeWindows} {@link WindowAssigner} 
that assigns
+        * elements to time windows based on the element timestamp and offset.
+        *
+        * <p>For example, if you want window a stream by hour,but window 
begins at the 15th minutes
+        * of each hour, you can use {@code 
of(Time.hours(1),Time.minutes(15))},then you will get
+        * time windows start at 0:15:00,1:15:00,2:15:00,etc.
+        *
+        * <p>Rather than that,if you are living in somewhere which is not 
using UTC±00:00 time,
+        * such as China which is using UTC+08:00,and you want a time window 
with size of one day,
+        * and window begins at every 00:00:00 of local time,you may use {@code 
of(Time.days(1),Time.hours(-8))}.
+        * The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 
is 8 hours earlier than UTC time.
         *
-        * <p>
-        *     Rather than that,if you are living in somewhere which is not 
using UTC±00:00 time,
-        *     such as China which is using UTC+08:00,and you want a time 
window with size of one day,
-        *     and window begins at every 00:00:00 of local time,you may use 
{@code of(Time.days(1),Time.hours(-8))}.
-        *     The parameter of offset is {@code Time.hours(-8))} since 
UTC+08:00 is 8 hours earlier than UTC time.
-        * </p>
         * @param size The size of the generated windows.
         * @param slide  The slide interval of the generated windows.
         * @param offset The offset which window start would be shifted by.

http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
index e03467f..c11045d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
@@ -51,7 +51,11 @@ public class SlidingProcessingTimeWindows extends 
WindowAssigner<Object, TimeWin
 
        private final long slide;
 
-       private SlidingProcessingTimeWindows(long size, long slide, long 
offset){
+       private SlidingProcessingTimeWindows(long size, long slide, long 
offset) {
+               if (offset < 0 || offset >= slide || size <= 0) {
+                       throw new 
IllegalArgumentException("SlidingProcessingTimeWindows parameters must satisfy 
0 <= offset < slide and size > 0");
+               }
+
                this.size = size;
                this.slide = slide;
                this.offset = offset;
@@ -101,20 +105,18 @@ public class SlidingProcessingTimeWindows extends 
WindowAssigner<Object, TimeWin
        }
 
        /**
-        *  Creates a new {@code SlidingProcessingTimeWindows} {@link 
WindowAssigner} that assigns
-        *  elements to time windows based on the element timestamp and offset.
-        *<p>
-        *     For example, if you want window a stream by hour,but window 
begins at the 15th minutes
-        *     of each hour, you can use {@code 
of(Time.hours(1),Time.minutes(15))},then you will get
-        *     time windows start at 0:15:00,1:15:00,2:15:00,etc.
-        *</p>
+        * Creates a new {@code SlidingProcessingTimeWindows} {@link 
WindowAssigner} that assigns
+        * elements to time windows based on the element timestamp and offset.
+        *
+        * <p>For example, if you want window a stream by hour,but window 
begins at the 15th minutes
+        * of each hour, you can use {@code 
of(Time.hours(1),Time.minutes(15))},then you will get
+        * time windows start at 0:15:00,1:15:00,2:15:00,etc.
+        *
+        * <p>Rather than that,if you are living in somewhere which is not 
using UTC±00:00 time,
+        * such as China which is using UTC+08:00,and you want a time window 
with size of one day,
+        * and window begins at every 00:00:00 of local time,you may use {@code 
of(Time.days(1),Time.hours(-8))}.
+        * The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 
is 8 hours earlier than UTC time.
         *
-        * <p>
-        *     Rather than that,if you are living in somewhere which is not 
using UTC±00:00 time,
-        *     such as China which is using UTC+08:00,and you want a time 
window with size of one day,
-        *     and window begins at every 00:00:00 of local time,you may use 
{@code of(Time.days(1),Time.hours(-8))}.
-        *     The parameter of offset is {@code Time.hours(-8))} since 
UTC+08:00 is 8 hours earlier than UTC time.
-        * </p>
         * @param size The size of the generated windows.
         * @param slide  The slide interval of the generated windows.
         * @param offset The offset which window start would be shifted by.

http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
index b7fa343..d695a0c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
@@ -51,7 +51,11 @@ public class TumblingEventTimeWindows extends 
WindowAssigner<Object, TimeWindow>
 
        private final long offset;
 
-       protected TumblingEventTimeWindows(long size, long offset){
+       protected TumblingEventTimeWindows(long size, long offset) {
+               if (offset < 0 || offset >= size) {
+                       throw new 
IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy 0 <= 
offset < size");
+               }
+
                this.size = size;
                this.offset = offset;
        }
@@ -68,10 +72,6 @@ public class TumblingEventTimeWindows extends 
WindowAssigner<Object, TimeWindow>
                }
        }
 
-       public long getSize() {
-               return size;
-       }
-
        @Override
        public Trigger<Object, TimeWindow> 
getDefaultTrigger(StreamExecutionEnvironment env) {
                return EventTimeTrigger.create();
@@ -94,26 +94,24 @@ public class TumblingEventTimeWindows extends 
WindowAssigner<Object, TimeWindow>
        }
 
        /**
-        *  Creates a new {@code TumblingEventTimeWindows} {@link 
WindowAssigner} that assigns
-        *  elements to time windows based on the element timestamp and offset.
-        *<p>
-        *     For example, if you want window a stream by hour,but window 
begins at the 15th minutes
-        *     of each hour, you can use {@code 
of(Time.hours(1),Time.minutes(15))},then you will get
-        *     time windows start at 0:15:00,1:15:00,2:15:00,etc.
-        *</p>
+        * Creates a new {@code TumblingEventTimeWindows} {@link 
WindowAssigner} that assigns
+        * elements to time windows based on the element timestamp and offset.
+        *
+        * <p>For example, if you want window a stream by hour,but window 
begins at the 15th minutes
+        * of each hour, you can use {@code 
of(Time.hours(1),Time.minutes(15))},then you will get
+        * time windows start at 0:15:00,1:15:00,2:15:00,etc.
+        *
+        * <p>Rather than that,if you are living in somewhere which is not 
using UTC±00:00 time,
+        * such as China which is using UTC+08:00,and you want a time window 
with size of one day,
+        * and window begins at every 00:00:00 of local time,you may use {@code 
of(Time.days(1),Time.hours(-8))}.
+        * The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 
is 8 hours earlier than UTC time.
         *
-        * <p>
-        *     Rather than that,if you are living in somewhere which is not 
using UTC±00:00 time,
-        *     such as China which is using UTC+08:00,and you want a time 
window with size of one day,
-        *     and window begins at every 00:00:00 of local time,you may use 
{@code of(Time.days(1),Time.hours(-8))}.
-        *     The parameter of offset is {@code Time.hours(-8))} since 
UTC+08:00 is 8 hours earlier than UTC time.
-        * </p>
         * @param size The size of the generated windows.
         * @param offset The offset which window start would be shifted by.
         * @return The time policy.
         */
        public static TumblingEventTimeWindows of(Time size, Time offset) {
-               return new TumblingEventTimeWindows(size.toMilliseconds(), 
offset.toMilliseconds() % size.toMilliseconds());
+               return new TumblingEventTimeWindows(size.toMilliseconds(), 
offset.toMilliseconds());
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
index f4fb620..5b39fe0 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
@@ -49,7 +49,11 @@ public class TumblingProcessingTimeWindows extends 
WindowAssigner<Object, TimeWi
        private final long offset;
 
 
-       private TumblingProcessingTimeWindows(long size,long offset) {
+       private TumblingProcessingTimeWindows(long size, long offset) {
+               if (offset < 0 || offset >= size) {
+                       throw new 
IllegalArgumentException("TumblingProcessingTimeWindows parameters must satisfy 
 0 <= offset < size");
+               }
+
                this.size = size;
                this.offset = offset;
        }
@@ -87,26 +91,24 @@ public class TumblingProcessingTimeWindows extends 
WindowAssigner<Object, TimeWi
        }
 
        /**
-        *  Creates a new {@code TumblingProcessingTimeWindows} {@link 
WindowAssigner} that assigns
-        *  elements to time windows based on the element timestamp and offset.
-        *<p>
-        *     For example, if you want window a stream by hour,but window 
begins at the 15th minutes
-        *     of each hour, you can use {@code 
of(Time.hours(1),Time.minutes(15))},then you will get
-        *     time windows start at 0:15:00,1:15:00,2:15:00,etc.
-        *</p>
+        * Creates a new {@code TumblingProcessingTimeWindows} {@link 
WindowAssigner} that assigns
+        * elements to time windows based on the element timestamp and offset.
+        *
+        * <p>For example, if you want window a stream by hour,but window 
begins at the 15th minutes
+        * of each hour, you can use {@code 
of(Time.hours(1),Time.minutes(15))},then you will get
+        * time windows start at 0:15:00,1:15:00,2:15:00,etc.
+        *
+        * <p>Rather than that,if you are living in somewhere which is not 
using UTC±00:00 time,
+        * such as China which is using UTC+08:00,and you want a time window 
with size of one day,
+        * and window begins at every 00:00:00 of local time,you may use {@code 
of(Time.days(1),Time.hours(-8))}.
+        * The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 
is 8 hours earlier than UTC time.
         *
-        * <p>
-        *     Rather than that,if you are living in somewhere which is not 
using UTC±00:00 time,
-        *     such as China which is using UTC+08:00,and you want a time 
window with size of one day,
-        *     and window begins at every 00:00:00 of local time,you may use 
{@code of(Time.days(1),Time.hours(-8))}.
-        *     The parameter of offset is {@code Time.hours(-8))} since 
UTC+08:00 is 8 hours earlier than UTC time.
-        * </p>
         * @param size The size of the generated windows.
         * @param offset The offset which window start would be shifted by.
         * @return The time policy.
         */
        public static TumblingProcessingTimeWindows of(Time size, Time offset) {
-               return new TumblingProcessingTimeWindows(size.toMilliseconds(), 
offset.toMilliseconds() % size.toMilliseconds());
+               return new TumblingProcessingTimeWindows(size.toMilliseconds(), 
offset.toMilliseconds());
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
index b4e7e97..e5fcc1a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
@@ -37,7 +37,7 @@ import java.util.concurrent.atomic.AtomicReference;
  * */
 public class TestProcessingTimeService extends ProcessingTimeService {
 
-       private volatile long currentTime = 0L;
+       private volatile long currentTime = Long.MIN_VALUE;
 
        private volatile boolean isTerminated;
        private volatile boolean isQuiesced;

http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java
new file mode 100644
index 0000000..a03a4c5
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+/**
+ * Implementation of {@link InternalTimerService} meant to use for testing.
+ */
+@Internal
+public class TestInternalTimerService<K, N> implements InternalTimerService<N> 
{
+
+       private long currentProcessingTime = Long.MIN_VALUE;
+
+       private long currentWatermark = Long.MIN_VALUE;
+
+       private final KeyContext keyContext;
+
+       /**
+        * Processing time timers that are currently in-flight.
+        */
+       private final PriorityQueue<Timer<K, N>> processingTimeTimersQueue;
+       private final Set<Timer<K, N>> processingTimeTimers;
+
+       /**
+        * Current waiting watermark callbacks.
+        */
+       private final Set<Timer<K, N>> watermarkTimers;
+       private final PriorityQueue<Timer<K, N>> watermarkTimersQueue;
+
+       public TestInternalTimerService(KeyContext keyContext) {
+               this.keyContext = keyContext;
+
+               watermarkTimers = new HashSet<>();
+               watermarkTimersQueue = new PriorityQueue<>(100);
+               processingTimeTimers = new HashSet<>();
+               processingTimeTimersQueue = new PriorityQueue<>(100);
+       }
+
+       @Override
+       public long currentProcessingTime() {
+               return currentProcessingTime;
+       }
+
+       @Override
+       public long currentWatermark() {
+               return currentWatermark;
+       }
+
+       @Override
+       public void registerProcessingTimeTimer(N namespace, long time) {
+               @SuppressWarnings("unchecked")
+               Timer<K, N> timer = new Timer<>(time, (K) 
keyContext.getCurrentKey(), namespace);
+               // make sure we only put one timer per key into the queue
+               if (processingTimeTimers.add(timer)) {
+                       processingTimeTimersQueue.add(timer);
+               }
+       }
+
+       @Override
+       public void registerEventTimeTimer(N namespace, long time) {
+               @SuppressWarnings("unchecked")
+               Timer<K, N> timer = new Timer<>(time, (K) 
keyContext.getCurrentKey(), namespace);
+               if (watermarkTimers.add(timer)) {
+                       watermarkTimersQueue.add(timer);
+               }
+       }
+
+       @Override
+       public void deleteProcessingTimeTimer(N namespace, long time) {
+               @SuppressWarnings("unchecked")
+               Timer<K, N> timer = new Timer<>(time, (K) 
keyContext.getCurrentKey(), namespace);
+
+               if (processingTimeTimers.remove(timer)) {
+                       processingTimeTimersQueue.remove(timer);
+               }
+       }
+
+       @Override
+       public void deleteEventTimeTimer(N namespace, long time) {
+               @SuppressWarnings("unchecked")
+               Timer<K, N> timer = new Timer<>(time, (K) 
keyContext.getCurrentKey(), namespace);
+               if (watermarkTimers.remove(timer)) {
+                       watermarkTimersQueue.remove(timer);
+               }
+       }
+
+       public Collection<Timer<K, N>> advanceProcessingTime(long time) throws 
Exception {
+               List<Timer<K, N>> result = new ArrayList<>();
+
+               Timer<K, N> timer = processingTimeTimersQueue.peek();
+
+               while (timer != null && timer.timestamp <= time) {
+                       processingTimeTimers.remove(timer);
+                       processingTimeTimersQueue.remove();
+                       result.add(timer);
+                       timer = processingTimeTimersQueue.peek();
+               }
+
+               currentProcessingTime = time;
+               return result;
+       }
+
+       public Collection<Timer<K, N>> advanceWatermark(long time) throws 
Exception {
+               List<Timer<K, N>> result = new ArrayList<>();
+
+               Timer<K, N> timer = watermarkTimersQueue.peek();
+
+               while (timer != null && timer.timestamp <= time) {
+                       watermarkTimers.remove(timer);
+                       watermarkTimersQueue.remove();
+                       result.add(timer);
+                       timer = watermarkTimersQueue.peek();
+               }
+
+               currentWatermark = time;
+               return result;
+       }
+
+       /**
+        * Internal class for keeping track of in-flight timers.
+        */
+       public static class Timer<K, N> implements Comparable<Timer<K, N>> {
+               private final long timestamp;
+               private final K key;
+               private final N namespace;
+
+               public Timer(long timestamp, K key, N namespace) {
+                       this.timestamp = timestamp;
+                       this.key = key;
+                       this.namespace = namespace;
+               }
+
+               public long getTimestamp() {
+                       return timestamp;
+               }
+
+               public K getKey() {
+                       return key;
+               }
+
+               public N getNamespace() {
+                       return namespace;
+               }
+
+               @Override
+               public int compareTo(Timer<K, N> o) {
+                       return Long.compare(this.timestamp, o.timestamp);
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+                       if (o == null || getClass() != o.getClass()){
+                               return false;
+                       }
+
+                       Timer<?, ?> timer = (Timer<?, ?>) o;
+
+                       return timestamp == timer.timestamp
+                                       && key.equals(timer.key)
+                                       && namespace.equals(timer.namespace);
+
+               }
+
+               @Override
+               public int hashCode() {
+                       int result = (int) (timestamp ^ (timestamp >>> 32));
+                       result = 31 * result + key.hashCode();
+                       result = 31 * result + namespace.hashCode();
+                       return result;
+               }
+
+               @Override
+               public String toString() {
+                       return "Timer{" +
+                                       "timestamp=" + timestamp +
+                                       ", key=" + key +
+                                       ", namespace=" + namespace +
+                                       '}';
+               }
+       }
+
+       public int numProcessingTimeTimers() {
+               return processingTimeTimers.size();
+       }
+
+       public int numEventTimeTimers() {
+               return watermarkTimers.size();
+       }
+
+       public int numProcessingTimeTimers(N namespace) {
+               int count = 0;
+               for (Timer<K, N> timer : processingTimeTimers) {
+                       if (timer.getNamespace().equals(namespace)) {
+                               count++;
+                       }
+               }
+
+               return count;
+       }
+
+       public int numEventTimeTimers(N namespace) {
+               int count = 0;
+               for (Timer<K, N> timer : watermarkTimers) {
+                       if (timer.getNamespace().equals(namespace)) {
+                               count++;
+                       }
+               }
+
+               return count;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
index 2903758..4d24b82 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
@@ -52,7 +52,7 @@ public class TestProcessingTimeServiceTest {
 
                testHarness.invoke();
 
-               
assertEquals(testHarness.getProcessingTimeService().getCurrentProcessingTime(), 
0);
+               assertEquals(Long.MIN_VALUE, 
testHarness.getProcessingTimeService().getCurrentProcessingTime());
 
                tp.setCurrentTime(11);
                
assertEquals(testHarness.getProcessingTimeService().getCurrentProcessingTime(), 
11);

http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java
new file mode 100644
index 0000000..16e353b
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link CountTrigger}.
+ */
+public class CountTriggerTest {
+
+       /**
+        * Verify that state of separate windows does not leak into other 
windows.
+        */
+       @Test
+       public void testWindowSeparationAndFiring() throws Exception {
+               TriggerTestHarness<Object, TimeWindow> testHarness =
+                               new 
TriggerTestHarness<>(CountTrigger.<TimeWindow>of(3), new 
TimeWindow.Serializer());
+
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+               // shouldn't have any timers
+               assertEquals(0, testHarness.numProcessingTimeTimers());
+               assertEquals(0, testHarness.numEventTimeTimers());
+
+               assertEquals(2, testHarness.numStateEntries());
+               assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 
2)));
+               assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 
4)));
+
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+               assertEquals(TriggerResult.FIRE, testHarness.processElement(new 
StreamRecord<Object>(1), new TimeWindow(0, 2)));
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+               // right now, CountTrigger will clear it's state in onElement 
when firing
+               // ideally, this should be moved to onFire()
+               assertEquals(1, testHarness.numStateEntries());
+               assertEquals(0, testHarness.numStateEntries(new TimeWindow(0, 
2)));
+               assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 
4)));
+
+               assertEquals(TriggerResult.FIRE, testHarness.processElement(new 
StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+               // now all state should be gone
+               assertEquals(0, testHarness.numStateEntries());
+       }
+
+       /**
+        * Verify that clear() does not leak across windows.
+        */
+       @Test
+       public void testClear() throws Exception {
+               TriggerTestHarness<Object, TimeWindow> testHarness =
+                               new 
TriggerTestHarness<>(CountTrigger.<TimeWindow>of(3), new 
TimeWindow.Serializer());
+
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+               // shouldn't have any timers
+               assertEquals(0, testHarness.numProcessingTimeTimers());
+               assertEquals(0, testHarness.numEventTimeTimers());
+
+               assertEquals(2, testHarness.numStateEntries());
+               assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 
2)));
+               assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 
4)));
+
+               testHarness.clearTriggerState(new TimeWindow(2, 4));
+
+               assertEquals(1, testHarness.numStateEntries());
+               assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 
2)));
+               assertEquals(0, testHarness.numStateEntries(new TimeWindow(2, 
4)));
+
+               testHarness.clearTriggerState(new TimeWindow(0, 2));
+
+               assertEquals(0, testHarness.numStateEntries());
+               assertEquals(0, testHarness.numStateEntries(new TimeWindow(0, 
2)));
+               assertEquals(0, testHarness.numStateEntries(new TimeWindow(2, 
4)));
+       }
+
+       @Test
+       public void testMergingWindows() throws Exception {
+               TriggerTestHarness<Object, TimeWindow> testHarness =
+                               new 
TriggerTestHarness<>(CountTrigger.<TimeWindow>of(3), new 
TimeWindow.Serializer());
+
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(4, 6)));
+
+               // shouldn't have any timers
+               assertEquals(0, testHarness.numProcessingTimeTimers());
+               assertEquals(0, testHarness.numEventTimeTimers());
+
+               assertEquals(3, testHarness.numStateEntries());
+               assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 
2)));
+               assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 
4)));
+               assertEquals(1, testHarness.numStateEntries(new TimeWindow(4, 
6)));
+
+               testHarness.mergeWindows(new TimeWindow(0, 4), 
Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4)));
+
+               assertEquals(2, testHarness.numStateEntries());
+               assertEquals(0, testHarness.numStateEntries(new TimeWindow(0, 
2)));
+               assertEquals(0, testHarness.numStateEntries(new TimeWindow(2, 
4)));
+               assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 
4)));
+               assertEquals(1, testHarness.numStateEntries(new TimeWindow(4, 
6)));
+
+               assertEquals(TriggerResult.FIRE, testHarness.processElement(new 
StreamRecord<Object>(1), new TimeWindow(0, 4)));
+
+               assertEquals(1, testHarness.numStateEntries());
+               assertEquals(0, testHarness.numStateEntries(new TimeWindow(0, 
4)));
+               assertEquals(1, testHarness.numStateEntries(new TimeWindow(4, 
6)));
+
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(4, 6)));
+               assertEquals(TriggerResult.FIRE, testHarness.processElement(new 
StreamRecord<Object>(1), new TimeWindow(4, 6)));
+
+               assertEquals(0, testHarness.numStateEntries());
+       }
+
+       @Test
+       public void testMergeSubsumingWindow() throws Exception {
+               TriggerTestHarness<Object, TimeWindow> testHarness =
+                               new 
TriggerTestHarness<>(CountTrigger.<TimeWindow>of(3), new 
TimeWindow.Serializer());
+
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(4, 6)));
+
+               // shouldn't have any timers
+               assertEquals(0, testHarness.numProcessingTimeTimers());
+               assertEquals(0, testHarness.numEventTimeTimers());
+
+               assertEquals(2, testHarness.numStateEntries());
+               assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 
4)));
+               assertEquals(1, testHarness.numStateEntries(new TimeWindow(4, 
6)));
+
+               testHarness.mergeWindows(new TimeWindow(0, 8), 
Lists.newArrayList(new TimeWindow(2, 4), new TimeWindow(4, 6)));
+
+               assertEquals(1, testHarness.numStateEntries());
+               assertEquals(0, testHarness.numStateEntries(new TimeWindow(2, 
4)));
+               assertEquals(0, testHarness.numStateEntries(new TimeWindow(4, 
6)));
+               assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 
8)));
+
+               assertEquals(TriggerResult.FIRE, testHarness.processElement(new 
StreamRecord<Object>(1), new TimeWindow(0, 8)));
+
+               assertEquals(0, testHarness.numStateEntries());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsTest.java
new file mode 100644
index 0000000..a46572b
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
+import 
org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import org.mockito.Matchers;
+
+import java.util.Collection;
+
+import static 
org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for {@link EventTimeSessionWindows}
+ */
+public class EventTimeSessionWindowsTest extends TestLogger {
+
+       @Test
+       public void testWindowAssignment() {
+               final int SESSION_GAP = 5000;
+
+               WindowAssigner.WindowAssignerContext mockContext =
+                               
mock(WindowAssigner.WindowAssignerContext.class);
+
+               EventTimeSessionWindows assigner = 
EventTimeSessionWindows.withGap(Time.milliseconds(SESSION_GAP));
+
+               assertThat(assigner.assignWindows("String", 0L, mockContext), 
contains(timeWindow(0, 0 + SESSION_GAP)));
+               assertThat(assigner.assignWindows("String", 4999L, 
mockContext), contains(timeWindow(4999, 4999 + SESSION_GAP)));
+               assertThat(assigner.assignWindows("String", 5000L, 
mockContext), contains(timeWindow(5000, 5000 + SESSION_GAP)));
+       }
+
+       @Test
+       public void testMergeSinglePointWindow() {
+               MergingWindowAssigner.MergeCallback callback = 
mock(MergingWindowAssigner.MergeCallback.class);
+
+               EventTimeSessionWindows assigner = 
EventTimeSessionWindows.withGap(Time.milliseconds(5000));
+
+               assigner.mergeWindows(Lists.newArrayList(new TimeWindow(0, 0)), 
callback);
+
+               verify(callback, never()).merge(anyCollection(), 
Matchers.anyObject());
+       }
+
+       @Test
+       public void testMergeSingleWindow() {
+               MergingWindowAssigner.MergeCallback callback = 
mock(MergingWindowAssigner.MergeCallback.class);
+
+               EventTimeSessionWindows assigner = 
EventTimeSessionWindows.withGap(Time.milliseconds(5000));
+
+               assigner.mergeWindows(Lists.newArrayList(new TimeWindow(0, 1)), 
callback);
+
+               verify(callback, never()).merge(anyCollection(), 
Matchers.anyObject());
+       }
+
+       @Test
+       public void testMergeConsecutiveWindows() {
+               MergingWindowAssigner.MergeCallback callback = 
mock(MergingWindowAssigner.MergeCallback.class);
+
+               EventTimeSessionWindows assigner = 
EventTimeSessionWindows.withGap(Time.milliseconds(5000));
+
+               assigner.mergeWindows(
+                               Lists.newArrayList(
+                                               new TimeWindow(0, 1),
+                                               new TimeWindow(1, 2),
+                                               new TimeWindow(2, 3),
+                                               new TimeWindow(4, 5),
+                                               new TimeWindow(5, 6)),
+                               callback);
+
+               verify(callback, times(1)).merge(
+                               (Collection<TimeWindow>) 
argThat(containsInAnyOrder(new TimeWindow(0, 1), new TimeWindow(1, 2), new 
TimeWindow(2, 3))),
+                               eq(new TimeWindow(0, 3)));
+
+               verify(callback, times(1)).merge(
+                               (Collection<TimeWindow>) 
argThat(containsInAnyOrder(new TimeWindow(4, 5), new TimeWindow(5, 6))),
+                               eq(new TimeWindow(4, 6)));
+
+               verify(callback, times(2)).merge(anyCollection(), 
Matchers.anyObject());
+       }
+
+       @Test
+       public void testMergeCoveringWindow() {
+               MergingWindowAssigner.MergeCallback callback = 
mock(MergingWindowAssigner.MergeCallback.class);
+
+               EventTimeSessionWindows assigner = 
EventTimeSessionWindows.withGap(Time.milliseconds(5000));
+
+               assigner.mergeWindows(
+                               Lists.newArrayList(
+                                               new TimeWindow(1, 1),
+                                               new TimeWindow(0, 2),
+                                               new TimeWindow(4, 7),
+                                               new TimeWindow(5, 6)),
+                               callback);
+
+               verify(callback, times(1)).merge(
+                               (Collection<TimeWindow>) 
argThat(containsInAnyOrder(new TimeWindow(1, 1), new TimeWindow(0, 2))),
+                               eq(new TimeWindow(0, 2)));
+
+               verify(callback, times(1)).merge(
+                               (Collection<TimeWindow>) 
argThat(containsInAnyOrder(new TimeWindow(5, 6), new TimeWindow(4, 7))),
+                               eq(new TimeWindow(4, 7)));
+
+               verify(callback, times(2)).merge(anyCollection(), 
Matchers.anyObject());
+       }
+
+       @Test
+       public void testTimeUnits() {
+               // sanity check with one other time unit
+
+               final int SESSION_GAP = 5000;
+
+               WindowAssigner.WindowAssignerContext mockContext =
+                               
mock(WindowAssigner.WindowAssignerContext.class);
+
+               EventTimeSessionWindows assigner = 
EventTimeSessionWindows.withGap(Time.seconds(SESSION_GAP / 1000));
+
+               assertThat(assigner.assignWindows("String", 0L, mockContext), 
contains(timeWindow(0, 0 + SESSION_GAP)));
+               assertThat(assigner.assignWindows("String", 4999L, 
mockContext), contains(timeWindow(4999, 4999 + SESSION_GAP)));
+               assertThat(assigner.assignWindows("String", 5000L, 
mockContext), contains(timeWindow(5000, 5000 + SESSION_GAP)));
+       }
+
+       @Test
+       public void testInvalidParameters() {
+               try {
+                       EventTimeSessionWindows.withGap(Time.seconds(-1));
+                       fail("should fail");
+               } catch (IllegalArgumentException e) {
+                       assertThat(e.toString(), containsString("0 < size"));
+               }
+
+               try {
+                       EventTimeSessionWindows.withGap(Time.seconds(0));
+                       fail("should fail");
+               } catch (IllegalArgumentException e) {
+                       assertThat(e.toString(), containsString("0 < size"));
+               }
+
+       }
+
+       @Test
+       public void testProperties() {
+               EventTimeSessionWindows assigner = 
EventTimeSessionWindows.withGap(Time.seconds(5));
+
+               assertTrue(assigner.isEventTime());
+               assertEquals(new TimeWindow.Serializer(), 
assigner.getWindowSerializer(new ExecutionConfig()));
+               
assertThat(assigner.getDefaultTrigger(mock(StreamExecutionEnvironment.class)), 
instanceOf(EventTimeTrigger.class));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java
new file mode 100644
index 0000000..2d93ac0
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link EventTimeTrigger}.
+ */
+public class EventTimeTriggerTest {
+
+       /**
+        * Verify that state of separate windows does not leak into other 
windows.
+        */
+       @Test
+       public void testWindowSeparationAndFiring() throws Exception {
+               TriggerTestHarness<Object, TimeWindow> testHarness =
+                               new 
TriggerTestHarness<>(EventTimeTrigger.create(), new TimeWindow.Serializer());
+
+               // inject several elements
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+               assertEquals(0, testHarness.numStateEntries());
+               assertEquals(0, testHarness.numProcessingTimeTimers());
+               assertEquals(2, testHarness.numEventTimeTimers());
+               assertEquals(1, testHarness.numEventTimeTimers(new 
TimeWindow(0, 2)));
+               assertEquals(1, testHarness.numEventTimeTimers(new 
TimeWindow(2, 4)));
+
+               assertEquals(TriggerResult.FIRE, 
testHarness.advanceWatermark(2, new TimeWindow(0, 2)));
+
+               assertEquals(0, testHarness.numStateEntries());
+               assertEquals(0, testHarness.numProcessingTimeTimers());
+               assertEquals(1, testHarness.numEventTimeTimers());
+               assertEquals(0, testHarness.numEventTimeTimers(new 
TimeWindow(0, 2)));
+               assertEquals(1, testHarness.numEventTimeTimers(new 
TimeWindow(2, 4)));
+
+               assertEquals(TriggerResult.FIRE, 
testHarness.advanceWatermark(4, new TimeWindow(2, 4)));
+
+               assertEquals(0, testHarness.numStateEntries());
+               assertEquals(0, testHarness.numProcessingTimeTimers());
+               assertEquals(0, testHarness.numEventTimeTimers());
+       }
+
+       /**
+        * Verify that late elements trigger immediately and also that we don't 
set a timer
+        * for those.
+        */
+       @Test
+       public void testLateElementTriggersImmediately() throws Exception {
+               TriggerTestHarness<Object, TimeWindow> testHarness =
+                               new 
TriggerTestHarness<>(EventTimeTrigger.create(), new TimeWindow.Serializer());
+
+               testHarness.advanceWatermark(2);
+
+               assertEquals(TriggerResult.FIRE, testHarness.processElement(new 
StreamRecord<Object>(1), new TimeWindow(0, 2)));
+
+               assertEquals(0, testHarness.numStateEntries());
+               assertEquals(0, testHarness.numProcessingTimeTimers());
+               assertEquals(0, testHarness.numEventTimeTimers());
+       }
+
+
+       /**
+        * Verify that clear() does not leak across windows.
+        */
+       @Test
+       public void testClear() throws Exception {
+               TriggerTestHarness<Object, TimeWindow> testHarness =
+                               new 
TriggerTestHarness<>(EventTimeTrigger.create(), new TimeWindow.Serializer());
+
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+               assertEquals(0, testHarness.numStateEntries());
+               assertEquals(0, testHarness.numProcessingTimeTimers());
+               assertEquals(2, testHarness.numEventTimeTimers());
+               assertEquals(1, testHarness.numEventTimeTimers(new 
TimeWindow(0, 2)));
+               assertEquals(1, testHarness.numEventTimeTimers(new 
TimeWindow(2, 4)));
+
+               testHarness.clearTriggerState(new TimeWindow(2, 4));
+
+               assertEquals(0, testHarness.numStateEntries());
+               assertEquals(0, testHarness.numProcessingTimeTimers());
+               assertEquals(1, testHarness.numEventTimeTimers());
+               assertEquals(1, testHarness.numEventTimeTimers(new 
TimeWindow(0, 2)));
+               assertEquals(0, testHarness.numEventTimeTimers(new 
TimeWindow(2, 4)));
+
+               testHarness.clearTriggerState(new TimeWindow(0, 2));
+
+               assertEquals(0, testHarness.numStateEntries());
+               assertEquals(0, testHarness.numProcessingTimeTimers());
+               assertEquals(0, testHarness.numEventTimeTimers());
+       }
+
+       @Test
+       public void testMergingWindows() throws Exception {
+               TriggerTestHarness<Object, TimeWindow> testHarness =
+                               new 
TriggerTestHarness<>(EventTimeTrigger.create(), new TimeWindow.Serializer());
+
+               assertTrue(EventTimeTrigger.create().canMerge());
+
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+               assertEquals(0, testHarness.numStateEntries());
+               assertEquals(0, testHarness.numProcessingTimeTimers());
+               assertEquals(2, testHarness.numEventTimeTimers());
+               assertEquals(1, testHarness.numEventTimeTimers(new 
TimeWindow(0, 2)));
+               assertEquals(1, testHarness.numEventTimeTimers(new 
TimeWindow(2, 4)));
+
+               testHarness.mergeWindows(new TimeWindow(0, 4), 
Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4)));
+
+               assertEquals(0, testHarness.numStateEntries());
+               assertEquals(0, testHarness.numProcessingTimeTimers());
+               assertEquals(1, testHarness.numEventTimeTimers());
+               assertEquals(0, testHarness.numEventTimeTimers(new 
TimeWindow(0, 2)));
+               assertEquals(0, testHarness.numEventTimeTimers(new 
TimeWindow(2, 4)));
+               assertEquals(1, testHarness.numEventTimeTimers(new 
TimeWindow(0, 4)));
+
+               assertEquals(TriggerResult.FIRE, 
testHarness.advanceWatermark(4, new TimeWindow(0, 4)));
+
+               assertEquals(0, testHarness.numStateEntries());
+               assertEquals(0, testHarness.numProcessingTimeTimers());
+               assertEquals(0, testHarness.numEventTimeTimers());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/GlobalWindowsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/GlobalWindowsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/GlobalWindowsTest.java
new file mode 100644
index 0000000..37fad7e
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/GlobalWindowsTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for {@link GlobalWindows}
+ */
+public class GlobalWindowsTest extends TestLogger {
+
+       @Test
+       public void testWindowAssignment() {
+               WindowAssigner.WindowAssignerContext mockContext =
+                               
mock(WindowAssigner.WindowAssignerContext.class);
+
+               GlobalWindows assigner = GlobalWindows.create();
+
+               assertThat(assigner.assignWindows("String", 0L, mockContext), 
contains(GlobalWindow.get()));
+               assertThat(assigner.assignWindows("String", 4999L, 
mockContext), contains(GlobalWindow.get()));
+               assertThat(assigner.assignWindows("String", 5000L, 
mockContext), contains(GlobalWindow.get()));
+       }
+
+       @Test
+       public void testProperties() {
+               GlobalWindows assigner = GlobalWindows.create();
+
+               assertFalse(assigner.isEventTime());
+               assertEquals(new GlobalWindow.Serializer(), 
assigner.getWindowSerializer(new ExecutionConfig()));
+               
assertThat(assigner.getDefaultTrigger(mock(StreamExecutionEnvironment.class)), 
instanceOf(GlobalWindows.NeverTrigger.class));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
index 46169a8..aa9cb91 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
@@ -290,6 +290,31 @@ public class MergingWindowSetTest {
        }
 
        /**
+        * Test adding a new window that is identical to an existing window. 
This should not cause
+        * a merge.
+        */
+       @Test
+       public void testAddingIdenticalWindows() throws Exception {
+               @SuppressWarnings("unchecked")
+               ListState<Tuple2<TimeWindow, TimeWindow>> mockState = 
mock(ListState.class);
+
+               MergingWindowSet<TimeWindow> windowSet = new 
MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)), 
mockState);
+
+               TestingMergeFunction mergeFunction = new TestingMergeFunction();
+
+               mergeFunction.reset();
+               assertEquals(new TimeWindow(1, 2), windowSet.addWindow(new 
TimeWindow(1, 2), mergeFunction));
+               assertFalse(mergeFunction.hasMerged());
+               assertEquals(new TimeWindow(1, 2), windowSet.getStateWindow(new 
TimeWindow(1, 2)));
+
+               mergeFunction.reset();
+               assertEquals(new TimeWindow(1, 2), windowSet.addWindow(new 
TimeWindow(1, 2), mergeFunction));
+               assertFalse(mergeFunction.hasMerged());
+               assertEquals(new TimeWindow(1, 2), windowSet.getStateWindow(new 
TimeWindow(1, 2)));
+       }
+
+
+       /**
         * Test merging of a large new window that covers multiple existing 
windows.
         */
        @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeSessionWindowsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeSessionWindowsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeSessionWindowsTest.java
new file mode 100644
index 0000000..461b5fc
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeSessionWindowsTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
+import 
org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import org.mockito.Matchers;
+
+import java.util.Collection;
+
+import static 
org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.*;
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for {@link ProcessingTimeSessionWindows}
+ */
+public class ProcessingTimeSessionWindowsTest extends TestLogger {
+
+       @Test
+       public void testWindowAssignment() {
+               WindowAssigner.WindowAssignerContext mockContext =
+                               
mock(WindowAssigner.WindowAssignerContext.class);
+
+               ProcessingTimeSessionWindows assigner = 
ProcessingTimeSessionWindows.withGap(Time.milliseconds(5000));
+
+               when(mockContext.getCurrentProcessingTime()).thenReturn(0L);
+               assertThat(assigner.assignWindows("String", Long.MIN_VALUE, 
mockContext), contains(timeWindow(0, 5000)));
+
+               when(mockContext.getCurrentProcessingTime()).thenReturn(4999L);
+               assertThat(assigner.assignWindows("String", Long.MIN_VALUE, 
mockContext), contains(timeWindow(4999, 9999)));
+
+               when(mockContext.getCurrentProcessingTime()).thenReturn(5000L);
+               assertThat(assigner.assignWindows("String", Long.MIN_VALUE, 
mockContext), contains(timeWindow(5000, 10000)));
+       }
+
+       @Test
+       public void testMergeSinglePointWindow() {
+               MergingWindowAssigner.MergeCallback callback = 
mock(MergingWindowAssigner.MergeCallback.class);
+
+               ProcessingTimeSessionWindows assigner = 
ProcessingTimeSessionWindows.withGap(Time.milliseconds(5000));
+
+               assigner.mergeWindows(Lists.newArrayList(new TimeWindow(0, 0)), 
callback);
+
+               verify(callback, never()).merge(anyCollection(), 
Matchers.anyObject());
+       }
+
+       @Test
+       public void testMergeSingleWindow() {
+               MergingWindowAssigner.MergeCallback callback = 
mock(MergingWindowAssigner.MergeCallback.class);
+
+               ProcessingTimeSessionWindows assigner = 
ProcessingTimeSessionWindows.withGap(Time.milliseconds(5000));
+
+               assigner.mergeWindows(Lists.newArrayList(new TimeWindow(0, 1)), 
callback);
+
+               verify(callback, never()).merge(anyCollection(), 
Matchers.anyObject());
+       }
+
+       @Test
+       public void testMergeConsecutiveWindows() {
+               MergingWindowAssigner.MergeCallback callback = 
mock(MergingWindowAssigner.MergeCallback.class);
+
+               ProcessingTimeSessionWindows assigner = 
ProcessingTimeSessionWindows.withGap(Time.milliseconds(5000));
+
+               assigner.mergeWindows(
+                               Lists.newArrayList(
+                                               new TimeWindow(0, 1),
+                                               new TimeWindow(1, 2),
+                                               new TimeWindow(2, 3),
+                                               new TimeWindow(4, 5),
+                                               new TimeWindow(5, 6)),
+                               callback);
+
+               verify(callback, times(1)).merge(
+                               (Collection<TimeWindow>) 
argThat(containsInAnyOrder(new TimeWindow(0, 1), new TimeWindow(1, 2), new 
TimeWindow(2, 3))),
+                               eq(new TimeWindow(0, 3)));
+
+               verify(callback, times(1)).merge(
+                               (Collection<TimeWindow>) 
argThat(containsInAnyOrder(new TimeWindow(4, 5), new TimeWindow(5, 6))),
+                               eq(new TimeWindow(4, 6)));
+
+               verify(callback, times(2)).merge(anyCollection(), 
Matchers.anyObject());
+       }
+
+       @Test
+       public void testMergeCoveringWindow() {
+               MergingWindowAssigner.MergeCallback callback = 
mock(MergingWindowAssigner.MergeCallback.class);
+
+               ProcessingTimeSessionWindows assigner = 
ProcessingTimeSessionWindows.withGap(Time.milliseconds(5000));
+
+               assigner.mergeWindows(
+                               Lists.newArrayList(
+                                               new TimeWindow(1, 1),
+                                               new TimeWindow(0, 2),
+                                               new TimeWindow(4, 7),
+                                               new TimeWindow(5, 6)),
+                               callback);
+
+               verify(callback, times(1)).merge(
+                               (Collection<TimeWindow>) 
argThat(containsInAnyOrder(new TimeWindow(1, 1), new TimeWindow(0, 2))),
+                               eq(new TimeWindow(0, 2)));
+
+               verify(callback, times(1)).merge(
+                               (Collection<TimeWindow>) 
argThat(containsInAnyOrder(new TimeWindow(5, 6), new TimeWindow(4, 7))),
+                               eq(new TimeWindow(4, 7)));
+
+               verify(callback, times(2)).merge(anyCollection(), 
Matchers.anyObject());
+       }
+
+       @Test
+       public void testTimeUnits() {
+               // sanity check with one other time unit
+
+               WindowAssigner.WindowAssignerContext mockContext =
+                               
mock(WindowAssigner.WindowAssignerContext.class);
+
+               ProcessingTimeSessionWindows assigner = 
ProcessingTimeSessionWindows.withGap(Time.seconds(5));
+
+               when(mockContext.getCurrentProcessingTime()).thenReturn(0L);
+               assertThat(assigner.assignWindows("String", Long.MIN_VALUE, 
mockContext), contains(timeWindow(0, 5000)));
+
+               when(mockContext.getCurrentProcessingTime()).thenReturn(4999L);
+               assertThat(assigner.assignWindows("String", Long.MIN_VALUE, 
mockContext), contains(timeWindow(4999, 9999)));
+
+               when(mockContext.getCurrentProcessingTime()).thenReturn(5000L);
+               assertThat(assigner.assignWindows("String", Long.MIN_VALUE, 
mockContext), contains(timeWindow(5000, 10000)));
+       }
+
+       @Test
+       public void testInvalidParameters() {
+               try {
+                       ProcessingTimeSessionWindows.withGap(Time.seconds(-1));
+                       fail("should fail");
+               } catch (IllegalArgumentException e) {
+                       assertThat(e.toString(), containsString("0 < size"));
+               }
+
+               try {
+                       ProcessingTimeSessionWindows.withGap(Time.seconds(0));
+                       fail("should fail");
+               } catch (IllegalArgumentException e) {
+                       assertThat(e.toString(), containsString("0 < size"));
+               }
+
+       }
+
+       @Test
+       public void testProperties() {
+               ProcessingTimeSessionWindows assigner = 
ProcessingTimeSessionWindows.withGap(Time.seconds(5));
+
+               assertFalse(assigner.isEventTime());
+               assertEquals(new TimeWindow.Serializer(), 
assigner.getWindowSerializer(new ExecutionConfig()));
+               
assertThat(assigner.getDefaultTrigger(mock(StreamExecutionEnvironment.class)), 
instanceOf(ProcessingTimeTrigger.class));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java
new file mode 100644
index 0000000..a0c2347
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link ProcessingTimeTrigger}.
+ */
+public class ProcessingTimeTriggerTest {
+
+       /**
+        * Verify that state of separate windows does not leak into other 
windows.
+        */
+       @Test
+       public void testWindowSeparationAndFiring() throws Exception {
+               TriggerTestHarness<Object, TimeWindow> testHarness =
+                               new 
TriggerTestHarness<>(ProcessingTimeTrigger.create(), new 
TimeWindow.Serializer());
+
+               // inject several elements
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+               assertEquals(0, testHarness.numStateEntries());
+               assertEquals(0, testHarness.numEventTimeTimers());
+               assertEquals(2, testHarness.numProcessingTimeTimers());
+               assertEquals(1, testHarness.numProcessingTimeTimers(new 
TimeWindow(0, 2)));
+               assertEquals(1, testHarness.numProcessingTimeTimers(new 
TimeWindow(2, 4)));
+
+               assertEquals(TriggerResult.FIRE, 
testHarness.advanceProcessingTime(2, new TimeWindow(0, 2)));
+
+               assertEquals(0, testHarness.numStateEntries());
+               assertEquals(0, testHarness.numEventTimeTimers());
+               assertEquals(1, testHarness.numProcessingTimeTimers());
+               assertEquals(0, testHarness.numProcessingTimeTimers(new 
TimeWindow(0, 2)));
+               assertEquals(1, testHarness.numProcessingTimeTimers(new 
TimeWindow(2, 4)));
+
+               assertEquals(TriggerResult.FIRE, 
testHarness.advanceProcessingTime(4, new TimeWindow(2, 4)));
+
+               assertEquals(0, testHarness.numStateEntries());
+               assertEquals(0, testHarness.numProcessingTimeTimers());
+               assertEquals(0, testHarness.numEventTimeTimers());
+       }
+
+       /**
+        * Verify that clear() does not leak across windows.
+        */
+       @Test
+       public void testClear() throws Exception {
+               TriggerTestHarness<Object, TimeWindow> testHarness =
+                               new 
TriggerTestHarness<>(ProcessingTimeTrigger.create(), new 
TimeWindow.Serializer());
+
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+               assertEquals(0, testHarness.numStateEntries());
+               assertEquals(0, testHarness.numEventTimeTimers());
+               assertEquals(2, testHarness.numProcessingTimeTimers());
+               assertEquals(1, testHarness.numProcessingTimeTimers(new 
TimeWindow(0, 2)));
+               assertEquals(1, testHarness.numProcessingTimeTimers(new 
TimeWindow(2, 4)));
+
+               testHarness.clearTriggerState(new TimeWindow(2, 4));
+
+               assertEquals(0, testHarness.numStateEntries());
+               assertEquals(0, testHarness.numEventTimeTimers());
+               assertEquals(1, testHarness.numProcessingTimeTimers());
+               assertEquals(1, testHarness.numProcessingTimeTimers(new 
TimeWindow(0, 2)));
+               assertEquals(0, testHarness.numProcessingTimeTimers(new 
TimeWindow(2, 4)));
+
+               testHarness.clearTriggerState(new TimeWindow(0, 2));
+
+               assertEquals(0, testHarness.numStateEntries());
+               assertEquals(0, testHarness.numProcessingTimeTimers());
+               assertEquals(0, testHarness.numProcessingTimeTimers());
+       }
+
+       @Test
+       public void testMergingWindows() throws Exception {
+               TriggerTestHarness<Object, TimeWindow> testHarness =
+                               new 
TriggerTestHarness<>(ProcessingTimeTrigger.create(), new 
TimeWindow.Serializer());
+
+               assertTrue(ProcessingTimeTrigger.create().canMerge());
+
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+               assertEquals(0, testHarness.numStateEntries());
+               assertEquals(0, testHarness.numEventTimeTimers());
+               assertEquals(2, testHarness.numProcessingTimeTimers());
+               assertEquals(1, testHarness.numProcessingTimeTimers(new 
TimeWindow(0, 2)));
+               assertEquals(1, testHarness.numProcessingTimeTimers(new 
TimeWindow(2, 4)));
+
+               testHarness.mergeWindows(new TimeWindow(0, 4), 
Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4)));
+
+               assertEquals(0, testHarness.numStateEntries());
+               assertEquals(0, testHarness.numEventTimeTimers());
+               assertEquals(1, testHarness.numProcessingTimeTimers());
+               assertEquals(0, testHarness.numProcessingTimeTimers(new 
TimeWindow(0, 2)));
+               assertEquals(0, testHarness.numProcessingTimeTimers(new 
TimeWindow(2, 4)));
+               assertEquals(1, testHarness.numProcessingTimeTimers(new 
TimeWindow(0, 4)));
+
+               assertEquals(TriggerResult.FIRE, 
testHarness.advanceProcessingTime(4, new TimeWindow(0, 4)));
+
+               assertEquals(0, testHarness.numStateEntries());
+               assertEquals(0, testHarness.numProcessingTimeTimers());
+               assertEquals(0, testHarness.numEventTimeTimers());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PurgingTriggerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PurgingTriggerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PurgingTriggerTest.java
new file mode 100644
index 0000000..4302d4d
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PurgingTriggerTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+
+import static 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorContractTest.anyOnMergeContext;
+import static 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorContractTest.anyTimeWindow;
+import static 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorContractTest.anyTriggerContext;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for {@link PurgingTrigger}.
+ */
+public class PurgingTriggerTest {
+
+       /**
+        * Check if {@link PurgingTrigger} implements all methods of {@link 
Trigger}, as a sanity
+        * check.
+        */
+       @Test
+       public void testAllMethodsImplemented() throws NoSuchMethodException {
+               for (Method triggerMethod : Trigger.class.getDeclaredMethods()) 
{
+
+                       // try retrieving the method, this will throw an 
exception if we can't find it
+                       
PurgingTrigger.class.getDeclaredMethod(triggerMethod.getName(),triggerMethod.getParameterTypes());
+               }
+       }
+
+       @Test
+       public void testForwarding() throws Exception {
+               Trigger<Object, TimeWindow> mockTrigger = mock(Trigger.class);
+
+               TriggerTestHarness<Object, TimeWindow> testHarness =
+                               new 
TriggerTestHarness<>(PurgingTrigger.of(mockTrigger), new 
TimeWindow.Serializer());
+
+               when(mockTrigger.onElement(Matchers.anyObject(), anyLong(), 
anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+
+               when(mockTrigger.onElement(Matchers.anyObject(), anyLong(), 
anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE);
+               assertEquals(TriggerResult.FIRE_AND_PURGE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+
+               when(mockTrigger.onElement(Matchers.anyObject(), anyLong(), 
anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
+               assertEquals(TriggerResult.FIRE_AND_PURGE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+
+               when(mockTrigger.onElement(Matchers.anyObject(), anyLong(), 
anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.PURGE);
+               assertEquals(TriggerResult.PURGE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+
+               doAnswer(new Answer<TriggerResult>() {
+                       @Override
+                       public TriggerResult answer(InvocationOnMock 
invocation) throws Exception {
+                               Trigger.TriggerContext context = 
(Trigger.TriggerContext) invocation.getArguments()[3];
+
+                               // register some timers that we can step 
through to call onEventTime several
+                               // times in a row
+                               context.registerEventTimeTimer(1);
+                               context.registerEventTimeTimer(2);
+                               context.registerEventTimeTimer(3);
+                               context.registerEventTimeTimer(4);
+                               return TriggerResult.CONTINUE;
+                       }
+               }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), 
anyLong(), anyTimeWindow(), anyTriggerContext());
+
+               // set up our timers
+               testHarness.processElement(new StreamRecord<Object>(1), new 
TimeWindow(0, 2));
+
+               assertEquals(4, testHarness.numEventTimeTimers(new 
TimeWindow(0, 2)));
+
+               when(mockTrigger.onEventTime(anyLong(), anyTimeWindow(), 
anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.advanceWatermark(1, new TimeWindow(0, 2)));
+
+               when(mockTrigger.onEventTime(anyLong(), anyTimeWindow(), 
anyTriggerContext())).thenReturn(TriggerResult.FIRE);
+               assertEquals(TriggerResult.FIRE_AND_PURGE, 
testHarness.advanceWatermark(2, new TimeWindow(0, 2)));
+
+               when(mockTrigger.onEventTime(anyLong(), anyTimeWindow(), 
anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
+               assertEquals(TriggerResult.FIRE_AND_PURGE, 
testHarness.advanceWatermark(3, new TimeWindow(0, 2)));
+
+               when(mockTrigger.onEventTime(anyLong(), anyTimeWindow(), 
anyTriggerContext())).thenReturn(TriggerResult.PURGE);
+               assertEquals(TriggerResult.PURGE, 
testHarness.advanceWatermark(4, new TimeWindow(0, 2)));
+
+               doAnswer(new Answer<TriggerResult>() {
+                       @Override
+                       public TriggerResult answer(InvocationOnMock 
invocation) throws Exception {
+                               Trigger.TriggerContext context = 
(Trigger.TriggerContext) invocation.getArguments()[3];
+
+                               // register some timers that we can step 
through to call onEventTime several
+                               // times in a row
+                               context.registerProcessingTimeTimer(1);
+                               context.registerProcessingTimeTimer(2);
+                               context.registerProcessingTimeTimer(3);
+                               context.registerProcessingTimeTimer(4);
+                               return TriggerResult.CONTINUE;
+                       }
+               }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), 
anyLong(), anyTimeWindow(), anyTriggerContext());
+
+               // set up our timers
+               testHarness.processElement(new StreamRecord<Object>(1), new 
TimeWindow(0, 2));
+
+               assertEquals(4, testHarness.numProcessingTimeTimers(new 
TimeWindow(0, 2)));
+               assertEquals(0, testHarness.numEventTimeTimers(new 
TimeWindow(0, 2)));
+
+               when(mockTrigger.onProcessingTime(anyLong(), anyTimeWindow(), 
anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.advanceProcessingTime(1, new TimeWindow(0, 2)));
+
+               when(mockTrigger.onProcessingTime(anyLong(), anyTimeWindow(), 
anyTriggerContext())).thenReturn(TriggerResult.FIRE);
+               assertEquals(TriggerResult.FIRE_AND_PURGE, 
testHarness.advanceProcessingTime(2, new TimeWindow(0, 2)));
+
+               when(mockTrigger.onProcessingTime(anyLong(), anyTimeWindow(), 
anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
+               assertEquals(TriggerResult.FIRE_AND_PURGE, 
testHarness.advanceProcessingTime(3, new TimeWindow(0, 2)));
+
+               when(mockTrigger.onProcessingTime(anyLong(), anyTimeWindow(), 
anyTriggerContext())).thenReturn(TriggerResult.PURGE);
+               assertEquals(TriggerResult.PURGE, 
testHarness.advanceProcessingTime(4, new TimeWindow(0, 2)));
+
+               testHarness.mergeWindows(new TimeWindow(0, 2), 
Collections.singletonList(new TimeWindow(0, 1)));
+               verify(mockTrigger, times(1)).onMerge(anyTimeWindow(), 
anyOnMergeContext());
+
+               testHarness.clearTriggerState(new TimeWindow(0, 2));
+               verify(mockTrigger, times(1)).clear(eq(new TimeWindow(0, 2)), 
anyTriggerContext());
+       }
+
+}

Reply via email to