[
https://issues.apache.org/jira/browse/FLINK-17553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Terry Wang updated FLINK-17553:
-------------------------------
Description:
The exception stack is as follows:
!temp.png!
We can reproduce this problem by adding the following test to
org.apache.flink.table.planner.runtime.stream.sql#TimeAttributeITCase:
{code:scala}
// Reproduces the planner error for a window aggregate grouped by a constant.
@Test
def testWindowAggregateOnConstantValue(): Unit = {
  // Unbounded source with an event-time attribute derived from log_ts.
  val ddl1 =
    """
      |CREATE TABLE src (
      |  log_ts STRING,
      |  ts TIMESTAMP(3),
      |  a INT,
      |  b DOUBLE,
      |  rowtime AS CAST(log_ts AS TIMESTAMP(3)),
      |  WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND
      |) WITH (
      |  'connector' = 'COLLECTION',
      |  'is-bounded' = 'false'
      |)
    """.stripMargin
  // CSV sink on the local filesystem.
  val ddl2 =
    """
      |CREATE TABLE dst (
      |  ts TIMESTAMP(3),
      |  a BIGINT,
      |  b DOUBLE
      |) WITH (
      |  'connector.type' = 'filesystem',
      |  'connector.path' = '/tmp/1',
      |  'format.type' = 'csv'
      |)
    """.stripMargin
  // The constant 'a' in the GROUP BY clause triggers the failure;
  // the commented-out GROUP BY without the constant is the variant to compare against.
  val query =
    """
      |INSERT INTO dst
      |SELECT TUMBLE_END(rowtime, INTERVAL '0.003' SECOND), COUNT(ts), SUM(b)
      |FROM src
      |  GROUP BY 'a', TUMBLE(rowtime, INTERVAL '0.003' SECOND)
      |-- GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND)
    """.stripMargin
  tEnv.sqlUpdate(ddl1)
  tEnv.sqlUpdate(ddl2)
  tEnv.sqlUpdate(query)
  // Planning the INSERT is enough to hit the exception.
  println(tEnv.explain(true))
}
{code}
I spent a lot of time digging into this bug and found that the problem may be
caused by AggregateProjectPullUpConstantsRule, which doesn't generate the
project items correctly.
After I removed AggregateProjectPullUpConstantsRule from
FlinkStreamRuleSets#DEFAULT_REWRITE_RULES, the test passed as expected.
The problem is that WindowPropertiesRule cannot match the RelNode tree after the
transformation by AggregateProjectPullUpConstantsRule. Alternatively, we can add
ProjectMergeRule.INSTANCE to FlinkStreamRuleSets#DEFAULT_REWRITE_RULES after
AggregateProjectPullUpConstantsRule.INSTANCE to solve this problem.
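A rough sketch of the second option, only to show where the extra rule would sit. It assumes a Calcite version in which both rules expose a static INSTANCE field (as referenced above). The object name RewriteRulesSketch is hypothetical, and the other entries of the real DEFAULT_REWRITE_RULES list are left out:
{code:scala}
import org.apache.calcite.rel.rules.{AggregateProjectPullUpConstantsRule, ProjectMergeRule}
import org.apache.calcite.tools.{RuleSet, RuleSets}

// Hypothetical holder object; the real rule list lives in FlinkStreamRuleSets.
object RewriteRulesSketch {
  // Only the two rules discussed in this issue are listed here.
  val DEFAULT_REWRITE_RULES: RuleSet = RuleSets.ofList(
    // Pulls constant group keys out of the aggregate into a projection.
    AggregateProjectPullUpConstantsRule.INSTANCE,
    // Proposed addition: merges adjacent projections, which may restore a
    // tree shape that WindowPropertiesRule can match.
    ProjectMergeRule.INSTANCE
  )
}
{code}
Whether merging the projections is enough for every query shape has not been verified here; the test above only covers the single-constant case.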
> Constant exists in group window key leads to error: Unsupported call:
> TUMBLE_END(TIMESTAMP(3) NOT NULL)
> ---------------------------------------------------------------------------------------------------------
>
> Key: FLINK-17553
> URL: https://issues.apache.org/jira/browse/FLINK-17553
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Reporter: Terry Wang
> Priority: Major
> Attachments: temp.png