[
https://issues.apache.org/jira/browse/FLINK-34702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jacky Lau updated FLINK-34702:
------------------------------
Description:
{code:java}
@Test
def testSimpleFirstRowOnBuiltinProctime1(): Unit = {
val sqlQuery =
"""
|SELECT *
|FROM (
| SELECT *,
| ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
| FROM (select a, count(b) as b from MyTable group by a)
|)
|WHERE rowNum = 1
""".stripMargin
util.verifyExecPlan(sqlQuery)
} {code}
Exception:
org.apache.flink.table.api.TableException: StreamPhysicalDeduplicate doesn't
support consuming update changes which is produced by node
GroupAggregate(groupBy=[a], select=[a, COUNT(b) AS b])
because the StreamPhysicalDeduplicate can not consuming update changes now
while StreamExecRank can.
so we should not convert the FlinkLogicalRank to StreamPhysicalDeduplicate in
this case. and we can defer whether input contains update change in the
"optimize the physical plan" phase.
so we can add an option to solve it. and when the StreamPhysicalDeduplicate can
support consuming update changes , we can deprecate it
was:
{code:java}
@Test
def testSimpleFirstRowOnBuiltinProctime1(): Unit = {
val sqlQuery =
"""
|SELECT *
|FROM (
| SELECT *,
| ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
| FROM (select a, count(b) as b from MyTable group by a)
|)
|WHERE rowNum = 1
""".stripMargin
util.verifyExecPlan(sqlQuery)
} {code}
> Rank should not convert to StreamExecDuplicate when the input is not insert
> only
> --------------------------------------------------------------------------------
>
> Key: FLINK-34702
> URL: https://issues.apache.org/jira/browse/FLINK-34702
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.20.0
> Reporter: Jacky Lau
> Priority: Major
> Fix For: 1.20.0
>
>
> {code:java}
> @Test
> def testSimpleFirstRowOnBuiltinProctime1(): Unit = {
> val sqlQuery =
> """
> |SELECT *
> |FROM (
> | SELECT *,
> | ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as
> rowNum
> | FROM (select a, count(b) as b from MyTable group by a)
> |)
> |WHERE rowNum = 1
> """.stripMargin
> util.verifyExecPlan(sqlQuery)
> } {code}
> Exception:
> org.apache.flink.table.api.TableException: StreamPhysicalDeduplicate doesn't
> support consuming update changes which is produced by node
> GroupAggregate(groupBy=[a], select=[a, COUNT(b) AS b])
> because the StreamPhysicalDeduplicate can not consuming update changes now
> while StreamExecRank can.
> so we should not convert the FlinkLogicalRank to StreamPhysicalDeduplicate in
> this case. and we can defer whether input contains update change in the
> "optimize the physical plan" phase.
> so we can add an option to solve it. and when the StreamPhysicalDeduplicate
> can support consuming update changes , we can deprecate it
--
This message was sent by Atlassian Jira
(v8.20.10#820010)