[ https://issues.apache.org/jira/browse/FLINK-13563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
godfrey he updated FLINK-13563:
-------------------------------
Description:
{code:scala}
@Test
def testAllEventTimeTumblingGroupWindowOverTime(): Unit = {
  val util = streamTestUtil()
  val table = util.addDataStream[(Long, Int, String)](
    "T1", 'long, 'int, 'string, 'rowtime.rowtime)

  val windowedTable = table
    .window(Tumble over 5.millis on 'rowtime as 'w)
    .groupBy('w)
    .select('int.count)

  util.verifyPlan(windowedTable)
}
{code}
Currently, its physical plan is:
{code:java}
HashWindowAggregate(window=[TumblingGroupWindow], select=[Final_COUNT(count$0) AS EXPR$0])
+- Exchange(distribution=[single])
   +- LocalHashWindowAggregate(window=[TumblingGroupWindow], select=[Partial_COUNT(int) AS count$0])
      +- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(long, int, string)]]], fields=[long, int, string])
{code}
The plan tells us nothing about the TumblingGroupWindow except its name. The expected plan is:
{code:java}
HashWindowAggregate(window=[TumblingGroupWindow('w, long, 5)], select=[Final_COUNT(count$0) AS EXPR$0])
+- Exchange(distribution=[single])
   +- LocalHashWindowAggregate(window=[TumblingGroupWindow('w, long, 5)], select=[Partial_COUNT(int) AS count$0])
      +- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(long, int, string)]]], fields=[long, int, string])
{code}
was:
{code:scala}
@Test
def testAllEventTimeTumblingGroupWindowOverTime(): Unit = {
  val util = streamTestUtil()
  val table = util.addDataStream[(Long, Int, String)](
    "T1", 'long, 'int, 'string, 'rowtime.rowtime)

  val windowedTable = table
    .window(Tumble over 5.millis on 'rowtime as 'w)
    .groupBy('w)
    .select('int.count)

  util.verifyPlan(windowedTable)
}
{code}
Currently, its physical plan is:
{code:java}
HashWindowAggregate(window=[TumblingGroupWindow], select=[Final_COUNT(count$0) AS EXPR$0])
+- Exchange(distribution=[single])
   +- LocalHashWindowAggregate(window=[TumblingGroupWindow], select=[Partial_COUNT(int) AS count$0])
      +- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(long, int, string)]]], fields=[long, int, string])
{code}
The plan tells us nothing about the TumblingGroupWindow except its name.
> TumblingGroupWindow should implement toString method
> ----------------------------------------------------
>
> Key: FLINK-13563
> URL: https://issues.apache.org/jira/browse/FLINK-13563
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.9.0, 1.10.0
> Reporter: godfrey he
> Priority: Major
> Fix For: 1.9.0, 1.10.0
>
>
> {code:scala}
> @Test
> def testAllEventTimeTumblingGroupWindowOverTime(): Unit = {
>   val util = streamTestUtil()
>   val table = util.addDataStream[(Long, Int, String)](
>     "T1", 'long, 'int, 'string, 'rowtime.rowtime)
>
>   val windowedTable = table
>     .window(Tumble over 5.millis on 'rowtime as 'w)
>     .groupBy('w)
>     .select('int.count)
>
>   util.verifyPlan(windowedTable)
> }
> {code}
> Currently, its physical plan is:
> {code:java}
> HashWindowAggregate(window=[TumblingGroupWindow], select=[Final_COUNT(count$0) AS EXPR$0])
> +- Exchange(distribution=[single])
>    +- LocalHashWindowAggregate(window=[TumblingGroupWindow], select=[Partial_COUNT(int) AS count$0])
>       +- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(long, int, string)]]], fields=[long, int, string])
> {code}
> The plan tells us nothing about the TumblingGroupWindow except its name. The expected plan is:
> {code:java}
> HashWindowAggregate(window=[TumblingGroupWindow('w, long, 5)], select=[Final_COUNT(count$0) AS EXPR$0])
> +- Exchange(distribution=[single])
>    +- LocalHashWindowAggregate(window=[TumblingGroupWindow('w, long, 5)], select=[Partial_COUNT(int) AS count$0])
>       +- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(long, int, string)]]], fields=[long, int, string])
> {code}
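
For illustration only, here is a minimal sketch of the kind of toString override the issue asks for. It is not the planner's actual code: WindowReference, FieldReference, the LogicalWindow trait and the sizeMillis field are hypothetical stand-ins, and the sample values ('w, long, 5) are simply copied from the expected plan quoted above.

```scala
// Hypothetical stand-in for the planner's window alias type (assumption).
final case class WindowReference(name: String) {
  // Render the alias the way the expected plan shows it, e.g. 'w
  override def toString: String = s"'$name"
}

// Hypothetical stand-in for a reference to the time attribute field (assumption).
final case class FieldReference(name: String) {
  override def toString: String = name
}

// Hypothetical common supertype for group windows (assumption).
sealed trait LogicalWindow {
  def alias: WindowReference
  def timeAttribute: FieldReference
}

// Sketch of a tumbling window whose toString exposes alias, time field and size,
// so a plan printer that embeds window.toString shows more than the class name.
final case class TumblingGroupWindow(
    alias: WindowReference,
    timeAttribute: FieldReference,
    sizeMillis: Long)
  extends LogicalWindow {

  override def toString: String =
    s"TumblingGroupWindow($alias, $timeAttribute, $sizeMillis)"
}

object TumblingGroupWindowDemo {
  // Builds the window from the values in the expected plan and renders it.
  def describe(): String =
    TumblingGroupWindow(WindowReference("w"), FieldReference("long"), 5L).toString
}
```

With an override along these lines, any plan printer that formats the window via toString would show the alias, time field and size without further changes. How the real fix renders the size (raw milliseconds versus an interval literal) is left open here.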