Repository: flink Updated Branches: refs/heads/master e71196972 -> f760b616a
Replace Trigger.onTime by Trigger.onProcessingTime/onEventTime This also renames WatermarkTrigger to EventTimeTrigger and ContinuousWatermarkTrigger to ContinuousEventTimeTrigger. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f760b616 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f760b616 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f760b616 Branch: refs/heads/master Commit: f760b616af0e1608cb4c190aeb264da72f624f4c Parents: 4442269 Author: Aljoscha Krettek <[email protected]> Authored: Sat Oct 17 13:35:24 2015 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Tue Oct 20 18:39:12 2015 +0200 ---------------------------------------------------------------------- docs/apis/streaming_guide.md | 14 +-- .../api/windowing/assigners/GlobalWindows.java | 7 +- .../windowing/assigners/SlidingTimeWindows.java | 4 +- .../assigners/TumblingTimeWindows.java | 4 +- .../triggers/ContinuousEventTimeTrigger.java | 90 ++++++++++++++++++++ .../ContinuousProcessingTimeTrigger.java | 8 +- .../triggers/ContinuousWatermarkTrigger.java | 84 ------------------ .../api/windowing/triggers/CountTrigger.java | 10 ++- .../api/windowing/triggers/DeltaTrigger.java | 10 ++- .../windowing/triggers/EventTimeTrigger.java | 62 ++++++++++++++ .../triggers/ProcessingTimeTrigger.java | 8 +- .../api/windowing/triggers/PurgingTrigger.java | 17 +++- .../api/windowing/triggers/Trigger.java | 25 ++++-- .../windowing/triggers/WatermarkTrigger.java | 56 ------------ .../windowing/NonKeyedWindowOperator.java | 6 +- .../operators/windowing/WindowOperator.java | 6 +- .../windowing/AllWindowTranslationTest.java | 8 +- .../windowing/EvictingWindowOperatorTest.java | 1 - .../windowing/NonKeyedWindowOperatorTest.java | 11 ++- .../operators/windowing/WindowOperatorTest.java | 10 +-- .../windowing/WindowTranslationTest.java | 8 +- .../examples/windowing/SessionWindowing.java | 10 ++- 22 files changed, 263 insertions(+), 196 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/docs/apis/streaming_guide.md ---------------------------------------------------------------------- diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md index 9257ae1..9fce0d7 100644 --- a/docs/apis/streaming_guide.md +++ b/docs/apis/streaming_guide.md @@ -1008,7 +1008,7 @@ dataStream.union(otherStream1, otherStream2, ...) {% highlight scala %} dataStream.join(otherStream) .where(0).equalTo(1) - .onTimeWindow(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS))) + .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS))) .apply { ... } {% endhighlight %} </td> @@ -2308,7 +2308,7 @@ windowedStream.trigger(ProcessingTimeTrigger.create()); The elements on the triggered window are henceforth discarded. </p> {% highlight java %} -windowedStream.trigger(WatermarkTrigger.create()); +windowedStream.trigger(EventTimeTrigger.create()); {% endhighlight %} </td> </tr> @@ -2334,7 +2334,7 @@ windowedStream.trigger(ContinuousProcessingTimeTrigger.of(Time.of(5, TimeUnit.SE The elements on the triggered window are retained. </p> {% highlight java %} -windowedStream.trigger(ContinuousWatermarkTrigger.of(Time.of(5, TimeUnit.SECONDS))); +windowedStream.trigger(ContinuousEventTimeTrigger.of(Time.of(5, TimeUnit.SECONDS))); {% endhighlight %} </td> </tr> @@ -2414,7 +2414,7 @@ windowedStream.trigger(ProcessingTimeTrigger.create); The elements on the triggered window are henceforth discarded. </p> {% highlight scala %} -windowedStream.trigger(WatermarkTrigger.create); +windowedStream.trigger(EventTimeTrigger.create); {% endhighlight %} </td> </tr> @@ -2440,7 +2440,7 @@ windowedStream.trigger(ContinuousProcessingTimeTrigger.of(Time.of(5, TimeUnit.SE The elements on the triggered window are retained. </p> {% highlight scala %} -windowedStream.trigger(ContinuousWatermarkTrigger.of(Time.of(5, TimeUnit.SECONDS))); +windowedStream.trigger(ContinuousEventTimeTrigger.of(Time.of(5, TimeUnit.SECONDS))); {% endhighlight %} </td> </tr> @@ -2653,7 +2653,7 @@ stream.timeWindow(Time.of(5, TimeUnit.SECONDS)) <td> {% highlight java %} stream.window(TumblingTimeWindows.of((Time.of(5, TimeUnit.SECONDS))) - .trigger(WatermarkTrigger.create()) + .trigger(EventTimeTrigger.create()) {% endhighlight %} </td> </tr> @@ -2667,7 +2667,7 @@ stream.timeWindow(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)) <td> {% highlight java %} stream.window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS))) - .trigger(WatermarkTrigger.create()) + .trigger(EventTimeTrigger.create()) {% endhighlight %} </td> </tr> http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java index 9b7c8f2..4d5b9d7 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java @@ -79,7 +79,12 @@ public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> { } @Override - public TriggerResult onTime(long time, TriggerContext ctx) { + public TriggerResult onEventTime(long time, TriggerContext ctx) { + return TriggerResult.CONTINUE; + } + + @Override + public TriggerResult onProcessingTime(long time, TriggerContext ctx) { return TriggerResult.CONTINUE; } } http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java index 7b1f1f4..5f7ab45 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java @@ -24,7 +24,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.AbstractTime; import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; import org.apache.flink.streaming.api.windowing.triggers.Trigger; -import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger; +import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import java.util.ArrayList; @@ -81,7 +81,7 @@ public class SlidingTimeWindows extends WindowAssigner<Object, TimeWindow> { if (env.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) { return ProcessingTimeTrigger.create(); } else { - return WatermarkTrigger.create(); + return EventTimeTrigger.create(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java index aa019e4..463b2c4 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java @@ -24,7 +24,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.AbstractTime; import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; import org.apache.flink.streaming.api.windowing.triggers.Trigger; -import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger; +import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import java.util.Collection; @@ -67,7 +67,7 @@ public class TumblingTimeWindows extends WindowAssigner<Object, TimeWindow> { if (env.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) { return ProcessingTimeTrigger.create(); } else { - return WatermarkTrigger.create(); + return EventTimeTrigger.create(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java new file mode 100644 index 0000000..ea26309 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.windowing.triggers; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.flink.api.common.state.OperatorState; +import org.apache.flink.streaming.api.windowing.time.AbstractTime; +import org.apache.flink.streaming.api.windowing.windows.Window; + +/** + * A {@link Trigger} that continuously fires based on a given time interval. This fires based + * on {@link org.apache.flink.streaming.api.watermark.Watermark Watermarks}. + * + * @see org.apache.flink.streaming.api.watermark.Watermark + * + * @param <W> The type of {@link Window Windows} on which this trigger can operate. + */ +public class ContinuousEventTimeTrigger<W extends Window> implements Trigger<Object, W> { + private static final long serialVersionUID = 1L; + + private final long interval; + + private ContinuousEventTimeTrigger(long interval) { + this.interval = interval; + } + + @Override + public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception { + + OperatorState<Boolean> first = ctx.getKeyValueState("first", true); + + if (first.value()) { + long start = timestamp - (timestamp % interval); + long nextFireTimestamp = start + interval; + + ctx.registerEventTimeTimer(nextFireTimestamp); + + first.update(false); + return TriggerResult.CONTINUE; + } + return TriggerResult.CONTINUE; + } + + @Override + public TriggerResult onEventTime(long time, TriggerContext ctx) { + ctx.registerEventTimeTimer(time + interval); + return TriggerResult.FIRE; + } + + @Override + public TriggerResult onProcessingTime(long time, + TriggerContext ctx) throws Exception { + return TriggerResult.CONTINUE; + } + + @Override + public String toString() { + return "ContinuousProcessingTimeTrigger(" + interval + ")"; + } + + @VisibleForTesting + public long getInterval() { + return interval; + } + + /** + * Creates a trigger that continuously fires based on the given interval. + * + * @param interval The time interval at which to fire. + * @param <W> The type of {@link Window Windows} on which this trigger can operate. + */ + public static <W extends Window> ContinuousEventTimeTrigger<W> of(AbstractTime interval) { + return new ContinuousEventTimeTrigger<>(interval.toMilliseconds()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java index 3ea60f4..be56738 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java @@ -63,7 +63,13 @@ public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigge } @Override - public TriggerResult onTime(long time, TriggerContext ctx) throws Exception { + public TriggerResult onEventTime(long time, + TriggerContext ctx) throws Exception { + return TriggerResult.CONTINUE; + } + + @Override + public TriggerResult onProcessingTime(long time, TriggerContext ctx) throws Exception { OperatorState<Long> fireState = ctx.getKeyValueState("fire-timestamp", 0L); long nextFireTimestamp = fireState.value(); http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java deleted file mode 100644 index 494ba3a..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.api.windowing.triggers; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.flink.api.common.state.OperatorState; -import org.apache.flink.streaming.api.windowing.time.AbstractTime; -import org.apache.flink.streaming.api.windowing.windows.Window; - -/** - * A {@link Trigger} that continuously fires based on a given time interval. This fires based - * on {@link org.apache.flink.streaming.api.watermark.Watermark Watermarks}. - * - * @see org.apache.flink.streaming.api.watermark.Watermark - * - * @param <W> The type of {@link Window Windows} on which this trigger can operate. - */ -public class ContinuousWatermarkTrigger<W extends Window> implements Trigger<Object, W> { - private static final long serialVersionUID = 1L; - - private final long interval; - - private ContinuousWatermarkTrigger(long interval) { - this.interval = interval; - } - - @Override - public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception { - - OperatorState<Boolean> first = ctx.getKeyValueState("first", true); - - if (first.value()) { - long start = timestamp - (timestamp % interval); - long nextFireTimestamp = start + interval; - - ctx.registerWatermarkTimer(nextFireTimestamp); - - first.update(false); - return TriggerResult.CONTINUE; - } - return TriggerResult.CONTINUE; - } - - @Override - public TriggerResult onTime(long time, TriggerContext ctx) { - ctx.registerWatermarkTimer(time + interval); - return TriggerResult.FIRE; - } - - @Override - public String toString() { - return "ContinuousProcessingTimeTrigger(" + interval + ")"; - } - - @VisibleForTesting - public long getInterval() { - return interval; - } - - /** - * Creates a trigger that continuously fires based on the given interval. - * - * @param interval The time interval at which to fire. - * @param <W> The type of {@link Window Windows} on which this trigger can operate. - */ - public static <W extends Window> ContinuousWatermarkTrigger<W> of(AbstractTime interval) { - return new ContinuousWatermarkTrigger<>(interval.toMilliseconds()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java index 57582f7..8512989 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java @@ -49,8 +49,14 @@ public class CountTrigger<W extends Window> implements Trigger<Object, W> { } @Override - public TriggerResult onTime(long time, TriggerContext ctx) { - return null; + public TriggerResult onEventTime(long time, TriggerContext ctx) { + return TriggerResult.CONTINUE; + } + + @Override + public TriggerResult onProcessingTime(long time, + TriggerContext ctx) throws Exception { + return TriggerResult.CONTINUE; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java index b1283f5..1c6523d 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java @@ -59,8 +59,14 @@ public class DeltaTrigger<T extends Serializable, W extends Window> implements T } @Override - public TriggerResult onTime(long time, TriggerContext ctx) { - return null; + public TriggerResult onEventTime(long time, TriggerContext ctx) { + return TriggerResult.CONTINUE; + } + + @Override + public TriggerResult onProcessingTime(long time, + TriggerContext ctx) throws Exception { + return TriggerResult.CONTINUE; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java new file mode 100644 index 0000000..4b6613c --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.windowing.triggers; + +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; + +/** + * A {@link Trigger} that fires once the watermark passes the end of the window + * to which a pane belongs. + * + * @see org.apache.flink.streaming.api.watermark.Watermark + */ +public class EventTimeTrigger implements Trigger<Object, TimeWindow> { + private static final long serialVersionUID = 1L; + + private EventTimeTrigger() {} + + @Override + public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception { + ctx.registerEventTimeTimer(window.maxTimestamp()); + return TriggerResult.CONTINUE; + } + + @Override + public TriggerResult onEventTime(long time, TriggerContext ctx) { + return TriggerResult.FIRE_AND_PURGE; + } + + @Override + public TriggerResult onProcessingTime(long time, + TriggerContext ctx) throws Exception { + return TriggerResult.CONTINUE; + } + + @Override + public String toString() { + return "EventTimeTrigger()"; + } + + /** + * Creates trigger that fires once the watermark passes the end of the window. + */ + public static EventTimeTrigger create() { + return new EventTimeTrigger(); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java index 70c57ef..6278ba6 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java @@ -35,7 +35,13 @@ public class ProcessingTimeTrigger implements Trigger<Object, TimeWindow> { } @Override - public TriggerResult onTime(long time, TriggerContext ctx) { + public TriggerResult onEventTime(long time, + TriggerContext ctx) throws Exception { + return TriggerResult.CONTINUE; + } + + @Override + public TriggerResult onProcessingTime(long time, TriggerContext ctx) { return TriggerResult.FIRE_AND_PURGE; } http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java index 76e36b1..eaca336 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java @@ -53,8 +53,21 @@ public class PurgingTrigger<T, W extends Window> implements Trigger<T, W> { } @Override - public TriggerResult onTime(long time, TriggerContext ctx) throws Exception { - TriggerResult triggerResult = nestedTrigger.onTime(time, ctx); + public TriggerResult onEventTime(long time, TriggerContext ctx) throws Exception { + TriggerResult triggerResult = nestedTrigger.onEventTime(time, ctx); + switch (triggerResult) { + case FIRE: + return TriggerResult.FIRE_AND_PURGE; + case FIRE_AND_PURGE: + return TriggerResult.FIRE_AND_PURGE; + default: + return TriggerResult.CONTINUE; + } + } + + @Override + public TriggerResult onProcessingTime(long time, TriggerContext ctx) throws Exception { + TriggerResult triggerResult = nestedTrigger.onProcessingTime(time, ctx); switch (triggerResult) { case FIRE: return TriggerResult.FIRE_AND_PURGE; http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java index 56b8687..ef8110b 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java @@ -55,12 +55,21 @@ public interface Trigger<T, W extends Window> extends Serializable { TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception; /** - * Called when a timer that was set using the trigger context fires. + * Called when a processing-time timer that was set using the trigger context fires. * * @param time The timestamp at which the timer fired. * @param ctx A context object that can be used to register timer callbacks. */ - TriggerResult onTime(long time, TriggerContext ctx) throws Exception; + TriggerResult onProcessingTime(long time, TriggerContext ctx) throws Exception; + + /** + * Called when an event-time timer that was set using the trigger context fires. + * + * @param time The timestamp at which the timer fired. + * @param ctx A context object that can be used to register timer callbacks. + */ + TriggerResult onEventTime(long time, TriggerContext ctx) throws Exception; + /** * Result type for trigger methods. This determines what happens which the window. @@ -82,21 +91,21 @@ public interface Trigger<T, W extends Window> extends Serializable { /** * Register a system time callback. When the current system time passes the specified - * time {@link #onTime(long, TriggerContext)} is called. + * time {@link #onProcessingTime(long, TriggerContext)} is called with the time specified here. * - * @param time The time at which to invoke {@link #onTime(long, TriggerContext)} + * @param time The time at which to invoke {@link #onProcessingTime(long, TriggerContext)} */ void registerProcessingTimeTimer(long time); /** - * Register a watermark callback. When the current watermark passes the specified - * time {@link #onTime(long, TriggerContext)} is called. + * Register an event-time callback. When the current watermark passes the specified + * time {@link #onEventTime(long, TriggerContext)} is called with the time specified here. * * @see org.apache.flink.streaming.api.watermark.Watermark * - * @param time The watermark at which to invoke {@link #onTime(long, TriggerContext)} + * @param time The watermark at which to invoke {@link #onEventTime(long, TriggerContext)} */ - void registerWatermarkTimer(long time); + void registerEventTimeTimer(long time); /** * Retrieves an {@link OperatorState} object that can be used to interact with http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java deleted file mode 100644 index d17066b..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.api.windowing.triggers; - -import org.apache.flink.streaming.api.windowing.windows.TimeWindow; - -/** - * A {@link Trigger} that fires once the watermark passes the end of the window - * to which a pane belongs. - * - * @see org.apache.flink.streaming.api.watermark.Watermark - */ -public class WatermarkTrigger implements Trigger<Object, TimeWindow> { - private static final long serialVersionUID = 1L; - - private WatermarkTrigger() {} - - @Override - public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception { - ctx.registerWatermarkTimer(window.maxTimestamp()); - return TriggerResult.CONTINUE; - } - - @Override - public TriggerResult onTime(long time, TriggerContext ctx) { - return TriggerResult.FIRE_AND_PURGE; - } - - @Override - public String toString() { - return "WatermarkTrigger()"; - } - - /** - * Creates trigger that fires once the watermark passes the end of the window. - */ - public static WatermarkTrigger create() { - return new WatermarkTrigger(); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java index 7ab33cf..5de6cd1 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java @@ -420,7 +420,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window> } @Override - public void registerWatermarkTimer(long time) { + public void registerEventTimeTimer(long time) { if (watermarkTimer == time) { // we already have set a trigger for that time return; @@ -436,7 +436,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window> public Trigger.TriggerResult onProcessingTime(long time) throws Exception { if (time == processingTimeTimer) { - return trigger.onTime(time, this); + return trigger.onProcessingTime(time, this); } else { return Trigger.TriggerResult.CONTINUE; } @@ -444,7 +444,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window> public Trigger.TriggerResult onEventTime(long time) throws Exception { if (time == watermarkTimer) { - return trigger.onTime(time, this); + return trigger.onEventTime(time, this); } else { return Trigger.TriggerResult.CONTINUE; } http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index 0b3274f..2491c57 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -489,7 +489,7 @@ public class WindowOperator<K, IN, OUT, W extends Window> } @Override - public void registerWatermarkTimer(long time) { + public void registerEventTimeTimer(long time) { if (watermarkTimer == time) { // we already have set a trigger for that time return; @@ -505,7 +505,7 @@ public class WindowOperator<K, IN, OUT, W extends Window> public Trigger.TriggerResult onProcessingTime(long time) throws Exception { if (time == processingTimeTimer) { - return trigger.onTime(time, this); + return trigger.onProcessingTime(time, this); } else { return Trigger.TriggerResult.CONTINUE; } @@ -513,7 +513,7 @@ public class WindowOperator<K, IN, OUT, W extends Window> public Trigger.TriggerResult onEventTime(long time) throws Exception { if (time == watermarkTimer) { - return trigger.onTime(time, this); + return trigger.onEventTime(time, this); } else { return Trigger.TriggerResult.CONTINUE; } http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java index 45ef29f..282c71f 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java @@ -31,7 +31,7 @@ import org.apache.flink.streaming.api.windowing.evictors.CountEvictor; import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; -import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger; +import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer; import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer; @@ -71,7 +71,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase Assert.assertTrue(operator1 instanceof NonKeyedWindowOperator); NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) operator1; Assert.assertFalse(winOperator1.isSetProcessingTime()); - Assert.assertTrue(winOperator1.getTrigger() instanceof WatermarkTrigger); + Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger); Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows); Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory); @@ -94,7 +94,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase Assert.assertTrue(operator2 instanceof NonKeyedWindowOperator); NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) operator2; Assert.assertFalse(winOperator2.isSetProcessingTime()); - Assert.assertTrue(winOperator2.getTrigger() instanceof WatermarkTrigger); + Assert.assertTrue(winOperator2.getTrigger() instanceof EventTimeTrigger); Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows); Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory); } @@ -168,7 +168,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase Assert.assertTrue(operator1 instanceof EvictingNonKeyedWindowOperator); EvictingNonKeyedWindowOperator winOperator1 = (EvictingNonKeyedWindowOperator) operator1; Assert.assertFalse(winOperator1.isSetProcessingTime()); - Assert.assertTrue(winOperator1.getTrigger() instanceof WatermarkTrigger); + Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger); Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows); Assert.assertTrue(winOperator1.getEvictor() instanceof CountEvictor); Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory); http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java index afc65d5..1821308 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java @@ -17,7 +17,6 @@ */ package org.apache.flink.streaming.runtime.operators.windowing; -import jdk.nashorn.internal.objects.Global; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.RichReduceFunction; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java index a91d957..02e032a 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.RichReduceFunction; -import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.configuration.Configuration; @@ -29,10 +28,10 @@ import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows; import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; -import org.apache.flink.streaming.api.windowing.triggers.ContinuousWatermarkTrigger; +import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger; import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger; -import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger; +import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer; @@ -79,7 +78,7 @@ public class NonKeyedWindowOperatorTest { new TimeWindow.Serializer(), windowBufferFactory, new ReduceAllWindowFunction<TimeWindow, Tuple2<String, Integer>>(new SumReducer()), - WatermarkTrigger.create()); + EventTimeTrigger.create()); operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); @@ -160,7 +159,7 @@ public class NonKeyedWindowOperatorTest { new TimeWindow.Serializer(), windowBufferFactory, new ReduceAllWindowFunction<TimeWindow, Tuple2<String, Integer>>(new SumReducer()), - WatermarkTrigger.create()); + EventTimeTrigger.create()); operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); @@ -239,7 +238,7 @@ public class NonKeyedWindowOperatorTest { new GlobalWindow.Serializer(), windowBufferFactory, new ReduceAllWindowFunction<GlobalWindow, Tuple2<String, Integer>>(new SumReducer()), - ContinuousWatermarkTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS))); + ContinuousEventTimeTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS))); operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java index e825b88..b94e530 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java @@ -33,10 +33,10 @@ import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindow import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer; import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory; import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction; -import org.apache.flink.streaming.api.windowing.triggers.ContinuousWatermarkTrigger; +import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger; import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger; -import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger; +import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -82,7 +82,7 @@ public class WindowOperatorTest { BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), windowBufferFactory, new ReduceWindowFunction<String, TimeWindow, Tuple2<String, Integer>>(new SumReducer()), - WatermarkTrigger.create()); + EventTimeTrigger.create()); operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); @@ -171,7 +171,7 @@ public class WindowOperatorTest { BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), windowBufferFactory, new ReduceWindowFunction<String, TimeWindow, Tuple2<String, Integer>>(new SumReducer()), - WatermarkTrigger.create()); + EventTimeTrigger.create()); operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); @@ -256,7 +256,7 @@ public class WindowOperatorTest { BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), windowBufferFactory, new ReduceWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer()), - ContinuousWatermarkTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS))); + ContinuousEventTimeTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS))); operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java index 02ec820..13766a1 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java @@ -33,7 +33,7 @@ import org.apache.flink.streaming.api.windowing.evictors.CountEvictor; import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; -import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger; +import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer; import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer; @@ -116,7 +116,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase { Assert.assertTrue(operator1 instanceof WindowOperator); WindowOperator winOperator1 = (WindowOperator) operator1; Assert.assertFalse(winOperator1.isSetProcessingTime()); - Assert.assertTrue(winOperator1.getTrigger() instanceof WatermarkTrigger); + Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger); Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows); Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory); @@ -140,7 +140,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase { Assert.assertTrue(operator2 instanceof WindowOperator); WindowOperator winOperator2 = (WindowOperator) operator2; Assert.assertFalse(winOperator2.isSetProcessingTime()); - Assert.assertTrue(winOperator2.getTrigger() instanceof WatermarkTrigger); + Assert.assertTrue(winOperator2.getTrigger() instanceof EventTimeTrigger); Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows); Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory); } @@ -217,7 +217,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase { Assert.assertTrue(operator1 instanceof EvictingWindowOperator); EvictingWindowOperator winOperator1 = (EvictingWindowOperator) operator1; Assert.assertFalse(winOperator1.isSetProcessingTime()); - Assert.assertTrue(winOperator1.getTrigger() instanceof WatermarkTrigger); + Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger); Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows); Assert.assertTrue(winOperator1.getEvictor() instanceof CountEvictor); Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory); http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java index 60b7894..3c63156 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java @@ -116,7 +116,7 @@ public class SessionWindowing { // Update the last seen event time lastSeenState.update(timestamp); - ctx.registerWatermarkTimer(lastSeen + sessionTimeout); + ctx.registerEventTimeTimer(lastSeen + sessionTimeout); if (timeSinceLastEvent > sessionTimeout) { return TriggerResult.FIRE_AND_PURGE; @@ -126,7 +126,7 @@ public class SessionWindowing { } @Override - public TriggerResult onTime(long time, TriggerContext ctx) throws Exception { + public TriggerResult onEventTime(long time, TriggerContext ctx) throws Exception { OperatorState<Long> lastSeenState = ctx.getKeyValueState("last-seen", 1L); Long lastSeen = lastSeenState.value(); @@ -135,6 +135,12 @@ public class SessionWindowing { } return TriggerResult.CONTINUE; } + + @Override + public TriggerResult onProcessingTime(long time, + TriggerContext ctx) throws Exception { + return TriggerResult.CONTINUE; + } } // *************************************************************************
