This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new d74f352 [FLINK-13563][table-planner-blink] TumblingGroupWindow should
implement toString method to explain more info
d74f352 is described below
commit d74f35216a95d502a942e7492098e0663931e3a3
Author: godfreyhe <[email protected]>
AuthorDate: Fri Aug 2 18:36:17 2019 +0800
[FLINK-13563][table-planner-blink] TumblingGroupWindow should implement
toString method to explain more info
This closes #9347
---
.../table/planner/plan/logical/groupWindows.scala | 2 +
.../apache/flink/table/api/stream/ExplainTest.xml | 8 +-
.../planner/plan/batch/sql/DagOptimizationTest.xml | 4 +-
.../table/planner/plan/batch/sql/UnnestTest.xml | 4 +-
.../batch/sql/agg/AggregateReduceGroupingTest.xml | 10 +-
.../plan/batch/sql/agg/WindowAggregateTest.xml | 312 ++++++++++-----------
.../planner/plan/batch/table/GroupWindowTest.xml | 30 +-
.../logical/AggregateReduceGroupingRuleTest.xml | 10 +-
.../plan/stream/sql/MiniBatchIntervalInferTest.xml | 16 +-
.../stream/sql/RelTimeIndicatorConverterTest.xml | 12 +-
.../planner/plan/stream/sql/TableSourceTest.xml | 2 +-
.../table/planner/plan/stream/sql/UnnestTest.xml | 2 +-
.../plan/stream/sql/agg/WindowAggregateTest.xml | 88 +++---
.../plan/stream/sql/join/WindowJoinTest.xml | 4 +-
.../planner/plan/stream/table/AggregateTest.xml | 4 +-
.../table/planner/plan/stream/table/CalcTest.xml | 8 +-
.../planner/plan/stream/table/GroupWindowTest.xml | 48 ++--
.../planner/plan/stream/table/TableSourceTest.xml | 4 +-
18 files changed, 285 insertions(+), 283 deletions(-)
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/logical/groupWindows.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/logical/groupWindows.scala
index 56c26a0..1f2c04a 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/logical/groupWindows.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/logical/groupWindows.scala
@@ -45,6 +45,8 @@ case class TumblingGroupWindow(
extends LogicalWindow(
alias,
timeField) {
+
+ override def toString: String = s"TumblingGroupWindow($alias, $timeField,
$size)"
}
//
------------------------------------------------------------------------------------------------
diff --git
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml
index f50b17e..62ac33d 100644
---
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml
+++
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml
@@ -670,7 +670,7 @@ Calc(select=[id1, rowtime AS ts, text],
updateAsRetraction=[true], accMode=[Acc]
+- DataStreamScan(table=[[default_catalog, default_database, T2]],
fields=[id2, cnt, name, goods, rowtime], updateAsRetraction=[true],
accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
Sink(name=[appendSink1], fields=[a, b], updateAsRetraction=[false],
accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
-+- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow],
select=[id1, LISTAGG(text, $f3) AS EXPR$1], updateAsRetraction=[false],
accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
++- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, ts,
8000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1], updateAsRetraction=[false],
accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+- Exchange(distribution=[hash[id1]], updateAsRetraction=[true],
accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+- Calc(select=[id1, ts, text, _UTF-16LE'#' AS $f3],
updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost =
{rows, cpu, io, network, memory}
+- Reused(reference_id=[1])
@@ -717,7 +717,7 @@ Sink(name=[appendSink2], fields=[a, b],
updateAsRetraction=[false], accMode=[Acc
ship_strategy :
FORWARD
: Operator
- content
: GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow],
select=[id1, LISTAGG(text, $f3) AS EXPR$1])
+ content
: GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, ts,
8000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
ship_strategy : HASH
:
Operator
@@ -817,7 +817,7 @@ Calc(select=[id1, rowtime AS ts, text], reuse_id=[1])
+- DataStreamScan(table=[[default_catalog, default_database, T2]],
fields=[id2, cnt, name, goods, rowtime])
Sink(name=[appendSink1], fields=[a, b])
-+- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow],
select=[id1, LISTAGG(text, $f3) AS EXPR$1])
++- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, ts,
8000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
+- Exchange(distribution=[hash[id1]])
+- Calc(select=[id1, ts, text, _UTF-16LE'#' AS $f3])
+- Reused(reference_id=[1])
@@ -864,7 +864,7 @@ Sink(name=[appendSink2], fields=[a, b])
ship_strategy :
FORWARD
: Operator
- content
: GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow],
select=[id1, LISTAGG(text, $f3) AS EXPR$1])
+ content
: GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, ts,
8000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
ship_strategy : HASH
:
Operator
diff --git
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.xml
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.xml
index acd9bb6..b13623b 100644
---
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.xml
+++
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.xml
@@ -596,9 +596,9 @@ LogicalSink(name=[sink2], fields=[a, sum_c, time])
</Resource>
<Resource name="planAfter">
<![CDATA[
-HashWindowAggregate(groupBy=[a], window=[TumblingGroupWindow],
properties=[w$start, w$end, w$rowtime], select=[a, Final_SUM(sum$0) AS sum_c],
reuse_id=[1])
+HashWindowAggregate(groupBy=[a], window=[TumblingGroupWindow('w$, ts, 15000)],
properties=[w$start, w$end, w$rowtime], select=[a, Final_SUM(sum$0) AS sum_c],
reuse_id=[1])
+- Exchange(distribution=[hash[a]])
- +- LocalHashWindowAggregate(groupBy=[a], window=[TumblingGroupWindow],
properties=[w$start, w$end, w$rowtime], select=[a, Partial_SUM(c) AS sum$0])
+ +- LocalHashWindowAggregate(groupBy=[a], window=[TumblingGroupWindow('w$,
ts, 15000)], properties=[w$start, w$end, w$rowtime], select=[a, Partial_SUM(c)
AS sum$0])
+- Calc(select=[ts, a, CAST(c) AS c])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable2, source: [TestTableSource(a, b, c, ts)]]], fields=[a, b, c, ts])
diff --git
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml
index a263d16..0074eb9 100644
---
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml
+++
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml
@@ -68,10 +68,10 @@ LogicalProject(b=[$0], s=[$2])
<![CDATA[
Calc(select=[b, f0 AS s])
+- Correlate(invocation=[explode($cor0.set)],
correlate=[table(explode($cor0.set))], select=[b,set,f0],
rowType=[RecordType(BIGINT b, BIGINT MULTISET set, BIGINT f0)],
joinType=[INNER])
- +- SortWindowAggregate(groupBy=[b], window=[TumblingGroupWindow],
select=[b, Final_COLLECT(set) AS set])
+ +- SortWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$,
rowtime, 3000)], select=[b, Final_COLLECT(set) AS set])
+- Sort(orderBy=[b ASC, assignedWindow$ ASC])
+- Exchange(distribution=[hash[b]])
- +- LocalSortWindowAggregate(groupBy=[b],
window=[TumblingGroupWindow], select=[b, Partial_COLLECT(b) AS set])
+ +- LocalSortWindowAggregate(groupBy=[b],
window=[TumblingGroupWindow('w$, rowtime, 3000)], select=[b, Partial_COLLECT(b)
AS set])
+- Sort(orderBy=[b ASC, rowtime ASC])
+- Calc(select=[b, rowtime], where=[<(b, 3)])
+- BoundedStreamScan(table=[[default_catalog,
default_database, MyTable]], fields=[a, b, c, rowtime])
diff --git
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml
index 7139daf..c8dc930 100644
---
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml
+++
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml
@@ -365,7 +365,7 @@ LogicalProject(a4=[$0], b4=[$1], EXPR$2=[$3])
</Resource>
<Resource name="planAfter">
<![CDATA[
-HashWindowAggregate(groupBy=[a4], auxGrouping=[b4],
window=[TumblingGroupWindow], select=[a4, b4 AS EXPR$2, COUNT(c4) AS EXPR$2])
+HashWindowAggregate(groupBy=[a4], auxGrouping=[b4],
window=[TumblingGroupWindow('w$, d4, 900000)], select=[a4, b4 AS EXPR$2,
COUNT(c4) AS EXPR$2])
+- Exchange(distribution=[hash[a4]])
+- TableSourceScan(table=[[default_catalog, default_database, T4, source:
[TestTableSource(a4, b4, c4, d4)]]], fields=[a4, b4, c4, d4])
]]>
@@ -385,7 +385,7 @@ LogicalProject(a4=[$0], c4=[$1], EXPR$2=[$3], EXPR$3=[$4])
</Resource>
<Resource name="planAfter">
<![CDATA[
-HashWindowAggregate(groupBy=[a4], auxGrouping=[c4],
window=[TumblingGroupWindow], select=[a4, c4 AS EXPR$2, COUNT(b4) AS EXPR$2,
AVG(b4) AS EXPR$3])
+HashWindowAggregate(groupBy=[a4], auxGrouping=[c4],
window=[TumblingGroupWindow('w$, d4, 900000)], select=[a4, c4 AS EXPR$2,
COUNT(b4) AS EXPR$2, AVG(b4) AS EXPR$3])
+- Exchange(distribution=[hash[a4]])
+- TableSourceScan(table=[[default_catalog, default_database, T4, source:
[TestTableSource(a4, b4, c4, d4)]]], fields=[a4, b4, c4, d4])
]]>
@@ -411,7 +411,7 @@ Calc(select=[a4, c4, s, EXPR$3])
+- Exchange(distribution=[hash[a4, s]])
+- LocalHashAggregate(groupBy=[a4, s], auxGrouping=[c4], select=[a4, s,
c4, Partial_COUNT(b4) AS count$0])
+- Calc(select=[a4, c4, w$start AS s, CAST(/(-($f2, /(*($f3, $f3),
$f4)), $f4)) AS b4])
- +- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4],
window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime],
select=[a4, c4 AS $f2, SUM($f4) AS $f2, SUM(b4) AS $f3, COUNT(b4) AS $f4])
+ +- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4],
window=[TumblingGroupWindow('w$, d4, 900000)], properties=[w$start, w$end,
w$rowtime], select=[a4, c4 AS $f2, SUM($f4) AS $f2, SUM(b4) AS $f3, COUNT(b4)
AS $f4])
+- Calc(select=[a4, c4, d4, b4, *(b4, b4) AS $f4])
+- Exchange(distribution=[hash[a4]])
+- TableSourceScan(table=[[default_catalog,
default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4,
b4, c4, d4])
@@ -438,7 +438,7 @@ Calc(select=[a4, c4, e, EXPR$3])
+- Exchange(distribution=[hash[a4, e]])
+- LocalHashAggregate(groupBy=[a4, e], auxGrouping=[c4], select=[a4, e,
c4, Partial_COUNT(b4) AS count$0])
+- Calc(select=[a4, c4, w$end AS e, CAST(/(-($f2, /(*($f3, $f3),
$f4)), $f4)) AS b4])
- +- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4],
window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime],
select=[a4, c4 AS $f2, SUM($f4) AS $f2, SUM(b4) AS $f3, COUNT(b4) AS $f4])
+ +- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4],
window=[TumblingGroupWindow('w$, d4, 900000)], properties=[w$start, w$end,
w$rowtime], select=[a4, c4 AS $f2, SUM($f4) AS $f2, SUM(b4) AS $f3, COUNT(b4)
AS $f4])
+- Calc(select=[a4, c4, d4, b4, *(b4, b4) AS $f4])
+- Exchange(distribution=[hash[a4]])
+- TableSourceScan(table=[[default_catalog,
default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4,
b4, c4, d4])
@@ -464,7 +464,7 @@ HashAggregate(isMerge=[true], groupBy=[a4, b4],
auxGrouping=[c4], select=[a4, b4
+- Exchange(distribution=[hash[a4, b4]])
+- LocalHashAggregate(groupBy=[a4, b4], auxGrouping=[c4], select=[a4, b4,
c4, Partial_COUNT(*) AS count1$0])
+- Calc(select=[a4, CAST(/(-($f2, /(*($f3, $f3), $f4)), $f4)) AS b4, c4])
- +- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4],
window=[TumblingGroupWindow], select=[a4, c4 AS $f2, SUM($f4) AS $f2, SUM(b4)
AS $f3, COUNT(b4) AS $f4])
+ +- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4],
window=[TumblingGroupWindow('w$, d4, 900000)], select=[a4, c4 AS $f2, SUM($f4)
AS $f2, SUM(b4) AS $f3, COUNT(b4) AS $f4])
+- Calc(select=[a4, c4, d4, b4, *(b4, b4) AS $f4])
+- Exchange(distribution=[hash[a4]])
+- TableSourceScan(table=[[default_catalog,
default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4,
b4, c4, d4])
diff --git
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/WindowAggregateTest.xml
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/WindowAggregateTest.xml
index d6d70c3..b91d909 100644
---
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/WindowAggregateTest.xml
+++
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/WindowAggregateTest.xml
@@ -40,9 +40,9 @@ LogicalProject(EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3],
EXPR$3=[$4], EXPR$4=[TUMBL
<Resource name="planAfter">
<![CDATA[
Calc(select=[CAST(/(-($f0, /(*($f1, $f1), $f2)), $f2)) AS EXPR$0,
CAST(/(-($f0, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1))))
AS EXPR$1, CAST(POWER(/(-($f0, /(*($f1, $f1), $f2)), $f2), 0.5:DECIMAL(2, 1)))
AS EXPR$2, CAST(POWER(/(-($f0, /(*($f1, $f1), $f2)), CASE(=($f2, 1),
null:BIGINT, -($f2, 1))), 0.5:DECIMAL(2, 1))) AS EXPR$3, w$start AS EXPR$4,
w$end AS EXPR$5])
-+- HashWindowAggregate(window=[TumblingGroupWindow], properties=[w$start,
w$end, w$rowtime], select=[Final_SUM(sum$0) AS $f0, Final_SUM(sum$1) AS $f1,
Final_COUNT(count$2) AS $f2])
++- HashWindowAggregate(window=[TumblingGroupWindow('w$, ts, 900000)],
properties=[w$start, w$end, w$rowtime], select=[Final_SUM(sum$0) AS $f0,
Final_SUM(sum$1) AS $f1, Final_COUNT(count$2) AS $f2])
+- Exchange(distribution=[single])
- +- LocalHashWindowAggregate(window=[TumblingGroupWindow],
properties=[w$start, w$end, w$rowtime], select=[Partial_SUM($f2) AS sum$0,
Partial_SUM(b) AS sum$1, Partial_COUNT(b) AS count$2])
+ +- LocalHashWindowAggregate(window=[TumblingGroupWindow('w$, ts,
900000)], properties=[w$start, w$end, w$rowtime], select=[Partial_SUM($f2) AS
sum$0, Partial_SUM(b) AS sum$1, Partial_COUNT(b) AS count$2])
+- Calc(select=[ts, b, *(b, b) AS $f2])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable1, source: [TestTableSource(ts, a, b, c)]]], fields=[ts, a, b, c])
]]>
@@ -72,7 +72,7 @@ LogicalProject(EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3],
EXPR$3=[$4], EXPR$4=[TUMBL
<Resource name="planAfter">
<![CDATA[
Calc(select=[CAST(/(-($f0, /(*($f1, $f1), $f2)), $f2)) AS EXPR$0,
CAST(/(-($f0, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1))))
AS EXPR$1, CAST(POWER(/(-($f0, /(*($f1, $f1), $f2)), $f2), 0.5:DECIMAL(2, 1)))
AS EXPR$2, CAST(POWER(/(-($f0, /(*($f1, $f1), $f2)), CASE(=($f2, 1),
null:BIGINT, -($f2, 1))), 0.5:DECIMAL(2, 1))) AS EXPR$3, w$start AS EXPR$4,
w$end AS EXPR$5])
-+- HashWindowAggregate(window=[TumblingGroupWindow], properties=[w$start,
w$end, w$rowtime], select=[SUM($f2) AS $f0, SUM(b) AS $f1, COUNT(b) AS $f2])
++- HashWindowAggregate(window=[TumblingGroupWindow('w$, ts, 900000)],
properties=[w$start, w$end, w$rowtime], select=[SUM($f2) AS $f0, SUM(b) AS $f1,
COUNT(b) AS $f2])
+- Exchange(distribution=[single])
+- Calc(select=[ts, b, *(b, b) AS $f2])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable1, source: [TestTableSource(ts, a, b, c)]]], fields=[ts, a, b, c])
@@ -103,9 +103,9 @@ LogicalProject(EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3],
EXPR$3=[$4], EXPR$4=[TUMBL
<Resource name="planAfter">
<![CDATA[
Calc(select=[CAST(/(-($f0, /(*($f1, $f1), $f2)), $f2)) AS EXPR$0,
CAST(/(-($f0, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1))))
AS EXPR$1, CAST(POWER(/(-($f0, /(*($f1, $f1), $f2)), $f2), 0.5:DECIMAL(2, 1)))
AS EXPR$2, CAST(POWER(/(-($f0, /(*($f1, $f1), $f2)), CASE(=($f2, 1),
null:BIGINT, -($f2, 1))), 0.5:DECIMAL(2, 1))) AS EXPR$3, w$start AS EXPR$4,
w$end AS EXPR$5])
-+- HashWindowAggregate(window=[TumblingGroupWindow], properties=[w$start,
w$end, w$rowtime], select=[Final_SUM(sum$0) AS $f0, Final_SUM(sum$1) AS $f1,
Final_COUNT(count$2) AS $f2])
++- HashWindowAggregate(window=[TumblingGroupWindow('w$, ts, 900000)],
properties=[w$start, w$end, w$rowtime], select=[Final_SUM(sum$0) AS $f0,
Final_SUM(sum$1) AS $f1, Final_COUNT(count$2) AS $f2])
+- Exchange(distribution=[single])
- +- LocalHashWindowAggregate(window=[TumblingGroupWindow],
properties=[w$start, w$end, w$rowtime], select=[Partial_SUM($f2) AS sum$0,
Partial_SUM(b) AS sum$1, Partial_COUNT(b) AS count$2])
+ +- LocalHashWindowAggregate(window=[TumblingGroupWindow('w$, ts,
900000)], properties=[w$start, w$end, w$rowtime], select=[Partial_SUM($f2) AS
sum$0, Partial_SUM(b) AS sum$1, Partial_COUNT(b) AS count$2])
+- Calc(select=[ts, b, *(b, b) AS $f2])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable1, source: [TestTableSource(ts, a, b, c)]]], fields=[ts, a, b, c])
]]>
@@ -329,9 +329,9 @@ LogicalProject(EXPR$0=[$1], EXPR$1=[$2])
</Resource>
<Resource name="planAfter">
<![CDATA[
-HashWindowAggregate(window=[TumblingGroupWindow], select=[Final_AVG(sum$0,
count$1) AS EXPR$0, Final_SUM(sum$2) AS EXPR$1])
+HashWindowAggregate(window=[TumblingGroupWindow('w$, b, 3000)],
select=[Final_AVG(sum$0, count$1) AS EXPR$0, Final_SUM(sum$2) AS EXPR$1])
+- Exchange(distribution=[single])
- +- LocalHashWindowAggregate(window=[TumblingGroupWindow],
select=[Partial_AVG(c) AS (sum$0, count$1), Partial_SUM(a) AS sum$2])
+ +- LocalHashWindowAggregate(window=[TumblingGroupWindow('w$, b, 3000)],
select=[Partial_AVG(c) AS (sum$0, count$1), Partial_SUM(a) AS sum$2])
+- Calc(select=[b, c, a])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
]]>
@@ -351,7 +351,7 @@ LogicalProject(EXPR$0=[$1], EXPR$1=[$2])
</Resource>
<Resource name="planAfter">
<![CDATA[
-HashWindowAggregate(window=[TumblingGroupWindow], select=[AVG(c) AS EXPR$0,
SUM(a) AS EXPR$1])
+HashWindowAggregate(window=[TumblingGroupWindow('w$, b, 3000)], select=[AVG(c)
AS EXPR$0, SUM(a) AS EXPR$1])
+- Exchange(distribution=[single])
+- Calc(select=[b, c, a])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
@@ -372,9 +372,9 @@ LogicalProject(EXPR$0=[$1], EXPR$1=[$2])
</Resource>
<Resource name="planAfter">
<![CDATA[
-HashWindowAggregate(window=[TumblingGroupWindow], select=[Final_AVG(sum$0,
count$1) AS EXPR$0, Final_SUM(sum$2) AS EXPR$1])
+HashWindowAggregate(window=[TumblingGroupWindow('w$, b, 3000)],
select=[Final_AVG(sum$0, count$1) AS EXPR$0, Final_SUM(sum$2) AS EXPR$1])
+- Exchange(distribution=[single])
- +- LocalHashWindowAggregate(window=[TumblingGroupWindow],
select=[Partial_AVG(c) AS (sum$0, count$1), Partial_SUM(a) AS sum$2])
+ +- LocalHashWindowAggregate(window=[TumblingGroupWindow('w$, b, 3000)],
select=[Partial_AVG(c) AS (sum$0, count$1), Partial_SUM(a) AS sum$2])
+- Calc(select=[b, c, a])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
]]>
@@ -438,9 +438,9 @@ LogicalProject(sumA=[$1], cntB=[$2])
</Resource>
<Resource name="planAfter">
<![CDATA[
-HashWindowAggregate(window=[TumblingGroupWindow], select=[Final_SUM(sum$0) AS
sumA, Final_COUNT(count$1) AS cntB])
+HashWindowAggregate(window=[TumblingGroupWindow('w$, ts, 7200000)],
select=[Final_SUM(sum$0) AS sumA, Final_COUNT(count$1) AS cntB])
+- Exchange(distribution=[single])
- +- LocalHashWindowAggregate(window=[TumblingGroupWindow],
select=[Partial_SUM(a) AS sum$0, Partial_COUNT(b) AS count$1])
+ +- LocalHashWindowAggregate(window=[TumblingGroupWindow('w$, ts, 7200000)],
select=[Partial_SUM(a) AS sum$0, Partial_COUNT(b) AS count$1])
+- Calc(select=[ts, a, b])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
]]>
@@ -460,7 +460,7 @@ LogicalProject(sumA=[$1], cntB=[$2])
</Resource>
<Resource name="planAfter">
<![CDATA[
-HashWindowAggregate(window=[TumblingGroupWindow], select=[SUM(a) AS sumA,
COUNT(b) AS cntB])
+HashWindowAggregate(window=[TumblingGroupWindow('w$, ts, 7200000)],
select=[SUM(a) AS sumA, COUNT(b) AS cntB])
+- Exchange(distribution=[single])
+- Calc(select=[ts, a, b])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2,
source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
@@ -481,9 +481,9 @@ LogicalProject(sumA=[$1], cntB=[$2])
</Resource>
<Resource name="planAfter">
<![CDATA[
-HashWindowAggregate(window=[TumblingGroupWindow], select=[Final_SUM(sum$0) AS
sumA, Final_COUNT(count$1) AS cntB])
+HashWindowAggregate(window=[TumblingGroupWindow('w$, ts, 7200000)],
select=[Final_SUM(sum$0) AS sumA, Final_COUNT(count$1) AS cntB])
+- Exchange(distribution=[single])
- +- LocalHashWindowAggregate(window=[TumblingGroupWindow],
select=[Partial_SUM(a) AS sum$0, Partial_COUNT(b) AS count$1])
+ +- LocalHashWindowAggregate(window=[TumblingGroupWindow('w$, ts, 7200000)],
select=[Partial_SUM(a) AS sum$0, Partial_COUNT(b) AS count$1])
+- Calc(select=[ts, a, b])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
]]>
@@ -601,9 +601,9 @@ LogicalProject(EXPR$0=[TUMBLE_START($0)],
EXPR$1=[TUMBLE_END($0)], EXPR$2=[TUMBL
<Resource name="planAfter">
<![CDATA[
Calc(select=[w$start AS EXPR$0, w$end AS EXPR$1, w$rowtime AS EXPR$2, c, sumA,
minB])
-+- HashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow],
properties=[w$start, w$end, w$rowtime], select=[c, Final_SUM(sum$0) AS sumA,
Final_MIN(min$1) AS minB])
++- HashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow('w$, ts,
240000)], properties=[w$start, w$end, w$rowtime], select=[c, Final_SUM(sum$0)
AS sumA, Final_MIN(min$1) AS minB])
+- Exchange(distribution=[hash[c]])
- +- LocalHashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow],
properties=[w$start, w$end, w$rowtime], select=[c, Partial_SUM(a) AS sum$0,
Partial_MIN(b) AS min$1])
+ +- LocalHashWindowAggregate(groupBy=[c],
window=[TumblingGroupWindow('w$, ts, 240000)], properties=[w$start, w$end,
w$rowtime], select=[c, Partial_SUM(a) AS sum$0, Partial_MIN(b) AS min$1])
+- Calc(select=[ts, c, a, b])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
]]>
@@ -633,7 +633,7 @@ LogicalProject(EXPR$0=[TUMBLE_START($0)],
EXPR$1=[TUMBLE_END($0)], EXPR$2=[TUMBL
<Resource name="planAfter">
<![CDATA[
Calc(select=[w$start AS EXPR$0, w$end AS EXPR$1, w$rowtime AS EXPR$2, c, sumA,
minB])
-+- HashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow],
properties=[w$start, w$end, w$rowtime], select=[c, SUM(a) AS sumA, MIN(b) AS
minB])
++- HashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow('w$, ts,
240000)], properties=[w$start, w$end, w$rowtime], select=[c, SUM(a) AS sumA,
MIN(b) AS minB])
+- Exchange(distribution=[hash[c]])
+- Calc(select=[ts, c, a, b])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
@@ -664,14 +664,121 @@ LogicalProject(EXPR$0=[TUMBLE_START($0)],
EXPR$1=[TUMBLE_END($0)], EXPR$2=[TUMBL
<Resource name="planAfter">
<![CDATA[
Calc(select=[w$start AS EXPR$0, w$end AS EXPR$1, w$rowtime AS EXPR$2, c, sumA,
minB])
-+- HashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow],
properties=[w$start, w$end, w$rowtime], select=[c, Final_SUM(sum$0) AS sumA,
Final_MIN(min$1) AS minB])
++- HashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow('w$, ts,
240000)], properties=[w$start, w$end, w$rowtime], select=[c, Final_SUM(sum$0)
AS sumA, Final_MIN(min$1) AS minB])
+- Exchange(distribution=[hash[c]])
- +- LocalHashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow],
properties=[w$start, w$end, w$rowtime], select=[c, Partial_SUM(a) AS sum$0,
Partial_MIN(b) AS min$1])
+ +- LocalHashWindowAggregate(groupBy=[c],
window=[TumblingGroupWindow('w$, ts, 240000)], properties=[w$start, w$end,
w$rowtime], select=[c, Partial_SUM(a) AS sum$0, Partial_MIN(b) AS min$1])
+- Calc(select=[ts, c, a, b])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
]]>
</Resource>
</TestCase>
+ <TestCase name="testReturnTypeInferenceForWindowAgg[aggStrategy=AUTO]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT
+ SUM(correct) AS s,
+ AVG(correct) AS a,
+ TUMBLE_START(b, INTERVAL '15' MINUTE) AS wStart
+FROM (
+ SELECT CASE a
+ WHEN 1 THEN 1
+ ELSE 99
+ END AS correct, b
+ FROM MyTable
+)
+GROUP BY TUMBLE(b, INTERVAL '15' MINUTE)
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(s=[$1], a=[$2], wStart=[TUMBLE_START($0)])
++- LogicalAggregate(group=[{0}], s=[SUM($1)], a=[AVG($1)])
+ +- LogicalProject($f0=[TUMBLE($1, 900000:INTERVAL MINUTE)],
correct=[CASE(=($0, 1), 1, 99)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c, d)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[CAST(CASE(=($f1, 0), null:INTEGER, s)) AS s,
CAST(CAST(/(CASE(=($f1, 0), null:INTEGER, s), $f1))) AS a, w$start AS wStart])
++- HashWindowAggregate(window=[TumblingGroupWindow('w$, b, 900000)],
properties=[w$start, w$end, w$rowtime], select=[Final_$SUM0(sum$0) AS s,
Final_COUNT(count1$1) AS $f1])
+ +- Exchange(distribution=[single])
+ +- LocalHashWindowAggregate(window=[TumblingGroupWindow('w$, b,
900000)], properties=[w$start, w$end, w$rowtime], select=[Partial_$SUM0($f1) AS
sum$0, Partial_COUNT(*) AS count1$1])
+ +- Calc(select=[b, CASE(=(a, 1), 1, 99) AS $f1])
+ +- TableSourceScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testReturnTypeInferenceForWindowAgg[aggStrategy=ONE_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT
+ SUM(correct) AS s,
+ AVG(correct) AS a,
+ TUMBLE_START(b, INTERVAL '15' MINUTE) AS wStart
+FROM (
+ SELECT CASE a
+ WHEN 1 THEN 1
+ ELSE 99
+ END AS correct, b
+ FROM MyTable
+)
+GROUP BY TUMBLE(b, INTERVAL '15' MINUTE)
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(s=[$1], a=[$2], wStart=[TUMBLE_START($0)])
++- LogicalAggregate(group=[{0}], s=[SUM($1)], a=[AVG($1)])
+ +- LogicalProject($f0=[TUMBLE($1, 900000:INTERVAL MINUTE)],
correct=[CASE(=($0, 1), 1, 99)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c, d)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[CAST(CASE(=($f1, 0), null:INTEGER, s)) AS s,
CAST(CAST(/(CASE(=($f1, 0), null:INTEGER, s), $f1))) AS a, w$start AS wStart])
++- HashWindowAggregate(window=[TumblingGroupWindow('w$, b, 900000)],
properties=[w$start, w$end, w$rowtime], select=[$SUM0($f1) AS s, COUNT(*) AS
$f1])
+ +- Exchange(distribution=[single])
+ +- Calc(select=[b, CASE(=(a, 1), 1, 99) AS $f1])
+ +- TableSourceScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testReturnTypeInferenceForWindowAgg[aggStrategy=TWO_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT
+ SUM(correct) AS s,
+ AVG(correct) AS a,
+ TUMBLE_START(b, INTERVAL '15' MINUTE) AS wStart
+FROM (
+ SELECT CASE a
+ WHEN 1 THEN 1
+ ELSE 99
+ END AS correct, b
+ FROM MyTable
+)
+GROUP BY TUMBLE(b, INTERVAL '15' MINUTE)
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(s=[$1], a=[$2], wStart=[TUMBLE_START($0)])
++- LogicalAggregate(group=[{0}], s=[SUM($1)], a=[AVG($1)])
+ +- LogicalProject($f0=[TUMBLE($1, 900000:INTERVAL MINUTE)],
correct=[CASE(=($0, 1), 1, 99)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c, d)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[CAST(CASE(=($f1, 0), null:INTEGER, s)) AS s,
CAST(CAST(/(CASE(=($f1, 0), null:INTEGER, s), $f1))) AS a, w$start AS wStart])
++- HashWindowAggregate(window=[TumblingGroupWindow('w$, b, 900000)],
properties=[w$start, w$end, w$rowtime], select=[Final_$SUM0(sum$0) AS s,
Final_COUNT(count1$1) AS $f1])
+ +- Exchange(distribution=[single])
+ +- LocalHashWindowAggregate(window=[TumblingGroupWindow('w$, b,
900000)], properties=[w$start, w$end, w$rowtime], select=[Partial_$SUM0($f1) AS
sum$0, Partial_COUNT(*) AS count1$1])
+ +- Calc(select=[b, CASE(=(a, 1), 1, 99) AS $f1])
+ +- TableSourceScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testSlidingWindowHashAgg[aggStrategy=AUTO]">
<Resource name="sql">
<![CDATA[SELECT count(c) FROM MyTable1 GROUP BY b, HOP(ts, INTERVAL '3'
SECOND, INTERVAL '1' HOUR)]]>
@@ -1012,9 +1119,9 @@ LogicalProject(EXPR$0=[$2])
<Resource name="planAfter">
<![CDATA[
Calc(select=[EXPR$0])
-+- HashWindowAggregate(groupBy=[a], window=[TumblingGroupWindow], select=[a,
Final_COUNT(count$0) AS EXPR$0])
++- HashWindowAggregate(groupBy=[a], window=[TumblingGroupWindow('w$, ts,
3000)], select=[a, Final_COUNT(count$0) AS EXPR$0])
+- Exchange(distribution=[hash[a]])
- +- LocalHashWindowAggregate(groupBy=[a], window=[TumblingGroupWindow],
select=[a, Partial_COUNT(c) AS count$0])
+ +- LocalHashWindowAggregate(groupBy=[a],
window=[TumblingGroupWindow('w$, ts, 3000)], select=[a, Partial_COUNT(c) AS
count$0])
+- Calc(select=[a, ts, c])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable1, source: [TestTableSource(ts, a, b, c)]]], fields=[ts, a, b, c])
]]>
@@ -1035,7 +1142,7 @@ LogicalProject(EXPR$0=[$2])
<Resource name="planAfter">
<![CDATA[
Calc(select=[EXPR$0])
-+- HashWindowAggregate(groupBy=[a], window=[TumblingGroupWindow], select=[a,
COUNT(c) AS EXPR$0])
++- HashWindowAggregate(groupBy=[a], window=[TumblingGroupWindow('w$, ts,
3000)], select=[a, COUNT(c) AS EXPR$0])
+- Exchange(distribution=[hash[a]])
+- Calc(select=[a, ts, c])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable1, source: [TestTableSource(ts, a, b, c)]]], fields=[ts, a, b, c])
@@ -1057,9 +1164,9 @@ LogicalProject(EXPR$0=[$2])
<Resource name="planAfter">
<![CDATA[
Calc(select=[EXPR$0])
-+- HashWindowAggregate(groupBy=[a], window=[TumblingGroupWindow], select=[a,
Final_COUNT(count$0) AS EXPR$0])
++- HashWindowAggregate(groupBy=[a], window=[TumblingGroupWindow('w$, ts,
3000)], select=[a, Final_COUNT(count$0) AS EXPR$0])
+- Exchange(distribution=[hash[a]])
- +- LocalHashWindowAggregate(groupBy=[a], window=[TumblingGroupWindow],
select=[a, Partial_COUNT(c) AS count$0])
+ +- LocalHashWindowAggregate(groupBy=[a],
window=[TumblingGroupWindow('w$, ts, 3000)], select=[a, Partial_COUNT(c) AS
count$0])
+- Calc(select=[a, ts, c])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable1, source: [TestTableSource(ts, a, b, c)]]], fields=[ts, a, b, c])
]]>
@@ -1080,9 +1187,9 @@ LogicalProject(EXPR$0=[$3], EXPR$1=[$4])
<Resource name="planAfter">
<![CDATA[
Calc(select=[EXPR$0, EXPR$1])
-+- HashWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow],
select=[a, d, Final_AVG(sum$0, count$1) AS EXPR$0, Final_COUNT(count$2) AS
EXPR$1])
++- HashWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow('w$, b,
3000)], select=[a, d, Final_AVG(sum$0, count$1) AS EXPR$0, Final_COUNT(count$2)
AS EXPR$1])
+- Exchange(distribution=[hash[a, d]])
- +- LocalHashWindowAggregate(groupBy=[a, d],
window=[TumblingGroupWindow], select=[a, d, Partial_AVG(c) AS (sum$0, count$1),
Partial_COUNT(a) AS count$2])
+ +- LocalHashWindowAggregate(groupBy=[a, d],
window=[TumblingGroupWindow('w$, b, 3000)], select=[a, d, Partial_AVG(c) AS
(sum$0, count$1), Partial_COUNT(a) AS count$2])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
]]>
</Resource>
@@ -1102,7 +1209,7 @@ LogicalProject(EXPR$0=[$3], EXPR$1=[$4])
<Resource name="planAfter">
<![CDATA[
Calc(select=[EXPR$0, EXPR$1])
-+- HashWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow],
select=[a, d, AVG(c) AS EXPR$0, COUNT(a) AS EXPR$1])
++- HashWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow('w$, b,
3000)], select=[a, d, AVG(c) AS EXPR$0, COUNT(a) AS EXPR$1])
+- Exchange(distribution=[hash[a, d]])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
]]>
@@ -1123,9 +1230,9 @@ LogicalProject(EXPR$0=[$3], EXPR$1=[$4])
<Resource name="planAfter">
<![CDATA[
Calc(select=[EXPR$0, EXPR$1])
-+- HashWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow],
select=[a, d, Final_AVG(sum$0, count$1) AS EXPR$0, Final_COUNT(count$2) AS
EXPR$1])
++- HashWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow('w$, b,
3000)], select=[a, d, Final_AVG(sum$0, count$1) AS EXPR$0, Final_COUNT(count$2)
AS EXPR$1])
+- Exchange(distribution=[hash[a, d]])
- +- LocalHashWindowAggregate(groupBy=[a, d],
window=[TumblingGroupWindow], select=[a, d, Partial_AVG(c) AS (sum$0, count$1),
Partial_COUNT(a) AS count$2])
+ +- LocalHashWindowAggregate(groupBy=[a, d],
window=[TumblingGroupWindow('w$, b, 3000)], select=[a, d, Partial_AVG(c) AS
(sum$0, count$1), Partial_COUNT(a) AS count$2])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
]]>
</Resource>
@@ -1144,10 +1251,10 @@ LogicalProject(wAvg=[$1])
</Resource>
<Resource name="planAfter">
<![CDATA[
-SortWindowAggregate(window=[TumblingGroupWindow],
select=[Final_weightedAvg(wAvg) AS wAvg])
+SortWindowAggregate(window=[TumblingGroupWindow('w$, ts, 240000)],
select=[Final_weightedAvg(wAvg) AS wAvg])
+- Sort(orderBy=[assignedWindow$ ASC])
+- Exchange(distribution=[single])
- +- LocalSortWindowAggregate(window=[TumblingGroupWindow],
select=[Partial_weightedAvg(b, a) AS wAvg])
+ +- LocalSortWindowAggregate(window=[TumblingGroupWindow('w$, ts,
240000)], select=[Partial_weightedAvg(b, a) AS wAvg])
+- Sort(orderBy=[ts ASC])
+- Calc(select=[ts, b, a])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
@@ -1169,10 +1276,10 @@ LogicalProject(EXPR$0=[$2])
<Resource name="planAfter">
<![CDATA[
Calc(select=[EXPR$0])
-+- SortWindowAggregate(groupBy=[a], window=[TumblingGroupWindow], select=[a,
Final_MAX(max$0) AS EXPR$0])
++- SortWindowAggregate(groupBy=[a], window=[TumblingGroupWindow('w$, ts,
3000)], select=[a, Final_MAX(max$0) AS EXPR$0])
+- Sort(orderBy=[a ASC, assignedWindow$ ASC])
+- Exchange(distribution=[hash[a]])
- +- LocalSortWindowAggregate(groupBy=[a],
window=[TumblingGroupWindow], select=[a, Partial_MAX(c) AS max$0])
+ +- LocalSortWindowAggregate(groupBy=[a],
window=[TumblingGroupWindow('w$, ts, 3000)], select=[a, Partial_MAX(c) AS
max$0])
+- Sort(orderBy=[a ASC, ts ASC])
+- Calc(select=[a, ts, c])
+- TableSourceScan(table=[[default_catalog,
default_database, MyTable1, source: [TestTableSource(ts, a, b, c)]]],
fields=[ts, a, b, c])
@@ -1194,7 +1301,7 @@ LogicalProject(EXPR$0=[$2])
<Resource name="planAfter">
<![CDATA[
Calc(select=[EXPR$0])
-+- SortWindowAggregate(groupBy=[a], window=[TumblingGroupWindow], select=[a,
MAX(c) AS EXPR$0])
++- SortWindowAggregate(groupBy=[a], window=[TumblingGroupWindow('w$, ts,
3000)], select=[a, MAX(c) AS EXPR$0])
+- Sort(orderBy=[a ASC, ts ASC])
+- Exchange(distribution=[hash[a]])
+- Calc(select=[a, ts, c])
@@ -1217,10 +1324,10 @@ LogicalProject(EXPR$0=[$2])
<Resource name="planAfter">
<![CDATA[
Calc(select=[EXPR$0])
-+- SortWindowAggregate(groupBy=[a], window=[TumblingGroupWindow], select=[a,
Final_MAX(max$0) AS EXPR$0])
++- SortWindowAggregate(groupBy=[a], window=[TumblingGroupWindow('w$, ts,
3000)], select=[a, Final_MAX(max$0) AS EXPR$0])
+- Sort(orderBy=[a ASC, assignedWindow$ ASC])
+- Exchange(distribution=[hash[a]])
- +- LocalSortWindowAggregate(groupBy=[a],
window=[TumblingGroupWindow], select=[a, Partial_MAX(c) AS max$0])
+ +- LocalSortWindowAggregate(groupBy=[a],
window=[TumblingGroupWindow('w$, ts, 3000)], select=[a, Partial_MAX(c) AS
max$0])
+- Sort(orderBy=[a ASC, ts ASC])
+- Calc(select=[a, ts, c])
+- TableSourceScan(table=[[default_catalog,
default_database, MyTable1, source: [TestTableSource(ts, a, b, c)]]],
fields=[ts, a, b, c])
@@ -1242,7 +1349,7 @@ LogicalProject(EXPR$0=[$3], EXPR$1=[$4])
<Resource name="planAfter">
<![CDATA[
Calc(select=[EXPR$0, EXPR$1])
-+- SortWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow],
select=[a, d, AVG(c) AS EXPR$0, countFun(a) AS EXPR$1])
++- SortWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow('w$, b,
3000)], select=[a, d, AVG(c) AS EXPR$0, countFun(a) AS EXPR$1])
+- Sort(orderBy=[a ASC, d ASC, b ASC])
+- Exchange(distribution=[hash[a, d]])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
@@ -1264,7 +1371,7 @@ LogicalProject(EXPR$0=[$3], EXPR$1=[$4])
<Resource name="planAfter">
<![CDATA[
Calc(select=[EXPR$0, EXPR$1])
-+- SortWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow],
select=[a, d, AVG(c) AS EXPR$0, countFun(a) AS EXPR$1])
++- SortWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow('w$, b,
3000)], select=[a, d, AVG(c) AS EXPR$0, countFun(a) AS EXPR$1])
+- Sort(orderBy=[a ASC, d ASC, b ASC])
+- Exchange(distribution=[hash[a, d]])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
@@ -1286,10 +1393,10 @@ LogicalProject(EXPR$0=[$3], EXPR$1=[$4])
<Resource name="planAfter">
<![CDATA[
Calc(select=[EXPR$0, EXPR$1])
-+- SortWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow],
select=[a, d, Final_AVG(sum$0, count$1) AS EXPR$0, Final_countFun(EXPR$1) AS
EXPR$1])
++- SortWindowAggregate(groupBy=[a, d], window=[TumblingGroupWindow('w$, b,
3000)], select=[a, d, Final_AVG(sum$0, count$1) AS EXPR$0,
Final_countFun(EXPR$1) AS EXPR$1])
+- Sort(orderBy=[a ASC, d ASC, assignedWindow$ ASC])
+- Exchange(distribution=[hash[a, d]])
- +- LocalSortWindowAggregate(groupBy=[a, d],
window=[TumblingGroupWindow], select=[a, d, Partial_AVG(c) AS (sum$0, count$1),
Partial_countFun(a) AS EXPR$1])
+ +- LocalSortWindowAggregate(groupBy=[a, d],
window=[TumblingGroupWindow('w$, b, 3000)], select=[a, d, Partial_AVG(c) AS
(sum$0, count$1), Partial_countFun(a) AS EXPR$1])
+- Sort(orderBy=[a ASC, d ASC, b ASC])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
]]>
@@ -1310,9 +1417,9 @@ LogicalProject(EXPR$0=[TUMBLE_END($0)])
<Resource name="planAfter">
<![CDATA[
Calc(select=[w$end AS EXPR$0])
-+- HashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow],
properties=[w$start, w$end, w$rowtime], select=[c])
++- HashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow('w$, ts,
240000)], properties=[w$start, w$end, w$rowtime], select=[c])
+- Exchange(distribution=[hash[c]])
- +- LocalHashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow],
properties=[w$start, w$end, w$rowtime], select=[c])
+ +- LocalHashWindowAggregate(groupBy=[c],
window=[TumblingGroupWindow('w$, ts, 240000)], properties=[w$start, w$end,
w$rowtime], select=[c])
+- Calc(select=[ts, c])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
]]>
@@ -1332,7 +1439,7 @@ LogicalProject(wAvg=[$1])
</Resource>
<Resource name="planAfter">
<![CDATA[
-SortWindowAggregate(window=[TumblingGroupWindow], select=[weightedAvg(b, a) AS
wAvg])
+SortWindowAggregate(window=[TumblingGroupWindow('w$, ts, 240000)],
select=[weightedAvg(b, a) AS wAvg])
+- Sort(orderBy=[ts ASC])
+- Exchange(distribution=[single])
+- Calc(select=[ts, b, a])
@@ -1355,7 +1462,7 @@ LogicalProject(EXPR$0=[TUMBLE_END($0)])
<Resource name="planAfter">
<![CDATA[
Calc(select=[w$end AS EXPR$0])
-+- HashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow],
properties=[w$start, w$end, w$rowtime], select=[c])
++- HashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow('w$, ts,
240000)], properties=[w$start, w$end, w$rowtime], select=[c])
+- Exchange(distribution=[hash[c]])
+- Calc(select=[ts, c])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
@@ -1376,10 +1483,10 @@ LogicalProject(wAvg=[$1])
</Resource>
<Resource name="planAfter">
<![CDATA[
-SortWindowAggregate(window=[TumblingGroupWindow],
select=[Final_weightedAvg(wAvg) AS wAvg])
+SortWindowAggregate(window=[TumblingGroupWindow('w$, ts, 240000)],
select=[Final_weightedAvg(wAvg) AS wAvg])
+- Sort(orderBy=[assignedWindow$ ASC])
+- Exchange(distribution=[single])
- +- LocalSortWindowAggregate(window=[TumblingGroupWindow],
select=[Partial_weightedAvg(b, a) AS wAvg])
+ +- LocalSortWindowAggregate(window=[TumblingGroupWindow('w$, ts,
240000)], select=[Partial_weightedAvg(b, a) AS wAvg])
+- Sort(orderBy=[ts ASC])
+- Calc(select=[ts, b, a])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
@@ -1401,119 +1508,12 @@ LogicalProject(EXPR$0=[TUMBLE_END($0)])
<Resource name="planAfter">
<![CDATA[
Calc(select=[w$end AS EXPR$0])
-+- HashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow],
properties=[w$start, w$end, w$rowtime], select=[c])
++- HashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow('w$, ts,
240000)], properties=[w$start, w$end, w$rowtime], select=[c])
+- Exchange(distribution=[hash[c]])
- +- LocalHashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow],
properties=[w$start, w$end, w$rowtime], select=[c])
+ +- LocalHashWindowAggregate(groupBy=[c],
window=[TumblingGroupWindow('w$, ts, 240000)], properties=[w$start, w$end,
w$rowtime], select=[c])
+- Calc(select=[ts, c])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
]]>
</Resource>
</TestCase>
- <TestCase name="testReturnTypeInferenceForWindowAgg[aggStrategy=AUTO]">
- <Resource name="sql">
- <![CDATA[
-SELECT
- SUM(correct) AS s,
- AVG(correct) AS a,
- TUMBLE_START(b, INTERVAL '15' MINUTE) AS wStart
-FROM (
- SELECT CASE a
- WHEN 1 THEN 1
- ELSE 99
- END AS correct, b
- FROM MyTable
-)
-GROUP BY TUMBLE(b, INTERVAL '15' MINUTE)
- ]]>
- </Resource>
- <Resource name="planBefore">
- <![CDATA[
-LogicalProject(s=[$1], a=[$2], wStart=[TUMBLE_START($0)])
-+- LogicalAggregate(group=[{0}], s=[SUM($1)], a=[AVG($1)])
- +- LogicalProject($f0=[TUMBLE($1, 900000:INTERVAL MINUTE)],
correct=[CASE(=($0, 1), 1, 99)])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c, d)]]])
-]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-Calc(select=[CAST(CASE(=($f1, 0), null:INTEGER, s)) AS s,
CAST(CAST(/(CASE(=($f1, 0), null:INTEGER, s), $f1))) AS a, w$start AS wStart])
-+- HashWindowAggregate(window=[TumblingGroupWindow], properties=[w$start,
w$end, w$rowtime], select=[Final_$SUM0(sum$0) AS s, Final_COUNT(count1$1) AS
$f1])
- +- Exchange(distribution=[single])
- +- LocalHashWindowAggregate(window=[TumblingGroupWindow],
properties=[w$start, w$end, w$rowtime], select=[Partial_$SUM0($f1) AS sum$0,
Partial_COUNT(*) AS count1$1])
- +- Calc(select=[b, CASE(=(a, 1), 1, 99) AS $f1])
- +- TableSourceScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
-]]>
- </Resource>
- </TestCase>
- <TestCase
name="testReturnTypeInferenceForWindowAgg[aggStrategy=ONE_PHASE]">
- <Resource name="sql">
- <![CDATA[
-SELECT
- SUM(correct) AS s,
- AVG(correct) AS a,
- TUMBLE_START(b, INTERVAL '15' MINUTE) AS wStart
-FROM (
- SELECT CASE a
- WHEN 1 THEN 1
- ELSE 99
- END AS correct, b
- FROM MyTable
-)
-GROUP BY TUMBLE(b, INTERVAL '15' MINUTE)
- ]]>
- </Resource>
- <Resource name="planBefore">
- <![CDATA[
-LogicalProject(s=[$1], a=[$2], wStart=[TUMBLE_START($0)])
-+- LogicalAggregate(group=[{0}], s=[SUM($1)], a=[AVG($1)])
- +- LogicalProject($f0=[TUMBLE($1, 900000:INTERVAL MINUTE)],
correct=[CASE(=($0, 1), 1, 99)])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c, d)]]])
-]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-Calc(select=[CAST(CASE(=($f1, 0), null:INTEGER, s)) AS s,
CAST(CAST(/(CASE(=($f1, 0), null:INTEGER, s), $f1))) AS a, w$start AS wStart])
-+- HashWindowAggregate(window=[TumblingGroupWindow], properties=[w$start,
w$end, w$rowtime], select=[$SUM0($f1) AS s, COUNT(*) AS $f1])
- +- Exchange(distribution=[single])
- +- Calc(select=[b, CASE(=(a, 1), 1, 99) AS $f1])
- +- TableSourceScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
-]]>
- </Resource>
- </TestCase>
- <TestCase
name="testReturnTypeInferenceForWindowAgg[aggStrategy=TWO_PHASE]">
- <Resource name="sql">
- <![CDATA[
-SELECT
- SUM(correct) AS s,
- AVG(correct) AS a,
- TUMBLE_START(b, INTERVAL '15' MINUTE) AS wStart
-FROM (
- SELECT CASE a
- WHEN 1 THEN 1
- ELSE 99
- END AS correct, b
- FROM MyTable
-)
-GROUP BY TUMBLE(b, INTERVAL '15' MINUTE)
- ]]>
- </Resource>
- <Resource name="planBefore">
- <![CDATA[
-LogicalProject(s=[$1], a=[$2], wStart=[TUMBLE_START($0)])
-+- LogicalAggregate(group=[{0}], s=[SUM($1)], a=[AVG($1)])
- +- LogicalProject($f0=[TUMBLE($1, 900000:INTERVAL MINUTE)],
correct=[CASE(=($0, 1), 1, 99)])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c, d)]]])
-]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-Calc(select=[CAST(CASE(=($f1, 0), null:INTEGER, s)) AS s,
CAST(CAST(/(CASE(=($f1, 0), null:INTEGER, s), $f1))) AS a, w$start AS wStart])
-+- HashWindowAggregate(window=[TumblingGroupWindow], properties=[w$start,
w$end, w$rowtime], select=[Final_$SUM0(sum$0) AS s, Final_COUNT(count1$1) AS
$f1])
- +- Exchange(distribution=[single])
- +- LocalHashWindowAggregate(window=[TumblingGroupWindow],
properties=[w$start, w$end, w$rowtime], select=[Partial_$SUM0($f1) AS sum$0,
Partial_COUNT(*) AS count1$1])
- +- Calc(select=[b, CASE(=(a, 1), 1, 99) AS $f1])
- +- TableSourceScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
-]]>
- </Resource>
- </TestCase>
</Root>
diff --git
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/GroupWindowTest.xml
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/GroupWindowTest.xml
index e14de92..1b8adc1 100644
---
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/GroupWindowTest.xml
+++
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/GroupWindowTest.xml
@@ -20,15 +20,15 @@ limitations under the License.
<Resource name="planBefore">
<![CDATA[
LogicalProject(EXPR$0=[$0])
-+- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($1)],
window=[TumblingGroupWindow], properties=[])
++- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($1)],
window=[TumblingGroupWindow('w, long, 5)], properties=[])
+- LogicalTableScan(table=[[default_catalog, default_database, Table1,
source: [TestTableSource(long, int, string)]]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-HashWindowAggregate(window=[TumblingGroupWindow], select=[Final_COUNT(count$0)
AS EXPR$0])
+HashWindowAggregate(window=[TumblingGroupWindow('w, long, 5)],
select=[Final_COUNT(count$0) AS EXPR$0])
+- Exchange(distribution=[single])
- +- LocalHashWindowAggregate(window=[TumblingGroupWindow],
select=[Partial_COUNT(int) AS count$0])
+ +- LocalHashWindowAggregate(window=[TumblingGroupWindow('w, long, 5)],
select=[Partial_COUNT(int) AS count$0])
+- TableSourceScan(table=[[default_catalog, default_database, Table1,
source: [TestTableSource(long, int, string)]]], fields=[long, int, string])
]]>
</Resource>
@@ -54,15 +54,15 @@ HashWindowAggregate(groupBy=[string],
window=[SlidingGroupWindow('w, long, 8, 10
<Resource name="planBefore">
<![CDATA[
LogicalProject(string=[$0], EXPR$0=[$1])
-+- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)],
window=[TumblingGroupWindow], properties=[])
++- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)],
window=[TumblingGroupWindow('w, long, 5)], properties=[])
+- LogicalTableScan(table=[[default_catalog, default_database, Table1,
source: [TestTableSource(long, int, string)]]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-HashWindowAggregate(groupBy=[string], window=[TumblingGroupWindow],
select=[string, Final_COUNT(count$0) AS EXPR$0])
+HashWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w, long,
5)], select=[string, Final_COUNT(count$0) AS EXPR$0])
+- Exchange(distribution=[hash[string]])
- +- LocalHashWindowAggregate(groupBy=[string], window=[TumblingGroupWindow],
select=[string, Partial_COUNT(int) AS count$0])
+ +- LocalHashWindowAggregate(groupBy=[string],
window=[TumblingGroupWindow('w, long, 5)], select=[string, Partial_COUNT(int)
AS count$0])
+- TableSourceScan(table=[[default_catalog, default_database, Table1,
source: [TestTableSource(long, int, string)]]], fields=[long, int, string])
]]>
</Resource>
@@ -71,15 +71,15 @@ HashWindowAggregate(groupBy=[string],
window=[TumblingGroupWindow], select=[stri
<Resource name="planBefore">
<![CDATA[
LogicalProject(string=[$0], EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4])
-+- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)],
window=[TumblingGroupWindow], properties=[EXPR$1, EXPR$2, EXPR$3])
++- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)],
window=[TumblingGroupWindow('w, ts, 7200000)], properties=[EXPR$1, EXPR$2,
EXPR$3])
+- LogicalTableScan(table=[[default_catalog, default_database, Table1,
source: [TestTableSource(ts, int, string)]]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-HashWindowAggregate(groupBy=[string], window=[TumblingGroupWindow],
properties=[EXPR$1, EXPR$2, EXPR$3], select=[string, Final_COUNT(count$0) AS
EXPR$0])
+HashWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w, ts,
7200000)], properties=[EXPR$1, EXPR$2, EXPR$3], select=[string,
Final_COUNT(count$0) AS EXPR$0])
+- Exchange(distribution=[hash[string]])
- +- LocalHashWindowAggregate(groupBy=[string], window=[TumblingGroupWindow],
properties=[EXPR$1, EXPR$2, EXPR$3], select=[string, Partial_COUNT(int) AS
count$0])
+ +- LocalHashWindowAggregate(groupBy=[string],
window=[TumblingGroupWindow('w, ts, 7200000)], properties=[EXPR$1, EXPR$2,
EXPR$3], select=[string, Partial_COUNT(int) AS count$0])
+- TableSourceScan(table=[[default_catalog, default_database, Table1,
source: [TestTableSource(ts, int, string)]]], fields=[ts, int, string])
]]>
</Resource>
@@ -88,16 +88,16 @@ HashWindowAggregate(groupBy=[string],
window=[TumblingGroupWindow], properties=[
<Resource name="planBefore">
<![CDATA[
LogicalProject(string=[$0], EXPR$0=[$1])
-+- LogicalWindowAggregate(group=[{2}], EXPR$0=[myWeightedAvg($0, $1)],
window=[TumblingGroupWindow], properties=[])
++- LogicalWindowAggregate(group=[{2}], EXPR$0=[myWeightedAvg($0, $1)],
window=[TumblingGroupWindow('w, long, 5)], properties=[])
+- LogicalTableScan(table=[[default_catalog, default_database, Table1,
source: [TestTableSource(long, int, string)]]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-SortWindowAggregate(groupBy=[string], window=[TumblingGroupWindow],
select=[string, Final_myWeightedAvg(EXPR$0) AS EXPR$0])
+SortWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w, long,
5)], select=[string, Final_myWeightedAvg(EXPR$0) AS EXPR$0])
+- Sort(orderBy=[string ASC, assignedWindow$ ASC])
+- Exchange(distribution=[hash[string]])
- +- LocalSortWindowAggregate(groupBy=[string],
window=[TumblingGroupWindow], select=[string, Partial_myWeightedAvg(long, int)
AS EXPR$0])
+ +- LocalSortWindowAggregate(groupBy=[string],
window=[TumblingGroupWindow('w, long, 5)], select=[string,
Partial_myWeightedAvg(long, int) AS EXPR$0])
+- Sort(orderBy=[string ASC, long ASC])
+- TableSourceScan(table=[[default_catalog, default_database,
Table1, source: [TestTableSource(long, int, string)]]], fields=[long, int,
string])
]]>
@@ -107,15 +107,15 @@ SortWindowAggregate(groupBy=[string],
window=[TumblingGroupWindow], select=[stri
<Resource name="planBefore">
<![CDATA[
LogicalProject(string=[$0], EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4])
-+- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)],
window=[TumblingGroupWindow], properties=[EXPR$1, EXPR$2, EXPR$3])
++- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)],
window=[TumblingGroupWindow('w, ts, 7200000)], properties=[EXPR$1, EXPR$2,
EXPR$3])
+- LogicalTableScan(table=[[default_catalog, default_database, Table1,
source: [TestTableSource(ts, int, string)]]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-HashWindowAggregate(groupBy=[string], window=[TumblingGroupWindow],
properties=[EXPR$1, EXPR$2, EXPR$3], select=[string, Final_COUNT(count$0) AS
EXPR$0])
+HashWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w, ts,
7200000)], properties=[EXPR$1, EXPR$2, EXPR$3], select=[string,
Final_COUNT(count$0) AS EXPR$0])
+- Exchange(distribution=[hash[string]])
- +- LocalHashWindowAggregate(groupBy=[string], window=[TumblingGroupWindow],
properties=[EXPR$1, EXPR$2, EXPR$3], select=[string, Partial_COUNT(int) AS
count$0])
+ +- LocalHashWindowAggregate(groupBy=[string],
window=[TumblingGroupWindow('w, ts, 7200000)], properties=[EXPR$1, EXPR$2,
EXPR$3], select=[string, Partial_COUNT(int) AS count$0])
+- TableSourceScan(table=[[default_catalog, default_database, Table1,
source: [TestTableSource(ts, int, string)]]], fields=[ts, int, string])
]]>
</Resource>
diff --git
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AggregateReduceGroupingRuleTest.xml
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AggregateReduceGroupingRuleTest.xml
index d98cbd7..b017098 100644
---
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AggregateReduceGroupingRuleTest.xml
+++
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AggregateReduceGroupingRuleTest.xml
@@ -330,7 +330,7 @@ LogicalProject(a4=[$0], b4=[$1], EXPR$2=[$3])
</Resource>
<Resource name="planAfter">
<![CDATA[
-FlinkLogicalWindowAggregate(group=[{0}], b4=[AUXILIARY_GROUP($1)],
EXPR$2=[COUNT($2)], window=[TumblingGroupWindow], properties=[])
+FlinkLogicalWindowAggregate(group=[{0}], b4=[AUXILIARY_GROUP($1)],
EXPR$2=[COUNT($2)], window=[TumblingGroupWindow('w$, d4, 900000)],
properties=[])
+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, T4,
source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4, b4, c4, d4])
]]>
</Resource>
@@ -349,7 +349,7 @@ LogicalProject(a4=[$0], c4=[$1], EXPR$2=[$3], EXPR$3=[$4])
</Resource>
<Resource name="planAfter">
<![CDATA[
-FlinkLogicalWindowAggregate(group=[{0}], c4=[AUXILIARY_GROUP($2)],
EXPR$2=[COUNT($1)], EXPR$3=[AVG($1)], window=[TumblingGroupWindow],
properties=[])
+FlinkLogicalWindowAggregate(group=[{0}], c4=[AUXILIARY_GROUP($2)],
EXPR$2=[COUNT($1)], EXPR$3=[AVG($1)], window=[TumblingGroupWindow('w$, d4,
900000)], properties=[])
+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, T4,
source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4, b4, c4, d4])
]]>
</Resource>
@@ -372,7 +372,7 @@ LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT($3)])
FlinkLogicalCalc(select=[a4, c4, s, EXPR$3])
+- FlinkLogicalAggregate(group=[{0, 2}], c4=[AUXILIARY_GROUP($1)],
EXPR$3=[COUNT($3)])
+- FlinkLogicalCalc(select=[a4, c4, w$start AS s, CAST(/(-($f2, /(*($f3,
$f3), $f4)), $f4)) AS b4])
- +- FlinkLogicalWindowAggregate(group=[{0}], c4=[AUXILIARY_GROUP($1)],
agg#1=[SUM($4)], agg#2=[SUM($3)], agg#3=[COUNT($3)],
window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime])
+ +- FlinkLogicalWindowAggregate(group=[{0}], c4=[AUXILIARY_GROUP($1)],
agg#1=[SUM($4)], agg#2=[SUM($3)], agg#3=[COUNT($3)],
window=[TumblingGroupWindow('w$, d4, 900000)], properties=[w$start, w$end,
w$rowtime])
+- FlinkLogicalCalc(select=[a4, c4, d4, b4, *(b4, b4) AS $f4])
+- FlinkLogicalTableSourceScan(table=[[default_catalog,
default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4,
b4, c4, d4])
]]>
@@ -396,7 +396,7 @@ LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT($3)])
FlinkLogicalCalc(select=[a4, c4, e, EXPR$3])
+- FlinkLogicalAggregate(group=[{0, 2}], c4=[AUXILIARY_GROUP($1)],
EXPR$3=[COUNT($3)])
+- FlinkLogicalCalc(select=[a4, c4, w$end AS e, CAST(/(-($f2, /(*($f3,
$f3), $f4)), $f4)) AS b4])
- +- FlinkLogicalWindowAggregate(group=[{0}], c4=[AUXILIARY_GROUP($1)],
agg#1=[SUM($4)], agg#2=[SUM($3)], agg#3=[COUNT($3)],
window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime])
+ +- FlinkLogicalWindowAggregate(group=[{0}], c4=[AUXILIARY_GROUP($1)],
agg#1=[SUM($4)], agg#2=[SUM($3)], agg#3=[COUNT($3)],
window=[TumblingGroupWindow('w$, d4, 900000)], properties=[w$start, w$end,
w$rowtime])
+- FlinkLogicalCalc(select=[a4, c4, d4, b4, *(b4, b4) AS $f4])
+- FlinkLogicalTableSourceScan(table=[[default_catalog,
default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4,
b4, c4, d4])
]]>
@@ -419,7 +419,7 @@ LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()])
<![CDATA[
FlinkLogicalAggregate(group=[{0, 1}], c4=[AUXILIARY_GROUP($2)],
EXPR$3=[COUNT()])
+- FlinkLogicalCalc(select=[a4, CAST(/(-($f2, /(*($f3, $f3), $f4)), $f4)) AS
b4, c4])
- +- FlinkLogicalWindowAggregate(group=[{0}], c4=[AUXILIARY_GROUP($1)],
agg#1=[SUM($4)], agg#2=[SUM($3)], agg#3=[COUNT($3)],
window=[TumblingGroupWindow], properties=[])
+ +- FlinkLogicalWindowAggregate(group=[{0}], c4=[AUXILIARY_GROUP($1)],
agg#1=[SUM($4)], agg#2=[SUM($3)], agg#3=[COUNT($3)],
window=[TumblingGroupWindow('w$, d4, 900000)], properties=[])
+- FlinkLogicalCalc(select=[a4, c4, d4, b4, *(b4, b4) AS $f4])
+- FlinkLogicalTableSourceScan(table=[[default_catalog,
default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4,
b4, c4, d4])
]]>
diff --git
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
index 1dcbe67..b82a3d5 100644
---
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
+++
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
@@ -71,7 +71,7 @@ LogicalProject(b=[$0], EXPR$1=[$2],
EXPR$2=[TUMBLE_START($1)], EXPR$3=[TUMBLE_EN
<Resource name="planAfter">
<![CDATA[
Calc(select=[b, EXPR$1, w$start AS EXPR$2, w$end AS EXPR$3])
-+- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow],
properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, COUNT(a) AS
EXPR$1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime,
proctime('w$) AS w$proctime])
++- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, rowtime,
5000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, COUNT(a)
AS EXPR$1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime,
proctime('w$) AS w$proctime])
+- Exchange(distribution=[hash[b]])
+- Calc(select=[b, rowtime, a])
+- WindowJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true,
leftLowerBound=-5000, leftUpperBound=10000, leftTimeIndex=2, rightTimeIndex=1],
where=[AND(=(a, a0), >=(rowtime, -(rowtime0, 5000:INTERVAL SECOND)),
<=(rowtime, +(rowtime0, 10000:INTERVAL SECOND)))], select=[a, b, rowtime, a0,
rowtime0])
@@ -140,7 +140,7 @@ Calc(select=[b, w0$o0 AS $1])
+- WindowJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true,
leftLowerBound=-5000, leftUpperBound=10000, leftTimeIndex=2, rightTimeIndex=2],
where=[AND(=(a, a0), >=(rt, -(rt0, 5000:INTERVAL SECOND)), <=(rt, +(rt0,
10000:INTERVAL SECOND)))], select=[b, a, rt, b0, a0, rt0])
:- Exchange(distribution=[hash[a]])
: +- Calc(select=[b, a, w$rowtime AS rt])
- : +- GroupWindowAggregate(groupBy=[b],
window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime,
w$proctime], select=[b, COUNT(a) AS a, start('w$) AS w$start, end('w$) AS
w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+ : +- GroupWindowAggregate(groupBy=[b],
window=[TumblingGroupWindow('w$, rowtime, 5000)], properties=[w$start, w$end,
w$rowtime, w$proctime], select=[b, COUNT(a) AS a, start('w$) AS w$start,
end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
: +- Exchange(distribution=[hash[b]])
: +- Calc(select=[b, rowtime, a])
: +- WatermarkAssigner(fields=[a, b, c, proctime,
rowtime], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
@@ -269,7 +269,7 @@ Calc(select=[id1, rowtime AS ts, text], reuse_id=[1])
+- WatermarkAssigner(fields=[id2, rowtime, cnt, name, goods],
rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
+- DataStreamScan(table=[[default_catalog, default_database, T2]],
fields=[id2, rowtime, cnt, name, goods])
-GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow],
properties=[w$start, w$end, w$rowtime, w$proctime], select=[id1, LISTAGG(text,
$f3) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS
w$rowtime, proctime('w$) AS w$proctime], reuse_id=[2])
+GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, ts,
6000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[id1,
LISTAGG(text, $f3) AS text, start('w$) AS w$start, end('w$) AS w$end,
rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime], reuse_id=[2])
+- Exchange(distribution=[hash[id1]])
+- Calc(select=[ts, id1, text, _UTF-16LE'#' AS $f3])
+- Reused(reference_id=[1])
@@ -281,7 +281,7 @@ Sink(name=[appendSink1], fields=[a, b])
+- Reused(reference_id=[2])
Sink(name=[appendSink2], fields=[a, b])
-+- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow],
select=[id1, LISTAGG(text, $f3) AS EXPR$1])
++- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, ts,
9000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
+- Exchange(distribution=[hash[id1]])
+- Calc(select=[ts, id1, text, _UTF-16LE'-' AS $f3])
+- Reused(reference_id=[1])
@@ -329,7 +329,7 @@ Sink(name=[appendSink3], fields=[a, b])
ship_strategy :
FORWARD
: Operator
- content
: GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow],
properties=[w$start, w$end, w$rowtime, w$proctime], select=[id1, LISTAGG(text,
$f3) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS
w$rowtime, proctime('w$) AS w$proctime])
+ content
: GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, ts,
6000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[id1,
LISTAGG(text, $f3) AS text, start('w$) AS w$start, end('w$) AS w$end,
rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
ship_strategy : HASH
:
Operator
@@ -349,7 +349,7 @@ Sink(name=[appendSink3], fields=[a, b])
ship_strategy : FORWARD
: Operator
-
content : GroupWindowAggregate(groupBy=[id1],
window=[TumblingGroupWindow], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
+
content : GroupWindowAggregate(groupBy=[id1],
window=[TumblingGroupWindow('w$, ts, 9000)], select=[id1, LISTAGG(text, $f3) AS
EXPR$1])
ship_strategy : HASH
: Operator
@@ -520,10 +520,10 @@ LogicalProject(b=[$0], EXPR$1=[$2])
</Resource>
<Resource name="planAfter">
<![CDATA[
-GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow], select=[b,
$SUM0(cnt) AS EXPR$1])
+GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, $f1,
5000)], select=[b, $SUM0(cnt) AS EXPR$1])
+- Exchange(distribution=[hash[b]])
+- Calc(select=[b, w$rowtime AS $f1, cnt])
- +- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow],
properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, COUNT(a) AS cnt,
start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime,
proctime('w$) AS w$proctime])
+ +- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$,
rowtime, 10000)], properties=[w$start, w$end, w$rowtime, w$proctime],
select=[b, COUNT(a) AS cnt, start('w$) AS w$start, end('w$) AS w$end,
rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+- Exchange(distribution=[hash[b]])
+- Calc(select=[b, rowtime, a])
+- WatermarkAssigner(fields=[a, b, c, proctime, rowtime],
rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
diff --git
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RelTimeIndicatorConverterTest.xml
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RelTimeIndicatorConverterTest.xml
index 381c804..62f238d 100644
---
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RelTimeIndicatorConverterTest.xml
+++
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RelTimeIndicatorConverterTest.xml
@@ -148,10 +148,10 @@ LogicalProject(EXPR$0=[TUMBLE_END($0)], long=[$1],
EXPR$2=[$2])
<Resource name="planAfter">
<![CDATA[
Calc(select=[w$end AS EXPR$0, long, EXPR$2])
-+- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow],
properties=[w$start, w$end, w$rowtime, w$proctime], select=[long, SUM(int) AS
EXPR$2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime,
proctime('w$) AS w$proctime])
++- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow('w$, $f0,
30000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[long,
SUM(int) AS EXPR$2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS
w$rowtime, proctime('w$) AS w$proctime])
+- Exchange(distribution=[hash[long]])
+- Calc(select=[w$rowtime AS $f0, long, int])
- +- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow],
properties=[w$start, w$end, w$rowtime, w$proctime], select=[long, SUM(int) AS
int, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime,
proctime('w$) AS w$proctime])
+ +- GroupWindowAggregate(groupBy=[long],
window=[TumblingGroupWindow('w$, rowtime, 10000)], properties=[w$start, w$end,
w$rowtime, w$proctime], select=[long, SUM(int) AS int, start('w$) AS w$start,
end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+- Exchange(distribution=[hash[long]])
+- DataStreamScan(table=[[default_catalog, default_database,
MyTable1]], fields=[rowtime, long, int])
]]>
@@ -236,7 +236,7 @@ LogicalProject(EXPR$0=[$2], long=[$0])
<Resource name="planAfter">
<![CDATA[
Calc(select=[EXPR$0, long])
-+- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow],
select=[long, MIN(rowtime0) AS EXPR$0])
++- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow('w$,
rowtime, 100)], select=[long, MIN(rowtime0) AS EXPR$0])
+- Exchange(distribution=[hash[long]])
+- Calc(select=[long, rowtime, CAST(rowtime) AS rowtime0])
+- DataStreamScan(table=[[default_catalog, default_database,
MyTable1]], fields=[rowtime, long, int])
@@ -286,7 +286,7 @@ LogicalProject(EXPR$0=[TUMBLE_END($0)], long=[$1],
EXPR$2=[$2])
<Resource name="planAfter">
<![CDATA[
Calc(select=[w$end AS EXPR$0, long, EXPR$2])
-+- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow],
properties=[w$start, w$end, w$rowtime, w$proctime], select=[long, SUM(int) AS
EXPR$2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime,
proctime('w$) AS w$proctime])
++- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow('w$,
rowtime, 10000)], properties=[w$start, w$end, w$rowtime, w$proctime],
select=[long, SUM(int) AS EXPR$2, start('w$) AS w$start, end('w$) AS w$end,
rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+- Exchange(distribution=[hash[long]])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable1]],
fields=[rowtime, long, int])
]]>
@@ -312,7 +312,7 @@ LogicalProject(EXPR$0=[$2], long=[$0])
<Resource name="planAfter">
<![CDATA[
Calc(select=[EXPR$0, long], where=[=(EXTRACT(FLAG(QUARTER), w$end), 1:BIGINT)])
-+- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow],
properties=[w$start, w$end, w$rowtime, w$proctime], select=[long, MIN(rowtime0)
AS EXPR$0, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime,
proctime('w$) AS w$proctime])
++- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow('w$,
rowtime, 1000)], properties=[w$start, w$end, w$rowtime, w$proctime],
select=[long, MIN(rowtime0) AS EXPR$0, start('w$) AS w$start, end('w$) AS
w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+- Exchange(distribution=[hash[long]])
+- Calc(select=[long, rowtime, CAST(rowtime) AS rowtime0])
+- DataStreamScan(table=[[default_catalog, default_database,
MyTable1]], fields=[rowtime, long, int])
@@ -341,7 +341,7 @@ LogicalProject(rowtime=[TUMBLE_END($1)], long=[$0],
EXPR$2=[$2])
<Resource name="planAfter">
<![CDATA[
Calc(select=[w$end AS rowtime, long, EXPR$2])
-+- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow],
properties=[w$start, w$end, w$rowtime, w$proctime], select=[long, SUM(int) AS
EXPR$2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime,
proctime('w$) AS w$proctime])
++- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow('w$,
rowtime, 100)], properties=[w$start, w$end, w$rowtime, w$proctime],
select=[long, SUM(int) AS EXPR$2, start('w$) AS w$start, end('w$) AS w$end,
rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+- Exchange(distribution=[hash[long]])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable1]],
fields=[rowtime, long, int])
]]>
diff --git
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
index df5adca..3d4f11f 100644
---
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
+++
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
@@ -125,7 +125,7 @@ LogicalProject(name=[$0], EXPR$1=[TUMBLE_END($1)],
EXPR$2=[$2])
<Resource name="planAfter">
<![CDATA[
Calc(select=[name, w$end AS EXPR$1, EXPR$2])
-+- GroupWindowAggregate(groupBy=[name], window=[TumblingGroupWindow],
properties=[w$start, w$end, w$rowtime, w$proctime], select=[name, AVG(val) AS
EXPR$2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime,
proctime('w$) AS w$proctime])
++- GroupWindowAggregate(groupBy=[name], window=[TumblingGroupWindow('w$,
rowtime, 600000)], properties=[w$start, w$end, w$rowtime, w$proctime],
select=[name, AVG(val) AS EXPR$2, start('w$) AS w$start, end('w$) AS w$end,
rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+- Exchange(distribution=[hash[name]])
+- Calc(select=[name, rowtime, val], where=[>(val, 100)])
+- TableSourceScan(table=[[default_catalog, default_database,
rowTimeT]], fields=[id, rowtime, val, name])
diff --git
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml
index b42e44f..09ef6c8 100644
---
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml
+++
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml
@@ -68,7 +68,7 @@ LogicalProject(b=[$0], s=[$2])
<![CDATA[
Calc(select=[b, s])
+- Correlate(invocation=[explode($cor0.set)],
correlate=[table(explode($cor0.set))], select=[b,set,f0],
rowType=[RecordType(BIGINT b, BIGINT MULTISET set, BIGINT f0)],
joinType=[INNER])
- +- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow],
select=[b, COLLECT(b) AS set])
+ +- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$,
rowtime, 3000)], select=[b, COLLECT(b) AS set])
+- Exchange(distribution=[hash[b]])
+- Calc(select=[b, rowtime], where=[<(b, 3)])
+- DataStreamScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, rowtime])
diff --git
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
index c88f676..45f4725 100644
---
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
+++
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
@@ -41,7 +41,7 @@ LogicalProject(EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3],
EXPR$3=[$4], EXPR$4=[TUMBL
<Resource name="planAfter">
<![CDATA[
Calc(select=[/(-($f0, /(*($f1, $f1), $f2)), $f2) AS EXPR$0, /(-($f0, /(*($f1,
$f1), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1))) AS EXPR$1,
CAST(POWER(/(-($f0, /(*($f1, $f1), $f2)), $f2), 0.5:DECIMAL(2, 1))) AS EXPR$2,
CAST(POWER(/(-($f0, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2,
1))), 0.5:DECIMAL(2, 1))) AS EXPR$3, w$start AS EXPR$4, w$end AS EXPR$5])
-+- GroupWindowAggregate(window=[TumblingGroupWindow], properties=[w$start,
w$end, w$rowtime, w$proctime], select=[SUM($f2) AS $f0, SUM(c) AS $f1, COUNT(c)
AS $f2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime,
proctime('w$) AS w$proctime])
++- GroupWindowAggregate(window=[TumblingGroupWindow('w$, rowtime, 900000)],
properties=[w$start, w$end, w$rowtime, w$proctime], select=[SUM($f2) AS $f0,
SUM(c) AS $f1, COUNT(c) AS $f2, start('w$) AS w$start, end('w$) AS w$end,
rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+- Exchange(distribution=[single])
+- Calc(select=[rowtime, c, *(c, c) AS $f2])
+- DataStreamScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -68,7 +68,7 @@ LogicalProject(EXPR$0=[$1], EXPR$1=[+(TUMBLE_END($0),
60000:INTERVAL MINUTE)])
<Resource name="planAfter">
<![CDATA[
Calc(select=[EXPR$0, +(w$end, 60000:INTERVAL MINUTE) AS EXPR$1])
-+- GroupWindowAggregate(window=[TumblingGroupWindow], properties=[w$start,
w$end, w$rowtime, w$proctime], select=[COUNT(*) AS EXPR$0, start('w$) AS
w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS
w$proctime])
++- GroupWindowAggregate(window=[TumblingGroupWindow('w$, rowtime, 900000)],
properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*) AS EXPR$0,
start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime,
proctime('w$) AS w$proctime])
+- Exchange(distribution=[single])
+- Calc(select=[rowtime])
+- DataStreamScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -160,7 +160,7 @@ LogicalProject(EXPR$0=[$1])
Calc(select=[EXPR$0])
+- GroupAggregate(groupBy=[b], select=[b, weightedAvg(c, a) AS EXPR$0])
+- Exchange(distribution=[hash[b]])
- +- GroupWindowAggregate(groupBy=[a, b, c], window=[TumblingGroupWindow],
select=[a, b, c])
+ +- GroupWindowAggregate(groupBy=[a, b, c],
window=[TumblingGroupWindow('w$, rowtime, 900000)], select=[a, b, c])
+- Exchange(distribution=[hash[a, b, c]])
+- Calc(select=[a, b, c, rowtime])
+- DataStreamScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -297,10 +297,10 @@ LogicalProject(EXPR$0=[TUMBLE_ROWTIME($0)],
EXPR$1=[TUMBLE_END($0)], a=[$1])
<Resource name="planAfter">
<![CDATA[
Calc(select=[w$rowtime AS EXPR$0, w$end AS EXPR$1, a])
-+- GroupWindowAggregate(window=[TumblingGroupWindow], properties=[w$start,
w$end, w$rowtime, w$proctime], select=[COUNT(*) AS a, start('w$) AS w$start,
end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
++- GroupWindowAggregate(window=[TumblingGroupWindow('w$, $f0, 4)],
properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*) AS a,
start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime,
proctime('w$) AS w$proctime])
+- Exchange(distribution=[single])
+- Calc(select=[w$rowtime AS $f0, a])
- +- GroupWindowAggregate(window=[TumblingGroupWindow],
properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(a) AS a,
start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime,
proctime('w$) AS w$proctime])
+ +- GroupWindowAggregate(window=[TumblingGroupWindow('w$, rowtime,
2)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(a) AS a,
start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime,
proctime('w$) AS w$proctime])
+- Exchange(distribution=[single])
+- Calc(select=[rowtime, a])
+- DataStreamScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -333,13 +333,48 @@ LogicalProject(EXPR$0=[$2])
Calc(select=[EXPR$0])
+- GroupAggregate(groupBy=[b, d], select=[b, d, weightedAvg(c, a) AS EXPR$0])
+- Exchange(distribution=[hash[b, d]])
- +- GroupWindowAggregate(groupBy=[a, b, c], window=[TumblingGroupWindow],
select=[a, b, c, COUNT(*) AS d])
+ +- GroupWindowAggregate(groupBy=[a, b, c],
window=[TumblingGroupWindow('w$, rowtime, 900000)], select=[a, b, c, COUNT(*)
AS d])
+- Exchange(distribution=[hash[a, b, c]])
+- Calc(select=[a, b, c, rowtime])
+- DataStreamScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, proctime, rowtime])
]]>
</Resource>
</TestCase>
+ <TestCase name="testReturnTypeInferenceForWindowAgg">
+ <Resource name="sql">
+ <![CDATA[
+SELECT
+ SUM(correct) AS s,
+ AVG(correct) AS a,
+ TUMBLE_START(rowtime, INTERVAL '15' MINUTE) AS wStart
+FROM (
+ SELECT CASE a
+ WHEN 1 THEN 1
+ ELSE 99
+ END AS correct, rowtime
+ FROM MyTable
+)
+GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(s=[$1], a=[$2], wStart=[TUMBLE_START($0)])
++- LogicalAggregate(group=[{0}], s=[SUM($1)], a=[AVG($1)])
+ +- LogicalProject($f0=[TUMBLE($4, 900000:INTERVAL MINUTE)],
correct=[CASE(=($0, 1), 1, 99)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[CAST(CASE(=($f1, 0), null:INTEGER, s)) AS s,
CAST(CAST(/(CASE(=($f1, 0), null:INTEGER, s), $f1))) AS a, w$start AS wStart])
++- GroupWindowAggregate(window=[TumblingGroupWindow('w$, rowtime, 900000)],
properties=[w$start, w$end, w$rowtime, w$proctime], select=[$SUM0($f1) AS s,
COUNT(*) AS $f1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS
w$rowtime, proctime('w$) AS w$proctime])
+ +- Exchange(distribution=[single])
+ +- Calc(select=[rowtime, CASE(=(a, 1), 1, 99) AS $f1])
+ +- DataStreamScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testSessionFunction">
<Resource name="sql">
<![CDATA[
@@ -396,7 +431,7 @@ Calc(select=[EXPR$0])
+- GroupAggregate(groupBy=[b, d, ping_start], select=[b, d, ping_start,
weightedAvg(c, a) AS EXPR$0])
+- Exchange(distribution=[hash[b, d, ping_start]])
+- Calc(select=[b, d, w$start AS ping_start, c, a])
- +- GroupWindowAggregate(groupBy=[a, b, c],
window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime,
w$proctime], select=[a, b, c, COUNT(*) AS d, start('w$) AS w$start, end('w$) AS
w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+ +- GroupWindowAggregate(groupBy=[a, b, c],
window=[TumblingGroupWindow('w$, rowtime, 900000)], properties=[w$start, w$end,
w$rowtime, w$proctime], select=[a, b, c, COUNT(*) AS d, start('w$) AS w$start,
end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+- Exchange(distribution=[hash[a, b, c]])
+- Calc(select=[a, b, c, rowtime])
+- DataStreamScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -431,7 +466,7 @@ Calc(select=[EXPR$0])
+- GroupAggregate(groupBy=[b, ping_start], select=[b, ping_start,
weightedAvg(c, a) AS EXPR$0])
+- Exchange(distribution=[hash[b, ping_start]])
+- Calc(select=[b, w$start AS ping_start, c, a])
- +- GroupWindowAggregate(groupBy=[a, b, c],
window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime,
w$proctime], select=[a, b, c, start('w$) AS w$start, end('w$) AS w$end,
rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+ +- GroupWindowAggregate(groupBy=[a, b, c],
window=[TumblingGroupWindow('w$, rowtime, 900000)], properties=[w$start, w$end,
w$rowtime, w$proctime], select=[a, b, c, start('w$) AS w$start, end('w$) AS
w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+- Exchange(distribution=[hash[a, b, c]])
+- Calc(select=[a, b, c, rowtime])
+- DataStreamScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -460,46 +495,11 @@ LogicalProject(EXPR$0=[$1], wAvg=[$2],
EXPR$2=[TUMBLE_START($0)], EXPR$3=[TUMBLE
<Resource name="planAfter">
<![CDATA[
Calc(select=[EXPR$0, wAvg, w$start AS EXPR$2, w$end AS EXPR$3])
-+- GroupWindowAggregate(window=[TumblingGroupWindow], properties=[w$start,
w$end, w$rowtime, w$proctime], select=[COUNT(*) AS EXPR$0, weightedAvg(c, a) AS
wAvg, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime,
proctime('w$) AS w$proctime])
++- GroupWindowAggregate(window=[TumblingGroupWindow('w$, rowtime, 900000)],
properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*) AS EXPR$0,
weightedAvg(c, a) AS wAvg, start('w$) AS w$start, end('w$) AS w$end,
rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+- Exchange(distribution=[single])
+- Calc(select=[rowtime, c, a])
+- DataStreamScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, proctime, rowtime])
]]>
</Resource>
</TestCase>
- <TestCase name="testReturnTypeInferenceForWindowAgg">
- <Resource name="sql">
- <![CDATA[
-SELECT
- SUM(correct) AS s,
- AVG(correct) AS a,
- TUMBLE_START(rowtime, INTERVAL '15' MINUTE) AS wStart
-FROM (
- SELECT CASE a
- WHEN 1 THEN 1
- ELSE 99
- END AS correct, rowtime
- FROM MyTable
-)
-GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)
- ]]>
- </Resource>
- <Resource name="planBefore">
- <![CDATA[
-LogicalProject(s=[$1], a=[$2], wStart=[TUMBLE_START($0)])
-+- LogicalAggregate(group=[{0}], s=[SUM($1)], a=[AVG($1)])
- +- LogicalProject($f0=[TUMBLE($4, 900000:INTERVAL MINUTE)],
correct=[CASE(=($0, 1), 1, 99)])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
-]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-Calc(select=[CAST(CASE(=($f1, 0), null:INTEGER, s)) AS s,
CAST(CAST(/(CASE(=($f1, 0), null:INTEGER, s), $f1))) AS a, w$start AS wStart])
-+- GroupWindowAggregate(window=[TumblingGroupWindow], properties=[w$start,
w$end, w$rowtime, w$proctime], select=[$SUM0($f1) AS s, COUNT(*) AS $f1,
start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime,
proctime('w$) AS w$proctime])
- +- Exchange(distribution=[single])
- +- Calc(select=[rowtime, CASE(=(a, 1), 1, 99) AS $f1])
- +- DataStreamScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, proctime, rowtime])
-]]>
- </Resource>
- </TestCase>
</Root>
diff --git
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
index 5053e8d..ba5ea52 100644
---
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
+++
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
@@ -217,7 +217,7 @@ LogicalProject(b=[$1], aSum=[$2], bCnt=[$3])
</Resource>
<Resource name="planAfter">
<![CDATA[
-GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow], select=[b,
SUM(a0) AS aSum, COUNT(b0) AS bCnt])
+GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, rowtime,
21600000)], select=[b, SUM(a0) AS aSum, COUNT(b0) AS bCnt])
+- Exchange(distribution=[hash[b]])
+- Calc(select=[rowtime, b, a0, b0])
+- WindowJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true,
leftLowerBound=-600000, leftUpperBound=3600000, leftTimeIndex=2,
rightTimeIndex=2], where=[AND(=(a, a0), >=(CAST(rowtime), -(CAST(rowtime0),
600000:INTERVAL MINUTE)), <=(CAST(rowtime), +(CAST(rowtime0), 3600000:INTERVAL
HOUR)))], select=[a, b, rowtime, a0, b0, rowtime0])
@@ -403,7 +403,7 @@ LogicalProject(b=[$1], aSum=[$2], bCnt=[$3])
</Resource>
<Resource name="planAfter">
<![CDATA[
-GroupWindowAggregate(groupBy=[b0], window=[TumblingGroupWindow], select=[b0,
SUM(a) AS aSum, COUNT(b) AS bCnt])
+GroupWindowAggregate(groupBy=[b0], window=[TumblingGroupWindow('w$, rowtime0,
21600000)], select=[b0, SUM(a) AS aSum, COUNT(b) AS bCnt])
+- Exchange(distribution=[hash[b0]])
+- Calc(select=[rowtime0, b0, a, b])
+- WindowJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true,
leftLowerBound=-600000, leftUpperBound=3600000, leftTimeIndex=2,
rightTimeIndex=2], where=[AND(=(a, a0), >=(CAST(rowtime), -(CAST(rowtime0),
600000:INTERVAL MINUTE)), <=(CAST(rowtime), +(CAST(rowtime0), 3600000:INTERVAL
HOUR)))], select=[a, b, rowtime, a0, b0, rowtime0])
diff --git
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/AggregateTest.xml
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/AggregateTest.xml
index d5b8a85..95b1683 100644
---
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/AggregateTest.xml
+++
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/AggregateTest.xml
@@ -53,13 +53,13 @@ GroupAggregate(groupBy=[b], select=[b, COUNT(a) AS TMP_0])
<Resource name="planBefore">
<![CDATA[
LogicalProject(EXPR$0=[$0], EXPR$1=[$1])
-+- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)],
EXPR$1=[SUM($0)], window=[TumblingGroupWindow], properties=[])
++- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)],
EXPR$1=[SUM($0)], window=[TumblingGroupWindow('w, rowtime, 900000)],
properties=[])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-GroupWindowAggregate(window=[TumblingGroupWindow], select=[COUNT(DISTINCT a)
AS EXPR$0, SUM(a) AS EXPR$1])
+GroupWindowAggregate(window=[TumblingGroupWindow('w, rowtime, 900000)],
select=[COUNT(DISTINCT a) AS EXPR$0, SUM(a) AS EXPR$1])
+- Exchange(distribution=[single])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, rowtime])
]]>
diff --git
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CalcTest.xml
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CalcTest.xml
index 93d56b8..2a44ce2 100644
---
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CalcTest.xml
+++
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CalcTest.xml
@@ -151,7 +151,7 @@ Calc(select=[a AS a2, b AS b2])
<Resource name="planBefore">
<![CDATA[
LogicalProject(EXPR$0=[$1], EXPR$1=[$2], b=[$0])
-+- LogicalWindowAggregate(group=[{1}], EXPR$0=[COUNT($5)], EXPR$1=[SUM($0)],
window=[TumblingGroupWindow], properties=[])
++- LogicalWindowAggregate(group=[{1}], EXPR$0=[COUNT($5)], EXPR$1=[SUM($0)],
window=[TumblingGroupWindow('w, rowtime, 5)], properties=[])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4],
$f5=[UPPER($2)])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
]]>
@@ -159,7 +159,7 @@ LogicalProject(EXPR$0=[$1], EXPR$1=[$2], b=[$0])
<Resource name="planAfter">
<![CDATA[
Calc(select=[EXPR$0, EXPR$1, b])
-+- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow], select=[b,
COUNT($f5) AS EXPR$0, SUM(a) AS EXPR$1])
++- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w, rowtime,
5)], select=[b, COUNT($f5) AS EXPR$0, SUM(a) AS EXPR$1])
+- Exchange(distribution=[hash[b]])
+- Calc(select=[a, b, c, d, rowtime, UPPER(c) AS $f5])
+- DataStreamScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, rowtime])
@@ -170,14 +170,14 @@ Calc(select=[EXPR$0, EXPR$1, b])
<Resource name="planBefore">
<![CDATA[
LogicalProject(EXPR$0=[$0], EXPR$1=[$1])
-+- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($5)], EXPR$1=[SUM($0)],
window=[TumblingGroupWindow], properties=[])
++- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($5)], EXPR$1=[SUM($0)],
window=[TumblingGroupWindow('w, rowtime, 5)], properties=[])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4],
$f5=[UPPER($2)])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-GroupWindowAggregate(window=[TumblingGroupWindow], select=[COUNT($f5) AS
EXPR$0, SUM(a) AS EXPR$1])
+GroupWindowAggregate(window=[TumblingGroupWindow('w, rowtime, 5)],
select=[COUNT($f5) AS EXPR$0, SUM(a) AS EXPR$1])
+- Exchange(distribution=[single])
+- Calc(select=[a, b, c, d, rowtime, UPPER(c) AS $f5])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d, rowtime])
diff --git
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/GroupWindowTest.xml
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/GroupWindowTest.xml
index ae8ceea..165c523 100644
---
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/GroupWindowTest.xml
+++
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/GroupWindowTest.xml
@@ -68,13 +68,13 @@ GroupWindowAggregate(window=[SlidingGroupWindow('w,
rowtime, 8, 10)], select=[CO
<Resource name="planBefore">
<![CDATA[
LogicalProject(EXPR$0=[$0])
-+- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($1)],
window=[TumblingGroupWindow], properties=[])
++- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($1)],
window=[TumblingGroupWindow('w, rowtime, 5)], properties=[])
+- LogicalTableScan(table=[[default_catalog, default_database, T1]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-GroupWindowAggregate(window=[TumblingGroupWindow], select=[COUNT(int) AS
EXPR$0])
+GroupWindowAggregate(window=[TumblingGroupWindow('w, rowtime, 5)],
select=[COUNT(int) AS EXPR$0])
+- Exchange(distribution=[single])
+- DataStreamScan(table=[[default_catalog, default_database, T1]],
fields=[rowtime, int, string])
]]>
@@ -100,13 +100,13 @@ GroupWindowAggregate(window=[SlidingGroupWindow('w,
proctime, 2, 1)], select=[CO
<Resource name="planBefore">
<![CDATA[
LogicalProject(EXPR$0=[$0])
-+- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($1)],
window=[TumblingGroupWindow], properties=[])
++- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($1)],
window=[TumblingGroupWindow('w, rowtime, 5)], properties=[])
+- LogicalTableScan(table=[[default_catalog, default_database, T1]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-GroupWindowAggregate(window=[TumblingGroupWindow], select=[COUNT(int) AS
EXPR$0])
+GroupWindowAggregate(window=[TumblingGroupWindow('w, rowtime, 5)],
select=[COUNT(int) AS EXPR$0])
+- Exchange(distribution=[single])
+- DataStreamScan(table=[[default_catalog, default_database, T1]],
fields=[long, int, string, rowtime])
]]>
@@ -132,13 +132,13 @@ GroupWindowAggregate(window=[SlidingGroupWindow('w,
proctime, 50, 50)], select=[
<Resource name="planBefore">
<![CDATA[
LogicalProject(EXPR$0=[$0])
-+- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($1)],
window=[TumblingGroupWindow], properties=[])
++- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($1)],
window=[TumblingGroupWindow('w, proctime, 2)], properties=[])
+- LogicalTableScan(table=[[default_catalog, default_database, T1]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-GroupWindowAggregate(window=[TumblingGroupWindow], select=[COUNT(int) AS
EXPR$0])
+GroupWindowAggregate(window=[TumblingGroupWindow('w, proctime, 2)],
select=[COUNT(int) AS EXPR$0])
+- Exchange(distribution=[single])
+- DataStreamScan(table=[[default_catalog, default_database, T1]],
fields=[long, int, string, proctime])
]]>
@@ -148,13 +148,13 @@ GroupWindowAggregate(window=[TumblingGroupWindow],
select=[COUNT(int) AS EXPR$0]
<Resource name="planBefore">
<![CDATA[
LogicalProject(string=[$0], EXPR$0=[$1])
-+- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)],
window=[TumblingGroupWindow], properties=[])
++- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)],
window=[TumblingGroupWindow('w, proctime, 50)], properties=[])
+- LogicalTableScan(table=[[default_catalog, default_database, T1]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow],
select=[string, COUNT(int) AS EXPR$0])
+GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w,
proctime, 50)], select=[string, COUNT(int) AS EXPR$0])
+- Exchange(distribution=[hash[string]])
+- DataStreamScan(table=[[default_catalog, default_database, T1]],
fields=[long, int, string, proctime])
]]>
@@ -164,14 +164,14 @@ GroupWindowAggregate(groupBy=[string],
window=[TumblingGroupWindow], select=[str
<Resource name="planBefore">
<![CDATA[
LogicalProject(EXPR$0=[$0], EXPR$1=[$1], EXPR$2=[$2], EXPR$3=[$3],
EXPR$4=[$4], EXPR$5=[$5])
-+- LogicalWindowAggregate(group=[{}], EXPR$0=[VAR_POP($3)],
EXPR$1=[VAR_SAMP($3)], EXPR$2=[STDDEV_POP($3)], EXPR$3=[STDDEV_SAMP($3)],
window=[TumblingGroupWindow], properties=[EXPR$4, EXPR$5])
++- LogicalWindowAggregate(group=[{}], EXPR$0=[VAR_POP($3)],
EXPR$1=[VAR_SAMP($3)], EXPR$2=[STDDEV_POP($3)], EXPR$3=[STDDEV_SAMP($3)],
window=[TumblingGroupWindow('w, rowtime, 900000)], properties=[EXPR$4, EXPR$5])
+- LogicalTableScan(table=[[default_catalog, default_database, T1]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
Calc(select=[/(-($f0, /(*($f1, $f1), $f2)), $f2) AS EXPR$0, /(-($f0, /(*($f1,
$f1), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1))) AS EXPR$1,
CAST(POWER(/(-($f0, /(*($f1, $f1), $f2)), $f2), 0.5:DECIMAL(2, 1))) AS EXPR$2,
CAST(POWER(/(-($f0, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2,
1))), 0.5:DECIMAL(2, 1))) AS EXPR$3, EXPR$4, EXPR$5])
-+- GroupWindowAggregate(window=[TumblingGroupWindow], properties=[EXPR$4,
EXPR$5], select=[SUM($f4) AS $f0, SUM(c) AS $f1, COUNT(c) AS $f2, start('w) AS
EXPR$4, end('w) AS EXPR$5])
++- GroupWindowAggregate(window=[TumblingGroupWindow('w, rowtime, 900000)],
properties=[EXPR$4, EXPR$5], select=[SUM($f4) AS $f0, SUM(c) AS $f1, COUNT(c)
AS $f2, start('w) AS EXPR$4, end('w) AS EXPR$5])
+- Exchange(distribution=[single])
+- Calc(select=[rowtime, a, b, c, *(c, c) AS $f4])
+- DataStreamScan(table=[[default_catalog, default_database, T1]],
fields=[rowtime, a, b, c])
@@ -278,13 +278,13 @@ GroupWindowAggregate(groupBy=[string],
window=[SlidingGroupWindow('w, rowtime, 8
<Resource name="planBefore">
<![CDATA[
LogicalProject(string=[$0], EXPR$0=[$1])
-+- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)],
window=[TumblingGroupWindow], properties=[])
++- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)],
window=[TumblingGroupWindow('w, rowtime, 5)], properties=[])
+- LogicalTableScan(table=[[default_catalog, default_database, T1]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow],
select=[string, COUNT(int) AS EXPR$0])
+GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w,
rowtime, 5)], select=[string, COUNT(int) AS EXPR$0])
+- Exchange(distribution=[hash[string]])
+- DataStreamScan(table=[[default_catalog, default_database, T1]],
fields=[rowtime, int, string])
]]>
@@ -294,13 +294,13 @@ GroupWindowAggregate(groupBy=[string],
window=[TumblingGroupWindow], select=[str
<Resource name="planBefore">
<![CDATA[
LogicalProject(string=[$0], EXPR$0=[$1])
-+- LogicalWindowAggregate(group=[{2}], EXPR$0=[myWeightedAvg($0, $1)],
window=[TumblingGroupWindow], properties=[])
++- LogicalWindowAggregate(group=[{2}], EXPR$0=[myWeightedAvg($0, $1)],
window=[TumblingGroupWindow('w, rowtime, 5)], properties=[])
+- LogicalTableScan(table=[[default_catalog, default_database, T1]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow],
select=[string, myWeightedAvg(long, int) AS EXPR$0])
+GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w,
rowtime, 5)], select=[string, myWeightedAvg(long, int) AS EXPR$0])
+- Exchange(distribution=[hash[string]])
+- DataStreamScan(table=[[default_catalog, default_database, T1]],
fields=[long, int, string, rowtime])
]]>
@@ -328,7 +328,7 @@ GroupWindowAggregate(groupBy=[string],
window=[SlidingGroupWindow('w, rowtime, 1
LogicalProject(EXPR$0=[$0])
+- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($1)],
window=[SlidingGroupWindow('w2, proctime, 20, 10)], properties=[])
+- LogicalProject(proctime=[AS($2, _UTF-16LE'proctime')], string=[$0],
EXPR$1=[$1])
- +- LogicalWindowAggregate(group=[{2}], EXPR$1=[COUNT($1)],
window=[TumblingGroupWindow], properties=[EXPR$0])
+ +- LogicalWindowAggregate(group=[{2}], EXPR$1=[COUNT($1)],
window=[TumblingGroupWindow('w1, proctime, 50)], properties=[EXPR$0])
+- LogicalTableScan(table=[[default_catalog, default_database, T1]])
]]>
</Resource>
@@ -336,7 +336,7 @@ LogicalProject(EXPR$0=[$0])
<![CDATA[
GroupWindowAggregate(window=[SlidingGroupWindow('w2, proctime, 20, 10)],
select=[COUNT(string) AS EXPR$0])
+- Exchange(distribution=[single])
- +- GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow],
properties=[EXPR$0], select=[string, COUNT(int) AS EXPR$1, proctime('w1) AS
EXPR$0])
+ +- GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w1,
proctime, 50)], properties=[EXPR$0], select=[string, COUNT(int) AS EXPR$1,
proctime('w1) AS EXPR$0])
+- Exchange(distribution=[hash[string]])
+- DataStreamScan(table=[[default_catalog, default_database, T1]],
fields=[long, int, string, proctime])
]]>
@@ -362,13 +362,13 @@ GroupWindowAggregate(groupBy=[string],
window=[SlidingGroupWindow('w, proctime,
<Resource name="planBefore">
<![CDATA[
LogicalProject(string=[$0], EXPR$0=[$1])
-+- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)],
window=[TumblingGroupWindow], properties=[])
++- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)],
window=[TumblingGroupWindow('w, proctime, 2)], properties=[])
+- LogicalTableScan(table=[[default_catalog, default_database, T1]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow],
select=[string, COUNT(int) AS EXPR$0])
+GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w,
proctime, 2)], select=[string, COUNT(int) AS EXPR$0])
+- Exchange(distribution=[hash[string]])
+- DataStreamScan(table=[[default_catalog, default_database, T1]],
fields=[long, int, string, proctime])
]]>
@@ -378,13 +378,13 @@ GroupWindowAggregate(groupBy=[string],
window=[TumblingGroupWindow], select=[str
<Resource name="planBefore">
<![CDATA[
LogicalProject(string=[$0], EXPR$0=[$1])
-+- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)],
window=[TumblingGroupWindow], properties=[])
++- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)],
window=[TumblingGroupWindow('w, proctime, 50)], properties=[])
+- LogicalTableScan(table=[[default_catalog, default_database, T1]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow],
select=[string, COUNT(int) AS EXPR$0])
+GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w,
proctime, 50)], select=[string, COUNT(int) AS EXPR$0])
+- Exchange(distribution=[hash[string]])
+- DataStreamScan(table=[[default_catalog, default_database, T1]],
fields=[long, int, string, proctime])
]]>
@@ -428,13 +428,13 @@ Calc(select=[EXPR$0])
<Resource name="planBefore">
<![CDATA[
LogicalProject(string=[$0], EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3])
-+- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)],
window=[TumblingGroupWindow], properties=[EXPR$1, EXPR$2])
++- LogicalWindowAggregate(group=[{2}], EXPR$0=[COUNT($1)],
window=[TumblingGroupWindow('w, rowtime, 5)], properties=[EXPR$1, EXPR$2])
+- LogicalTableScan(table=[[default_catalog, default_database, T1]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow],
properties=[EXPR$1, EXPR$2], select=[string, COUNT(int) AS EXPR$0, start('w) AS
EXPR$1, end('w) AS EXPR$2])
+GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w,
rowtime, 5)], properties=[EXPR$1, EXPR$2], select=[string, COUNT(int) AS
EXPR$0, start('w) AS EXPR$1, end('w) AS EXPR$2])
+- Exchange(distribution=[hash[string]])
+- DataStreamScan(table=[[default_catalog, default_database, T1]],
fields=[long, int, string, rowtime])
]]>
@@ -444,14 +444,14 @@ GroupWindowAggregate(groupBy=[string],
window=[TumblingGroupWindow], properties=
<Resource name="planBefore">
<![CDATA[
LogicalProject(string=[$0], s1=[AS(+($1, 1), _UTF-16LE's1')], s2=[AS(+($1, 3),
_UTF-16LE's2')], x=[AS($2, _UTF-16LE'x')], x2=[AS($2, _UTF-16LE'x2')],
x3=[AS($3, _UTF-16LE'x3')], EXPR$2=[$3])
-+- LogicalWindowAggregate(group=[{2}], EXPR$0=[SUM($1)],
window=[TumblingGroupWindow], properties=[EXPR$1, EXPR$2])
++- LogicalWindowAggregate(group=[{2}], EXPR$0=[SUM($1)],
window=[TumblingGroupWindow('w, rowtime, 5)], properties=[EXPR$1, EXPR$2])
+- LogicalTableScan(table=[[default_catalog, default_database, T1]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
Calc(select=[string, +(EXPR$0, 1) AS s1, +(EXPR$0, 3) AS s2, EXPR$1 AS x,
EXPR$1 AS x2, EXPR$2 AS x3, EXPR$2])
-+- GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow],
properties=[EXPR$1, EXPR$2], select=[string, SUM(int) AS EXPR$0, start('w) AS
EXPR$1, end('w) AS EXPR$2])
++- GroupWindowAggregate(groupBy=[string], window=[TumblingGroupWindow('w,
rowtime, 5)], properties=[EXPR$1, EXPR$2], select=[string, SUM(int) AS EXPR$0,
start('w) AS EXPR$1, end('w) AS EXPR$2])
+- Exchange(distribution=[hash[string]])
+- DataStreamScan(table=[[default_catalog, default_database, T1]],
fields=[rowtime, int, string])
]]>
diff --git
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.xml
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.xml
index fcdf107..e74e59b 100644
---
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.xml
+++
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.xml
@@ -107,7 +107,7 @@ Calc(select=[name, val, id])
<Resource name="planBefore">
<![CDATA[
LogicalProject(name=[$0], EXPR$0=[$2], EXPR$1=[$1])
-+- LogicalWindowAggregate(group=[{3}], EXPR$1=[AVG($2)],
window=[TumblingGroupWindow], properties=[EXPR$0])
++- LogicalWindowAggregate(group=[{3}], EXPR$1=[AVG($2)],
window=[TumblingGroupWindow('w, rowtime, 600000)], properties=[EXPR$0])
+- LogicalFilter(condition=[>($2, 100)])
+- LogicalTableScan(table=[[default_catalog, default_database,
rowTimeT]])
]]>
@@ -115,7 +115,7 @@ LogicalProject(name=[$0], EXPR$0=[$2], EXPR$1=[$1])
<Resource name="planAfter">
<![CDATA[
Calc(select=[name, EXPR$0, EXPR$1])
-+- GroupWindowAggregate(groupBy=[name], window=[TumblingGroupWindow],
properties=[EXPR$0], select=[name, AVG(val) AS EXPR$1, end('w) AS EXPR$0])
++- GroupWindowAggregate(groupBy=[name], window=[TumblingGroupWindow('w,
rowtime, 600000)], properties=[EXPR$0], select=[name, AVG(val) AS EXPR$1,
end('w) AS EXPR$0])
+- Exchange(distribution=[hash[name]])
+- Calc(select=[id, rowtime, val, name], where=[>(val, 100)])
+- TableSourceScan(table=[[default_catalog, default_database,
rowTimeT]], fields=[id, rowtime, val, name])