[ https://issues.apache.org/jira/browse/FLINK-34529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17822415#comment-17822415 ]
yisha zhou commented on FLINK-34529:
------------------------------------

Hi [~lincoln.86xy], thanks for your advice. After discussing with [~libenchao], I agree that putting these kinds of rules into the cost-based planner seems to be in line with the future trend. Meanwhile, I found that most of the ProjectXXTransposeRules are in `FlinkStreamRuleSets#PROJECT_RULES`, and `PROJECT_RULES` seems to be used both in 'LOGICAL' (Volcano) and in 'PROJECT_REWRITE' (HEP). I plan to add `CoreRules.PROJECT_WINDOW_TRANSPOSE` to `FlinkStreamRuleSets#PROJECT_RULES` too, so that both kinds of planner can use the rule. WDYT?

> Projection cannot be pushed down through rank operator.
> -------------------------------------------------------
>
>                 Key: FLINK-34529
>                 URL: https://issues.apache.org/jira/browse/FLINK-34529
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.19.0
>            Reporter: yisha zhou
>            Assignee: yisha zhou
>            Priority: Major
>
> When there is a rank/deduplicate operator, a projection on the output of this operator cannot be pushed down to its input.
>
> The following code reproduces the issue:
> {code:java}
> val util = streamTestUtil()
> util.addTableSource[(String, Int, String)]("T1", 'a, 'b, 'c)
> util.addTableSource[(String, Int, String)]("T2", 'd, 'e, 'f)
> val sql =
>   """
>     |SELECT a FROM (
>     |  SELECT a, f,
>     |    ROW_NUMBER() OVER (PARTITION BY f ORDER BY c DESC) as rank_num
>     |  FROM T1, T2
>     |  WHERE T1.a = T2.d
>     |)
>     |WHERE rank_num = 1
>   """.stripMargin
> util.verifyPlan(sql)
> {code}
> The plan is expected to be:
> {code:java}
> Calc(select=[a])
> +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], select=[a, c, f])
>    +- Exchange(distribution=[hash[f]])
>       +- Calc(select=[a, c, f])
>          +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, c, d, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
>             :- Exchange(distribution=[hash[a]])
>             :  +- Calc(select=[a, c])
>             :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
>             +- Exchange(distribution=[hash[d]])
>                +- Calc(select=[d, f])
>                   +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
> {code}
> Notice that the 'select' of the Join operator is [a, c, d, f].
> However, the actual plan is:
> {code:java}
> Calc(select=[a])
> +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], select=[a, c, f])
>    +- Exchange(distribution=[hash[f]])
>       +- Calc(select=[a, c, f])
>          +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
>             :- Exchange(distribution=[hash[a]])
>             :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
>             +- Exchange(distribution=[hash[d]])
>                +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
> {code}
> Here the 'select' of the Join operator is [a, b, c, d, e, f], which means the projection in the final Calc is not pushed down through the Rank.
> I think an easy way to fix this issue is to add org.apache.calcite.rel.rules.ProjectWindowTransposeRule to FlinkStreamRuleSets.LOGICAL_OPT_RULES.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
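The column lists in the expected plan follow from a simple required-fields calculation: above the Rank only `a` is used, the Rank itself also reads its PARTITION BY key `f` and ORDER BY key `c`, and the Join additionally evaluates `a = d`. The sketch below is a hypothetical, self-contained model of that calculation only. The class and method names are illustrative, not Flink or Calcite APIs, and it does not implement ProjectWindowTransposeRule; it just reproduces which fields each operator in the expected plan keeps.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model (not a Flink/Calcite API): propagates the set of
// required fields top-down through Calc -> Rank -> Join.
public class RankFieldTrimmingSketch {

    // Keeps the fields of `schema` that appear in `needed`, in schema order.
    static List<String> trim(List<String> schema, Set<String> needed) {
        List<String> out = new ArrayList<>();
        for (String field : schema) {
            if (needed.contains(field)) {
                out.add(field);
            }
        }
        return out;
    }

    // Fields the Rank needs from its input: whatever the parent projects,
    // plus the PARTITION BY and ORDER BY keys the Rank evaluates itself.
    static Set<String> rankInputFields(Set<String> projectedAbove,
                                       List<String> partitionBy,
                                       List<String> orderBy) {
        Set<String> needed = new HashSet<>(projectedAbove);
        needed.addAll(partitionBy);
        needed.addAll(orderBy);
        return needed;
    }

    // Fields the Join inputs must supply: what the Join's parent needs,
    // plus the keys referenced in the join condition.
    static Set<String> joinInputFields(Set<String> neededAbove, List<String> joinKeys) {
        Set<String> needed = new HashSet<>(neededAbove);
        needed.addAll(joinKeys);
        return needed;
    }

    public static void main(String[] args) {
        List<String> t1 = List.of("a", "b", "c");
        List<String> t2 = List.of("d", "e", "f");
        List<String> joinSchema = new ArrayList<>(t1);
        joinSchema.addAll(t2);

        // SELECT a FROM (...) WHERE rank_num = 1
        Set<String> topProjection = Set.of("a");
        // ROW_NUMBER() OVER (PARTITION BY f ORDER BY c DESC)
        Set<String> belowRank = rankInputFields(topProjection, List.of("f"), List.of("c"));
        // WHERE T1.a = T2.d
        Set<String> belowJoin = joinInputFields(belowRank, List.of("a", "d"));

        System.out.println("Rank input:  " + trim(joinSchema, belowRank)); // [a, c, f]
        System.out.println("Join output: " + trim(joinSchema, belowJoin)); // [a, c, d, f]
        System.out.println("T1 Calc:     " + trim(t1, belowJoin));         // [a, c]
        System.out.println("T2 Calc:     " + trim(t2, belowJoin));         // [d, f]
    }
}
```

Running `main` prints [a, c, f] for the Rank input, [a, c, d, f] for the Join output, and [a, c] and [d, f] for the two per-side Calcs, matching the expected plan. In the actual plan this propagation stops at the Rank, so the Join still emits all six fields.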