This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f1871fc31aa64e51fb812e6de457bad9e1c020c3
Author: xuyang <[email protected]>
AuthorDate: Mon Jan 22 11:31:56 2024 +0800

    [FLINK-34100][table] Support session window table function without pulling 
up with window agg
    
    This closes #24162
---
 .../exec/batch/BatchExecWindowTableFunction.java   |  10 ++
 .../exec/common/CommonExecWindowTableFunction.java |  46 +++++---
 .../exec/stream/StreamExecWindowAggregateBase.java |  20 ++--
 .../exec/stream/StreamExecWindowTableFunction.java |  74 ++++++++++++
 .../plan/utils/WindowTableFunctionUtil.java        |  10 ++
 .../plan/stream/sql/agg/WindowAggregateTest.xml    | 130 +++++++++++++++++++++
 .../plan/batch/sql/WindowTableFunctionTest.scala   |  27 +++++
 .../plan/stream/sql/agg/WindowAggregateTest.scala  |  10 +-
 .../runtime/stream/sql/WindowAggregateITCase.scala |  37 ++++++
 .../stream/sql/WindowTableFunctionITCase.scala     | 104 ++++++++++++++++-
 .../aggregate/window/WindowAggOperatorBuilder.java |  25 ++--
 .../window/tvf/unslicing/UnsliceAssigners.java     | 129 ++++++++++++++++++++
 12 files changed, 573 insertions(+), 49 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecWindowTableFunction.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecWindowTableFunction.java
index b00ce302c19..2194474c273 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecWindowTableFunction.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecWindowTableFunction.java
@@ -61,4 +61,14 @@ public class BatchExecWindowTableFunction extends 
CommonExecWindowTableFunction
         }
         return super.translateToPlanInternal(planner, config);
     }
+
+    @Override
+    protected Transformation<RowData> translateWithUnalignedWindow(
+            PlannerBase planner,
+            ExecNodeConfig config,
+            RowType inputRowType,
+            Transformation<RowData> inputTransform) {
+        throw new TableException(
+                "Unaligned windows like session are not supported in batch 
mode yet.");
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecWindowTableFunction.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecWindowTableFunction.java
index 404f24c3cb9..df205561781 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecWindowTableFunction.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecWindowTableFunction.java
@@ -20,10 +20,8 @@ package 
org.apache.flink.table.planner.plan.nodes.exec.common;
 
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.delegation.PlannerBase;
-import org.apache.flink.table.planner.plan.logical.SessionWindowSpec;
 import 
org.apache.flink.table.planner.plan.logical.TimeAttributeWindowingStrategy;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
@@ -38,6 +36,7 @@ import org.apache.flink.table.planner.utils.TableConfigUtils;
 import org.apache.flink.table.runtime.operators.window.TimeWindow;
 import 
org.apache.flink.table.runtime.operators.window.groupwindow.assigners.GroupWindowAssigner;
 import 
org.apache.flink.table.runtime.operators.window.tvf.operator.AlignedWindowTableFunctionOperator;
+import 
org.apache.flink.table.runtime.operators.window.tvf.operator.WindowTableFunctionOperatorBase;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.runtime.util.TimeWindowUtil;
 import org.apache.flink.table.types.logical.RowType;
@@ -79,22 +78,22 @@ public abstract class CommonExecWindowTableFunction extends 
ExecNodeBase<RowData
     @Override
     protected Transformation<RowData> translateToPlanInternal(
             PlannerBase planner, ExecNodeConfig config) {
-        if (windowingStrategy.getWindow() instanceof SessionWindowSpec) {
-            // TODO Support session window table function in 
ExecWindowTableFunction. See
-            //  more at FLINK-34100
-            throw new TableException("Session Window TableFunction is not 
supported yet.");
-        }
         final ExecEdge inputEdge = getInputEdges().get(0);
         final Transformation<RowData> inputTransform =
                 (Transformation<RowData>) inputEdge.translateToPlan(planner);
-        GroupWindowAssigner<TimeWindow> windowAssigner = 
createWindowAssigner(windowingStrategy);
-        final ZoneId shiftTimeZone =
-                TimeWindowUtil.getShiftTimeZone(
-                        windowingStrategy.getTimeAttributeType(),
-                        TableConfigUtils.getLocalTimeZone(config));
-        AlignedWindowTableFunctionOperator windowTableFunctionOperator =
-                new AlignedWindowTableFunctionOperator(
-                        windowAssigner, 
windowingStrategy.getTimeAttributeIndex(), shiftTimeZone);
+        final boolean isAlignedWindow = 
windowingStrategy.getWindow().isAlignedWindow();
+        if (isAlignedWindow) {
+            return translateWithAlignedWindow(config, inputTransform);
+        } else {
+            return translateWithUnalignedWindow(
+                    planner, config, (RowType) inputEdge.getOutputType(), 
inputTransform);
+        }
+    }
+
+    private Transformation<RowData> translateWithAlignedWindow(
+            ExecNodeConfig config, Transformation<RowData> inputTransform) {
+        final WindowTableFunctionOperatorBase windowTableFunctionOperator =
+                createAlignedWindowTableFunctionOperator(config);
         return ExecNodeUtil.createOneInputTransformation(
                 inputTransform,
                 createTransformationMeta(WINDOW_TRANSFORMATION, config),
@@ -103,4 +102,21 @@ public abstract class CommonExecWindowTableFunction 
extends ExecNodeBase<RowData
                 inputTransform.getParallelism(),
                 false);
     }
+
+    private WindowTableFunctionOperatorBase 
createAlignedWindowTableFunctionOperator(
+            ExecNodeConfig config) {
+        GroupWindowAssigner<TimeWindow> windowAssigner = 
createWindowAssigner(windowingStrategy);
+        final ZoneId shiftTimeZone =
+                TimeWindowUtil.getShiftTimeZone(
+                        windowingStrategy.getTimeAttributeType(),
+                        TableConfigUtils.getLocalTimeZone(config));
+        return new AlignedWindowTableFunctionOperator(
+                windowAssigner, windowingStrategy.getTimeAttributeIndex(), 
shiftTimeZone);
+    }
+
+    protected abstract Transformation<RowData> translateWithUnalignedWindow(
+            PlannerBase planner,
+            ExecNodeConfig config,
+            RowType inputRowType,
+            Transformation<RowData> inputTransform);
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregateBase.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregateBase.java
index c3b515dc743..690d1aac035 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregateBase.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregateBase.java
@@ -75,16 +75,20 @@ public abstract class StreamExecWindowAggregateBase extends 
StreamExecAggregateB
             WindowingStrategy windowingStrategy, ZoneId shiftTimeZone) {
         WindowSpec windowSpec = windowingStrategy.getWindow();
         if (windowingStrategy instanceof WindowAttachedWindowingStrategy) {
-            checkArgument(
-                    isAlignedWindow(windowSpec),
-                    "UnsliceAssigner with WindowAttachedWindowingStrategy is 
not supported yet.");
-
+            int windowStartIndex =
+                    ((WindowAttachedWindowingStrategy) 
windowingStrategy).getWindowStart();
             int windowEndIndex =
                     ((WindowAttachedWindowingStrategy) 
windowingStrategy).getWindowEnd();
-            // we don't need time attribute to assign windows, use a magic 
value in this case
-            SliceAssigner innerAssigner =
-                    createSliceAssigner(windowSpec, Integer.MAX_VALUE, 
shiftTimeZone);
-            return SliceAssigners.windowed(windowEndIndex, innerAssigner);
+            if (isAlignedWindow(windowSpec)) {
+                // we don't need time attribute to assign windows, use a magic 
value in this case
+                SliceAssigner innerAssigner =
+                        createSliceAssigner(windowSpec, Integer.MAX_VALUE, 
shiftTimeZone);
+                return SliceAssigners.windowed(windowEndIndex, innerAssigner);
+            } else {
+                UnsliceAssigner<TimeWindow> innerAssigner =
+                        createUnsliceAssigner(windowSpec, windowEndIndex, 
shiftTimeZone);
+                return UnsliceAssigners.windowed(windowStartIndex, 
windowEndIndex, innerAssigner);
+            }
 
         } else if (windowingStrategy instanceof 
SliceAttachedWindowingStrategy) {
             checkArgument(
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowTableFunction.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowTableFunction.java
index 7728712a6af..28fd8c128ee 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowTableFunction.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowTableFunction.java
@@ -19,22 +19,44 @@
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
 import org.apache.flink.FlinkVersion;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.logical.SessionWindowSpec;
 import 
org.apache.flink.table.planner.plan.logical.TimeAttributeWindowingStrategy;
+import org.apache.flink.table.planner.plan.logical.WindowSpec;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import 
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecWindowTableFunction;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import 
org.apache.flink.table.runtime.operators.window.groupwindow.assigners.GroupWindowAssigner;
+import 
org.apache.flink.table.runtime.operators.window.tvf.operator.UnalignedWindowTableFunctionOperator;
+import 
org.apache.flink.table.runtime.operators.window.tvf.operator.WindowTableFunctionOperatorBase;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.runtime.util.TimeWindowUtil;
 import org.apache.flink.table.types.logical.RowType;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
+import java.time.ZoneId;
 import java.util.Collections;
 import java.util.List;
 
+import static 
org.apache.flink.table.planner.plan.utils.WindowTableFunctionUtil.createWindowAssigner;
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
  * Stream {@link ExecNode} which acts as a table-valued function to assign a 
window for each row of
  * the input relation. The return value of the new relation includes all the 
original columns as
@@ -86,4 +108,56 @@ public class StreamExecWindowTableFunction extends 
CommonExecWindowTableFunction
                 outputType,
                 description);
     }
+
+    protected Transformation<RowData> translateWithUnalignedWindow(
+            PlannerBase planner,
+            ExecNodeConfig config,
+            RowType inputRowType,
+            Transformation<RowData> inputTransform) {
+        final WindowTableFunctionOperatorBase windowTableFunctionOperator =
+                createUnalignedWindowTableFunctionOperator(config, 
inputRowType);
+        final OneInputTransformation<RowData, RowData> transform =
+                ExecNodeUtil.createOneInputTransformation(
+                        inputTransform,
+                        createTransformationMeta(WINDOW_TRANSFORMATION, 
config),
+                        windowTableFunctionOperator,
+                        InternalTypeInfo.of(getOutputType()),
+                        inputTransform.getParallelism(),
+                        false);
+
+        final int[] partitionKeys = 
extractPartitionKeys(windowingStrategy.getWindow());
+        // set KeyType and Selector for state
+        final RowDataKeySelector selector =
+                KeySelectorUtil.getRowDataSelector(
+                        planner.getFlinkContext().getClassLoader(),
+                        partitionKeys,
+                        InternalTypeInfo.of(inputRowType));
+        transform.setStateKeySelector(selector);
+        transform.setStateKeyType(selector.getProducedType());
+        return transform;
+    }
+
+    private int[] extractPartitionKeys(WindowSpec window) {
+        checkState(
+                window instanceof SessionWindowSpec,
+                "Only support unaligned window with session window now.");
+
+        return ((SessionWindowSpec) window).getPartitionKeyIndices();
+    }
+
+    private WindowTableFunctionOperatorBase 
createUnalignedWindowTableFunctionOperator(
+            ExecNodeConfig config, RowType inputRowType) {
+        GroupWindowAssigner<TimeWindow> windowAssigner = 
createWindowAssigner(windowingStrategy);
+        final ZoneId shiftTimeZone =
+                TimeWindowUtil.getShiftTimeZone(
+                        windowingStrategy.getTimeAttributeType(),
+                        TableConfigUtils.getLocalTimeZone(config));
+
+        return new UnalignedWindowTableFunctionOperator(
+                windowAssigner,
+                windowAssigner.getWindowSerializer(new ExecutionConfig()),
+                new RowDataSerializer(inputRowType),
+                windowingStrategy.getTimeAttributeIndex(),
+                shiftTimeZone);
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/WindowTableFunctionUtil.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/WindowTableFunctionUtil.java
index 4f9b66b233c..60e73c28bb0 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/WindowTableFunctionUtil.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/WindowTableFunctionUtil.java
@@ -22,12 +22,14 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.planner.plan.logical.CumulativeWindowSpec;
 import org.apache.flink.table.planner.plan.logical.HoppingWindowSpec;
+import org.apache.flink.table.planner.plan.logical.SessionWindowSpec;
 import 
org.apache.flink.table.planner.plan.logical.TimeAttributeWindowingStrategy;
 import org.apache.flink.table.planner.plan.logical.TumblingWindowSpec;
 import org.apache.flink.table.planner.plan.logical.WindowSpec;
 import org.apache.flink.table.runtime.operators.window.TimeWindow;
 import 
org.apache.flink.table.runtime.operators.window.groupwindow.assigners.CumulativeWindowAssigner;
 import 
org.apache.flink.table.runtime.operators.window.groupwindow.assigners.GroupWindowAssigner;
+import 
org.apache.flink.table.runtime.operators.window.groupwindow.assigners.SessionWindowAssigner;
 import 
org.apache.flink.table.runtime.operators.window.groupwindow.assigners.SlidingWindowAssigner;
 import 
org.apache.flink.table.runtime.operators.window.groupwindow.assigners.TumblingWindowAssigner;
 
@@ -80,6 +82,14 @@ public final class WindowTableFunctionUtil {
                 windowAssigner = 
windowAssigner.withOffset(cumulativeWindowSpec.getOffset());
             }
             return windowAssigner;
+        } else if (windowSpec instanceof SessionWindowSpec) {
+            SessionWindowSpec sessionWindowSpec = (SessionWindowSpec) 
windowSpec;
+            SessionWindowAssigner windowAssigner =
+                    SessionWindowAssigner.withGap(sessionWindowSpec.getGap());
+            if (isProctime) {
+                windowAssigner = windowAssigner.withProcessingTime();
+            }
+            return windowAssigner;
         } else {
             throw new TableException(
                     String.format(
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
index b5d7fa725cf..4e5c14dd9b1 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
@@ -1186,6 +1186,136 @@ Calc(select=[window_time, start_time, end_time])
          +- Calc(select=[CAST(rowtime AS TIMESTAMP(3)) AS rowtime, rowtime AS 
rowtime_0])
             +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
                +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable, project=[rowtime], metadata=[]]], fields=[rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testGroupKeyLessThanPartitionKeyInSessionWindow[aggPhaseEnforcer=ONE_PHASE]">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalProject(window_start=[$1], window_end=[$2], EXPR$2=[$3], EXPR$3=[$4], 
EXPR$4=[$5], wAvg=[$6], uv=[$7])
++- LogicalAggregate(group=[{0, 1, 2}], EXPR$2=[COUNT()], EXPR$3=[SUM($3)], 
EXPR$4=[MAX($3) FILTER $4], wAvg=[weightedAvg($0, $5)], uv=[COUNT(DISTINCT $6)])
+   +- LogicalProject(b=[$1], window_start=[$7], window_end=[$8], d=[$3], 
$f4=[IS TRUE(>($1, 1000))], e=[$4], c=[$2])
+      +- LogicalTableFunctionScan(invocation=[SESSION(PARTITION BY($1, $0), 
DESCRIPTOR($5), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT 
b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* 
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, 
TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[$6])
+            +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 
1000:INTERVAL SECOND)])
+               +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[PROCTIME()])
+                  +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+
+== Optimized Physical Plan ==
+Calc(select=[window_start, window_end, EXPR$2, EXPR$3, EXPR$4, wAvg, uv])
++- GroupAggregate(groupBy=[b, window_start, window_end], select=[b, 
window_start, window_end, COUNT(*) AS EXPR$2, SUM(d) AS EXPR$3, MAX(d) FILTER 
$f4 AS EXPR$4, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT c) AS uv])
+   +- Exchange(distribution=[hash[b, window_start, window_end]])
+      +- Calc(select=[b, window_start, window_end, d, IS TRUE(>(b, 1000)) AS 
$f4, e, c])
+         +- WindowTableFunction(window=[SESSION(time_col=[rowtime], gap=[5 
min], partition keys=[b, a])])
+            +- Exchange(distribution=[hash[b, a]])
+               +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+                  +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, d, e, rowtime])
+
+== Optimized Execution Plan ==
+Calc(select=[window_start, window_end, EXPR$2, EXPR$3, EXPR$4, wAvg, uv])
++- GroupAggregate(groupBy=[b, window_start, window_end], select=[b, 
window_start, window_end, COUNT(*) AS EXPR$2, SUM(d) AS EXPR$3, MAX(d) FILTER 
$f4 AS EXPR$4, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT c) AS uv])
+   +- Exchange(distribution=[hash[b, window_start, window_end]])
+      +- Calc(select=[b, window_start, window_end, d, (b > 1000) IS TRUE AS 
$f4, e, c])
+         +- WindowTableFunction(window=[SESSION(time_col=[rowtime], gap=[5 
min], partition keys=[b, a])])
+            +- Exchange(distribution=[hash[b, a]])
+               +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 
1000:INTERVAL SECOND)])
+                  +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, d, e, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testGroupKeyLessThanPartitionKeyInSessionWindow[aggPhaseEnforcer=TWO_PHASE]">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalProject(window_start=[$1], window_end=[$2], EXPR$2=[$3], EXPR$3=[$4], 
EXPR$4=[$5], wAvg=[$6], uv=[$7])
++- LogicalAggregate(group=[{0, 1, 2}], EXPR$2=[COUNT()], EXPR$3=[SUM($3)], 
EXPR$4=[MAX($3) FILTER $4], wAvg=[weightedAvg($0, $5)], uv=[COUNT(DISTINCT $6)])
+   +- LogicalProject(b=[$1], window_start=[$7], window_end=[$8], d=[$3], 
$f4=[IS TRUE(>($1, 1000))], e=[$4], c=[$2])
+      +- LogicalTableFunctionScan(invocation=[SESSION(PARTITION BY($1, $0), 
DESCRIPTOR($5), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT 
b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* 
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, 
TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[$6])
+            +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 
1000:INTERVAL SECOND)])
+               +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[PROCTIME()])
+                  +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+
+== Optimized Physical Plan ==
+Calc(select=[window_start, window_end, EXPR$2, EXPR$3, EXPR$4, wAvg, uv])
++- GroupAggregate(groupBy=[b, window_start, window_end], select=[b, 
window_start, window_end, COUNT(*) AS EXPR$2, SUM(d) AS EXPR$3, MAX(d) FILTER 
$f4 AS EXPR$4, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT c) AS uv])
+   +- Exchange(distribution=[hash[b, window_start, window_end]])
+      +- Calc(select=[b, window_start, window_end, d, IS TRUE(>(b, 1000)) AS 
$f4, e, c])
+         +- WindowTableFunction(window=[SESSION(time_col=[rowtime], gap=[5 
min], partition keys=[b, a])])
+            +- Exchange(distribution=[hash[b, a]])
+               +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+                  +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, d, e, rowtime])
+
+== Optimized Execution Plan ==
+Calc(select=[window_start, window_end, EXPR$2, EXPR$3, EXPR$4, wAvg, uv])
++- GroupAggregate(groupBy=[b, window_start, window_end], select=[b, 
window_start, window_end, COUNT(*) AS EXPR$2, SUM(d) AS EXPR$3, MAX(d) FILTER 
$f4 AS EXPR$4, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT c) AS uv])
+   +- Exchange(distribution=[hash[b, window_start, window_end]])
+      +- Calc(select=[b, window_start, window_end, d, (b > 1000) IS TRUE AS 
$f4, e, c])
+         +- WindowTableFunction(window=[SESSION(time_col=[rowtime], gap=[5 
min], partition keys=[b, a])])
+            +- Exchange(distribution=[hash[b, a]])
+               +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 
1000:INTERVAL SECOND)])
+                  +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, d, e, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testGroupKeyMoreThanPartitionKeyInSessionWindow[aggPhaseEnforcer=ONE_PHASE]">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)], 
EXPR$5=[MAX($3) FILTER $4], wAvg=[weightedAvg($5, $6)], uv=[COUNT(DISTINCT $7)])
++- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3], $f4=[IS 
TRUE(>($1, 1000))], b=[$1], e=[$4], c=[$2])
+   +- LogicalTableFunctionScan(invocation=[SESSION(DESCRIPTOR($5), 
300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, 
VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* 
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, 
TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+      +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], 
proctime=[$6])
+         +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 
1000:INTERVAL SECOND)])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[PROCTIME()])
+               +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
+
+== Optimized Physical Plan ==
+GroupAggregate(groupBy=[a, window_start, window_end], select=[a, window_start, 
window_end, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, MAX(d) FILTER $f4 AS EXPR$5, 
weightedAvg(b, e) AS wAvg, COUNT(DISTINCT c) AS uv])
++- Exchange(distribution=[hash[a, window_start, window_end]])
+   +- Calc(select=[a, window_start, window_end, d, IS TRUE(>(b, 1000)) AS $f4, 
b, e, c])
+      +- WindowTableFunction(window=[SESSION(time_col=[rowtime], gap=[5 min])])
+         +- Exchange(distribution=[single])
+            +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+               +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, e, rowtime])
+
+== Optimized Execution Plan ==
+GroupAggregate(groupBy=[a, window_start, window_end], select=[a, window_start, 
window_end, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, MAX(d) FILTER $f4 AS EXPR$5, 
weightedAvg(b, e) AS wAvg, COUNT(DISTINCT c) AS uv])
++- Exchange(distribution=[hash[a, window_start, window_end]])
+   +- Calc(select=[a, window_start, window_end, d, (b > 1000) IS TRUE AS $f4, 
b, e, c])
+      +- WindowTableFunction(window=[SESSION(time_col=[rowtime], gap=[5 min])])
+         +- Exchange(distribution=[single])
+            +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 
1000:INTERVAL SECOND)])
+               +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, e, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testGroupKeyMoreThanPartitionKeyInSessionWindow[aggPhaseEnforcer=TWO_PHASE]">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)], 
EXPR$5=[MAX($3) FILTER $4], wAvg=[weightedAvg($5, $6)], uv=[COUNT(DISTINCT $7)])
++- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3], $f4=[IS 
TRUE(>($1, 1000))], b=[$1], e=[$4], c=[$2])
+   +- LogicalTableFunctionScan(invocation=[SESSION(DESCRIPTOR($5), 
300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, 
VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* 
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, 
TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+      +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], 
proctime=[$6])
+         +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 
1000:INTERVAL SECOND)])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[PROCTIME()])
+               +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
+
+== Optimized Physical Plan ==
+GroupAggregate(groupBy=[a, window_start, window_end], select=[a, window_start, 
window_end, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, MAX(d) FILTER $f4 AS EXPR$5, 
weightedAvg(b, e) AS wAvg, COUNT(DISTINCT c) AS uv])
++- Exchange(distribution=[hash[a, window_start, window_end]])
+   +- Calc(select=[a, window_start, window_end, d, IS TRUE(>(b, 1000)) AS $f4, 
b, e, c])
+      +- WindowTableFunction(window=[SESSION(time_col=[rowtime], gap=[5 min])])
+         +- Exchange(distribution=[single])
+            +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+               +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, e, rowtime])
+
+== Optimized Execution Plan ==
+GroupAggregate(groupBy=[a, window_start, window_end], select=[a, window_start, 
window_end, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, MAX(d) FILTER $f4 AS EXPR$5, 
weightedAvg(b, e) AS wAvg, COUNT(DISTINCT c) AS uv])
++- Exchange(distribution=[hash[a, window_start, window_end]])
+   +- Calc(select=[a, window_start, window_end, d, (b > 1000) IS TRUE AS $f4, 
b, e, c])
+      +- WindowTableFunction(window=[SESSION(time_col=[rowtime], gap=[5 min])])
+         +- Exchange(distribution=[single])
+            +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 
1000:INTERVAL SECOND)])
+               +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, e, rowtime])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/WindowTableFunctionTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/WindowTableFunctionTest.scala
index b8c67d9f63d..6fbc1d4314e 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/WindowTableFunctionTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/WindowTableFunctionTest.scala
@@ -206,4 +206,31 @@ class WindowTableFunctionTest extends TableTestBase {
         |""".stripMargin
     util.verifyExecPlan(sql)
   }
+
+  @Test
+  def testSessionTVF(): Unit = {
+    val sql =
+      """
+        |SELECT *
+        |FROM TABLE(
+        | SESSION(TABLE MyTable1, DESCRIPTOR(ts), INTERVAL '10' MINUTE))
+        |""".stripMargin
+    assertThatThrownBy(() => util.verifyExplain(sql))
+      .hasMessageContaining("Unaligned windows like session are not supported 
in batch mode yet.")
+      .isInstanceOf[TableException]
+  }
+
+  @Test
+  def testSessionTVFProctime(): Unit = {
+    val sql =
+      """
+        |SELECT *
+        |FROM TABLE(
+        | SESSION(TABLE MyTable2, DESCRIPTOR(c), INTERVAL '10' MINUTE))
+        |""".stripMargin
+
+    assertThatThrownBy(() => util.verifyExplain(sql))
+      .hasMessageContaining("Processing time Window TableFunction is not 
supported yet.")
+      .isInstanceOf[TableException]
+  }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
index a7b44993a32..d27950c480f 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
@@ -1541,15 +1541,11 @@ class WindowAggregateTest(aggPhaseEnforcer: 
AggregatePhaseStrategy) extends Tabl
         |GROUP BY a, window_start, window_end
       """.stripMargin
 
-    assertThatThrownBy(() => util.verifyExplain(sql))
-      .hasMessageContaining("Session Window TableFunction is not supported 
yet.")
-      .isInstanceOf[TableException]
+    util.verifyExplain(sql)
   }
 
   @TestTemplate
   def testGroupKeyLessThanPartitionKeyInSessionWindow(): Unit = {
-    // TODO Support session window table function in ExecWindowTableFunction. 
See
-    //  more at FLINK-34100
     val sql = {
       """
         |SELECT
@@ -1566,9 +1562,7 @@ class WindowAggregateTest(aggPhaseEnforcer: 
AggregatePhaseStrategy) extends Tabl
       """.stripMargin
     }
 
-    assertThatThrownBy(() => util.verifyExplain(sql))
-      .hasMessageContaining("Session Window TableFunction is not supported 
yet.")
-      .isInstanceOf[TableException]
+    util.verifyExplain(sql)
   }
 
   @TestTemplate
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
index f9c30366dea..118b9c5a22a 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
@@ -1217,6 +1217,43 @@ class WindowAggregateITCase(
       .isEqualTo(expected.sorted.mkString("\n"))
   }
 
+  @TestTemplate
+  def testEventTimeSessionWindowWithTVFNotPullUpIntoWindowAgg(): Unit = {
+    val sql =
+      """
+        |SELECT
+        |  `name`,
+        |  window_start,
+        |  window_end,
+        |  COUNT(*),
+        |  SUM(`bigdec`),
+        |  MAX(`double`),
+        |  MIN(`float`),
+        |  COUNT(DISTINCT `string`),
+        |  concat_distinct_agg(`string`)
+        |FROM (
+        | SELECT * FROM TABLE(
+        |   SESSION(TABLE T1 PARTITION BY `name`, DESCRIPTOR(rowtime), 
INTERVAL '5' SECOND))
+        |   WHERE window_start > TIMESTAMP '2000-01-01 10:10:00.000'
+        |)
+        |GROUP BY `name`, window_start, window_end
+      """.stripMargin
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sql).toDataStream.addSink(sink)
+    env.execute()
+
+    val expected = Seq(
+      
"a,2020-10-10T00:00:01,2020-10-10T00:00:13,6,19.98,5.0,1.0,3,Hi|Comment#1|Comment#2",
+      "b,2020-10-10T00:00:06,2020-10-10T00:00:12,2,6.66,6.0,3.0,2,Hello|Hi",
+      "b,2020-10-10T00:00:16,2020-10-10T00:00:21,1,4.44,4.0,4.0,1,Hi",
+      "b,2020-10-10T00:00:34,2020-10-10T00:00:39,1,3.33,3.0,3.0,1,Comment#3",
+      "null,2020-10-10T00:00:32,2020-10-10T00:00:37,1,7.77,7.0,7.0,0,null"
+    )
+    assertThat(sink.getAppendResults.sorted.mkString("\n"))
+      .isEqualTo(expected.sorted.mkString("\n"))
+  }
+
   @TestTemplate
   def testEventTimeSessionWindowWithCDCSource(): Unit = {
     val sql =
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowTableFunctionITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowTableFunctionITCase.scala
index 90f0d610d77..6a55aaa1174 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowTableFunctionITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowTableFunctionITCase.scala
@@ -18,14 +18,12 @@
 package org.apache.flink.table.planner.runtime.stream.sql
 
 import org.apache.flink.api.common.restartstrategy.RestartStrategies
-import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.CheckpointingMode
 import org.apache.flink.table.api.bridge.scala._
 import org.apache.flink.table.planner.factories.TestValuesTableFactory
 import org.apache.flink.table.planner.runtime.utils.{FailingCollectionSource, 
StreamingWithStateTestBase, TestData, TestingAppendSink}
 import 
org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
 import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension
-import org.apache.flink.types.Row
 
 import org.assertj.core.api.Assertions.assertThat
 import org.junit.jupiter.api.{BeforeEach, TestTemplate}
@@ -383,4 +381,106 @@ class WindowTableFunctionITCase(mode: StateBackendMode) 
extends StreamingWithSta
     assertThat(sink.getAppendResults.sorted.mkString("\n"))
       .isEqualTo(expected.sorted.mkString("\n"))
   }
+
+  @TestTemplate
+  def testSessionWindow(): Unit = {
+    val sql =
+      """
+        |SELECT
+        |  TO_TIMESTAMP(`ts`),
+        |  `int`,
+        |  `double`,
+        |  `float`,
+        |  `bigdec`,
+        |  `string`,
+        |  `name`,
+        |  CAST(`rowtime` AS STRING),
+        |  window_start,
+        |  window_end,
+        |  window_time
+        |FROM TABLE(SESSION(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' 
SECOND))
+      """.stripMargin
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sql).toDataStream.addSink(sink)
+    env.execute()
+
+    val expected = Seq(
+      "2020-10-10T00:00:01,1,1.0,1.0,1.11,Hi,a,2020-10-10 00:00:01.000," +
+        "2020-10-10T00:00:01,2020-10-10T00:00:13,2020-10-10T00:00:12.999",
+      "2020-10-10T00:00:02,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 
00:00:02.000," +
+        "2020-10-10T00:00:01,2020-10-10T00:00:13,2020-10-10T00:00:12.999",
+      "2020-10-10T00:00:03,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 
00:00:03.000," +
+        "2020-10-10T00:00:01,2020-10-10T00:00:13,2020-10-10T00:00:12.999",
+      "2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000," +
+        "2020-10-10T00:00:01,2020-10-10T00:00:13,2020-10-10T00:00:12.999",
+      "2020-10-10T00:00:04,5,5.0,null,5.55,Hi,a,2020-10-10 00:00:04.000," +
+        "2020-10-10T00:00:01,2020-10-10T00:00:13,2020-10-10T00:00:12.999",
+      "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000," +
+        "2020-10-10T00:00:01,2020-10-10T00:00:13,2020-10-10T00:00:12.999",
+      "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000," +
+        "2020-10-10T00:00:01,2020-10-10T00:00:13,2020-10-10T00:00:12.999",
+      "2020-10-10T00:00:08,3,null,3.0,3.33,Comment#2,a,2020-10-10 
00:00:08.000," +
+        "2020-10-10T00:00:01,2020-10-10T00:00:13,2020-10-10T00:00:12.999",
+      "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000," +
+        "2020-10-10T00:00:16,2020-10-10T00:00:21,2020-10-10T00:00:20.999",
+      "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000," +
+        "2020-10-10T00:00:32,2020-10-10T00:00:39,2020-10-10T00:00:38.999",
+      "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 
00:00:34.000," +
+        "2020-10-10T00:00:32,2020-10-10T00:00:39,2020-10-10T00:00:38.999"
+    )
+    assertThat(sink.getAppendResults.sorted.mkString("\n"))
+      .isEqualTo(expected.sorted.mkString("\n"))
+  }
+
+  @TestTemplate
+  def testSessionWindowWithPartitionBy(): Unit = {
+    val sql =
+      """
+        |SELECT
+        |  TO_TIMESTAMP(`ts`),
+        |  `int`,
+        |  `double`,
+        |  `float`,
+        |  `bigdec`,
+        |  `string`,
+        |  `name`,
+        |  CAST(`rowtime` AS STRING),
+        |  window_start,
+        |  window_end,
+        |  window_time
+        |FROM TABLE(SESSION(TABLE T1 PARTITION BY `name`, DESCRIPTOR(rowtime), 
INTERVAL '5' SECOND))
+      """.stripMargin
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sql).toDataStream.addSink(sink)
+    env.execute()
+
+    val expected = Seq(
+      "2020-10-10T00:00:01,1,1.0,1.0,1.11,Hi,a,2020-10-10 00:00:01.000," +
+        "2020-10-10T00:00:01,2020-10-10T00:00:13,2020-10-10T00:00:12.999",
+      "2020-10-10T00:00:02,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 
00:00:02.000," +
+        "2020-10-10T00:00:01,2020-10-10T00:00:13,2020-10-10T00:00:12.999",
+      "2020-10-10T00:00:03,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 
00:00:03.000," +
+        "2020-10-10T00:00:01,2020-10-10T00:00:13,2020-10-10T00:00:12.999",
+      "2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000," +
+        "2020-10-10T00:00:01,2020-10-10T00:00:13,2020-10-10T00:00:12.999",
+      "2020-10-10T00:00:04,5,5.0,null,5.55,Hi,a,2020-10-10 00:00:04.000," +
+        "2020-10-10T00:00:01,2020-10-10T00:00:13,2020-10-10T00:00:12.999",
+      "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000," +
+        "2020-10-10T00:00:06,2020-10-10T00:00:12,2020-10-10T00:00:11.999",
+      "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000," +
+        "2020-10-10T00:00:06,2020-10-10T00:00:12,2020-10-10T00:00:11.999",
+      "2020-10-10T00:00:08,3,null,3.0,3.33,Comment#2,a,2020-10-10 
00:00:08.000," +
+        "2020-10-10T00:00:01,2020-10-10T00:00:13,2020-10-10T00:00:12.999",
+      "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000," +
+        "2020-10-10T00:00:16,2020-10-10T00:00:21,2020-10-10T00:00:20.999",
+      "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000," +
+        "2020-10-10T00:00:32,2020-10-10T00:00:37,2020-10-10T00:00:36.999",
+      "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 
00:00:34.000," +
+        "2020-10-10T00:00:34,2020-10-10T00:00:39,2020-10-10T00:00:38.999"
+    )
+    assertThat(sink.getAppendResults.sorted.mkString("\n"))
+      .isEqualTo(expected.sorted.mkString("\n"))
+  }
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/WindowAggOperatorBuilder.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/WindowAggOperatorBuilder.java
index 7228a5f2fe1..f6939cffb46 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/WindowAggOperatorBuilder.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/WindowAggOperatorBuilder.java
@@ -38,7 +38,6 @@ import 
org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceUnshared
 import 
org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowOperator;
 import 
org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowProcessor;
 import 
org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnsliceAssigner;
-import 
org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnsliceAssigners;
 import 
org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnslicingWindowOperator;
 import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
 import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;
@@ -206,20 +205,14 @@ public class WindowAggOperatorBuilder {
 
     @SuppressWarnings("unchecked")
     private WindowOperatorBase<RowData, ?> buildUnslicingWindowOperator() {
-
-        if (assigner instanceof UnsliceAssigners.SessionUnsliceAssigner) {
-            final UnsliceWindowAggProcessor windowProcessor =
-                    new UnsliceWindowAggProcessor(
-                            (GeneratedNamespaceAggsHandleFunction<TimeWindow>)
-                                    generatedAggregateFunction,
-                            (UnsliceAssigner<TimeWindow>) assigner,
-                            accSerializer,
-                            indexOfCountStart,
-                            shiftTimeZone);
-            return new UnslicingWindowOperator<>(windowProcessor);
-        }
-
-        throw new UnsupportedOperationException(
-                "Unsupported unslice assigner: " + 
assigner.getClass().getCanonicalName());
+        final UnsliceWindowAggProcessor windowProcessor =
+                new UnsliceWindowAggProcessor(
+                        (GeneratedNamespaceAggsHandleFunction<TimeWindow>)
+                                generatedAggregateFunction,
+                        (UnsliceAssigner<TimeWindow>) assigner,
+                        accSerializer,
+                        indexOfCountStart,
+                        shiftTimeZone);
+        return new UnslicingWindowOperator<>(windowProcessor);
     }
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/unslicing/UnsliceAssigners.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/unslicing/UnsliceAssigners.java
index ba34c4e3b67..a7030220ea2 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/unslicing/UnsliceAssigners.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/unslicing/UnsliceAssigners.java
@@ -19,17 +19,24 @@
 package org.apache.flink.table.runtime.operators.window.tvf.unslicing;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.runtime.operators.window.MergeCallback;
 import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import 
org.apache.flink.table.runtime.operators.window.groupwindow.assigners.InternalTimeWindowAssigner;
 import 
org.apache.flink.table.runtime.operators.window.groupwindow.assigners.MergingWindowAssigner;
 import 
org.apache.flink.table.runtime.operators.window.groupwindow.assigners.SessionWindowAssigner;
 import 
org.apache.flink.table.runtime.operators.window.groupwindow.internal.MergingWindowProcessFunction;
 import org.apache.flink.table.runtime.operators.window.tvf.common.ClockService;
 
+import java.io.IOException;
 import java.time.Duration;
 import java.time.ZoneId;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.NavigableSet;
 import java.util.Optional;
 
 import static 
org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills;
@@ -146,4 +153,126 @@ public class UnsliceAssigners {
             return String.format("SessionWindow(gap=%dms)", sessionGap);
         }
     }
+
+    /**
+     * Creates a {@link UnsliceAssigner} that assigns elements, which already carry 
window start
+     * and window end timestamps, to windows. The assigned windows don't need to be 
merged again.
+     *
+     * @param windowStartIndex the index of window start field in the input 
row, mustn't be a
+     *     negative value.
+     * @param windowEndIndex the index of window end field in the input row, 
mustn't be a negative
+     *     value.
+     */
+    public static WindowedUnsliceAssigner windowed(
+            int windowStartIndex, int windowEndIndex, 
UnsliceAssigner<TimeWindow> innerAssigner) {
+        return new WindowedUnsliceAssigner(windowStartIndex, windowEndIndex, 
innerAssigner);
+    }
+
+    /**
+     * The {@link UnsliceAssigner} for elements that have already been merged into 
unslicing windows
+     * and attached with window start and end timestamps.
+     */
+    public static class WindowedUnsliceAssigner extends 
MergingWindowAssigner<TimeWindow>
+            implements UnsliceAssigner<TimeWindow>, InternalTimeWindowAssigner 
{
+
+        private static final long serialVersionUID = 1L;
+
+        private final int windowStartIndex;
+
+        private final int windowEndIndex;
+
+        private final UnsliceAssigner<TimeWindow> innerAssigner;
+
+        public WindowedUnsliceAssigner(
+                int windowStartIndex,
+                int windowEndIndex,
+                UnsliceAssigner<TimeWindow> innerAssigner) {
+            this.windowStartIndex = windowStartIndex;
+            this.windowEndIndex = windowEndIndex;
+            this.innerAssigner = innerAssigner;
+        }
+
+        @Override
+        public Optional<TimeWindow> assignActualWindow(
+                RowData element,
+                ClockService clock,
+                MergingWindowProcessFunction<?, TimeWindow> windowFunction)
+                throws Exception {
+            return innerAssigner.assignActualWindow(element, clock, 
windowFunction);
+        }
+
+        @Override
+        public Optional<TimeWindow> assignStateNamespace(
+                RowData element,
+                ClockService clock,
+                MergingWindowProcessFunction<?, TimeWindow> windowFunction)
+                throws Exception {
+            return innerAssigner.assignStateNamespace(element, clock, 
windowFunction);
+        }
+
+        @Override
+        public MergingWindowAssigner<TimeWindow> getMergingWindowAssigner() {
+            return this;
+        }
+
+        @Override
+        public boolean isEventTime() {
+            // It always works in event-time mode because the input rows already 
carry their assigned windows.
+            return true;
+        }
+
+        @Override
+        public Collection<TimeWindow> assignWindows(RowData element, long 
timestamp)
+                throws IOException {
+            return Collections.singletonList(createWindow(element));
+        }
+
+        private TimeWindow createWindow(RowData element) {
+            if (element.isNullAt(windowStartIndex) || 
element.isNullAt(windowEndIndex)) {
+                throw new RuntimeException(
+                        "Window start and window end fields should not be null.");
+            }
+            // Precision for row timestamp is always 3
+            final long windowStartTime = 
element.getTimestamp(windowStartIndex, 3).getMillisecond();
+            final long windowEndTime = element.getTimestamp(windowEndIndex, 
3).getMillisecond();
+            return new TimeWindow(windowStartTime, windowEndTime);
+        }
+
+        @Override
+        public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig 
executionConfig) {
+            return new TimeWindow.Serializer();
+        }
+
+        @Override
+        public String toString() {
+            return getDescription();
+        }
+
+        @Override
+        public String getDescription() {
+            return String.format(
+                    "WindowedUnsliceWindow(innerAssigner=%s, windowStartIndex=%d, 
windowEndIndex=%d)",
+                    innerAssigner.getDescription(), windowStartIndex, 
windowEndIndex);
+        }
+
+        @Override
+        public InternalTimeWindowAssigner withEventTime() {
+            throw new IllegalStateException(
+                    "Should not call this function on 
WindowedUnsliceAssigner.");
+        }
+
+        @Override
+        public InternalTimeWindowAssigner withProcessingTime() {
+            throw new IllegalStateException(
+                    "Should not call this function on 
WindowedUnsliceAssigner.");
+        }
+
+        @Override
+        public void mergeWindows(
+                TimeWindow newWindow,
+                NavigableSet<TimeWindow> sortedWindows,
+                MergeCallback<TimeWindow, Collection<TimeWindow>> callback) {
+            // No need to merge windows here: the upstream window TVF operator has 
already merged
+            // them.
+        }
+    }
 }

Reply via email to