[FLINK-2550] Rework interplay of Window Assigners and TimeCharacteristic
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ff367d6e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ff367d6e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ff367d6e Branch: refs/heads/master Commit: ff367d6ea728a7c5bc334f34591a4d79e573972f Parents: 28a38bb Author: Aljoscha Krettek <[email protected]> Authored: Tue Oct 6 18:19:56 2015 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Wed Oct 7 22:08:25 2015 +0200 ---------------------------------------------------------------------- .../api/datastream/AllWindowedStream.java | 17 +-- .../streaming/api/datastream/DataStream.java | 20 +--- .../streaming/api/datastream/KeyedStream.java | 20 +--- .../api/datastream/WindowedStream.java | 29 ++--- .../environment/StreamExecutionEnvironment.java | 12 +++ .../api/windowing/assigners/GlobalWindows.java | 3 +- .../assigners/SlidingProcessingTimeWindows.java | 106 ------------------- .../windowing/assigners/SlidingTimeWindows.java | 11 +- .../TumblingProcessingTimeWindows.java | 81 -------------- .../assigners/TumblingTimeWindows.java | 11 +- .../api/windowing/assigners/WindowAssigner.java | 3 +- .../operators/windowing/WindowOperator.java | 5 + .../flink/streaming/api/CoGroupJoinITCase.java | 6 +- .../windowing/AllWindowTranslationTest.java | 60 +++++++---- .../windowing/WindowTranslationTest.java | 89 +++++++++++++--- .../streaming/examples/join/WindowJoin.java | 3 +- .../scala/examples/join/WindowJoin.scala | 3 +- .../flink/streaming/api/scala/DataStream.scala | 37 +------ .../flink/streaming/api/scala/KeyedStream.scala | 37 +------ .../api/scala/StreamExecutionEnvironment.scala | 5 + .../api/scala/AllWindowTranslationTest.scala | 24 +++-- .../streaming/api/scala/CoGroupJoinITCase.scala | 9 +- .../api/scala/WindowTranslationTest.scala | 22 ++-- 23 files changed, 233 insertions(+), 380 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java index 89c4857..a8d7654 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.Utils; import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction; import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator; @@ -80,7 +81,7 @@ public class AllWindowedStream<T, W extends Window> { WindowAssigner<? super T, W> windowAssigner) { this.input = input; this.windowAssigner = windowAssigner; - this.trigger = windowAssigner.getDefaultTrigger(); + this.trigger = windowAssigner.getDefaultTrigger(input.getExecutionEnvironment()); } /** @@ -139,12 +140,14 @@ public class AllWindowedStream<T, W extends Window> { OneInputStreamOperator<T, T> operator; + boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime; + if (evictor != null) { operator = new EvictingNonKeyedWindowOperator<>(windowAssigner, new HeapWindowBuffer.Factory<T>(), new ReduceAllWindowFunction<W, T>(function), trigger, - evictor); + evictor).enableSetProcessingTime(setProcessingTime); } else { // we need to copy because we need our own instance of the pre aggregator @@ -154,7 +157,7 @@ public class AllWindowedStream<T, W extends Window> { operator = new NonKeyedWindowOperator<>(windowAssigner, new PreAggregatingHeapWindowBuffer.Factory<>(functionCopy), new ReduceAllWindowFunction<W, T>(function), - trigger); + trigger).enableSetProcessingTime(setProcessingTime); } return input.transform(opName, input.getType(), operator).setParallelism(1); @@ -205,20 +208,22 @@ public class AllWindowedStream<T, W extends Window> { String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")"; - OneInputStreamOperator<T, R> operator; + NonKeyedWindowOperator<T, R, W> operator; + + boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime; if (evictor != null) { operator = new EvictingNonKeyedWindowOperator<>(windowAssigner, new HeapWindowBuffer.Factory<T>(), function, trigger, - evictor); + evictor).enableSetProcessingTime(setProcessingTime); } else { operator = new NonKeyedWindowOperator<>(windowAssigner, new HeapWindowBuffer.Factory<T>(), function, - trigger); + trigger).enableSetProcessingTime(setProcessingTime); } return input.transform(opName, resultType, operator).setParallelism(1); http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 0be1d56..ee8b3d2 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -59,9 +59,7 @@ import org.apache.flink.streaming.api.transformations.OneInputTransformation; import org.apache.flink.streaming.api.transformations.PartitionTransformation; import org.apache.flink.streaming.api.transformations.StreamTransformation; import org.apache.flink.streaming.api.transformations.UnionTransformation; -import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows; -import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; import org.apache.flink.streaming.api.windowing.helper.Count; @@ -72,7 +70,6 @@ import org.apache.flink.streaming.api.windowing.helper.WindowingHelper; import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy; import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy; import org.apache.flink.streaming.api.windowing.time.AbstractTime; -import org.apache.flink.streaming.api.windowing.time.EventTime; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.runtime.operators.ExtractTimestampsOperator; @@ -726,13 +723,7 @@ public class DataStream<T> { * @param size The size of the window. */ public AllWindowedStream<T, TimeWindow> timeWindowAll(AbstractTime size) { - AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic()); - - if (actualSize instanceof EventTime) { - return windowAll(TumblingTimeWindows.of(actualSize)); - } else { - return windowAll(TumblingProcessingTimeWindows.of(actualSize)); - } + return windowAll(TumblingTimeWindows.of(size)); } /** @@ -747,14 +738,7 @@ public class DataStream<T> { * @param size The size of the window. */ public AllWindowedStream<T, TimeWindow> timeWindowAll(AbstractTime size, AbstractTime slide) { - AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic()); - AbstractTime actualSlide = slide.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic()); - - if (actualSize instanceof EventTime) { - return windowAll(SlidingTimeWindows.of(size, slide)); - } else { - return windowAll(SlidingProcessingTimeWindows.of(actualSize, actualSlide)); - } + return windowAll(SlidingTimeWindows.of(size, slide)); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java index edb7981..2e6d7d6 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java @@ -32,13 +32,10 @@ import org.apache.flink.streaming.api.operators.StreamGroupedFold; import org.apache.flink.streaming.api.operators.StreamGroupedReduce; import org.apache.flink.streaming.api.transformations.OneInputTransformation; import org.apache.flink.streaming.api.transformations.PartitionTransformation; -import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows; -import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; import org.apache.flink.streaming.api.windowing.time.AbstractTime; -import org.apache.flink.streaming.api.windowing.time.EventTime; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.runtime.partitioner.HashPartitioner; @@ -122,13 +119,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> { * @param size The size of the window. */ public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size) { - AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic()); - - if (actualSize instanceof EventTime) { - return window(TumblingTimeWindows.of(actualSize)); - } else { - return window(TumblingProcessingTimeWindows.of(actualSize)); - } + return window(TumblingTimeWindows.of(size)); } /** @@ -143,14 +134,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> { * @param size The size of the window. */ public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size, AbstractTime slide) { - AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic()); - AbstractTime actualSlide = slide.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic()); - - if (actualSize instanceof EventTime) { - return window(SlidingTimeWindows.of(actualSize, actualSlide)); - } else { - return window(SlidingProcessingTimeWindows.of(actualSize, actualSlide)); - } + return window(SlidingTimeWindows.of(size, slide)); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java index 1273b42..99f7d06 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.Utils; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction; import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator; @@ -32,8 +33,8 @@ import org.apache.flink.streaming.api.functions.aggregation.SumAggregator; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; -import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; import org.apache.flink.streaming.api.windowing.evictors.Evictor; import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; @@ -91,7 +92,7 @@ public class WindowedStream<T, K, W extends Window> { WindowAssigner<? super T, W> windowAssigner) { this.input = input; this.windowAssigner = windowAssigner; - this.trigger = windowAssigner.getDefaultTrigger(); + this.trigger = windowAssigner.getDefaultTrigger(input.getExecutionEnvironment()); } /** @@ -151,13 +152,15 @@ public class WindowedStream<T, K, W extends Window> { OneInputStreamOperator<T, T> operator; + boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime; + if (evictor != null) { operator = new EvictingWindowOperator<>(windowAssigner, keySel, new HeapWindowBuffer.Factory<T>(), new ReduceWindowFunction<K, W, T>(function), trigger, - evictor); + evictor).enableSetProcessingTime(setProcessingTime); } else { // we need to copy because we need our own instance of the pre aggregator @@ -168,7 +171,7 @@ public class WindowedStream<T, K, W extends Window> { keySel, new PreAggregatingHeapWindowBuffer.Factory<>(functionCopy), new ReduceWindowFunction<K, W, T>(function), - trigger); + trigger).enableSetProcessingTime(setProcessingTime); } return input.transform(opName, input.getType(), operator); @@ -222,7 +225,9 @@ public class WindowedStream<T, K, W extends Window> { String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")"; KeySelector<T, K> keySel = input.getKeySelector(); - OneInputStreamOperator<T, R> operator; + WindowOperator<K, T, R, W> operator; + + boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime; if (evictor != null) { operator = new EvictingWindowOperator<>(windowAssigner, @@ -230,14 +235,14 @@ public class WindowedStream<T, K, W extends Window> { new HeapWindowBuffer.Factory<T>(), function, trigger, - evictor); + evictor).enableSetProcessingTime(setProcessingTime); } else { operator = new WindowOperator<>(windowAssigner, keySel, new HeapWindowBuffer.Factory<T>(), function, - trigger); + trigger).enableSetProcessingTime(setProcessingTime);; } return input.transform(opName, resultType, operator); @@ -450,8 +455,8 @@ public class WindowedStream<T, K, W extends Window> { TypeInformation<R> resultType, String functionName) { - if (windowAssigner instanceof SlidingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) { - SlidingProcessingTimeWindows timeWindows = (SlidingProcessingTimeWindows) windowAssigner; + if (windowAssigner instanceof SlidingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) { + SlidingTimeWindows timeWindows = (SlidingTimeWindows) windowAssigner; final long windowLength = timeWindows.getSize(); final long windowSlide = timeWindows.getSlide(); @@ -475,8 +480,8 @@ public class WindowedStream<T, K, W extends Window> { wf, input.getKeySelector(), windowLength, windowSlide); return input.transform(opName, resultType, op); } - } else if (windowAssigner instanceof TumblingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) { - TumblingProcessingTimeWindows timeWindows = (TumblingProcessingTimeWindows) windowAssigner; + } else if (windowAssigner instanceof TumblingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) { + TumblingTimeWindows timeWindows = (TumblingTimeWindows) windowAssigner; final long windowLength = timeWindows.getSize(); final long windowSlide = timeWindows.getSize(); http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index c2e2880..cc96217 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -541,11 +541,23 @@ public abstract class StreamExecutionEnvironment { /** * Sets the time characteristic for all streams create from this environment, e.g., processing * time, event time, or ingestion time. + * + * <p> + * If you set the characteristic to IngestionTime of EventTime this will set a default + * watermark update interval of 200 ms. If this is not applicable for your application + * you should change it using {@link ExecutionConfig#setAutoWatermarkInterval(long)}. * * @param characteristic The time characteristic. */ public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) { this.timeCharacteristic = Objects.requireNonNull(characteristic); + if (characteristic == TimeCharacteristic.ProcessingTime) { + getConfig().disableTimestamps(); + getConfig().setAutoWatermarkInterval(0); + } else { + getConfig().enableTimestamps(); + getConfig().setAutoWatermarkInterval(200); + } } /** http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java index 52c8f55..dbeb5ce 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java @@ -17,6 +17,7 @@ */ package org.apache.flink.streaming.api.windowing.assigners; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; @@ -42,7 +43,7 @@ public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> { } @Override - public Trigger<Object, GlobalWindow> getDefaultTrigger() { + public Trigger<Object, GlobalWindow> getDefaultTrigger(StreamExecutionEnvironment env) { return null; } http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java deleted file mode 100644 index 65d7641..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java +++ /dev/null @@ -1,106 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.api.windowing.assigners; - -import com.google.common.collect.Lists; -import org.apache.flink.streaming.api.windowing.time.AbstractTime; -import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; -import org.apache.flink.streaming.api.windowing.triggers.Trigger; -import org.apache.flink.streaming.api.windowing.windows.TimeWindow; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.util.Collection; -import java.util.List; - -/** - * A {@link WindowAssigner} that windows elements into sliding, time-based windows. The windowing - * is based on system time. Windows can possibly overlap. - * - * <p> - * For example, in order to window into windows of 1 minute, every 10 seconds: - * <pre> {@code - * DataStream<Tuple2<String, Integer>> in = ...; - * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...); - * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed = - * keyed.window(SlidingProcessingTimeWindows.of(Time.of(1, MINUTES), Time.of(10, SECONDS)); - * } </pre> - */ -public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> { - private static final long serialVersionUID = 1L; - - private final long size; - - private final long slide; - - private transient List<TimeWindow> result; - - private SlidingProcessingTimeWindows(long size, long slide) { - this.size = size; - this.slide = slide; - this.result = Lists.newArrayListWithCapacity((int) (size / slide)); - } - - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - in.defaultReadObject(); - this.result = Lists.newArrayListWithCapacity((int) (size / slide)); - } - - @Override - public Collection<TimeWindow> assignWindows(Object element, long timestamp) { - result.clear(); - long time = System.currentTimeMillis(); - long lastStart = time - time % slide; - for (long start = lastStart; - start > time - size; - start -= slide) { - result.add(new TimeWindow(start, size)); - } - return result; - } - - public long getSize() { - return size; - } - - public long getSlide() { - return slide; - } - - @Override - public Trigger<Object, TimeWindow> getDefaultTrigger() { - return ProcessingTimeTrigger.create(); - } - - @Override - public String toString() { - return "SlidingProcessingTimeWindows(" + size + ", " + slide + ")"; - } - - /** - * Creates a new {@code SlidingProcessingTimeWindows} {@link WindowAssigner} that assigns - * elements to sliding time windows based on the current processing time. - * - * @param size The size of the generated windows. - * @param slide The slide interval of the generated windows. - * @return The time policy. - */ - public static SlidingProcessingTimeWindows of(AbstractTime size, AbstractTime slide) { - return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java index 52ae356..6036dfb 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java @@ -17,7 +17,10 @@ */ package org.apache.flink.streaming.api.windowing.assigners; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.AbstractTime; +import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; @@ -72,8 +75,12 @@ public class SlidingTimeWindows extends WindowAssigner<Object, TimeWindow> { } @Override - public Trigger<Object, TimeWindow> getDefaultTrigger() { - return WatermarkTrigger.create(); + public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { + if (env.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) { + return ProcessingTimeTrigger.create(); + } else { + return WatermarkTrigger.create(); + } } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java deleted file mode 100644 index 41f6362..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.api.windowing.assigners; - -import org.apache.flink.streaming.api.windowing.time.AbstractTime; -import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; -import org.apache.flink.streaming.api.windowing.triggers.Trigger; -import org.apache.flink.streaming.api.windowing.windows.TimeWindow; - -import java.util.Collection; -import java.util.Collections; - -/** - * A {@link WindowAssigner} that windows elements into time-based windows. The windowing is - * based on system time. Windows cannot overlap. - * - * <p> - * For example, in order to window into windows of 1 minute, every 10 seconds: - * <pre> {@code - * DataStream<Tuple2<String, Integer>> in = ...; - * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...); - * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed = - * keyed.window(TumblingProcessingTimeWindows.of(Time.of(1, MINUTES), Time.of(10, SECONDS)); - * } </pre> - */ -public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> { - private static final long serialVersionUID = 1L; - - private long size; - - private TumblingProcessingTimeWindows(long size) { - this.size = size; - } - - @Override - public Collection<TimeWindow> assignWindows(Object element, long timestamp) { - long time = System.currentTimeMillis(); - long start = time - (time % size); - return Collections.singletonList(new TimeWindow(start, size)); - } - - public long getSize() { - return size; - } - - @Override - public Trigger<Object, TimeWindow> getDefaultTrigger() { - return ProcessingTimeTrigger.create(); - } - - @Override - public String toString() { - return "TumblingProcessingTimeWindows(" + size + ")"; - } - - /** - * Creates a new {@code TumblingProcessingTimeWindows} {@link WindowAssigner} that assigns - * elements to time windows based on the current processing time. - * - * @param size The size of the generated windows. - * @return The time policy. - */ - public static TumblingProcessingTimeWindows of(AbstractTime size) { - return new TumblingProcessingTimeWindows(size.toMilliseconds()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java index b6022b3..d57dc33 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java @@ -17,7 +17,10 @@ */ package org.apache.flink.streaming.api.windowing.assigners; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.AbstractTime; +import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; @@ -58,8 +61,12 @@ public class TumblingTimeWindows extends WindowAssigner<Object, TimeWindow> { } @Override - public Trigger<Object, TimeWindow> getDefaultTrigger() { - return WatermarkTrigger.create(); + public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { + if (env.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) { + return ProcessingTimeTrigger.create(); + } else { + return WatermarkTrigger.create(); + } } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java index 20fe365..d0b1ed0 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java @@ -17,6 +17,7 @@ */ package org.apache.flink.streaming.api.windowing.assigners; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.windows.Window; import java.io.Serializable; @@ -50,5 +51,5 @@ public abstract class WindowAssigner<T, W extends Window> implements Serializabl /** * Returns the default trigger associated with this {@code WindowAssigner}. */ - public abstract Trigger<T, W> getDefaultTrigger(); + public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env); } http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index 548afb3..368b8fa 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -347,6 +347,11 @@ public class WindowOperator<K, IN, OUT, W extends Window> // ------------------------------------------------------------------------ @VisibleForTesting + public boolean isSetProcessingTime() { + return setProcessingTime; + } + + @VisibleForTesting public Trigger<? super IN, ? super W> getTriggerTemplate() { return triggerTemplate; } http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java index c06a608..9ddd6eb 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java @@ -49,8 +49,8 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase { testResults = Lists.newArrayList(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setParallelism(1); - env.getConfig().enableTimestamps(); DataStream<Tuple2<String, Integer>> source1 = env.addSource(new SourceFunction<Tuple2<String, Integer>>() { private static final long serialVersionUID = 1L; @@ -144,8 +144,8 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase { testResults = Lists.newArrayList(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setParallelism(1); - env.getConfig().enableTimestamps(); DataStream<Tuple3<String, String, Integer>> source1 = env.addSource(new SourceFunction<Tuple3<String, String, Integer>>() { private static final long serialVersionUID = 1L; @@ -239,8 +239,8 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase { testResults = Lists.newArrayList(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setParallelism(1); - env.getConfig().enableTimestamps(); DataStream<Tuple3<String, String, Integer>> source1 = env.addSource(new SourceFunction<Tuple3<String, String, Integer>>() { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java index 09a7149..4fa16ac 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java @@ -19,25 +19,25 @@ package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.api.common.functions.RichReduceFunction; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.transformations.OneInputTransformation; -import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; -import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows; import org.apache.flink.streaming.api.windowing.evictors.CountEvictor; import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; -import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; +import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer; import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; import org.apache.flink.util.Collector; import org.junit.Assert; -import org.junit.Ignore; import org.junit.Test; import java.util.concurrent.TimeUnit; @@ -50,30 +50,33 @@ import java.util.concurrent.TimeUnit; public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { /** - * These tests ensure that the fast aligned time windows operator is used if the - * conditions are right. - * - * TODO: update once fast aligned time windows operator is in + * These tests ensure that the correct trigger is set when using event-time windows. */ - @Ignore @Test - public void testFastTimeWindows() throws Exception { + @SuppressWarnings("rawtypes") + public void testEventTime() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); DummyReducer reducer = new DummyReducer(); DataStream<Tuple2<String, Integer>> window1 = source - .windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) + .windowAll(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .reduce(reducer); OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator(); - Assert.assertTrue(operator1 instanceof AggregatingProcessingTimeWindowOperator); + Assert.assertTrue(operator1 instanceof NonKeyedWindowOperator); + NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) operator1; + Assert.assertFalse(winOperator1.isSetProcessingTime()); + Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof WatermarkTrigger); + Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows); + Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory); DataStream<Tuple2<String, Integer>> window2 = source - .windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) + .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() { private static final long serialVersionUID = 1L; @@ -88,20 +91,26 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation(); OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator(); - Assert.assertTrue(operator2 instanceof AccumulatingProcessingTimeWindowOperator); + Assert.assertTrue(operator2 instanceof NonKeyedWindowOperator); + NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) operator2; + Assert.assertFalse(winOperator2.isSetProcessingTime()); + Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof WatermarkTrigger); + Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows); + Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory); } @Test @SuppressWarnings("rawtypes") public void testNonEvicting() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); DummyReducer reducer = new DummyReducer(); DataStream<Tuple2<String, Integer>> window1 = source - .windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) + .windowAll(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .trigger(CountTrigger.of(100)) .reduce(reducer); @@ -109,12 +118,13 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator(); Assert.assertTrue(operator1 instanceof NonKeyedWindowOperator); NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) operator1; + Assert.assertTrue(winOperator1.isSetProcessingTime()); Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof CountTrigger); - Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingProcessingTimeWindows); + Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows); Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory); DataStream<Tuple2<String, Integer>> window2 = source - .windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) .apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() { private static final long serialVersionUID = 1L; @@ -132,8 +142,9 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator(); Assert.assertTrue(operator2 instanceof NonKeyedWindowOperator); NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) operator2; + Assert.assertTrue(winOperator1.isSetProcessingTime()); Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof CountTrigger); - Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingProcessingTimeWindows); + Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows); Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory); } @@ -141,13 +152,14 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase @SuppressWarnings("rawtypes") public void testEvicting() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); DummyReducer reducer = new DummyReducer(); DataStream<Tuple2<String, Integer>> window1 = source - .windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) + .windowAll(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .evictor(CountEvictor.of(100)) .reduce(reducer); @@ -155,13 +167,14 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator(); Assert.assertTrue(operator1 instanceof EvictingNonKeyedWindowOperator); EvictingNonKeyedWindowOperator winOperator1 = (EvictingNonKeyedWindowOperator) operator1; - Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof ProcessingTimeTrigger); - Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingProcessingTimeWindows); + Assert.assertFalse(winOperator1.isSetProcessingTime()); + Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof WatermarkTrigger); + Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows); Assert.assertTrue(winOperator1.getEvictor() instanceof CountEvictor); Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory); DataStream<Tuple2<String, Integer>> window2 = source - .windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) .evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS))) .apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() { @@ -180,8 +193,9 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator(); Assert.assertTrue(operator2 instanceof EvictingNonKeyedWindowOperator); EvictingNonKeyedWindowOperator winOperator2 = (EvictingNonKeyedWindowOperator) operator2; + Assert.assertFalse(winOperator2.isSetProcessingTime()); Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof CountTrigger); - Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingProcessingTimeWindows); + Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows); Assert.assertTrue(winOperator2.getEvictor() instanceof TimeEvictor); Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory); } http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java index 5124add..10fe734 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java @@ -20,19 +20,20 @@ package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.api.common.functions.RichReduceFunction; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.transformations.OneInputTransformation; -import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; -import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows; import org.apache.flink.streaming.api.windowing.evictors.CountEvictor; import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; -import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; +import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer; import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer; @@ -59,13 +60,13 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); DummyReducer reducer = new DummyReducer(); DataStream<Tuple2<String, Integer>> window1 = source .keyBy(0) - .window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), - Time.of(100, TimeUnit.MILLISECONDS))) + .window(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .reduce(reducer); OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); @@ -74,7 +75,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase { DataStream<Tuple2<String, Integer>> window2 = source .keyBy(0) - .window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) + .window(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() { private static final long serialVersionUID = 1L; @@ -92,10 +93,63 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase { Assert.assertTrue(operator2 instanceof AccumulatingProcessingTimeWindowOperator); } + /** + * These tests ensure that the correct trigger is set when using event-time windows. + */ + @Test + @SuppressWarnings("rawtypes") + public void testEventTime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + DummyReducer reducer = new DummyReducer(); + + DataStream<Tuple2<String, Integer>> window1 = source + .keyBy(0) + .window(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) + .reduce(reducer); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator(); + Assert.assertTrue(operator1 instanceof WindowOperator); + WindowOperator winOperator1 = (WindowOperator) operator1; + Assert.assertFalse(winOperator1.isSetProcessingTime()); + Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof WatermarkTrigger); + Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows); + Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory); + + DataStream<Tuple2<String, Integer>> window2 = source + .keyBy(0) + .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() { + private static final long serialVersionUID = 1L; + + @Override + public void apply(Tuple tuple, + TimeWindow window, + Iterable<Tuple2<String, Integer>> values, + Collector<Tuple2<String, Integer>> out) throws Exception { + + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator(); + Assert.assertTrue(operator2 instanceof WindowOperator); + WindowOperator winOperator2 = (WindowOperator) operator2; + Assert.assertFalse(winOperator2.isSetProcessingTime()); + Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof WatermarkTrigger); + Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows); + Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory); + } + @Test @SuppressWarnings("rawtypes") public void testNonEvicting() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); @@ -103,7 +157,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase { DataStream<Tuple2<String, Integer>> window1 = source .keyBy(0) - .window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) + .window(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .trigger(CountTrigger.of(100)) .reduce(reducer); @@ -111,13 +165,14 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase { OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator(); Assert.assertTrue(operator1 instanceof WindowOperator); WindowOperator winOperator1 = (WindowOperator) operator1; + Assert.assertTrue(winOperator1.isSetProcessingTime()); Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof CountTrigger); - Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingProcessingTimeWindows); + Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows); Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory); DataStream<Tuple2<String, Integer>> window2 = source .keyBy(0) - .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() { private static final long serialVersionUID = 1L; @@ -135,8 +190,9 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase { OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator(); Assert.assertTrue(operator2 instanceof WindowOperator); WindowOperator winOperator2 = (WindowOperator) operator2; + Assert.assertTrue(winOperator2.isSetProcessingTime()); Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof CountTrigger); - Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingProcessingTimeWindows); + Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows); Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory); } @@ -144,6 +200,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase { @SuppressWarnings("rawtypes") public void testEvicting() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); @@ -151,7 +208,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase { DataStream<Tuple2<String, Integer>> window1 = source .keyBy(0) - .window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) + .window(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .evictor(CountEvictor.of(100)) .reduce(reducer); @@ -159,14 +216,15 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase { OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator(); Assert.assertTrue(operator1 instanceof EvictingWindowOperator); EvictingWindowOperator winOperator1 = (EvictingWindowOperator) operator1; - Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof ProcessingTimeTrigger); - Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingProcessingTimeWindows); + Assert.assertFalse(winOperator1.isSetProcessingTime()); + Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof WatermarkTrigger); + Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows); Assert.assertTrue(winOperator1.getEvictor() instanceof CountEvictor); Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory); DataStream<Tuple2<String, Integer>> window2 = source .keyBy(0) - .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) .evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS))) .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() { @@ -185,8 +243,9 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase { OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator(); Assert.assertTrue(operator2 instanceof EvictingWindowOperator); EvictingWindowOperator winOperator2 = (EvictingWindowOperator) operator2; + Assert.assertFalse(winOperator2.isSetProcessingTime()); Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof CountTrigger); - Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingProcessingTimeWindows); + Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows); Assert.assertTrue(winOperator2.getEvictor() instanceof TimeEvictor); Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory); } http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java index 8abf9d6..5915a7a 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java @@ -23,6 +23,7 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.TimestampExtractor; @@ -64,7 +65,7 @@ public class WindowJoin { // obtain execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().enableTimestamps(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // connect to the data sources for grades and salaries Tuple2<DataStream<Tuple3<Long, String, Integer>>, DataStream<Tuple3<Long, String, Integer>>> input = getInputStreams(env); http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala index 225dab7..42484e8 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala @@ -20,6 +20,7 @@ package org.apache.flink.streaming.scala.examples.join import java.util.concurrent.TimeUnit +import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows import org.apache.flink.streaming.api.windowing.time.Time @@ -45,7 +46,7 @@ object WindowJoin { } val env = StreamExecutionEnvironment.getExecutionEnvironment - env.getConfig.enableTimestamps() + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //Create streams for grades and salaries by mapping the inputs to the corresponding objects val grades = setGradesInput(env) http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index 7babc40..fb4d75d 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -34,7 +34,7 @@ import org.apache.flink.streaming.api.scala.function.StatefulFunction import org.apache.flink.streaming.api.windowing.assigners._ import org.apache.flink.streaming.api.windowing.helper.WindowingHelper import org.apache.flink.streaming.api.windowing.policy.{EvictionPolicy, TriggerPolicy} -import org.apache.flink.streaming.api.windowing.time.{AbstractTime, EventTime, ProcessingTime} +import org.apache.flink.streaming.api.windowing.time.AbstractTime import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window} import org.apache.flink.streaming.util.serialization.SerializationSchema import org.apache.flink.util.Collector @@ -624,20 +624,8 @@ class DataStream[T](javaStream: JavaStream[T]) { * @param size The size of the window. */ def timeWindowAll(size: AbstractTime): AllWindowedStream[T, TimeWindow] = { - val env = new StreamExecutionEnvironment(javaStream.getExecutionEnvironment) - val actualSize = size.makeSpecificBasedOnTimeCharacteristic(env.getStreamTimeCharacteristic) - - actualSize match { - case t: EventTime => - val assigner = TumblingTimeWindows.of(actualSize) - .asInstanceOf[WindowAssigner[T, TimeWindow]] - windowAll(assigner) - case t: ProcessingTime => - val assigner = TumblingProcessingTimeWindows.of(actualSize) - .asInstanceOf[WindowAssigner[T, TimeWindow]] - windowAll(assigner) - case _ => throw new RuntimeException("Invalid time: " + actualSize) - } + val assigner = TumblingTimeWindows.of(size).asInstanceOf[WindowAssigner[T, TimeWindow]] + windowAll(assigner) } /** @@ -651,23 +639,8 @@ class DataStream[T](javaStream: JavaStream[T]) { * @param size The size of the window. */ def timeWindowAll(size: AbstractTime, slide: AbstractTime): AllWindowedStream[T, TimeWindow] = { - val env = new StreamExecutionEnvironment(javaStream.getExecutionEnvironment) - val actualSize = size.makeSpecificBasedOnTimeCharacteristic(env.getStreamTimeCharacteristic) - val actualSlide = slide.makeSpecificBasedOnTimeCharacteristic(env.getStreamTimeCharacteristic) - - actualSize match { - case t: EventTime => - val assigner = SlidingTimeWindows.of( - actualSize, - actualSlide).asInstanceOf[WindowAssigner[T, TimeWindow]] - windowAll(assigner) - case t: ProcessingTime => - val assigner = SlidingProcessingTimeWindows.of( - actualSize, - actualSlide).asInstanceOf[WindowAssigner[T, TimeWindow]] - windowAll(assigner) - case _ => throw new RuntimeException("Invalid time: " + actualSize) - } + val assigner = SlidingTimeWindows.of(size, slide).asInstanceOf[WindowAssigner[T, TimeWindow]] + windowAll(assigner) } /** http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala index 0ce36aa..c605bb1 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala @@ -24,7 +24,7 @@ import org.apache.flink.streaming.api.functions.aggregation.SumAggregator import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator import org.apache.flink.streaming.api.operators.StreamGroupedReduce import org.apache.flink.streaming.api.windowing.assigners._ -import org.apache.flink.streaming.api.windowing.time.{ProcessingTime, EventTime, AbstractTime} +import org.apache.flink.streaming.api.windowing.time.AbstractTime import org.apache.flink.streaming.api.windowing.windows.{Window, TimeWindow} import scala.reflect.ClassTag import org.apache.flink.api.common.typeinfo.TypeInformation @@ -50,20 +50,8 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T] * @param size The size of the window. */ def timeWindow(size: AbstractTime): WindowedStream[T, K, TimeWindow] = { - val env = new StreamExecutionEnvironment(javaStream.getExecutionEnvironment) - val actualSize = size.makeSpecificBasedOnTimeCharacteristic(env.getStreamTimeCharacteristic) - - actualSize match { - case t: EventTime => - val assigner = TumblingTimeWindows.of(actualSize) - .asInstanceOf[WindowAssigner[T, TimeWindow]] - window(assigner) - case t: ProcessingTime => - val assigner = TumblingProcessingTimeWindows.of(actualSize) - .asInstanceOf[WindowAssigner[T, TimeWindow]] - window(assigner) - case _ => throw new RuntimeException("Invalid time: " + actualSize) - } + val assigner = TumblingTimeWindows.of(size).asInstanceOf[WindowAssigner[T, TimeWindow]] + window(assigner) } /** @@ -78,23 +66,8 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T] * @param size The size of the window. */ def timeWindow(size: AbstractTime, slide: AbstractTime): WindowedStream[T, K, TimeWindow] = { - val env = new StreamExecutionEnvironment(javaStream.getExecutionEnvironment) - val actualSize = size.makeSpecificBasedOnTimeCharacteristic(env.getStreamTimeCharacteristic) - val actualSlide = slide.makeSpecificBasedOnTimeCharacteristic(env.getStreamTimeCharacteristic) - - actualSize match { - case t: EventTime => - val assigner = SlidingTimeWindows.of( - actualSize, - actualSlide).asInstanceOf[WindowAssigner[T, TimeWindow]] - window(assigner) - case t: ProcessingTime => - val assigner = SlidingProcessingTimeWindows.of( - actualSize, - actualSlide).asInstanceOf[WindowAssigner[T, TimeWindow]] - window(assigner) - case _ => throw new RuntimeException("Invalid time: " + actualSize) - } + val assigner = SlidingTimeWindows.of(size, slide).asInstanceOf[WindowAssigner[T, TimeWindow]] + window(assigner) } /** http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala index 7492e48..f767aba 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala @@ -299,6 +299,11 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * Sets the time characteristic for all streams create from this environment, e.g., processing * time, event time, or ingestion time. * + * If you set the characteristic to IngestionTime of EventTime this will set a default + * watermark update interval of 200 ms. If this is not applicable for your application + * you should change it using + * [[org.apache.flink.api.common.ExecutionConfig#setAutoWatermarkInterval(long)]] + * * @param characteristic The time characteristic. */ def setStreamTimeCharacteristic(characteristic: TimeCharacteristic) : Unit = { http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala index dece9f6..99fcd07 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala @@ -22,9 +22,10 @@ package org.apache.flink.streaming.api.scala import java.util.concurrent.TimeUnit import org.apache.flink.api.common.functions.RichReduceFunction +import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction import org.apache.flink.streaming.api.transformations.OneInputTransformation -import org.apache.flink.streaming.api.windowing.assigners.{TumblingProcessingTimeWindows, SlidingProcessingTimeWindows} +import org.apache.flink.streaming.api.windowing.assigners.{TumblingTimeWindows, SlidingTimeWindows} import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor} import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, CountTrigger} @@ -49,13 +50,14 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { @Test def testFastTimeWindows(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) val source = env.fromElements(("hello", 1), ("hello", 2)) val reducer = new DummyReducer val window1 = source - .windowAll(SlidingProcessingTimeWindows.of( + .windowAll(SlidingTimeWindows.of( Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .reduce(reducer) @@ -69,7 +71,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { val window2 = source .keyBy(0) - .windowAll(SlidingProcessingTimeWindows.of( + .windowAll(SlidingTimeWindows.of( Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() { @@ -96,7 +98,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { val reducer = new DummyReducer val window1 = source - .windowAll(SlidingProcessingTimeWindows.of( + .windowAll(SlidingTimeWindows.of( Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .trigger(CountTrigger.of(100)) @@ -110,13 +112,13 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { assertTrue(operator1.isInstanceOf[NonKeyedWindowOperator[_, _, _]]) val winOperator1 = operator1.asInstanceOf[NonKeyedWindowOperator[_, _, _]] assertTrue(winOperator1.getTriggerTemplate.isInstanceOf[CountTrigger[_]]) - assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingProcessingTimeWindows]) + assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows]) assertTrue( winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]]) val window2 = source - .windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() { def apply( @@ -133,7 +135,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { assertTrue(operator2.isInstanceOf[NonKeyedWindowOperator[_, _, _]]) val winOperator2 = operator2.asInstanceOf[NonKeyedWindowOperator[_, _, _]] assertTrue(winOperator2.getTriggerTemplate.isInstanceOf[CountTrigger[_]]) - assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows]) + assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows]) assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]]) } @@ -146,7 +148,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { val reducer = new DummyReducer val window1 = source - .windowAll(SlidingProcessingTimeWindows.of( + .windowAll(SlidingTimeWindows.of( Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .evictor(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS))) @@ -161,12 +163,12 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { val winOperator1 = operator1.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]] assertTrue(winOperator1.getTriggerTemplate.isInstanceOf[ProcessingTimeTrigger]) assertTrue(winOperator1.getEvictor.isInstanceOf[TimeEvictor[_]]) - assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingProcessingTimeWindows]) + assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows]) assertTrue(winOperator1.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]]) val window2 = source - .windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) .evictor(CountEvictor.of(1000)) .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() { @@ -185,7 +187,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { val winOperator2 = operator2.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]] assertTrue(winOperator2.getTriggerTemplate.isInstanceOf[CountTrigger[_]]) assertTrue(winOperator2.getEvictor.isInstanceOf[CountEvictor[_]]) - assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows]) + assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows]) assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]]) } } http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala index 7232309..3f6e10f 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala @@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.scala import java.util.concurrent.TimeUnit +import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.TimestampExtractor import org.apache.flink.streaming.api.functions.sink.SinkFunction import org.apache.flink.streaming.api.functions.source.SourceFunction @@ -38,9 +39,9 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase { CoGroupJoinITCase.testResults = mutable.MutableList() val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) - env.getConfig.enableTimestamps - + val source1 = env.addSource(new SourceFunction[(String, Int)]() { def run(ctx: SourceFunction.SourceContext[(String, Int)]) { ctx.collect(("a", 0)) @@ -101,8 +102,8 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase { CoGroupJoinITCase.testResults = mutable.MutableList() val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) - env.getConfig.enableTimestamps val source1 = env.addSource(new SourceFunction[(String, String, Int)]() { def run(ctx: SourceFunction.SourceContext[(String, String, Int)]) { @@ -177,8 +178,8 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase { CoGroupJoinITCase.testResults = mutable.MutableList() val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) - env.getConfig.enableTimestamps val source1 = env.addSource(new SourceFunction[(String, String, Int)]() { def run(ctx: SourceFunction.SourceContext[(String, String, Int)]) {
