[
https://issues.apache.org/jira/browse/FLINK-34529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17824368#comment-17824368
]
yisha zhou commented on FLINK-34529:
------------------------------------
Hi [~libenchao], [~lincoln.86xy], I have some updates on the solution. I plan
to:
# add `ProjectWindowTransposeRule` to PROJECT_RULES (used in `PROJECT_REWRITE`
and `LOGICAL`);
# remove `CalcRankTransposeRule` from `LOGICAL_REWRITE`.
For {*}Action 1{*}, there is an optimization regression in one test,
`RankTest#testRankWithAnotherRankAsInput`. I'm not sure whether we can ignore it
and optimize that case in another way.
The original plan is
{code:java}
Calc(select=[CAST(w0$o0 AS INTEGER) AS rn1, CAST(w0$o0_0 AS INTEGER) AS rn2])
+- Rank(strategy=[UpdateFastStrategy[0,2,3]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=200], partitionBy=[a], orderBy=[b DESC], select=[a, b, c, w0$o0, w0$o0_0])
   +- Exchange(distribution=[hash[a]])
      +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], partitionBy=[a, c], orderBy=[b DESC], select=[a, b, c, w0$o0])
         +- Exchange(distribution=[hash[a, c]])
            +- Calc(select=[a, b, c])
               +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
{code}
The changed plan is
{code:java}
Calc(select=[CAST(w0$o0 AS INTEGER) AS rn1, CAST(w0$o0_0 AS INTEGER) AS rn2])
+- Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=200], partitionBy=[a], orderBy=[b DESC], select=[a, b, w0$o0, w0$o0_0])
   +- Exchange(distribution=[hash[a]])
      +- Calc(select=[a, b, w0$o0])
         +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], partitionBy=[a, c], orderBy=[b DESC], select=[a, b, c, w0$o0])
            +- Exchange(distribution=[hash[a, c]])
               +- Calc(select=[a, b, c])
                  +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
{code}
As you can see, the strategy of the second (upper) Rank changes from
`UpdateFastStrategy` to `RetractStrategy`. This is because field `c`, which is
part of the primary key of the Rank's input, has been pruned, so the
requirements of `UpdateFastStrategy` can no longer be met.
IMO, this is not a general query optimization but an execution-level
optimization, similar to how Join picks different implementations depending on
the upsert keys of its inputs. Therefore, I think we can ignore this change
here. WDYT?
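To make the fallback concrete, here is a minimal Scala sketch under a simplifying assumption: a Rank over an updating input keeps `UpdateFastStrategy` only if its input's upsert key is still fully present in the input row, and otherwise falls back to `RetractStrategy`. `RankStrategySketch`, `choose`, `UpdateFast` and `Retract` are hypothetical names for illustration; the planner's actual strategy selection may involve further conditions that are not modeled here.

```scala
// Hypothetical, simplified model (not the planner's code): it only captures
// the "upsert key must survive column pruning" condition.
object RankStrategySketch {
  sealed trait Strategy
  // Carries the positions of the key fields, mirroring UpdateFastStrategy[0,2,3].
  final case class UpdateFast(keyIndices: Seq[Int]) extends Strategy
  case object Retract extends Strategy

  /** Chooses a strategy for a Rank whose updating input has the given
    * fields and upsert key (both by field name). */
  def choose(inputFields: Seq[String], upsertKey: Set[String]): Strategy =
    if (upsertKey.nonEmpty && upsertKey.subsetOf(inputFields.toSet))
      // Every key field survived: report the key's positions in the input row.
      UpdateFast(inputFields.zipWithIndex.collect { case (f, i) if upsertKey(f) => i })
    else
      // Some key field was pruned away: fall back to the retract strategy.
      Retract
}
```

With input fields [a, b, c, w0$o0] and key {a, c, w0$o0}, `choose` returns `UpdateFast` with indices 0, 2, 3, matching `UpdateFastStrategy[0,2,3]` in the original plan; once `c` is pruned it returns `Retract`, as in the changed plan.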
For {*}Action 2{*}, I found that `CorrelateSortToRankRule` translates a
LogicalCorrelate directly into a LogicalRank, without any LogicalWindow involved.
Therefore `ProjectWindowTransposeRule` cannot cover all cases of transposing a
Project and a Rank, so we should keep `CalcRankTransposeRule`. I also tried
moving this rule into the `LOGICAL` stage, but it didn't work well.
The root cause is that the costs of Exchange and Rank do not depend on the width
of the output row, so adding a Calc before the Rank can only increase the
estimated cost, and the planner does not choose that plan. I'll keep
`CalcRankTransposeRule` in `LOGICAL_REWRITE` until the costs of Rank and Calc
are improved in a future PR.
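The cost argument can be illustrated with a toy model, assuming abstract cost units rather than Flink's real cost functions. When the Exchange cost ignores row width, a pruning Calc only adds work; when the Exchange cost grows with row width, the narrower rows can pay for the Calc. `CalcBeforeRankCost` and all its methods are hypothetical names.

```scala
// Hypothetical toy cost model: abstract units, not Flink's cost functions.
object CalcBeforeRankCost {
  // A Calc touches every row once.
  def calcCost(rows: Double): Double = rows

  // Exchange cost is either per row only, or per row times row width
  // (roughly "bytes shipped"), depending on `widthAware`.
  def exchangeCost(rows: Double, rowWidth: Int, widthAware: Boolean): Double =
    if (widthAware) rows * rowWidth else rows

  // Rank cost ignores row width in both scenarios.
  def rankCost(rows: Double): Double = rows

  // Plan without pruning: Exchange -> Rank over full-width rows.
  def withoutCalc(rows: Double, fullWidth: Int, widthAware: Boolean): Double =
    exchangeCost(rows, fullWidth, widthAware) + rankCost(rows)

  // Plan with pruning: Calc -> Exchange -> Rank over narrower rows.
  def withCalc(rows: Double, prunedWidth: Int, widthAware: Boolean): Double =
    calcCost(rows) + exchangeCost(rows, prunedWidth, widthAware) + rankCost(rows)
}
```

For 1000 rows pruned from 6 to 3 columns, the width-blind model gives 3000 with the Calc versus 2000 without it, so the Calc loses; the width-aware model gives 5000 versus 7000, so the Calc wins.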
Looking forward to your opinions on these two updates.
> Projection cannot be pushed down through rank operator.
> -------------------------------------------------------
>
> Key: FLINK-34529
> URL: https://issues.apache.org/jira/browse/FLINK-34529
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.19.0
> Reporter: yisha zhou
> Assignee: yisha zhou
> Priority: Major
>
> When there is a rank/deduplicate operator, a projection on the output of this
> operator cannot be pushed down to its input.
> The following code reproduces the issue:
> {code:java}
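> // Runs inside a stream planner test (a TableTestBase subclass).
> val util = streamTestUtil()
> // Two legacy test sources: T1(a, b, c) and T2(d, e, f).
> util.addTableSource[(String, Int, String)]("T1", 'a, 'b, 'c)
> util.addTableSource[(String, Int, String)]("T2", 'd, 'e, 'f)
> // Top-1 row per f after joining T1 and T2 on a = d; only a is selected.
> val sql =
>   """
>     |SELECT a FROM (
>     |  SELECT a, f,
>     |    ROW_NUMBER() OVER (PARTITION BY f ORDER BY c DESC) as rank_num
>     |  FROM T1, T2
>     |  WHERE T1.a = T2.d
>     |)
>     |WHERE rank_num = 1
>   """.stripMargin
> util.verifyPlan(sql)
> {code}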
> The plan is expected to be:
> {code:java}
> Calc(select=[a])
> +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], select=[a, c, f])
>    +- Exchange(distribution=[hash[f]])
>       +- Calc(select=[a, c, f])
>          +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, c, d, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
>             :- Exchange(distribution=[hash[a]])
>             :  +- Calc(select=[a, c])
>             :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
>             +- Exchange(distribution=[hash[d]])
>                +- Calc(select=[d, f])
>                   +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
> {code}
> Note that the 'select' of the Join operator is [a, c, d, f]. However, the
> actual plan is:
> {code:java}
> Calc(select=[a])
> +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], select=[a, c, f])
>    +- Exchange(distribution=[hash[f]])
>       +- Calc(select=[a, c, f])
>          +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
>             :- Exchange(distribution=[hash[a]])
>             :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
>             +- Exchange(distribution=[hash[d]])
>                +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
> {code}
> Here the 'select' of the Join operator is [a, b, c, d, e, f], which means the
> projection in the final Calc is not pushed through the Rank.
> I think an easy way to fix this issue is to add
> org.apache.calcite.rel.rules.ProjectWindowTransposeRule to
> FlinkStreamRuleSets.LOGICAL_OPT_RULES.
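The column bookkeeping behind the expected plan can be sketched as follows, assuming plain string field names rather than the planner's row types: each operator asks its input only for the fields its parent still needs plus the fields it uses itself (partition and order keys for the Rank, the join condition for the Join). `ProjectThroughRank` and `requiredInputFields` are hypothetical names; this is not how `ProjectWindowTransposeRule` is implemented.

```scala
// Hypothetical toy model of column pruning through a Rank and a Join.
object ProjectThroughRank {
  /** Fields an operator must request from one of its inputs: those its
    * parent still needs plus those the operator uses itself, kept in the
    * input's field order. Names not present in `inputFields` (for example a
    * row-number column the operator produces) simply drop out. */
  def requiredInputFields(
      inputFields: Seq[String],
      neededByParent: Set[String],
      usedByOperator: Set[String]): Seq[String] = {
    val needed = neededByParent ++ usedByOperator
    inputFields.filter(needed.contains)
  }
}
```

Applied to the query above: the Rank needs {a} from its parent and uses f and c, giving [a, c, f]; the Join must then provide {a, c, f} and uses a and d, so T1 is pruned to [a, c] and T2 to [d, f], which matches the Join's select=[a, c, d, f] in the expected plan.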