Repository: flink
Updated Branches:
  refs/heads/master 8df0bbacb -> fd324ea72


[FLINK-3371] [api-breaking] Move TriggerResult and TriggerContext to dedicated 
classes

This closes #1603


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/50bd65a5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/50bd65a5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/50bd65a5

Branch: refs/heads/master
Commit: 50bd65a574776817a03dd32fd438cb2327447109
Parents: 8df0bba
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Feb 7 21:46:16 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 10 22:15:31 2016 +0100

----------------------------------------------------------------------
 .../examples/windowing/SessionWindowing.java    |   3 +-
 .../api/windowing/assigners/GlobalWindows.java  |  10 +-
 .../triggers/ContinuousEventTimeTrigger.java    |   7 +-
 .../ContinuousProcessingTimeTrigger.java        |   2 +-
 .../api/windowing/triggers/CountTrigger.java    |   2 +-
 .../api/windowing/triggers/DeltaTrigger.java    |   7 +-
 .../windowing/triggers/EventTimeTrigger.java    |   5 +-
 .../triggers/ProcessingTimeTrigger.java         |   5 +-
 .../api/windowing/triggers/PurgingTrigger.java  |   4 +-
 .../api/windowing/triggers/Trigger.java         | 102 +++++--------------
 .../api/windowing/triggers/TriggerResult.java   |  96 +++++++++++++++++
 .../windowing/EvictingWindowOperator.java       |   5 +-
 .../windowing/NonKeyedWindowOperator.java       |  34 ++++---
 .../operators/windowing/WindowOperator.java     |  18 ++--
 14 files changed, 179 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
index bd82800..e2df160 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.streaming.api.functions.source.EventTimeSourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 
 import java.util.ArrayList;
@@ -95,7 +96,7 @@ public class SessionWindowing {
                env.execute();
        }
 
-       private static class SessionTrigger implements Trigger<Tuple3<String, 
Long, Integer>, GlobalWindow> {
+       private static class SessionTrigger extends Trigger<Tuple3<String, 
Long, Integer>, GlobalWindow> {
 
                private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
index d3eb2ac..a4d92cf 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 
 import java.util.Collection;
@@ -67,15 +68,12 @@ public class GlobalWindows extends WindowAssigner<Object, 
GlobalWindow> {
        /**
         * A trigger that never fires, as default Trigger for GlobalWindows.
         */
-       private static class NeverTrigger implements Trigger<Object, 
GlobalWindow> {
+       private static class NeverTrigger extends Trigger<Object, GlobalWindow> 
{
                private static final long serialVersionUID = 1L;
 
                @Override
-               public TriggerResult onElement(Object element,
-                               long timestamp,
-                               GlobalWindow window,
-                               TriggerContext ctx) {
-                               return TriggerResult.CONTINUE;
+               public TriggerResult onElement(Object element, long timestamp, 
GlobalWindow window, TriggerContext ctx) {
+                       return TriggerResult.CONTINUE;
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
index 02a935c..09f3959 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.api.windowing.triggers;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -33,7 +34,7 @@ import 
org.apache.flink.streaming.api.windowing.windows.Window;
  *
  * @param <W> The type of {@link Window Windows} on which this trigger can 
operate.
  */
-public class ContinuousEventTimeTrigger<W extends Window> implements 
Trigger<Object, W> {
+public class ContinuousEventTimeTrigger<W extends Window> extends 
Trigger<Object, W> {
        private static final long serialVersionUID = 1L;
 
        private final long interval;

http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
index 25d9508..ca7ecb6 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
@@ -32,7 +32,7 @@ import 
org.apache.flink.streaming.api.windowing.windows.Window;
  *
  * @param <W> The type of {@link Window Windows} on which this trigger can 
operate.
  */
-public class ContinuousProcessingTimeTrigger<W extends Window> implements 
Trigger<Object, W> {
+public class ContinuousProcessingTimeTrigger<W extends Window> extends 
Trigger<Object, W> {
        private static final long serialVersionUID = 1L;
 
        private final long interval;

http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
index 725cbf6..5113991 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
@@ -30,7 +30,7 @@ import java.io.IOException;
  *
  * @param <W> The type of {@link Window Windows} on which this trigger can 
operate.
  */
-public class CountTrigger<W extends Window> implements Trigger<Object, W> {
+public class CountTrigger<W extends Window> extends Trigger<Object, W> {
        private static final long serialVersionUID = 1L;
 
        private final long maxCount;

http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
index 55c719a..4a6cde3 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.api.windowing.triggers;
 
 import org.apache.flink.api.common.state.ValueState;
@@ -33,7 +34,7 @@ import 
org.apache.flink.streaming.api.windowing.windows.Window;
  *
  * @param <W> The type of {@link Window Windows} on which this trigger can 
operate.
  */
-public class DeltaTrigger<T, W extends Window> implements Trigger<T, W> {
+public class DeltaTrigger<T, W extends Window> extends Trigger<T, W> {
        private static final long serialVersionUID = 1L;
 
        private final DeltaFunction<T> deltaFunction;

http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
index bbd0a01..17d04c2 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.api.windowing.triggers;
 
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -25,7 +26,7 @@ import 
org.apache.flink.streaming.api.windowing.windows.TimeWindow;
  *
  * @see org.apache.flink.streaming.api.watermark.Watermark
  */
-public class EventTimeTrigger implements Trigger<Object, TimeWindow> {
+public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
        private static final long serialVersionUID = 1L;
 
        private EventTimeTrigger() {}

http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
index 85d6749..ae0b0e5 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.api.windowing.triggers;
 
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -23,7 +24,7 @@ import 
org.apache.flink.streaming.api.windowing.windows.TimeWindow;
  * A {@link Trigger} that fires once the current system time passes the end of 
the window
  * to which a pane belongs.
  */
-public class ProcessingTimeTrigger implements Trigger<Object, TimeWindow> {
+public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
        private static final long serialVersionUID = 1L;
 
        private ProcessingTimeTrigger() {}

http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
index 626906c..0ec236b 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
@@ -25,12 +25,12 @@ import 
org.apache.flink.streaming.api.windowing.windows.Window;
  *
  * <p>
  * When the nested trigger fires, this will return a {@code FIRE_AND_PURGE}
- * {@link 
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerResult}
+ * {@link TriggerResult}
  *
  * @param <T> The type of elements on which this trigger can operate.
  * @param <W> The type of {@link Window Windows} on which this trigger can 
operate.
  */
-public class PurgingTrigger<T, W extends Window> implements Trigger<T, W> {
+public class PurgingTrigger<T, W extends Window> extends Trigger<T, W> {
        private static final long serialVersionUID = 1L;
 
        private Trigger<T, W> nestedTrigger;

http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
index fb61064..a200e5c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.api.windowing.triggers;
 
 import org.apache.flink.api.common.state.State;
@@ -39,12 +40,14 @@ import java.io.Serializable;
  * <p>
  * Triggers must not maintain state internally since they can be re-created or 
reused for
  * different keys. All necessary state should be persisted using the state 
abstraction
- * available on the {@link 
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext}.
+ * available on the {@link TriggerContext}.
  *
  * @param <T> The type of elements on which this {@code Trigger} works.
  * @param <W> The type of {@link Window Windows} on which this {@code Trigger} 
can operate.
  */
-public interface Trigger<T, W extends Window> extends Serializable {
+public abstract class Trigger<T, W extends Window> implements Serializable {
+       
+       private static final long serialVersionUID = -4104633972991191369L;
 
        /**
         * Called for every element that gets added to a pane. The result of 
this will determine
@@ -55,7 +58,7 @@ public interface Trigger<T, W extends Window> extends 
Serializable {
         * @param window The window to which this pane belongs.
         * @param ctx A context object that can be used to register timer 
callbacks.
         */
-       TriggerResult onElement(T element, long timestamp, W window, 
TriggerContext ctx) throws Exception;
+       public abstract TriggerResult onElement(T element, long timestamp, W 
window, TriggerContext ctx) throws Exception;
 
        /**
         * Called when a processing-time timer that was set using the trigger 
context fires.
@@ -63,7 +66,7 @@ public interface Trigger<T, W extends Window> extends 
Serializable {
         * @param time The timestamp at which the timer fired.
         * @param ctx A context object that can be used to register timer 
callbacks.
         */
-       TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) 
throws Exception;
+       public abstract TriggerResult onProcessingTime(long time, W window, 
TriggerContext ctx) throws Exception;
 
        /**
         * Called when an event-time timer that was set using the trigger 
context fires.
@@ -71,102 +74,53 @@ public interface Trigger<T, W extends Window> extends 
Serializable {
         * @param time The timestamp at which the timer fired.
         * @param ctx A context object that can be used to register timer 
callbacks.
         */
-       TriggerResult onEventTime(long time, W window, TriggerContext ctx) 
throws Exception;
+       public abstract TriggerResult onEventTime(long time, W window, 
TriggerContext ctx) throws Exception;
 
        /**
         * Clears any state that the trigger might still hold for the given 
window. This is called
         * when a window is purged. Timers set using {@link 
TriggerContext#registerEventTimeTimer(long)}
         * and {@link TriggerContext#registerProcessingTimeTimer(long)} should 
be deleted here as
         * well as state acquired using {@link 
TriggerContext#getPartitionedState(StateDescriptor)}.
+        * 
+        * <p>By default, this method does nothing.
         */
-       void clear(W window, TriggerContext ctx) throws Exception;
+       public void clear(W window, TriggerContext ctx) throws Exception {}
 
+       // 
------------------------------------------------------------------------
+       
        /**
-        * Result type for trigger methods. This determines what happens with 
the window.
-        *
-        * <p>
-        * On {@code FIRE} the pane is evaluated and results are emitted. The 
contents of the window
-        * are kept. {@code FIRE_AND_PURGE} acts like {@code FIRE} but the 
contents of the pane
-        * are purged. On {@code CONTINUE} nothing happens, processing 
continues. On {@code PURGE}
-        * the contents of the window are discarded and no result is emitted 
for the window.
-        */
-       enum TriggerResult {
-               CONTINUE(false, false), FIRE_AND_PURGE(true, true), FIRE(true, 
false), PURGE(false, true);
-
-               private final boolean fire;
-               private final boolean purge;
-
-               TriggerResult(boolean fire, boolean purge) {
-                       this.purge = purge;
-                       this.fire = fire;
-               }
-
-               public boolean isFire() {
-                       return fire;
-               }
-
-               public boolean isPurge() {
-                       return purge;
-               }
-
-               /**
-                * Merges two {@code TriggerResults}. This specifies what 
should happen if we have
-                * two results from a Trigger, for example as a result from
-                * {@link #onElement(Object, long, Window, TriggerContext)} and
-                * {@link #onEventTime(long, Window, TriggerContext)}.
-                *
-                * <p>
-                * For example, if one result says {@code CONTINUE} while the 
other says {@code FIRE}
-                * then {@code FIRE} is the combined result;
-                */
-               public static TriggerResult merge(TriggerResult a, 
TriggerResult b) {
-                       if (a.purge || b.purge) {
-                               if (a.fire || b.fire) {
-                                       return FIRE_AND_PURGE;
-                               } else {
-                                       return PURGE;
-                               }
-                       } else if (a.fire || b.fire) {
-                               return FIRE;
-                       } else {
-                               return CONTINUE;
-                       }
-               }
-       }
-
-       /**
-        * A context object that is given to {@code Trigger} methods to allow 
them to register timer
+        * A context object that is given to {@link Trigger} methods to allow 
them to register timer
         * callbacks and deal with state.
         */
-       interface TriggerContext {
-
+       public interface TriggerContext {
+       
                /**
                 * Register a system time callback. When the current system 
time passes the specified
-                * time {@link #onProcessingTime(long, Window, TriggerContext)} 
is called with the time specified here.
+                * time {@link Trigger#onProcessingTime(long, Window, 
TriggerContext)} is called with the time specified here.
                 *
-                * @param time The time at which to invoke {@link 
#onProcessingTime(long, Window, TriggerContext)}
+                * @param time The time at which to invoke {@link 
Trigger#onProcessingTime(long, Window, TriggerContext)}
                 */
                void registerProcessingTimeTimer(long time);
-
+       
                /**
                 * Register an event-time callback. When the current watermark 
passes the specified
-                * time {@link #onEventTime(long, Window, TriggerContext)} is 
called with the time specified here.
+                * time {@link Trigger#onEventTime(long, Window, 
TriggerContext)} is called with the time specified here.
                 *
-                * @param time The watermark at which to invoke {@link 
#onEventTime(long, Window, TriggerContext)}
+                * @param time The watermark at which to invoke {@link 
Trigger#onEventTime(long, Window, TriggerContext)}
                 * @see org.apache.flink.streaming.api.watermark.Watermark
                 */
                void registerEventTimeTimer(long time);
-
+       
                /**
                 * Delete the processing time trigger for the given time.
                 */
                void deleteProcessingTimeTimer(long time);
-
+       
                /**
                 * Delete the event-time trigger for the given time.
                 */
                void deleteEventTimeTimer(long time);
-
+       
                /**
                 * Retrieves an {@link State} object that can be used to 
interact with
                 * fault-tolerant state that is scoped to the window and key of 
the current
@@ -180,7 +134,7 @@ public interface Trigger<T, W extends Window> extends 
Serializable {
                 *                                       function (function is 
not part of a KeyedStream).
                 */
                <S extends State> S getPartitionedState(StateDescriptor<S, ?> 
stateDescriptor);
-
+       
                /**
                 * Retrieves a {@link ValueState} object that can be used to 
interact with
                 * fault-tolerant state that is scoped to the window and key of 
the current
@@ -199,8 +153,8 @@ public interface Trigger<T, W extends Window> extends 
Serializable {
                 */
                @Deprecated
                <S extends Serializable> ValueState<S> getKeyValueState(String 
name, Class<S> stateType, S defaultState);
-
-
+       
+       
                /**
                 * Retrieves a {@link ValueState} object that can be used to 
interact with
                 * fault-tolerant state that is scoped to the window and key of 
the current

http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/TriggerResult.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/TriggerResult.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/TriggerResult.java
new file mode 100644
index 0000000..2841542
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/TriggerResult.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * Result type for trigger methods. This determines what happens with the 
window,
+ * for example whether the window function should be called, or the window
+ * should be discarded.
+ */
+public enum TriggerResult {
+
+       /**
+        * No action is taken on the window.
+        */
+       CONTINUE(false, false),
+
+       /**
+        * {@code FIRE_AND_PURGE} evaluates the window function and emits the 
window
+        * result. Afterwards the window is purged and all of its elements are discarded.
+        */
+       FIRE_AND_PURGE(true, true),
+
+       /**
+        * On {@code FIRE}, the window is evaluated and results are emitted.
+        * The window is not purged, though, all elements are retained.
+        */
+       FIRE(true, false),
+
+       /**
+        * All elements in the window are cleared and the window is discarded,
+        * without evaluating the window function or emitting any elements.
+        */
+       PURGE(false, true);
+
+       // 
------------------------------------------------------------------------
+       
+       private final boolean fire;
+       private final boolean purge;
+
+       TriggerResult(boolean fire, boolean purge) {
+               this.purge = purge;
+               this.fire = fire;
+       }
+
+       public boolean isFire() {
+               return fire;
+       }
+
+       public boolean isPurge() {
+               return purge;
+       }
+
+       // 
------------------------------------------------------------------------
+       
+       /**
+        * Merges two {@code TriggerResults}. This specifies what should happen 
if we have
+        * two results from a Trigger, for example as a result from
+        * {@link Trigger#onElement(Object, long, Window, 
Trigger.TriggerContext)} and
+        * {@link Trigger#onEventTime(long, Window, Trigger.TriggerContext)}.
+        *
+        * <p>
+        * For example, if one result says {@code CONTINUE} while the other 
says {@code FIRE}
+        * then {@code FIRE} is the combined result.
+        */
+       public static TriggerResult merge(TriggerResult a, TriggerResult b) {
+               if (a.purge || b.purge) {
+                       if (a.fire || b.fire) {
+                               return FIRE_AND_PURGE;
+                       } else {
+                               return PURGE;
+                       }
+               } else if (a.fire || b.fire) {
+                       return FIRE;
+               } else {
+                       return CONTINUE;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 41ec91a..a960ac4 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -30,6 +30,7 @@ import 
org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.evictors.Evictor;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
@@ -87,7 +88,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window> extends Window
 
                        context.key = key;
                        context.window = window;
-                       Trigger.TriggerResult triggerResult = 
context.onElement(element);
+                       TriggerResult triggerResult = 
context.onElement(element);
 
                        processTriggerResult(triggerResult, key, window);
                }
@@ -95,7 +96,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window> extends Window
 
        @Override
        @SuppressWarnings("unchecked,rawtypes")
-       protected void processTriggerResult(Trigger.TriggerResult 
triggerResult, K key, W window) throws Exception {
+       protected void processTriggerResult(TriggerResult triggerResult, K key, 
W window) throws Exception {
                if (!triggerResult.isFire() && !triggerResult.isPurge()) {
                        // do nothing
                        return;

http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index d7dbaf5..93761e6 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -38,6 +38,8 @@ import 
org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import 
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
@@ -248,7 +250,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
                                windows.put(window, context);
                        }
                        context.windowBuffer.storeElement(element);
-                       Trigger.TriggerResult triggerResult = 
context.onElement(element);
+                       TriggerResult triggerResult = 
context.onElement(element);
                        processTriggerResult(triggerResult, window);
                }
        }
@@ -264,7 +266,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
                }
        }
 
-       private void processTriggerResult(Trigger.TriggerResult triggerResult, 
W window) throws Exception {
+       private void processTriggerResult(TriggerResult triggerResult, W 
window) throws Exception {
                if (!triggerResult.isFire() && !triggerResult.isPurge()) {
                        // do nothing
                        return;
@@ -311,7 +313,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
                                // We have to check here whether the entry in the set still reflects the
                                // currently set timer in the Context.
                                if (ctx.watermarkTimer <= mark.getTimestamp()) {
-                                       Trigger.TriggerResult triggerResult = 
ctx.onEventTime(ctx.watermarkTimer);
+                                       TriggerResult triggerResult = 
ctx.onEventTime(ctx.watermarkTimer);
                                        processTriggerResult(triggerResult, 
ctx.window);
                                }
                        }
@@ -343,7 +345,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
                                // performance reasons. We have to check here whether the entry in the set still
                                // reflects the currently set timer in the Context.
                                if (ctx.processingTimeTimer <= time) {
-                                       Trigger.TriggerResult triggerResult = 
ctx.onProcessingTime(ctx.processingTimeTimer);
+                                       TriggerResult triggerResult = 
ctx.onProcessingTime(ctx.processingTimeTimer);
                                        processTriggerResult(triggerResult, 
ctx.window);
                                }
                        }
@@ -360,7 +362,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
         * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. These panes all
         * have their own instance of the {@code Trigger}.
         */
-       protected class Context implements Trigger.TriggerContext {
+       protected class Context implements TriggerContext {
                protected W window;
 
                protected WindowBuffer<IN> windowBuffer;
@@ -538,41 +540,41 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
 
                }
 
-               public Trigger.TriggerResult onElement(StreamRecord<IN> 
element) throws Exception {
-                       Trigger.TriggerResult onElementResult = 
trigger.onElement(element.getValue(), element.getTimestamp(), window, this);
+               public TriggerResult onElement(StreamRecord<IN> element) throws 
Exception {
+                       TriggerResult onElementResult = 
trigger.onElement(element.getValue(), element.getTimestamp(), window, this);
                        if (watermarkTimer > 0 && watermarkTimer <= 
currentWatermark) {
                                // fire now and don't wait for the next watermark update
-                               Trigger.TriggerResult onEventTimeResult = 
onEventTime(watermarkTimer);
-                               return 
Trigger.TriggerResult.merge(onElementResult, onEventTimeResult);
+                               TriggerResult onEventTimeResult = 
onEventTime(watermarkTimer);
+                               return TriggerResult.merge(onElementResult, 
onEventTimeResult);
                        } else {
                                return onElementResult;
                        }
                }
 
-               public Trigger.TriggerResult onProcessingTime(long time) throws 
Exception {
+               public TriggerResult onProcessingTime(long time) throws 
Exception {
                        if (time == processingTimeTimer) {
                                processingTimeTimer = -1;
                                return trigger.onProcessingTime(time, window, 
this);
                        } else {
-                               return Trigger.TriggerResult.CONTINUE;
+                               return TriggerResult.CONTINUE;
                        }
                }
 
-               public Trigger.TriggerResult onEventTime(long time) throws 
Exception {
+               public TriggerResult onEventTime(long time) throws Exception {
                        if (time == watermarkTimer) {
                                watermarkTimer = -1;
-                               Trigger.TriggerResult firstTriggerResult = 
trigger.onEventTime(time, window, this);
+                               TriggerResult firstTriggerResult = 
trigger.onEventTime(time, window, this);
 
                                if (watermarkTimer > 0 && watermarkTimer <= 
currentWatermark) {
                                        // fire now and don't wait for the next watermark update
-                                       Trigger.TriggerResult 
secondTriggerResult = onEventTime(watermarkTimer);
-                                       return 
Trigger.TriggerResult.merge(firstTriggerResult, secondTriggerResult);
+                                       TriggerResult secondTriggerResult = 
onEventTime(watermarkTimer);
+                                       return 
TriggerResult.merge(firstTriggerResult, secondTriggerResult);
                                } else {
                                        return firstTriggerResult;
                                }
 
                        } else {
-                               return Trigger.TriggerResult.CONTINUE;
+                               return TriggerResult.CONTINUE;
                        }
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index eccaeee..5fc89e8 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -40,6 +40,8 @@ import 
org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import 
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
@@ -243,13 +245,13 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
 
                        context.key = key;
                        context.window = window;
-                       Trigger.TriggerResult triggerResult = 
context.onElement(element);
+                       TriggerResult triggerResult = 
context.onElement(element);
 
                        processTriggerResult(triggerResult, key, window);
                }
        }
 
-       protected void processTriggerResult(Trigger.TriggerResult 
triggerResult, K key, W window) throws Exception {
+       protected void processTriggerResult(TriggerResult triggerResult, K key, 
W window) throws Exception {
                if (!triggerResult.isFire() && !triggerResult.isPurge()) {
                        // do nothing
                        return;
@@ -293,7 +295,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                                context.key = timer.key;
                                context.window = timer.window;
                                setKeyContext(timer.key);
-                               Trigger.TriggerResult triggerResult = 
context.onEventTime(timer.timestamp);
+                               TriggerResult triggerResult = 
context.onEventTime(timer.timestamp);
                                processTriggerResult(triggerResult, 
context.key, context.window);
                        } else {
                                fire = false;
@@ -320,7 +322,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                                context.key = timer.key;
                                context.window = timer.window;
                                setKeyContext(timer.key);
-                               Trigger.TriggerResult triggerResult = 
context.onProcessingTime(timer.timestamp);
+                               TriggerResult triggerResult = 
context.onProcessingTime(timer.timestamp);
                                processTriggerResult(triggerResult, 
context.key, context.window);
                        } else {
                                fire = false;
@@ -338,7 +340,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
         * by setting the {@code key} and {@code window} fields. No internal state must be kept in
         * the {@code Context}
         */
-       protected class Context implements Trigger.TriggerContext {
+       protected class Context implements TriggerContext {
                protected K key;
                protected W window;
 
@@ -427,15 +429,15 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
 
                }
 
-               public Trigger.TriggerResult onElement(StreamRecord<IN> 
element) throws Exception {
+               public TriggerResult onElement(StreamRecord<IN> element) throws 
Exception {
                        return trigger.onElement(element.getValue(), 
element.getTimestamp(), window, this);
                }
 
-               public Trigger.TriggerResult onProcessingTime(long time) throws 
Exception {
+               public TriggerResult onProcessingTime(long time) throws 
Exception {
                        return trigger.onProcessingTime(time, window, this);
                }
 
-               public Trigger.TriggerResult onEventTime(long time) throws 
Exception {
+               public TriggerResult onEventTime(long time) throws Exception {
                        return trigger.onEventTime(time, window, this);
                }
 

Reply via email to