Repository: flink Updated Branches: refs/heads/master a3627f201 -> 0a501e9f7
[FLINK-6001] Fix ContinuousEventTimeTrigger firing without state Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0a501e9f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0a501e9f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0a501e9f Branch: refs/heads/master Commit: 0a501e9f7f56baba2905002b74746998458db007 Parents: a3627f2 Author: Aljoscha Krettek <[email protected]> Authored: Mon Mar 13 15:04:01 2017 +0100 Committer: Aljoscha Krettek <[email protected]> Committed: Wed Mar 15 14:41:36 2017 +0100 ---------------------------------------------------------------------- .../triggers/ContinuousEventTimeTrigger.java | 11 +- .../ContinuousEventTimeTriggerTest.java | 207 +++++++++++++++++++ .../operators/windowing/TriggerTestHarness.java | 35 +++- 3 files changed, 238 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0a501e9f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java index f3b3e4f..3e31c09 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java @@ -77,10 +77,13 @@ public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object return TriggerResult.FIRE; } - ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc); - if (fireTimestamp.get().equals(time)) { - fireTimestamp.clear(); - fireTimestamp.add(time + interval); + ReducingState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc); + + Long fireTimestamp = fireTimestampState.get(); + + if (fireTimestamp != null && fireTimestamp.equals(time)) { + fireTimestampState.clear(); + fireTimestampState.add(time + interval); ctx.registerEventTimeTimer(time + interval); return TriggerResult.FIRE; } http://git-wip-us.apache.org/repos/asf/flink/blob/0a501e9f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousEventTimeTriggerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousEventTimeTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousEventTimeTriggerTest.java new file mode 100644 index 0000000..0f65a88 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousEventTimeTriggerTest.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.operators.windowing; + +import com.google.common.collect.Lists; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger; +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.junit.Test; + +import java.util.Collection; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Tests for {@link ContinuousEventTimeTrigger}. + */ +public class ContinuousEventTimeTriggerTest { + + /** + * Verify that the trigger doesn't fail with an NPE if we insert a timer firing when there is + * no trigger state. + */ + @Test + public void testTriggerHandlesAllOnTimerCalls() throws Exception { + TriggerTestHarness<Object, TimeWindow> testHarness = + new TriggerTestHarness<>(ContinuousEventTimeTrigger.<TimeWindow>of(Time.milliseconds(5)), new TimeWindow.Serializer()); + + assertEquals(0, testHarness.numStateEntries()); + assertEquals(0, testHarness.numProcessingTimeTimers()); + assertEquals(0, testHarness.numEventTimeTimers()); + + // this will make the elements we now process fall into late windows, i.e. no trigger state + // will be created + testHarness.advanceWatermark(10); + + // late fires immediately + assertEquals(TriggerResult.FIRE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2))); + + // simulate a GC timer firing + testHarness.invokeOnEventTime(20, new TimeWindow(0, 2)); + } + + + /** + * Verify that state <TimeWindow>of separate windows does not leak into other windows. + */ + @Test + public void testWindowSeparationAndFiring() throws Exception { + TriggerTestHarness<Object, TimeWindow> testHarness = + new TriggerTestHarness<>(ContinuousEventTimeTrigger.<TimeWindow>of(Time.hours(1)), new TimeWindow.Serializer()); + + // inject several elements + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2))); + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2))); + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2))); + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4))); + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4))); + + assertEquals(2, testHarness.numStateEntries()); + assertEquals(0, testHarness.numProcessingTimeTimers()); + assertEquals(4, testHarness.numEventTimeTimers()); + assertEquals(2, testHarness.numEventTimeTimers(new TimeWindow(0, 2))); + assertEquals(2, testHarness.numEventTimeTimers(new TimeWindow(2, 4))); + + Collection<Tuple2<TimeWindow, TriggerResult>> triggerResults = testHarness.advanceWatermark(2); + boolean sawFiring = false; + for (Tuple2<TimeWindow, TriggerResult> r : triggerResults) { + if (r.f0.equals(new TimeWindow(0, 2))) { + sawFiring = true; + assertTrue(r.f1.equals(TriggerResult.FIRE)); + } + } + assertTrue(sawFiring); + + assertEquals(2, testHarness.numStateEntries()); + assertEquals(0, testHarness.numProcessingTimeTimers()); + assertEquals(3, testHarness.numEventTimeTimers()); + assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(0, 2))); + assertEquals(2, testHarness.numEventTimeTimers(new TimeWindow(2, 4))); + + triggerResults = testHarness.advanceWatermark(4); + sawFiring = false; + for (Tuple2<TimeWindow, TriggerResult> r : triggerResults) { + if (r.f0.equals(new TimeWindow(2, 4))) { + sawFiring = true; + assertTrue(r.f1.equals(TriggerResult.FIRE)); + } + } + assertTrue(sawFiring); + + assertEquals(2, testHarness.numStateEntries()); + assertEquals(0, testHarness.numProcessingTimeTimers()); + assertEquals(2, testHarness.numEventTimeTimers()); + } + + /** + * Verify that late elements trigger immediately and also that we don't set a timer + * for those. + */ + @Test + public void testLateElementTriggersImmediately() throws Exception { + TriggerTestHarness<Object, TimeWindow> testHarness = + new TriggerTestHarness<>(ContinuousEventTimeTrigger.<TimeWindow>of(Time.hours(1)), new TimeWindow.Serializer()); + + testHarness.advanceWatermark(2); + + assertEquals(TriggerResult.FIRE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2))); + + assertEquals(0, testHarness.numStateEntries()); + assertEquals(0, testHarness.numProcessingTimeTimers()); + assertEquals(0, testHarness.numEventTimeTimers()); + } + + + /** + * Verify that clear() does not leak across windows. + */ + @Test + public void testClear() throws Exception { + TriggerTestHarness<Object, TimeWindow> testHarness = + new TriggerTestHarness<>(ContinuousEventTimeTrigger.<TimeWindow>of(Time.hours(1)), new TimeWindow.Serializer()); + + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2))); + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4))); + + assertEquals(2, testHarness.numStateEntries()); + assertEquals(0, testHarness.numProcessingTimeTimers()); + assertEquals(4, testHarness.numEventTimeTimers()); + assertEquals(2, testHarness.numEventTimeTimers(new TimeWindow(0, 2))); + assertEquals(2, testHarness.numEventTimeTimers(new TimeWindow(2, 4))); + + testHarness.clearTriggerState(new TimeWindow(2, 4)); + + assertEquals(1, testHarness.numStateEntries()); + assertEquals(0, testHarness.numProcessingTimeTimers()); + assertEquals(3, testHarness.numEventTimeTimers()); + assertEquals(2, testHarness.numEventTimeTimers(new TimeWindow(0, 2))); + assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(2, 4))); + + testHarness.clearTriggerState(new TimeWindow(0, 2)); + + assertEquals(0, testHarness.numStateEntries()); + assertEquals(0, testHarness.numProcessingTimeTimers()); + assertEquals(2, testHarness.numEventTimeTimers()); // doesn't clean up timers + } + + @Test + public void testMergingWindows() throws Exception { + TriggerTestHarness<Object, TimeWindow> testHarness = + new TriggerTestHarness<>(ContinuousEventTimeTrigger.<TimeWindow>of(Time.hours(1)), new TimeWindow.Serializer()); + + assertTrue(ContinuousEventTimeTrigger.<TimeWindow>of(Time.hours(1)).canMerge()); + + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2))); + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4))); + + assertEquals(2, testHarness.numStateEntries()); + assertEquals(0, testHarness.numProcessingTimeTimers()); + assertEquals(4, testHarness.numEventTimeTimers()); + assertEquals(2, testHarness.numEventTimeTimers(new TimeWindow(0, 2))); + assertEquals(2, testHarness.numEventTimeTimers(new TimeWindow(2, 4))); + + testHarness.mergeWindows(new TimeWindow(0, 4), Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4))); + + assertEquals(1, testHarness.numStateEntries()); + assertEquals(0, testHarness.numProcessingTimeTimers()); + assertEquals(5, testHarness.numEventTimeTimers()); // on merging, timers are not cleaned up + assertEquals(2, testHarness.numEventTimeTimers(new TimeWindow(0, 2))); + assertEquals(2, testHarness.numEventTimeTimers(new TimeWindow(2, 4))); + assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(0, 4))); + + Collection<Tuple2<TimeWindow, TriggerResult>> triggerResults = testHarness.advanceWatermark(4); + boolean sawFiring = false; + for (Tuple2<TimeWindow, TriggerResult> r : triggerResults) { + if (r.f0.equals(new TimeWindow(0, 4))) { + sawFiring = true; + assertTrue(r.f1.equals(TriggerResult.FIRE)); + } + } + + assertTrue(sawFiring); + + assertEquals(1, testHarness.numStateEntries()); + assertEquals(0, testHarness.numProcessingTimeTimers()); + assertEquals(1, testHarness.numEventTimeTimers()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0a501e9f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java index b9923f2..4267444 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java @@ -82,7 +82,7 @@ public class TriggerTestHarness<T, W extends Window> { new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID())); this.stateBackend = stateBackend; - this.stateBackend.setCurrentKey(0); + this.stateBackend.setCurrentKey(KEY); this.internalTimerService = new TestInternalTimerService<>(new KeyContext() { @Override @@ -215,22 +215,35 @@ public class TriggerTestHarness<T, W extends Window> { Collection<Tuple2<W, TriggerResult>> result = new ArrayList<>(); for (TestInternalTimerService.Timer<Integer, W> timer : firedTimers) { - TestTriggerContext<Integer, W> triggerContext = new TestTriggerContext<>( - KEY, - timer.getNamespace(), - internalTimerService, - stateBackend, - windowSerializer); - - TriggerResult triggerResult = - trigger.onEventTime(timer.getTimestamp(), timer.getNamespace(), triggerContext); - + TriggerResult triggerResult = invokeOnEventTime(timer); result.add(new Tuple2<>(timer.getNamespace(), triggerResult)); } return result; } + private TriggerResult invokeOnEventTime(TestInternalTimerService.Timer<Integer, W> timer) throws Exception { + TestTriggerContext<Integer, W> triggerContext = new TestTriggerContext<>( + KEY, + timer.getNamespace(), + internalTimerService, + stateBackend, + windowSerializer); + + return trigger.onEventTime(timer.getTimestamp(), timer.getNamespace(), triggerContext); + } + + /** + * Manually invoke {@link Trigger#onEventTime(long, Window, Trigger.TriggerContext)} with + * the given parameters. + */ + public TriggerResult invokeOnEventTime(long timestamp, W window) throws Exception { + TestInternalTimerService.Timer<Integer, W> timer = + new TestInternalTimerService.Timer<>(timestamp, KEY, window); + + return invokeOnEventTime(timer); + } + /** * Calls {@link Trigger#onMerge(Window, Trigger.OnMergeContext)} with the given * parameters. This also calls {@link Trigger#clear(Window, Trigger.TriggerContext)} on the
