Repository: flink
Updated Branches:
  refs/heads/release-1.2 24306adab -> 697ede00d


[FLINK-6001] Fix ContinuousEventTimeTrigger firing without state


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/697ede00
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/697ede00
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/697ede00

Branch: refs/heads/release-1.2
Commit: 697ede00d044af4eb18a981fd2734fba3e112a30
Parents: 24306ad
Author: Aljoscha Krettek <[email protected]>
Authored: Mon Mar 13 15:04:01 2017 +0100
Committer: Aljoscha Krettek <[email protected]>
Committed: Wed Mar 15 14:43:34 2017 +0100

----------------------------------------------------------------------
 .../triggers/ContinuousEventTimeTrigger.java    |  11 +-
 .../ContinuousEventTimeTriggerTest.java         | 207 +++++++++++++++++++
 .../operators/windowing/TriggerTestHarness.java |  35 +++-
 3 files changed, 238 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/697ede00/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
index f3b3e4f..3e31c09 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
@@ -77,10 +77,13 @@ public class ContinuousEventTimeTrigger<W extends Window> 
extends Trigger<Object
                        return TriggerResult.FIRE;
                }
 
-               ReducingState<Long> fireTimestamp = 
ctx.getPartitionedState(stateDesc);
-               if (fireTimestamp.get().equals(time)) {
-                       fireTimestamp.clear();
-                       fireTimestamp.add(time + interval);
+               ReducingState<Long> fireTimestampState = 
ctx.getPartitionedState(stateDesc);
+
+               Long fireTimestamp = fireTimestampState.get();
+
+               if (fireTimestamp != null && fireTimestamp.equals(time)) {
+                       fireTimestampState.clear();
+                       fireTimestampState.add(time + interval);
                        ctx.registerEventTimeTimer(time + interval);
                        return TriggerResult.FIRE;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/697ede00/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousEventTimeTriggerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousEventTimeTriggerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousEventTimeTriggerTest.java
new file mode 100644
index 0000000..0f65a88
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousEventTimeTriggerTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import 
org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Test;
+
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link ContinuousEventTimeTrigger}.
+ */
+public class ContinuousEventTimeTriggerTest {
+
+       /**
+        * Verify that the trigger doesn't fail with an NPE if we insert a 
timer firing when there is
+        * no trigger state.
+        */
+       @Test
+       public void testTriggerHandlesAllOnTimerCalls() throws Exception {
+               TriggerTestHarness<Object, TimeWindow> testHarness =
+                               new 
TriggerTestHarness<>(ContinuousEventTimeTrigger.<TimeWindow>of(Time.milliseconds(5)),
 new TimeWindow.Serializer());
+
+               assertEquals(0, testHarness.numStateEntries());
+               assertEquals(0, testHarness.numProcessingTimeTimers());
+               assertEquals(0, testHarness.numEventTimeTimers());
+
+               // this will make the elements we now process fall into late 
windows, i.e. no trigger state
+               // will be created
+               testHarness.advanceWatermark(10);
+
+               // late fires immediately
+               assertEquals(TriggerResult.FIRE, testHarness.processElement(new 
StreamRecord<Object>(1), new TimeWindow(0, 2)));
+
+               // simulate a GC timer firing
+               testHarness.invokeOnEventTime(20, new TimeWindow(0, 2));
+       }
+
+
+       /**
+        * Verify that state of separate windows does not leak into 
other windows.
+        */
+       @Test
+       public void testWindowSeparationAndFiring() throws Exception {
+               TriggerTestHarness<Object, TimeWindow> testHarness =
+                               new 
TriggerTestHarness<>(ContinuousEventTimeTrigger.<TimeWindow>of(Time.hours(1)), 
new TimeWindow.Serializer());
+
+               // inject several elements
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+               assertEquals(2, testHarness.numStateEntries());
+               assertEquals(0, testHarness.numProcessingTimeTimers());
+               assertEquals(4, testHarness.numEventTimeTimers());
+               assertEquals(2, testHarness.numEventTimeTimers(new 
TimeWindow(0, 2)));
+               assertEquals(2, testHarness.numEventTimeTimers(new 
TimeWindow(2, 4)));
+
+               Collection<Tuple2<TimeWindow, TriggerResult>> triggerResults = 
testHarness.advanceWatermark(2);
+               boolean sawFiring = false;
+               for (Tuple2<TimeWindow, TriggerResult> r : triggerResults) {
+                       if (r.f0.equals(new TimeWindow(0, 2))) {
+                               sawFiring = true;
+                               assertTrue(r.f1.equals(TriggerResult.FIRE));
+                       }
+               }
+               assertTrue(sawFiring);
+
+               assertEquals(2, testHarness.numStateEntries());
+               assertEquals(0, testHarness.numProcessingTimeTimers());
+               assertEquals(3, testHarness.numEventTimeTimers());
+               assertEquals(1, testHarness.numEventTimeTimers(new 
TimeWindow(0, 2)));
+               assertEquals(2, testHarness.numEventTimeTimers(new 
TimeWindow(2, 4)));
+
+               triggerResults = testHarness.advanceWatermark(4);
+               sawFiring = false;
+               for (Tuple2<TimeWindow, TriggerResult> r : triggerResults) {
+                       if (r.f0.equals(new TimeWindow(2, 4))) {
+                               sawFiring = true;
+                               assertTrue(r.f1.equals(TriggerResult.FIRE));
+                       }
+               }
+               assertTrue(sawFiring);
+
+               assertEquals(2, testHarness.numStateEntries());
+               assertEquals(0, testHarness.numProcessingTimeTimers());
+               assertEquals(2, testHarness.numEventTimeTimers());
+       }
+
+       /**
+        * Verify that late elements trigger immediately and also that we don't 
set a timer
+        * for those.
+        */
+       @Test
+       public void testLateElementTriggersImmediately() throws Exception {
+               TriggerTestHarness<Object, TimeWindow> testHarness =
+                               new 
TriggerTestHarness<>(ContinuousEventTimeTrigger.<TimeWindow>of(Time.hours(1)), 
new TimeWindow.Serializer());
+
+               testHarness.advanceWatermark(2);
+
+               assertEquals(TriggerResult.FIRE, testHarness.processElement(new 
StreamRecord<Object>(1), new TimeWindow(0, 2)));
+
+               assertEquals(0, testHarness.numStateEntries());
+               assertEquals(0, testHarness.numProcessingTimeTimers());
+               assertEquals(0, testHarness.numEventTimeTimers());
+       }
+
+
+       /**
+        * Verify that clear() does not leak across windows.
+        */
+       @Test
+       public void testClear() throws Exception {
+               TriggerTestHarness<Object, TimeWindow> testHarness =
+                               new 
TriggerTestHarness<>(ContinuousEventTimeTrigger.<TimeWindow>of(Time.hours(1)), 
new TimeWindow.Serializer());
+
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+               assertEquals(2, testHarness.numStateEntries());
+               assertEquals(0, testHarness.numProcessingTimeTimers());
+               assertEquals(4, testHarness.numEventTimeTimers());
+               assertEquals(2, testHarness.numEventTimeTimers(new 
TimeWindow(0, 2)));
+               assertEquals(2, testHarness.numEventTimeTimers(new 
TimeWindow(2, 4)));
+
+               testHarness.clearTriggerState(new TimeWindow(2, 4));
+
+               assertEquals(1, testHarness.numStateEntries());
+               assertEquals(0, testHarness.numProcessingTimeTimers());
+               assertEquals(3, testHarness.numEventTimeTimers());
+               assertEquals(2, testHarness.numEventTimeTimers(new 
TimeWindow(0, 2)));
+               assertEquals(1, testHarness.numEventTimeTimers(new 
TimeWindow(2, 4)));
+
+               testHarness.clearTriggerState(new TimeWindow(0, 2));
+
+               assertEquals(0, testHarness.numStateEntries());
+               assertEquals(0, testHarness.numProcessingTimeTimers());
+               assertEquals(2, testHarness.numEventTimeTimers()); // doesn't 
clean up timers
+       }
+
+       @Test
+       public void testMergingWindows() throws Exception {
+               TriggerTestHarness<Object, TimeWindow> testHarness =
+                               new 
TriggerTestHarness<>(ContinuousEventTimeTrigger.<TimeWindow>of(Time.hours(1)), 
new TimeWindow.Serializer());
+
+               
assertTrue(ContinuousEventTimeTrigger.<TimeWindow>of(Time.hours(1)).canMerge());
+
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+               assertEquals(2, testHarness.numStateEntries());
+               assertEquals(0, testHarness.numProcessingTimeTimers());
+               assertEquals(4, testHarness.numEventTimeTimers());
+               assertEquals(2, testHarness.numEventTimeTimers(new 
TimeWindow(0, 2)));
+               assertEquals(2, testHarness.numEventTimeTimers(new 
TimeWindow(2, 4)));
+
+               testHarness.mergeWindows(new TimeWindow(0, 4), 
Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4)));
+
+               assertEquals(1, testHarness.numStateEntries());
+               assertEquals(0, testHarness.numProcessingTimeTimers());
+               assertEquals(5, testHarness.numEventTimeTimers()); // on 
merging, timers are not cleaned up
+               assertEquals(2, testHarness.numEventTimeTimers(new 
TimeWindow(0, 2)));
+               assertEquals(2, testHarness.numEventTimeTimers(new 
TimeWindow(2, 4)));
+               assertEquals(1, testHarness.numEventTimeTimers(new 
TimeWindow(0, 4)));
+
+               Collection<Tuple2<TimeWindow, TriggerResult>> triggerResults = 
testHarness.advanceWatermark(4);
+               boolean sawFiring = false;
+               for (Tuple2<TimeWindow, TriggerResult> r : triggerResults) {
+                       if (r.f0.equals(new TimeWindow(0, 4))) {
+                               sawFiring = true;
+                               assertTrue(r.f1.equals(TriggerResult.FIRE));
+                       }
+               }
+
+               assertTrue(sawFiring);
+
+               assertEquals(1, testHarness.numStateEntries());
+               assertEquals(0, testHarness.numProcessingTimeTimers());
+               assertEquals(1, testHarness.numEventTimeTimers());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/697ede00/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
index 3fdc94f..a1240d3 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
@@ -81,7 +81,7 @@ public class TriggerTestHarness<T, W extends Window> {
                                new KvStateRegistry().createTaskRegistry(new 
JobID(), new JobVertexID()));
                this.stateBackend = stateBackend;
 
-               this.stateBackend.setCurrentKey(0);
+               this.stateBackend.setCurrentKey(KEY);
 
                this.internalTimerService = new TestInternalTimerService<>(new 
KeyContext() {
                        @Override
@@ -214,22 +214,35 @@ public class TriggerTestHarness<T, W extends Window> {
                Collection<Tuple2<W, TriggerResult>> result = new ArrayList<>();
 
                for (TestInternalTimerService.Timer<Integer, W> timer : 
firedTimers) {
-                       TestTriggerContext<Integer, W> triggerContext = new 
TestTriggerContext<>(
-                                       KEY,
-                                       timer.getNamespace(),
-                                       internalTimerService,
-                                       stateBackend,
-                                       windowSerializer);
-
-                       TriggerResult triggerResult =
-                                       
trigger.onEventTime(timer.getTimestamp(), timer.getNamespace(), triggerContext);
-
+                       TriggerResult triggerResult = invokeOnEventTime(timer);
                        result.add(new Tuple2<>(timer.getNamespace(), 
triggerResult));
                }
 
                return result;
        }
 
+       private TriggerResult 
invokeOnEventTime(TestInternalTimerService.Timer<Integer, W> timer) throws 
Exception {
+               TestTriggerContext<Integer, W> triggerContext = new 
TestTriggerContext<>(
+                               KEY,
+                               timer.getNamespace(),
+                               internalTimerService,
+                               stateBackend,
+                               windowSerializer);
+
+               return trigger.onEventTime(timer.getTimestamp(), 
timer.getNamespace(), triggerContext);
+       }
+
+       /**
+        * Manually invoke {@link Trigger#onEventTime(long, Window, 
Trigger.TriggerContext)} with
+        * the given parameters.
+        */
+       public TriggerResult invokeOnEventTime(long timestamp, W window) throws 
Exception {
+               TestInternalTimerService.Timer<Integer, W> timer =
+                               new TestInternalTimerService.Timer<>(timestamp, 
KEY, window);
+
+               return invokeOnEventTime(timer);
+       }
+
        /**
         * Calls {@link Trigger#onMerge(Window, Trigger.OnMergeContext)} with 
the given
         * parameters. This also calls {@link Trigger#clear(Window, 
Trigger.TriggerContext)} on the

Reply via email to