This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 54fa648  Revert "[FLINK-23544][table-planner] Window TVF Supports 
session window in plan"
54fa648 is described below

commit 54fa64863f9e278ceddc8d294568e56350d9f545
Author: godfreyhe <[email protected]>
AuthorDate: Fri Sep 3 15:29:23 2021 +0800

    Revert "[FLINK-23544][table-planner] Window TVF Supports session window in 
plan"
    
    This reverts commit 34d51002
    
    Currently, the session window TVF syntax is not correct. The syntax 
mentioned in the FLIP depends on CALCITE-4337. We will re-introduce this 
feature in FLINK-24024 after CALCITE-4337 is finished.
---
 .../functions/sql/FlinkSqlOperatorTable.java       |   1 -
 .../functions/sql/SqlSessionTableFunction.java     |  67 ------
 .../functions/sql/SqlWindowTableFunction.java      |   3 -
 .../planner/plan/logical/SessionWindowSpec.java    |  74 -------
 .../table/planner/plan/logical/WindowSpec.java     |   3 +-
 .../exec/stream/StreamExecWindowTableFunction.java |   5 -
 .../TwoStageOptimizedWindowAggregateRule.java      |   7 -
 .../StreamPhysicalLocalWindowAggregate.scala       |  13 +-
 .../stream/StreamPhysicalWindowAggregate.scala     |  64 ++----
 .../plan/rules/logical/SplitAggregateRule.scala    |  10 +-
 .../table/planner/plan/utils/WindowUtil.scala      |   4 -
 .../planner/plan/stream/sql/WindowRankTest.xml     |  53 -----
 .../plan/stream/sql/agg/WindowAggregateTest.xml    | 236 ---------------------
 .../plan/stream/sql/join/WindowJoinTest.xml        | 139 ------------
 .../planner/plan/stream/sql/WindowRankTest.scala   |  79 +------
 .../plan/stream/sql/agg/WindowAggregateTest.scala  |  64 ------
 .../plan/stream/sql/join/WindowJoinTest.scala      | 106 ---------
 .../runtime/stream/sql/WindowAggregateITCase.scala |  87 --------
 .../window/WindowTableFunctionOperator.java        |   3 -
 19 files changed, 21 insertions(+), 997 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index 877fff9..2c6d46e 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -1181,5 +1181,4 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
     public static final SqlFunction TUMBLE = new SqlTumbleTableFunction();
     public static final SqlFunction HOP = new SqlHopTableFunction();
     public static final SqlFunction CUMULATE = new SqlCumulateTableFunction();
-    public static final SqlFunction SESSION = new SqlSessionTableFunction();
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlSessionTableFunction.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlSessionTableFunction.java
deleted file mode 100644
index 629da14..0000000
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlSessionTableFunction.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.functions.sql;
-
-import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
-
-import org.apache.calcite.sql.SqlCallBinding;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.SqlOperator;
-
-/**
- * SqlSessionTableFunction implements an operator for session.
- *
- * <p>It allows three parameters:
- *
- * <ol>
- *   <li>a table
- *   <li>a descriptor to provide a time attribute column name from the input 
table
- *   <li>an interval parameter to specify an inactive activity gap to break 
sessions
- * </ol>
- */
-public class SqlSessionTableFunction extends SqlWindowTableFunction {
-
-    public SqlSessionTableFunction() {
-        super(SqlKind.SESSION.name(), new OperandMetadataImpl());
-    }
-
-    /** Operand type checker for SESSION. */
-    private static class OperandMetadataImpl extends AbstractOperandMetadata {
-        OperandMetadataImpl() {
-            super(ImmutableList.of(PARAM_DATA, PARAM_TIMECOL, 
PARAM_SESSION_GAP), 3);
-        }
-
-        @Override
-        public boolean checkOperandTypes(SqlCallBinding callBinding, boolean 
throwOnFailure) {
-            if (!checkTableAndDescriptorOperands(callBinding, 1)) {
-                return throwValidationSignatureErrorOrReturnFalse(callBinding, 
throwOnFailure);
-            }
-            if (!checkIntervalOperands(callBinding, 2)) {
-                return throwValidationSignatureErrorOrReturnFalse(callBinding, 
throwOnFailure);
-            }
-            // check time attribute
-            return throwExceptionOrReturnFalse(
-                    checkTimeColumnDescriptorOperand(callBinding, 1), 
throwOnFailure);
-        }
-
-        @Override
-        public String getAllowedSignatures(SqlOperator op, String opName) {
-            return opName + "(TABLE table_name, DESCRIPTOR(timecol), datetime 
interval)";
-        }
-    }
-}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlWindowTableFunction.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlWindowTableFunction.java
index 3c23dde..41926ff 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlWindowTableFunction.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlWindowTableFunction.java
@@ -83,9 +83,6 @@ public class SqlWindowTableFunction extends SqlFunction 
implements SqlTableFunct
     /** The slide interval, only used for HOP window. */
     protected static final String PARAM_STEP = "STEP";
 
-    /** The session gap interval, only used for SESSION window. */
-    protected static final String PARAM_SESSION_GAP = "GAP";
-
     /**
      * Type-inference strategy whereby the row type of a table function call 
is a ROW, which is
      * combined from the row type of operand #0 (which is a TABLE) and two 
additional fields. The
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/SessionWindowSpec.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/SessionWindowSpec.java
deleted file mode 100644
index 962e5ba..0000000
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/SessionWindowSpec.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.logical;
-
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
-
-import java.time.Duration;
-import java.util.Objects;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.TimeUtils.formatWithHighestUnit;
-
-/** Logical representation of a session window specification. */
-@JsonTypeName("SessionWindow")
-public class SessionWindowSpec implements WindowSpec {
-    public static final String FIELD_NAME_GAP = "gap";
-
-    @JsonProperty(FIELD_NAME_GAP)
-    private final Duration gap;
-
-    @JsonCreator
-    public SessionWindowSpec(@JsonProperty(FIELD_NAME_GAP) Duration gap) {
-        this.gap = checkNotNull(gap);
-    }
-
-    @Override
-    public String toSummaryString(String windowing) {
-        return String.format("SESSION(%s, gap=[%s])", windowing, 
formatWithHighestUnit(gap));
-    }
-
-    public Duration getGap() {
-        return gap;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        SessionWindowSpec that = (SessionWindowSpec) o;
-        return gap.equals(that.gap);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(SessionWindowSpec.class, gap);
-    }
-
-    @Override
-    public String toString() {
-        return String.format("SESSION(gap=[%s])", formatWithHighestUnit(gap));
-    }
-}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/WindowSpec.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/WindowSpec.java
index fbde8a9..bb3c261 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/WindowSpec.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/WindowSpec.java
@@ -26,8 +26,7 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
 @JsonSubTypes({
     @JsonSubTypes.Type(value = TumblingWindowSpec.class),
     @JsonSubTypes.Type(value = HoppingWindowSpec.class),
-    @JsonSubTypes.Type(value = CumulativeWindowSpec.class),
-    @JsonSubTypes.Type(value = SessionWindowSpec.class)
+    @JsonSubTypes.Type(value = CumulativeWindowSpec.class)
 })
 public interface WindowSpec {
 
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowTableFunction.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowTableFunction.java
index af0a6c5..5fd2025 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowTableFunction.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowTableFunction.java
@@ -25,7 +25,6 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.delegation.PlannerBase;
 import org.apache.flink.table.planner.plan.logical.CumulativeWindowSpec;
 import org.apache.flink.table.planner.plan.logical.HoppingWindowSpec;
-import org.apache.flink.table.planner.plan.logical.SessionWindowSpec;
 import 
org.apache.flink.table.planner.plan.logical.TimeAttributeWindowingStrategy;
 import org.apache.flink.table.planner.plan.logical.TumblingWindowSpec;
 import org.apache.flink.table.planner.plan.logical.WindowSpec;
@@ -120,10 +119,6 @@ public class StreamExecWindowTableFunction extends 
ExecNodeBase<RowData>
                                     + "3. join with join condition contains 
window starts equality of input tables "
                                     + "and window ends equality of input 
tables.\n",
                             windowSummary));
-        } else if (windowingStrategy.getWindow() instanceof SessionWindowSpec) 
{
-            // WindowTableFunctionOperator is not suitable for Session Window 
because can't do
-            // state-less window assigning for input row per record for 
Session Window.
-            throw new TableException("Session Window TableFunction is not 
supported yet.");
         } else if (!windowingStrategy.isRowtime()) {
             throw new TableException("Processing time Window TableFunction is 
not supported yet.");
         }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/TwoStageOptimizedWindowAggregateRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/TwoStageOptimizedWindowAggregateRule.java
index b956729..8eeb79c 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/TwoStageOptimizedWindowAggregateRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/TwoStageOptimizedWindowAggregateRule.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.planner.plan.rules.physical.stream;
 
 import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.planner.plan.logical.SessionWindowSpec;
 import 
org.apache.flink.table.planner.plan.logical.SliceAttachedWindowingStrategy;
 import 
org.apache.flink.table.planner.plan.logical.TimeAttributeWindowingStrategy;
 import 
org.apache.flink.table.planner.plan.logical.WindowAttachedWindowingStrategy;
@@ -96,12 +95,6 @@ public class TwoStageOptimizedWindowAggregateRule extends 
RelOptRule {
             return false;
         }
 
-        // session window doesn't support two-phase,
-        // otherwise window assigner results may be different
-        if (windowing.getWindow() instanceof SessionWindowSpec) {
-            return false;
-        }
-
         // all aggregate function should support merge() method
         if 
(!AggregateUtil.doAllSupportPartialMerge(windowAgg.aggInfoList().aggInfos())) {
             return false;
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLocalWindowAggregate.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLocalWindowAggregate.scala
index 7035e99..2823aab 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLocalWindowAggregate.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLocalWindowAggregate.scala
@@ -20,7 +20,7 @@ package 
org.apache.flink.table.planner.plan.nodes.physical.stream
 
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.expressions.{PlannerNamedWindowProperty, 
PlannerSliceEnd, PlannerWindowReference}
-import org.apache.flink.table.planner.plan.logical.{SessionWindowSpec, 
TimeAttributeWindowingStrategy, WindowAttachedWindowingStrategy, 
WindowingStrategy}
+import 
org.apache.flink.table.planner.plan.logical.{TimeAttributeWindowingStrategy, 
WindowAttachedWindowingStrategy, WindowingStrategy}
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLocalWindowAggregate
 import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
 import 
org.apache.flink.table.planner.plan.rules.physical.stream.TwoStageOptimizedWindowAggregateRule
@@ -69,17 +69,8 @@ class StreamPhysicalLocalWindowAggregate(
 
   override def isValid(litmus: Litmus, context: RelNode.Context): Boolean = {
     windowing match {
-      case _: WindowAttachedWindowingStrategy =>
+      case _: WindowAttachedWindowingStrategy | _: 
TimeAttributeWindowingStrategy =>
         // pass
-      case tws: TimeAttributeWindowingStrategy =>
-        tws.getWindow match {
-          case _: SessionWindowSpec =>
-            return litmus.fail("StreamPhysicalLocalWindowAggregate should not 
accept " +
-              "TimeAttributeWindowingStrategy with Session window. " +
-              "This should never happen, please open an issue.")
-          case _ =>
-            // pass
-        }
       case _ =>
         return litmus.fail("StreamPhysicalLocalWindowAggregate should only 
accepts " +
           "WindowAttachedWindowingStrategy and TimeAttributeWindowingStrategy, 
" +
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWindowAggregate.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWindowAggregate.scala
index 0837c1d..eaa70e2 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWindowAggregate.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWindowAggregate.scala
@@ -18,23 +18,22 @@
 
 package org.apache.flink.table.planner.plan.nodes.physical.stream
 
-import org.apache.flink.table.expressions.ApiExpressionUtils.intervalOfMillis
-import org.apache.flink.table.expressions.FieldReferenceExpression
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
-import org.apache.flink.table.planner.expressions.{PlannerNamedWindowProperty, 
PlannerWindowReference}
-import org.apache.flink.table.planner.plan.logical.{SessionGroupWindow, 
SessionWindowSpec, TimeAttributeWindowingStrategy, WindowingStrategy}
-import 
org.apache.flink.table.planner.plan.nodes.exec.stream.{StreamExecGroupWindowAggregate,
 StreamExecWindowAggregate}
+import org.apache.flink.table.planner.expressions.{PlannerNamedWindowProperty, 
PlannerProctimeAttribute, PlannerRowtimeAttribute, PlannerWindowEnd, 
PlannerWindowStart}
+import org.apache.flink.table.planner.plan.logical.WindowingStrategy
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowAggregate
 import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
 import org.apache.flink.table.planner.plan.utils.{AggregateInfoList, 
AggregateUtil, FlinkRelOptUtil, RelExplainUtil, WindowUtil}
 import 
org.apache.flink.table.planner.plan.utils.WindowUtil.checkEmitConfiguration
-import 
org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType
-
+import org.apache.flink.table.types.logical.utils.LogicalTypeUtils
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.util.ImmutableBitSet
 
 import java.util
+import java.util.Collections
 
 import scala.collection.JavaConverters._
 
@@ -105,45 +104,14 @@ class StreamPhysicalWindowAggregate(
 
   override def translateToExecNode(): ExecNode[_] = {
     checkEmitConfiguration(FlinkRelOptUtil.getTableConfigFromContext(this))
-    windowing.getWindow match {
-      case windowSpec: SessionWindowSpec =>
-        windowing match {
-          case timeWindowStrategy: TimeAttributeWindowingStrategy =>
-            val timeAttributeFieldName = getInput.getRowType.getFieldNames.get(
-              timeWindowStrategy.getTimeAttributeIndex)
-            val timeAttributeType = windowing.getTimeAttributeType
-            val logicalWindow = SessionGroupWindow(
-              new PlannerWindowReference("w$", timeAttributeType),
-              new FieldReferenceExpression(
-                timeAttributeFieldName,
-                fromLogicalTypeToDataType(timeAttributeType),
-                0,
-                timeWindowStrategy.getTimeAttributeIndex),
-              intervalOfMillis(windowSpec.getGap.toMillis)
-            )
-            new StreamExecGroupWindowAggregate(
-              grouping,
-              aggCalls.toArray,
-              logicalWindow,
-              namedWindowProperties.toArray,
-              false,
-              InputProperty.DEFAULT,
-              FlinkTypeFactory.toLogicalRowType(getRowType),
-              getRelDetailedDescription
-            )
-          case _ =>
-            throw new UnsupportedOperationException(s"$windowing is not 
supported yet.")
-        }
-      case _ =>
-        new StreamExecWindowAggregate(
-          grouping,
-          aggCalls.toArray,
-          windowing,
-          namedWindowProperties.toArray,
-          InputProperty.DEFAULT,
-          FlinkTypeFactory.toLogicalRowType(getRowType),
-          getRelDetailedDescription
-        )
-    }
+    new StreamExecWindowAggregate(
+      grouping,
+      aggCalls.toArray,
+      windowing,
+      namedWindowProperties.toArray,
+      InputProperty.DEFAULT,
+      FlinkTypeFactory.toLogicalRowType(getRowType),
+      getRelDetailedDescription
+    )
   }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
index bbe710d..31d1f25 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
@@ -22,7 +22,6 @@ import org.apache.flink.table.api.TableException
 import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.planner.calcite.{FlinkContext, 
FlinkLogicalRelFactories, FlinkRelBuilder}
 import org.apache.flink.table.planner.functions.sql.{FlinkSqlOperatorTable, 
SqlFirstLastValueAggFunction}
-import org.apache.flink.table.planner.plan.logical.SessionWindowSpec
 import org.apache.flink.table.planner.plan.PartialFinalType
 import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
 import org.apache.flink.table.planner.plan.nodes.FlinkRelNode
@@ -139,18 +138,11 @@ class SplitAggregateRule extends RelOptRule(
     val windowProps = fmq.getRelWindowProperties(agg.getInput)
     val isWindowAgg = 
WindowUtil.groupingContainsWindowStartEnd(agg.getGroupSet, windowProps)
     val isProctimeWindowAgg = isWindowAgg && !windowProps.isRowtime
-
-    // disable distinct split for session window,
-    // otherwise window assigner results may be different
-    val isSessionWindowAgg = isWindowAgg &&
-      windowProps.getWindowSpec.isInstanceOf[SessionWindowSpec]
-
     // TableAggregate is not supported. see also FLINK-21923.
     val isTableAgg = AggregateUtil.isTableAggregate(agg.getAggCallList)
 
     agg.partialFinalType == PartialFinalType.NONE && 
agg.containsDistinctCall() &&
-      splitDistinctAggEnabled && isAllAggSplittable && !isProctimeWindowAgg &&
-      !isTableAgg && !isSessionWindowAgg
+      splitDistinctAggEnabled && isAllAggSplittable && !isProctimeWindowAgg && 
!isTableAgg
   }
 
   override def onMatch(call: RelOptRuleCall): Unit = {
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala
index edd8f0e..37056b8 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala
@@ -218,10 +218,6 @@ object WindowUtil {
         val step = getOperandAsLong(windowCall.operands(2))
         val maxSize = getOperandAsLong(windowCall.operands(3))
         new CumulativeWindowSpec(Duration.ofMillis(maxSize), 
Duration.ofMillis(step), offset)
-
-      case FlinkSqlOperatorTable.SESSION =>
-        val gap = getOperandAsLong(windowCall.operands(2))
-        new SessionWindowSpec(Duration.ofMillis(gap))
     }
 
     new TimeAttributeWindowingStrategy(windowSpec, timeAttributeType, 
timeIndex)
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/WindowRankTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/WindowRankTest.xml
index c3061ad..0b7a643 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/WindowRankTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/WindowRankTest.xml
@@ -128,59 +128,6 @@ Calc(select=[window_start, window_end, window_time, a, 
cnt, sum_d, max_d, wAvg,
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testOnSessionWindowAggregate">
-    <Resource name="sql">
-      <![CDATA[
-SELECT window_start, window_end, window_time, a, cnt, sum_d, max_d, wAvg, uv
-FROM (
-SELECT *,
-   ROW_NUMBER() OVER(PARTITION BY window_start, window_end ORDER BY cnt DESC) 
as rownum
-FROM (
-  SELECT
-    a,
-    window_start,
-    window_end,
-    window_time,
-    count(*) as cnt,
-    sum(d) as sum_d,
-    max(d) filter (where b > 1000) as max_d,
-    weightedAvg(b, e) AS wAvg,
-    count(distinct c) AS uv
-  FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
-  GROUP BY a, window_start, window_end, window_time
-  )
-)
-WHERE rownum <= 3
-      ]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalProject(window_start=[$1], window_end=[$2], window_time=[$3], a=[$0], 
cnt=[$4], sum_d=[$5], max_d=[$6], wAvg=[$7], uv=[$8])
-+- LogicalFilter(condition=[<=($9, 3)])
-   +- LogicalProject(a=[$0], window_start=[$1], window_end=[$2], 
window_time=[$3], cnt=[$4], sum_d=[$5], max_d=[$6], wAvg=[$7], uv=[$8], 
rownum=[ROW_NUMBER() OVER (PARTITION BY $1, $2 ORDER BY $4 DESC NULLS LAST)])
-      +- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], 
sum_d=[SUM($4)], max_d=[MAX($4) FILTER $5], wAvg=[weightedAvg($6, $7)], 
uv=[COUNT(DISTINCT $8)])
-         +- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], 
window_time=[$9], d=[$3], $f5=[IS TRUE(>($1, 1000))], b=[$1], e=[$4], c=[$2])
-            +- LogicalTableFunctionScan(invocation=[SESSION($6, 
DESCRIPTOR($5), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT 
b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* 
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, 
TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
-               +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[$6])
-                  +- LogicalWatermarkAssigner(rowtime=[rowtime], 
watermark=[-($5, 1000:INTERVAL SECOND)])
-                     +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[PROCTIME()])
-                        +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
-]]>
-    </Resource>
-    <Resource name="optimized rel plan">
-      <![CDATA[
-Calc(select=[window_start, window_end, window_time, a, cnt, sum_d, max_d, 
wAvg, uv])
-+- WindowRank(window=[SESSION(win_start=[window_start], win_end=[window_end], 
gap=[15 min])], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=3], 
partitionBy=[], orderBy=[cnt DESC], select=[a, window_start, window_end, 
window_time, cnt, sum_d, max_d, wAvg, uv])
-   +- Exchange(distribution=[single])
-      +- Calc(select=[a, window_start, window_end, window_time, cnt, sum_d, 
max_d, wAvg, uv])
-         +- WindowAggregate(groupBy=[a], window=[SESSION(time_col=[rowtime], 
gap=[15 min])], select=[a, COUNT(*) AS cnt, SUM(d) AS sum_d, MAX(d) FILTER $f5 
AS max_d, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT c) AS uv, start('w$) AS 
window_start, end('w$) AS window_end, rowtime('w$) AS window_time])
-            +- Exchange(distribution=[hash[a]])
-               +- Calc(select=[a, d, IS TRUE(>(b, 1000)) AS $f5, b, e, c, 
rowtime])
-                  +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
-                     +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, d, e, rowtime])
-]]>
-    </Resource>
-  </TestCase>
   <TestCase name="testOnTumbleWindowAggregate">
     <Resource name="sql">
       <![CDATA[
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
index 4903153f8..5141a01 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
@@ -1583,242 +1583,6 @@ Sink(table=[default_catalog.default_database.s1], 
fields=[window_start, window_e
 ]]>
     </Resource>
   </TestCase>
-  <TestCase 
name="testSession_DistinctSplitEnabled[aggPhaseEnforcer=ONE_PHASE]">
-    <Resource name="sql">
-      <![CDATA[
-SELECT
-   a,
-   window_start,
-   window_end,
-   count(*),
-   sum(d),
-   max(d) filter (where b > 1000),
-   count(distinct c) AS uv
-FROM TABLE(
-  SESSION(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '5' MINUTE))
-GROUP BY a, window_start, window_end
-      ]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)], 
EXPR$5=[MAX($3) FILTER $4], uv=[COUNT(DISTINCT $5)])
-+- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3], $f4=[IS 
TRUE(>($1, 1000))], c=[$2])
-   +- LogicalTableFunctionScan(invocation=[SESSION($6, DESCRIPTOR($6), 
300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, 
VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* 
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, 
TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
-      +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], 
proctime=[$6])
-         +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 
1000:INTERVAL SECOND)])
-            +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[PROCTIME()])
-               +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
-]]>
-    </Resource>
-    <Resource name="optimized rel plan">
-      <![CDATA[
-Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4, EXPR$5, uv])
-+- WindowAggregate(groupBy=[a], window=[SESSION(time_col=[proctime], gap=[5 
min])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, MAX(d) FILTER $f4 AS 
EXPR$5, COUNT(DISTINCT c) AS uv, start('w$) AS window_start, end('w$) AS 
window_end])
-   +- Exchange(distribution=[hash[a]])
-      +- Calc(select=[a, d, IS TRUE(>(b, 1000)) AS $f4, c, proctime])
-         +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
-            +- Calc(select=[a, b, c, d, PROCTIME() AS proctime, rowtime])
-               +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable, project=[a, b, c, d, rowtime], metadata=[]]], fields=[a, b, c, d, 
rowtime])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase 
name="testSession_DistinctSplitEnabled[aggPhaseEnforcer=TWO_PHASE]">
-    <Resource name="sql">
-      <![CDATA[
-SELECT
-   a,
-   window_start,
-   window_end,
-   count(*),
-   sum(d),
-   max(d) filter (where b > 1000),
-   count(distinct c) AS uv
-FROM TABLE(
-  SESSION(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '5' MINUTE))
-GROUP BY a, window_start, window_end
-      ]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)], 
EXPR$5=[MAX($3) FILTER $4], uv=[COUNT(DISTINCT $5)])
-+- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3], $f4=[IS 
TRUE(>($1, 1000))], c=[$2])
-   +- LogicalTableFunctionScan(invocation=[SESSION($6, DESCRIPTOR($6), 
300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, 
VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* 
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, 
TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
-      +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], 
proctime=[$6])
-         +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 
1000:INTERVAL SECOND)])
-            +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[PROCTIME()])
-               +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
-]]>
-    </Resource>
-    <Resource name="optimized rel plan">
-      <![CDATA[
-Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4, EXPR$5, uv])
-+- WindowAggregate(groupBy=[a], window=[SESSION(time_col=[proctime], gap=[5 
min])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, MAX(d) FILTER $f4 AS 
EXPR$5, COUNT(DISTINCT c) AS uv, start('w$) AS window_start, end('w$) AS 
window_end])
-   +- Exchange(distribution=[hash[a]])
-      +- Calc(select=[a, d, IS TRUE(>(b, 1000)) AS $f4, c, proctime])
-         +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
-            +- Calc(select=[a, b, c, d, PROCTIME() AS proctime, rowtime])
-               +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable, project=[a, b, c, d, rowtime], metadata=[]]], fields=[a, b, c, d, 
rowtime])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testSession_OnProctime[aggPhaseEnforcer=ONE_PHASE]">
-    <Resource name="sql">
-      <![CDATA[
-SELECT
-   a,
-   window_start,
-   window_end,
-   count(*),
-   sum(d),
-   max(d) filter (where b > 1000),
-   weightedAvg(b, e) AS wAvg,
-   count(distinct c) AS uv
-FROM TABLE(
-  SESSION(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '5' MINUTE))
-GROUP BY a, window_start, window_end
-      ]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)], 
EXPR$5=[MAX($3) FILTER $4], wAvg=[weightedAvg($5, $6)], uv=[COUNT(DISTINCT $7)])
-+- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3], $f4=[IS 
TRUE(>($1, 1000))], b=[$1], e=[$4], c=[$2])
-   +- LogicalTableFunctionScan(invocation=[SESSION($6, DESCRIPTOR($6), 
300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, 
VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* 
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, 
TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
-      +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], 
proctime=[$6])
-         +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 
1000:INTERVAL SECOND)])
-            +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[PROCTIME()])
-               +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
-]]>
-    </Resource>
-    <Resource name="optimized rel plan">
-      <![CDATA[
-Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4, EXPR$5, wAvg, uv])
-+- WindowAggregate(groupBy=[a], window=[SESSION(time_col=[proctime], gap=[5 
min])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, MAX(d) FILTER $f4 AS 
EXPR$5, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT c) AS uv, start('w$) AS 
window_start, end('w$) AS window_end])
-   +- Exchange(distribution=[hash[a]])
-      +- Calc(select=[a, d, IS TRUE(>(b, 1000)) AS $f4, b, e, c, proctime])
-         +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
-            +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS proctime])
-               +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, e, rowtime])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testSession_OnProctime[aggPhaseEnforcer=TWO_PHASE]">
-    <Resource name="sql">
-      <![CDATA[
-SELECT
-   a,
-   window_start,
-   window_end,
-   count(*),
-   sum(d),
-   max(d) filter (where b > 1000),
-   weightedAvg(b, e) AS wAvg,
-   count(distinct c) AS uv
-FROM TABLE(
-  SESSION(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '5' MINUTE))
-GROUP BY a, window_start, window_end
-      ]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)], 
EXPR$5=[MAX($3) FILTER $4], wAvg=[weightedAvg($5, $6)], uv=[COUNT(DISTINCT $7)])
-+- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3], $f4=[IS 
TRUE(>($1, 1000))], b=[$1], e=[$4], c=[$2])
-   +- LogicalTableFunctionScan(invocation=[SESSION($6, DESCRIPTOR($6), 
300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, 
VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* 
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, 
TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
-      +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], 
proctime=[$6])
-         +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 
1000:INTERVAL SECOND)])
-            +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[PROCTIME()])
-               +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
-]]>
-    </Resource>
-    <Resource name="optimized rel plan">
-      <![CDATA[
-Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4, EXPR$5, wAvg, uv])
-+- WindowAggregate(groupBy=[a], window=[SESSION(time_col=[proctime], gap=[5 
min])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, MAX(d) FILTER $f4 AS 
EXPR$5, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT c) AS uv, start('w$) AS 
window_start, end('w$) AS window_end])
-   +- Exchange(distribution=[hash[a]])
-      +- Calc(select=[a, d, IS TRUE(>(b, 1000)) AS $f4, b, e, c, proctime])
-         +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
-            +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS proctime])
-               +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, e, rowtime])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testSession_OnRowtime[aggPhaseEnforcer=ONE_PHASE]">
-    <Resource name="sql">
-      <![CDATA[
-SELECT
-   a,
-   window_start,
-   window_end,
-   count(*),
-   sum(d),
-   max(d) filter (where b > 1000),
-   weightedAvg(b, e) AS wAvg,
-   count(distinct c) AS uv
-FROM TABLE(
-  SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE))
-GROUP BY a, window_start, window_end
-      ]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)], 
EXPR$5=[MAX($3) FILTER $4], wAvg=[weightedAvg($5, $6)], uv=[COUNT(DISTINCT $7)])
-+- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3], $f4=[IS 
TRUE(>($1, 1000))], b=[$1], e=[$4], c=[$2])
-   +- LogicalTableFunctionScan(invocation=[SESSION($6, DESCRIPTOR($5), 
300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, 
VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* 
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, 
TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
-      +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], 
proctime=[$6])
-         +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 
1000:INTERVAL SECOND)])
-            +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[PROCTIME()])
-               +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
-]]>
-    </Resource>
-    <Resource name="optimized rel plan">
-      <![CDATA[
-Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4, EXPR$5, wAvg, uv])
-+- WindowAggregate(groupBy=[a], window=[SESSION(time_col=[rowtime], gap=[5 
min])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, MAX(d) FILTER $f4 AS 
EXPR$5, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT c) AS uv, start('w$) AS 
window_start, end('w$) AS window_end])
-   +- Exchange(distribution=[hash[a]])
-      +- Calc(select=[a, d, IS TRUE(>(b, 1000)) AS $f4, b, e, c, rowtime])
-         +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
-            +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, e, rowtime])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testSession_OnRowtime[aggPhaseEnforcer=TWO_PHASE]">
-    <Resource name="sql">
-      <![CDATA[
-SELECT
-   a,
-   window_start,
-   window_end,
-   count(*),
-   sum(d),
-   max(d) filter (where b > 1000),
-   weightedAvg(b, e) AS wAvg,
-   count(distinct c) AS uv
-FROM TABLE(
-  SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE))
-GROUP BY a, window_start, window_end
-      ]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)], 
EXPR$5=[MAX($3) FILTER $4], wAvg=[weightedAvg($5, $6)], uv=[COUNT(DISTINCT $7)])
-+- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3], $f4=[IS 
TRUE(>($1, 1000))], b=[$1], e=[$4], c=[$2])
-   +- LogicalTableFunctionScan(invocation=[SESSION($6, DESCRIPTOR($5), 
300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, 
VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* 
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, 
TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
-      +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], 
proctime=[$6])
-         +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 
1000:INTERVAL SECOND)])
-            +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[PROCTIME()])
-               +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
-]]>
-    </Resource>
-    <Resource name="optimized rel plan">
-      <![CDATA[
-Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4, EXPR$5, wAvg, uv])
-+- WindowAggregate(groupBy=[a], window=[SESSION(time_col=[rowtime], gap=[5 
min])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, MAX(d) FILTER $f4 AS 
EXPR$5, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT c) AS uv, start('w$) AS 
window_start, end('w$) AS window_end])
-   +- Exchange(distribution=[hash[a]])
-      +- Calc(select=[a, d, IS TRUE(>(b, 1000)) AS $f4, b, e, c, rowtime])
-         +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
-            +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, e, rowtime])
-]]>
-    </Resource>
-  </TestCase>
   <TestCase name="testTumble_CalcOnTVF[aggPhaseEnforcer=ONE_PHASE]">
     <Resource name="sql">
       <![CDATA[
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
index c57ba95..5991c45 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
@@ -1163,145 +1163,6 @@ Calc(select=[a, window_start, window_end, 
PROCTIME_MATERIALIZE(window_time) AS w
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testOnSessionWindowAggregate">
-    <Resource name="sql">
-      <![CDATA[
-SELECT L.*, R.*
-FROM (
-  SELECT
-    a,
-    window_start,
-    window_end,
-    window_time,
-    count(*) as cnt,
-    count(distinct c) AS uv
-  FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
-  GROUP BY a, window_start, window_end, window_time
-) L
-JOIN (
-  SELECT
-    a,
-    window_start,
-    window_end,
-    window_time,
-    count(*) as cnt,
-    count(distinct c) AS uv
-  FROM TABLE(SESSION(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' 
MINUTE))
-  GROUP BY a, window_start, window_end, window_time
-) R
-ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = 
R.a
-      ]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalProject(a=[$0], window_start=[$1], window_end=[$2], window_time=[$3], 
cnt=[$4], uv=[$5], a0=[$6], window_start0=[$7], window_end0=[$8], 
window_time0=[$9], cnt0=[$10], uv0=[$11])
-+- LogicalJoin(condition=[AND(=($1, $7), =($2, $8), =($0, $6))], 
joinType=[inner])
-   :- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT 
$4)])
-   :  +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], 
window_time=[$7], c=[$2])
-   :     +- LogicalTableFunctionScan(invocation=[SESSION($4, DESCRIPTOR($3), 
900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, 
BIGINT c, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, 
TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* 
window_time)])
-   :        +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
-   :           +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
-   :              +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
-   :                 +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
-   +- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT 
$4)])
-      +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], 
window_time=[$7], c=[$2])
-         +- LogicalTableFunctionScan(invocation=[SESSION($4, DESCRIPTOR($3), 
900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, 
BIGINT c, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, 
TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* 
window_time)])
-            +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
-               +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
-                  +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
-                     +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable2]])
-]]>
-    </Resource>
-    <Resource name="optimized rel plan">
-      <![CDATA[
-WindowJoin(leftWindow=[SESSION(win_start=[window_start], win_end=[window_end], 
gap=[15 min])], rightWindow=[SESSION(win_start=[window_start], 
win_end=[window_end], gap=[15 min])], joinType=[InnerJoin], where=[=(a, a0)], 
select=[a, window_start, window_end, window_time, cnt, uv, a0, window_start0, 
window_end0, window_time0, cnt0, uv0])
-:- Exchange(distribution=[hash[a]])
-:  +- Calc(select=[a, window_start, window_end, window_time, cnt, uv])
-:     +- WindowAggregate(groupBy=[a], window=[SESSION(time_col=[rowtime], 
gap=[15 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) AS uv, start('w$) 
AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time])
-:        +- Exchange(distribution=[hash[a]])
-:           +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
-:              +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable, project=[a, c, rowtime], metadata=[]]], fields=[a, c, rowtime])
-+- Exchange(distribution=[hash[a]])
-   +- Calc(select=[a, window_start, window_end, window_time, cnt, uv])
-      +- WindowAggregate(groupBy=[a], window=[SESSION(time_col=[rowtime], 
gap=[15 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) AS uv, start('w$) 
AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time])
-         +- Exchange(distribution=[hash[a]])
-            +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
-               +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable2, project=[a, c, rowtime], metadata=[]]], fields=[a, c, rowtime])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testOnSessionWindowAggregateOnProctime">
-    <Resource name="sql">
-      <![CDATA[
-SELECT L.*, R.*
-FROM (
-  SELECT
-    a,
-    window_start,
-    window_end,
-    window_time,
-    count(*) as cnt,
-    count(distinct c) AS uv
-  FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '15' 
MINUTE))
-  GROUP BY a, window_start, window_end, window_time
-) L
-JOIN (
-  SELECT
-    a,
-    window_start,
-    window_end,
-    window_time,
-    count(*) as cnt,
-    count(distinct c) AS uv
-  FROM TABLE(SESSION(TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL '15' 
MINUTE))
-  GROUP BY a, window_start, window_end, window_time
-) R
-ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = 
R.a
-      ]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalProject(a=[$0], window_start=[$1], window_end=[$2], window_time=[$3], 
cnt=[$4], uv=[$5], a0=[$6], window_start0=[$7], window_end0=[$8], 
window_time0=[$9], cnt0=[$10], uv0=[$11])
-+- LogicalJoin(condition=[AND(=($1, $7), =($2, $8), =($0, $6))], 
joinType=[inner])
-   :- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT 
$4)])
-   :  +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], 
window_time=[$7], c=[$2])
-   :     +- LogicalTableFunctionScan(invocation=[SESSION($4, DESCRIPTOR($4), 
900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, 
BIGINT c, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, 
TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* 
window_time)])
-   :        +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
-   :           +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
-   :              +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
-   :                 +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
-   +- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT 
$4)])
-      +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], 
window_time=[$7], c=[$2])
-         +- LogicalTableFunctionScan(invocation=[SESSION($4, DESCRIPTOR($4), 
900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, 
BIGINT c, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, 
TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* 
window_time)])
-            +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
-               +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
-                  +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
-                     +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable2]])
-]]>
-    </Resource>
-    <Resource name="optimized rel plan">
-      <![CDATA[
-Calc(select=[a, window_start, window_end, PROCTIME_MATERIALIZE(window_time) AS 
window_time, cnt, uv, a0, window_start0, window_end0, 
PROCTIME_MATERIALIZE(window_time0) AS window_time0, cnt0, uv0])
-+- WindowJoin(leftWindow=[SESSION(win_start=[window_start], 
win_end=[window_end], gap=[15 min])], 
rightWindow=[SESSION(win_start=[window_start], win_end=[window_end], gap=[15 
min])], joinType=[InnerJoin], where=[=(a, a0)], select=[a, window_start, 
window_end, window_time, cnt, uv, a0, window_start0, window_end0, window_time0, 
cnt0, uv0])
-   :- Exchange(distribution=[hash[a]])
-   :  +- Calc(select=[a, window_start, window_end, window_time, cnt, uv])
-   :     +- WindowAggregate(groupBy=[a], window=[SESSION(time_col=[proctime], 
gap=[15 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) AS uv, start('w$) 
AS window_start, end('w$) AS window_end, proctime('w$) AS window_time])
-   :        +- Exchange(distribution=[hash[a]])
-   :           +- Calc(select=[a, c, proctime])
-   :              +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
-   :                 +- Calc(select=[a, c, PROCTIME() AS proctime, rowtime])
-   :                    +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable, project=[a, c, rowtime], metadata=[]]], fields=[a, 
c, rowtime])
-   +- Exchange(distribution=[hash[a]])
-      +- Calc(select=[a, window_start, window_end, window_time, cnt, uv])
-         +- WindowAggregate(groupBy=[a], window=[SESSION(time_col=[proctime], 
gap=[15 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) AS uv, start('w$) 
AS window_start, end('w$) AS window_end, proctime('w$) AS window_time])
-            +- Exchange(distribution=[hash[a]])
-               +- Calc(select=[a, c, proctime])
-                  +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
-                     +- Calc(select=[a, c, PROCTIME() AS proctime, rowtime])
-                        +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable2, project=[a, c, rowtime], metadata=[]]], fields=[a, 
c, rowtime])
-]]>
-    </Resource>
-  </TestCase>
   <TestCase name="testOnTumbleWindowAggregate">
     <Resource name="sql">
       <![CDATA[
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowRankTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowRankTest.scala
index fae3d5c..b11a7f5 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowRankTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowRankTest.scala
@@ -82,25 +82,7 @@ class WindowRankTest extends TableTestBase {
   }
 
   @Test
-  def testUnsupportedWindowTVF_Session(): Unit = {
-    val sql =
-      """
-        |SELECT window_start, window_end, window_time, a, b, c, d, e
-        |FROM (
-        |SELECT *,
-        |   ROW_NUMBER() OVER(PARTITION BY a, window_start, window_end ORDER 
BY b DESC) as rownum
-        |FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' 
MINUTE))
-        |)
-        |WHERE rownum <= 3
-      """.stripMargin
-
-    thrown.expectMessage("Session Window TableFunction is not supported yet.")
-    thrown.expect(classOf[TableException])
-    util.verifyExplain(sql)
-  }
-
-  @Test
-  def testUnsupportedWindowTVF_Proctime(): Unit = {
+  def testUnsupportedWindowTVF_TumbleOnProctime(): Unit = {
     val sql =
       """
         |SELECT window_start, window_end, window_time, a, b, c, d, e
@@ -408,65 +390,6 @@ class WindowRankTest extends TableTestBase {
     util.verifyExplain(sql)
   }
 
-  @Test
-  def testOnSessionWindowAggregate(): Unit = {
-    val sql =
-      """
-        |SELECT window_start, window_end, window_time, a, cnt, sum_d, max_d, 
wAvg, uv
-        |FROM (
-        |SELECT *,
-        |   ROW_NUMBER() OVER(PARTITION BY window_start, window_end ORDER BY 
cnt DESC) as rownum
-        |FROM (
-        |  SELECT
-        |    a,
-        |    window_start,
-        |    window_end,
-        |    window_time,
-        |    count(*) as cnt,
-        |    sum(d) as sum_d,
-        |    max(d) filter (where b > 1000) as max_d,
-        |    weightedAvg(b, e) AS wAvg,
-        |    count(distinct c) AS uv
-        |  FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL 
'15' MINUTE))
-        |  GROUP BY a, window_start, window_end, window_time
-        |  )
-        |)
-        |WHERE rownum <= 3
-      """.stripMargin
-    util.verifyRelPlan(sql)
-  }
-
-  @Test
-  def testOnSessionWindowAggregateOnProctime(): Unit = {
-    val sql =
-      """
-        |SELECT window_start, window_end, window_time, a, cnt, sum_d, max_d, 
wAvg, uv
-        |FROM (
-        |SELECT *,
-        |   ROW_NUMBER() OVER(PARTITION BY a, window_start, window_end ORDER 
BY cnt DESC) as rownum
-        |FROM (
-        |  SELECT
-        |    a,
-        |    window_start,
-        |    window_end,
-        |    window_time,
-        |    count(*) as cnt,
-        |    sum(d) as sum_d,
-        |    max(d) filter (where b > 1000) as max_d,
-        |    weightedAvg(b, e) AS wAvg,
-        |    count(distinct c) AS uv
-        |  FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL 
'15' MINUTE))
-        |  GROUP BY a, window_start, window_end, window_time
-        |  )
-        |)
-        |WHERE rownum <= 3
-      """.stripMargin
-
-    thrown.expect(classOf[TableException])
-    thrown.expectMessage("Processing time Window TopN is not supported yet.")
-    util.verifyExplain(sql)
-  }
-
   // 
----------------------------------------------------------------------------------------
   // Tests for queries window rank could propagate time attribute
   // 
----------------------------------------------------------------------------------------
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
index c3d1a25..96d3988 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
@@ -425,70 +425,6 @@ class WindowAggregateTest(aggPhaseEnforcer: 
AggregatePhaseStrategy) extends Tabl
   }
 
   @Test
-  def testSession_OnRowtime(): Unit = {
-    // Session window does not two-phase optimization
-    val sql =
-      """
-        |SELECT
-        |   a,
-        |   window_start,
-        |   window_end,
-        |   count(*),
-        |   sum(d),
-        |   max(d) filter (where b > 1000),
-        |   weightedAvg(b, e) AS wAvg,
-        |   count(distinct c) AS uv
-        |FROM TABLE(
-        |  SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE))
-        |GROUP BY a, window_start, window_end
-      """.stripMargin
-    util.verifyRelPlan(sql)
-  }
-
-  @Test
-  def testSession_OnProctime(): Unit = {
-    // Session window does not two-phase optimization
-    val sql =
-      """
-        |SELECT
-        |   a,
-        |   window_start,
-        |   window_end,
-        |   count(*),
-        |   sum(d),
-        |   max(d) filter (where b > 1000),
-        |   weightedAvg(b, e) AS wAvg,
-        |   count(distinct c) AS uv
-        |FROM TABLE(
-        |  SESSION(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '5' MINUTE))
-        |GROUP BY a, window_start, window_end
-      """.stripMargin
-    util.verifyRelPlan(sql)
-  }
-
-  @Test
-  def testSession_DistinctSplitEnabled(): Unit = {
-    // Session window does not split-distinct optimization
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
-    val sql =
-      """
-        |SELECT
-        |   a,
-        |   window_start,
-        |   window_end,
-        |   count(*),
-        |   sum(d),
-        |   max(d) filter (where b > 1000),
-        |   count(distinct c) AS uv
-        |FROM TABLE(
-        |  SESSION(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '5' MINUTE))
-        |GROUP BY a, window_start, window_end
-      """.stripMargin
-    util.verifyRelPlan(sql)
-  }
-
-  @Test
   def testCumulate_DistinctSplitEnabled(): Unit = {
     util.tableEnv.getConfig.getConfiguration.setBoolean(
       OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.scala
index df1931d..1c9b845 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.scala
@@ -372,48 +372,6 @@ class WindowJoinTest extends TableTestBase {
     util.verifyExplain(sql)
   }
 
-  @Test
-  def testUnsupportedWindowTVF_SessionOnRowtime(): Unit = {
-    val sql =
-      """
-        |SELECT *
-        |FROM (
-        |  SELECT *
-        |  FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
-        |) L
-        |JOIN (
-        |  SELECT *
-        |  FROM TABLE(SESSION(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
-        |) R
-        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a
-      """.stripMargin
-
-    thrown.expectMessage("Session Window TableFunction is not supported yet.")
-    thrown.expect(classOf[TableException])
-    util.verifyExplain(sql)
-  }
-
-  @Test
-  def testUnsupportedWindowTVF_SessionOnProctime(): Unit = {
-    val sql =
-      """
-        |SELECT L.a, L.b, L.c, R.a, R.b, R.c
-        |FROM (
-        |  SELECT *
-        |  FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '15' MINUTE))
-        |) L
-        |JOIN (
-        |  SELECT *
-        |  FROM TABLE(SESSION(TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL '15' MINUTE))
-        |) R
-        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a
-      """.stripMargin
-
-    thrown.expectMessage("Processing time Window Join is not supported yet.")
-    thrown.expect(classOf[TableException])
-    util.verifyExplain(sql)
-  }
-
   // ----------------------------------------------------------------------------------------
   // Tests for invalid queries Join on window Aggregate
   // because left window strategy is not equals to right window strategy.
@@ -945,70 +903,6 @@ class WindowJoinTest extends TableTestBase {
   }
 
   @Test
-  def testOnSessionWindowAggregate(): Unit = {
-    val sql =
-      """
-        |SELECT L.*, R.*
-        |FROM (
-        |  SELECT
-        |    a,
-        |    window_start,
-        |    window_end,
-        |    window_time,
-        |    count(*) as cnt,
-        |    count(distinct c) AS uv
-        |  FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
-        |  GROUP BY a, window_start, window_end, window_time
-        |) L
-        |JOIN (
-        |  SELECT
-        |    a,
-        |    window_start,
-        |    window_end,
-        |    window_time,
-        |    count(*) as cnt,
-        |    count(distinct c) AS uv
-        |  FROM TABLE(SESSION(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
-        |  GROUP BY a, window_start, window_end, window_time
-        |) R
-        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a
-      """.stripMargin
-    util.verifyRelPlan(sql)
-  }
-
-  @Test
-  def testOnSessionWindowAggregateOnProctime(): Unit = {
-    val sql =
-      """
-        |SELECT L.*, R.*
-        |FROM (
-        |  SELECT
-        |    a,
-        |    window_start,
-        |    window_end,
-        |    window_time,
-        |    count(*) as cnt,
-        |    count(distinct c) AS uv
-        |  FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '15' MINUTE))
-        |  GROUP BY a, window_start, window_end, window_time
-        |) L
-        |JOIN (
-        |  SELECT
-        |    a,
-        |    window_start,
-        |    window_end,
-        |    window_time,
-        |    count(*) as cnt,
-        |    count(distinct c) AS uv
-        |  FROM TABLE(SESSION(TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL '15' MINUTE))
-        |  GROUP BY a, window_start, window_end, window_time
-        |) R
-        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a
-      """.stripMargin
-    util.verifyRelPlan(sql)
-  }
-
-  @Test
   def testWindowJoinWithNonEqui(): Unit = {
     val sql =
       """
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
index b02258f..3e79333 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
@@ -20,7 +20,6 @@ package org.apache.flink.table.planner.runtime.stream.sql
 
 import org.apache.flink.api.common.restartstrategy.RestartStrategies
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api._
 import org.apache.flink.streaming.api.CheckpointingMode
 import org.apache.flink.table.api.bridge.scala._
 import org.apache.flink.table.api.config.OptimizerConfigOptions
@@ -28,7 +27,6 @@ import org.apache.flink.table.planner.factories.TestValuesTableFactory
 import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.ConcatDistinctAggFunction
 import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND, StateBackendMode}
 import org.apache.flink.table.planner.runtime.utils.{FailingCollectionSource, StreamingWithStateTestBase, TestData, TestingAppendSink}
-import org.apache.flink.table.planner.runtime.utils.TimeTestUtil.TimestampAndWatermarkWithOffset
 import org.apache.flink.table.planner.utils.AggregatePhaseStrategy
 import org.apache.flink.table.planner.utils.AggregatePhaseStrategy._
 import org.apache.flink.types.Row
@@ -902,91 +900,6 @@ class WindowAggregateITCase(
       CumulateWindowRollupExpectedData.sorted.mkString("\n"),
       sink.getAppendResults.sorted.mkString("\n"))
   }
-
-  @Test
-  def testEventTimeSessionWindow(): Unit = {
-    //To verify the "merge" functionality, we create this test with the following characteristics:
-    // 1. set the Parallelism to 1, and have the test data out of order
-    // 2. create a waterMark with 10ms offset to delay the window emission by 10ms
-    val sessionData = List(
-      (1L, 1, "Hello", "a"),
-      (2L, 2, "Hello", "b"),
-      (8L, 8, "Hello", "a"),
-      (9L, 9, "Hello World", "b"),
-      (4L, 4, "Hello", "c"),
-      (16L, 16, "Hello", "d"))
-
-    val stream = failingDataSource(sessionData)
-      .assignTimestampsAndWatermarks(
-        new TimestampAndWatermarkWithOffset[(Long, Int, String, String)](10L))
-    val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'string, 'name)
-    tEnv.registerTable("T", table)
-
-    val sql =
-      """
-        |SELECT
-        |  `string`,
-        |  window_start,
-        |  window_time,
-        |  COUNT(1),
-        |  SUM(1),
-        |  COUNT(`int`),
-        |  SUM(`int`),
-        |  COUNT(DISTINCT name)
-        |FROM TABLE(
-        |  SESSION(TABLE T, DESCRIPTOR(rowtime), INTERVAL '0.005' SECOND))
-        |GROUP BY `string`, window_start, window_end, window_time
-      """.stripMargin
-
-    val sink = new TestingAppendSink
-    tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink)
-    env.execute()
-
-    val expected = Seq(
-      "Hello World,1970-01-01T00:00:00.009,1970-01-01T00:00:00.013,1,1,1,9,1",
-      "Hello,1970-01-01T00:00:00.016,1970-01-01T00:00:00.020,1,1,1,16,1",
-      "Hello,1970-01-01T00:00:00.001,1970-01-01T00:00:00.012,4,4,4,15,3")
-    assertEquals(expected.sorted, sink.getAppendResults.sorted)
-  }
-
-  @Test
-  def testDistinctAggWithMergeOnEventTimeSessionGroupWindow(): Unit = {
-    // create a watermark with 10ms offset to delay the window emission by 10ms to verify merge
-    val sessionWindowTestData = List(
-      (1L, 2, "Hello"),       // (1, Hello)       - window
-      (2L, 2, "Hello"),       // (1, Hello)       - window, deduped
-      (8L, 2, "Hello"),       // (2, Hello)       - window, deduped during merge
-      (10L, 3, "Hello"),      // (2, Hello)       - window, forwarded during merge
-      (9L, 9, "Hello World"), // (1, Hello World) - window
-      (4L, 1, "Hello"),       // (1, Hello)       - window, triggering merge
-      (16L, 16, "Hello"))     // (3, Hello)       - window (not merged)
-
-    val stream = failingDataSource(sessionWindowTestData)
-      .assignTimestampsAndWatermarks(new TimestampAndWatermarkWithOffset[(Long, Int, String)](10L))
-    val table = stream.toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-    tEnv.registerTable("MyTable", table)
-
-    val sqlQuery =
-      """
-        |SELECT c,
-        |   COUNT(DISTINCT b),
-        |   window_end
-        |FROM TABLE(
-        |  SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '0.005' SECOND))
-        |GROUP BY c, window_start, window_end
-      """.stripMargin
-    val sink = new TestingAppendSink
-    tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
-    env.execute()
-
-    val expected = Seq(
-      "Hello World,1,1970-01-01T00:00:00.014", // window starts at [9L] till {14L}
-      "Hello,1,1970-01-01T00:00:00.021",       // window starts at [16L] till {21L}, not merged
-      "Hello,3,1970-01-01T00:00:00.015"        // window starts at [1L,2L],
-      //   merged with [8L,10L], by [4L], till {15L}
-    )
-    assertEquals(expected.sorted, sink.getAppendResults.sorted)
-  }
 }
 
 object WindowAggregateITCase {
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/WindowTableFunctionOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/WindowTableFunctionOperator.java
index 51dc68e..22911d7 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/WindowTableFunctionOperator.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/WindowTableFunctionOperator.java
@@ -43,9 +43,6 @@ import static org.apache.flink.util.Preconditions.checkArgument;
  *
  * <p>Note: The operator only works for row-time.
  *
- * <p>Note: The operator is not suitable for Session Window because can't do state-less window
- * assigning for input row per record for Session Window.
- *
  * <p>Note: The operator emits per record instead of at the end of window.
  */
 public class WindowTableFunctionOperator extends TableStreamOperator<RowData>

Reply via email to