[ https://issues.apache.org/jira/browse/FLINK-34529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17822118#comment-17822118 ]

Benchao Li edited comment on FLINK-34529 at 2/29/24 1:08 PM:
-------------------------------------------------------------

[~xuyangzhong] [~nilerzhou] Thank you for the explanation, it helps.

I would prefer putting all of these transposing rules in the "LOGICAL" stage, 
since that stage uses the cost-based planner. I'm wondering whether it's really 
necessary to have any transposing rules (currently only 
{{CalcRankTransposeRule}}) in the "LOGICAL_REWRITE" stage. Could you check 
whether we still need {{CalcRankTransposeRule}} in "LOGICAL_REWRITE" after 
introducing {{ProjectWindowTransposeRule}} in the "LOGICAL" stage?

What's more, I even wonder whether we need {{CalcRankTransposeRule}} at all. 
{{Rank}} is a special form of {{Window}}, so {{ProjectWindowTransposeRule}} 
should supersede {{CalcRankTransposeRule}}.

(By "transposing rules", I would usually expect rules that only generate more 
plan alternatives, while the cost-based planner chooses the better one by cost. 
That's why Calcite has many counterpart pairs of rules, such as 
{{AggregateFilterTransposeRule}} and {{FilterAggregateTransposeRule}}.)
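The following toy sketch in Scala shows why such counterpart pairs are harmless under a cost-based planner. Everything in it is hypothetical and far simpler than Calcite's planners: the plan nodes, the cost model, and the two rules. A Filter/Project pair stands in for the Aggregate/Filter pair, and rules fire only at the plan root. The rules merely register alternatives in a de-duplicating set, which plays the role of the planner's memo. Because of that set, the pair terminates instead of endlessly undoing each other, and the cost function then picks the winner.

{code:scala}
// Toy model only: hypothetical plan nodes and rules, not Calcite classes.
object CostBasedChoice {
  sealed trait Plan
  final case class Scan(rows: Double) extends Plan
  final case class Filter(selectivity: Double, input: Plan) extends Plan
  final case class Project(fields: List[String], input: Plan) extends Plan

  // A rule rewrites the root of a plan into an equivalent alternative.
  type Rule = PartialFunction[Plan, Plan]

  // Counterpart pair: each rule undoes the other.
  val filterProjectTranspose: Rule = {
    case Filter(s, Project(f, in)) => Project(f, Filter(s, in))
  }
  val projectFilterTranspose: Rule = {
    case Project(f, Filter(s, in)) => Filter(s, Project(f, in))
  }

  // Estimated number of rows a plan produces.
  def rows(p: Plan): Double = p match {
    case Scan(r)        => r
    case Filter(s, in)  => rows(in) * s
    case Project(_, in) => rows(in)
  }

  // Toy cost: every operator pays one unit per input row it processes.
  def cost(p: Plan): Double = p match {
    case Scan(r)        => r
    case Filter(_, in)  => cost(in) + rows(in)
    case Project(_, in) => cost(in) + rows(in)
  }

  // Collect every alternative reachable via the rules. The `seen` set acts as
  // a memo: re-deriving a known plan adds nothing, so exploration stops.
  def explore(root: Plan, rules: List[Rule]): Set[Plan] = {
    @annotation.tailrec
    def loop(frontier: List[Plan], seen: Set[Plan]): Set[Plan] = frontier match {
      case Nil => seen
      case p :: rest =>
        val fresh = rules.filter(_.isDefinedAt(p)).map(rule => rule(p)).distinct.filterNot(seen)
        loop(fresh ++ rest, seen ++ fresh)
    }
    loop(List(root), Set(root))
  }

  // The rules only generate alternatives; the cost model makes the decision.
  def best(root: Plan, rules: List[Rule]): Plan =
    explore(root, rules).minBy(p => cost(p))

  def main(args: Array[String]): Unit = {
    val original = Filter(0.1, Project(List("a"), Scan(1000)))
    val rules    = List(filterProjectTranspose, projectFilterTranspose)
    println(explore(original, rules).size) // 2
    println(best(original, rules))
  }
}
{code}

Take a 1000-row scan and a 10% filter. Filtering before projecting is cheaper in this toy cost model (2100 vs 3000), so that plan wins, even though both rules fired. A naive "apply rules until nothing changes" loop without the `seen` set would instead flip between the two plans forever.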


was (Author: libenchao):
[~xuyangzhong] [~nilerzhou] Thank you for the explanation, it helps.

I would prefer putting all of these transposing rules in the "LOGICAL" stage, 
since that stage uses the cost-based planner. I'm wondering whether it's really 
necessary to have any transposing rules (currently only {{CalcJoinTransposeRule}}) 
in the "LOGICAL_REWRITE" stage. Could you check whether we still need 
{{CalcJoinTransposeRule}} in "LOGICAL_REWRITE" after introducing 
{{ProjectWindowTransposeRule}} in the "LOGICAL" stage?

What's more, I even wonder whether we need {{CalcJoinTransposeRule}} at all. 
{{Rank}} is a special form of {{Window}}, so {{ProjectWindowTransposeRule}} 
should supersede {{CalcJoinTransposeRule}}.

(By "transposing rules", I would usually expect rules that only generate more 
plan alternatives, while the cost-based planner chooses the better one by cost. 
That's why Calcite has many counterpart pairs of rules, such as 
{{AggregateFilterTransposeRule}} and {{FilterAggregateTransposeRule}}.)

> Projection cannot be pushed down through rank operator.
> -------------------------------------------------------
>
>                 Key: FLINK-34529
>                 URL: https://issues.apache.org/jira/browse/FLINK-34529
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.19.0
>            Reporter: yisha zhou
>            Assignee: yisha zhou
>            Priority: Major
>
> When there is a rank/deduplicate operator, a projection on the output of 
> this operator cannot be pushed down to its input.
> The following code reproduces the issue:
> {code:scala}
> // Runs inside a test class extending the planner's TableTestBase
> val util = streamTestUtil()
> util.addTableSource[(String, Int, String)]("T1", 'a, 'b, 'c)
> util.addTableSource[(String, Int, String)]("T2", 'd, 'e, 'f)
> val sql =
>   """
>     |SELECT a FROM (
>     |  SELECT a, f,
>     |      ROW_NUMBER() OVER (PARTITION BY f ORDER BY c DESC) as rank_num
>     |  FROM  T1, T2
>     |  WHERE T1.a = T2.d
>     |)
>     |WHERE rank_num = 1
>   """.stripMargin
> util.verifyPlan(sql){code}
> The plan is expected to be:
> {code:java}
> Calc(select=[a])
> +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], select=[a, c, f])
>    +- Exchange(distribution=[hash[f]])
>       +- Calc(select=[a, c, f])
>          +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, c, d, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
>             :- Exchange(distribution=[hash[a]])
>             :  +- Calc(select=[a, c])
>             :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
>             +- Exchange(distribution=[hash[d]])
>                +- Calc(select=[d, f])
>                   +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
> {code}
> Note that the Join operator's 'select' is [a, c, d, f]. However, the actual 
> plan is:
> {code:java}
> Calc(select=[a])
> +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], select=[a, c, f])
>    +- Exchange(distribution=[hash[f]])
>       +- Calc(select=[a, c, f])
>          +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
>             :- Exchange(distribution=[hash[a]])
>             :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
>             +- Exchange(distribution=[hash[d]])
>                +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
> {code}
> Here the Join operator's 'select' is [a, b, c, d, e, f], which means the 
> projection in the final Calc is not pushed down through the Rank.
> I think an easy way to fix this issue is to add 
> org.apache.calcite.rel.rules.ProjectWindowTransposeRule to 
> FlinkStreamRuleSets.LOGICAL_OPT_RULES.
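The field sets in the expected plan can be traced with a small toy sketch in Scala. It follows the comment's point that a Rank is a special form of Window. The hypothetical `WindowSpec` below describes the Rank as a window whose only function is an argument-less ROW_NUMBER, so one window-pruning function covers it. All names are illustrative; none of this is Flink or Calcite API. It only reproduces the 'select' lists that the expected plan shows below the Rank, at the Join, and at each source.

{code:scala}
// Toy model only: hypothetical types, not Flink's Rank or Calcite's Window.
object RankAsWindowPruning {
  // A window evaluates functions over its input, partitioned and ordered by keys.
  final case class WindowSpec(
      partitionBy: Set[String],
      orderBy: Set[String],
      functionArgs: Set[String], // input fields read by the window functions
      outputFields: Set[String]) // fields the window appends (e.g. a rank number)

  // ROW_NUMBER() OVER (PARTITION BY p ORDER BY o) reads no argument fields.
  def rankAsWindow(partitionBy: Set[String], orderBy: Set[String], rankField: String): WindowSpec =
    WindowSpec(partitionBy, orderBy, functionArgs = Set.empty, outputFields = Set(rankField))

  // Input fields a window still needs, given the fields its parent uses.
  // Fields the window produces itself are not required from its input.
  def requiredBelowWindow(usedAbove: Set[String], w: WindowSpec): Set[String] =
    (usedAbove -- w.outputFields) ++ w.partitionBy ++ w.orderBy ++ w.functionArgs

  // Fields the join's inputs must supply: what the parent uses plus the join keys.
  def requiredBelowJoin(usedAbove: Set[String], joinKeys: Set[String]): Set[String] =
    usedAbove ++ joinKeys

  def main(args: Array[String]): Unit = {
    // ROW_NUMBER() OVER (PARTITION BY f ORDER BY c DESC); the outer query keeps
    // only `a` (rank_num = 1 has become rankRange=[rankStart=1, rankEnd=1]).
    val rank      = rankAsWindow(Set("f"), Set("c"), "rank_num")
    val belowRank = requiredBelowWindow(Set("a"), rank)
    val joinOut   = requiredBelowJoin(belowRank, Set("a", "d")) // WHERE T1.a = T2.d
    println(belowRank.toList.sorted)                             // List(a, c, f)
    println(joinOut.toList.sorted)                               // List(a, c, d, f)
    println(joinOut.intersect(Set("a", "b", "c")).toList.sorted) // List(a, c)
    println(joinOut.intersect(Set("d", "e", "f")).toList.sorted) // List(d, f)
  }
}
{code}

The four printed sets match the expected plan: Rank select=[a, c, f], Join select=[a, c, d, f], and the per-source Calcs [a, c] and [d, f]. The unused columns b and e are never requested. In the actual plan, the missing step is the one `requiredBelowWindow` models, so the Join keeps all six fields.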



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
