yisha zhou created FLINK-34529:
----------------------------------
Summary: Projection cannot be pushed down through rank operator.
Key: FLINK-34529
URL: https://issues.apache.org/jira/browse/FLINK-34529
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.19.0
Reporter: yisha zhou
When a query contains a rank/deduplicate operator, a projection on the output of
that operator is not pushed down to the operator's input.
The following code reproduces the issue:
{code:scala}
// Two test sources: T1(a, b, c) and T2(d, e, f).
val util = streamTestUtil()
util.addTableSource[(String, Int, String)]("T1", 'a, 'b, 'c)
util.addTableSource[(String, Int, String)]("T2", 'd, 'e, 'f)
// Only column a is needed from the deduplicated join result.
val sql =
  """
    |SELECT a FROM (
    |  SELECT a, f,
    |    ROW_NUMBER() OVER (PARTITION BY f ORDER BY c DESC) as rank_num
    |  FROM T1, T2
    |  WHERE T1.a = T2.d
    |)
    |WHERE rank_num = 1
  """.stripMargin
util.verifyPlan(sql)
{code}
The plan is expected to be:
{code:java}
Calc(select=[a])
+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], select=[a, c, f])
   +- Exchange(distribution=[hash[f]])
      +- Calc(select=[a, c, f])
         +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, c, d, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
            :- Exchange(distribution=[hash[a]])
            :  +- Calc(select=[a, c])
            :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
            +- Exchange(distribution=[hash[d]])
               +- Calc(select=[d, f])
                  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
{code}
Note that the 'select' of the Join operator is [a, c, d, f]. However, the actual
plan is:
{code:java}
Calc(select=[a])
+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], select=[a, c, f])
   +- Exchange(distribution=[hash[f]])
      +- Calc(select=[a, c, f])
         +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
            :- Exchange(distribution=[hash[a]])
            :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
            +- Exchange(distribution=[hash[d]])
               +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
{code}
Here the 'select' of the Join operator is [a, b, c, d, e, f], which means the
projection in the final Calc is not pushed down through the Rank.
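To make the expected column sets concrete, here is a small toy model of the pruning arithmetic. It is not Flink planner code: the object and method names (RequiredColumns, rankInput, joinOutput) are hypothetical and only mirror the reasoning above, under the assumption that an operator must keep the columns its parent reads plus its own key columns.

```scala
object RequiredColumns {
  /** Columns the rank operator reads from its input: what the parent
    * projects, plus the partition and order keys. The rank number is
    * produced by the operator itself, so it is not requested from the input. */
  def rankInput(projected: Seq[String],
                partitionBy: Seq[String],
                orderBy: Seq[String]): Seq[String] =
    (projected ++ partitionBy ++ orderBy).distinct.sorted

  /** Columns the join has to output: what is needed above it, plus the
    * columns used in the join condition. */
  def joinOutput(neededAbove: Seq[String], joinKeys: Seq[String]): Seq[String] =
    (neededAbove ++ joinKeys).distinct.sorted
}

object RequiredColumnsDemo extends App {
  // Outer query selects only a; PARTITION BY f ORDER BY c DESC.
  val belowRank = RequiredColumns.rankInput(Seq("a"), Seq("f"), Seq("c"))
  // Join condition is T1.a = T2.d.
  val joinSelect = RequiredColumns.joinOutput(belowRank, Seq("a", "d"))

  println(belowRank.mkString("[", ", ", "]"))  // [a, c, f]
  println(joinSelect.mkString("[", ", ", "]")) // [a, c, d, f]
}
```

The second result matches the Join 'select' in the expected plan; columns b and e never appear in it, so they can be dropped before the join.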
I think an easy way to fix this issue is to add
org.apache.calcite.rel.rules.ProjectWindowTransposeRule to
FlinkStreamRuleSets.LOGICAL_OPT_RULES.
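A rough sketch of what that registration could look like, assuming Calcite is on the classpath. This is not a patch against FlinkStreamRuleSets: the helper name withProjectWindowTranspose is hypothetical, and whether this rule alone yields the expected plan has not been verified here. CoreRules.PROJECT_WINDOW_TRANSPOSE is Calcite's predefined instance of ProjectWindowTransposeRule.

```scala
import org.apache.calcite.plan.RelOptRule
import org.apache.calcite.rel.rules.CoreRules
import org.apache.calcite.tools.{RuleSet, RuleSets}

import scala.collection.JavaConverters._

object RuleSetSketch {
  /** Returns a new rule set containing every rule of `base` followed by
    * Calcite's ProjectWindowTransposeRule. `base` stands in for an existing
    * rule set such as LOGICAL_OPT_RULES (hypothetical usage). */
  def withProjectWindowTranspose(base: RuleSet): RuleSet = {
    val rules: Seq[RelOptRule] =
      base.asScala.toSeq :+ CoreRules.PROJECT_WINDOW_TRANSPOSE
    RuleSets.ofList(rules.asJava)
  }
}
```

In the planner itself the rule would more likely be added directly to the list that builds LOGICAL_OPT_RULES; the helper above only shows the shape of the change.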