Repository: flink
Updated Branches:
  refs/heads/master e71196972 -> f760b616a


Replace Trigger.onTime by Trigger.onProcessingTime/onEventTime

This also renames WatermarkTrigger to EventTimeTrigger and
ContinuousWatermarkTrigger to ContinuousEventTimeTrigger.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f760b616
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f760b616
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f760b616

Branch: refs/heads/master
Commit: f760b616af0e1608cb4c190aeb264da72f624f4c
Parents: 4442269
Author: Aljoscha Krettek <[email protected]>
Authored: Sat Oct 17 13:35:24 2015 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Tue Oct 20 18:39:12 2015 +0200

----------------------------------------------------------------------
 docs/apis/streaming_guide.md                    | 14 +--
 .../api/windowing/assigners/GlobalWindows.java  |  7 +-
 .../windowing/assigners/SlidingTimeWindows.java |  4 +-
 .../assigners/TumblingTimeWindows.java          |  4 +-
 .../triggers/ContinuousEventTimeTrigger.java    | 90 ++++++++++++++++++++
 .../ContinuousProcessingTimeTrigger.java        |  8 +-
 .../triggers/ContinuousWatermarkTrigger.java    | 84 ------------------
 .../api/windowing/triggers/CountTrigger.java    | 10 ++-
 .../api/windowing/triggers/DeltaTrigger.java    | 10 ++-
 .../windowing/triggers/EventTimeTrigger.java    | 62 ++++++++++++++
 .../triggers/ProcessingTimeTrigger.java         |  8 +-
 .../api/windowing/triggers/PurgingTrigger.java  | 17 +++-
 .../api/windowing/triggers/Trigger.java         | 25 ++++--
 .../windowing/triggers/WatermarkTrigger.java    | 56 ------------
 .../windowing/NonKeyedWindowOperator.java       |  6 +-
 .../operators/windowing/WindowOperator.java     |  6 +-
 .../windowing/AllWindowTranslationTest.java     |  8 +-
 .../windowing/EvictingWindowOperatorTest.java   |  1 -
 .../windowing/NonKeyedWindowOperatorTest.java   | 11 ++-
 .../operators/windowing/WindowOperatorTest.java | 10 +--
 .../windowing/WindowTranslationTest.java        |  8 +-
 .../examples/windowing/SessionWindowing.java    | 10 ++-
 22 files changed, 263 insertions(+), 196 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/docs/apis/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index 9257ae1..9fce0d7 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -1008,7 +1008,7 @@ dataStream.union(otherStream1, otherStream2, ...)
     {% highlight scala %}
 dataStream.join(otherStream)
     .where(0).equalTo(1)
-    .onTimeWindow(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS)))
+    .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS)))
     .apply { ... }
     {% endhighlight %}
           </td>
@@ -2308,7 +2308,7 @@ windowedStream.trigger(ProcessingTimeTrigger.create());
         The elements on the triggered window are henceforth discarded.
       </p>
 {% highlight java %}
-windowedStream.trigger(WatermarkTrigger.create());
+windowedStream.trigger(EventTimeTrigger.create());
 {% endhighlight %}
     </td>
   </tr>
@@ -2334,7 +2334,7 @@ 
windowedStream.trigger(ContinuousProcessingTimeTrigger.of(Time.of(5, TimeUnit.SE
         The elements on the triggered window are retained.
       </p>
 {% highlight java %}
-windowedStream.trigger(ContinuousWatermarkTrigger.of(Time.of(5, 
TimeUnit.SECONDS)));
+windowedStream.trigger(ContinuousEventTimeTrigger.of(Time.of(5, 
TimeUnit.SECONDS)));
 {% endhighlight %}
     </td>
   </tr>
@@ -2414,7 +2414,7 @@ windowedStream.trigger(ProcessingTimeTrigger.create);
         The elements on the triggered window are henceforth discarded.
       </p>
 {% highlight scala %}
-windowedStream.trigger(WatermarkTrigger.create);
+windowedStream.trigger(EventTimeTrigger.create);
 {% endhighlight %}
     </td>
   </tr>
@@ -2440,7 +2440,7 @@ 
windowedStream.trigger(ContinuousProcessingTimeTrigger.of(Time.of(5, TimeUnit.SE
         The elements on the triggered window are retained.
       </p>
 {% highlight scala %}
-windowedStream.trigger(ContinuousWatermarkTrigger.of(Time.of(5, 
TimeUnit.SECONDS)));
+windowedStream.trigger(ContinuousEventTimeTrigger.of(Time.of(5, 
TimeUnit.SECONDS)));
 {% endhighlight %}
     </td>
   </tr>
@@ -2653,7 +2653,7 @@ stream.timeWindow(Time.of(5, TimeUnit.SECONDS))
         <td>
     {% highlight java %}
 stream.window(TumblingTimeWindows.of((Time.of(5, TimeUnit.SECONDS)))
-  .trigger(WatermarkTrigger.create())
+  .trigger(EventTimeTrigger.create())
     {% endhighlight %}
         </td>
       </tr>
@@ -2667,7 +2667,7 @@ stream.timeWindow(Time.of(5, TimeUnit.SECONDS), 
Time.of(1, TimeUnit.SECONDS))
         <td>
     {% highlight java %}
 stream.window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, 
TimeUnit.SECONDS)))
-  .trigger(WatermarkTrigger.create())
+  .trigger(EventTimeTrigger.create())
     {% endhighlight %}
         </td>
       </tr>

http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
index 9b7c8f2..4d5b9d7 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
@@ -79,7 +79,12 @@ public class GlobalWindows extends WindowAssigner<Object, 
GlobalWindow> {
                }
 
                @Override
-               public TriggerResult onTime(long time, TriggerContext ctx) {
+               public TriggerResult onEventTime(long time, TriggerContext ctx) 
{
+                       return TriggerResult.CONTINUE;
+               }
+
+               @Override
+               public TriggerResult onProcessingTime(long time, TriggerContext 
ctx) {
                        return TriggerResult.CONTINUE;
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
index 7b1f1f4..5f7ab45 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
@@ -24,7 +24,7 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.time.AbstractTime;
 import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 
 import java.util.ArrayList;
@@ -81,7 +81,7 @@ public class SlidingTimeWindows extends 
WindowAssigner<Object, TimeWindow> {
                if (env.getStreamTimeCharacteristic() == 
TimeCharacteristic.ProcessingTime) {
                        return ProcessingTimeTrigger.create();
                } else {
-                       return WatermarkTrigger.create();
+                       return EventTimeTrigger.create();
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
index aa019e4..463b2c4 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
@@ -24,7 +24,7 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.time.AbstractTime;
 import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 
 import java.util.Collection;
@@ -67,7 +67,7 @@ public class TumblingTimeWindows extends 
WindowAssigner<Object, TimeWindow> {
                if (env.getStreamTimeCharacteristic() == 
TimeCharacteristic.ProcessingTime) {
                        return ProcessingTimeTrigger.create();
                } else {
-                       return WatermarkTrigger.create();
+                       return EventTimeTrigger.create();
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
new file mode 100644
index 0000000..ea26309
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.streaming.api.windowing.time.AbstractTime;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * A {@link Trigger} that continuously fires based on a given time interval. 
This fires based
+ * on {@link org.apache.flink.streaming.api.watermark.Watermark Watermarks}.
+ *
+ * @see org.apache.flink.streaming.api.watermark.Watermark
+ *
+ * @param <W> The type of {@link Window Windows} on which this trigger can 
operate.
+ */
+public class ContinuousEventTimeTrigger<W extends Window> implements 
Trigger<Object, W> {
+       private static final long serialVersionUID = 1L;
+
+       private final long interval;
+
+       private ContinuousEventTimeTrigger(long interval) {
+               this.interval = interval;
+       }
+
+       @Override
+       public TriggerResult onElement(Object element, long timestamp, W 
window, TriggerContext ctx) throws Exception {
+
+               OperatorState<Boolean> first = ctx.getKeyValueState("first", 
true);
+
+               if (first.value()) {
+                       long start = timestamp - (timestamp % interval);
+                       long nextFireTimestamp = start + interval;
+
+                       ctx.registerEventTimeTimer(nextFireTimestamp);
+
+                       first.update(false);
+                       return TriggerResult.CONTINUE;
+               }
+               return TriggerResult.CONTINUE;
+       }
+
+       @Override
+       public TriggerResult onEventTime(long time, TriggerContext ctx) {
+               ctx.registerEventTimeTimer(time + interval);
+               return TriggerResult.FIRE;
+       }
+
+       @Override
+       public TriggerResult onProcessingTime(long time,
+                       TriggerContext ctx) throws Exception {
+               return TriggerResult.CONTINUE;
+       }
+
+       @Override
+       public String toString() {
+               return "ContinuousProcessingTimeTrigger(" + interval + ")";
+       }
+
+       @VisibleForTesting
+       public long getInterval() {
+               return interval;
+       }
+
+       /**
+        * Creates a trigger that continuously fires based on the given 
interval.
+        *
+        * @param interval The time interval at which to fire.
+        * @param <W> The type of {@link Window Windows} on which this trigger 
can operate.
+        */
+       public static <W extends Window> ContinuousEventTimeTrigger<W> 
of(AbstractTime interval) {
+               return new 
ContinuousEventTimeTrigger<>(interval.toMilliseconds());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
index 3ea60f4..be56738 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
@@ -63,7 +63,13 @@ public class ContinuousProcessingTimeTrigger<W extends 
Window> implements Trigge
        }
 
        @Override
-       public TriggerResult onTime(long time, TriggerContext ctx) throws 
Exception {
+       public TriggerResult onEventTime(long time,
+                       TriggerContext ctx) throws Exception {
+               return TriggerResult.CONTINUE;
+       }
+
+       @Override
+       public TriggerResult onProcessingTime(long time, TriggerContext ctx) 
throws Exception {
 
                OperatorState<Long> fireState = 
ctx.getKeyValueState("fire-timestamp", 0L);
                long nextFireTimestamp = fireState.value();

http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
deleted file mode 100644
index 494ba3a..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.triggers;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-
-/**
- * A {@link Trigger} that continuously fires based on a given time interval. 
This fires based
- * on {@link org.apache.flink.streaming.api.watermark.Watermark Watermarks}.
- *
- * @see org.apache.flink.streaming.api.watermark.Watermark
- *
- * @param <W> The type of {@link Window Windows} on which this trigger can 
operate.
- */
-public class ContinuousWatermarkTrigger<W extends Window> implements 
Trigger<Object, W> {
-       private static final long serialVersionUID = 1L;
-
-       private final long interval;
-
-       private ContinuousWatermarkTrigger(long interval) {
-               this.interval = interval;
-       }
-
-       @Override
-       public TriggerResult onElement(Object element, long timestamp, W 
window, TriggerContext ctx) throws Exception {
-
-               OperatorState<Boolean> first = ctx.getKeyValueState("first", 
true);
-
-               if (first.value()) {
-                       long start = timestamp - (timestamp % interval);
-                       long nextFireTimestamp = start + interval;
-
-                       ctx.registerWatermarkTimer(nextFireTimestamp);
-
-                       first.update(false);
-                       return TriggerResult.CONTINUE;
-               }
-               return TriggerResult.CONTINUE;
-       }
-
-       @Override
-       public TriggerResult onTime(long time, TriggerContext ctx) {
-               ctx.registerWatermarkTimer(time + interval);
-               return TriggerResult.FIRE;
-       }
-
-       @Override
-       public String toString() {
-               return "ContinuousProcessingTimeTrigger(" + interval + ")";
-       }
-
-       @VisibleForTesting
-       public long getInterval() {
-               return interval;
-       }
-
-       /**
-        * Creates a trigger that continuously fires based on the given 
interval.
-        *
-        * @param interval The time interval at which to fire.
-        * @param <W> The type of {@link Window Windows} on which this trigger 
can operate.
-        */
-       public static <W extends Window> ContinuousWatermarkTrigger<W> 
of(AbstractTime interval) {
-               return new 
ContinuousWatermarkTrigger<>(interval.toMilliseconds());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
index 57582f7..8512989 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
@@ -49,8 +49,14 @@ public class CountTrigger<W extends Window> implements 
Trigger<Object, W> {
        }
 
        @Override
-       public TriggerResult onTime(long time, TriggerContext ctx) {
-               return null;
+       public TriggerResult onEventTime(long time, TriggerContext ctx) {
+               return TriggerResult.CONTINUE;
+       }
+
+       @Override
+       public TriggerResult onProcessingTime(long time,
+                       TriggerContext ctx) throws Exception {
+               return TriggerResult.CONTINUE;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
index b1283f5..1c6523d 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
@@ -59,8 +59,14 @@ public class DeltaTrigger<T extends Serializable, W extends 
Window> implements T
        }
 
        @Override
-       public TriggerResult onTime(long time, TriggerContext ctx) {
-               return null;
+       public TriggerResult onEventTime(long time, TriggerContext ctx) {
+               return TriggerResult.CONTINUE;
+       }
+
+       @Override
+       public TriggerResult onProcessingTime(long time,
+                       TriggerContext ctx) throws Exception {
+               return TriggerResult.CONTINUE;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
new file mode 100644
index 0000000..4b6613c
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+/**
+ * A {@link Trigger} that fires once the watermark passes the end of the window
+ * to which a pane belongs.
+ *
+ * @see org.apache.flink.streaming.api.watermark.Watermark
+ */
+public class EventTimeTrigger implements Trigger<Object, TimeWindow> {
+       private static final long serialVersionUID = 1L;
+
+       private EventTimeTrigger() {}
+
+       @Override
+       public TriggerResult onElement(Object element, long timestamp, 
TimeWindow window, TriggerContext ctx) throws Exception {
+               ctx.registerEventTimeTimer(window.maxTimestamp());
+               return TriggerResult.CONTINUE;
+       }
+
+       @Override
+       public TriggerResult onEventTime(long time, TriggerContext ctx) {
+               return TriggerResult.FIRE_AND_PURGE;
+       }
+
+       @Override
+       public TriggerResult onProcessingTime(long time,
+                       TriggerContext ctx) throws Exception {
+               return TriggerResult.CONTINUE;
+       }
+
+       @Override
+       public String toString() {
+               return "EventTimeTrigger()";
+       }
+
+       /**
+        * Creates trigger that fires once the watermark passes the end of the 
window.
+        */
+       public static EventTimeTrigger create() {
+               return new EventTimeTrigger();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
index 70c57ef..6278ba6 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
@@ -35,7 +35,13 @@ public class ProcessingTimeTrigger implements 
Trigger<Object, TimeWindow> {
        }
 
        @Override
-       public TriggerResult onTime(long time, TriggerContext ctx) {
+       public TriggerResult onEventTime(long time,
+                       TriggerContext ctx) throws Exception {
+               return TriggerResult.CONTINUE;
+       }
+
+       @Override
+       public TriggerResult onProcessingTime(long time, TriggerContext ctx) {
                return TriggerResult.FIRE_AND_PURGE;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
index 76e36b1..eaca336 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
@@ -53,8 +53,21 @@ public class PurgingTrigger<T, W extends Window> implements 
Trigger<T, W> {
        }
 
        @Override
-       public TriggerResult onTime(long time, TriggerContext ctx) throws 
Exception {
-               TriggerResult triggerResult = nestedTrigger.onTime(time, ctx);
+       public TriggerResult onEventTime(long time, TriggerContext ctx) throws 
Exception {
+               TriggerResult triggerResult = nestedTrigger.onEventTime(time, 
ctx);
+               switch (triggerResult) {
+                       case FIRE:
+                               return TriggerResult.FIRE_AND_PURGE;
+                       case FIRE_AND_PURGE:
+                               return TriggerResult.FIRE_AND_PURGE;
+                       default:
+                               return TriggerResult.CONTINUE;
+               }
+       }
+
+       @Override
+       public TriggerResult onProcessingTime(long time, TriggerContext ctx) 
throws Exception {
+               TriggerResult triggerResult = 
nestedTrigger.onProcessingTime(time, ctx);
                switch (triggerResult) {
                        case FIRE:
                                return TriggerResult.FIRE_AND_PURGE;

http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
index 56b8687..ef8110b 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
@@ -55,12 +55,21 @@ public interface Trigger<T, W extends Window> extends 
Serializable {
        TriggerResult onElement(T element, long timestamp, W window, 
TriggerContext ctx) throws Exception;
 
        /**
-        * Called when a timer that was set using the trigger context fires.
+        * Called when a processing-time timer that was set using the trigger 
context fires.
         *
         * @param time The timestamp at which the timer fired.
         * @param ctx A context object that can be used to register timer 
callbacks.
         */
-       TriggerResult onTime(long time, TriggerContext ctx) throws Exception;
+       TriggerResult onProcessingTime(long time, TriggerContext ctx) throws 
Exception;
+
+       /**
+        * Called when an event-time timer that was set using the trigger 
context fires.
+        *
+        * @param time The timestamp at which the timer fired.
+        * @param ctx A context object that can be used to register timer 
callbacks.
+        */
+       TriggerResult onEventTime(long time, TriggerContext ctx) throws 
Exception;
+
 
        /**
         * Result type for trigger methods. This determines what happens which 
the window.
@@ -82,21 +91,21 @@ public interface Trigger<T, W extends Window> extends 
Serializable {
 
                /**
                 * Register a system time callback. When the current system 
time passes the specified
-                * time {@link #onTime(long, TriggerContext)} is called.
+                * time {@link #onProcessingTime(long, TriggerContext)} is 
called with the time specified here.
                 *
-                * @param time The time at which to invoke {@link #onTime(long, 
TriggerContext)}
+                * @param time The time at which to invoke {@link 
#onProcessingTime(long, TriggerContext)}
                 */
                void registerProcessingTimeTimer(long time);
 
                /**
-                * Register a watermark callback. When the current watermark 
passes the specified
-                * time {@link #onTime(long, TriggerContext)} is called.
+                * Register an event-time callback. When the current watermark 
passes the specified
+                * time {@link #onEventTime(long, TriggerContext)} is called 
with the time specified here.
                 *
                 * @see org.apache.flink.streaming.api.watermark.Watermark
                 *
-                * @param time The watermark at which to invoke {@link 
#onTime(long, TriggerContext)}
+                * @param time The watermark at which to invoke {@link 
#onEventTime(long, TriggerContext)}
                 */
-               void registerWatermarkTimer(long time);
+               void registerEventTimeTimer(long time);
 
                /**
                 * Retrieves an {@link OperatorState} object that can be used 
to interact with

http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java
deleted file mode 100644
index d17066b..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.triggers;
-
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-
-/**
- * A {@link Trigger} that fires once the watermark passes the end of the window
- * to which a pane belongs.
- *
- * @see org.apache.flink.streaming.api.watermark.Watermark
- */
-public class WatermarkTrigger implements Trigger<Object, TimeWindow> {
-       private static final long serialVersionUID = 1L;
-
-       private WatermarkTrigger() {}
-
-       @Override
-       public TriggerResult onElement(Object element, long timestamp, 
TimeWindow window, TriggerContext ctx) throws Exception {
-               ctx.registerWatermarkTimer(window.maxTimestamp());
-               return TriggerResult.CONTINUE;
-       }
-
-       @Override
-       public TriggerResult onTime(long time, TriggerContext ctx) {
-               return TriggerResult.FIRE_AND_PURGE;
-       }
-
-       @Override
-       public String toString() {
-               return "WatermarkTrigger()";
-       }
-
-       /**
-        * Creates trigger that fires once the watermark passes the end of the 
window.
-        */
-       public static WatermarkTrigger create() {
-               return new WatermarkTrigger();
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index 7ab33cf..5de6cd1 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -420,7 +420,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
                }
 
                @Override
-               public void registerWatermarkTimer(long time) {
+               public void registerEventTimeTimer(long time) {
                        if (watermarkTimer == time) {
                                // we already have set a trigger for that time
                                return;
@@ -436,7 +436,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
 
                public Trigger.TriggerResult onProcessingTime(long time) throws 
Exception {
                        if (time == processingTimeTimer) {
-                               return trigger.onTime(time, this);
+                               return trigger.onProcessingTime(time, this);
                        } else {
                                return Trigger.TriggerResult.CONTINUE;
                        }
@@ -444,7 +444,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
 
                public Trigger.TriggerResult onEventTime(long time) throws 
Exception {
                        if (time == watermarkTimer) {
-                               return trigger.onTime(time, this);
+                               return trigger.onEventTime(time, this);
                        } else {
                                return Trigger.TriggerResult.CONTINUE;
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 0b3274f..2491c57 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -489,7 +489,7 @@ public class WindowOperator<K, IN, OUT, W extends Window>
                }
 
                @Override
-               public void registerWatermarkTimer(long time) {
+               public void registerEventTimeTimer(long time) {
                        if (watermarkTimer == time) {
                                // we already have set a trigger for that time
                                return;
@@ -505,7 +505,7 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 
                public Trigger.TriggerResult onProcessingTime(long time) throws 
Exception {
                        if (time == processingTimeTimer) {
-                               return trigger.onTime(time, this);
+                               return trigger.onProcessingTime(time, this);
                        } else {
                                return Trigger.TriggerResult.CONTINUE;
                        }
@@ -513,7 +513,7 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 
                public Trigger.TriggerResult onEventTime(long time) throws 
Exception {
                        if (time == watermarkTimer) {
-                               return trigger.onTime(time, this);
+                               return trigger.onEventTime(time, this);
                        } else {
                                return Trigger.TriggerResult.CONTINUE;
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
index 45ef29f..282c71f 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
@@ -31,7 +31,7 @@ import 
org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
 import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
 import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
@@ -71,7 +71,7 @@ public class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase
                Assert.assertTrue(operator1 instanceof NonKeyedWindowOperator);
                NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) 
operator1;
                Assert.assertFalse(winOperator1.isSetProcessingTime());
-               Assert.assertTrue(winOperator1.getTrigger() instanceof 
WatermarkTrigger);
+               Assert.assertTrue(winOperator1.getTrigger() instanceof 
EventTimeTrigger);
                Assert.assertTrue(winOperator1.getWindowAssigner() instanceof 
SlidingTimeWindows);
                Assert.assertTrue(winOperator1.getWindowBufferFactory() 
instanceof PreAggregatingHeapWindowBuffer.Factory);
 
@@ -94,7 +94,7 @@ public class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase
                Assert.assertTrue(operator2 instanceof NonKeyedWindowOperator);
                NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) 
operator2;
                Assert.assertFalse(winOperator2.isSetProcessingTime());
-               Assert.assertTrue(winOperator2.getTrigger() instanceof 
WatermarkTrigger);
+               Assert.assertTrue(winOperator2.getTrigger() instanceof 
EventTimeTrigger);
                Assert.assertTrue(winOperator2.getWindowAssigner() instanceof 
TumblingTimeWindows);
                Assert.assertTrue(winOperator2.getWindowBufferFactory() 
instanceof HeapWindowBuffer.Factory);
        }
@@ -168,7 +168,7 @@ public class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase
                Assert.assertTrue(operator1 instanceof 
EvictingNonKeyedWindowOperator);
                EvictingNonKeyedWindowOperator winOperator1 = 
(EvictingNonKeyedWindowOperator) operator1;
                Assert.assertFalse(winOperator1.isSetProcessingTime());
-               Assert.assertTrue(winOperator1.getTrigger() instanceof 
WatermarkTrigger);
+               Assert.assertTrue(winOperator1.getTrigger() instanceof 
EventTimeTrigger);
                Assert.assertTrue(winOperator1.getWindowAssigner() instanceof 
SlidingTimeWindows);
                Assert.assertTrue(winOperator1.getEvictor() instanceof 
CountEvictor);
                Assert.assertTrue(winOperator1.getWindowBufferFactory() 
instanceof HeapWindowBuffer.Factory);

http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
index afc65d5..1821308 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.flink.streaming.runtime.operators.windowing;
 
-import jdk.nashorn.internal.objects.Global;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;

http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
index a91d957..02e032a 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
@@ -19,7 +19,6 @@ package 
org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RichReduceFunction;
-import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
@@ -29,10 +28,10 @@ import 
org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
-import 
org.apache.flink.streaming.api.windowing.triggers.ContinuousWatermarkTrigger;
+import 
org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
@@ -79,7 +78,7 @@ public class NonKeyedWindowOperatorTest {
                                new TimeWindow.Serializer(),
                                windowBufferFactory,
                                new ReduceAllWindowFunction<TimeWindow, 
Tuple2<String, Integer>>(new SumReducer()),
-                               WatermarkTrigger.create());
+                               EventTimeTrigger.create());
 
                operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
 
@@ -160,7 +159,7 @@ public class NonKeyedWindowOperatorTest {
                                new TimeWindow.Serializer(),
                                windowBufferFactory,
                                new ReduceAllWindowFunction<TimeWindow, 
Tuple2<String, Integer>>(new SumReducer()),
-                               WatermarkTrigger.create());
+                               EventTimeTrigger.create());
 
                operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
 
@@ -239,7 +238,7 @@ public class NonKeyedWindowOperatorTest {
                                new GlobalWindow.Serializer(),
                                windowBufferFactory,
                                new ReduceAllWindowFunction<GlobalWindow, 
Tuple2<String, Integer>>(new SumReducer()),
-                               
ContinuousWatermarkTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)));
+                               
ContinuousEventTimeTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)));
 
                operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index e825b88..b94e530 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -33,10 +33,10 @@ import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindow
 import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
 import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
 import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
-import 
org.apache.flink.streaming.api.windowing.triggers.ContinuousWatermarkTrigger;
+import 
org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -82,7 +82,7 @@ public class WindowOperatorTest {
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                windowBufferFactory,
                                new ReduceWindowFunction<String, TimeWindow, 
Tuple2<String, Integer>>(new SumReducer()),
-                               WatermarkTrigger.create());
+                               EventTimeTrigger.create());
 
                operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
 
@@ -171,7 +171,7 @@ public class WindowOperatorTest {
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                windowBufferFactory,
                                new ReduceWindowFunction<String, TimeWindow, 
Tuple2<String, Integer>>(new SumReducer()),
-                               WatermarkTrigger.create());
+                               EventTimeTrigger.create());
 
                operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
 
@@ -256,7 +256,7 @@ public class WindowOperatorTest {
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                windowBufferFactory,
                                new ReduceWindowFunction<String, GlobalWindow, 
Tuple2<String, Integer>>(new SumReducer()),
-                               
ContinuousWatermarkTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)));
+                               
ContinuousEventTimeTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)));
 
                operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
index 02ec820..13766a1 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
@@ -33,7 +33,7 @@ import 
org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
 import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
 import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
@@ -116,7 +116,7 @@ public class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
                Assert.assertTrue(operator1 instanceof WindowOperator);
                WindowOperator winOperator1 = (WindowOperator) operator1;
                Assert.assertFalse(winOperator1.isSetProcessingTime());
-               Assert.assertTrue(winOperator1.getTrigger() instanceof 
WatermarkTrigger);
+               Assert.assertTrue(winOperator1.getTrigger() instanceof 
EventTimeTrigger);
                Assert.assertTrue(winOperator1.getWindowAssigner() instanceof 
SlidingTimeWindows);
                Assert.assertTrue(winOperator1.getWindowBufferFactory() 
instanceof PreAggregatingHeapWindowBuffer.Factory);
 
@@ -140,7 +140,7 @@ public class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
                Assert.assertTrue(operator2 instanceof WindowOperator);
                WindowOperator winOperator2 = (WindowOperator) operator2;
                Assert.assertFalse(winOperator2.isSetProcessingTime());
-               Assert.assertTrue(winOperator2.getTrigger() instanceof 
WatermarkTrigger);
+               Assert.assertTrue(winOperator2.getTrigger() instanceof 
EventTimeTrigger);
                Assert.assertTrue(winOperator2.getWindowAssigner() instanceof 
TumblingTimeWindows);
                Assert.assertTrue(winOperator2.getWindowBufferFactory() 
instanceof HeapWindowBuffer.Factory);
        }
@@ -217,7 +217,7 @@ public class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
                Assert.assertTrue(operator1 instanceof EvictingWindowOperator);
                EvictingWindowOperator winOperator1 = (EvictingWindowOperator) 
operator1;
                Assert.assertFalse(winOperator1.isSetProcessingTime());
-               Assert.assertTrue(winOperator1.getTrigger() instanceof 
WatermarkTrigger);
+               Assert.assertTrue(winOperator1.getTrigger() instanceof 
EventTimeTrigger);
                Assert.assertTrue(winOperator1.getWindowAssigner() instanceof 
SlidingTimeWindows);
                Assert.assertTrue(winOperator1.getEvictor() instanceof 
CountEvictor);
                Assert.assertTrue(winOperator1.getWindowBufferFactory() 
instanceof HeapWindowBuffer.Factory);

http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
index 60b7894..3c63156 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
@@ -116,7 +116,7 @@ public class SessionWindowing {
                        // Update the last seen event time
                        lastSeenState.update(timestamp);
 
-                       ctx.registerWatermarkTimer(lastSeen + sessionTimeout);
+                       ctx.registerEventTimeTimer(lastSeen + sessionTimeout);
 
                        if (timeSinceLastEvent > sessionTimeout) {
                                return TriggerResult.FIRE_AND_PURGE;
@@ -126,7 +126,7 @@ public class SessionWindowing {
                }
 
                @Override
-               public TriggerResult onTime(long time, TriggerContext ctx) 
throws Exception {
+               public TriggerResult onEventTime(long time, TriggerContext ctx) 
throws Exception {
                        OperatorState<Long> lastSeenState = 
ctx.getKeyValueState("last-seen", 1L);
                        Long lastSeen = lastSeenState.value();
 
@@ -135,6 +135,12 @@ public class SessionWindowing {
                        }
                        return TriggerResult.CONTINUE;
                }
+
+               @Override
+               public TriggerResult onProcessingTime(long time,
+                               TriggerContext ctx) throws Exception {
+                       return TriggerResult.CONTINUE;
+               }
        }
 
        // 
*************************************************************************

Reply via email to