This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.20 by this push:
     new 41a14091740 [FLINK-35816][table-planner] Non-mergeable proctime tvf 
window aggregate needs to fallback to group aggregate
41a14091740 is described below

commit 41a140917403365c30429b40fbab9a0c85806269
Author: lincoln lee <[email protected]>
AuthorDate: Sat Jul 13 11:09:42 2024 +0800

    [FLINK-35816][table-planner] Non-mergeable proctime tvf window aggregate 
needs to fallback to group aggregate
    
    This closes #25082
---
 .../stream/StreamPhysicalWindowAggregate.scala     |   9 +-
 .../table/planner/plan/utils/WindowUtil.scala      | 113 +++-
 .../plan/stream/sql/agg/WindowAggregateTest.xml    | 614 +++++++++++++++++++++
 .../plan/stream/sql/agg/WindowAggregateTest.scala  | 101 +++-
 4 files changed, 808 insertions(+), 29 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWindowAggregate.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWindowAggregate.scala
index 93d245139df..9bb859be10b 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWindowAggregate.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWindowAggregate.scala
@@ -17,8 +17,9 @@
  */
 package org.apache.flink.table.planner.plan.nodes.physical.stream
 
+import org.apache.flink.table.api.TableException
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
-import org.apache.flink.table.planner.plan.logical.WindowingStrategy
+import 
org.apache.flink.table.planner.plan.logical.{WindowAttachedWindowingStrategy, 
WindowingStrategy}
 import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowAggregate
 import org.apache.flink.table.planner.plan.utils._
@@ -112,6 +113,12 @@ class StreamPhysicalWindowAggregate(
 
   override def translateToExecNode(): ExecNode[_] = {
     checkEmitConfiguration(unwrapTableConfig(this))
+
+    if (windowing.isInstanceOf[WindowAttachedWindowingStrategy] && 
windowing.isProctime) {
+      throw new TableException(
+        "Non-mergeable processing time window tvf aggregation is invalid, 
should fallback to group " +
+          "aggregation instead. This is a bug and should not happen. Please 
file an issue.")
+    }
     new StreamExecWindowAggregate(
       unwrapTableConfig(this),
       grouping,
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala
index 995a89a8a03..e664b41c1d2 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala
@@ -24,7 +24,7 @@ import 
org.apache.flink.table.planner.functions.sql.{FlinkSqlOperatorTable, SqlW
 import org.apache.flink.table.planner.plan.`trait`.RelWindowProperties
 import org.apache.flink.table.planner.plan.logical._
 import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
-import 
org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalAggregate, 
FlinkLogicalJoin, FlinkLogicalRank, FlinkLogicalTableFunctionScan}
+import 
org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalAggregate, 
FlinkLogicalMatch, FlinkLogicalOverAggregate, FlinkLogicalRank, 
FlinkLogicalTableFunctionScan}
 import 
org.apache.flink.table.planner.plan.utils.AggregateUtil.inferAggAccumulatorNames
 import 
org.apache.flink.table.planner.plan.utils.WindowEmitStrategy.{TABLE_EXEC_EMIT_EARLY_FIRE_ENABLED,
 TABLE_EXEC_EMIT_LATE_FIRE_ENABLED}
 import org.apache.flink.table.planner.typeutils.RowTypeUtils
@@ -35,7 +35,7 @@ import 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.canBeTimeAtt
 
 import org.apache.calcite.plan.volcano.RelSubset
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.{RelNode, SingleRel}
+import org.apache.calcite.rel.{BiRel, RelNode, RelVisitor}
 import org.apache.calcite.rel.core._
 import org.apache.calcite.rex._
 import org.apache.calcite.sql.`type`.SqlTypeFamily
@@ -45,10 +45,9 @@ import org.apache.calcite.util.{ImmutableBitSet, Util}
 import java.time.Duration
 import java.util.Collections
 
-import scala.annotation.tailrec
 import scala.collection.JavaConversions._
 import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 
 /** Utilities for window table-valued functions. */
 object WindowUtil {
@@ -333,7 +332,8 @@ object WindowUtil {
 
   /**
    * For rowtime window, return true if the given aggregate grouping contains 
window start and end.
-   * For proctime window, we should also check if it exists a neighbour 
windowTableFunctionCall.
+   * For proctime window, we should also check if it exists a neighbour 
windowTableFunctionCall and
+   * doesn't exist any [[RexCall]] on window time columns.
    *
    * If the window is a session window, we should also check if the partition 
keys are the same as
    * the group keys. See more at [[WindowUtil.validGroupKeyPartitionKey()]].
@@ -346,7 +346,7 @@ object WindowUtil {
       return false
     }
     if (WindowUtil.groupingContainsWindowStartEnd(grouping, windowProperties)) 
{
-      windowProperties.isRowtime || existNeighbourWindowTableFunc(agg.getInput)
+      isValidRowtimeWindow(windowProperties) || 
isValidProcTimeWindow(windowProperties, fmq, agg)
     } else {
       false
     }
@@ -385,35 +385,94 @@ object WindowUtil {
     }
   }
 
-  private def existNeighbourWindowTableFunc(rel: RelNode): Boolean = {
+  private def isValidRowtimeWindow(windowProperties: RelWindowProperties): 
Boolean = {
+    // rowtime tvf window can support calculation on window columns even 
before aggregation
+    windowProperties.isRowtime
+  }
 
-    @tailrec
-    def find(rel: RelNode): Unit = {
-      rel match {
-        case rss: RelSubset =>
-          val innerRel = Option.apply(rss.getBest).getOrElse(rss.getOriginal)
-          find(innerRel)
+  /**
+   * If the middle Calc(s) contains call(s) on window columns, we should not 
convert the Aggregate
+   * into WindowAggregate but GroupAggregate instead.
+   *
+   * The valid plan structure is like:
+   *
+   * {{{
+   * Aggregate
+   *  |
+   * Calc (should not contain call on window columns)
+   *  |
+   * WindowTableFunctionScan
+   * }}}
+   *
+   * and unlike:
+   *
+   * {{{
+   * Aggregate
+   *  |
+   * Calc
+   *  |
+   * Aggregate
+   *  |
+   * Calc
+   *  |
+   * WindowTableFunctionScan
+   * }}}
+   */
+  private def isValidProcTimeWindow(
+      windowProperties: RelWindowProperties,
+      fmq: FlinkRelMetadataQuery,
+      agg: FlinkLogicalAggregate): Boolean = {
+    val calcMatcher = new CalcWindowFunctionScanMatcher
+    try {
+      calcMatcher.go(agg.getInput(0))
+    } catch {
+      case _: Throwable => // do nothing
+    }
+    if (!calcMatcher.existNeighbourWindowTableFunc) {
+      return false
+    }
+    var existCallOnWindowColumns = calcMatcher.calcNodes.nonEmpty &&
+      calcMatcher.calcNodes.exists(calc => 
calcContainsCallsOnWindowColumns(calc, fmq))
+
+    // aggregate call shouldn't be on window columns
+    val aggInputWindowProps = windowProperties.getWindowColumns
+    existCallOnWindowColumns = existCallOnWindowColumns || 
!agg.getAggCallList.forall {
+      call => 
aggInputWindowProps.intersect(ImmutableBitSet.of(call.getArgList)).isEmpty
+    }
+    // proctime tvf window can't support calculation on window columns before 
aggregation
+    !existCallOnWindowColumns
+  }
 
+  private class CalcWindowFunctionScanMatcher extends RelVisitor {
+    val calcNodes: ListBuffer[Calc] = ListBuffer[Calc]()
+    var existNeighbourWindowTableFunc = false
+
+    override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
+      node match {
+        case calc: Calc =>
+          calcNodes += calc
+          // continue to visit children
+          super.visit(calc, 0, parent)
         case scan: FlinkLogicalTableFunctionScan =>
           if (WindowUtil.isWindowTableFunctionCall(scan.getCall)) {
+            existNeighbourWindowTableFunc = true
+            // stop visiting
             throw new Util.FoundOne
           }
-          find(scan.getInput(0))
-
-        // proctime attribute comes from these operators can not be used 
directly for proctime
-        // window aggregate, so further traversal of child nodes is unnecessary
-        case _: FlinkLogicalAggregate | _: FlinkLogicalRank | _: 
FlinkLogicalJoin =>
-
-        case sr: SingleRel => find(sr.getInput)
+        case rss: RelSubset =>
+          val innerRel = Option.apply(rss.getBest).getOrElse(rss.getOriginal)
+          // special case doesn't call super.visit for RelSubSet because it 
has no children
+          visit(innerRel, 0, rss)
+        case _: FlinkLogicalAggregate | _: FlinkLogicalMatch | _: 
FlinkLogicalOverAggregate |
+            _: FlinkLogicalRank | _: BiRel | _: SetOp =>
+          // proctime attribute comes from these operators can't be used 
directly for proctime
+          // window aggregate, so further tree walk is unnecessary
+          throw new Util.FoundOne
+        case _ =>
+          // continue to visit children
+          super.visit(node, ordinal, parent)
       }
     }
-
-    try {
-      find(rel)
-    } catch {
-      case _: Util.FoundOne => return true
-    }
-    false
   }
 
   /**
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
index 4e5c14dd9b1..75fb934413f 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
@@ -2272,6 +2272,620 @@ Sink(table=[default_catalog.default_database.s1], 
fields=[window_start, window_e
             +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 
1000:INTERVAL SECOND)])
                +- Calc(select=[rowtime])
                   +- Reused(reference_id=[1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testProctimeWindowTVFWithCalcOnWindowColumnWhenCantMerge[aggPhaseEnforcer=ONE_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+select c, count(a)
+from
+ TABLE(CUMULATE(table MyTable, DESCRIPTOR(proctime), interval '10' seconds, 
interval '5' minutes))
+where window_start <> '123'
+group by window_start, window_end, c, window_time
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(c=[$2], EXPR$1=[$4])
++- LogicalAggregate(group=[{0, 1, 2, 3}], EXPR$1=[COUNT($4)])
+   +- LogicalProject(window_start=[$7], window_end=[$8], c=[$2], 
window_time=[$9], a=[$0])
+      +- LogicalFilter(condition=[<>($7, _UTF-16LE'123')])
+         +- LogicalTableFunctionScan(invocation=[CUMULATE(DESCRIPTOR($6), 
10000:INTERVAL SECOND, 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, 
BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) 
*ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) 
window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* 
window_time)])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[$6])
+               +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 
1000:INTERVAL SECOND)])
+                  +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[PROCTIME()])
+                     +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[c, EXPR$1])
++- GroupAggregate(groupBy=[window_start, window_end, c, window_time], 
select=[window_start, window_end, c, window_time, COUNT(a) AS EXPR$1])
+   +- Exchange(distribution=[hash[window_start, window_end, c, window_time]])
+      +- Calc(select=[window_start, window_end, c, window_time, a], 
where=[<>(window_start, '123')])
+         +- WindowTableFunction(window=[CUMULATE(time_col=[proctime], 
max_size=[5 min], step=[10 s])])
+            +- Calc(select=[a, c, proctime])
+               +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+                  +- Calc(select=[a, c, PROCTIME() AS proctime, rowtime])
+                     +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable, project=[a, c, rowtime], metadata=[]]], fields=[a, 
c, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testProctimeWindowTVFWithCalcOnWindowColumnWhenCantMerge[aggPhaseEnforcer=TWO_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+select c, count(a)
+from
+ TABLE(CUMULATE(table MyTable, DESCRIPTOR(proctime), interval '10' seconds, 
interval '5' minutes))
+where window_start <> '123'
+group by window_start, window_end, c, window_time
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(c=[$2], EXPR$1=[$4])
++- LogicalAggregate(group=[{0, 1, 2, 3}], EXPR$1=[COUNT($4)])
+   +- LogicalProject(window_start=[$7], window_end=[$8], c=[$2], 
window_time=[$9], a=[$0])
+      +- LogicalFilter(condition=[<>($7, _UTF-16LE'123')])
+         +- LogicalTableFunctionScan(invocation=[CUMULATE(DESCRIPTOR($6), 
10000:INTERVAL SECOND, 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, 
BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) 
*ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) 
window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* 
window_time)])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[$6])
+               +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 
1000:INTERVAL SECOND)])
+                  +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[PROCTIME()])
+                     +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[c, EXPR$1])
++- GroupAggregate(groupBy=[window_start, window_end, c, window_time], 
select=[window_start, window_end, c, window_time, COUNT(a) AS EXPR$1])
+   +- Exchange(distribution=[hash[window_start, window_end, c, window_time]])
+      +- Calc(select=[window_start, window_end, c, window_time, a], 
where=[<>(window_start, '123')])
+         +- WindowTableFunction(window=[CUMULATE(time_col=[proctime], 
max_size=[5 min], step=[10 s])])
+            +- Calc(select=[a, c, proctime])
+               +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+                  +- Calc(select=[a, c, PROCTIME() AS proctime, rowtime])
+                     +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable, project=[a, c, rowtime], metadata=[]]], fields=[a, 
c, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testProctimeWindowTVFWithCorrelateWhenCantMerge[aggPhaseEnforcer=ONE_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+select t.c, max(t2.x), count(t.a)
+from (
+  TABLE(CUMULATE(table MyTable, DESCRIPTOR(proctime), interval '10' seconds, 
interval '5' minutes)) AS t
+  Left JOIN LATERAL TABLE(str_split('Jack,John', ',')) AS t2(x) ON TRUE
+)
+group by window_start, window_end, t.c, window_time
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(c=[$2], EXPR$1=[$4], EXPR$2=[$5])
++- LogicalAggregate(group=[{0, 1, 2, 3}], EXPR$1=[MAX($4)], EXPR$2=[COUNT($5)])
+   +- LogicalProject(window_start=[$7], window_end=[$8], c=[$2], 
window_time=[$9], EXPR$0=[$10], a=[$0])
+      +- LogicalJoin(condition=[true], joinType=[left])
+         :- LogicalTableFunctionScan(invocation=[CUMULATE(DESCRIPTOR($6), 
10000:INTERVAL SECOND, 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, 
BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) 
*ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) 
window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* 
window_time)])
+         :  +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[$6])
+         :     +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 
1000:INTERVAL SECOND)])
+         :        +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[PROCTIME()])
+         :           +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+         +- 
LogicalTableFunctionScan(invocation=[str_split(_UTF-16LE'Jack,John', 
_UTF-16LE',')], rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[c, EXPR$1, EXPR$2])
++- GroupAggregate(groupBy=[window_start, window_end, c, window_time], 
select=[window_start, window_end, c, window_time, MAX(EXPR$0) AS EXPR$1, 
COUNT(a) AS EXPR$2])
+   +- Exchange(distribution=[hash[window_start, window_end, c, window_time]])
+      +- Calc(select=[window_start, window_end, c, window_time, EXPR$0, a])
+         +- Correlate(invocation=[str_split(_UTF-16LE'Jack,John', 
_UTF-16LE',')], correlate=[table(str_split('Jack,John',','))], 
select=[a,b,c,d,e,rowtime,proctime,window_start,window_end,window_time,EXPR$0], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) 
d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* 
proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) 
*PROCTIME* window_time, VARCHAR(2147483647) EXPR$0)] [...]
+            +- WindowTableFunction(window=[CUMULATE(time_col=[proctime], 
max_size=[5 min], step=[10 s])])
+               +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+                  +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS 
proctime])
+                     +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, d, e, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testProctimeWindowTVFWithCorrelateWhenCantMerge[aggPhaseEnforcer=TWO_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+select t.c, max(t2.x), count(t.a)
+from (
+  TABLE(CUMULATE(table MyTable, DESCRIPTOR(proctime), interval '10' seconds, 
interval '5' minutes)) AS t
+  Left JOIN LATERAL TABLE(str_split('Jack,John', ',')) AS t2(x) ON TRUE
+)
+group by window_start, window_end, t.c, window_time
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(c=[$2], EXPR$1=[$4], EXPR$2=[$5])
++- LogicalAggregate(group=[{0, 1, 2, 3}], EXPR$1=[MAX($4)], EXPR$2=[COUNT($5)])
+   +- LogicalProject(window_start=[$7], window_end=[$8], c=[$2], 
window_time=[$9], EXPR$0=[$10], a=[$0])
+      +- LogicalJoin(condition=[true], joinType=[left])
+         :- LogicalTableFunctionScan(invocation=[CUMULATE(DESCRIPTOR($6), 
10000:INTERVAL SECOND, 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, 
BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) 
*ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) 
window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* 
window_time)])
+         :  +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[$6])
+         :     +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 
1000:INTERVAL SECOND)])
+         :        +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[PROCTIME()])
+         :           +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+         +- 
LogicalTableFunctionScan(invocation=[str_split(_UTF-16LE'Jack,John', 
_UTF-16LE',')], rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[c, EXPR$1, EXPR$2])
++- GroupAggregate(groupBy=[window_start, window_end, c, window_time], 
select=[window_start, window_end, c, window_time, MAX(EXPR$0) AS EXPR$1, 
COUNT(a) AS EXPR$2])
+   +- Exchange(distribution=[hash[window_start, window_end, c, window_time]])
+      +- Calc(select=[window_start, window_end, c, window_time, EXPR$0, a])
+         +- Correlate(invocation=[str_split(_UTF-16LE'Jack,John', 
_UTF-16LE',')], correlate=[table(str_split('Jack,John',','))], 
select=[a,b,c,d,e,rowtime,proctime,window_start,window_end,window_time,EXPR$0], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) 
d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* 
proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) 
*PROCTIME* window_time, VARCHAR(2147483647) EXPR$0)] [...]
+            +- WindowTableFunction(window=[CUMULATE(time_col=[proctime], 
max_size=[5 min], step=[10 s])])
+               +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+                  +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS 
proctime])
+                     +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, d, e, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testProctimeWindowTVFWithDedupWhenCantMerge[aggPhaseEnforcer=ONE_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+select c, count(a)
+from (
+ select *, row_number() over (partition by c order by proctime desc) as rn
+ from
+  TABLE(CUMULATE(table MyTable, DESCRIPTOR(proctime), interval '10' seconds, 
interval '5' minutes))
+)
+where rn = 1
+group by window_start, window_end, c, window_time
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(c=[$2], EXPR$1=[$4])
++- LogicalAggregate(group=[{0, 1, 2, 3}], EXPR$1=[COUNT($4)])
+   +- LogicalProject(window_start=[$7], window_end=[$8], c=[$2], 
window_time=[$9], a=[$0])
+      +- LogicalFilter(condition=[=($10, 1)])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[$6], window_start=[$7], window_end=[$8], 
window_time=[$9], rn=[ROW_NUMBER() OVER (PARTITION BY $2 ORDER BY $6 DESC NULLS 
LAST)])
+            +- LogicalTableFunctionScan(invocation=[CUMULATE(DESCRIPTOR($6), 
10000:INTERVAL SECOND, 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, 
BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) 
*ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) 
window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* 
window_time)])
+               +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[$6])
+                  +- LogicalWatermarkAssigner(rowtime=[rowtime], 
watermark=[-($5, 1000:INTERVAL SECOND)])
+                     +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[PROCTIME()])
+                        +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[c, EXPR$1])
++- GroupAggregate(groupBy=[window_start, window_end, c, window_time], 
select=[window_start, window_end, c, window_time, COUNT_RETRACT(a) AS EXPR$1])
+   +- Exchange(distribution=[hash[window_start, window_end, c, window_time]])
+      +- Calc(select=[window_start, window_end, c, 
PROCTIME_MATERIALIZE(window_time) AS window_time, a])
+         +- Deduplicate(keep=[LastRow], key=[c], order=[PROCTIME])
+            +- Exchange(distribution=[hash[c]])
+               +- Calc(select=[a, c, proctime, window_start, window_end, 
window_time])
+                  +- WindowTableFunction(window=[CUMULATE(time_col=[proctime], 
max_size=[5 min], step=[10 s])])
+                     +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+                        +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS 
proctime])
+                           +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, d, e, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testProctimeWindowTVFWithDedupWhenCantMerge[aggPhaseEnforcer=TWO_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+select c, count(a)
+from (
+ select *, row_number() over (partition by c order by proctime desc) as rn
+ from
+  TABLE(CUMULATE(table MyTable, DESCRIPTOR(proctime), interval '10' seconds, 
interval '5' minutes))
+)
+where rn = 1
+group by window_start, window_end, c, window_time
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(c=[$2], EXPR$1=[$4])
++- LogicalAggregate(group=[{0, 1, 2, 3}], EXPR$1=[COUNT($4)])
+   +- LogicalProject(window_start=[$7], window_end=[$8], c=[$2], 
window_time=[$9], a=[$0])
+      +- LogicalFilter(condition=[=($10, 1)])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[$6], window_start=[$7], window_end=[$8], 
window_time=[$9], rn=[ROW_NUMBER() OVER (PARTITION BY $2 ORDER BY $6 DESC NULLS 
LAST)])
+            +- LogicalTableFunctionScan(invocation=[CUMULATE(DESCRIPTOR($6), 
10000:INTERVAL SECOND, 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, 
BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) 
*ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) 
window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* 
window_time)])
+               +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[$6])
+                  +- LogicalWatermarkAssigner(rowtime=[rowtime], 
watermark=[-($5, 1000:INTERVAL SECOND)])
+                     +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[PROCTIME()])
+                        +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[c, EXPR$1])
++- GroupAggregate(groupBy=[window_start, window_end, c, window_time], 
select=[window_start, window_end, c, window_time, COUNT_RETRACT(a) AS EXPR$1])
+   +- Exchange(distribution=[hash[window_start, window_end, c, window_time]])
+      +- Calc(select=[window_start, window_end, c, 
PROCTIME_MATERIALIZE(window_time) AS window_time, a])
+         +- Deduplicate(keep=[LastRow], key=[c], order=[PROCTIME])
+            +- Exchange(distribution=[hash[c]])
+               +- Calc(select=[a, c, proctime, window_start, window_end, 
window_time])
+                  +- WindowTableFunction(window=[CUMULATE(time_col=[proctime], 
max_size=[5 min], step=[10 s])])
+                     +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+                        +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS 
proctime])
+                           +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, d, e, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testProctimeWindowTVFWithJoinWhenCantMerge[aggPhaseEnforcer=ONE_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+select t.c, max(t2.e), count(t.a)
+from (
+  TABLE(CUMULATE(table MyTable, DESCRIPTOR(proctime), interval '10' seconds, 
interval '5' minutes)) AS t
+  join MyTable t2 on t2.a = t.a
+)
+group by window_start, window_end, t.c, window_time
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(c=[$2], EXPR$1=[$4], EXPR$2=[$5])
++- LogicalAggregate(group=[{0, 1, 2, 3}], EXPR$1=[MAX($4)], EXPR$2=[COUNT($5)])
+   +- LogicalProject(window_start=[$7], window_end=[$8], c=[$2], 
window_time=[$9], e0=[$14], a=[$0])
+      +- LogicalJoin(condition=[=($10, $0)], joinType=[inner])
+         :- LogicalTableFunctionScan(invocation=[CUMULATE(DESCRIPTOR($6), 
10000:INTERVAL SECOND, 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, 
BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) 
*ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) 
window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* 
window_time)])
+         :  +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[$6])
+         :     +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 
1000:INTERVAL SECOND)])
+         :        +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[PROCTIME()])
+         :           +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+         +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 
1000:INTERVAL SECOND)])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[PROCTIME()])
+               +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[c, EXPR$1, EXPR$2])
++- GroupAggregate(groupBy=[window_start, window_end, c, window_time], 
select=[window_start, window_end, c, window_time, MAX(e0) AS EXPR$1, COUNT(a) 
AS EXPR$2])
+   +- Exchange(distribution=[hash[window_start, window_end, c, window_time]])
+      +- Calc(select=[window_start, window_end, c, window_time, e AS e0, a])
+         +- Join(joinType=[InnerJoin], where=[=(a0, a)], select=[a, c, 
window_start, window_end, window_time, a0, e], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[NoUniqueKey])
+            :- Exchange(distribution=[hash[a]])
+            :  +- Calc(select=[a, c, window_start, window_end, 
PROCTIME_MATERIALIZE(window_time) AS window_time])
+            :     +- WindowTableFunction(window=[CUMULATE(time_col=[proctime], 
max_size=[5 min], step=[10 s])])
+            :        +- Calc(select=[a, c, proctime])
+            :           +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+            :              +- Calc(select=[a, c, PROCTIME() AS proctime, 
rowtime])
+            :                 +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable, project=[a, c, e, rowtime], metadata=[]]], 
fields=[a, c, e, rowtime])
+            +- Exchange(distribution=[hash[a]])
+               +- Calc(select=[a, e])
+                  +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+                     +- Calc(select=[a, e, rowtime])
+                        +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable, project=[a, c, e, rowtime], metadata=[]]], 
fields=[a, c, e, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testProctimeWindowTVFWithJoinWhenCantMerge[aggPhaseEnforcer=TWO_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+select t.c, max(t2.e), count(t.a)
+from (
+  TABLE(CUMULATE(table MyTable, DESCRIPTOR(proctime), interval '10' seconds, 
interval '5' minutes)) AS t
+  join MyTable t2 on t2.a = t.a
+)
+group by window_start, window_end, t.c, window_time
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(c=[$2], EXPR$1=[$4], EXPR$2=[$5])
++- LogicalAggregate(group=[{0, 1, 2, 3}], EXPR$1=[MAX($4)], EXPR$2=[COUNT($5)])
+   +- LogicalProject(window_start=[$7], window_end=[$8], c=[$2], 
window_time=[$9], e0=[$14], a=[$0])
+      +- LogicalJoin(condition=[=($10, $0)], joinType=[inner])
+         :- LogicalTableFunctionScan(invocation=[CUMULATE(DESCRIPTOR($6), 
10000:INTERVAL SECOND, 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, 
BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) 
*ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) 
window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* 
window_time)])
+         :  +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[$6])
+         :     +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 
1000:INTERVAL SECOND)])
+         :        +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[PROCTIME()])
+         :           +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+         +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 
1000:INTERVAL SECOND)])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[PROCTIME()])
+               +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[c, EXPR$1, EXPR$2])
++- GroupAggregate(groupBy=[window_start, window_end, c, window_time], 
select=[window_start, window_end, c, window_time, MAX(e0) AS EXPR$1, COUNT(a) 
AS EXPR$2])
+   +- Exchange(distribution=[hash[window_start, window_end, c, window_time]])
+      +- Calc(select=[window_start, window_end, c, window_time, e AS e0, a])
+         +- Join(joinType=[InnerJoin], where=[=(a0, a)], select=[a, c, 
window_start, window_end, window_time, a0, e], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[NoUniqueKey])
+            :- Exchange(distribution=[hash[a]])
+            :  +- Calc(select=[a, c, window_start, window_end, 
PROCTIME_MATERIALIZE(window_time) AS window_time])
+            :     +- WindowTableFunction(window=[CUMULATE(time_col=[proctime], 
max_size=[5 min], step=[10 s])])
+            :        +- Calc(select=[a, c, proctime])
+            :           +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+            :              +- Calc(select=[a, c, PROCTIME() AS proctime, 
rowtime])
+            :                 +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable, project=[a, c, e, rowtime], metadata=[]]], 
fields=[a, c, e, rowtime])
+            +- Exchange(distribution=[hash[a]])
+               +- Calc(select=[a, e])
+                  +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+                     +- Calc(select=[a, e, rowtime])
+                        +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable, project=[a, c, e, rowtime], metadata=[]]], 
fields=[a, c, e, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testProctimeWindowTVFWithOverAggWhenCantMerge[aggPhaseEnforcer=ONE_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+select c, max(c1), count(a)
+from (
+ select *, count(*) over (partition by c order by proctime desc) as c1
+ from
+  TABLE(CUMULATE(table MyTable, DESCRIPTOR(proctime), interval '10' seconds, 
interval '5' minutes))
+)
+group by window_start, window_end, c, window_time
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(c=[$2], EXPR$1=[$4], EXPR$2=[$5])
++- LogicalAggregate(group=[{0, 1, 2, 3}], EXPR$1=[MAX($4)], EXPR$2=[COUNT($5)])
+   +- LogicalProject(window_start=[$7], window_end=[$8], c=[$2], 
window_time=[$9], c1=[$10], a=[$0])
+      +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], 
proctime=[$6], window_start=[$7], window_end=[$8], window_time=[$9], 
c1=[COUNT() OVER (PARTITION BY $2 ORDER BY $6 DESC NULLS LAST)])
+         +- LogicalTableFunctionScan(invocation=[CUMULATE(DESCRIPTOR($6), 
10000:INTERVAL SECOND, 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, 
BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) 
*ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) 
window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* 
window_time)])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[$6])
+               +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 
1000:INTERVAL SECOND)])
+                  +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[PROCTIME()])
+                     +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[c, EXPR$1, EXPR$2])
++- GroupAggregate(groupBy=[window_start, window_end, c, window_time], 
select=[window_start, window_end, c, window_time, MAX(c1) AS EXPR$1, COUNT(a) 
AS EXPR$2])
+   +- Exchange(distribution=[hash[window_start, window_end, c, window_time]])
+      +- Calc(select=[window_start, window_end, c, 
PROCTIME_MATERIALIZE(window_time) AS window_time, w0$o0 AS c1, a])
+         +- OverAggregate(partitionBy=[c], orderBy=[proctime DESC], window=[ 
RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, d, e, 
rowtime, proctime, window_start, window_end, window_time, COUNT(*) AS w0$o0])
+            +- Exchange(distribution=[hash[c]])
+               +- WindowTableFunction(window=[CUMULATE(time_col=[proctime], 
max_size=[5 min], step=[10 s])])
+                  +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+                     +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS 
proctime])
+                        +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, d, e, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testProctimeWindowTVFWithOverAggWhenCantMerge[aggPhaseEnforcer=TWO_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+select c, max(c1), count(a)
+from (
+ select *, count(*) over (partition by c order by proctime desc) as c1
+ from
+  TABLE(CUMULATE(table MyTable, DESCRIPTOR(proctime), interval '10' seconds, 
interval '5' minutes))
+)
+group by window_start, window_end, c, window_time
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(c=[$2], EXPR$1=[$4], EXPR$2=[$5])
++- LogicalAggregate(group=[{0, 1, 2, 3}], EXPR$1=[MAX($4)], EXPR$2=[COUNT($5)])
+   +- LogicalProject(window_start=[$7], window_end=[$8], c=[$2], 
window_time=[$9], c1=[$10], a=[$0])
+      +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], 
proctime=[$6], window_start=[$7], window_end=[$8], window_time=[$9], 
c1=[COUNT() OVER (PARTITION BY $2 ORDER BY $6 DESC NULLS LAST)])
+         +- LogicalTableFunctionScan(invocation=[CUMULATE(DESCRIPTOR($6), 
10000:INTERVAL SECOND, 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, 
BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) 
*ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) 
window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* 
window_time)])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[$6])
+               +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 
1000:INTERVAL SECOND)])
+                  +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[PROCTIME()])
+                     +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[c, EXPR$1, EXPR$2])
++- GroupAggregate(groupBy=[window_start, window_end, c, window_time], 
select=[window_start, window_end, c, window_time, MAX(c1) AS EXPR$1, COUNT(a) 
AS EXPR$2])
+   +- Exchange(distribution=[hash[window_start, window_end, c, window_time]])
+      +- Calc(select=[window_start, window_end, c, 
PROCTIME_MATERIALIZE(window_time) AS window_time, w0$o0 AS c1, a])
+         +- OverAggregate(partitionBy=[c], orderBy=[proctime DESC], window=[ 
RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, d, e, 
rowtime, proctime, window_start, window_end, window_time, COUNT(*) AS w0$o0])
+            +- Exchange(distribution=[hash[c]])
+               +- WindowTableFunction(window=[CUMULATE(time_col=[proctime], 
max_size=[5 min], step=[10 s])])
+                  +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+                     +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS 
proctime])
+                        +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, d, e, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testProctimeWindowTVFWithRankWhenCantMerge[aggPhaseEnforcer=ONE_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+select c, count(a)
+from (
+ select *, row_number() over (partition by c order by proctime desc) as rn
+ from
+  TABLE(CUMULATE(table MyTable, DESCRIPTOR(proctime), interval '10' seconds, 
interval '5' minutes))
+)
+where rn = 2
+group by window_start, window_end, c, window_time
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(c=[$2], EXPR$1=[$4])
++- LogicalAggregate(group=[{0, 1, 2, 3}], EXPR$1=[COUNT($4)])
+   +- LogicalProject(window_start=[$7], window_end=[$8], c=[$2], 
window_time=[$9], a=[$0])
+      +- LogicalFilter(condition=[=($10, 2)])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[$6], window_start=[$7], window_end=[$8], 
window_time=[$9], rn=[ROW_NUMBER() OVER (PARTITION BY $2 ORDER BY $6 DESC NULLS 
LAST)])
+            +- LogicalTableFunctionScan(invocation=[CUMULATE(DESCRIPTOR($6), 
10000:INTERVAL SECOND, 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, 
BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) 
*ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) 
window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* 
window_time)])
+               +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[$6])
+                  +- LogicalWatermarkAssigner(rowtime=[rowtime], 
watermark=[-($5, 1000:INTERVAL SECOND)])
+                     +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[PROCTIME()])
+                        +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[c, EXPR$1])
++- GroupAggregate(groupBy=[window_start, window_end, c, window_time], 
select=[window_start, window_end, c, window_time, COUNT_RETRACT(a) AS EXPR$1])
+   +- Exchange(distribution=[hash[window_start, window_end, c, window_time]])
+      +- Calc(select=[window_start, window_end, c, 
PROCTIME_MATERIALIZE(window_time) AS window_time, a])
+         +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
rankRange=[rankStart=2, rankEnd=2], partitionBy=[c], orderBy=[proctime DESC], 
select=[a, c, proctime, window_start, window_end, window_time])
+            +- Exchange(distribution=[hash[c]])
+               +- Calc(select=[a, c, proctime, window_start, window_end, 
window_time])
+                  +- WindowTableFunction(window=[CUMULATE(time_col=[proctime], 
max_size=[5 min], step=[10 s])])
+                     +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+                        +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS 
proctime])
+                           +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, d, e, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testProctimeWindowTVFWithRankWhenCantMerge[aggPhaseEnforcer=TWO_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+select c, count(a)
+from (
+ select *, row_number() over (partition by c order by proctime desc) as rn
+ from
+  TABLE(CUMULATE(table MyTable, DESCRIPTOR(proctime), interval '10' seconds, 
interval '5' minutes))
+)
+where rn = 2
+group by window_start, window_end, c, window_time
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(c=[$2], EXPR$1=[$4])
++- LogicalAggregate(group=[{0, 1, 2, 3}], EXPR$1=[COUNT($4)])
+   +- LogicalProject(window_start=[$7], window_end=[$8], c=[$2], 
window_time=[$9], a=[$0])
+      +- LogicalFilter(condition=[=($10, 2)])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[$6], window_start=[$7], window_end=[$8], 
window_time=[$9], rn=[ROW_NUMBER() OVER (PARTITION BY $2 ORDER BY $6 DESC NULLS 
LAST)])
+            +- LogicalTableFunctionScan(invocation=[CUMULATE(DESCRIPTOR($6), 
10000:INTERVAL SECOND, 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, 
BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) 
*ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) 
window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* 
window_time)])
+               +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[$6])
+                  +- LogicalWatermarkAssigner(rowtime=[rowtime], 
watermark=[-($5, 1000:INTERVAL SECOND)])
+                     +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[PROCTIME()])
+                        +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[c, EXPR$1])
++- GroupAggregate(groupBy=[window_start, window_end, c, window_time], 
select=[window_start, window_end, c, window_time, COUNT_RETRACT(a) AS EXPR$1])
+   +- Exchange(distribution=[hash[window_start, window_end, c, window_time]])
+      +- Calc(select=[window_start, window_end, c, 
PROCTIME_MATERIALIZE(window_time) AS window_time, a])
+         +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
rankRange=[rankStart=2, rankEnd=2], partitionBy=[c], orderBy=[proctime DESC], 
select=[a, c, proctime, window_start, window_end, window_time])
+            +- Exchange(distribution=[hash[c]])
+               +- Calc(select=[a, c, proctime, window_start, window_end, 
window_time])
+                  +- WindowTableFunction(window=[CUMULATE(time_col=[proctime], 
max_size=[5 min], step=[10 s])])
+                     +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+                        +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS 
proctime])
+                           +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, d, e, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testProctimeWindowTVFWithUnionWhenCantMerge[aggPhaseEnforcer=ONE_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+select c, count(a)
+from (
+  select * from
+  TABLE(TUMBLE(table MyTable, DESCRIPTOR(proctime), interval '10' seconds))
+  union all
+  select * from
+  TABLE(TUMBLE(table MyTable, DESCRIPTOR(proctime), interval '5' seconds))
+) t
+group by window_start, window_end, c, window_time
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(c=[$2], EXPR$1=[$4])
++- LogicalAggregate(group=[{0, 1, 2, 3}], EXPR$1=[COUNT($4)])
+   +- LogicalProject(window_start=[$7], window_end=[$8], c=[$2], 
window_time=[$9], a=[$0])
+      +- LogicalUnion(all=[true])
+         :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[$6], window_start=[$7], window_end=[$8], 
window_time=[$9])
+         :  +- LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($6), 
10000:INTERVAL SECOND)], rowType=[RecordType(INTEGER a, BIGINT b, 
VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* 
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, 
TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+         :     +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[$6])
+         :        +- LogicalWatermarkAssigner(rowtime=[rowtime], 
watermark=[-($5, 1000:INTERVAL SECOND)])
+         :           +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[PROCTIME()])
+         :              +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[$6], window_start=[$7], window_end=[$8], 
window_time=[$9])
+            +- LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($6), 
5000:INTERVAL SECOND)], rowType=[RecordType(INTEGER a, BIGINT b, 
VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* 
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, 
TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+               +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[$6])
+                  +- LogicalWatermarkAssigner(rowtime=[rowtime], 
watermark=[-($5, 1000:INTERVAL SECOND)])
+                     +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[PROCTIME()])
+                        +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[c, EXPR$1])
++- GroupAggregate(groupBy=[window_start, window_end, c, window_time], 
select=[window_start, window_end, c, window_time, COUNT(a) AS EXPR$1])
+   +- Exchange(distribution=[hash[window_start, window_end, c, window_time]])
+      +- Calc(select=[window_start, window_end, c, 
PROCTIME_MATERIALIZE(window_time) AS window_time, a])
+         +- Union(all=[true], union=[window_start, window_end, c, window_time, 
a])
+            :- Calc(select=[window_start, window_end, c, window_time, a])
+            :  +- WindowTableFunction(window=[TUMBLE(time_col=[proctime], 
size=[10 s])])
+            :     +- Calc(select=[a, c, proctime])
+            :        +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+            :           +- Calc(select=[a, c, PROCTIME() AS proctime, rowtime])
+            :              +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable, project=[a, c, rowtime], metadata=[]]], fields=[a, 
c, rowtime])
+            +- Calc(select=[window_start, window_end, c, window_time, a])
+               +- WindowTableFunction(window=[TUMBLE(time_col=[proctime], 
size=[5 s])])
+                  +- Calc(select=[a, c, proctime])
+                     +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+                        +- Calc(select=[a, c, PROCTIME() AS proctime, rowtime])
+                           +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable, project=[a, c, rowtime], metadata=[]]], fields=[a, 
c, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testProctimeWindowTVFWithUnionWhenCantMerge[aggPhaseEnforcer=TWO_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+select c, count(a)
+from (
+  select * from
+  TABLE(TUMBLE(table MyTable, DESCRIPTOR(proctime), interval '10' seconds))
+  union all
+  select * from
+  TABLE(TUMBLE(table MyTable, DESCRIPTOR(proctime), interval '5' seconds))
+) t
+group by window_start, window_end, c, window_time
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(c=[$2], EXPR$1=[$4])
++- LogicalAggregate(group=[{0, 1, 2, 3}], EXPR$1=[COUNT($4)])
+   +- LogicalProject(window_start=[$7], window_end=[$8], c=[$2], 
window_time=[$9], a=[$0])
+      +- LogicalUnion(all=[true])
+         :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[$6], window_start=[$7], window_end=[$8], 
window_time=[$9])
+         :  +- LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($6), 
10000:INTERVAL SECOND)], rowType=[RecordType(INTEGER a, BIGINT b, 
VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* 
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, 
TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+         :     +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[$6])
+         :        +- LogicalWatermarkAssigner(rowtime=[rowtime], 
watermark=[-($5, 1000:INTERVAL SECOND)])
+         :           +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[PROCTIME()])
+         :              +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[$6], window_start=[$7], window_end=[$8], 
window_time=[$9])
+            +- LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($6), 
5000:INTERVAL SECOND)], rowType=[RecordType(INTEGER a, BIGINT b, 
VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* 
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, 
TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+               +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[$6])
+                  +- LogicalWatermarkAssigner(rowtime=[rowtime], 
watermark=[-($5, 1000:INTERVAL SECOND)])
+                     +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[PROCTIME()])
+                        +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[c, EXPR$1])
++- GroupAggregate(groupBy=[window_start, window_end, c, window_time], 
select=[window_start, window_end, c, window_time, COUNT(a) AS EXPR$1])
+   +- Exchange(distribution=[hash[window_start, window_end, c, window_time]])
+      +- Calc(select=[window_start, window_end, c, 
PROCTIME_MATERIALIZE(window_time) AS window_time, a])
+         +- Union(all=[true], union=[window_start, window_end, c, window_time, 
a])
+            :- Calc(select=[window_start, window_end, c, window_time, a])
+            :  +- WindowTableFunction(window=[TUMBLE(time_col=[proctime], 
size=[10 s])])
+            :     +- Calc(select=[a, c, proctime])
+            :        +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+            :           +- Calc(select=[a, c, PROCTIME() AS proctime, rowtime])
+            :              +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable, project=[a, c, rowtime], metadata=[]]], fields=[a, 
c, rowtime])
+            +- Calc(select=[window_start, window_end, c, window_time, a])
+               +- WindowTableFunction(window=[TUMBLE(time_col=[proctime], 
size=[5 s])])
+                  +- Calc(select=[a, c, proctime])
+                     +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+                        +- Calc(select=[a, c, PROCTIME() AS proctime, rowtime])
+                           +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable, project=[a, c, rowtime], metadata=[]]], fields=[a, 
c, rowtime])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
index 86a86ceb372..e2193bfa2a8 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
@@ -23,7 +23,7 @@ import 
org.apache.flink.table.api.config.{AggregatePhaseStrategy, OptimizerConfi
 import 
org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.{WeightedAvg,
 WeightedAvgWithMerge}
 import 
org.apache.flink.table.planner.plan.utils.WindowEmitStrategy.{TABLE_EXEC_EMIT_EARLY_FIRE_DELAY,
 TABLE_EXEC_EMIT_EARLY_FIRE_ENABLED, TABLE_EXEC_EMIT_LATE_FIRE_DELAY, 
TABLE_EXEC_EMIT_LATE_FIRE_ENABLED}
 import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions.TestPythonAggregateFunction
-import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.JavaTableFunc1
+import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.{JavaTableFunc1,
 StringSplit}
 import org.apache.flink.table.planner.utils.TableTestBase
 import 
org.apache.flink.testutils.junit.extensions.parameterized.{ParameterizedTestExtension,
 Parameters}
 
@@ -1651,6 +1651,105 @@ class WindowAggregateTest(aggPhaseEnforcer: 
AggregatePhaseStrategy) extends Tabl
       """.stripMargin
     util.verifyRelPlan(sql)
   }
+
+  @TestTemplate
+  def testProctimeWindowTVFWithCalcOnWindowColumnWhenCantMerge(): Unit = {
+    util.verifyRelPlan(
+      """
+        |select c, count(a)
+        |from
+        | TABLE(CUMULATE(table MyTable, DESCRIPTOR(proctime), interval '10' 
seconds, interval '5' minutes))
+        |where window_start <> '123'
+        |group by window_start, window_end, c, window_time
+        |""".stripMargin)
+  }
+
+  @TestTemplate
+  def testProctimeWindowTVFWithRankWhenCantMerge(): Unit = {
+    util.verifyRelPlan(
+      """
+        |select c, count(a)
+        |from (
+        | select *, row_number() over (partition by c order by proctime desc) 
as rn
+        | from
+        |  TABLE(CUMULATE(table MyTable, DESCRIPTOR(proctime), interval '10' 
seconds, interval '5' minutes))
+        |)
+        |where rn = 2
+        |group by window_start, window_end, c, window_time
+        |""".stripMargin)
+  }
+
+  @TestTemplate
+  def testProctimeWindowTVFWithDedupWhenCantMerge(): Unit = {
+    util.verifyRelPlan(
+      """
+        |select c, count(a)
+        |from (
+        | select *, row_number() over (partition by c order by proctime desc) 
as rn
+        | from
+        |  TABLE(CUMULATE(table MyTable, DESCRIPTOR(proctime), interval '10' 
seconds, interval '5' minutes))
+        |)
+        |where rn = 1
+        |group by window_start, window_end, c, window_time
+        |""".stripMargin)
+  }
+
+  @TestTemplate
+  def testProctimeWindowTVFWithOverAggWhenCantMerge(): Unit = {
+    util.verifyRelPlan(
+      """
+        |select c, max(c1), count(a)
+        |from (
+        | select *, count(*) over (partition by c order by proctime desc) as c1
+        | from
+        |  TABLE(CUMULATE(table MyTable, DESCRIPTOR(proctime), interval '10' 
seconds, interval '5' minutes))
+        |)
+        |group by window_start, window_end, c, window_time
+        |""".stripMargin)
+  }
+
+  @TestTemplate
+  def testProctimeWindowTVFWithJoinWhenCantMerge(): Unit = {
+    util.verifyRelPlan(
+      """
+        |select t.c, max(t2.e), count(t.a)
+        |from (
+        |  TABLE(CUMULATE(table MyTable, DESCRIPTOR(proctime), interval '10' 
seconds, interval '5' minutes)) AS t
+        |  join MyTable t2 on t2.a = t.a
+        |)
+        |group by window_start, window_end, t.c, window_time
+        |""".stripMargin)
+  }
+
+  @TestTemplate
+  def testProctimeWindowTVFWithCorrelateWhenCantMerge(): Unit = {
+    util.addTemporarySystemFunction("str_split", new StringSplit())
+    util.verifyRelPlan(
+      """
+        |select t.c, max(t2.x), count(t.a)
+        |from (
+        |  TABLE(CUMULATE(table MyTable, DESCRIPTOR(proctime), interval '10' 
seconds, interval '5' minutes)) AS t
+        |  Left JOIN LATERAL TABLE(str_split('Jack,John', ',')) AS t2(x) ON 
TRUE
+        |)
+        |group by window_start, window_end, t.c, window_time
+        |""".stripMargin)
+  }
+
+  @TestTemplate
+  def testProctimeWindowTVFWithUnionWhenCantMerge(): Unit = {
+    util.verifyRelPlan(
+      """
+        |select c, count(a)
+        |from (
+        |  select * from
+        |  TABLE(TUMBLE(table MyTable, DESCRIPTOR(proctime), interval '10' 
seconds))
+        |  union all
+        |  select * from
+        |  TABLE(TUMBLE(table MyTable, DESCRIPTOR(proctime), interval '5' 
seconds))
+        |) t
+        |group by window_start, window_end, c, window_time
+        |""".stripMargin)
+  }
 }
 
 object WindowAggregateTest {

Reply via email to