Repository: flink
Updated Branches:
  refs/heads/master c36977f76 -> e69693778


[hotfix] Add Window Parameter in Trigger.onEventTime/onProcessingTime

Previously, these trigger methods had no information about the window that
they are responsible for. This information might be required for
implementing more advanced trigger behaviour.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e18cdd04
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e18cdd04
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e18cdd04

Branch: refs/heads/master
Commit: e18cdd0498003417c2fe6b0f446f6a943fbc98e3
Parents: c36977f
Author: Aljoscha Krettek <[email protected]>
Authored: Fri Oct 23 11:28:58 2015 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Fri Oct 23 15:48:02 2015 +0200

----------------------------------------------------------------------
 .../streaming/examples/windowing/SessionWindowing.java  |  5 ++---
 .../api/windowing/assigners/GlobalWindows.java          |  4 ++--
 .../windowing/triggers/ContinuousEventTimeTrigger.java  |  5 ++---
 .../triggers/ContinuousProcessingTimeTrigger.java       |  5 ++---
 .../streaming/api/windowing/triggers/CountTrigger.java  |  5 ++---
 .../streaming/api/windowing/triggers/DeltaTrigger.java  |  5 ++---
 .../api/windowing/triggers/EventTimeTrigger.java        | 12 +++++++-----
 .../api/windowing/triggers/ProcessingTimeTrigger.java   |  5 ++---
 .../api/windowing/triggers/PurgingTrigger.java          |  8 ++++----
 .../flink/streaming/api/windowing/triggers/Trigger.java | 12 ++++++------
 .../operators/windowing/NonKeyedWindowOperator.java     |  4 ++--
 .../runtime/operators/windowing/WindowOperator.java     |  4 ++--
 .../apache/flink/streaming/util/TestHarnessUtil.java    |  1 -
 13 files changed, 35 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e18cdd04/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
 
b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
index 3c63156..035727a 100644
--- 
a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
+++ 
b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
@@ -126,7 +126,7 @@ public class SessionWindowing {
                }
 
                @Override
-               public TriggerResult onEventTime(long time, TriggerContext ctx) 
throws Exception {
+               public TriggerResult onEventTime(long time, GlobalWindow 
window, TriggerContext ctx) throws Exception {
                        OperatorState<Long> lastSeenState = 
ctx.getKeyValueState("last-seen", 1L);
                        Long lastSeen = lastSeenState.value();
 
@@ -137,8 +137,7 @@ public class SessionWindowing {
                }
 
                @Override
-               public TriggerResult onProcessingTime(long time,
-                               TriggerContext ctx) throws Exception {
+               public TriggerResult onProcessingTime(long time, GlobalWindow 
window, TriggerContext ctx) throws Exception {
                        return TriggerResult.CONTINUE;
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e18cdd04/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
index 4d5b9d7..99a4962 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
@@ -79,12 +79,12 @@ public class GlobalWindows extends WindowAssigner<Object, 
GlobalWindow> {
                }
 
                @Override
-               public TriggerResult onEventTime(long time, TriggerContext ctx) 
{
+               public TriggerResult onEventTime(long time, GlobalWindow 
window, TriggerContext ctx) {
                        return TriggerResult.CONTINUE;
                }
 
                @Override
-               public TriggerResult onProcessingTime(long time, TriggerContext 
ctx) {
+               public TriggerResult onProcessingTime(long time, GlobalWindow 
window, TriggerContext ctx) {
                        return TriggerResult.CONTINUE;
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e18cdd04/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
index ea26309..4b6af8f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
@@ -57,14 +57,13 @@ public class ContinuousEventTimeTrigger<W extends Window> 
implements Trigger<Obj
        }
 
        @Override
-       public TriggerResult onEventTime(long time, TriggerContext ctx) {
+       public TriggerResult onEventTime(long time, W window, TriggerContext 
ctx) {
                ctx.registerEventTimeTimer(time + interval);
                return TriggerResult.FIRE;
        }
 
        @Override
-       public TriggerResult onProcessingTime(long time,
-                       TriggerContext ctx) throws Exception {
+       public TriggerResult onProcessingTime(long time, W window, 
TriggerContext ctx) throws Exception {
                return TriggerResult.CONTINUE;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e18cdd04/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
index be56738..66f9bda 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
@@ -63,13 +63,12 @@ public class ContinuousProcessingTimeTrigger<W extends 
Window> implements Trigge
        }
 
        @Override
-       public TriggerResult onEventTime(long time,
-                       TriggerContext ctx) throws Exception {
+       public TriggerResult onEventTime(long time, W window, TriggerContext 
ctx) throws Exception {
                return TriggerResult.CONTINUE;
        }
 
        @Override
-       public TriggerResult onProcessingTime(long time, TriggerContext ctx) 
throws Exception {
+       public TriggerResult onProcessingTime(long time, W window, 
TriggerContext ctx) throws Exception {
 
                OperatorState<Long> fireState = 
ctx.getKeyValueState("fire-timestamp", 0L);
                long nextFireTimestamp = fireState.value();

http://git-wip-us.apache.org/repos/asf/flink/blob/e18cdd04/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
index 8512989..efb62d7 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
@@ -49,13 +49,12 @@ public class CountTrigger<W extends Window> implements 
Trigger<Object, W> {
        }
 
        @Override
-       public TriggerResult onEventTime(long time, TriggerContext ctx) {
+       public TriggerResult onEventTime(long time, W window, TriggerContext 
ctx) {
                return TriggerResult.CONTINUE;
        }
 
        @Override
-       public TriggerResult onProcessingTime(long time,
-                       TriggerContext ctx) throws Exception {
+       public TriggerResult onProcessingTime(long time, W window, 
TriggerContext ctx) throws Exception {
                return TriggerResult.CONTINUE;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e18cdd04/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
index 1c6523d..d791d28 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
@@ -59,13 +59,12 @@ public class DeltaTrigger<T extends Serializable, W extends 
Window> implements T
        }
 
        @Override
-       public TriggerResult onEventTime(long time, TriggerContext ctx) {
+       public TriggerResult onEventTime(long time, W window, TriggerContext 
ctx) {
                return TriggerResult.CONTINUE;
        }
 
        @Override
-       public TriggerResult onProcessingTime(long time,
-                       TriggerContext ctx) throws Exception {
+       public TriggerResult onProcessingTime(long time, W window, 
TriggerContext ctx) throws Exception {
                return TriggerResult.CONTINUE;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e18cdd04/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
index 4b6613c..831e360 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
@@ -37,13 +37,12 @@ public class EventTimeTrigger implements Trigger<Object, 
TimeWindow> {
        }
 
        @Override
-       public TriggerResult onEventTime(long time, TriggerContext ctx) {
+       public TriggerResult onEventTime(long time, TimeWindow window, 
TriggerContext ctx) {
                return TriggerResult.FIRE_AND_PURGE;
        }
 
        @Override
-       public TriggerResult onProcessingTime(long time,
-                       TriggerContext ctx) throws Exception {
+       public TriggerResult onProcessingTime(long time, TimeWindow window, 
TriggerContext ctx) throws Exception {
                return TriggerResult.CONTINUE;
        }
 
@@ -53,10 +52,13 @@ public class EventTimeTrigger implements Trigger<Object, 
TimeWindow> {
        }
 
        /**
-        * Creates trigger that fires once the watermark passes the end of the 
window.
+        * Creates an event-time trigger that fires once the watermark passes 
the end of the window.
+        *
+        * <p>
+        * Once the trigger fires all elements are discarded. Elements that 
arrive late immediately
+        * trigger window evaluation with just this one element.
         */
        public static EventTimeTrigger create() {
                return new EventTimeTrigger();
        }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e18cdd04/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
index 6278ba6..b460c8a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
@@ -35,13 +35,12 @@ public class ProcessingTimeTrigger implements 
Trigger<Object, TimeWindow> {
        }
 
        @Override
-       public TriggerResult onEventTime(long time,
-                       TriggerContext ctx) throws Exception {
+       public TriggerResult onEventTime(long time, TimeWindow window, 
TriggerContext ctx) throws Exception {
                return TriggerResult.CONTINUE;
        }
 
        @Override
-       public TriggerResult onProcessingTime(long time, TriggerContext ctx) {
+       public TriggerResult onProcessingTime(long time, TimeWindow window, 
TriggerContext ctx) {
                return TriggerResult.FIRE_AND_PURGE;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e18cdd04/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
index eaca336..cc20296 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
@@ -53,8 +53,8 @@ public class PurgingTrigger<T, W extends Window> implements 
Trigger<T, W> {
        }
 
        @Override
-       public TriggerResult onEventTime(long time, TriggerContext ctx) throws 
Exception {
-               TriggerResult triggerResult = nestedTrigger.onEventTime(time, 
ctx);
+       public TriggerResult onEventTime(long time, W window, TriggerContext 
ctx) throws Exception {
+               TriggerResult triggerResult = nestedTrigger.onEventTime(time, 
window, ctx);
                switch (triggerResult) {
                        case FIRE:
                                return TriggerResult.FIRE_AND_PURGE;
@@ -66,8 +66,8 @@ public class PurgingTrigger<T, W extends Window> implements 
Trigger<T, W> {
        }
 
        @Override
-       public TriggerResult onProcessingTime(long time, TriggerContext ctx) 
throws Exception {
-               TriggerResult triggerResult = 
nestedTrigger.onProcessingTime(time, ctx);
+       public TriggerResult onProcessingTime(long time, W window, 
TriggerContext ctx) throws Exception {
+               TriggerResult triggerResult = 
nestedTrigger.onProcessingTime(time, window, ctx);
                switch (triggerResult) {
                        case FIRE:
                                return TriggerResult.FIRE_AND_PURGE;

http://git-wip-us.apache.org/repos/asf/flink/blob/e18cdd04/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
index ef8110b..15ccb33 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
@@ -60,7 +60,7 @@ public interface Trigger<T, W extends Window> extends 
Serializable {
         * @param time The timestamp at which the timer fired.
         * @param ctx A context object that can be used to register timer 
callbacks.
         */
-       TriggerResult onProcessingTime(long time, TriggerContext ctx) throws 
Exception;
+       TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) 
throws Exception;
 
        /**
         * Called when an event-time timer that was set using the trigger 
context fires.
@@ -68,7 +68,7 @@ public interface Trigger<T, W extends Window> extends 
Serializable {
         * @param time The timestamp at which the timer fired.
         * @param ctx A context object that can be used to register timer 
callbacks.
         */
-       TriggerResult onEventTime(long time, TriggerContext ctx) throws 
Exception;
+       TriggerResult onEventTime(long time, W window, TriggerContext ctx) 
throws Exception;
 
 
        /**
@@ -91,19 +91,19 @@ public interface Trigger<T, W extends Window> extends 
Serializable {
 
                /**
                 * Register a system time callback. When the current system 
time passes the specified
-                * time {@link #onProcessingTime(long, TriggerContext)} is 
called with the time specified here.
+                * time {@link #onProcessingTime(long, Window, TriggerContext)} 
is called with the time specified here.
                 *
-                * @param time The time at which to invoke {@link 
#onProcessingTime(long, TriggerContext)}
+                * @param time The time at which to invoke {@link 
#onProcessingTime(long, Window, TriggerContext)}
                 */
                void registerProcessingTimeTimer(long time);
 
                /**
                 * Register an event-time callback. When the current watermark 
passes the specified
-                * time {@link #onEventTime(long, TriggerContext)} is called 
with the time specified here.
+                * time {@link #onEventTime(long, Window, TriggerContext)} is 
called with the time specified here.
                 *
                 * @see org.apache.flink.streaming.api.watermark.Watermark
                 *
-                * @param time The watermark at which to invoke {@link 
#onEventTime(long, TriggerContext)}
+                * @param time The watermark at which to invoke {@link 
#onEventTime(long, Window, TriggerContext)}
                 */
                void registerEventTimeTimer(long time);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e18cdd04/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index 03e8c4c..2209d5e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -437,7 +437,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
 
                public Trigger.TriggerResult onProcessingTime(long time) throws 
Exception {
                        if (time == processingTimeTimer) {
-                               return trigger.onProcessingTime(time, this);
+                               return trigger.onProcessingTime(time, window, 
this);
                        } else {
                                return Trigger.TriggerResult.CONTINUE;
                        }
@@ -445,7 +445,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
 
                public Trigger.TriggerResult onEventTime(long time) throws 
Exception {
                        if (time == watermarkTimer) {
-                               return trigger.onEventTime(time, this);
+                               return trigger.onEventTime(time, window, this);
                        } else {
                                return Trigger.TriggerResult.CONTINUE;
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e18cdd04/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 30ce477..e8e001d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -510,7 +510,7 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 
                public Trigger.TriggerResult onProcessingTime(long time) throws 
Exception {
                        if (time == processingTimeTimer) {
-                               return trigger.onProcessingTime(time, this);
+                               return trigger.onProcessingTime(time, window, 
this);
                        } else {
                                return Trigger.TriggerResult.CONTINUE;
                        }
@@ -518,7 +518,7 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 
                public Trigger.TriggerResult onEventTime(long time) throws 
Exception {
                        if (time == watermarkTimer) {
-                               return trigger.onEventTime(time, this);
+                               return trigger.onEventTime(time, window, this);
                        } else {
                                return Trigger.TriggerResult.CONTINUE;
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e18cdd04/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
index 0c5cd8f..889ae37 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
@@ -21,7 +21,6 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.junit.Assert;
 
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;

Reply via email to