[FLINK-3200] Fix Triggers by introducing clear() method to clean up state/triggers


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/456d0aba
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/456d0aba
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/456d0aba

Branch: refs/heads/master
Commit: 456d0abaf7722ab16d91c5e2b52d80c076513921
Parents: e4d05f7
Author: Aljoscha Krettek <[email protected]>
Authored: Tue Feb 2 18:03:06 2016 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Wed Feb 3 20:28:32 2016 +0100

----------------------------------------------------------------------
 .../examples/windowing/SessionWindowing.java    |  19 +++-
 .../api/windowing/assigners/GlobalWindows.java  |   3 +
 .../triggers/ContinuousEventTimeTrigger.java    |   3 +
 .../ContinuousProcessingTimeTrigger.java        |   3 +
 .../api/windowing/triggers/CountTrigger.java    |   5 +
 .../api/windowing/triggers/DeltaTrigger.java    |   5 +
 .../windowing/triggers/EventTimeTrigger.java    |   5 +
 .../triggers/ProcessingTimeTrigger.java         |   5 +
 .../api/windowing/triggers/PurgingTrigger.java  |   5 +
 .../api/windowing/triggers/Trigger.java         |  17 ++++
 .../windowing/NonKeyedWindowOperator.java       |  25 +++++
 .../operators/windowing/WindowOperator.java     | 100 ++++++++++++-------
 12 files changed, 157 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/456d0aba/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
index 69f61bc..dafe86f 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
@@ -45,7 +45,7 @@ public class SessionWindowing {
 
                final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-               env.setParallelism(2);
+               env.setParallelism(1);
 
                final List<Tuple3<String, Long, Integer>> input = new 
ArrayList<>();
 
@@ -103,7 +103,7 @@ public class SessionWindowing {
 
                private final Long sessionTimeout;
 
-               private final ValueStateDescriptor<Long> stateDesc = new 
ValueStateDescriptor<>("last-seen", 1L,
+               private final ValueStateDescriptor<Long> stateDesc = new 
ValueStateDescriptor<>("last-seen", -1L,
                        BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new 
ExecutionConfig()));
 
 
@@ -120,12 +120,15 @@ public class SessionWindowing {
 
                        Long timeSinceLastEvent = timestamp - lastSeen;
 
+                       ctx.deleteEventTimeTimer(lastSeen + sessionTimeout);
+
                        // Update the last seen event time
                        lastSeenState.update(timestamp);
 
                        ctx.registerEventTimeTimer(timestamp + sessionTimeout);
 
-                       if (timeSinceLastEvent > sessionTimeout) {
+                       if (lastSeen != -1 && timeSinceLastEvent > 
sessionTimeout) {
+                               System.out.println("FIRING ON ELEMENT: " + 
element + " ts: " + timestamp + " last " + lastSeen);
                                return TriggerResult.FIRE_AND_PURGE;
                        } else {
                                return TriggerResult.CONTINUE;
@@ -138,6 +141,7 @@ public class SessionWindowing {
                        Long lastSeen = lastSeenState.value();
 
                        if (time - lastSeen >= sessionTimeout) {
+                               System.out.println("CTX: " + ctx + " Firing 
Time " + time + " last seen " + lastSeen);
                                return TriggerResult.FIRE_AND_PURGE;
                        }
                        return TriggerResult.CONTINUE;
@@ -147,6 +151,15 @@ public class SessionWindowing {
                public TriggerResult onProcessingTime(long time, GlobalWindow 
window, TriggerContext ctx) throws Exception {
                        return TriggerResult.CONTINUE;
                }
+
+               @Override
+               public void clear(GlobalWindow window, TriggerContext ctx) 
throws Exception {
+                       ValueState<Long> lastSeenState = 
ctx.getPartitionedState(stateDesc);
+                       if (lastSeenState.value() != -1) {
+                               ctx.deleteEventTimeTimer(lastSeenState.value() 
+ sessionTimeout);
+                       }
+                       lastSeenState.clear();
+               }
        }
 
        // 
*************************************************************************

http://git-wip-us.apache.org/repos/asf/flink/blob/456d0aba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
index 99a4962..d3eb2ac 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
@@ -87,6 +87,9 @@ public class GlobalWindows extends WindowAssigner<Object, 
GlobalWindow> {
                public TriggerResult onProcessingTime(long time, GlobalWindow 
window, TriggerContext ctx) {
                        return TriggerResult.CONTINUE;
                }
+
+               @Override
+               public void clear(GlobalWindow window, TriggerContext ctx) 
throws Exception {}
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/456d0aba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
index 17818af..21e35db 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
@@ -74,6 +74,9 @@ public class ContinuousEventTimeTrigger<W extends Window> 
implements Trigger<Obj
        }
 
        @Override
+       public void clear(W window, TriggerContext ctx) throws Exception {}
+
+       @Override
        public String toString() {
                return "ContinuousProcessingTimeTrigger(" + interval + ")";
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/456d0aba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
index 20a2274..10c975f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
@@ -90,6 +90,9 @@ public class ContinuousProcessingTimeTrigger<W extends 
Window> implements Trigge
                return TriggerResult.CONTINUE;
        }
 
+       @Override
+       public void clear(W window, TriggerContext ctx) throws Exception {}
+
        @VisibleForTesting
        public long getInterval() {
                return interval;

http://git-wip-us.apache.org/repos/asf/flink/blob/456d0aba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
index e8742d5..3fcfb46 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
@@ -66,6 +66,11 @@ public class CountTrigger<W extends Window> implements 
Trigger<Object, W> {
        }
 
        @Override
+       public void clear(W window, TriggerContext ctx) throws Exception {
+               ctx.getPartitionedState(stateDesc).clear();
+       }
+
+       @Override
        public String toString() {
                return "CountTrigger(" +  maxCount + ")";
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/456d0aba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
index 60ada88..3135961 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
@@ -72,6 +72,11 @@ public class DeltaTrigger<T, W extends Window> implements 
Trigger<T, W> {
        }
 
        @Override
+       public void clear(W window, TriggerContext ctx) throws Exception {
+               ctx.getPartitionedState(stateDesc).clear();
+       }
+
+       @Override
        public String toString() {
                return "DeltaTrigger(" +  deltaFunction + ", " + threshold + 
")";
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/456d0aba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
index 831e360..bbd0a01 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
@@ -47,6 +47,11 @@ public class EventTimeTrigger implements Trigger<Object, 
TimeWindow> {
        }
 
        @Override
+       public void clear(TimeWindow window, TriggerContext ctx) throws 
Exception {
+               ctx.deleteEventTimeTimer(window.maxTimestamp());
+       }
+
+       @Override
        public String toString() {
                return "EventTimeTrigger()";
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/456d0aba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
index b460c8a..85d6749 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
@@ -45,6 +45,11 @@ public class ProcessingTimeTrigger implements 
Trigger<Object, TimeWindow> {
        }
 
        @Override
+       public void clear(TimeWindow window, TriggerContext ctx) throws 
Exception {
+               ctx.deleteProcessingTimeTimer(window.maxTimestamp());
+       }
+
+       @Override
        public String toString() {
                return "ProcessingTimeTrigger()";
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/456d0aba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
index cc20296..626906c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
@@ -79,6 +79,11 @@ public class PurgingTrigger<T, W extends Window> implements 
Trigger<T, W> {
        }
 
        @Override
+       public void clear(W window, TriggerContext ctx) throws Exception {
+               nestedTrigger.clear(window, ctx);
+       }
+
+       @Override
        public String toString() {
                return "PurgingTrigger(" + nestedTrigger.toString() + ")";
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/456d0aba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
index 8ea50b3..5c71355 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
@@ -73,6 +73,13 @@ public interface Trigger<T, W extends Window> extends 
Serializable {
         */
        TriggerResult onEventTime(long time, W window, TriggerContext ctx) 
throws Exception;
 
+       /**
+        * Clears any state that the trigger might still hold for the given window. This is called
+        * when a window is purged. Timers set using {@link TriggerContext#registerEventTimeTimer(long)}
+        * and {@link TriggerContext#registerProcessingTimeTimer(long)} should be deleted here as
+        * well as state acquired using {@link TriggerContext#getPartitionedState(StateDescriptor)}.
+        */
+       void clear(W window, TriggerContext ctx) throws Exception;
 
        /**
         * Result type for trigger methods. This determines what happens with 
the window.
@@ -151,6 +158,16 @@ public interface Trigger<T, W extends Window> extends 
Serializable {
                void registerEventTimeTimer(long time);
 
                /**
+                * Deletes the processing-time timer for the given time.
+                */
+               void deleteProcessingTimeTimer(long time);
+
+               /**
+                * Deletes the event-time timer for the given time.
+                */
+               void deleteEventTimeTimer(long time);
+
+               /**
                 * Retrieves an {@link State} object that can be used to 
interact with
                 * fault-tolerant state that is scoped to the window and key of 
the current
                 * trigger invocation.

http://git-wip-us.apache.org/repos/asf/flink/blob/456d0aba/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index d9fa9f0..1b712d9 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -284,6 +284,10 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
                if (triggerResult.isFire()) {
                        emitWindow(context);
                }
+
+               if (triggerResult.isPurge()) {
+                       context.clear();
+               }
        }
 
        @Override
@@ -516,6 +520,23 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
                        triggers.add(this);
                }
 
+               @Override
+               public void deleteProcessingTimeTimer(long time) {
+                       Set<Context> triggers = processingTimeTimers.get(time);
+                       if (triggers != null) {
+                               triggers.remove(this);
+                       }
+               }
+
+               @Override
+               public void deleteEventTimeTimer(long time) {
+                       Set<Context> triggers = watermarkTimers.get(time);
+                       if (triggers != null) {
+                               triggers.remove(this);
+                       }
+
+               }
+
                public Trigger.TriggerResult onElement(StreamRecord<IN> 
element) throws Exception {
                        Trigger.TriggerResult onElementResult = 
trigger.onElement(element.getValue(), element.getTimestamp(), window, this);
                        if (watermarkTimer > 0 && watermarkTimer <= 
currentWatermark) {
@@ -553,6 +574,10 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
                                return Trigger.TriggerResult.CONTINUE;
                        }
                }
+
+               public void clear() throws Exception {
+                       trigger.clear(window, this);
+               }
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/456d0aba/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 5109dae..d562925 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -1,20 +1,20 @@
 /**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -85,8 +85,8 @@ import static java.util.Objects.requireNonNull;
  * @param <W> The type of {@code Window} that the {@code WindowAssigner} 
assigns.
  */
 public class WindowOperator<K, IN, ACC, OUT, W extends Window>
-               extends AbstractUdfStreamOperator<OUT, WindowFunction<ACC, OUT, 
K, W>>
-               implements OneInputStreamOperator<IN, OUT>, Triggerable, 
InputTypeConfigurable {
+       extends AbstractUdfStreamOperator<OUT, WindowFunction<ACC, OUT, K, W>>
+       implements OneInputStreamOperator<IN, OUT>, Triggerable, 
InputTypeConfigurable {
 
        private static final long serialVersionUID = 1L;
 
@@ -164,12 +164,12 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
         * Creates a new {@code WindowOperator} based on the given policies and 
user functions.
         */
        public WindowOperator(WindowAssigner<? super IN, W> windowAssigner,
-                       TypeSerializer<W> windowSerializer,
-                       KeySelector<IN, K> keySelector,
-                       TypeSerializer<K> keySerializer,
-                       StateDescriptor<? extends MergingState<IN, ACC>> 
windowStateDescriptor,
-                       WindowFunction<ACC, OUT, K, W> windowFunction,
-                       Trigger<? super IN, ? super W> trigger) {
+               TypeSerializer<W> windowSerializer,
+               KeySelector<IN, K> keySelector,
+               TypeSerializer<K> keySerializer,
+               StateDescriptor<? extends MergingState<IN, ACC>> 
windowStateDescriptor,
+               WindowFunction<ACC, OUT, K, W> windowFunction,
+               Trigger<? super IN, ? super W> trigger) {
 
                super(windowFunction);
 
@@ -258,8 +258,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                if (triggerResult.isFire()) {
                        
timestampedCollector.setTimestamp(window.maxTimestamp());
 
-                       setKeyContext(key);
-
                        MergingState<IN, ACC> windowState = 
getPartitionedState(window, windowSerializer,
                                windowStateDescriptor);
 
@@ -269,12 +267,13 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
 
                        if (triggerResult.isPurge()) {
                                windowState.clear();
+                               context.clear();
                        }
                } else if (triggerResult.isPurge()) {
-                       setKeyContext(key);
                        MergingState<IN, ACC> windowState = 
getPartitionedState(window, windowSerializer,
                                windowStateDescriptor);
                        windowState.clear();
+                       context.clear();
                }
        }
 
@@ -293,7 +292,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
 
                                context.key = timer.key;
                                context.window = timer.window;
-                               Trigger.TriggerResult triggerResult = 
context.onEventTime(mark.getTimestamp());
+                               setKeyContext(timer.key);
+                               Trigger.TriggerResult triggerResult = 
context.onEventTime(timer.timestamp);
                                processTriggerResult(triggerResult, 
context.key, context.window);
                        } else {
                                fire = false;
@@ -319,7 +319,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
 
                                context.key = timer.key;
                                context.window = timer.window;
-                               Trigger.TriggerResult triggerResult = 
context.onProcessingTime(time);
+                               setKeyContext(timer.key);
+                               Trigger.TriggerResult triggerResult = 
context.onProcessingTime(timer.timestamp);
                                processTriggerResult(triggerResult, 
context.key, context.window);
                        } else {
                                fire = false;
@@ -410,6 +411,23 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                        }
                }
 
+               @Override
+               public void deleteProcessingTimeTimer(long time) {
+                       Timer<K, W> timer = new Timer<>(time, key, window);
+                       if (processingTimeTimers.remove(timer)) {
+                               processingTimeTimersQueue.remove(timer);
+                       }
+               }
+
+               @Override
+               public void deleteEventTimeTimer(long time) {
+                       Timer<K, W> timer = new Timer<>(time, key, window);
+                       if (watermarkTimers.remove(timer)) {
+                               watermarkTimersQueue.remove(timer);
+                       }
+
+               }
+
                public Trigger.TriggerResult onElement(StreamRecord<IN> 
element) throws Exception {
                        return trigger.onElement(element.getValue(), 
element.getTimestamp(), window, this);
                }
@@ -421,6 +439,18 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                public Trigger.TriggerResult onEventTime(long time) throws 
Exception {
                        return trigger.onEventTime(time, window, this);
                }
+
+               public void clear() throws Exception {
+                       trigger.clear(window, this);
+               }
+
+               @Override
+               public String toString() {
+                       return "Context{" +
+                               "key=" + key +
+                               ", window=" + window +
+                               '}';
+               }
        }
 
        /**
@@ -454,8 +484,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                        Timer<?, ?> timer = (Timer<?, ?>) o;
 
                        return timestamp == timer.timestamp
-                                       && key.equals(timer.key)
-                                       && window.equals(timer.window);
+                               && key.equals(timer.key)
+                               && window.equals(timer.window);
 
                }
 
@@ -470,10 +500,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                @Override
                public String toString() {
                        return "Timer{" +
-                                       "timestamp=" + timestamp +
-                                       ", key=" + key +
-                                       ", window=" + window +
-                                       '}';
+                               "timestamp=" + timestamp +
+                               ", key=" + key +
+                               ", window=" + window +
+                               '}';
                }
        }
 

Reply via email to