[ 
https://issues.apache.org/jira/browse/FLINK-34529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17822415#comment-17822415
 ] 

yisha zhou commented on FLINK-34529:
------------------------------------

Hi [~lincoln.86xy], thanks for your advice.

After discussing with [~libenchao], I agree that putting this kind of rule
into the cost-based planner seems to be in line with the future direction.

Meanwhile, I found that most of the ProjectXXTransposeRules are in
`FlinkStreamRuleSets#PROJECT_RULES`, and `PROJECT_RULES` seems to be used in both
'LOGICAL' (Volcano) and 'PROJECT_REWRITE' (HEP). I plan to add
`CoreRules.PROJECT_WINDOW_TRANSPOSE` to `FlinkStreamRuleSets#PROJECT_RULES` as well,
so that both kinds of planner can use the rule. WDYT?
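For illustration, a minimal sketch of what the proposed change could look like. `ProjectRulesSketch` is a hypothetical, heavily abbreviated stand-in for `FlinkStreamRuleSets#PROJECT_RULES` (the real set also contains Flink-specific rules that are omitted here); only `RuleSets.ofList` and the `CoreRules` constants are actual Calcite API.

```scala
import org.apache.calcite.rel.rules.CoreRules
import org.apache.calcite.tools.{RuleSet, RuleSets}

// Hypothetical, abbreviated stand-in for FlinkStreamRuleSets#PROJECT_RULES.
// The real rule set contains additional Flink-specific rules not shown here.
object ProjectRulesSketch {
  val PROJECT_RULES: RuleSet = RuleSets.ofList(
    // push a projection past a filter
    CoreRules.PROJECT_FILTER_TRANSPOSE,
    // merge adjacent projections
    CoreRules.PROJECT_MERGE,
    // remove projections that only forward their input unchanged
    CoreRules.PROJECT_REMOVE,
    // proposed addition: push a projection past a Window (OVER aggregate),
    // e.g. the ROW_NUMBER() window underlying a rank/deduplicate query
    CoreRules.PROJECT_WINDOW_TRANSPOSE
  )
}
```

If `PROJECT_RULES` is indeed shared by the 'LOGICAL' and 'PROJECT_REWRITE' programs, adding the rule in this one place makes it available to both.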

> Projection cannot be pushed down through rank operator.
> -------------------------------------------------------
>
>                 Key: FLINK-34529
>                 URL: https://issues.apache.org/jira/browse/FLINK-34529
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.19.0
>            Reporter: yisha zhou
>            Assignee: yisha zhou
>            Priority: Major
>
> When there is a rank/deduplicate operator, a projection on the output of this
> operator cannot be pushed down to its input.
> The following code reproduces the issue:
> {code:java}
> val util = streamTestUtil() 
> util.addTableSource[(String, Int, String)]("T1", 'a, 'b, 'c)
> util.addTableSource[(String, Int, String)]("T2", 'd, 'e, 'f)
> val sql =
>   """
>     |SELECT a FROM (
>     |  SELECT a, f,
>     |      ROW_NUMBER() OVER (PARTITION BY f ORDER BY c DESC) as rank_num
>     |  FROM  T1, T2
>     |  WHERE T1.a = T2.d
>     |)
>     |WHERE rank_num = 1
>   """.stripMargin
> util.verifyPlan(sql){code}
> The plan is expected to be:
> {code:java}
> Calc(select=[a])
> +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], select=[a, c, f])
>    +- Exchange(distribution=[hash[f]])
>       +- Calc(select=[a, c, f])
>          +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, c, d, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
>             :- Exchange(distribution=[hash[a]])
>             :  +- Calc(select=[a, c])
>             :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
>             +- Exchange(distribution=[hash[d]])
>                +- Calc(select=[d, f])
>                   +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
> {code}
> Note that the 'select' of the Join operator is [a, c, d, f]. However, the
> actual plan is:
> {code:java}
> Calc(select=[a])
> +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], select=[a, c, f])
>    +- Exchange(distribution=[hash[f]])
>       +- Calc(select=[a, c, f])
>          +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
>             :- Exchange(distribution=[hash[a]])
>             :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
>             +- Exchange(distribution=[hash[d]])
>                +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
> {code}
> Here the 'select' of the Join operator is [a, b, c, d, e, f], which means the
> projection in the final Calc is not pushed through the Rank.
> I think a simple way to fix this issue is to add
> org.apache.calcite.rel.rules.ProjectWindowTransposeRule to
> FlinkStreamRuleSets.LOGICAL_OPT_RULES.
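The column pruning that the expected plan relies on can be illustrated with a toy model. `RankPruningSketch` is a hypothetical, self-contained Scala object, not Flink or Calcite code. It tracks only column names and assumes each operator must receive the columns its parent still needs plus the columns it references itself (the Rank its partition and order keys, the Join its equi-join keys). Applied to the reproducer above, it derives the same column lists as the expected plan.

```scala
// Toy model (hypothetical, not Flink/Calcite code) of pruning columns
// top-down through the Rank and the Join of the reproducer query.
object RankPruningSketch {
  /** Columns an operator needs from its input(s): whatever its parent
    * still needs plus the columns the operator references itself. */
  def requiredInput(neededAbove: Set[String], referencedHere: Set[String]): Set[String] =
    neededAbove ++ referencedHere

  // Schemas of the two sources in the reproducer.
  val t1Cols: Set[String] = Set("a", "b", "c")
  val t2Cols: Set[String] = Set("d", "e", "f")

  // The outer query only selects `a`.
  val neededAboveRank: Set[String] = Set("a")
  // ROW_NUMBER() OVER (PARTITION BY f ORDER BY c DESC) references f and c.
  val rankInput: Set[String] = requiredInput(neededAboveRank, Set("f", "c"))
  // The join condition T1.a = T2.d references a and d.
  val joinInput: Set[String] = requiredInput(rankInput, Set("a", "d"))
  // Each source only has to produce the required columns from its own schema.
  val t1Needed: Set[String] = joinInput intersect t1Cols
  val t2Needed: Set[String] = joinInput intersect t2Cols

  private def show(cols: Set[String]): String = cols.toSeq.sorted.mkString("[", ", ", "]")

  def main(args: Array[String]): Unit = {
    println(s"Rank/Calc input: ${show(rankInput)}") // [a, c, f]
    println(s"Join select:     ${show(joinInput)}") // [a, c, d, f]
    println(s"T1 projection:   ${show(t1Needed)}")  // [a, c]
    println(s"T2 projection:   ${show(t2Needed)}")  // [d, f]
  }
}
```

A real planner rule works on field indices and row expressions rather than column names; the sketch only mirrors the bookkeeping that lets the Join drop b and e.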



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
