[ 
https://issues.apache.org/jira/browse/FLINK-34702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jacky Lau updated FLINK-34702:
------------------------------
    Description: 
{code:java}
@Test
def testSimpleFirstRowOnBuiltinProctime1(): Unit = {
  val sqlQuery =
    """
      |SELECT *
      |FROM (
      |  SELECT *,
      |    ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
      |  FROM (select a, count(b) as b from MyTable group by a)
      |)
      |WHERE rowNum = 1
    """.stripMargin

  util.verifyExecPlan(sqlQuery)
} {code}
Exception:

org.apache.flink.table.api.TableException: StreamPhysicalDeduplicate doesn't 
support consuming update changes which is produced by node 
GroupAggregate(groupBy=[a], select=[a, COUNT(b) AS b])

This happens because StreamPhysicalDeduplicate cannot consume update changes 
yet, while StreamExecRank can.

So FlinkLogicalRank should not be converted to StreamPhysicalDeduplicate in 
this case. The check for whether the input contains update changes can be 
deferred to the "optimize the physical plan" phase.

We can add an option to work around this, and deprecate the option once 
StreamPhysicalDeduplicate supports consuming update changes.
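
The intended decision can be sketched roughly as follows. This is a minimal, self-contained illustration and not Flink planner code: the class, enum, and parameter names are hypothetical, and it assumes the changelog mode of the input is already known when the choice is made, which in the real planner would only hold during the "optimize the physical plan" phase.

```java
// Illustrative sketch only; none of these names are Flink planner APIs.
final class RankConversionSketch {

    /** Hypothetical stand-in for the physical node the planner would pick. */
    enum PhysicalNode { DEDUPLICATE, RANK }

    /**
     * @param isFirstRowPattern ROW_NUMBER() ordered by a time attribute, filtered to rowNum = 1
     * @param inputIsInsertOnly whether the upstream node emits only insert changes
     */
    static PhysicalNode choose(boolean isFirstRowPattern, boolean inputIsInsertOnly) {
        // Deduplicate is chosen only when the pattern matches AND no update changes arrive.
        if (isFirstRowPattern && inputIsInsertOnly) {
            return PhysicalNode.DEDUPLICATE;
        }
        // Otherwise fall back to Rank, which per this report can consume update changes.
        return PhysicalNode.RANK;
    }

    public static void main(String[] args) {
        // The query in this report: first-row pattern over a GROUP BY aggregate (updating input).
        System.out.println(choose(true, false)); // prints RANK
        // The same pattern over an append-only input.
        System.out.println(choose(true, true));  // prints DEDUPLICATE
    }
}
```

With this shape, the query above would keep the Rank node instead of failing with the TableException.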

  was:
{code:java}
@Test
def testSimpleFirstRowOnBuiltinProctime1(): Unit = {
  val sqlQuery =
    """
      |SELECT *
      |FROM (
      |  SELECT *,
      |    ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
      |  FROM (select a, count(b) as b from MyTable group by a)
      |)
      |WHERE rowNum = 1
    """.stripMargin

  util.verifyExecPlan(sqlQuery)
} {code}


> Rank should not convert to StreamExecDuplicate when the input is not insert 
> only
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-34702
>                 URL: https://issues.apache.org/jira/browse/FLINK-34702
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.20.0
>            Reporter: Jacky Lau
>            Priority: Major
>             Fix For: 1.20.0
>
>
> {code:java}
> @Test
> def testSimpleFirstRowOnBuiltinProctime1(): Unit = {
>   val sqlQuery =
>     """
>       |SELECT *
>       |FROM (
>       |  SELECT *,
>       |    ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
>       |  FROM (select a, count(b) as b from MyTable group by a)
>       |)
>       |WHERE rowNum = 1
>     """.stripMargin
>   util.verifyExecPlan(sqlQuery)
> } {code}
> Exception:
> org.apache.flink.table.api.TableException: StreamPhysicalDeduplicate doesn't 
> support consuming update changes which is produced by node 
> GroupAggregate(groupBy=[a], select=[a, COUNT(b) AS b])
> This happens because StreamPhysicalDeduplicate cannot consume update changes 
> yet, while StreamExecRank can.
> So FlinkLogicalRank should not be converted to StreamPhysicalDeduplicate in 
> this case. The check for whether the input contains update changes can be 
> deferred to the "optimize the physical plan" phase.
> We can add an option to work around this, and deprecate the option once 
> StreamPhysicalDeduplicate supports consuming update changes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
