This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6de1fa28e2fe89678d441e79417f1533857b26b1 Author: Xu Huang <huangxu.wal...@gmail.com> AuthorDate: Sat Jan 18 02:31:12 2025 +0800 [FLINK-37136][API] Introduce Window extension components for DataStream V2 --- .../flink/datastream/api/builtin/BuiltinFuncs.java | 91 ++++++++++ .../window/context/OneInputWindowContext.java | 45 +++++ .../window/context/TwoInputWindowContext.java | 57 ++++++ .../extension/window/context/WindowContext.java | 94 ++++++++++ .../OneInputWindowStreamProcessFunction.java | 73 ++++++++ ...putNonBroadcastWindowStreamProcessFunction.java | 100 ++++++++++ .../TwoOutputWindowStreamProcessFunction.java | 82 +++++++++ .../window/function/WindowProcessFunction.java | 44 +++++ .../window/strategy/GlobalWindowStrategy.java | 25 +++ .../window/strategy/SessionWindowStrategy.java | 47 +++++ .../window/strategy/SlidingTimeWindowStrategy.java | 68 +++++++ .../strategy/TumblingTimeWindowStrategy.java | 58 ++++++ .../extension/window/strategy/WindowStrategy.java | 201 +++++++++++++++++++++ 13 files changed, 985 insertions(+) diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/builtin/BuiltinFuncs.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/builtin/BuiltinFuncs.java index 76b8032cf3e..053fff66071 100644 --- a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/builtin/BuiltinFuncs.java +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/builtin/BuiltinFuncs.java @@ -22,7 +22,13 @@ import org.apache.flink.annotation.Experimental; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.datastream.api.extension.join.JoinFunction; import org.apache.flink.datastream.api.extension.join.JoinType; +import org.apache.flink.datastream.api.extension.window.function.OneInputWindowStreamProcessFunction; +import org.apache.flink.datastream.api.extension.window.function.TwoInputNonBroadcastWindowStreamProcessFunction; +import 
org.apache.flink.datastream.api.extension.window.function.TwoOutputWindowStreamProcessFunction; +import org.apache.flink.datastream.api.extension.window.strategy.WindowStrategy; +import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction; import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction; +import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction; import org.apache.flink.datastream.api.stream.KeyedPartitionStream; import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream; @@ -121,4 +127,89 @@ public class BuiltinFuncs { joinFunction, joinType); } + + // =================== Window =========================== + + static final Class<?> WINDOW_FUNCS_INSTANCE; + + static { + try { + WINDOW_FUNCS_INSTANCE = + Class.forName("org.apache.flink.datastream.impl.builtin.BuiltinWindowFuncs"); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Please ensure that flink-datastream in your class path"); + } + } + + /** + * Wrap the WindowStrategy and OneInputWindowStreamProcessFunction within a + * OneInputStreamProcessFunction to perform the window operation. + * + * @param windowStrategy the window strategy + * @param windowProcessFunction the window process function + * @return the wrapped process function + */ + public static <IN, OUT> OneInputStreamProcessFunction<IN, OUT> window( + WindowStrategy windowStrategy, + OneInputWindowStreamProcessFunction<IN, OUT> windowProcessFunction) { + try { + return (OneInputStreamProcessFunction<IN, OUT>) + WINDOW_FUNCS_INSTANCE + .getMethod( + "window", + WindowStrategy.class, + OneInputWindowStreamProcessFunction.class) + .invoke(null, windowStrategy, windowProcessFunction); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Wrap the WindowStrategy and TwoInputNonBroadcastWindowStreamProcessFunction within a + * TwoInputNonBroadcastStreamProcessFunction to perform the window operation. 
+ * + * @param windowStrategy the window strategy + * @param windowProcessFunction the window process function + * @return the wrapped process function + */ + public static <IN1, IN2, OUT> TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> window( + WindowStrategy windowStrategy, + TwoInputNonBroadcastWindowStreamProcessFunction<IN1, IN2, OUT> windowProcessFunction) { + try { + return (TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT>) + WINDOW_FUNCS_INSTANCE + .getMethod( + "window", + WindowStrategy.class, + TwoInputNonBroadcastWindowStreamProcessFunction.class) + .invoke(null, windowStrategy, windowProcessFunction); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Wrap the WindowStrategy and TwoOutputWindowStreamProcessFunction within a + * TwoOutputStreamProcessFunction to perform the window operation. + * + * @param windowStrategy the window strategy + * @param windowProcessFunction the window process function + * @return the wrapped process function + */ + public static <IN, OUT1, OUT2> TwoOutputStreamProcessFunction<IN, OUT1, OUT2> window( + WindowStrategy windowStrategy, + TwoOutputWindowStreamProcessFunction<IN, OUT1, OUT2> windowProcessFunction) { + try { + return (TwoOutputStreamProcessFunction<IN, OUT1, OUT2>) + WINDOW_FUNCS_INSTANCE + .getMethod( + "window", + WindowStrategy.class, + TwoOutputWindowStreamProcessFunction.class) + .invoke(null, windowStrategy, windowProcessFunction); + } catch (Exception e) { + throw new RuntimeException(e); + } + } } diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/context/OneInputWindowContext.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/context/OneInputWindowContext.java new file mode 100644 index 00000000000..8e282fd5f10 --- /dev/null +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/context/OneInputWindowContext.java @@ -0,0 +1,45 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.window.context; + +import org.apache.flink.annotation.Experimental; + +/** + * This interface extends {@link WindowContext} and provides additional functionality for writing + * and reading window data of one input window. + * + * @param <IN> Type of the input elements + */ +@Experimental +public interface OneInputWindowContext<IN> extends WindowContext { + + /** + * Write records into the window's state. + * + * @param record The record to be written into the window's state. + */ + void putRecord(IN record); + + /** + * Read records from the window's state. + * + * @return Iterable of records, which could be null if the window is empty. 
+ */ + Iterable<IN> getAllRecords(); +} diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/context/TwoInputWindowContext.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/context/TwoInputWindowContext.java new file mode 100644 index 00000000000..3f6eea40a05 --- /dev/null +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/context/TwoInputWindowContext.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.window.context; + +import org.apache.flink.annotation.Experimental; + +/** + * This interface extends {@link WindowContext} and provides additional functionality for writing + * and reading window data of two input window. + */ +@Experimental +public interface TwoInputWindowContext<IN1, IN2> extends WindowContext { + + /** + * Write records from input1 into the window's state. + * + * @param record The record from input1 to be written into the window's state. + */ + void putRecord1(IN1 record); + + /** + * Read input1's records from the window's state. 
+ * + * @return Iterable of input1's records, which could be null if input1 has no data. + */ + Iterable<IN1> getAllRecords1(); + + /** + * Write records from input2 into the window's state. + * + * @param record The record from input2 to be written into the window's state. + */ + void putRecord2(IN2 record); + + /** + * Read input2's records from the window's state. + * + * @return Iterable of input2's records, which could be null if input2 has no data. + */ + Iterable<IN2> getAllRecords2(); +} diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/context/WindowContext.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/context/WindowContext.java new file mode 100644 index 00000000000..25627d80fa8 --- /dev/null +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/context/WindowContext.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.datastream.api.extension.window.context; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.state.AggregatingStateDeclaration; +import org.apache.flink.api.common.state.ListStateDeclaration; +import org.apache.flink.api.common.state.MapStateDeclaration; +import org.apache.flink.api.common.state.ReducingStateDeclaration; +import org.apache.flink.api.common.state.ValueStateDeclaration; +import org.apache.flink.api.common.state.v2.AggregatingState; +import org.apache.flink.api.common.state.v2.ListState; +import org.apache.flink.api.common.state.v2.MapState; +import org.apache.flink.api.common.state.v2.ReducingState; +import org.apache.flink.api.common.state.v2.ValueState; + +import java.util.Optional; + +/** + * This interface represents a context for window operations and provides methods to interact with + * state that is scoped to the window. + */ +@Experimental +public interface WindowContext { + + /** + * Gets the starting timestamp of the window. This is the first timestamp that belongs to this + * window. + * + * @return The starting timestamp of this window, or -1 if the window is not a time window or a + * session window. + */ + long getStartTime(); + + /** + * Gets the end timestamp of this window. The end timestamp is exclusive, meaning it is the + * first timestamp that does not belong to this window anymore. + * + * @return The exclusive end timestamp of this window, or -1 if the window is a session window + * or not a time window. + */ + long getEndTime(); + + /** + * Retrieves a {@link ListState} object that can be used to interact with fault-tolerant state + * that is scoped to the window. + */ + <T> Optional<ListState<T>> getWindowState(ListStateDeclaration<T> stateDeclaration) + throws Exception; + + /** + * Retrieves a {@link MapState} object that can be used to interact with fault-tolerant state + * that is scoped to the window. 
+ */ + <KEY, V> Optional<MapState<KEY, V>> getWindowState(MapStateDeclaration<KEY, V> stateDeclaration) + throws Exception; + + /** + * Retrieves a {@link ValueState} object that can be used to interact with fault-tolerant state + * that is scoped to the window. + */ + <T> Optional<ValueState<T>> getWindowState(ValueStateDeclaration<T> stateDeclaration) + throws Exception; + + /** + * Retrieves a {@link ReducingState} object that can be used to interact with fault-tolerant + * state that is scoped to the window. + */ + <T> Optional<ReducingState<T>> getWindowState(ReducingStateDeclaration<T> stateDeclaration) + throws Exception; + + /** + * Retrieves a {@link AggregatingState} object that can be used to interact with fault-tolerant + * state that is scoped to the window. + */ + <T, ACC, OUT> Optional<AggregatingState<T, OUT>> getWindowState( + AggregatingStateDeclaration<T, ACC, OUT> stateDeclaration) throws Exception; +} diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/function/OneInputWindowStreamProcessFunction.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/function/OneInputWindowStreamProcessFunction.java new file mode 100644 index 00000000000..adfa73b6ba2 --- /dev/null +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/function/OneInputWindowStreamProcessFunction.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.window.function; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.PartitionedContext; +import org.apache.flink.datastream.api.extension.window.context.OneInputWindowContext; + +/** + * A type of {@link WindowProcessFunction} for one-input window processing. + * + * @param <IN> The type of the input value. + * @param <OUT> The type of the output value. + */ +@Experimental +public interface OneInputWindowStreamProcessFunction<IN, OUT> extends WindowProcessFunction { + + /** + * This method will be invoked when a record is received. By default, it stores the record in + * the built-in window state via {@link OneInputWindowContext#putRecord}. If the user overrides + * this method, they have to take care of the input data themselves. + */ + default void onRecord( + IN record, + Collector<OUT> output, + PartitionedContext<OUT> ctx, + OneInputWindowContext<IN> windowContext) + throws Exception { + windowContext.putRecord(record); + } + + /** + * This method will be invoked when the window is triggered. All input records in the window + * can be obtained via {@link OneInputWindowContext#getAllRecords()}. + */ + void onTrigger( + Collector<OUT> output, + PartitionedContext<OUT> ctx, + OneInputWindowContext<IN> windowContext) + throws Exception; + + /** + * Callback when a window is about to be cleaned up.
This is the place to delete any state in the + * {@code windowContext} when the window expires (the event time or processing time passes its + * {@code maxTimestamp} + {@code allowedLateness}). + */ + default void onClear( + Collector<OUT> output, + PartitionedContext<OUT> ctx, + OneInputWindowContext<IN> windowContext) + throws Exception {} + + /** This method will be invoked when a record is received after the window has been cleaned. */ + default void onLateRecord(IN record, Collector<OUT> output, PartitionedContext<OUT> ctx) + throws Exception {} +} diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/function/TwoInputNonBroadcastWindowStreamProcessFunction.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/function/TwoInputNonBroadcastWindowStreamProcessFunction.java new file mode 100644 index 00000000000..77ccfb8700b --- /dev/null +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/function/TwoInputNonBroadcastWindowStreamProcessFunction.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.datastream.api.extension.window.function; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.PartitionedContext; +import org.apache.flink.datastream.api.extension.window.context.TwoInputWindowContext; + +/** + * A type of {@link WindowProcessFunction} for two-input window processing, such as window joins. + * + * @param <IN1> The type of the input1 value. + * @param <IN2> The type of the input2 value. + * @param <OUT> The type of the output value. + */ +@Experimental +public interface TwoInputNonBroadcastWindowStreamProcessFunction<IN1, IN2, OUT> + extends WindowProcessFunction { + + /** + * This method will be invoked when a record is received from input1. By default, it stores + * the record in the built-in window state via {@link TwoInputWindowContext#putRecord1}. If the + * user overrides this method, they have to take care of the input data themselves. + */ + default void onRecord1( + IN1 record, + Collector<OUT> output, + PartitionedContext<OUT> ctx, + TwoInputWindowContext<IN1, IN2> windowContext) + throws Exception { + windowContext.putRecord1(record); + } + + /** + * This method will be invoked when a record is received from input2. By default, it stores + * the record in the built-in window state via {@link TwoInputWindowContext#putRecord2}. If the + * user overrides this method, they have to take care of the input data themselves. + */ + default void onRecord2( + IN2 record, + Collector<OUT> output, + PartitionedContext<OUT> ctx, + TwoInputWindowContext<IN1, IN2> windowContext) + throws Exception { + windowContext.putRecord2(record); + } + + /** + * This method will be invoked when the window is triggered. All input records in the window + * can be obtained via {@link TwoInputWindowContext#getAllRecords1()} and {@link + * TwoInputWindowContext#getAllRecords2()}.
+ */ + void onTrigger( + Collector<OUT> output, + PartitionedContext<OUT> ctx, + TwoInputWindowContext<IN1, IN2> windowContext) + throws Exception; + + /** + * Callback when a window is about to be cleaned up. This is the place to delete any state in the + * {@code windowContext} when the window expires (the event time or processing time passes its + * {@code maxTimestamp} + {@code allowedLateness}). + */ + default void onClear( + Collector<OUT> output, + PartitionedContext<OUT> ctx, + TwoInputWindowContext<IN1, IN2> windowContext) + throws Exception {} + + /** + * This method will be invoked when a record is received from input1 after the window has been + * cleaned. + */ + default void onLateRecord1(IN1 record, Collector<OUT> output, PartitionedContext<OUT> ctx) + throws Exception {} + + /** + * This method will be invoked when a record is received from input2 after the window has been + * cleaned. + */ + default void onLateRecord2(IN2 record, Collector<OUT> output, PartitionedContext<OUT> ctx) + throws Exception {} +} diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/function/TwoOutputWindowStreamProcessFunction.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/function/TwoOutputWindowStreamProcessFunction.java new file mode 100644 index 00000000000..828d5ceac33 --- /dev/null +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/function/TwoOutputWindowStreamProcessFunction.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.window.function; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.TwoOutputPartitionedContext; +import org.apache.flink.datastream.api.extension.window.context.OneInputWindowContext; + +/** + * A type of {@link WindowProcessFunction} for two-output window processing. + * + * @param <IN> The type of the input value. + * @param <OUT1> The type of the output value to the first output. + * @param <OUT2> The type of the output value to the second output. + */ +@Experimental +public interface TwoOutputWindowStreamProcessFunction<IN, OUT1, OUT2> + extends WindowProcessFunction { + + /** + * This method will be invoked when a record is received. By default, it stores the record in + * the built-in window state via {@link OneInputWindowContext#putRecord}. If the user overrides + * this method, they have to take care of the input data themselves. + */ + default void onRecord( + IN record, + Collector<OUT1> output1, + Collector<OUT2> output2, + TwoOutputPartitionedContext<OUT1, OUT2> ctx, + OneInputWindowContext<IN> windowContext) + throws Exception { + windowContext.putRecord(record); + } + + /** + * This method will be invoked when the window is triggered. All input records in the window + * can be obtained via {@link OneInputWindowContext#getAllRecords()}.
+ */ + void onTrigger( + Collector<OUT1> output1, + Collector<OUT2> output2, + TwoOutputPartitionedContext<OUT1, OUT2> ctx, + OneInputWindowContext<IN> windowContext) + throws Exception; + + /** + * Callback when a window is about to be cleaned up. This is the place to delete any state in the + * {@code windowContext} when the window expires (the event time or processing time passes its + * {@code maxTimestamp} + {@code allowedLateness}). + */ + default void onClear( + Collector<OUT1> output1, + Collector<OUT2> output2, + TwoOutputPartitionedContext<OUT1, OUT2> ctx, + OneInputWindowContext<IN> windowContext) + throws Exception {} + + /** This method will be invoked when a record is received after the window has been cleaned. */ + default void onLateRecord( + IN record, + Collector<OUT1> output1, + Collector<OUT2> output2, + TwoOutputPartitionedContext<OUT1, OUT2> ctx) + throws Exception {} +} diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/function/WindowProcessFunction.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/function/WindowProcessFunction.java new file mode 100644 index 00000000000..2e840463b84 --- /dev/null +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/function/WindowProcessFunction.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.window.function; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.state.StateDeclaration; +import org.apache.flink.datastream.api.function.ProcessFunction; + +import java.util.Collections; +import java.util.Set; + +/** + * Base interface for functions evaluated over windows, providing callback functions for various + * stages of the window's lifecycle. + */ +@Experimental +public interface WindowProcessFunction extends ProcessFunction { + + /** + * Explicitly declares states that are bound to the window. Each specific window state must be + * declared in this method before it can be used. + * + * @return all declared window states used by this process function. + */ + default Set<StateDeclaration> useWindowStates() { + return Collections.emptySet(); + } +} diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/GlobalWindowStrategy.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/GlobalWindowStrategy.java new file mode 100644 index 00000000000..b5ebac65905 --- /dev/null +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/GlobalWindowStrategy.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.window.strategy; + +import org.apache.flink.annotation.Experimental; + +/** A {@link WindowStrategy} used to generate GlobalWindow. */ +@Experimental +public class GlobalWindowStrategy extends WindowStrategy {} diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/SessionWindowStrategy.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/SessionWindowStrategy.java new file mode 100644 index 00000000000..96217f13ae8 --- /dev/null +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/SessionWindowStrategy.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.window.strategy; + +import org.apache.flink.annotation.Experimental; + +import java.time.Duration; + +/** A {@link WindowStrategy} used to generate Session Windows. */ +@Experimental +public class SessionWindowStrategy extends WindowStrategy { + private final Duration sessionGap; + private final TimeType timeType; + + public SessionWindowStrategy(Duration sessionGap) { + this(sessionGap, TimeType.EVENT); + } + + public SessionWindowStrategy(Duration sessionGap, TimeType timeType) { + this.sessionGap = sessionGap; + this.timeType = timeType; + } + + public Duration getSessionGap() { + return sessionGap; + } + + public TimeType getTimeType() { + return timeType; + } +} diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/SlidingTimeWindowStrategy.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/SlidingTimeWindowStrategy.java new file mode 100644 index 00000000000..673851cc107 --- /dev/null +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/SlidingTimeWindowStrategy.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.window.strategy; + +import org.apache.flink.annotation.Experimental; + +import java.time.Duration; + +/** A {@link WindowStrategy} used to generate sliding TimeWindow. */ +@Experimental +public class SlidingTimeWindowStrategy extends WindowStrategy { + private final Duration windowSize; + private final Duration windowSlideInterval; + private final TimeType timeType; + private final Duration allowedLateness; + + public SlidingTimeWindowStrategy(Duration windowSize, Duration windowSlideInterval) { + this(windowSize, windowSlideInterval, TimeType.EVENT); + } + + public SlidingTimeWindowStrategy( + Duration windowSize, Duration windowSlideInterval, TimeType timeType) { + this(windowSize, windowSlideInterval, timeType, Duration.ZERO); + } + + public SlidingTimeWindowStrategy( + Duration windowSize, + Duration windowSlideInterval, + TimeType timeType, + Duration allowedLateness) { + this.windowSize = windowSize; + this.windowSlideInterval = windowSlideInterval; + this.timeType = timeType; + this.allowedLateness = allowedLateness; + } + + public Duration getWindowSize() { + return windowSize; + } + + public Duration getWindowSlideInterval() { + return windowSlideInterval; + } + + public TimeType getTimeType() { + return timeType; + } + + public Duration getAllowedLateness() { + return allowedLateness; + } +} diff --git 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/TumblingTimeWindowStrategy.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/TumblingTimeWindowStrategy.java new file mode 100644 index 00000000000..7cdc30afa82 --- /dev/null +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/TumblingTimeWindowStrategy.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.window.strategy; + +import org.apache.flink.annotation.Experimental; + +import java.time.Duration; + +/** A {@link WindowStrategy} used to generate tumbling TimeWindow. 
*/ +@Experimental +public class TumblingTimeWindowStrategy extends WindowStrategy { + private final Duration windowSize; + private final TimeType timeType; + private final Duration allowedLateness; + + public TumblingTimeWindowStrategy(Duration windowSize) { + this(windowSize, TimeType.EVENT); + } + + public TumblingTimeWindowStrategy(Duration windowSize, TimeType timeType) { + this(windowSize, timeType, Duration.ZERO); + } + + public TumblingTimeWindowStrategy( + Duration windowSize, TimeType timeType, Duration allowedLateness) { + this.windowSize = windowSize; + this.timeType = timeType; + this.allowedLateness = allowedLateness; + } + + public Duration getWindowSize() { + return windowSize; + } + + public TimeType getTimeType() { + return timeType; + } + + public Duration getAllowedLateness() { + return allowedLateness; + } +} diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/WindowStrategy.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/WindowStrategy.java new file mode 100644 index 00000000000..f0a7e1267f3 --- /dev/null +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/WindowStrategy.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.window.strategy; + +import org.apache.flink.annotation.Experimental; + +import java.io.Serializable; +import java.time.Duration; + +/** + * This class describes what kind of Windows to use, including strategies for dividing, triggering, + * and clearing Windows. + * + * <p>We currently provide three built-in window types: Global Window, Time Window, and Session + * Window. + * + * <ul> + * <li>Global Window: All the data is in a single window. There is only one Global Window, and all + * data is assigned to this single window. Global Window are suitable for bounded stream + * scenarios and can be used in GlobalStream, KeyedStream, and NonKeyedStream. + * <li>Time Window: Data within a specific time period is assigned to a single window. Time + * Windows are divided into multiple windows based on time ranges, and data is assigned to the + * corresponding window based on its timestamp. We support two types of time windows: tumbling + * windows and sliding windows, and the tumbling windows cannot overlap. The time semantics + * within the windows can be divided into event time and processing time. Time Window can be + * used in GlobalStream and KeyedStream. + * <li>Session Window: Consecutive data is assigned to a single window. Session windows are a + * special type of time window and are divided into multiple windows based on time ranges. + * When data arrives, it is first assigned to the corresponding window based on its timestamp, + * and then existing windows are merged as much as possible. Session Window can be used in + * GlobalStream and KeyedStream. 
+ * </ul> + */ +@Experimental +public class WindowStrategy implements Serializable { + + public static final TimeType PROCESSING_TIME = TimeType.PROCESSING; + public static final TimeType EVENT_TIME = TimeType.EVENT; + + /** The types of time used in window operations. */ + @Experimental + public enum TimeType { + PROCESSING, + EVENT + } + + // ============== global window ================ + + /** + * Creates a global window strategy. Note that the global window can be used in both + * GlobalStream, KeyedStream, NonKeyedStream. + * + * @return A global window strategy. + */ + public static WindowStrategy global() { + return new GlobalWindowStrategy(); + } + + // ============== tumbling time window ================ + + /** + * Create a tumbling time window strategy with the event time default time type. Note that + * tumbling time windows can be used in KeyedStream and GlobalStream. If tumbling time window is + * used in a GlobalStream, it will convert the GlobalStream into a KeyedStream with a Key of + * zero, and then use the converted KeyedStream to execute the window. + * + * @param windowSize the size of Window. + * @return A tumbling time window strategy. + */ + public static WindowStrategy tumbling(Duration windowSize) { + return new TumblingTimeWindowStrategy(windowSize); + } + + /** + * Create a tumbling time window strategy. Note that tumbling time windows can be used in + * KeyedStream and GlobalStream. If tumbling time window is used in a GlobalStream, it will + * convert the GlobalStream into a KeyedStream with a Key of zero, and then use the converted + * KeyedStream to execute the window. + * + * @param windowSize the size of Window. + * @param timeType the time type of Window. + * @return A tumbling time window strategy. + */ + public static WindowStrategy tumbling(Duration windowSize, TimeType timeType) { + return new TumblingTimeWindowStrategy(windowSize, timeType); + } + + /** + * Create a tumbling time window strategy. 
Note that tumbling time windows can be used in + * KeyedStream and GlobalStream. If tumbling time window is used in a GlobalStream, it will + * convert the GlobalStream into a KeyedStream with a Key of zero, and then use the converted + * KeyedStream to execute the window. + * + * @param windowSize the size of Window. + * @param timeType the time type of Window. + * @param allowedLateness the allowed lateness of Window. + * @return A tumbling time window strategy. + */ + public static WindowStrategy tumbling( + Duration windowSize, TimeType timeType, Duration allowedLateness) { + return new TumblingTimeWindowStrategy(windowSize, timeType, allowedLateness); + } + + // ============== sliding time window ================ + + /** + * Create a sliding time window strategy with the event time default time type. Note that + * sliding time windows can be used in KeyedStream and GlobalStream. If sliding time window is + * used in a GlobalStream, it will convert the GlobalStream into a KeyedStream with a Key of + * zero, and then use the converted KeyedStream to execute the window. + * + * @param windowSize the size of Window. + * @param windowSlideInterval the slide interval of Window. + * @return A sliding time window strategy. + */ + public static WindowStrategy sliding(Duration windowSize, Duration windowSlideInterval) { + return new SlidingTimeWindowStrategy(windowSize, windowSlideInterval); + } + + /** + * Create a sliding time window strategy. Note that sliding time windows can be used in + * KeyedStream and GlobalStream. If sliding time window is used in a GlobalStream, it will + * convert the GlobalStream into a KeyedStream with a Key of zero, and then use the converted + * KeyedStream to execute the window. + * + * @param windowSize the size of Window. + * @param windowSlideInterval the slide interval of Window. + * @param timeType the time type of Window. + * @return A sliding time window strategy. 
+ */ + public static WindowStrategy sliding( + Duration windowSize, Duration windowSlideInterval, TimeType timeType) { + return new SlidingTimeWindowStrategy(windowSize, windowSlideInterval, timeType); + } + + /** + * Create a sliding time window strategy. Note that sliding time windows can be used in + * KeyedStream and GlobalStream. If sliding time window is used in a GlobalStream, it will + * convert the GlobalStream into a KeyedStream with a Key of zero, and then use the converted + * KeyedStream to execute the window. + * + * @param windowSize the size of Window. + * @param windowSlideInterval the slide interval of Window. + * @param timeType the time type of Window. + * @param allowedLateness the allowed lateness of Window. + * @return A sliding time window strategy. + */ + public static WindowStrategy sliding( + Duration windowSize, + Duration windowSlideInterval, + TimeType timeType, + Duration allowedLateness) { + return new SlidingTimeWindowStrategy( + windowSize, windowSlideInterval, timeType, allowedLateness); + } + + // ============== session window ================ + + /** + * Create a session time window strategy with the event time default time type. Note that + * session time windows can be used in KeyedStream and GlobalStream. If session time window is + * used in a GlobalStream, it will convert the GlobalStream into a KeyedStream with a Key of + * zero, and then use the converted KeyedStream to execute the window. + * + * @param sessionGap the timeout of session. + * @return A session window strategy. + */ + public static WindowStrategy session(Duration sessionGap) { + return new SessionWindowStrategy(sessionGap); + } + + /** + * Create a session time window strategy. Note that session time windows can be used in + * KeyedStream and GlobalStream. If session time window is used in a GlobalStream, it will + * convert the GlobalStream into a KeyedStream with a Key of zero, and then use the converted + * KeyedStream to execute the window. 
+ * + * @param sessionGap the timeout of session. + * @param timeType the time type of Window. + * @return A session window strategy. + */ + public static WindowStrategy session(Duration sessionGap, TimeType timeType) { + return new SessionWindowStrategy(sessionGap, timeType); + } +}