This is an automated email from the ASF dual-hosted git repository.
lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 30196294eb8 [FLINK-35816][table-planner] Non-mergeable proctime tvf
window aggregate needs to fallback to group aggregate
30196294eb8 is described below
commit 30196294eb863a2ea288ea562812b9ebaffece12
Author: lincoln lee <[email protected]>
AuthorDate: Fri Jul 12 23:18:34 2024 +0800
[FLINK-35816][table-planner] Non-mergeable proctime tvf window aggregate
needs to fallback to group aggregate
This closes #25075
---
.../stream/StreamPhysicalWindowAggregate.scala | 9 +-
.../table/planner/plan/utils/WindowUtil.scala | 113 +++-
.../plan/stream/sql/agg/WindowAggregateTest.xml | 614 +++++++++++++++++++++
.../plan/stream/sql/agg/WindowAggregateTest.scala | 101 +++-
4 files changed, 808 insertions(+), 29 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWindowAggregate.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWindowAggregate.scala
index 93d245139df..9bb859be10b 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWindowAggregate.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWindowAggregate.scala
@@ -17,8 +17,9 @@
*/
package org.apache.flink.table.planner.plan.nodes.physical.stream
+import org.apache.flink.table.api.TableException
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
-import org.apache.flink.table.planner.plan.logical.WindowingStrategy
+import
org.apache.flink.table.planner.plan.logical.{WindowAttachedWindowingStrategy,
WindowingStrategy}
import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
import
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowAggregate
import org.apache.flink.table.planner.plan.utils._
@@ -112,6 +113,12 @@ class StreamPhysicalWindowAggregate(
override def translateToExecNode(): ExecNode[_] = {
checkEmitConfiguration(unwrapTableConfig(this))
+
+ if (windowing.isInstanceOf[WindowAttachedWindowingStrategy] &&
windowing.isProctime) {
+ throw new TableException(
+ "Non-mergeable processing time window tvf aggregation is invalid,
should fallback to group " +
+ "aggregation instead. This is a bug and should not happen. Please
file an issue.")
+ }
new StreamExecWindowAggregate(
unwrapTableConfig(this),
grouping,
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala
index 995a89a8a03..e664b41c1d2 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala
@@ -24,7 +24,7 @@ import
org.apache.flink.table.planner.functions.sql.{FlinkSqlOperatorTable, SqlW
import org.apache.flink.table.planner.plan.`trait`.RelWindowProperties
import org.apache.flink.table.planner.plan.logical._
import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
-import
org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalAggregate,
FlinkLogicalJoin, FlinkLogicalRank, FlinkLogicalTableFunctionScan}
+import
org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalAggregate,
FlinkLogicalMatch, FlinkLogicalOverAggregate, FlinkLogicalRank,
FlinkLogicalTableFunctionScan}
import
org.apache.flink.table.planner.plan.utils.AggregateUtil.inferAggAccumulatorNames
import
org.apache.flink.table.planner.plan.utils.WindowEmitStrategy.{TABLE_EXEC_EMIT_EARLY_FIRE_ENABLED,
TABLE_EXEC_EMIT_LATE_FIRE_ENABLED}
import org.apache.flink.table.planner.typeutils.RowTypeUtils
@@ -35,7 +35,7 @@ import
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.canBeTimeAtt
import org.apache.calcite.plan.volcano.RelSubset
import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.{RelNode, SingleRel}
+import org.apache.calcite.rel.{BiRel, RelNode, RelVisitor}
import org.apache.calcite.rel.core._
import org.apache.calcite.rex._
import org.apache.calcite.sql.`type`.SqlTypeFamily
@@ -45,10 +45,9 @@ import org.apache.calcite.util.{ImmutableBitSet, Util}
import java.time.Duration
import java.util.Collections
-import scala.annotation.tailrec
import scala.collection.JavaConversions._
import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
/** Utilities for window table-valued functions. */
object WindowUtil {
@@ -333,7 +332,8 @@ object WindowUtil {
/**
* For rowtime window, return true if the given aggregate grouping contains
window start and end.
- * For proctime window, we should also check if it exists a neighbour
windowTableFunctionCall.
+ * For proctime window, we should also check if it exists a neighbour
windowTableFunctionCall and
+ * doesn't exist any [[RexCall]] on window time columns.
*
* If the window is a session window, we should also check if the partition
keys are the same as
* the group keys. See more at [[WindowUtil.validGroupKeyPartitionKey()]].
@@ -346,7 +346,7 @@ object WindowUtil {
return false
}
if (WindowUtil.groupingContainsWindowStartEnd(grouping, windowProperties))
{
- windowProperties.isRowtime || existNeighbourWindowTableFunc(agg.getInput)
+ isValidRowtimeWindow(windowProperties) ||
isValidProcTimeWindow(windowProperties, fmq, agg)
} else {
false
}
@@ -385,35 +385,94 @@ object WindowUtil {
}
}
- private def existNeighbourWindowTableFunc(rel: RelNode): Boolean = {
+ private def isValidRowtimeWindow(windowProperties: RelWindowProperties):
Boolean = {
+ // rowtime tvf window can support calculation on window columns even
before aggregation
+ windowProperties.isRowtime
+ }
- @tailrec
- def find(rel: RelNode): Unit = {
- rel match {
- case rss: RelSubset =>
- val innerRel = Option.apply(rss.getBest).getOrElse(rss.getOriginal)
- find(innerRel)
+ /**
+ * If the middle Calc(s) contains call(s) on window columns, we should not
convert the Aggregate
+ * into WindowAggregate but GroupAggregate instead.
+ *
+ * The valid plan structure is like:
+ *
+ * {{{
+ * Aggregate
+ * |
+ * Calc (should not contain call on window columns)
+ * |
+ * WindowTableFunctionScan
+ * }}}
+ *
+ * and unlike:
+ *
+ * {{{
+ * Aggregate
+ * |
+ * Calc
+ * |
+ * Aggregate
+ * |
+ * Calc
+ * |
+ * WindowTableFunctionScan
+ * }}}
+ */
+ private def isValidProcTimeWindow(
+ windowProperties: RelWindowProperties,
+ fmq: FlinkRelMetadataQuery,
+ agg: FlinkLogicalAggregate): Boolean = {
+ val calcMatcher = new CalcWindowFunctionScanMatcher
+ try {
+ calcMatcher.go(agg.getInput(0))
+ } catch {
+ case _: Throwable => // do nothing
+ }
+ if (!calcMatcher.existNeighbourWindowTableFunc) {
+ return false
+ }
+ var existCallOnWindowColumns = calcMatcher.calcNodes.nonEmpty &&
+ calcMatcher.calcNodes.exists(calc =>
calcContainsCallsOnWindowColumns(calc, fmq))
+
+ // aggregate call shouldn't be on window columns
+ val aggInputWindowProps = windowProperties.getWindowColumns
+ existCallOnWindowColumns = existCallOnWindowColumns ||
!agg.getAggCallList.forall {
+ call =>
aggInputWindowProps.intersect(ImmutableBitSet.of(call.getArgList)).isEmpty
+ }
+ // proctime tvf window can't support calculation on window columns before
aggregation
+ !existCallOnWindowColumns
+ }
+ private class CalcWindowFunctionScanMatcher extends RelVisitor {
+ val calcNodes: ListBuffer[Calc] = ListBuffer[Calc]()
+ var existNeighbourWindowTableFunc = false
+
+ override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
+ node match {
+ case calc: Calc =>
+ calcNodes += calc
+ // continue to visit children
+ super.visit(calc, 0, parent)
case scan: FlinkLogicalTableFunctionScan =>
if (WindowUtil.isWindowTableFunctionCall(scan.getCall)) {
+ existNeighbourWindowTableFunc = true
+ // stop visiting
throw new Util.FoundOne
}
- find(scan.getInput(0))
-
- // proctime attribute comes from these operators can not be used
directly for proctime
- // window aggregate, so further traversal of child nodes is unnecessary
- case _: FlinkLogicalAggregate | _: FlinkLogicalRank | _:
FlinkLogicalJoin =>
-
- case sr: SingleRel => find(sr.getInput)
+ case rss: RelSubset =>
+ val innerRel = Option.apply(rss.getBest).getOrElse(rss.getOriginal)
+ // special case doesn't call super.visit for RelSubSet because it
has no children
+ visit(innerRel, 0, rss)
+ case _: FlinkLogicalAggregate | _: FlinkLogicalMatch | _:
FlinkLogicalOverAggregate |
+ _: FlinkLogicalRank | _: BiRel | _: SetOp =>
+ // proctime attribute comes from these operators can't be used
directly for proctime
+ // window aggregate, so further tree walk is unnecessary
+ throw new Util.FoundOne
+ case _ =>
+ // continue to visit children
+ super.visit(node, ordinal, parent)
}
}
-
- try {
- find(rel)
- } catch {
- case _: Util.FoundOne => return true
- }
- false
}
/**
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
index 4e5c14dd9b1..75fb934413f 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
@@ -2272,6 +2272,620 @@ Sink(table=[default_catalog.default_database.s1],
fields=[window_start, window_e
+- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime -
1000:INTERVAL SECOND)])
+- Calc(select=[rowtime])
+- Reused(reference_id=[1])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testProctimeWindowTVFWithCalcOnWindowColumnWhenCantMerge[aggPhaseEnforcer=ONE_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+select c, count(a)
+from
+ TABLE(CUMULATE(table MyTable, DESCRIPTOR(proctime), interval '10' seconds,
interval '5' minutes))
+where window_start <> '123'
+group by window_start, window_end, c, window_time
+]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(c=[$2], EXPR$1=[$4])
++- LogicalAggregate(group=[{0, 1, 2, 3}], EXPR$1=[COUNT($4)])
+ +- LogicalProject(window_start=[$7], window_end=[$8], c=[$2],
window_time=[$9], a=[$0])
+ +- LogicalFilter(condition=[<>($7, _UTF-16LE'123')])
+ +- LogicalTableFunctionScan(invocation=[CUMULATE(DESCRIPTOR($6),
10000:INTERVAL SECOND, 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a,
BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3)
*ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3)
window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME*
window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[$6])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5,
1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog,
default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[c, EXPR$1])
++- GroupAggregate(groupBy=[window_start, window_end, c, window_time],
select=[window_start, window_end, c, window_time, COUNT(a) AS EXPR$1])
+ +- Exchange(distribution=[hash[window_start, window_end, c, window_time]])
+ +- Calc(select=[window_start, window_end, c, window_time, a],
where=[<>(window_start, '123')])
+ +- WindowTableFunction(window=[CUMULATE(time_col=[proctime],
max_size=[5 min], step=[10 s])])
+ +- Calc(select=[a, c, proctime])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
1000:INTERVAL SECOND)])
+ +- Calc(select=[a, c, PROCTIME() AS proctime, rowtime])
+ +- TableSourceScan(table=[[default_catalog,
default_database, MyTable, project=[a, c, rowtime], metadata=[]]], fields=[a,
c, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testProctimeWindowTVFWithCalcOnWindowColumnWhenCantMerge[aggPhaseEnforcer=TWO_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+select c, count(a)
+from
+ TABLE(CUMULATE(table MyTable, DESCRIPTOR(proctime), interval '10' seconds,
interval '5' minutes))
+where window_start <> '123'
+group by window_start, window_end, c, window_time
+]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(c=[$2], EXPR$1=[$4])
++- LogicalAggregate(group=[{0, 1, 2, 3}], EXPR$1=[COUNT($4)])
+ +- LogicalProject(window_start=[$7], window_end=[$8], c=[$2],
window_time=[$9], a=[$0])
+ +- LogicalFilter(condition=[<>($7, _UTF-16LE'123')])
+ +- LogicalTableFunctionScan(invocation=[CUMULATE(DESCRIPTOR($6),
10000:INTERVAL SECOND, 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a,
BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3)
*ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3)
window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME*
window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[$6])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5,
1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog,
default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[c, EXPR$1])
++- GroupAggregate(groupBy=[window_start, window_end, c, window_time],
select=[window_start, window_end, c, window_time, COUNT(a) AS EXPR$1])
+ +- Exchange(distribution=[hash[window_start, window_end, c, window_time]])
+ +- Calc(select=[window_start, window_end, c, window_time, a],
where=[<>(window_start, '123')])
+ +- WindowTableFunction(window=[CUMULATE(time_col=[proctime],
max_size=[5 min], step=[10 s])])
+ +- Calc(select=[a, c, proctime])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
1000:INTERVAL SECOND)])
+ +- Calc(select=[a, c, PROCTIME() AS proctime, rowtime])
+ +- TableSourceScan(table=[[default_catalog,
default_database, MyTable, project=[a, c, rowtime], metadata=[]]], fields=[a,
c, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testProctimeWindowTVFWithCorrelateWhenCantMerge[aggPhaseEnforcer=ONE_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+select t.c, max(t2.x), count(t.a)
+from (
+ TABLE(CUMULATE(table MyTable, DESCRIPTOR(proctime), interval '10' seconds,
interval '5' minutes)) AS t
+ Left JOIN LATERAL TABLE(str_split('Jack,John', ',')) AS t2(x) ON TRUE
+)
+group by window_start, window_end, t.c, window_time
+]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(c=[$2], EXPR$1=[$4], EXPR$2=[$5])
++- LogicalAggregate(group=[{0, 1, 2, 3}], EXPR$1=[MAX($4)], EXPR$2=[COUNT($5)])
+ +- LogicalProject(window_start=[$7], window_end=[$8], c=[$2],
window_time=[$9], EXPR$0=[$10], a=[$0])
+ +- LogicalJoin(condition=[true], joinType=[left])
+ :- LogicalTableFunctionScan(invocation=[CUMULATE(DESCRIPTOR($6),
10000:INTERVAL SECOND, 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a,
BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3)
*ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3)
window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME*
window_time)])
+ : +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[$6])
+ : +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5,
1000:INTERVAL SECOND)])
+ : +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[PROCTIME()])
+ : +- LogicalTableScan(table=[[default_catalog,
default_database, MyTable]])
+ +-
LogicalTableFunctionScan(invocation=[str_split(_UTF-16LE'Jack,John',
_UTF-16LE',')], rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[c, EXPR$1, EXPR$2])
++- GroupAggregate(groupBy=[window_start, window_end, c, window_time],
select=[window_start, window_end, c, window_time, MAX(EXPR$0) AS EXPR$1,
COUNT(a) AS EXPR$2])
+ +- Exchange(distribution=[hash[window_start, window_end, c, window_time]])
+ +- Calc(select=[window_start, window_end, c, window_time, EXPR$0, a])
+ +- Correlate(invocation=[str_split(_UTF-16LE'Jack,John',
_UTF-16LE',')], correlate=[table(str_split('Jack,John',','))],
select=[a,b,c,d,e,rowtime,proctime,window_start,window_end,window_time,EXPR$0],
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3)
d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME*
proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3)
*PROCTIME* window_time, VARCHAR(2147483647) EXPR$0)] [...]
+ +- WindowTableFunction(window=[CUMULATE(time_col=[proctime],
max_size=[5 min], step=[10 s])])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
1000:INTERVAL SECOND)])
+ +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS
proctime])
+ +- TableSourceScan(table=[[default_catalog,
default_database, MyTable]], fields=[a, b, c, d, e, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testProctimeWindowTVFWithCorrelateWhenCantMerge[aggPhaseEnforcer=TWO_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+select t.c, max(t2.x), count(t.a)
+from (
+ TABLE(CUMULATE(table MyTable, DESCRIPTOR(proctime), interval '10' seconds,
interval '5' minutes)) AS t
+ Left JOIN LATERAL TABLE(str_split('Jack,John', ',')) AS t2(x) ON TRUE
+)
+group by window_start, window_end, t.c, window_time
+]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(c=[$2], EXPR$1=[$4], EXPR$2=[$5])
++- LogicalAggregate(group=[{0, 1, 2, 3}], EXPR$1=[MAX($4)], EXPR$2=[COUNT($5)])
+ +- LogicalProject(window_start=[$7], window_end=[$8], c=[$2],
window_time=[$9], EXPR$0=[$10], a=[$0])
+ +- LogicalJoin(condition=[true], joinType=[left])
+ :- LogicalTableFunctionScan(invocation=[CUMULATE(DESCRIPTOR($6),
10000:INTERVAL SECOND, 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a,
BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3)
*ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3)
window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME*
window_time)])
+ : +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[$6])
+ : +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5,
1000:INTERVAL SECOND)])
+ : +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[PROCTIME()])
+ : +- LogicalTableScan(table=[[default_catalog,
default_database, MyTable]])
+ +-
LogicalTableFunctionScan(invocation=[str_split(_UTF-16LE'Jack,John',
_UTF-16LE',')], rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[c, EXPR$1, EXPR$2])
++- GroupAggregate(groupBy=[window_start, window_end, c, window_time],
select=[window_start, window_end, c, window_time, MAX(EXPR$0) AS EXPR$1,
COUNT(a) AS EXPR$2])
+ +- Exchange(distribution=[hash[window_start, window_end, c, window_time]])
+ +- Calc(select=[window_start, window_end, c, window_time, EXPR$0, a])
+ +- Correlate(invocation=[str_split(_UTF-16LE'Jack,John',
_UTF-16LE',')], correlate=[table(str_split('Jack,John',','))],
select=[a,b,c,d,e,rowtime,proctime,window_start,window_end,window_time,EXPR$0],
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3)
d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME*
proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3)
*PROCTIME* window_time, VARCHAR(2147483647) EXPR$0)] [...]
+ +- WindowTableFunction(window=[CUMULATE(time_col=[proctime],
max_size=[5 min], step=[10 s])])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
1000:INTERVAL SECOND)])
+ +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS
proctime])
+ +- TableSourceScan(table=[[default_catalog,
default_database, MyTable]], fields=[a, b, c, d, e, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testProctimeWindowTVFWithDedupWhenCantMerge[aggPhaseEnforcer=ONE_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+select c, count(a)
+from (
+ select *, row_number() over (partition by c order by proctime desc) as rn
+ from
+ TABLE(CUMULATE(table MyTable, DESCRIPTOR(proctime), interval '10' seconds,
interval '5' minutes))
+)
+where rn = 1
+group by window_start, window_end, c, window_time
+]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(c=[$2], EXPR$1=[$4])
++- LogicalAggregate(group=[{0, 1, 2, 3}], EXPR$1=[COUNT($4)])
+ +- LogicalProject(window_start=[$7], window_end=[$8], c=[$2],
window_time=[$9], a=[$0])
+ +- LogicalFilter(condition=[=($10, 1)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[$6], window_start=[$7], window_end=[$8],
window_time=[$9], rn=[ROW_NUMBER() OVER (PARTITION BY $2 ORDER BY $6 DESC NULLS
LAST)])
+ +- LogicalTableFunctionScan(invocation=[CUMULATE(DESCRIPTOR($6),
10000:INTERVAL SECOND, 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a,
BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3)
*ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3)
window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME*
window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[$6])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime],
watermark=[-($5, 1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog,
default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[c, EXPR$1])
++- GroupAggregate(groupBy=[window_start, window_end, c, window_time],
select=[window_start, window_end, c, window_time, COUNT_RETRACT(a) AS EXPR$1])
+ +- Exchange(distribution=[hash[window_start, window_end, c, window_time]])
+ +- Calc(select=[window_start, window_end, c,
PROCTIME_MATERIALIZE(window_time) AS window_time, a])
+ +- Deduplicate(keep=[LastRow], key=[c], order=[PROCTIME])
+ +- Exchange(distribution=[hash[c]])
+ +- Calc(select=[a, c, proctime, window_start, window_end,
window_time])
+ +- WindowTableFunction(window=[CUMULATE(time_col=[proctime],
max_size=[5 min], step=[10 s])])
+ +- WatermarkAssigner(rowtime=[rowtime],
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+ +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS
proctime])
+ +- TableSourceScan(table=[[default_catalog,
default_database, MyTable]], fields=[a, b, c, d, e, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testProctimeWindowTVFWithDedupWhenCantMerge[aggPhaseEnforcer=TWO_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+select c, count(a)
+from (
+ select *, row_number() over (partition by c order by proctime desc) as rn
+ from
+ TABLE(CUMULATE(table MyTable, DESCRIPTOR(proctime), interval '10' seconds,
interval '5' minutes))
+)
+where rn = 1
+group by window_start, window_end, c, window_time
+]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(c=[$2], EXPR$1=[$4])
++- LogicalAggregate(group=[{0, 1, 2, 3}], EXPR$1=[COUNT($4)])
+ +- LogicalProject(window_start=[$7], window_end=[$8], c=[$2],
window_time=[$9], a=[$0])
+ +- LogicalFilter(condition=[=($10, 1)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[$6], window_start=[$7], window_end=[$8],
window_time=[$9], rn=[ROW_NUMBER() OVER (PARTITION BY $2 ORDER BY $6 DESC NULLS
LAST)])
+ +- LogicalTableFunctionScan(invocation=[CUMULATE(DESCRIPTOR($6),
10000:INTERVAL SECOND, 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a,
BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3)
*ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3)
window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME*
window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[$6])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime],
watermark=[-($5, 1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog,
default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[c, EXPR$1])
++- GroupAggregate(groupBy=[window_start, window_end, c, window_time],
select=[window_start, window_end, c, window_time, COUNT_RETRACT(a) AS EXPR$1])
+ +- Exchange(distribution=[hash[window_start, window_end, c, window_time]])
+ +- Calc(select=[window_start, window_end, c,
PROCTIME_MATERIALIZE(window_time) AS window_time, a])
+ +- Deduplicate(keep=[LastRow], key=[c], order=[PROCTIME])
+ +- Exchange(distribution=[hash[c]])
+ +- Calc(select=[a, c, proctime, window_start, window_end,
window_time])
+ +- WindowTableFunction(window=[CUMULATE(time_col=[proctime],
max_size=[5 min], step=[10 s])])
+ +- WatermarkAssigner(rowtime=[rowtime],
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+ +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS
proctime])
+ +- TableSourceScan(table=[[default_catalog,
default_database, MyTable]], fields=[a, b, c, d, e, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testProctimeWindowTVFWithJoinWhenCantMerge[aggPhaseEnforcer=ONE_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+select t.c, max(t2.e), count(t.a)
+from (
+ TABLE(CUMULATE(table MyTable, DESCRIPTOR(proctime), interval '10' seconds,
interval '5' minutes)) AS t
+ join MyTable t2 on t2.a = t.a
+)
+group by window_start, window_end, t.c, window_time
+]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(c=[$2], EXPR$1=[$4], EXPR$2=[$5])
++- LogicalAggregate(group=[{0, 1, 2, 3}], EXPR$1=[MAX($4)], EXPR$2=[COUNT($5)])
+ +- LogicalProject(window_start=[$7], window_end=[$8], c=[$2],
window_time=[$9], e0=[$14], a=[$0])
+ +- LogicalJoin(condition=[=($10, $0)], joinType=[inner])
+ :- LogicalTableFunctionScan(invocation=[CUMULATE(DESCRIPTOR($6),
10000:INTERVAL SECOND, 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a,
BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3)
*ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3)
window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME*
window_time)])
+ : +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[$6])
+ : +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5,
1000:INTERVAL SECOND)])
+ : +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[PROCTIME()])
+ : +- LogicalTableScan(table=[[default_catalog,
default_database, MyTable]])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5,
1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[c, EXPR$1, EXPR$2])
++- GroupAggregate(groupBy=[window_start, window_end, c, window_time],
select=[window_start, window_end, c, window_time, MAX(e0) AS EXPR$1, COUNT(a)
AS EXPR$2])
+ +- Exchange(distribution=[hash[window_start, window_end, c, window_time]])
+ +- Calc(select=[window_start, window_end, c, window_time, e AS e0, a])
+ +- Join(joinType=[InnerJoin], where=[=(a0, a)], select=[a, c,
window_start, window_end, window_time, a0, e], leftInputSpec=[NoUniqueKey],
rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[a]])
+ : +- Calc(select=[a, c, window_start, window_end,
PROCTIME_MATERIALIZE(window_time) AS window_time])
+ : +- WindowTableFunction(window=[CUMULATE(time_col=[proctime],
max_size=[5 min], step=[10 s])])
+ : +- Calc(select=[a, c, proctime])
+ : +- WatermarkAssigner(rowtime=[rowtime],
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+ : +- Calc(select=[a, c, PROCTIME() AS proctime,
rowtime])
+ : +- TableSourceScan(table=[[default_catalog,
default_database, MyTable, project=[a, c, e, rowtime], metadata=[]]],
fields=[a, c, e, rowtime])
+ +- Exchange(distribution=[hash[a]])
+ +- Calc(select=[a, e])
+ +- WatermarkAssigner(rowtime=[rowtime],
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+ +- Calc(select=[a, e, rowtime])
+ +- TableSourceScan(table=[[default_catalog,
default_database, MyTable, project=[a, c, e, rowtime], metadata=[]]],
fields=[a, c, e, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testProctimeWindowTVFWithJoinWhenCantMerge[aggPhaseEnforcer=TWO_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+select t.c, max(t2.e), count(t.a)
+from (
+ TABLE(CUMULATE(table MyTable, DESCRIPTOR(proctime), interval '10' seconds,
interval '5' minutes)) AS t
+ join MyTable t2 on t2.a = t.a
+)
+group by window_start, window_end, t.c, window_time
+]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(c=[$2], EXPR$1=[$4], EXPR$2=[$5])
++- LogicalAggregate(group=[{0, 1, 2, 3}], EXPR$1=[MAX($4)], EXPR$2=[COUNT($5)])
+ +- LogicalProject(window_start=[$7], window_end=[$8], c=[$2],
window_time=[$9], e0=[$14], a=[$0])
+ +- LogicalJoin(condition=[=($10, $0)], joinType=[inner])
+ :- LogicalTableFunctionScan(invocation=[CUMULATE(DESCRIPTOR($6),
10000:INTERVAL SECOND, 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a,
BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3)
*ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3)
window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME*
window_time)])
+ : +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[$6])
+ : +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5,
1000:INTERVAL SECOND)])
+ : +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[PROCTIME()])
+ : +- LogicalTableScan(table=[[default_catalog,
default_database, MyTable]])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5,
1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[c, EXPR$1, EXPR$2])
++- GroupAggregate(groupBy=[window_start, window_end, c, window_time],
select=[window_start, window_end, c, window_time, MAX(e0) AS EXPR$1, COUNT(a)
AS EXPR$2])
+ +- Exchange(distribution=[hash[window_start, window_end, c, window_time]])
+ +- Calc(select=[window_start, window_end, c, window_time, e AS e0, a])
+ +- Join(joinType=[InnerJoin], where=[=(a0, a)], select=[a, c,
window_start, window_end, window_time, a0, e], leftInputSpec=[NoUniqueKey],
rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[a]])
+ : +- Calc(select=[a, c, window_start, window_end,
PROCTIME_MATERIALIZE(window_time) AS window_time])
+ : +- WindowTableFunction(window=[CUMULATE(time_col=[proctime],
max_size=[5 min], step=[10 s])])
+ : +- Calc(select=[a, c, proctime])
+ : +- WatermarkAssigner(rowtime=[rowtime],
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+ : +- Calc(select=[a, c, PROCTIME() AS proctime,
rowtime])
+ : +- TableSourceScan(table=[[default_catalog,
default_database, MyTable, project=[a, c, e, rowtime], metadata=[]]],
fields=[a, c, e, rowtime])
+ +- Exchange(distribution=[hash[a]])
+ +- Calc(select=[a, e])
+ +- WatermarkAssigner(rowtime=[rowtime],
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+ +- Calc(select=[a, e, rowtime])
+ +- TableSourceScan(table=[[default_catalog,
default_database, MyTable, project=[a, c, e, rowtime], metadata=[]]],
fields=[a, c, e, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testProctimeWindowTVFWithOverAggWhenCantMerge[aggPhaseEnforcer=ONE_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+select c, max(c1), count(a)
+from (
+ select *, count(*) over (partition by c order by proctime desc) as c1
+ from
+ TABLE(CUMULATE(table MyTable, DESCRIPTOR(proctime), interval '10' seconds,
interval '5' minutes))
+)
+group by window_start, window_end, c, window_time
+]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(c=[$2], EXPR$1=[$4], EXPR$2=[$5])
++- LogicalAggregate(group=[{0, 1, 2, 3}], EXPR$1=[MAX($4)], EXPR$2=[COUNT($5)])
+ +- LogicalProject(window_start=[$7], window_end=[$8], c=[$2],
window_time=[$9], c1=[$10], a=[$0])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5],
proctime=[$6], window_start=[$7], window_end=[$8], window_time=[$9],
c1=[COUNT() OVER (PARTITION BY $2 ORDER BY $6 DESC NULLS LAST)])
+ +- LogicalTableFunctionScan(invocation=[CUMULATE(DESCRIPTOR($6),
10000:INTERVAL SECOND, 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a,
BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3)
*ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3)
window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME*
window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[$6])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5,
1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog,
default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[c, EXPR$1, EXPR$2])
++- GroupAggregate(groupBy=[window_start, window_end, c, window_time],
select=[window_start, window_end, c, window_time, MAX(c1) AS EXPR$1, COUNT(a)
AS EXPR$2])
+ +- Exchange(distribution=[hash[window_start, window_end, c, window_time]])
+ +- Calc(select=[window_start, window_end, c,
PROCTIME_MATERIALIZE(window_time) AS window_time, w0$o0 AS c1, a])
+ +- OverAggregate(partitionBy=[c], orderBy=[proctime DESC], window=[
RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, d, e,
rowtime, proctime, window_start, window_end, window_time, COUNT(*) AS w0$o0])
+ +- Exchange(distribution=[hash[c]])
+ +- WindowTableFunction(window=[CUMULATE(time_col=[proctime],
max_size=[5 min], step=[10 s])])
+ +- WatermarkAssigner(rowtime=[rowtime],
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+ +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS
proctime])
+ +- TableSourceScan(table=[[default_catalog,
default_database, MyTable]], fields=[a, b, c, d, e, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testProctimeWindowTVFWithOverAggWhenCantMerge[aggPhaseEnforcer=TWO_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+select c, max(c1), count(a)
+from (
+ select *, count(*) over (partition by c order by proctime desc) as c1
+ from
+ TABLE(CUMULATE(table MyTable, DESCRIPTOR(proctime), interval '10' seconds,
interval '5' minutes))
+)
+group by window_start, window_end, c, window_time
+]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(c=[$2], EXPR$1=[$4], EXPR$2=[$5])
++- LogicalAggregate(group=[{0, 1, 2, 3}], EXPR$1=[MAX($4)], EXPR$2=[COUNT($5)])
+ +- LogicalProject(window_start=[$7], window_end=[$8], c=[$2],
window_time=[$9], c1=[$10], a=[$0])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5],
proctime=[$6], window_start=[$7], window_end=[$8], window_time=[$9],
c1=[COUNT() OVER (PARTITION BY $2 ORDER BY $6 DESC NULLS LAST)])
+ +- LogicalTableFunctionScan(invocation=[CUMULATE(DESCRIPTOR($6),
10000:INTERVAL SECOND, 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a,
BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3)
*ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3)
window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME*
window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[$6])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5,
1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog,
default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[c, EXPR$1, EXPR$2])
++- GroupAggregate(groupBy=[window_start, window_end, c, window_time],
select=[window_start, window_end, c, window_time, MAX(c1) AS EXPR$1, COUNT(a)
AS EXPR$2])
+ +- Exchange(distribution=[hash[window_start, window_end, c, window_time]])
+ +- Calc(select=[window_start, window_end, c,
PROCTIME_MATERIALIZE(window_time) AS window_time, w0$o0 AS c1, a])
+ +- OverAggregate(partitionBy=[c], orderBy=[proctime DESC], window=[
RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, d, e,
rowtime, proctime, window_start, window_end, window_time, COUNT(*) AS w0$o0])
+ +- Exchange(distribution=[hash[c]])
+ +- WindowTableFunction(window=[CUMULATE(time_col=[proctime],
max_size=[5 min], step=[10 s])])
+ +- WatermarkAssigner(rowtime=[rowtime],
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+ +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS
proctime])
+ +- TableSourceScan(table=[[default_catalog,
default_database, MyTable]], fields=[a, b, c, d, e, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testProctimeWindowTVFWithRankWhenCantMerge[aggPhaseEnforcer=ONE_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+select c, count(a)
+from (
+ select *, row_number() over (partition by c order by proctime desc) as rn
+ from
+ TABLE(CUMULATE(table MyTable, DESCRIPTOR(proctime), interval '10' seconds,
interval '5' minutes))
+)
+where rn = 2
+group by window_start, window_end, c, window_time
+]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(c=[$2], EXPR$1=[$4])
++- LogicalAggregate(group=[{0, 1, 2, 3}], EXPR$1=[COUNT($4)])
+ +- LogicalProject(window_start=[$7], window_end=[$8], c=[$2],
window_time=[$9], a=[$0])
+ +- LogicalFilter(condition=[=($10, 2)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[$6], window_start=[$7], window_end=[$8],
window_time=[$9], rn=[ROW_NUMBER() OVER (PARTITION BY $2 ORDER BY $6 DESC NULLS
LAST)])
+ +- LogicalTableFunctionScan(invocation=[CUMULATE(DESCRIPTOR($6),
10000:INTERVAL SECOND, 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a,
BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3)
*ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3)
window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME*
window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[$6])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime],
watermark=[-($5, 1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog,
default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[c, EXPR$1])
++- GroupAggregate(groupBy=[window_start, window_end, c, window_time],
select=[window_start, window_end, c, window_time, COUNT_RETRACT(a) AS EXPR$1])
+ +- Exchange(distribution=[hash[window_start, window_end, c, window_time]])
+ +- Calc(select=[window_start, window_end, c,
PROCTIME_MATERIALIZE(window_time) AS window_time, a])
+ +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER],
rankRange=[rankStart=2, rankEnd=2], partitionBy=[c], orderBy=[proctime DESC],
select=[a, c, proctime, window_start, window_end, window_time])
+ +- Exchange(distribution=[hash[c]])
+ +- Calc(select=[a, c, proctime, window_start, window_end,
window_time])
+ +- WindowTableFunction(window=[CUMULATE(time_col=[proctime],
max_size=[5 min], step=[10 s])])
+ +- WatermarkAssigner(rowtime=[rowtime],
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+ +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS
proctime])
+ +- TableSourceScan(table=[[default_catalog,
default_database, MyTable]], fields=[a, b, c, d, e, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testProctimeWindowTVFWithRankWhenCantMerge[aggPhaseEnforcer=TWO_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+select c, count(a)
+from (
+ select *, row_number() over (partition by c order by proctime desc) as rn
+ from
+ TABLE(CUMULATE(table MyTable, DESCRIPTOR(proctime), interval '10' seconds,
interval '5' minutes))
+)
+where rn = 2
+group by window_start, window_end, c, window_time
+]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(c=[$2], EXPR$1=[$4])
++- LogicalAggregate(group=[{0, 1, 2, 3}], EXPR$1=[COUNT($4)])
+ +- LogicalProject(window_start=[$7], window_end=[$8], c=[$2],
window_time=[$9], a=[$0])
+ +- LogicalFilter(condition=[=($10, 2)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[$6], window_start=[$7], window_end=[$8],
window_time=[$9], rn=[ROW_NUMBER() OVER (PARTITION BY $2 ORDER BY $6 DESC NULLS
LAST)])
+ +- LogicalTableFunctionScan(invocation=[CUMULATE(DESCRIPTOR($6),
10000:INTERVAL SECOND, 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a,
BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3)
*ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3)
window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME*
window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[$6])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime],
watermark=[-($5, 1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog,
default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[c, EXPR$1])
++- GroupAggregate(groupBy=[window_start, window_end, c, window_time],
select=[window_start, window_end, c, window_time, COUNT_RETRACT(a) AS EXPR$1])
+ +- Exchange(distribution=[hash[window_start, window_end, c, window_time]])
+ +- Calc(select=[window_start, window_end, c,
PROCTIME_MATERIALIZE(window_time) AS window_time, a])
+ +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER],
rankRange=[rankStart=2, rankEnd=2], partitionBy=[c], orderBy=[proctime DESC],
select=[a, c, proctime, window_start, window_end, window_time])
+ +- Exchange(distribution=[hash[c]])
+ +- Calc(select=[a, c, proctime, window_start, window_end,
window_time])
+ +- WindowTableFunction(window=[CUMULATE(time_col=[proctime],
max_size=[5 min], step=[10 s])])
+ +- WatermarkAssigner(rowtime=[rowtime],
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+ +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS
proctime])
+ +- TableSourceScan(table=[[default_catalog,
default_database, MyTable]], fields=[a, b, c, d, e, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testProctimeWindowTVFWithUnionWhenCantMerge[aggPhaseEnforcer=ONE_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+select c, count(a)
+from (
+ select * from
+ TABLE(TUMBLE(table MyTable, DESCRIPTOR(proctime), interval '10' seconds))
+ union all
+ select * from
+ TABLE(TUMBLE(table MyTable, DESCRIPTOR(proctime), interval '5' seconds))
+) t
+group by window_start, window_end, c, window_time
+]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(c=[$2], EXPR$1=[$4])
++- LogicalAggregate(group=[{0, 1, 2, 3}], EXPR$1=[COUNT($4)])
+ +- LogicalProject(window_start=[$7], window_end=[$8], c=[$2],
window_time=[$9], a=[$0])
+ +- LogicalUnion(all=[true])
+ :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[$6], window_start=[$7], window_end=[$8],
window_time=[$9])
+ : +- LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($6),
10000:INTERVAL SECOND)], rowType=[RecordType(INTEGER a, BIGINT b,
VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME*
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start,
TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+ : +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[$6])
+ : +- LogicalWatermarkAssigner(rowtime=[rowtime],
watermark=[-($5, 1000:INTERVAL SECOND)])
+ : +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[PROCTIME()])
+ : +- LogicalTableScan(table=[[default_catalog,
default_database, MyTable]])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[$6], window_start=[$7], window_end=[$8],
window_time=[$9])
+ +- LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($6),
5000:INTERVAL SECOND)], rowType=[RecordType(INTEGER a, BIGINT b,
VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME*
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start,
TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[$6])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime],
watermark=[-($5, 1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog,
default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[c, EXPR$1])
++- GroupAggregate(groupBy=[window_start, window_end, c, window_time],
select=[window_start, window_end, c, window_time, COUNT(a) AS EXPR$1])
+ +- Exchange(distribution=[hash[window_start, window_end, c, window_time]])
+ +- Calc(select=[window_start, window_end, c,
PROCTIME_MATERIALIZE(window_time) AS window_time, a])
+ +- Union(all=[true], union=[window_start, window_end, c, window_time,
a])
+ :- Calc(select=[window_start, window_end, c, window_time, a])
+ : +- WindowTableFunction(window=[TUMBLE(time_col=[proctime],
size=[10 s])])
+ : +- Calc(select=[a, c, proctime])
+ : +- WatermarkAssigner(rowtime=[rowtime],
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+ : +- Calc(select=[a, c, PROCTIME() AS proctime, rowtime])
+ : +- TableSourceScan(table=[[default_catalog,
default_database, MyTable, project=[a, c, rowtime], metadata=[]]], fields=[a,
c, rowtime])
+ +- Calc(select=[window_start, window_end, c, window_time, a])
+ +- WindowTableFunction(window=[TUMBLE(time_col=[proctime],
size=[5 s])])
+ +- Calc(select=[a, c, proctime])
+ +- WatermarkAssigner(rowtime=[rowtime],
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+ +- Calc(select=[a, c, PROCTIME() AS proctime, rowtime])
+ +- TableSourceScan(table=[[default_catalog,
default_database, MyTable, project=[a, c, rowtime], metadata=[]]], fields=[a,
c, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testProctimeWindowTVFWithUnionWhenCantMerge[aggPhaseEnforcer=TWO_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+select c, count(a)
+from (
+ select * from
+ TABLE(TUMBLE(table MyTable, DESCRIPTOR(proctime), interval '10' seconds))
+ union all
+ select * from
+ TABLE(TUMBLE(table MyTable, DESCRIPTOR(proctime), interval '5' seconds))
+) t
+group by window_start, window_end, c, window_time
+]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(c=[$2], EXPR$1=[$4])
++- LogicalAggregate(group=[{0, 1, 2, 3}], EXPR$1=[COUNT($4)])
+ +- LogicalProject(window_start=[$7], window_end=[$8], c=[$2],
window_time=[$9], a=[$0])
+ +- LogicalUnion(all=[true])
+ :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[$6], window_start=[$7], window_end=[$8],
window_time=[$9])
+ : +- LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($6),
10000:INTERVAL SECOND)], rowType=[RecordType(INTEGER a, BIGINT b,
VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME*
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start,
TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+ : +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[$6])
+ : +- LogicalWatermarkAssigner(rowtime=[rowtime],
watermark=[-($5, 1000:INTERVAL SECOND)])
+ : +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[PROCTIME()])
+ : +- LogicalTableScan(table=[[default_catalog,
default_database, MyTable]])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[$6], window_start=[$7], window_end=[$8],
window_time=[$9])
+ +- LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($6),
5000:INTERVAL SECOND)], rowType=[RecordType(INTEGER a, BIGINT b,
VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME*
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start,
TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[$6])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime],
watermark=[-($5, 1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog,
default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[c, EXPR$1])
++- GroupAggregate(groupBy=[window_start, window_end, c, window_time],
select=[window_start, window_end, c, window_time, COUNT(a) AS EXPR$1])
+ +- Exchange(distribution=[hash[window_start, window_end, c, window_time]])
+ +- Calc(select=[window_start, window_end, c,
PROCTIME_MATERIALIZE(window_time) AS window_time, a])
+ +- Union(all=[true], union=[window_start, window_end, c, window_time,
a])
+ :- Calc(select=[window_start, window_end, c, window_time, a])
+ : +- WindowTableFunction(window=[TUMBLE(time_col=[proctime],
size=[10 s])])
+ : +- Calc(select=[a, c, proctime])
+ : +- WatermarkAssigner(rowtime=[rowtime],
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+ : +- Calc(select=[a, c, PROCTIME() AS proctime, rowtime])
+ : +- TableSourceScan(table=[[default_catalog,
default_database, MyTable, project=[a, c, rowtime], metadata=[]]], fields=[a,
c, rowtime])
+ +- Calc(select=[window_start, window_end, c, window_time, a])
+ +- WindowTableFunction(window=[TUMBLE(time_col=[proctime],
size=[5 s])])
+ +- Calc(select=[a, c, proctime])
+ +- WatermarkAssigner(rowtime=[rowtime],
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+ +- Calc(select=[a, c, PROCTIME() AS proctime, rowtime])
+ +- TableSourceScan(table=[[default_catalog,
default_database, MyTable, project=[a, c, rowtime], metadata=[]]], fields=[a,
c, rowtime])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
index 86a86ceb372..e2193bfa2a8 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
@@ -23,7 +23,7 @@ import
org.apache.flink.table.api.config.{AggregatePhaseStrategy, OptimizerConfi
import
org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.{WeightedAvg,
WeightedAvgWithMerge}
import
org.apache.flink.table.planner.plan.utils.WindowEmitStrategy.{TABLE_EXEC_EMIT_EARLY_FIRE_DELAY,
TABLE_EXEC_EMIT_EARLY_FIRE_ENABLED, TABLE_EXEC_EMIT_LATE_FIRE_DELAY,
TABLE_EXEC_EMIT_LATE_FIRE_ENABLED}
import
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions.TestPythonAggregateFunction
-import
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.JavaTableFunc1
+import
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.{JavaTableFunc1,
StringSplit}
import org.apache.flink.table.planner.utils.TableTestBase
import
org.apache.flink.testutils.junit.extensions.parameterized.{ParameterizedTestExtension,
Parameters}
@@ -1651,6 +1651,105 @@ class WindowAggregateTest(aggPhaseEnforcer:
AggregatePhaseStrategy) extends Tabl
""".stripMargin
util.verifyRelPlan(sql)
}
+
+ @TestTemplate
+ def testProctimeWindowTVFWithCalcOnWindowColumnWhenCantMerge(): Unit = {
+ util.verifyRelPlan(
+ """
+ |select c, count(a)
+ |from
+ | TABLE(CUMULATE(table MyTable, DESCRIPTOR(proctime), interval '10'
seconds, interval '5' minutes))
+ |where window_start <> '123'
+ |group by window_start, window_end, c, window_time
+ |""".stripMargin)
+ }
+
+ @TestTemplate
+ def testProctimeWindowTVFWithRankWhenCantMerge(): Unit = {
+ util.verifyRelPlan(
+ """
+ |select c, count(a)
+ |from (
+ | select *, row_number() over (partition by c order by proctime desc)
as rn
+ | from
+ | TABLE(CUMULATE(table MyTable, DESCRIPTOR(proctime), interval '10'
seconds, interval '5' minutes))
+ |)
+ |where rn = 2
+ |group by window_start, window_end, c, window_time
+ |""".stripMargin)
+ }
+
+ @TestTemplate
+ def testProctimeWindowTVFWithDedupWhenCantMerge(): Unit = {
+ util.verifyRelPlan(
+ """
+ |select c, count(a)
+ |from (
+ | select *, row_number() over (partition by c order by proctime desc)
as rn
+ | from
+ | TABLE(CUMULATE(table MyTable, DESCRIPTOR(proctime), interval '10'
seconds, interval '5' minutes))
+ |)
+ |where rn = 1
+ |group by window_start, window_end, c, window_time
+ |""".stripMargin)
+ }
+
+ @TestTemplate
+ def testProctimeWindowTVFWithOverAggWhenCantMerge(): Unit = {
+ util.verifyRelPlan(
+ """
+ |select c, max(c1), count(a)
+ |from (
+ | select *, count(*) over (partition by c order by proctime desc) as c1
+ | from
+ | TABLE(CUMULATE(table MyTable, DESCRIPTOR(proctime), interval '10'
seconds, interval '5' minutes))
+ |)
+ |group by window_start, window_end, c, window_time
+ |""".stripMargin)
+ }
+
+ @TestTemplate
+ def testProctimeWindowTVFWithJoinWhenCantMerge(): Unit = {
+ util.verifyRelPlan(
+ """
+ |select t.c, max(t2.e), count(t.a)
+ |from (
+ | TABLE(CUMULATE(table MyTable, DESCRIPTOR(proctime), interval '10'
seconds, interval '5' minutes)) AS t
+ | join MyTable t2 on t2.a = t.a
+ |)
+ |group by window_start, window_end, t.c, window_time
+ |""".stripMargin)
+ }
+
+ @TestTemplate
+ def testProctimeWindowTVFWithCorrelateWhenCantMerge(): Unit = {
+ util.addTemporarySystemFunction("str_split", new StringSplit())
+ util.verifyRelPlan(
+ """
+ |select t.c, max(t2.x), count(t.a)
+ |from (
+ | TABLE(CUMULATE(table MyTable, DESCRIPTOR(proctime), interval '10'
seconds, interval '5' minutes)) AS t
+ | Left JOIN LATERAL TABLE(str_split('Jack,John', ',')) AS t2(x) ON
TRUE
+ |)
+ |group by window_start, window_end, t.c, window_time
+ |""".stripMargin)
+ }
+
+ @TestTemplate
+ def testProctimeWindowTVFWithUnionWhenCantMerge(): Unit = {
+ util.verifyRelPlan(
+ """
+ |select c, count(a)
+ |from (
+ | select * from
+ | TABLE(TUMBLE(table MyTable, DESCRIPTOR(proctime), interval '10'
seconds))
+ | union all
+ | select * from
+ | TABLE(TUMBLE(table MyTable, DESCRIPTOR(proctime), interval '5'
seconds))
+ |) t
+ |group by window_start, window_end, c, window_time
+ |""".stripMargin)
+ }
}
object WindowAggregateTest {