[ https://issues.apache.org/jira/browse/FLINK-34529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823920#comment-17823920 ]

Benchao Li commented on FLINK-34529:
------------------------------------

+1 for adding it to "PROJECT_RULES" so that we can use it in both the CBO and
RBO stages.

bq. For the question 'I'm even wondering whether we really need
CalcRankTransposeRule', I've tried removing it and found that
`ProjectWindowTransposeRule` can completely cover the functionality of
`CalcRankTransposeRule` (judging by the tests introduced along with that
rule) and even does a much better job in some cases. Therefore, I plan to
remove this rule in this PR. WDYT?

[~nilerzhou] I'm ok with it
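To make the idea behind pushing a projection past a rank/window concrete, here is a toy Scala model. It is a sketch under stated assumptions, not Calcite's implementation: the `ProjectRankTranspose`, `Rank` and `Pushed` names are hypothetical, fields are plain 0-based indices rather than the planner's relational expression trees, the rank number column is assumed not to be emitted (as with `rank_num = 1` in the issue's query), and sort direction is not modeled. The core steps are: keep only the input fields read by the parent projection or by the rank keys, then rewrite every index to its position in the narrowed row.

```scala
// Toy model only: hypothetical case classes, not Calcite's RelNode/RexNode API.
object ProjectRankTranspose {
  // A rank over its input row; keys are 0-based field indices into that row.
  // Assumption: the rank number column is not emitted; sort direction is ignored.
  final case class Rank(partitionBy: Seq[Int], orderBy: Seq[Int])

  // lowerProject: input fields kept below the rank, in input order.
  // rank / topProject: the same operators with indices rewritten to the narrowed row.
  final case class Pushed(lowerProject: Seq[Int], rank: Rank, topProject: Seq[Int])

  def transpose(topProject: Seq[Int], rank: Rank): Pushed = {
    // Every field read by the parent projection or by the rank keys must survive.
    val keep: Seq[Int] = (topProject ++ rank.partitionBy ++ rank.orderBy).distinct.sorted
    // Old input index -> position in the narrowed row.
    val remap: Map[Int, Int] = keep.zipWithIndex.toMap
    Pushed(
      lowerProject = keep,
      rank = Rank(rank.partitionBy.map(remap), rank.orderBy.map(remap)),
      topProject = topProject.map(remap)
    )
  }

  def main(args: Array[String]): Unit = {
    // Join output of the issue's query: a, b, c, d, e, f -> indices 0..5.
    // SELECT a ... PARTITION BY f ORDER BY c  =>  top = [0], partitionBy = [5], orderBy = [2].
    val result = transpose(Seq(0), Rank(partitionBy = Seq(5), orderBy = Seq(2)))
    println(result)
  }
}
```

For the issue's query, `transpose` keeps fields 0, 2 and 5 (a, c, f), rewrites the rank to partition by field 2 and order by field 1 of the narrowed row, and leaves the top projection as field 0. The kept fields match the `select=[a, c, f]` of the Rank in the expected plan.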

> Projection cannot be pushed down through rank operator.
> -------------------------------------------------------
>
>                 Key: FLINK-34529
>                 URL: https://issues.apache.org/jira/browse/FLINK-34529
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.19.0
>            Reporter: yisha zhou
>            Assignee: yisha zhou
>            Priority: Major
>
> When there is a rank/deduplicate operator, a projection on the output of this
> operator cannot be pushed down to its input.
> The following code reproduces the issue:
> {code:scala}
> val util = streamTestUtil() 
> util.addTableSource[(String, Int, String)]("T1", 'a, 'b, 'c)
> util.addTableSource[(String, Int, String)]("T2", 'd, 'e, 'f)
> val sql =
>   """
>     |SELECT a FROM (
>     |  SELECT a, f,
>     |      ROW_NUMBER() OVER (PARTITION BY f ORDER BY c DESC) as rank_num
>     |  FROM  T1, T2
>     |  WHERE T1.a = T2.d
>     |)
>     |WHERE rank_num = 1
>   """.stripMargin
> util.verifyPlan(sql){code}
> The plan is expected to be:
> {code:java}
> Calc(select=[a])
> +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], select=[a, c, f])
>    +- Exchange(distribution=[hash[f]])
>       +- Calc(select=[a, c, f])
>          +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, c, d, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
>             :- Exchange(distribution=[hash[a]])
>             :  +- Calc(select=[a, c])
>             :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
>             +- Exchange(distribution=[hash[d]])
>                +- Calc(select=[d, f])
>                   +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
> {code}
> Note that the 'select' of the Join operator is [a, c, d, f]. However, the
> actual plan is:
> {code:java}
> Calc(select=[a])
> +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], select=[a, c, f])
>    +- Exchange(distribution=[hash[f]])
>       +- Calc(select=[a, c, f])
>          +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
>             :- Exchange(distribution=[hash[a]])
>             :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
>             +- Exchange(distribution=[hash[d]])
>                +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
> {code}
> Here the 'select' of the Join operator is [a, b, c, d, e, f], which means the
> projection in the final Calc is not pushed through the Rank.
> I think an easy way to fix this issue is to add
> org.apache.calcite.rel.rules.ProjectWindowTransposeRule to
> FlinkStreamRuleSets.LOGICAL_OPT_RULES.
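The expected plan follows from a simple top-down "which columns are still needed" computation. The toy Scala model below walks it for the issue's query; the `RequiredColumns` object and its helpers are hypothetical illustrations, not Flink planner code. The Rank must read what the final Calc selects plus its PARTITION BY and ORDER BY keys, each join input must additionally provide the join keys, and each source then keeps only its own needed columns in schema order.

```scala
// Toy model only: hypothetical helpers, not Flink's planner code.
object RequiredColumns {
  val t1: Seq[String] = Seq("a", "b", "c")
  val t2: Seq[String] = Seq("d", "e", "f")

  // A Rank must read what its parent selects plus its PARTITION BY and ORDER BY keys.
  def rankNeeds(parent: Set[String], partitionBy: Seq[String], orderBy: Seq[String]): Set[String] =
    parent ++ partitionBy ++ orderBy

  // A join input must provide what is needed above the join plus the join keys.
  def joinNeeds(parent: Set[String], joinKeys: Set[String]): Set[String] =
    parent ++ joinKeys

  // Keep only the needed columns of one input, in schema order.
  def prune(input: Seq[String], needed: Set[String]): Seq[String] =
    input.filter(needed.contains)

  // SELECT a ... ROW_NUMBER() OVER (PARTITION BY f ORDER BY c DESC) ... WHERE T1.a = T2.d
  val atRank: Set[String] = rankNeeds(Set("a"), Seq("f"), Seq("c"))
  val atJoin: Set[String] = joinNeeds(atRank, Set("a", "d"))
  val fromT1: Seq[String] = prune(t1, atJoin)
  val fromT2: Seq[String] = prune(t2, atJoin)

  def main(args: Array[String]): Unit = {
    println("Rank select: " + prune(t1 ++ t2, atRank).mkString("[", ", ", "]"))
    println("Join select: " + (fromT1 ++ fromT2).mkString("[", ", ", "]"))
    println("T1 Calc:     " + fromT1.mkString("[", ", ", "]"))
    println("T2 Calc:     " + fromT2.mkString("[", ", ", "]"))
  }
}
```

Running `main` prints [a, c, f] for the Rank, [a, c, d, f] for the Join, [a, c] for T1 and [d, f] for T2, which are exactly the selects in the expected plan. Without pushdown through the Rank, the first step effectively requires every column, so the Join keeps all six.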



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
