[ https://issues.apache.org/jira/browse/FLINK-34529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823920#comment-17823920 ]
Benchao Li commented on FLINK-34529:
------------------------------------
+1 for adding it to "PROJECT_RULES" so that we can utilize it in both cbo and
rbo stages.
bq. For the question 'I'm even wondering if we really need
CalcRankTransposeRule', I tried removing it and found that
`ProjectWindowTransposeRule` completely covers the functionality of
`CalcRankTransposeRule` (judging from the results of the tests introduced
along with this rule) and even does a much better job in some cases.
Therefore, I plan to remove this rule in this PR. WDYT?
[~nilerzhou] I'm ok with it
> Projection cannot be pushed down through rank operator.
> -------------------------------------------------------
>
> Key: FLINK-34529
> URL: https://issues.apache.org/jira/browse/FLINK-34529
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.19.0
> Reporter: yisha zhou
> Assignee: yisha zhou
> Priority: Major
>
> When a query contains a rank/deduplicate operator, a projection on the output
> of this operator cannot be pushed down to its input.
> The following code reproduces the issue:
> {code:java}
> val util = streamTestUtil()
> util.addTableSource[(String, Int, String)]("T1", 'a, 'b, 'c)
> util.addTableSource[(String, Int, String)]("T2", 'd, 'e, 'f)
> val sql =
>   """
>     |SELECT a FROM (
>     |  SELECT a, f,
>     |    ROW_NUMBER() OVER (PARTITION BY f ORDER BY c DESC) as rank_num
>     |  FROM T1, T2
>     |  WHERE T1.a = T2.d
>     |)
>     |WHERE rank_num = 1
>   """.stripMargin
> util.verifyPlan(sql)
> {code}
> The plan is expected to be:
> {code:java}
> Calc(select=[a])
> +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], select=[a, c, f])
>    +- Exchange(distribution=[hash[f]])
>       +- Calc(select=[a, c, f])
>          +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, c, d, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
>             :- Exchange(distribution=[hash[a]])
>             :  +- Calc(select=[a, c])
>             :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
>             +- Exchange(distribution=[hash[d]])
>                +- Calc(select=[d, f])
>                   +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
> {code}
> Notice that the 'select' of the Join operator is [a, c, d, f]. However, the
> actual plan is:
> {code:java}
> Calc(select=[a])
> +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], select=[a, c, f])
>    +- Exchange(distribution=[hash[f]])
>       +- Calc(select=[a, c, f])
>          +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
>             :- Exchange(distribution=[hash[a]])
>             :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
>             +- Exchange(distribution=[hash[d]])
>                +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
> {code}
> Here the 'select' of the Join operator is [a, b, c, d, e, f], which means the
> projection in the final Calc is not pushed down through the Rank.
> I think an easy way to fix this issue is to add
> org.apache.calcite.rel.rules.ProjectWindowTransposeRule to
> FlinkStreamRuleSets.LOGICAL_OPT_RULES.
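
To make the expected plan in the description concrete, here is a toy model of the column pruning involved. It is only a sketch: the class `RankProjectionSketch` and the method `requiredFields` are hypothetical names made up for illustration, and this is not how Calcite's ProjectWindowTransposeRule or the Flink planner is implemented. It assumes an operator needs exactly the fields its parent reads plus the fields it reads itself.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration only; not Flink or Calcite code.
class RankProjectionSketch {

    /**
     * Returns the subset of inputFields that must be kept: everything the
     * parent operator reads plus everything this operator reads itself.
     * The original order of inputFields is preserved.
     */
    static List<String> requiredFields(
            List<String> inputFields,
            List<String> usedByParent,
            List<String> usedByOperator) {
        Set<String> needed = new LinkedHashSet<>(usedByParent);
        needed.addAll(usedByOperator);
        List<String> kept = new ArrayList<>();
        for (String field : inputFields) {
            if (needed.contains(field)) {
                kept.add(field);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> t1 = List.of("a", "b", "c");
        List<String> t2 = List.of("d", "e", "f");
        List<String> joinOutput = List.of("a", "b", "c", "d", "e", "f");

        // The final Calc reads a; the Rank itself reads f (partitionBy) and c (orderBy).
        List<String> rankInput =
                requiredFields(joinOutput, List.of("a"), List.of("f", "c"));

        // The Join must produce what the Rank needs, plus its own keys a and d.
        List<String> joinSelect =
                requiredFields(joinOutput, rankInput, List.of("a", "d"));

        // Each scan only has to provide the Join fields that come from its table.
        List<String> t1Select = requiredFields(t1, joinSelect, List.of());
        List<String> t2Select = requiredFields(t2, joinSelect, List.of());

        System.out.println("Rank input:  " + rankInput);
        System.out.println("Join select: " + joinSelect);
        System.out.println("T1 select:   " + t1Select);
        System.out.println("T2 select:   " + t2Select);
    }
}
```

Under these assumptions the computed field lists match the expected plan: [a, c, f] below the Rank, [a, c, d, f] for the Join, and [a, c] and [d, f] for the two scans. In the actual plan the pruning stops at the Rank, so the Join still emits all six fields.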