This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6f0d07633a5c8e6511f3d16e04561cb277b65407 Author: Matthias Pohl <[email protected]> AuthorDate: Mon Jul 10 19:11:57 2023 +0200 [FLINK-32570][streaming] Deprecates org.apache.flink.streaming.api.windowing.time.Time-related APIs in favor of Duration Signed-off-by: Matthias Pohl <[email protected]> --- .../flink/streaming/examples/join/WindowJoin.java | 3 +- .../examples/sideoutput/SideOutputExample.java | 3 +- .../examples/socket/SocketWindowWordCount.java | 5 +- .../GroupedProcessingTimeWindowExample.java | 5 +- .../examples/windowing/SessionWindowing.java | 3 +- .../examples/windowing/TopSpeedWindowing.java | 4 +- .../apache/flink/cep/scala/pattern/Pattern.scala | 20 +++- .../flink/cep/scala/pattern/PatternTest.scala | 6 +- .../cep/functions/TimedOutPartialMatchHandler.java | 6 +- .../apache/flink/cep/nfa/compiler/NFACompiler.java | 43 ++++--- .../java/org/apache/flink/cep/pattern/Pattern.java | 131 +++++++++++++++++++-- .../org/apache/flink/cep/pattern/Quantifier.java | 29 ++++- .../apache/flink/cep/operator/CEPOperatorTest.java | 8 +- .../operator/CepProcessFunctionContextTest.java | 4 +- .../api/datastream/AllWindowedStream.java | 21 +++- .../streaming/api/datastream/CoGroupedStreams.java | 63 +++++++++- .../streaming/api/datastream/JoinedStreams.java | 66 +++++++++-- .../streaming/api/datastream/KeyedStream.java | 31 +++-- .../streaming/api/datastream/WindowedStream.java | 19 ++- .../environment/StreamExecutionEnvironment.java | 10 +- .../BoundedOutOfOrdernessTimestampExtractor.java | 25 ++-- .../assigners/EventTimeSessionWindows.java | 18 ++- .../assigners/ProcessingTimeSessionWindows.java | 18 ++- .../assigners/SlidingEventTimeWindows.java | 47 +++++++- .../assigners/SlidingProcessingTimeWindows.java | 44 ++++++- .../assigners/TumblingEventTimeWindows.java | 63 +++++++++- .../assigners/TumblingProcessingTimeWindows.java | 65 +++++++++- .../api/windowing/evictors/TimeEvictor.java | 30 ++++- .../flink/streaming/api/windowing/time/Time.java | 20 ++++ .../triggers/ContinuousEventTimeTrigger.java | 16 ++- .../triggers/ContinuousProcessingTimeTrigger.java | 16 ++- .../operators/windowing/WindowOperatorBuilder.java | 9 +- .../api/datastream/CoGroupedStreamsTest.java | 14 +-- .../api/datastream/JoinedStreamsTest.java | 19 +-- .../planner/match/PatternTranslatorTestBase.scala | 6 +- 35 files changed, 754 insertions(+), 136 deletions(-) diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java index 3b6d6c2bd89..abd44392a60 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java @@ -36,7 +36,6 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; -import org.apache.flink.streaming.api.windowing.time.Time; import java.time.Duration; @@ -122,7 +121,7 @@ public class WindowJoin { return grades.join(salaries) .where(new NameKeySelector()) .equalTo(new NameKeySelector()) - .window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize))) + .window(TumblingEventTimeWindows.of(Duration.ofMillis(windowSize))) .apply( new JoinFunction< Tuple2<String, Integer>, diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java index 13dc391e817..6e33f5d9ffc 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java @@ -39,7 +39,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; -import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.examples.wordcount.util.WordCountData; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; @@ -107,7 +106,7 @@ public class SideOutputExample { DataStream<Tuple2<String, Integer>> counts = tokenized .keyBy(value -> value.f0) - .window(TumblingEventTimeWindows.of(Time.seconds(5))) + .window(TumblingEventTimeWindows.of(Duration.ofSeconds(5))) // group by the tuple field "0" and sum up tuple field "1" .sum(1); diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java index 76fb37f321f..584183a3ba9 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java @@ -24,7 +24,8 @@ import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; -import org.apache.flink.streaming.api.windowing.time.Time; + +import java.time.Duration; /** * Implements a streaming windowed version of the "WordCount" program. @@ -77,7 +78,7 @@ public class SocketWindowWordCount { }, Types.POJO(WordWithCount.class)) .keyBy(value -> value.word) - .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) + .window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(5))) .reduce((a, b) -> new WordWithCount(a.word, a.count + b.count)) .returns(WordWithCount.class); diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java index c8020b7d1c3..9db4e35a172 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java @@ -31,10 +31,11 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; -import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; +import java.time.Duration; + /** An example of grouped stream windowing into sliding time windows. */ public class GroupedProcessingTimeWindowExample { @@ -60,7 +61,7 @@ public class GroupedProcessingTimeWindowExample { stream.keyBy(value -> value.f0) .window( SlidingProcessingTimeWindows.of( - Time.milliseconds(2500), Time.milliseconds(500))) + Duration.ofMillis(2500), Duration.ofMillis(500))) .reduce(new SummingReducer()) // alternative: use a apply function which does not pre-aggregate diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java index 34b3dd7d285..5708a7ae90e 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java @@ -32,7 +32,6 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows; -import org.apache.flink.streaming.api.windowing.time.Time; import java.time.Duration; import java.util.ArrayList; @@ -85,7 +84,7 @@ public class SessionWindowing { // We create sessions for each id with max timeout of 3 time units DataStream<Tuple3<String, Long, Integer>> aggregated = source.keyBy(value -> value.f0) - .window(EventTimeSessionWindows.withGap(Time.milliseconds(3L))) + .window(EventTimeSessionWindows.withGap(Duration.ofMillis(3L))) .sum(2); if (fileOutput) { diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java index 3cca4cc8f8f..3d12511ac03 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java @@ -36,13 +36,11 @@ import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies. import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction; import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows; import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor; -import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger; import org.apache.flink.streaming.examples.windowing.util.CarGeneratorFunction; import org.apache.flink.streaming.examples.wordcount.util.CLI; import java.time.Duration; -import java.util.concurrent.TimeUnit; /** * An example of grouped stream windowing where different eviction and trigger policies can be used. @@ -130,7 +128,7 @@ public class TopSpeedWindowing { .withTimestampAssigner((car, ts) -> car.f3)) .keyBy(value -> value.f0) .window(GlobalWindows.create()) - .evictor(TimeEvictor.of(Time.of(evictionSec, TimeUnit.SECONDS))) + .evictor(TimeEvictor.of(Duration.ofSeconds(evictionSec))) .trigger( DeltaTrigger.of( triggerMeters, diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala index cbbe04d9adc..04606368a03 100644 --- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala +++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala @@ -25,6 +25,8 @@ import org.apache.flink.cep.pattern.conditions.IterativeCondition.{Context => JC import org.apache.flink.cep.scala.conditions.Context import org.apache.flink.streaming.api.windowing.time.Time +import java.time.Duration + /** * Base class for a pattern definition. * @@ -64,8 +66,14 @@ class Pattern[T, F <: T](jPattern: JPattern[T, F]) { def getName: String = jPattern.getName /** @return Window length in which the pattern match has to occur */ + @deprecated(message = "Use getWindowSize", since = "1.19.0") def getWindowTime: Option[Time] = { - Option(jPattern.getWindowTime) + getWindowSize.map(Time.of) + } + + /** @return Window length in which the pattern match has to occur */ + def getWindowSize: Option[Duration] = { + Option(jPattern.getWindowSize.orElse(null)) } /** @return currently applied quantifier to this pattern */ @@ -253,17 +261,23 @@ class Pattern[T, F <: T](jPattern: JPattern[T, F]) { until(condFun) } + @deprecated(message = "Use within(Duration)", since = "1.19.0") + def within(windowTime: Time): Pattern[T, F] = { + jPattern.within(Time.toDuration(windowTime)) + this + } + /** * Defines the maximum time interval in which a matching pattern has to be completed in order to * be considered valid. This interval corresponds to the maximum time gap between first and the * last event. * * @param windowTime - * Time of the matching window + * Duration of the matching window * @return * The same pattern operator with the new window length */ - def within(windowTime: Time): Pattern[T, F] = { + def within(windowTime: Duration): Pattern[T, F] = { jPattern.within(windowTime) this } diff --git a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala index 23453c5d167..170eb1fae46 100644 --- a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala +++ b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala @@ -202,9 +202,9 @@ class PatternTest { && threeWayEquals(pattern.getName, pattern.wrappedPattern.getName, jPattern.getName) // check equal time windows && threeWayEquals( - pattern.getWindowTime.orNull, - pattern.wrappedPattern.getWindowTime, - jPattern.getWindowTime) + pattern.getWindowSize.orNull, + pattern.wrappedPattern.getWindowSize.orElse(null), + jPattern.getWindowSize.orElse(null)) // check congruent class names / types && threeWayEquals( pattern.getClass.getSimpleName, diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/TimedOutPartialMatchHandler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/TimedOutPartialMatchHandler.java index ea790e9fc48..ae222d4e0e2 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/TimedOutPartialMatchHandler.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/TimedOutPartialMatchHandler.java @@ -19,8 +19,8 @@ package org.apache.flink.cep.functions; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.streaming.api.windowing.time.Time; +import java.time.Duration; import java.util.List; import java.util.Map; @@ -42,8 +42,8 @@ public interface TimedOutPartialMatchHandler<IN> { /** * Called for every timed out partial match (due to {@link - * org.apache.flink.cep.pattern.Pattern#within(Time)}). It enables custom handling, e.g. one can - * emit the timed out results through a side output: + * org.apache.flink.cep.pattern.Pattern#within(Duration)}). It enables custom handling, e.g. one + * can emit the timed out results through a side output: * * <pre>{@code * private final OutputTag<T> timedOutPartialMatchesTag = ... diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java index 6fd9fa9f2ce..331b2d97033 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java @@ -34,9 +34,9 @@ import org.apache.flink.cep.pattern.conditions.BooleanConditions; import org.apache.flink.cep.pattern.conditions.IterativeCondition; import org.apache.flink.cep.pattern.conditions.RichAndCondition; import org.apache.flink.cep.pattern.conditions.RichNotCondition; -import org.apache.flink.streaming.api.windowing.time.Time; import java.io.Serializable; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -318,8 +318,7 @@ public class NFACompiler { */ private State<T> createEndingState() { State<T> endState = createState(ENDING_STATE_NAME, State.StateType.Final); - windowTime = - Optional.ofNullable(currentPattern.getWindowTime()).map(Time::toMilliseconds); + windowTime = currentPattern.getWindowSize().map(Duration::toMillis); return endState; } @@ -336,7 +335,7 @@ public class NFACompiler { if (currentPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_FOLLOW) { // skip notFollow patterns, they are converted into edge conditions - if ((currentPattern.getWindowTime(WithinType.PREVIOUS_AND_CURRENT) != null + if ((currentPattern.getWindowSize(WithinType.PREVIOUS_AND_CURRENT).isPresent() || getWindowTime() > 0) && lastSink.isFinal()) { final State<T> notFollow = createState(State.StateType.Pending, true); @@ -370,12 +369,15 @@ public class NFACompiler { followingPattern = currentPattern; currentPattern = currentPattern.getPrevious(); - final Time currentWindowTime = currentPattern.getWindowTime(); - if (currentWindowTime != null - && currentWindowTime.toMilliseconds() < windowTime.orElse(Long.MAX_VALUE)) { - // the window time is the global minimum of all window times of each state - windowTime = Optional.of(currentWindowTime.toMilliseconds()); - } + // the window time is the global minimum of all window times of each state + currentPattern + .getWindowSize() + .map(Duration::toMillis) + .filter( + windowSizeInMillis -> + windowSizeInMillis < windowTime.orElse(Long.MAX_VALUE)) + .ifPresent( + windowSizeInMillis -> windowTime = Optional.of(windowSizeInMillis)); } return lastSink; } @@ -422,13 +424,20 @@ public class NFACompiler { State<T> state = createState(currentPattern.getName(), stateType); if (isTake) { Times times = currentPattern.getTimes(); - Time windowTime = currentPattern.getWindowTime(WithinType.PREVIOUS_AND_CURRENT); - if (times == null && windowTime != null) { - windowTimes.put(state.getName(), windowTime.toMilliseconds()); - } else if (times != null - && times.getWindowTime() != null - && state.getName().contains(STATE_NAME_DELIM)) { - windowTimes.put(state.getName(), times.getWindowTime().toMilliseconds()); + Optional<Duration> windowSize = + currentPattern.getWindowSize(WithinType.PREVIOUS_AND_CURRENT); + if (times == null) { + windowSize + .map(Duration::toMillis) + .ifPresent( + windowSizeInMillis -> + windowTimes.put(state.getName(), windowSizeInMillis)); + } else if (state.getName().contains(STATE_NAME_DELIM)) { + times.getWindowSize() + .map(Duration::toMillis) + .ifPresent( + windowSizeInMillis -> + windowTimes.put(state.getName(), windowSizeInMillis)); } } return state; diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java index e5a9a78976c..0b41224fdd2 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java @@ -34,8 +34,10 @@ import org.apache.flink.util.Preconditions; import javax.annotation.Nullable; +import java.time.Duration; import java.util.HashMap; import java.util.Map; +import java.util.Optional; /** * Base class for a pattern definition. @@ -64,7 +66,7 @@ public class Pattern<T, F extends T> { private IterativeCondition<F> condition; /** Window length in which the pattern match has to occur. */ - private final Map<WithinType, Time> windowTimes = new HashMap<>(); + private final Map<WithinType, Duration> windowTimes = new HashMap<>(); /** * A quantifier for the pattern. By default set to {@link Quantifier#one(ConsumingStrategy)}. @@ -102,12 +104,26 @@ public class Pattern<T, F extends T> { return name; } + /** @deprecated Use {@link #getWindowSize()} */ + @Deprecated + @Nullable public Time getWindowTime() { - return windowTimes.get(WithinType.FIRST_AND_LAST); + return getWindowSize().map(Time::of).orElse(null); } + public Optional<Duration> getWindowSize() { + return getWindowSize(WithinType.FIRST_AND_LAST); + } + + /** @deprecated Use {@link #getWindowSize(WithinType)}. */ + @Deprecated + @Nullable public Time getWindowTime(WithinType withinType) { - return windowTimes.get(withinType); + return getWindowSize(withinType).map(Time::of).orElse(null); + } + + public Optional<Duration> getWindowSize(WithinType withinType) { + return Optional.ofNullable(windowTimes.get(withinType)); } public Quantifier getQuantifier() { @@ -243,6 +259,20 @@ public class Pattern<T, F extends T> { return this; } + /** + * Defines the maximum time interval in which a matching pattern has to be completed in order to + * be considered valid. This interval corresponds to the maximum time gap between first and the + * last event. + * + * @param windowTime Time of the matching window + * @return The same pattern operator with the new window length + * @deprecated Use {@link #within(Duration)}. + */ + @Deprecated + public Pattern<T, F> within(@Nullable Time windowTime) { + return within(Time.toDuration(windowTime)); + } + /** * Defines the maximum time interval in which a matching pattern has to be completed in order to * be considered valid. This interval corresponds to the maximum time gap between first and the @@ -251,7 +281,7 @@ public class Pattern<T, F extends T> { * @param windowTime Time of the matching window * @return The same pattern operator with the new window length */ - public Pattern<T, F> within(Time windowTime) { + public Pattern<T, F> within(@Nullable Duration windowTime) { return within(windowTime, WithinType.FIRST_AND_LAST); } @@ -262,8 +292,22 @@ public class Pattern<T, F extends T> { * @param withinType Type of the within interval between events * @param windowTime Time of the matching window * @return The same pattern operator with the new window length + * @deprecated Use {@link #within(Duration, WithinType)}. */ - public Pattern<T, F> within(Time windowTime, WithinType withinType) { + @Deprecated + public Pattern<T, F> within(@Nullable Time windowTime, WithinType withinType) { + return within(Time.toDuration(windowTime), withinType); + } + + /** + * Defines the maximum time interval in which a matching pattern has to be completed in order to + * be considered valid. This interval corresponds to the maximum time gap between events. + * + * @param withinType Type of the within interval between events + * @param windowTime Time of the matching window + * @return The same pattern operator with the new window length + */ + public Pattern<T, F> within(@Nullable Duration windowTime, WithinType withinType) { if (windowTime != null) { windowTimes.put(withinType, windowTime); } @@ -369,7 +413,7 @@ public class Pattern<T, F extends T> { * @throws MalformedPatternException if the quantifier is not applicable to this pattern. */ public Pattern<T, F> oneOrMore() { - return oneOrMore(null); + return oneOrMore((Duration) null); } /** @@ -385,8 +429,28 @@ public class Pattern<T, F extends T> { * @return The same pattern with a {@link Quantifier#looping(ConsumingStrategy)} quantifier * applied. * @throws MalformedPatternException if the quantifier is not applicable to this pattern. + * @deprecated Use {@link #oneOrMore(Duration)} */ + @Deprecated public Pattern<T, F> oneOrMore(@Nullable Time windowTime) { + return oneOrMore(Time.toDuration(windowTime)); + } + + /** + * Specifies that this pattern can occur {@code one or more} times and time interval corresponds + * to the maximum time gap between previous and current event for each times. This means at + * least one and at most infinite number of events can be matched to this pattern. + * + * <p>If this quantifier is enabled for a pattern {@code A.oneOrMore().followedBy(B)} and a + * sequence of events {@code A1 A2 B} appears, this will generate patterns: {@code A1 B} and + * {@code A1 A2 B}. See also {@link #allowCombinations()}. + * + * @param windowTime time of the matching window between times + * @return The same pattern with a {@link Quantifier#looping(ConsumingStrategy)} quantifier + * applied. + * @throws MalformedPatternException if the quantifier is not applicable to this pattern. + */ + public Pattern<T, F> oneOrMore(@Nullable Duration windowTime) { checkIfNoNotPattern(); checkIfQuantifierApplied(); this.quantifier = Quantifier.looping(quantifier.getConsumingStrategy()); @@ -416,7 +480,7 @@ public class Pattern<T, F extends T> { * @throws MalformedPatternException if the quantifier is not applicable to this pattern. */ public Pattern<T, F> times(int times) { - return times(times, null); + return times(times, (Duration) null); } /** @@ -427,8 +491,23 @@ public class Pattern<T, F extends T> { * @param windowTime time of the matching window between times * @return The same pattern with number of times applied * @throws MalformedPatternException if the quantifier is not applicable to this pattern. + * @deprecated Using {@link #times(int, Duration)} */ + @Deprecated public Pattern<T, F> times(int times, @Nullable Time windowTime) { + return times(times, Time.toDuration(windowTime)); + } + + /** + * Specifies exact number of times that this pattern should be matched and time interval + * corresponds to the maximum time gap between previous and current event for each times. + * + * @param times number of times matching event must appear + * @param windowTime time of the matching window between times + * @return The same pattern with number of times applied + * @throws MalformedPatternException if the quantifier is not applicable to this pattern. + */ + public Pattern<T, F> times(int times, @Nullable Duration windowTime) { checkIfNoNotPattern(); checkIfQuantifierApplied(); Preconditions.checkArgument(times > 0, "You should give a positive number greater than 0."); @@ -446,7 +525,7 @@ public class Pattern<T, F extends T> { * @throws MalformedPatternException if the quantifier is not applicable to this pattern. */ public Pattern<T, F> times(int from, int to) { - return times(from, to, null); + return times(from, to, (Duration) null); } /** @@ -458,8 +537,24 @@ public class Pattern<T, F extends T> { * @param windowTime time of the matching window between times * @return The same pattern with the number of times range applied * @throws MalformedPatternException if the quantifier is not applicable to this pattern. + * @deprecated Use {@link #times(int, int, Duration)} */ + @Deprecated public Pattern<T, F> times(int from, int to, @Nullable Time windowTime) { + return times(from, to, Time.toDuration(windowTime)); + } + + /** + * Specifies that the pattern can occur between from and to times with time interval corresponds + * to the maximum time gap between previous and current event for each times. + * + * @param from number of times matching event must appear at least + * @param to number of times matching event must appear at most + * @param windowTime time of the matching window between times + * @return The same pattern with the number of times range applied + * @throws MalformedPatternException if the quantifier is not applicable to this pattern. + */ + public Pattern<T, F> times(int from, int to, @Nullable Duration windowTime) { checkIfNoNotPattern(); checkIfQuantifierApplied(); this.quantifier = Quantifier.times(quantifier.getConsumingStrategy()); @@ -480,7 +575,7 @@ public class Pattern<T, F extends T> { * @throws MalformedPatternException if the quantifier is not applicable to this pattern. */ public Pattern<T, F> timesOrMore(int times) { - return timesOrMore(times, null); + return timesOrMore(times, (Duration) null); } /** @@ -494,8 +589,26 @@ public class Pattern<T, F extends T> { * @return The same pattern with a {@link Quantifier#looping(ConsumingStrategy)} quantifier * applied. * @throws MalformedPatternException if the quantifier is not applicable to this pattern. + * @deprecated Use {@link #timesOrMore(int, Duration)} */ + @Deprecated public Pattern<T, F> timesOrMore(int times, @Nullable Time windowTime) { + return timesOrMore(times, Time.toDuration(windowTime)); + } + + /** + * Specifies that this pattern can occur the specified times at least with interval corresponds + * to the maximum time gap between previous and current event for each times. This means at + * least the specified times and at most infinite number of events can be matched to this + * pattern. + * + * @param times number of times at least matching event must appear + * @param windowTime time of the matching window between times + * @return The same pattern with a {@link Quantifier#looping(ConsumingStrategy)} quantifier + * applied. + * @throws MalformedPatternException if the quantifier is not applicable to this pattern. + */ + public Pattern<T, F> timesOrMore(int times, @Nullable Duration windowTime) { checkIfNoNotPattern(); checkIfQuantifierApplied(); this.quantifier = Quantifier.looping(quantifier.getConsumingStrategy()); diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java index d929368936a..2e5efd48dd5 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java @@ -23,8 +23,10 @@ import org.apache.flink.util.Preconditions; import javax.annotation.Nullable; +import java.time.Duration; import java.util.EnumSet; import java.util.Objects; +import java.util.Optional; /** * A quantifier describing the Pattern. There are three main groups of {@link Quantifier}. @@ -190,9 +192,9 @@ public class Quantifier { public static class Times { private final int from; private final int to; - private final @Nullable Time windowTime; + private final @Nullable Duration windowTime; - private Times(int from, int to, @Nullable Time windowTime) { + private Times(int from, int to, @Nullable Duration windowTime) { Preconditions.checkArgument( from > 0, "The from should be a positive number greater than 0."); Preconditions.checkArgument( @@ -211,15 +213,33 @@ public class Quantifier { return to; } + /** @deprecated Use {@link #getWindowSize()}. */ + @Deprecated public Time getWindowTime() { - return windowTime; + return getWindowSize().map(Time::of).orElse(null); } + public Optional<Duration> getWindowSize() { + return Optional.ofNullable(windowTime); + } + + /** @deprecated Use {@link #of(int, int, Duration)} */ + @Deprecated public static Times of(int from, int to, @Nullable Time windowTime) { + return of(from, to, Time.toDuration(windowTime)); + } + + public static Times of(int from, int to, @Nullable Duration windowTime) { return new Times(from, to, windowTime); } + /** @deprecated Use {@link #of(int, Duration)} */ + @Deprecated public static Times of(int times, @Nullable Time windowTime) { + return of(times, Time.toDuration(windowTime)); + } + + public static Times of(int times, @Nullable Duration windowTime) { return new Times(times, times, windowTime); } @@ -237,8 +257,7 @@ public class Quantifier { && ((windowTime == null && times.windowTime == null) || (windowTime != null && times.windowTime != null - && windowTime.toMilliseconds() - == times.windowTime.toMilliseconds())); + && windowTime.toMillis() == times.windowTime.toMillis())); } @Override diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java index a065f01655c..6a9786b0125 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java @@ -44,7 +44,6 @@ import org.apache.flink.mock.Whitebox; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; @@ -62,6 +61,7 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.mockito.Mockito; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -1290,7 +1290,7 @@ public class CEPOperatorTest extends TestLogger { .where(SimpleCondition.of(value -> value.getName().equals("end"))) // add a window timeout to test whether timestamps of elements in the // priority queue in CEP operator are correctly checkpointed/restored - .within(Time.milliseconds(10L)); + .within(Duration.ofMillis(10L)); return NFACompiler.compileFactory(pattern, handleTimeout).createNFA(); } @@ -1325,7 +1325,7 @@ public class CEPOperatorTest extends TestLogger { .optional() .followedBy("end") .where(SimpleCondition.of(value -> value.getName().equals("a"))) - .within(Time.milliseconds(10L)); + .within(Duration.ofMillis(10L)); return NFACompiler.compileFactory(pattern, handleTimeout).createNFA(); } @@ -1355,7 +1355,7 @@ public class CEPOperatorTest extends TestLogger { .where(SimpleCondition.of(value -> value.getName().equals("a"))) .followedBy("end") .where(SimpleCondition.of(value -> value.getName().equals("b"))) - .within(Time.milliseconds(10L)); + .within(Duration.ofMillis(10L)); return NFACompiler.compileFactory(pattern, handleTimeout).createNFA(); } diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepProcessFunctionContextTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepProcessFunctionContextTest.java index ab4e006baba..cce6541744b 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepProcessFunctionContextTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepProcessFunctionContextTest.java @@ -24,7 +24,6 @@ import org.apache.flink.cep.functions.TimedOutPartialMatchHandler; import org.apache.flink.cep.nfa.NFA; import org.apache.flink.cep.nfa.compiler.NFACompiler; import org.apache.flink.cep.pattern.Pattern; -import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; @@ -32,6 +31,7 @@ import org.apache.flink.util.TestLogger; import org.junit.Test; +import java.time.Duration; import java.util.List; import java.util.Map; import java.util.function.Function; @@ -418,7 +418,7 @@ public class CepProcessFunctionContextTest extends TestLogger { public NFA<Event> createNFA() { Pattern<Event, ?> pattern = - Pattern.<Event>begin("1").next("2").within(Time.milliseconds(10)); + Pattern.<Event>begin("1").next("2").within(Duration.ofMillis(10)); return NFACompiler.compileFactory(pattern, true).createNFA(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java index 639d2af5f64..ce475847c0d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java @@ -62,6 +62,8 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.OutputTag; import org.apache.flink.util.Preconditions; +import java.time.Duration; + import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -132,10 +134,25 @@ public class AllWindowedStream<T, W extends Window> { * is {@code 0L}. * * <p>Setting an allowed lateness is only valid for event-time windows. + * + * @deprecated Use {@link #allowedLateness(Duration)}, instead. */ + @Deprecated @PublicEvolving public AllWindowedStream<T, W> allowedLateness(Time lateness) { - final long millis = lateness.toMilliseconds(); + return allowedLateness(lateness.toDuration()); + } + + /** + * Sets the time by which elements are allowed to be late. Elements that arrive behind the + * watermark by more than the specified time will be dropped. By default, the allowed lateness + * is {@code 0L}. + * + * <p>Setting an allowed lateness is only valid for event-time windows. + */ + @PublicEvolving + public AllWindowedStream<T, W> allowedLateness(Duration lateness) { + final long millis = lateness.toMillis(); checkArgument(millis >= 0, "The allowed lateness cannot be negative."); this.allowedLateness = millis; @@ -145,7 +162,7 @@ public class AllWindowedStream<T, W extends Window> { /** * Send late arriving data to the side output identified by the given {@link OutputTag}. Data is * considered late after the watermark has passed the end of the window plus the allowed - * lateness set using {@link #allowedLateness(Time)}. + * lateness set using {@link #allowedLateness(Duration)}. * * <p>You can get the stream of late data using {@link * SingleOutputStreamOperator#getSideOutput(OutputTag)} on the {@link diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java index ce328dc26d7..b9a253d4e25 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java @@ -42,10 +42,14 @@ import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; import org.apache.flink.util.Preconditions; +import javax.annotation.Nullable; + import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.Optional; import static java.util.Objects.requireNonNull; @@ -68,7 +72,7 @@ import static java.util.Objects.requireNonNull; * DataStream<T> result = one.coGroup(two) * .where(new MyFirstKeySelector()) * .equalTo(new MyFirstKeySelector()) - * .window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS))) + * .window(TumblingEventTimeWindows.of(Duration.ofSeconds(5))) * .apply(new MyCoGroupFunction()); * }</pre> */ @@ -201,7 +205,7 @@ public class CoGroupedStreams<T1, T2> { assigner, null, null, - null); + (Duration) null); } } } @@ -233,10 +237,15 @@ public class CoGroupedStreams<T1, T2> { private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor; - private final Time allowedLateness; + @Nullable private final Duration allowedLateness; private WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowedStream; + /** + * @deprecated Use {@link WithWindow#WithWindow(DataStream, DataStream, KeySelector, + * KeySelector, TypeInformation, WindowAssigner, Trigger, Evictor, Duration)} + */ + @Deprecated protected WithWindow( DataStream<T1> input1, DataStream<T2> input2, @@ -246,7 +255,29 @@ public class CoGroupedStreams<T1, T2> { WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner, Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger, Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor, - Time allowedLateness) { + @Nullable Time allowedLateness) { + this( + input1, + input2, + keySelector1, + keySelector2, + keyType, + windowAssigner, + trigger, + evictor, + Time.toDuration(allowedLateness)); + } + + protected WithWindow( + DataStream<T1> input1, + DataStream<T2> input2, + KeySelector<T1, KEY> keySelector1, + KeySelector<T2, KEY> keySelector2, + TypeInformation<KEY> keyType, + WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner, + Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger, + Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor, + @Nullable Duration allowedLateness) { this.input1 = input1; this.input2 = input2; @@ -303,9 +334,21 @@ public class CoGroupedStreams<T1, T2> { * Sets the time by which elements are allowed to be late. * * @see WindowedStream#allowedLateness(Time) + * @deprecated Use {@link #allowedLateness(Duration)} */ + @Deprecated @PublicEvolving - public WithWindow<T1, T2, KEY, W> allowedLateness(Time newLateness) { + public WithWindow<T1, T2, KEY, W> allowedLateness(@Nullable Time newLateness) { + return allowedLateness(Time.toDuration(newLateness)); + } + + /** + * Sets the time by which elements are allowed to be late. + * + * @see WindowedStream#allowedLateness(Duration) + */ + @PublicEvolving + public WithWindow<T1, T2, KEY, W> allowedLateness(@Nullable Duration newLateness) { return new WithWindow<>( input1, input2, @@ -421,9 +464,17 @@ public class CoGroupedStreams<T1, T2> { return (SingleOutputStreamOperator<T>) apply(function, resultType); } + /** @deprecated Use {@link #getAllowedLatenessDuration()} */ + @Deprecated @VisibleForTesting + @Nullable Time getAllowedLateness() { - return allowedLateness; + return getAllowedLatenessDuration().map(Time::of).orElse(null); + } + + @VisibleForTesting + Optional<Duration> getAllowedLatenessDuration() { + return Optional.ofNullable(allowedLateness); } @VisibleForTesting diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java index 83c7ac82c87..9e64d93596a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java @@ -35,6 +35,11 @@ import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.Optional; + import static java.util.Objects.requireNonNull; /** @@ -56,7 +61,7 @@ import static java.util.Objects.requireNonNull; * DataStream<T> result = one.join(two) * .where(new MyFirstKeySelector()) * .equalTo(new MyFirstKeySelector()) - * .window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS))) + * .window(TumblingEventTimeWindows.of(Duration.ofSeconds(5))) * .apply(new MyJoinFunction()); * }</pre> */ @@ -187,7 +192,7 @@ public class JoinedStreams<T1, T2> { assigner, null, null, - null); + (Duration) null); } } } @@ -219,10 +224,38 @@ public class JoinedStreams<T1, T2> { private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor; - private final Time allowedLateness; + @Nullable private final Duration allowedLateness; private CoGroupedStreams.WithWindow<T1, T2, KEY, W> coGroupedWindowedStream; + /** + * @deprecated Use {@link WithWindow#WithWindow(DataStream, DataStream, KeySelector, + * KeySelector, TypeInformation, WindowAssigner, Trigger, Evictor, Duration)}. + */ + @Deprecated + @PublicEvolving + protected WithWindow( + DataStream<T1> input1, + DataStream<T2> input2, + KeySelector<T1, KEY> keySelector1, + KeySelector<T2, KEY> keySelector2, + TypeInformation<KEY> keyType, + WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner, + Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger, + Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor, + @Nullable Time allowedLateness) { + this( + input1, + input2, + keySelector1, + keySelector2, + keyType, + windowAssigner, + trigger, + evictor, + Time.toDuration(allowedLateness)); + } + @PublicEvolving protected WithWindow( DataStream<T1> input1, @@ -233,7 +266,7 @@ public class JoinedStreams<T1, T2> { WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner, Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger, Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor, - Time allowedLateness) { + @Nullable Duration allowedLateness) { this.input1 = requireNonNull(input1); this.input2 = requireNonNull(input2); @@ -291,10 +324,22 @@ public class JoinedStreams<T1, T2> { /** * Sets the time by which elements are allowed to be late. * - * @see WindowedStream#allowedLateness(Time) + * @see WindowedStream#allowedLateness(Duration) + * @deprecated Use {@link #allowedLateness(Duration)}. + */ + @Deprecated + @PublicEvolving + public WithWindow<T1, T2, KEY, W> allowedLateness(@Nullable Time newLateness) { + return allowedLateness(Time.toDuration(newLateness)); + } + + /** + * Sets the time by which elements are allowed to be late. + * + * @see WindowedStream#allowedLateness(Duration) */ @PublicEvolving - public WithWindow<T1, T2, KEY, W> allowedLateness(Time newLateness) { + public WithWindow<T1, T2, KEY, W> allowedLateness(@Nullable Duration newLateness) { return new WithWindow<>( input1, input2, @@ -481,9 +526,16 @@ public class JoinedStreams<T1, T2> { return (SingleOutputStreamOperator<T>) apply(function, resultType); } + /** @deprecated Use {@link #getAllowedLatenessDuration()}} */ @VisibleForTesting + @Nullable Time getAllowedLateness() { - return allowedLateness; + return getAllowedLatenessDuration().map(Time::of).orElse(null); + } + + @VisibleForTesting + Optional<Duration> getAllowedLatenessDuration() { + return Optional.ofNullable(allowedLateness); } @VisibleForTesting diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java index 6b3f2d1139c..c55c63a93a1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java @@ -73,6 +73,7 @@ import org.apache.flink.util.Preconditions; import org.apache.commons.lang3.StringUtils; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Stack; @@ -429,7 +430,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> { /** * Join elements of this {@link KeyedStream} with elements of another {@link KeyedStream} over a - * time interval that can be specified with {@link IntervalJoin#between(Time, Time)}. + * time interval that can be specified with {@link IntervalJoin#between(Duration, Duration)}. * * @param otherStream The other keyed stream to join this keyed stream with * @param <T1> Type parameter of elements in the other stream @@ -497,9 +498,30 @@ public class KeyedStream<T, KEY> extends DataStream<T> { * * @param lowerBound The lower bound. Needs to be smaller than or equal to the upperBound * @param upperBound The upper bound. Needs to be bigger than or equal to the lowerBound + * @deprecated Use {@link #between(Duration, Duration)} */ + @Deprecated @PublicEvolving public IntervalJoined<T1, T2, KEY> between(Time lowerBound, Time upperBound) { + return between(lowerBound.toDuration(), upperBound.toDuration()); + } + + /** + * Specifies the time boundaries over which the join operation works, so that + * + * <pre> + * leftElement.timestamp + lowerBound <= rightElement.timestamp <= leftElement.timestamp + upperBound + * </pre> + * + * <p>By default both the lower and the upper bound are inclusive. This can be configured + * with {@link IntervalJoined#lowerBoundExclusive()} and {@link + * IntervalJoined#upperBoundExclusive()} + * + * @param lowerBound The lower bound. Needs to be smaller than or equal to the upperBound + * @param upperBound The upper bound. Needs to be bigger than or equal to the lowerBound + */ + @PublicEvolving + public IntervalJoined<T1, T2, KEY> between(Duration lowerBound, Duration upperBound) { if (timeBehaviour != TimeBehaviour.EventTime) { throw new UnsupportedTimeCharacteristicException( "Time-bounded stream joins are only supported in event time"); @@ -509,12 +531,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> { checkNotNull(upperBound, "An upper bound needs to be provided for a time-bounded join"); return new IntervalJoined<>( - streamOne, - streamTwo, - lowerBound.toMilliseconds(), - upperBound.toMilliseconds(), - true, - true); + streamOne, streamTwo, lowerBound.toMillis(), upperBound.toMillis(), true, true); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java index 7e0b9aa48f1..47b22a6057a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java @@ -43,6 +43,8 @@ import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorBuilder; import org.apache.flink.util.OutputTag; +import java.time.Duration; + import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -103,9 +105,24 @@ public class WindowedStream<T, K, W extends Window> { * is {@code 0L}. * * <p>Setting an allowed lateness is only valid for event-time windows. + * + * @deprecated Use {@link #allowedLateness(Duration)} */ + @Deprecated @PublicEvolving public WindowedStream<T, K, W> allowedLateness(Time lateness) { + return allowedLateness(lateness.toDuration()); + } + + /** + * Sets the time by which elements are allowed to be late. Elements that arrive behind the + * watermark by more than the specified time will be dropped. By default, the allowed lateness + * is {@code 0L}. + * + * <p>Setting an allowed lateness is only valid for event-time windows. + */ + @PublicEvolving + public WindowedStream<T, K, W> allowedLateness(Duration lateness) { builder.allowedLateness(lateness); return this; } @@ -113,7 +130,7 @@ public class WindowedStream<T, K, W extends Window> { /** * Send late arriving data to the side output identified by the given {@link OutputTag}. Data is * considered late after the watermark has passed the end of the window plus the allowed - * lateness set using {@link #allowedLateness(Time)}. + * lateness set using {@link #allowedLateness(Duration)}. * * <p>You can get the stream of late data using {@link * SingleOutputStreamOperator#getSideOutput(OutputTag)} on the {@link diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index dff237cce20..711dd59d423 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -106,6 +106,7 @@ import org.apache.flink.streaming.api.graph.StreamGraphGenerator; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.operators.collect.CollectResultIterator; import org.apache.flink.streaming.api.transformations.CacheTransformation; +import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; import org.apache.flink.util.AbstractID; import org.apache.flink.util.DynamicCodeLoadingException; import org.apache.flink.util.ExceptionUtils; @@ -979,10 +980,11 @@ public class StreamExecutionEnvironment implements AutoCloseable { * event-time mode. If you need to disable watermarks, please use {@link * ExecutionConfig#setAutoWatermarkInterval(long)}. If you are using {@link * TimeCharacteristic#IngestionTime}, please manually set an appropriate {@link - * WatermarkStrategy}. If you are using generic "time window" operations (for example {@link - * org.apache.flink.streaming.api.datastream.KeyedStream#timeWindow(org.apache.flink.streaming.api.windowing.time.Time)} - * that change behaviour based on the time characteristic, please use equivalent operations - * that explicitly specify processing time or event time. + * WatermarkStrategy}. If you are using generic "time window" operations (for example + * through {@link + * org.apache.flink.streaming.api.datastream.KeyedStream#window(WindowAssigner)} that change + * behaviour based on the time characteristic, please use equivalent operations that + * explicitly specify processing time or event time. */ @PublicEvolving @Deprecated diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/BoundedOutOfOrdernessTimestampExtractor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/BoundedOutOfOrdernessTimestampExtractor.java index bb78958761b..0ee8121f1c8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/BoundedOutOfOrdernessTimestampExtractor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/BoundedOutOfOrdernessTimestampExtractor.java @@ -22,6 +22,8 @@ import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.time.Time; +import java.time.Duration; + /** * This is a {@link AssignerWithPeriodicWatermarks} used to emit Watermarks that lag behind the * element with the maximum timestamp (in event time) seen so far by a fixed amount of time, <code> @@ -47,15 +49,24 @@ public abstract class BoundedOutOfOrdernessTimestampExtractor<T> */ private final long maxOutOfOrderness; + /** + * @deprecated Use {@link + * BoundedOutOfOrdernessTimestampExtractor#BoundedOutOfOrdernessTimestampExtractor(Duration)} + */ + @Deprecated public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) { - if (maxOutOfOrderness.toMilliseconds() < 0) { - throw new RuntimeException( - "Tried to set the maximum allowed " - + "lateness to " - + maxOutOfOrderness - + ". This parameter cannot be negative."); + this(maxOutOfOrderness.toDuration()); + } + + public BoundedOutOfOrdernessTimestampExtractor(Duration maxOutOfOrderness) { + if (maxOutOfOrderness.isNegative()) { + throw new IllegalArgumentException( + String.format( + "Tried to set the maximum allowed lateness to %s. This parameter cannot be negative.", + maxOutOfOrderness)); } - this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds(); + + this.maxOutOfOrderness = maxOutOfOrderness.toMillis(); this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java index ee3a0e0c263..7ee633b0dbb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java @@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import java.time.Duration; import java.util.Collection; import java.util.Collections; @@ -40,7 +41,7 @@ import java.util.Collections; * DataStream<Tuple2<String, Integer>> in = ...; * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...); * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed = - * keyed.window(EventTimeSessionWindows.withGap(Time.minutes(1))); + * keyed.window(EventTimeSessionWindows.withGap(Duration.ofMinutes(1))); * }</pre> */ public class EventTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> { @@ -85,9 +86,22 @@ public class EventTimeSessionWindows extends MergingWindowAssigner<Object, TimeW * * @param size The session timeout, i.e. the time gap between sessions * @return The policy. + * @deprecated Use {@link #withGap(Duration)} */ + @Deprecated public static EventTimeSessionWindows withGap(Time size) { - return new EventTimeSessionWindows(size.toMilliseconds()); + return withGap(size.toDuration()); + } + + /** + * Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns elements to sessions + * based on the element timestamp. + * + * @param size The session timeout, i.e. the time gap between sessions + * @return The policy. + */ + public static EventTimeSessionWindows withGap(Duration size) { + return new EventTimeSessionWindows(size.toMillis()); } /** diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java index c82309e7259..e77aaedd097 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java @@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import java.time.Duration; import java.util.Collection; import java.util.Collections; @@ -40,7 +41,7 @@ import java.util.Collections; * DataStream<Tuple2<String, Integer>> in = ...; * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...); * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed = - * keyed.window(ProcessingTimeSessionWindows.withGap(Time.minutes(1))); + * keyed.window(ProcessingTimeSessionWindows.withGap(Duration.ofMinutes(1))); * }</pre> */ public class ProcessingTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> { @@ -87,9 +88,22 @@ public class ProcessingTimeSessionWindows extends MergingWindowAssigner<Object, * * @param size The session timeout, i.e. the time gap between sessions * @return The policy. + * @deprecated Use {@link #withGap(Duration)} */ + @Deprecated public static ProcessingTimeSessionWindows withGap(Time size) { - return new ProcessingTimeSessionWindows(size.toMilliseconds()); + return withGap(size.toDuration()); + } + + /** + * Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns elements to sessions + * based on the element timestamp. + * + * @param size The session timeout, i.e. the time gap between sessions + * @return The policy. + */ + public static ProcessingTimeSessionWindows withGap(Duration size) { + return new ProcessingTimeSessionWindows(size.toMillis()); } /** diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java index 3711d67b0d2..4b9160aad2a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java @@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -41,7 +42,7 @@ import java.util.List; * DataStream<Tuple2<String, Integer>> in = ...; * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...); * WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed = - * keyed.window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10))); + * keyed.window(SlidingEventTimeWindows.of(Duration.ofMinutes(1), Duration.ofSeconds(10))); * }</pre> */ @PublicEvolving @@ -115,9 +116,23 @@ public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> * @param size The size of the generated windows. * @param slide The slide interval of the generated windows. * @return The time policy. + * @deprecated Use {@link #of(Duration, Duration)} */ + @Deprecated public static SlidingEventTimeWindows of(Time size, Time slide) { - return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0); + return of(size.toDuration(), slide.toDuration()); + } + + /** + * Creates a new {@code SlidingEventTimeWindows} {@link WindowAssigner} that assigns elements to + * sliding time windows based on the element timestamp. + * + * @param size The size of the generated windows. + * @param slide The slide interval of the generated windows. + * @return The time policy. + */ + public static SlidingEventTimeWindows of(Duration size, Duration slide) { + return new SlidingEventTimeWindows(size.toMillis(), slide.toMillis(), 0); } /** @@ -138,10 +153,34 @@ public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> * @param slide The slide interval of the generated windows. * @param offset The offset which window start would be shifted by. * @return The time policy. + * @deprecated Use {@link #of(Duration, Duration, Duration)} */ + @Deprecated public static SlidingEventTimeWindows of(Time size, Time slide, Time offset) { - return new SlidingEventTimeWindows( - size.toMilliseconds(), slide.toMilliseconds(), offset.toMilliseconds()); + return of(size.toDuration(), slide.toDuration(), offset.toDuration()); + } + + /** + * Creates a new {@code SlidingEventTimeWindows} {@link WindowAssigner} that assigns elements to + * time windows based on the element timestamp and offset. + * + * <p>For example, if you want window a stream by hour,but window begins at the 15th minutes of + * each hour, you can use {@code of(Duration.ofHours(1), Duration.ofMinutes(15))}, then you will + * get time windows start at 0:15:00,1:15:00,2:15:00,etc. + * + * <p>Rather than that,if you are living in somewhere which is not using UTC±00:00 time, such as + * China which is using UTC+08:00,and you want a time window with size of one day, and window + * begins at every 00:00:00 of local time,you may use {@code of(Duration.ofDays(1), + * Duration.ofHours(-8))}. The parameter of offset is {@code Duration.ofHours(-8))} since + * UTC+08:00 is 8 hours earlier than UTC time. + * + * @param size The size of the generated windows. + * @param slide The slide interval of the generated windows. + * @param offset The offset which window start would be shifted by. + * @return The time policy. + */ + public static SlidingEventTimeWindows of(Duration size, Duration slide, Duration offset) { + return new SlidingEventTimeWindows(size.toMillis(), slide.toMillis(), offset.toMillis()); } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java index 265a09286bf..1ea6de4d53d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java @@ -26,6 +26,7 @@ import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -107,9 +108,23 @@ public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWin * @param size The size of the generated windows. * @param slide The slide interval of the generated windows. * @return The time policy. + * @deprecated {@link #of(Duration, Duration)} */ + @Deprecated public static SlidingProcessingTimeWindows of(Time size, Time slide) { - return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0); + return of(size.toDuration(), slide.toDuration()); + } + + /** + * Creates a new {@code SlidingProcessingTimeWindows} {@link WindowAssigner} that assigns + * elements to sliding time windows based on the element timestamp. + * + * @param size The size of the generated windows. + * @param slide The slide interval of the generated windows. + * @return The time policy. + */ + public static SlidingProcessingTimeWindows of(Duration size, Duration slide) { + return new SlidingProcessingTimeWindows(size.toMillis(), slide.toMillis(), 0); } /** @@ -130,10 +145,35 @@ public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWin * @param slide The slide interval of the generated windows. * @param offset The offset which window start would be shifted by. * @return The time policy. + * @deprecated Use {@link #of(Duration, Duration, Duration)} */ + @Deprecated public static SlidingProcessingTimeWindows of(Time size, Time slide, Time offset) { + return of(size.toDuration(), slide.toDuration(), offset.toDuration()); + } + + /** + * Creates a new {@code SlidingProcessingTimeWindows} {@link WindowAssigner} that assigns + * elements to time windows based on the element timestamp and offset. + * + * <p>For example, if you want window a stream by hour,but window begins at the 15th minutes of + * each hour, you can use {@code of(Duration.ofHours(1), Duration.ofMinutes(15))}, then you will + * get time windows start at 0:15:00,1:15:00,2:15:00,etc. + * + * <p>Rather than that,if you are living in somewhere which is not using UTC±00:00 time, such as + * China which is using UTC+08:00,and you want a time window with size of one day, and window + * begins at every 00:00:00 of local time,you may use {@code of(Duration.ofDays(1), + * Duration.ofHours(-8))}. The parameter of offset is {@code Duration.ofHours(-8))} since + * UTC+08:00 is 8 hours earlier than UTC time. + * + * @param size The size of the generated windows. + * @param slide The slide interval of the generated windows. + * @param offset The offset which window start would be shifted by. + * @return The time policy. + */ + public static SlidingProcessingTimeWindows of(Duration size, Duration slide, Duration offset) { return new SlidingProcessingTimeWindows( - size.toMilliseconds(), slide.toMilliseconds(), offset.toMilliseconds()); + size.toMillis(), slide.toMillis(), offset.toMillis()); } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java index 787514ca523..4fc414c66d0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java @@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import java.time.Duration; import java.util.Collection; import java.util.Collections; @@ -40,7 +41,7 @@ import java.util.Collections; * DataStream<Tuple2<String, Integer>> in = ...; * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...); * WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed = - * keyed.window(TumblingEventTimeWindows.of(Time.minutes(1))); + * keyed.window(TumblingEventTimeWindows.of(Duration.ofMinutes(1))); * }</pre> */ @PublicEvolving @@ -109,9 +110,22 @@ public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> * * @param size The size of the generated windows. * @return The time policy. + * @deprecated Use {@link #of(Duration)} */ + @Deprecated public static TumblingEventTimeWindows of(Time size) { - return new TumblingEventTimeWindows(size.toMilliseconds(), 0, WindowStagger.ALIGNED); + return of(size.toDuration()); + } + + /** + * Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns elements + * to time windows based on the element timestamp. + * + * @param size The size of the generated windows. + * @return The time policy. + */ + public static TumblingEventTimeWindows of(Duration size) { + return new TumblingEventTimeWindows(size.toMillis(), 0, WindowStagger.ALIGNED); } /** @@ -130,10 +144,33 @@ public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> * * @param size The size of the generated windows. * @param offset The offset which window start would be shifted by. + * @deprecated Use {@link #of(Duration, Duration)} */ + @Deprecated public static TumblingEventTimeWindows of(Time size, Time offset) { + return of(size.toDuration(), offset.toDuration()); + } + + /** + * Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns elements + * to time windows based on the element timestamp and offset. + * + * <p>For example, if you want window a stream by hour,but window begins at the 15th minutes of + * each hour, you can use {@code of(Duration.ofHours(1), Duration.ofMinutes(15))}, then you will + * get time windows start at 0:15:00,1:15:00,2:15:00,etc. + * + * <p>Rather than that,if you are living in somewhere which is not using UTC±00:00 time, such as + * China which is using UTC+08:00,and you want a time window with size of one day, and window + * begins at every 00:00:00 of local time,you may use {@code of(Duration.ofDays(1), + * Duration.ofHours(-8))}. The parameter of offset is {@code Duration.ofHours(-8))} since + * UTC+08:00 is 8 hours earlier than UTC time. + * + * @param size The size of the generated windows. + * @param offset The offset which window start would be shifted by. + */ + public static TumblingEventTimeWindows of(Duration size, Duration offset) { return new TumblingEventTimeWindows( - size.toMilliseconds(), offset.toMilliseconds(), WindowStagger.ALIGNED); + size.toMillis(), offset.toMillis(), WindowStagger.ALIGNED); } /** @@ -144,11 +181,27 @@ public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> * @param size The size of the generated windows. * @param offset The globalOffset which window start would be shifted by. * @param windowStagger The utility that produces staggering offset in runtime. + * @deprecated Use {@link #of(Duration, Duration, WindowStagger)} */ + @Deprecated @PublicEvolving public static TumblingEventTimeWindows of(Time size, Time offset, WindowStagger windowStagger) { - return new TumblingEventTimeWindows( - size.toMilliseconds(), offset.toMilliseconds(), windowStagger); + return of(size.toDuration(), offset.toDuration(), windowStagger); + } + + /** + * Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns elements + * to time windows based on the element timestamp, offset and a staggering offset, depending on + * the staggering policy. + * + * @param size The size of the generated windows. + * @param offset The globalOffset which window start would be shifted by. + * @param windowStagger The utility that produces staggering offset in runtime. + */ + @PublicEvolving + public static TumblingEventTimeWindows of( + Duration size, Duration offset, WindowStagger windowStagger) { + return new TumblingEventTimeWindows(size.toMillis(), offset.toMillis(), windowStagger); } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java index e22b63acf57..2d82a4b1e57 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java @@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import java.time.Duration; import java.util.Collection; import java.util.Collections; @@ -40,7 +41,7 @@ import java.util.Collections; * DataStream<Tuple2<String, Integer>> in = ...; * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...); * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed = - * keyed.window(TumblingProcessingTimeWindows.of(Time.of(1, MINUTES), Time.of(10, SECONDS)); + * keyed.window(TumblingProcessingTimeWindows.of(Duration.ofMinutes(1), Duration.ofSeconds(10)); * }</pre> */ public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> { @@ -105,9 +106,22 @@ public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWi * * @param size The size of the generated windows. * @return The time policy. + * @deprecated Use {@link #of(Duration)} */ + @Deprecated public static TumblingProcessingTimeWindows of(Time size) { - return new TumblingProcessingTimeWindows(size.toMilliseconds(), 0, WindowStagger.ALIGNED); + return of(size.toDuration()); + } + + /** + * Creates a new {@code TumblingProcessingTimeWindows} {@link WindowAssigner} that assigns + * elements to time windows based on the element timestamp. + * + * @param size The size of the generated windows. + * @return The time policy. + */ + public static TumblingProcessingTimeWindows of(Duration size) { + return new TumblingProcessingTimeWindows(size.toMillis(), 0, WindowStagger.ALIGNED); } /** @@ -127,10 +141,34 @@ public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWi * @param size The size of the generated windows. * @param offset The offset which window start would be shifted by. * @return The time policy. + * @deprecated Use {@link #of(Duration, Duration)} */ + @Deprecated public static TumblingProcessingTimeWindows of(Time size, Time offset) { + return of(size.toDuration(), offset.toDuration()); + } + + /** + * Creates a new {@code TumblingProcessingTimeWindows} {@link WindowAssigner} that assigns + * elements to time windows based on the element timestamp and offset. + * + * <p>For example, if you want window a stream by hour,but window begins at the 15th minutes of + * each hour, you can use {@code of(Duration.ofHours(1), Duration.ofMinutes(15))}, then you will + * get time windows start at 0:15:00,1:15:00,2:15:00,etc. + * + * <p>Rather than that, if you are living in somewhere which is not using UTC±00:00 time, such + * as China which is using UTC+08:00,and you want a time window with size of one day, and window + * begins at every 00:00:00 of local time, you may use {@code of(Duration.ofDays(1), + * Duration.ofHours(-8))}. The parameter of offset is {@code Duration.ofHours(-8))} since + * UTC+08:00 is 8 hours earlier than UTC time. + * + * @param size The size of the generated windows. + * @param offset The offset which window start would be shifted by. + * @return The time policy. + */ + public static TumblingProcessingTimeWindows of(Duration size, Duration offset) { return new TumblingProcessingTimeWindows( - size.toMilliseconds(), offset.toMilliseconds(), WindowStagger.ALIGNED); + size.toMillis(), offset.toMillis(), WindowStagger.ALIGNED); } /** @@ -142,12 +180,29 @@ public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWi * @param offset The offset which window start would be shifted by. * @param windowStagger The utility that produces staggering offset in runtime. * @return The time policy. + * @deprecated Use {@link #of(Duration, Duration, WindowStagger)} */ + @Deprecated @PublicEvolving public static TumblingProcessingTimeWindows of( Time size, Time offset, WindowStagger windowStagger) { - return new TumblingProcessingTimeWindows( - size.toMilliseconds(), offset.toMilliseconds(), windowStagger); + return of(size.toDuration(), offset.toDuration(), windowStagger); + } + + /** + * Creates a new {@code TumblingProcessingTimeWindows} {@link WindowAssigner} that assigns + * elements to time windows based on the element timestamp, offset and a staggering offset, + * depending on the staggering policy. + * + * @param size The size of the generated windows. + * @param offset The offset which window start would be shifted by. + * @param windowStagger The utility that produces staggering offset in runtime. + * @return The time policy. + */ + @PublicEvolving + public static TumblingProcessingTimeWindows of( + Duration size, Duration offset, WindowStagger windowStagger) { + return new TumblingProcessingTimeWindows(size.toMillis(), offset.toMillis(), windowStagger); } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java index 449293ef9ca..5daa589ab01 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java @@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue; +import java.time.Duration; import java.util.Iterator; /** @@ -124,9 +125,21 @@ public class TimeEvictor<W extends Window> implements Evictor<Object, W> { * before the window function. * * @param windowSize The amount of time for which to keep elements. + * @deprecated Use {@link #of(Duration)} */ + @Deprecated public static <W extends Window> TimeEvictor<W> of(Time windowSize) { - return new TimeEvictor<>(windowSize.toMilliseconds()); + return of(windowSize.toDuration()); + } + + /** + * Creates a {@code TimeEvictor} that keeps the given number of elements. Eviction is done + * before the window function. + * + * @param windowSize The amount of time for which to keep elements. + */ + public static <W extends Window> TimeEvictor<W> of(Duration windowSize) { + return new TimeEvictor<>(windowSize.toMillis()); } /** @@ -135,8 +148,21 @@ public class TimeEvictor<W extends Window> implements Evictor<Object, W> { * * @param windowSize The amount of time for which to keep elements. * @param doEvictAfter Whether eviction is done after window function. + * @deprecated Use {@link #of(Duration, boolean)} */ + @Deprecated public static <W extends Window> TimeEvictor<W> of(Time windowSize, boolean doEvictAfter) { - return new TimeEvictor<>(windowSize.toMilliseconds(), doEvictAfter); + return of(windowSize.toDuration(), doEvictAfter); + } + + /** + * Creates a {@code TimeEvictor} that keeps the given number of elements. Eviction is done + * before/after the window function based on the value of doEvictAfter. + * + * @param windowSize The amount of time for which to keep elements. + * @param doEvictAfter Whether eviction is done after window function. + */ + public static <W extends Window> TimeEvictor<W> of(Duration windowSize, boolean doEvictAfter) { + return new TimeEvictor<>(windowSize.toMillis(), doEvictAfter); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java index b842a263dc3..2e8fa587030 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java @@ -20,6 +20,9 @@ package org.apache.flink.streaming.api.windowing.time; import org.apache.flink.annotation.Public; +import javax.annotation.Nullable; + +import java.time.Duration; import java.util.concurrent.TimeUnit; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -27,10 +30,23 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** * The definition of a time interval for windowing. The time characteristic referred to is the * default time characteristic set on the execution environment. + * + * @deprecated Use {@link Duration} */ +@Deprecated @Public public final class Time { + @Nullable + public static Duration toDuration(@Nullable Time time) { + return time != null ? time.toDuration() : null; + } + + @Nullable + public static Time of(@Nullable Duration duration) { + return duration != null ? Time.milliseconds(duration.toMillis()) : null; + } + /** The time unit for this policy's time interval. */ private final TimeUnit unit; @@ -74,6 +90,10 @@ public final class Time { return unit.toMillis(size); } + public Duration toDuration() { + return Duration.ofMillis(this.toMilliseconds()); + } + // ------------------------------------------------------------------------ // Factory // ------------------------------------------------------------------------ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java index 8d0dcb87c89..19222b43b48 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java @@ -27,6 +27,8 @@ import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.Window; +import java.time.Duration; + /** * A {@link Trigger} that continuously fires based on a given time interval. This fires based on * {@link org.apache.flink.streaming.api.watermark.Watermark Watermarks}. @@ -133,9 +135,21 @@ public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object * * @param interval The time interval at which to fire. * @param <W> The type of {@link Window Windows} on which this trigger can operate. + * @deprecated Use {@link #of(Duration)} */ + @Deprecated public static <W extends Window> ContinuousEventTimeTrigger<W> of(Time interval) { - return new ContinuousEventTimeTrigger<>(interval.toMilliseconds()); + return of(interval.toDuration()); + } + + /** + * Creates a trigger that continuously fires based on the given interval. + * + * @param interval The time interval at which to fire. + * @param <W> The type of {@link Window Windows} on which this trigger can operate. + */ + public static <W extends Window> ContinuousEventTimeTrigger<W> of(Duration interval) { + return new ContinuousEventTimeTrigger<>(interval.toMillis()); } private static class Min implements ReduceFunction<Long> { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java index e3b1325f0f6..5dda7deca55 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java @@ -27,6 +27,8 @@ import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.Window; +import java.time.Duration; + /** * A {@link Trigger} that continuously fires based on a given time interval as measured by the clock * of the machine on which the job is running. @@ -122,9 +124,21 @@ public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<O * * @param interval The time interval at which to fire. * @param <W> The type of {@link Window Windows} on which this trigger can operate. + * @deprecated Use {@link #of(Duration)} */ + @Deprecated public static <W extends Window> ContinuousProcessingTimeTrigger<W> of(Time interval) { - return new ContinuousProcessingTimeTrigger<>(interval.toMilliseconds()); + return of(interval.toDuration()); + } + + /** + * Creates a trigger that continuously fires based on the given interval. + * + * @param interval The time interval at which to fire. + * @param <W> The type of {@link Window Windows} on which this trigger can operate. + */ + public static <W extends Window> ContinuousProcessingTimeTrigger<W> of(Duration interval) { + return new ContinuousProcessingTimeTrigger<>(interval.toMillis()); } private static class Min implements ReduceFunction<Long> { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder.java index f9db7a38f38..8e9564618d0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder.java @@ -57,6 +57,7 @@ import org.apache.flink.util.Preconditions; import javax.annotation.Nullable; import java.lang.reflect.Type; +import java.time.Duration; /** * A builder for creating {@link WindowOperator WindowOperators}. @@ -113,10 +114,16 @@ public class WindowOperatorBuilder<T, K, W extends Window> { this.trigger = trigger; } + /** @deprecated Use {@link #allowedLateness(Duration)}. */ + @Deprecated public void allowedLateness(Time lateness) { + allowedLateness(lateness.toDuration()); + } + + public void allowedLateness(Duration lateness) { Preconditions.checkNotNull(lateness, "Allowed lateness cannot be null"); - final long millis = lateness.toMilliseconds(); + final long millis = lateness.toMillis(); Preconditions.checkArgument(millis >= 0, "The allowed lateness cannot be negative."); this.allowedLateness = millis; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/CoGroupedStreamsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/CoGroupedStreamsTest.java index d585c3f612e..5a8446d264a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/CoGroupedStreamsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/CoGroupedStreamsTest.java @@ -22,13 +22,14 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; -import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.time.Duration; + /** Unit test for {@link CoGroupedStreams}. */ public class CoGroupedStreamsTest { private DataStream<String> dataStream1; @@ -43,14 +44,14 @@ public class CoGroupedStreamsTest { dataStream1 = env.fromData("a1", "a2", "a3"); dataStream2 = env.fromData("a1", "a2"); keySelector = element -> element; - tsAssigner = TumblingEventTimeWindows.of(Time.milliseconds(1L)); + tsAssigner = TumblingEventTimeWindows.of(Duration.ofMillis(1L)); coGroupFunction = (CoGroupFunction<String, String, String>) (first, second, out) -> out.collect(""); } @Test public void testDelegateToCoGrouped() { - Time lateness = Time.milliseconds(42L); + Duration lateness = Duration.ofMillis(42L); CoGroupedStreams.WithWindow<String, String, String, TimeWindow> withLateness = dataStream1 @@ -63,12 +64,12 @@ public class CoGroupedStreamsTest { withLateness.apply(coGroupFunction, BasicTypeInfo.STRING_TYPE_INFO); Assert.assertEquals( - lateness.toMilliseconds(), withLateness.getWindowedStream().getAllowedLateness()); + lateness.toMillis(), withLateness.getWindowedStream().getAllowedLateness()); } @Test public void testSetAllowedLateness() { - Time lateness = Time.milliseconds(42L); + Duration lateness = Duration.ofMillis(42L); CoGroupedStreams.WithWindow<String, String, String, TimeWindow> withLateness = dataStream1 @@ -78,7 +79,6 @@ public class CoGroupedStreamsTest { .window(tsAssigner) .allowedLateness(lateness); - Assert.assertEquals( - lateness.toMilliseconds(), withLateness.getAllowedLateness().toMilliseconds()); + Assert.assertEquals(lateness, withLateness.getAllowedLatenessDuration().orElse(null)); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/JoinedStreamsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/JoinedStreamsTest.java index 879569d6bc5..4a3898a265c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/JoinedStreamsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/JoinedStreamsTest.java @@ -22,13 +22,14 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; -import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.time.Duration; + /** Unit test for {@link JoinedStreams}. */ public class JoinedStreamsTest { private DataStream<String> dataStream1; @@ -43,13 +44,13 @@ public class JoinedStreamsTest { dataStream1 = env.fromData("a1", "a2", "a3"); dataStream2 = env.fromData("a1", "a2"); keySelector = element -> element; - tsAssigner = TumblingEventTimeWindows.of(Time.milliseconds(1)); + tsAssigner = TumblingEventTimeWindows.of(Duration.ofMillis(1)); joinFunction = (first, second) -> first + second; } @Test public void testDelegateToCoGrouped() { - Time lateness = Time.milliseconds(42L); + Duration lateness = Duration.ofMillis(42L); JoinedStreams.WithWindow<String, String, String, TimeWindow> withLateness = dataStream1 @@ -62,13 +63,16 @@ public class JoinedStreamsTest { withLateness.apply(joinFunction, BasicTypeInfo.STRING_TYPE_INFO); Assert.assertEquals( - lateness.toMilliseconds(), - withLateness.getCoGroupedWindowedStream().getAllowedLateness().toMilliseconds()); + lateness, + withLateness + .getCoGroupedWindowedStream() + .getAllowedLatenessDuration() + .orElse(null)); } @Test public void testSetAllowedLateness() { - Time lateness = Time.milliseconds(42L); + Duration lateness = Duration.ofMillis(42L); JoinedStreams.WithWindow<String, String, String, TimeWindow> withLateness = dataStream1 @@ -78,7 +82,6 @@ public class JoinedStreamsTest { .window(tsAssigner) .allowedLateness(lateness); - Assert.assertEquals( - lateness.toMilliseconds(), withLateness.getAllowedLateness().toMilliseconds()); + Assert.assertEquals(lateness, withLateness.getAllowedLatenessDuration().orElse(null)); } } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala index c8c135c1abc..82344d73aac 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala @@ -121,10 +121,10 @@ abstract class PatternTranslatorTestBase { val sameSkipStrategy = currentLeft.getAfterMatchSkipStrategy == currentRight.getAfterMatchSkipStrategy - val sameTimeWindow = if (currentLeft.getWindowTime != null && currentRight != null) { - currentLeft.getWindowTime.toMilliseconds == currentRight.getWindowTime.toMilliseconds + val sameTimeWindow = if (currentLeft.getWindowSize.isPresent && currentRight != null) { + currentLeft.getWindowSize.get.toMillis == currentRight.getWindowSize.get.toMillis } else { - currentLeft.getWindowTime == null && currentRight.getWindowTime == null + !currentLeft.getWindowSize.isPresent && !currentRight.getWindowSize.isPresent } currentLeft = currentLeft.getPrevious
