This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6de1fa28e2fe89678d441e79417f1533857b26b1
Author: Xu Huang <huangxu.wal...@gmail.com>
AuthorDate: Sat Jan 18 02:31:12 2025 +0800

    [FLINK-37136][API] Introduce Window extension components for DataStream V2
---
 .../flink/datastream/api/builtin/BuiltinFuncs.java |  91 ++++++++++
 .../window/context/OneInputWindowContext.java      |  45 +++++
 .../window/context/TwoInputWindowContext.java      |  57 ++++++
 .../extension/window/context/WindowContext.java    |  94 ++++++++++
 .../OneInputWindowStreamProcessFunction.java       |  73 ++++++++
 ...putNonBroadcastWindowStreamProcessFunction.java | 100 ++++++++++
 .../TwoOutputWindowStreamProcessFunction.java      |  82 +++++++++
 .../window/function/WindowProcessFunction.java     |  44 +++++
 .../window/strategy/GlobalWindowStrategy.java      |  25 +++
 .../window/strategy/SessionWindowStrategy.java     |  47 +++++
 .../window/strategy/SlidingTimeWindowStrategy.java |  68 +++++++
 .../strategy/TumblingTimeWindowStrategy.java       |  58 ++++++
 .../extension/window/strategy/WindowStrategy.java  | 201 +++++++++++++++++++++
 13 files changed, 985 insertions(+)

diff --git 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/builtin/BuiltinFuncs.java
 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/builtin/BuiltinFuncs.java
index 76b8032cf3e..053fff66071 100644
--- 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/builtin/BuiltinFuncs.java
+++ 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/builtin/BuiltinFuncs.java
@@ -22,7 +22,13 @@ import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.datastream.api.extension.join.JoinFunction;
 import org.apache.flink.datastream.api.extension.join.JoinType;
+import 
org.apache.flink.datastream.api.extension.window.function.OneInputWindowStreamProcessFunction;
+import 
org.apache.flink.datastream.api.extension.window.function.TwoInputNonBroadcastWindowStreamProcessFunction;
+import 
org.apache.flink.datastream.api.extension.window.function.TwoOutputWindowStreamProcessFunction;
+import 
org.apache.flink.datastream.api.extension.window.strategy.WindowStrategy;
+import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
 import 
org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction;
+import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction;
 import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
 import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
 
@@ -121,4 +127,89 @@ public class BuiltinFuncs {
                 joinFunction,
                 joinType);
     }
+
+    // =================== Window ===========================
+
+    static final Class<?> WINDOW_FUNCS_INSTANCE;
+
+    static {
+        try {
+            WINDOW_FUNCS_INSTANCE =
+                    
Class.forName("org.apache.flink.datastream.impl.builtin.BuiltinWindowFuncs");
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException("Please ensure that flink-datastream in 
your class path");
+        }
+    }
+
+    /**
+     * Wrap the WindowStrategy and OneInputWindowStreamProcessFunction within a
+     * OneInputStreamProcessFunction to perform the window operation.
+     *
+     * @param windowStrategy the window strategy
+     * @param windowProcessFunction the window process function
+     * @return the wrapped process function
+     */
+    public static <IN, OUT> OneInputStreamProcessFunction<IN, OUT> window(
+            WindowStrategy windowStrategy,
+            OneInputWindowStreamProcessFunction<IN, OUT> 
windowProcessFunction) {
+        try {
+            return (OneInputStreamProcessFunction<IN, OUT>)
+                    WINDOW_FUNCS_INSTANCE
+                            .getMethod(
+                                    "window",
+                                    WindowStrategy.class,
+                                    OneInputWindowStreamProcessFunction.class)
+                            .invoke(null, windowStrategy, 
windowProcessFunction);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Wrap the WindowStrategy and 
TwoInputNonBroadcastWindowStreamProcessFunction within a
+     * TwoInputNonBroadcastStreamProcessFunction to perform the window 
operation.
+     *
+     * @param windowStrategy the window strategy
+     * @param windowProcessFunction the window process function
+     * @return the wrapped process function
+     */
+    public static <IN1, IN2, OUT> 
TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> window(
+            WindowStrategy windowStrategy,
+            TwoInputNonBroadcastWindowStreamProcessFunction<IN1, IN2, OUT> 
windowProcessFunction) {
+        try {
+            return (TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT>)
+                    WINDOW_FUNCS_INSTANCE
+                            .getMethod(
+                                    "window",
+                                    WindowStrategy.class,
+                                    
TwoInputNonBroadcastWindowStreamProcessFunction.class)
+                            .invoke(null, windowStrategy, 
windowProcessFunction);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Wrap the WindowStrategy and TwoOutputWindowStreamProcessFunction within 
a
+     * TwoOutputStreamProcessFunction to perform the window operation.
+     *
+     * @param windowStrategy the window strategy
+     * @param windowProcessFunction the window process function
+     * @return the wrapped process function
+     */
+    public static <IN, OUT1, OUT2> TwoOutputStreamProcessFunction<IN, OUT1, 
OUT2> window(
+            WindowStrategy windowStrategy,
+            TwoOutputWindowStreamProcessFunction<IN, OUT1, OUT2> 
windowProcessFunction) {
+        try {
+            return (TwoOutputStreamProcessFunction<IN, OUT1, OUT2>)
+                    WINDOW_FUNCS_INSTANCE
+                            .getMethod(
+                                    "window",
+                                    WindowStrategy.class,
+                                    TwoOutputWindowStreamProcessFunction.class)
+                            .invoke(null, windowStrategy, 
windowProcessFunction);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
 }
diff --git 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/context/OneInputWindowContext.java
 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/context/OneInputWindowContext.java
new file mode 100644
index 00000000000..8e282fd5f10
--- /dev/null
+++ 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/context/OneInputWindowContext.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.window.context;
+
+import org.apache.flink.annotation.Experimental;
+
+/**
+ * This interface extends {@link WindowContext} and provides additional 
functionality for writing
+ * and reading window data of one input window.
+ *
+ * @param <IN> Type of the input elements
+ */
+@Experimental
+public interface OneInputWindowContext<IN> extends WindowContext {
+
+    /**
+     * Write records into the window's state.
+     *
+     * @param record The record to be written into the window's state.
+     */
+    void putRecord(IN record);
+
+    /**
+     * Read records from the window's state.
+     *
+     * @return Iterable of records, which could be null if the window is empty.
+     */
+    Iterable<IN> getAllRecords();
+}
diff --git 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/context/TwoInputWindowContext.java
 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/context/TwoInputWindowContext.java
new file mode 100644
index 00000000000..3f6eea40a05
--- /dev/null
+++ 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/context/TwoInputWindowContext.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.window.context;
+
+import org.apache.flink.annotation.Experimental;
+
+/**
+ * This interface extends {@link WindowContext} and provides additional 
functionality for writing
+ * and reading window data of two input window.
+ */
+@Experimental
+public interface TwoInputWindowContext<IN1, IN2> extends WindowContext {
+
+    /**
+     * Write records from input1 into the window's state.
+     *
+     * @param record The record from input1 to be written into the window's 
state.
+     */
+    void putRecord1(IN1 record);
+
+    /**
+     * Read input1's records from the window's state.
+     *
+     * @return Iterable of input1's records, which could be null if input1 has 
no data.
+     */
+    Iterable<IN1> getAllRecords1();
+
+    /**
+     * Write records from input2 into the window's state.
+     *
+     * @param record The record from input2 to be written into the window's 
state.
+     */
+    void putRecord2(IN2 record);
+
+    /**
+     * Read input2's records from the window's state.
+     *
+     * @return Iterable of input2's records, which could be null if input2 has 
no data.
+     */
+    Iterable<IN2> getAllRecords2();
+}
diff --git 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/context/WindowContext.java
 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/context/WindowContext.java
new file mode 100644
index 00000000000..25627d80fa8
--- /dev/null
+++ 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/context/WindowContext.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.window.context;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.state.AggregatingStateDeclaration;
+import org.apache.flink.api.common.state.ListStateDeclaration;
+import org.apache.flink.api.common.state.MapStateDeclaration;
+import org.apache.flink.api.common.state.ReducingStateDeclaration;
+import org.apache.flink.api.common.state.ValueStateDeclaration;
+import org.apache.flink.api.common.state.v2.AggregatingState;
+import org.apache.flink.api.common.state.v2.ListState;
+import org.apache.flink.api.common.state.v2.MapState;
+import org.apache.flink.api.common.state.v2.ReducingState;
+import org.apache.flink.api.common.state.v2.ValueState;
+
+import java.util.Optional;
+
+/**
+ * This interface represents a context for window operations and provides 
methods to interact with
+ * state that is scoped to the window.
+ */
+@Experimental
+public interface WindowContext {
+
+    /**
+     * Gets the starting timestamp of the window. This is the first timestamp 
that belongs to this
+     * window.
+     *
+     * @return The starting timestamp of this window, or -1 if the window is 
not a time window or a
+     *     session window.
+     */
+    long getStartTime();
+
+    /**
+     * Gets the end timestamp of this window. The end timestamp is exclusive, 
meaning it is the
+     * first timestamp that does not belong to this window anymore.
+     *
+     * @return The exclusive end timestamp of this window, or -1 if the window 
is a session window
+     *     or not a time window.
+     */
+    long getEndTime();
+
+    /**
+     * Retrieves a {@link ListState} object that can be used to interact with 
fault-tolerant state
+     * that is scoped to the window.
+     */
+    <T> Optional<ListState<T>> getWindowState(ListStateDeclaration<T> 
stateDeclaration)
+            throws Exception;
+
+    /**
+     * Retrieves a {@link MapState} object that can be used to interact with 
fault-tolerant state
+     * that is scoped to the window.
+     */
+    <KEY, V> Optional<MapState<KEY, V>> 
getWindowState(MapStateDeclaration<KEY, V> stateDeclaration)
+            throws Exception;
+
+    /**
+     * Retrieves a {@link ValueState} object that can be used to interact with 
fault-tolerant state
+     * that is scoped to the window.
+     */
+    <T> Optional<ValueState<T>> getWindowState(ValueStateDeclaration<T> 
stateDeclaration)
+            throws Exception;
+
+    /**
+     * Retrieves a {@link ReducingState} object that can be used to interact 
with fault-tolerant
+     * state that is scoped to the window.
+     */
+    <T> Optional<ReducingState<T>> getWindowState(ReducingStateDeclaration<T> 
stateDeclaration)
+            throws Exception;
+
+    /**
+     * Retrieves a {@link AggregatingState} object that can be used to 
interact with fault-tolerant
+     * state that is scoped to the window.
+     */
+    <T, ACC, OUT> Optional<AggregatingState<T, OUT>> getWindowState(
+            AggregatingStateDeclaration<T, ACC, OUT> stateDeclaration) throws 
Exception;
+}
diff --git 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/function/OneInputWindowStreamProcessFunction.java
 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/function/OneInputWindowStreamProcessFunction.java
new file mode 100644
index 00000000000..adfa73b6ba2
--- /dev/null
+++ 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/function/OneInputWindowStreamProcessFunction.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.window.function;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import 
org.apache.flink.datastream.api.extension.window.context.OneInputWindowContext;
+
+/**
+ * A type of {@link WindowProcessFunction} for one-input window processing.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ */
+@Experimental
+public interface OneInputWindowStreamProcessFunction<IN, OUT> extends 
WindowProcessFunction {
+
+    /**
+     * This method will be invoked when a record is received. Its default 
behaviors to store data in
+     * built-in window state by {@link OneInputWindowContext#putRecord}. If 
the user overrides this
+     * method, they have to take care of the input data themselves.
+     */
+    default void onRecord(
+            IN record,
+            Collector<OUT> output,
+            PartitionedContext<OUT> ctx,
+            OneInputWindowContext<IN> windowContext)
+            throws Exception {
+        windowContext.putRecord(record);
+    }
+
+    /**
+     * This method will be invoked when the Window is triggered, you can 
obtain all the input
+     * records in the Window by {@link OneInputWindowContext#getAllRecords()}.
+     */
+    void onTrigger(
+            Collector<OUT> output,
+            PartitionedContext<OUT> ctx,
+            OneInputWindowContext<IN> windowContext)
+            throws Exception;
+
+    /**
+     * Callback when a window is about to be cleaned up. It is the time to 
deletes any state in the
+     * {@code windowContext} when the Window expires (the event time or 
processing time passes its
+     * {@code maxTimestamp} + {@code allowedLateness}).
+     */
+    default void onClear(
+            Collector<OUT> output,
+            PartitionedContext<OUT> ctx,
+            OneInputWindowContext<IN> windowContext)
+            throws Exception {}
+
+    /** This method will be invoked when a record is received after the window 
has been cleaned. */
+    default void onLateRecord(IN record, Collector<OUT> output, 
PartitionedContext<OUT> ctx)
+            throws Exception {}
+}
diff --git 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/function/TwoInputNonBroadcastWindowStreamProcessFunction.java
 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/function/TwoInputNonBroadcastWindowStreamProcessFunction.java
new file mode 100644
index 00000000000..77ccfb8700b
--- /dev/null
+++ 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/function/TwoInputNonBroadcastWindowStreamProcessFunction.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.window.function;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import 
org.apache.flink.datastream.api.extension.window.context.TwoInputWindowContext;
+
+/**
+ * A type of {@link WindowProcessFunction} for two input window processing, 
such as window-join.
+ *
+ * @param <IN1> The type of the input1 value.
+ * @param <IN2> The type of the input2 value.
+ * @param <OUT> The type of the output value.
+ */
+@Experimental
+public interface TwoInputNonBroadcastWindowStreamProcessFunction<IN1, IN2, OUT>
+        extends WindowProcessFunction {
+
+    /**
+     * This method will be invoked when a record is received from input1. Its 
default behaviors to
+     * store data in built-in window state by {@link 
TwoInputWindowContext#putRecord1}. If the user
+     * overrides this method, they have to take care of the input data 
themselves.
+     */
+    default void onRecord1(
+            IN1 record,
+            Collector<OUT> output,
+            PartitionedContext<OUT> ctx,
+            TwoInputWindowContext<IN1, IN2> windowContext)
+            throws Exception {
+        windowContext.putRecord1(record);
+    }
+
+    /**
+     * This method will be invoked when a record is received from input2. Its 
default behaviors to
+     * store data in built-in window state by {@link 
TwoInputWindowContext#putRecord2}. If the user
+     * overrides this method, they have to take care of the input data 
themselves.
+     */
+    default void onRecord2(
+            IN2 record,
+            Collector<OUT> output,
+            PartitionedContext<OUT> ctx,
+            TwoInputWindowContext<IN1, IN2> windowContext)
+            throws Exception {
+        windowContext.putRecord2(record);
+    }
+
+    /**
+     * This method will be invoked when the Window is triggered, you can 
obtain all the input
+     * records in the Window by {@link TwoInputWindowContext#getAllRecords1()} 
and {@link
+     * TwoInputWindowContext#getAllRecords2()}.
+     */
+    void onTrigger(
+            Collector<OUT> output,
+            PartitionedContext<OUT> ctx,
+            TwoInputWindowContext<IN1, IN2> windowContext)
+            throws Exception;
+
+    /**
+     * Callback when a window is about to be cleaned up. It is the time to 
deletes any state in the
+     * {@code windowContext} when the Window expires (the event time or 
processing time passes its
+     * {@code maxTimestamp} + {@code allowedLateness}).
+     */
+    default void onClear(
+            Collector<OUT> output,
+            PartitionedContext<OUT> ctx,
+            TwoInputWindowContext<IN1, IN2> windowContext)
+            throws Exception {}
+
+    /**
+     * This method will be invoked when a record is received from input1 after 
the window has been
+     * cleaned.
+     */
+    default void onLateRecord1(IN1 record, Collector<OUT> output, 
PartitionedContext<OUT> ctx)
+            throws Exception {}
+
+    /**
+     * This method will be invoked when a record is received from input2 after 
the window has been
+     * cleaned.
+     */
+    default void onLateRecord2(IN2 record, Collector<OUT> output, 
PartitionedContext<OUT> ctx)
+            throws Exception {}
+}
diff --git 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/function/TwoOutputWindowStreamProcessFunction.java
 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/function/TwoOutputWindowStreamProcessFunction.java
new file mode 100644
index 00000000000..828d5ceac33
--- /dev/null
+++ 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/function/TwoOutputWindowStreamProcessFunction.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.window.function;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.TwoOutputPartitionedContext;
+import 
org.apache.flink.datastream.api.extension.window.context.OneInputWindowContext;
+
+/**
+ * A type of {@link WindowProcessFunction} for two-output window processing.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT1> The type of the output value to the first output.
+ * @param <OUT2> The type of the output value to the second output.
+ */
+@Experimental
+public interface TwoOutputWindowStreamProcessFunction<IN, OUT1, OUT2>
+        extends WindowProcessFunction {
+
+    /**
+     * This method will be invoked when a record is received. Its default 
behaviors to store data in
+     * built-in window state by {@link OneInputWindowContext#putRecord}. If 
the user overrides this
+     * method, they have to take care of the input data themselves.
+     */
+    default void onRecord(
+            IN record,
+            Collector<OUT1> output1,
+            Collector<OUT2> output2,
+            TwoOutputPartitionedContext<OUT1, OUT2> ctx,
+            OneInputWindowContext<IN> windowContext)
+            throws Exception {
+        windowContext.putRecord(record);
+    }
+
+    /**
+     * This method will be invoked when the Window is triggered, you can 
obtain all the input
+     * records in the Window by {@link OneInputWindowContext#getAllRecords()}.
+     */
+    void onTrigger(
+            Collector<OUT1> output1,
+            Collector<OUT2> output2,
+            TwoOutputPartitionedContext<OUT1, OUT2> ctx,
+            OneInputWindowContext<IN> windowContext)
+            throws Exception;
+
+    /**
+     * Callback when a window is about to be cleaned up. It is the time to 
deletes any state in the
+     * {@code windowContext} when the Window expires (the event time or 
processing time passes its
+     * {@code maxTimestamp} + {@code allowedLateness}).
+     */
+    default void onClear(
+            Collector<OUT1> output1,
+            Collector<OUT2> output2,
+            TwoOutputPartitionedContext<OUT1, OUT2> ctx,
+            OneInputWindowContext<IN> windowContext)
+            throws Exception {}
+
+    /** This method will be invoked when a record is received after the window 
has been cleaned. */
+    default void onLateRecord(
+            IN record,
+            Collector<OUT1> output1,
+            Collector<OUT2> output2,
+            TwoOutputPartitionedContext<OUT1, OUT2> ctx)
+            throws Exception {}
+}
diff --git 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/function/WindowProcessFunction.java
 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/function/WindowProcessFunction.java
new file mode 100644
index 00000000000..2e840463b84
--- /dev/null
+++ 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/function/WindowProcessFunction.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.window.function;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.state.StateDeclaration;
+import org.apache.flink.datastream.api.function.ProcessFunction;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * Base interface for functions evaluated over windows, providing callback 
functions for various
+ * stages of the window's lifecycle.
+ */
+@Experimental
+public interface WindowProcessFunction extends ProcessFunction {
+
+    /**
+     * Explicitly declares states that are bound to the window. Each specific 
window state must be
+     * declared in this method before it can be used.
+     *
+     * @return all declared window states used by this process function.
+     */
+    default Set<StateDeclaration> useWindowStates() {
+        return Collections.emptySet();
+    }
+}
diff --git 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/GlobalWindowStrategy.java
 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/GlobalWindowStrategy.java
new file mode 100644
index 00000000000..b5ebac65905
--- /dev/null
+++ 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/GlobalWindowStrategy.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.window.strategy;
+
+import org.apache.flink.annotation.Experimental;
+
+/** A {@link WindowStrategy} used to generate GlobalWindow. */
+@Experimental
+public class GlobalWindowStrategy extends WindowStrategy {}
diff --git 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/SessionWindowStrategy.java
 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/SessionWindowStrategy.java
new file mode 100644
index 00000000000..96217f13ae8
--- /dev/null
+++ 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/SessionWindowStrategy.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.window.strategy;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.time.Duration;
+
+/** A {@link WindowStrategy} used to generate Session Windows. */
+@Experimental
+public class SessionWindowStrategy extends WindowStrategy {
+    private final Duration sessionGap;
+    private final TimeType timeType;
+
+    public SessionWindowStrategy(Duration sessionGap) {
+        this(sessionGap, TimeType.EVENT);
+    }
+
+    public SessionWindowStrategy(Duration sessionGap, TimeType timeType) {
+        this.sessionGap = sessionGap;
+        this.timeType = timeType;
+    }
+
+    public Duration getSessionGap() {
+        return sessionGap;
+    }
+
+    public TimeType getTimeType() {
+        return timeType;
+    }
+}
diff --git 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/SlidingTimeWindowStrategy.java
 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/SlidingTimeWindowStrategy.java
new file mode 100644
index 00000000000..673851cc107
--- /dev/null
+++ 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/SlidingTimeWindowStrategy.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.window.strategy;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.time.Duration;
+
+/** A {@link WindowStrategy} used to generate sliding TimeWindow. */
+@Experimental
+public class SlidingTimeWindowStrategy extends WindowStrategy {
+    private final Duration windowSize;
+    private final Duration windowSlideInterval;
+    private final TimeType timeType;
+    private final Duration allowedLateness;
+
+    public SlidingTimeWindowStrategy(Duration windowSize, Duration 
windowSlideInterval) {
+        this(windowSize, windowSlideInterval, TimeType.EVENT);
+    }
+
+    public SlidingTimeWindowStrategy(
+            Duration windowSize, Duration windowSlideInterval, TimeType 
timeType) {
+        this(windowSize, windowSlideInterval, timeType, Duration.ZERO);
+    }
+
+    public SlidingTimeWindowStrategy(
+            Duration windowSize,
+            Duration windowSlideInterval,
+            TimeType timeType,
+            Duration allowedLateness) {
+        this.windowSize = windowSize;
+        this.windowSlideInterval = windowSlideInterval;
+        this.timeType = timeType;
+        this.allowedLateness = allowedLateness;
+    }
+
+    public Duration getWindowSize() {
+        return windowSize;
+    }
+
+    public Duration getWindowSlideInterval() {
+        return windowSlideInterval;
+    }
+
+    public TimeType getTimeType() {
+        return timeType;
+    }
+
+    public Duration getAllowedLateness() {
+        return allowedLateness;
+    }
+}
diff --git 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/TumblingTimeWindowStrategy.java
 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/TumblingTimeWindowStrategy.java
new file mode 100644
index 00000000000..7cdc30afa82
--- /dev/null
+++ 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/TumblingTimeWindowStrategy.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.window.strategy;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.time.Duration;
+
+/** A {@link WindowStrategy} used to generate tumbling TimeWindow. */
+@Experimental
+public class TumblingTimeWindowStrategy extends WindowStrategy {
+    private final Duration windowSize;
+    private final TimeType timeType;
+    private final Duration allowedLateness;
+
+    public TumblingTimeWindowStrategy(Duration windowSize) {
+        this(windowSize, TimeType.EVENT);
+    }
+
+    public TumblingTimeWindowStrategy(Duration windowSize, TimeType timeType) {
+        this(windowSize, timeType, Duration.ZERO);
+    }
+
+    public TumblingTimeWindowStrategy(
+            Duration windowSize, TimeType timeType, Duration allowedLateness) {
+        this.windowSize = windowSize;
+        this.timeType = timeType;
+        this.allowedLateness = allowedLateness;
+    }
+
+    public Duration getWindowSize() {
+        return windowSize;
+    }
+
+    public TimeType getTimeType() {
+        return timeType;
+    }
+
+    public Duration getAllowedLateness() {
+        return allowedLateness;
+    }
+}
diff --git 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/WindowStrategy.java
 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/WindowStrategy.java
new file mode 100644
index 00000000000..f0a7e1267f3
--- /dev/null
+++ 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/WindowStrategy.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.window.strategy;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.io.Serializable;
+import java.time.Duration;
+
+/**
+ * This class describes what kind of Windows to use, including strategies for 
dividing, triggering,
+ * and clearing Windows.
+ *
+ * <p>We currently provide three built-in window types: Global Window, Time 
Window, and Session
+ * Window.
+ *
+ * <ul>
+ *   <li>Global Window: All the data is in a single window. There is only one 
Global Window, and all
+ *       data is assigned to this single window. Global Window are suitable 
for bounded stream
+ *       scenarios and can be used in GlobalStream, KeyedStream, and 
NonKeyedStream.
+ *   <li>Time Window: Data within a specific time period is assigned to a 
single window. Time
+ *       Windows are divided into multiple windows based on time ranges, and 
data is assigned to the
+ *       corresponding window based on its timestamp. We support two types of 
time windows: tumbling
+ *       windows and sliding windows, and the tumbling windows cannot overlap. 
The time semantics
+ *       within the windows can be divided into event time and processing 
time. Time Window can be
+ *       used in GlobalStream and KeyedStream.
+ *   <li>Session Window: Consecutive data is assigned to a single window. 
Session windows are a
+ *       special type of time window and are divided into multiple windows 
based on time ranges.
+ *       When data arrives, it is first assigned to the corresponding window 
based on its timestamp,
+ *       and then existing windows are merged as much as possible. Session 
Window can be used in
+ *       GlobalStream and KeyedStream.
+ * </ul>
+ */
+@Experimental
+public class WindowStrategy implements Serializable {
+
+    public static final TimeType PROCESSING_TIME = TimeType.PROCESSING;
+    public static final TimeType EVENT_TIME = TimeType.EVENT;
+
+    /** The types of time used in window operations. */
+    @Experimental
+    public enum TimeType {
+        PROCESSING,
+        EVENT
+    }
+
+    // ============== global window ================
+
+    /**
+     * Creates a global window strategy. Note that the global window can be 
used in both
+     * GlobalStream, KeyedStream, NonKeyedStream.
+     *
+     * @return A global window strategy.
+     */
+    public static WindowStrategy global() {
+        return new GlobalWindowStrategy();
+    }
+
+    // ============== tumbling time window ================
+
+    /**
+     * Create a tumbling time window strategy with the event time default time 
type. Note that
+     * tumbling time windows can be used in KeyedStream and GlobalStream. If 
tumbling time window is
+     * used in a GlobalStream, it will convert the GlobalStream into a 
KeyedStream with a Key of
+     * zero, and then use the converted KeyedStream to execute the window.
+     *
+     * @param windowSize the size of Window.
+     * @return A tumbling time window strategy.
+     */
+    public static WindowStrategy tumbling(Duration windowSize) {
+        return new TumblingTimeWindowStrategy(windowSize);
+    }
+
+    /**
+     * Create a tumbling time window strategy. Note that tumbling time windows 
can be used in
+     * KeyedStream and GlobalStream. If tumbling time window is used in a 
GlobalStream, it will
+     * convert the GlobalStream into a KeyedStream with a Key of zero, and 
then use the converted
+     * KeyedStream to execute the window.
+     *
+     * @param windowSize the size of Window.
+     * @param timeType the time type of Window.
+     * @return A tumbling time window strategy.
+     */
+    public static WindowStrategy tumbling(Duration windowSize, TimeType 
timeType) {
+        return new TumblingTimeWindowStrategy(windowSize, timeType);
+    }
+
+    /**
+     * Create a tumbling time window strategy. Note that tumbling time windows 
can be used in
+     * KeyedStream and GlobalStream. If tumbling time window is used in a 
GlobalStream, it will
+     * convert the GlobalStream into a KeyedStream with a Key of zero, and 
then use the converted
+     * KeyedStream to execute the window.
+     *
+     * @param windowSize the size of Window.
+     * @param timeType the time type of Window.
+     * @param allowedLateness the allowed lateness of Window.
+     * @return A tumbling time window strategy.
+     */
+    public static WindowStrategy tumbling(
+            Duration windowSize, TimeType timeType, Duration allowedLateness) {
+        return new TumblingTimeWindowStrategy(windowSize, timeType, 
allowedLateness);
+    }
+
+    // ============== sliding time window ================
+
+    /**
+     * Create a sliding time window strategy with the event time default time 
type. Note that
+     * sliding time windows can be used in KeyedStream and GlobalStream. If 
sliding time window is
+     * used in a GlobalStream, it will convert the GlobalStream into a 
KeyedStream with a Key of
+     * zero, and then use the converted KeyedStream to execute the window.
+     *
+     * @param windowSize the size of Window.
+     * @param windowSlideInterval the slide interval of Window.
+     * @return A sliding time window strategy.
+     */
+    public static WindowStrategy sliding(Duration windowSize, Duration 
windowSlideInterval) {
+        return new SlidingTimeWindowStrategy(windowSize, windowSlideInterval);
+    }
+
+    /**
+     * Create a sliding time window strategy. Note that sliding time windows 
can be used in
+     * KeyedStream and GlobalStream. If sliding time window is used in a 
GlobalStream, it will
+     * convert the GlobalStream into a KeyedStream with a Key of zero, and 
then use the converted
+     * KeyedStream to execute the window.
+     *
+     * @param windowSize the size of Window.
+     * @param windowSlideInterval the slide interval of Window.
+     * @param timeType the time type of Window.
+     * @return A sliding time window strategy.
+     */
+    public static WindowStrategy sliding(
+            Duration windowSize, Duration windowSlideInterval, TimeType 
timeType) {
+        return new SlidingTimeWindowStrategy(windowSize, windowSlideInterval, 
timeType);
+    }
+
+    /**
+     * Create a sliding time window strategy. Note that sliding time windows 
can be used in
+     * KeyedStream and GlobalStream. If sliding time window is used in a 
GlobalStream, it will
+     * convert the GlobalStream into a KeyedStream with a Key of zero, and 
then use the converted
+     * KeyedStream to execute the window.
+     *
+     * @param windowSize the size of Window.
+     * @param windowSlideInterval the slide interval of Window.
+     * @param timeType the time type of Window.
+     * @param allowedLateness the allowed lateness of Window.
+     * @return A sliding time window strategy.
+     */
+    public static WindowStrategy sliding(
+            Duration windowSize,
+            Duration windowSlideInterval,
+            TimeType timeType,
+            Duration allowedLateness) {
+        return new SlidingTimeWindowStrategy(
+                windowSize, windowSlideInterval, timeType, allowedLateness);
+    }
+
+    // ============== session window ================
+
+    /**
+     * Create a session time window strategy with the event time default time 
type. Note that
+     * session time windows can be used in KeyedStream and GlobalStream. If 
session time window is
+     * used in a GlobalStream, it will convert the GlobalStream into a 
KeyedStream with a Key of
+     * zero, and then use the converted KeyedStream to execute the window.
+     *
+     * @param sessionGap the timeout of session.
+     * @return A session window strategy.
+     */
+    public static WindowStrategy session(Duration sessionGap) {
+        return new SessionWindowStrategy(sessionGap);
+    }
+
+    /**
+     * Create a session time window strategy. Note that session time windows 
can be used in
+     * KeyedStream and GlobalStream. If session time window is used in a 
GlobalStream, it will
+     * convert the GlobalStream into a KeyedStream with a Key of zero, and 
then use the converted
+     * KeyedStream to execute the window.
+     *
+     * @param sessionGap the timeout of session.
+     * @param timeType the time type of Window.
+     * @return A session window strategy.
+     */
+    public static WindowStrategy session(Duration sessionGap, TimeType 
timeType) {
+        return new SessionWindowStrategy(sessionGap, timeType);
+    }
+}


Reply via email to