[ 
https://issues.apache.org/jira/browse/FLINK-34702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17883560#comment-17883560
 ] 

Sergey Nuyanzin edited comment on FLINK-34702 at 9/22/24 12:09 AM:
-------------------------------------------------------------------

I took a brief look at this.

I noticed a number of similar cases where the error
{quote}StreamPhysicalDeduplicate doesn't support consuming update changes which 
is produced by node{quote} occurs, and the node can also be a join node such as a 
left/right/full outer join, semi join, or anti join. See the examples at the end.

Since each of the 3 solutions mentioned above comes with difficulties, how about 
approaching this from a slightly different direction?
What if {{StreamPhysicalRankRule#convert}} did not convert to a deduplicate 
{{RelNode}} when its input contains a {{GroupAggregate}}, 
{{LeftOuterJoin}}, {{RightOuterJoin}}, {{FullOuterJoin}}, {{SemiJoin}}, or 
{{AntiJoin}}?
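
To make the idea concrete, here is a rough, self-contained sketch of the kind of check I have in mind. It is only a toy model, not the code from the PR: {{DeduplicateGuard}}, {{Node}}, {{Kind}} and both method names are hypothetical and do not exist in Flink, and a real implementation would walk the actual {{RelNode}} inputs instead.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

/** Toy model of the proposed guard; none of these types are Flink classes. */
final class DeduplicateGuard {

    /** Hypothetical node kinds, named after the operators listed above. */
    enum Kind {
        TABLE_SCAN, CALC, INNER_JOIN,
        GROUP_AGGREGATE, LEFT_OUTER_JOIN, RIGHT_OUTER_JOIN,
        FULL_OUTER_JOIN, SEMI_JOIN, ANTI_JOIN
    }

    /** Kinds that, per the proposal, should block the conversion to deduplicate. */
    static final Set<Kind> BLOCKING = EnumSet.of(
            Kind.GROUP_AGGREGATE, Kind.LEFT_OUTER_JOIN, Kind.RIGHT_OUTER_JOIN,
            Kind.FULL_OUTER_JOIN, Kind.SEMI_JOIN, Kind.ANTI_JOIN);

    /** Minimal stand-in for a plan node: a kind plus its input nodes. */
    static final class Node {
        final Kind kind;
        final List<Node> inputs;

        Node(Kind kind, Node... inputs) {
            this.kind = kind;
            this.inputs = Collections.unmodifiableList(Arrays.asList(inputs));
        }
    }

    /** Returns true if the node or anything below it is a blocking kind. */
    static boolean containsBlockingNode(Node node) {
        if (BLOCKING.contains(node.kind)) {
            return true;
        }
        for (Node input : node.inputs) {
            if (containsBlockingNode(input)) {
                return true;
            }
        }
        return false;
    }

    /** The rank is converted to deduplicate only if its input subtree is clean. */
    static boolean canConvertToDeduplicate(Node rankInput) {
        return !containsBlockingNode(rankInput);
    }
}
```

With this model, a rank over a plain scan would still become a deduplicate node, while a rank over any of the listed nodes (even when hidden under a calc) would stay a regular rank.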

I submitted a draft PR with this proposal (first, to make sure that it passes 
the existing CI tests).

More examples of failing queries

FullOuterJoin (Left/RightOuterJoin are similar)
{code:sql}
SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
  FROM (SELECT m.a, m.b FROM MyTable m FULL OUTER JOIN MyTable2 m2 ON m.a = m2.a)
)
WHERE rowNum = 1
{code}
SemiJoin
{code:sql}
SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
  FROM (SELECT a, b FROM MyTable
        WHERE EXISTS (
        SELECT a, b FROM MyTable2))
)
WHERE rowNum = 1
{code}
AntiJoin
{code:sql}
SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
  FROM (SELECT a, b FROM MyTable
        WHERE NOT EXISTS (
        SELECT a, b FROM MyTable2))
)
WHERE rowNum = 1
{code}



> Rank should not convert to StreamExecDuplicate when the input is not insert 
> only
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-34702
>                 URL: https://issues.apache.org/jira/browse/FLINK-34702
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.20.0
>            Reporter: Jacky Lau
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 2.0.0
>
>
> {code:java}
> @Test
> def testSimpleFirstRowOnBuiltinProctime1(): Unit = {
>   val sqlQuery =
>     """
>       |SELECT *
>       |FROM (
>       |  SELECT *,
>       |    ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
>       |  FROM (select a, count(b) as b from MyTable group by a)
>       |)
>       |WHERE rowNum = 1
>     """.stripMargin
>   util.verifyExecPlan(sqlQuery)
> } {code}
> Exception:
> org.apache.flink.table.api.TableException: StreamPhysicalDeduplicate doesn't 
> support consuming update changes which is produced by node 
> GroupAggregate(groupBy=[a], select=[a, COUNT(b) AS b])
> This happens because StreamPhysicalDeduplicate cannot consume update changes 
> yet, while StreamExecRank can.
> So we should not convert the FlinkLogicalRank to StreamPhysicalDeduplicate in 
> this case, and we can defer deciding whether the input contains update changes 
> to the "optimize the physical plan" phase. 
> We can add an option to solve this, and once StreamPhysicalDeduplicate 
> supports consuming update changes, we can deprecate it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
