[
https://issues.apache.org/jira/browse/FLINK-34529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17822118#comment-17822118
]
Benchao Li edited comment on FLINK-34529 at 2/29/24 1:08 PM:
-------------------------------------------------------------
[~xuyangzhong] [~nilerzhou] Thank you for the explanation; it helps.
I would prefer putting all of these transposing rules in the "LOGICAL" stage,
since that stage uses the cost-based planner. I'm wondering whether it's really
necessary to have any transposing rules (currently only
{{CalcRankTransposeRule}}) in the "LOGICAL_REWRITE" stage. Could you check
whether we still need {{CalcRankTransposeRule}} in "LOGICAL_REWRITE" after
introducing {{ProjectWindowTransposeRule}} in the "LOGICAL" stage?
What's more, I'm even wondering whether we really need
{{CalcRankTransposeRule}} at all. {{Rank}} is a special form of {{Window}}, so
{{ProjectWindowTransposeRule}} should supersede {{CalcRankTransposeRule}}.
(By "transposing rules", I usually expect rules that only generate more plan
alternatives, leaving the cost-based planner to choose the better one by cost.
That's why you can see many counterpart pairs of rules, such as
{{AggregateFilterTransposeRule}} and {{FilterAggregateTransposeRule}}, in
Calcite.)
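To make the point about counterpart pairs of transposing rules concrete, here is a toy Java sketch. All names are hypothetical and this is not Calcite's API: each rule of the pair only adds the swapped plan as an alternative, and a made-up volume-based cost model picks the winner. It assumes every swap is semantically legal, whereas real rules check that first.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Toy model (hypothetical, not Calcite's API) of a cost-based planner exploring a
 * counterpart pair of transpose rules. A plan is a list of operators from bottom
 * (index 0) to top; each operator scales the data volume it passes upward by a
 * fixed factor. This sketch assumes every swap is legal.
 */
public class TransposeExploration {
    public static final class Op {
        final String name;
        final double outputFactor; // output volume divided by input volume

        public Op(String name, double outputFactor) {
            this.name = name;
            this.outputFactor = outputFactor;
        }

        @Override
        public String toString() {
            return name;
        }
    }

    /** Cost = total volume read by all operators, starting from the scan volume. */
    public static double cost(List<Op> plan, double scanVolume) {
        double volume = scanVolume;
        double total = 0;
        for (Op op : plan) {
            total += volume;
            volume *= op.outputFactor;
        }
        return total;
    }

    /** For each `upper` directly above `lower`, returns a copy with the two swapped. */
    static List<List<Op>> transpose(List<Op> plan, String upper, String lower) {
        List<List<Op>> alternatives = new ArrayList<>();
        for (int i = 0; i + 1 < plan.size(); i++) {
            if (plan.get(i).name.equals(lower) && plan.get(i + 1).name.equals(upper)) {
                List<Op> alt = new ArrayList<>(plan);
                alt.set(i, plan.get(i + 1));
                alt.set(i + 1, plan.get(i));
                alternatives.add(alt);
            }
        }
        return alternatives;
    }

    /** Applies both rules of the pair until no new plan appears, then keeps the cheapest. */
    public static List<Op> bestPlan(List<Op> start, double scanVolume) {
        Set<List<Op>> seen = new HashSet<>();
        Deque<List<Op>> todo = new ArrayDeque<>();
        seen.add(start);
        todo.add(start);
        while (!todo.isEmpty()) {
            List<Op> plan = todo.poll();
            List<List<Op>> alternatives = new ArrayList<>();
            // FilterAggregateTransposeRule-style direction: move a Filter below an Aggregate.
            alternatives.addAll(transpose(plan, "Filter", "Aggregate"));
            // AggregateFilterTransposeRule-style direction: the reverse swap.
            alternatives.addAll(transpose(plan, "Aggregate", "Filter"));
            for (List<Op> alt : alternatives) {
                if (seen.add(alt)) {
                    todo.add(alt);
                }
            }
        }
        // Neither rule removes a plan; only the cost model decides.
        return seen.stream()
                .min(Comparator.comparingDouble((List<Op> p) -> cost(p, scanVolume)))
                .get();
    }
}
```

With an Aggregate that halves the volume and a Filter that keeps 1% of rows over a scan of 1000, the Filter-above-Aggregate plan costs 1500 and the Filter-below-Aggregate plan costs 1010, so the sketch picks the latter no matter which order it starts from. That is the behavior one would want from keeping such rules in a cost-based stage.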
was (Author: libenchao):
[~xuyangzhong] [~nilerzhou] Thank you for the explanation; it helps.
I would prefer putting all of these transposing rules in the "LOGICAL" stage,
since that stage uses the cost-based planner. I'm wondering whether it's really
necessary to have any transposing rules (currently only
{{CalcJoinTransposeRule}}) in the "LOGICAL_REWRITE" stage. Could you check
whether we still need {{CalcJoinTransposeRule}} in "LOGICAL_REWRITE" after
introducing {{ProjectWindowTransposeRule}} in the "LOGICAL" stage?
What's more, I'm even wondering whether we really need
{{CalcJoinTransposeRule}} at all. {{Rank}} is a special form of {{Window}}, so
{{ProjectWindowTransposeRule}} should supersede {{CalcJoinTransposeRule}}.
(By "transposing rules", I usually expect rules that only generate more plan
alternatives, leaving the cost-based planner to choose the better one by cost.
That's why you can see many counterpart pairs of rules, such as
{{AggregateFilterTransposeRule}} and {{FilterAggregateTransposeRule}}, in
Calcite.)
> Projection cannot be pushed down through rank operator.
> -------------------------------------------------------
>
> Key: FLINK-34529
> URL: https://issues.apache.org/jira/browse/FLINK-34529
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.19.0
> Reporter: yisha zhou
> Assignee: yisha zhou
> Priority: Major
>
> When there is a rank/deduplicate operator, a projection on the output of
> this operator cannot be pushed down to its input.
> The following code reproduces the issue:
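> {code:scala}
> // Runs inside a planner test class extending
> // org.apache.flink.table.planner.utils.TableTestBase, which provides
> // streamTestUtil(). The imports supply the implicit TypeInformation and
> // the 'a-style field expressions.
> import org.apache.flink.api.scala._
> import org.apache.flink.table.api._
>
> val util = streamTestUtil()
> util.addTableSource[(String, Int, String)]("T1", 'a, 'b, 'c)
> util.addTableSource[(String, Int, String)]("T2", 'd, 'e, 'f)
> val sql =
>   """
>     |SELECT a FROM (
>     |  SELECT a, f,
>     |    ROW_NUMBER() OVER (PARTITION BY f ORDER BY c DESC) as rank_num
>     |  FROM T1, T2
>     |  WHERE T1.a = T2.d
>     |)
>     |WHERE rank_num = 1
>   """.stripMargin
> util.verifyPlan(sql)
> {code}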
> The plan is expected to be:
> {code:java}
> Calc(select=[a])
> +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], select=[a, c, f])
>    +- Exchange(distribution=[hash[f]])
>       +- Calc(select=[a, c, f])
>          +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, c, d, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
>             :- Exchange(distribution=[hash[a]])
>             :  +- Calc(select=[a, c])
>             :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
>             +- Exchange(distribution=[hash[d]])
>                +- Calc(select=[d, f])
>                   +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
> {code}
> Notice that the 'select' of the Join operator is [a, c, d, f]. However, the
> actual plan is:
> {code:java}
> Calc(select=[a])
> +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], select=[a, c, f])
>    +- Exchange(distribution=[hash[f]])
>       +- Calc(select=[a, c, f])
>          +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
>             :- Exchange(distribution=[hash[a]])
>             :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
>             +- Exchange(distribution=[hash[d]])
>                +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
> {code}
> Here the 'select' of the Join operator is [a, b, c, d, e, f], which means the
> projection in the final Calc is not pushed down through the Rank.
> I think an easy way to fix this issue is to add
> {{org.apache.calcite.rel.rules.ProjectWindowTransposeRule}} to
> {{FlinkStreamRuleSets.LOGICAL_OPT_RULES}}.
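The column sets in the expected plan follow from simple bookkeeping: the Rank's input needs the columns read by the Calc above it plus the Rank's partition and order keys, and the Join additionally keeps its join-condition columns, which are then split between its two inputs. Below is a minimal Java sketch of that bookkeeping. The class and method names are hypothetical, this is not Flink's or Calcite's implementation, and it assumes the rank number itself is not emitted (as in the plans, where Rank's select is [a, c, f]).

```java
import java.util.Set;
import java.util.TreeSet;

/**
 * Hypothetical column-pruning bookkeeping (not Flink's implementation) for a
 * Calc -> Rank -> Join chain. TreeSet keeps column names sorted for stable output.
 */
public class RankFieldPruning {
    /** Columns the Rank must read: those used above it plus its partition and order keys. */
    public static Set<String> rankInputFields(
            Set<String> usedAbove, Set<String> partitionBy, Set<String> orderBy) {
        Set<String> needed = new TreeSet<>(usedAbove);
        needed.addAll(partitionBy);
        needed.addAll(orderBy);
        return needed;
    }

    /** Columns the Join must produce: those needed above plus its condition columns. */
    public static Set<String> joinFields(Set<String> neededAbove, Set<String> conditionFields) {
        Set<String> needed = new TreeSet<>(neededAbove);
        needed.addAll(conditionFields);
        return needed;
    }

    /** Columns one Join input must provide: the needed columns that this input actually has. */
    public static Set<String> restrictTo(Set<String> needed, Set<String> inputColumns) {
        Set<String> kept = new TreeSet<>(needed);
        kept.retainAll(inputColumns);
        return kept;
    }

    public static void main(String[] args) {
        // The issue's query: outer SELECT a, PARTITION BY f, ORDER BY c, join on a = d.
        Set<String> rankIn = rankInputFields(Set.of("a"), Set.of("f"), Set.of("c"));
        Set<String> joinOut = joinFields(rankIn, Set.of("a", "d"));
        System.out.println("Rank input: " + rankIn);   // Rank input: [a, c, f]
        System.out.println("Join output: " + joinOut); // Join output: [a, c, d, f]
        System.out.println("T1 side: " + restrictTo(joinOut, Set.of("a", "b", "c"))); // T1 side: [a, c]
        System.out.println("T2 side: " + restrictTo(joinOut, Set.of("d", "e", "f"))); // T2 side: [d, f]
    }
}
```

For the issue's query these sets match the expected plan's Rank select=[a, c, f], Join select=[a, c, d, f], and the per-side Calc(select=[a, c]) and Calc(select=[d, f]). The actual plan instead carries all six columns through the Join because the outer projection never crosses the Rank.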
--
This message was sent by Atlassian Jira
(v8.20.10#820010)