Andy created FLINK-22038: ---------------------------- Summary: Update TopN node to be without rowNumber if rowNumber field is projected out after TopN Key: FLINK-22038 URL: https://issues.apache.org/jira/browse/FLINK-22038 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Reporter: Andy Attachments: image-2021-03-30-16-03-09-876.png
As describe in article [sql_queries|https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql/queries.html#no-ranking-output-optimization], an optimization way to improve performance of TopN is omitting rownum field in the outer SELECT clause of the Top-N query. However, some queries generated unexpected plan, even though we have followed the instructions in the documentation. {code:java} @Test def testRowNumberFiltered(): Unit = { util.addDataStream[(String, Long, Long, Long)]( "T", 'uri, 'reqcount, 'start_time, 'bucket_id) val sql = """ |SELECT | uri, | reqcount, | start_time |FROM | ( | SELECT | uri, | reqcount, | rownum_2, | start_time | FROM | ( | SELECT | uri, | reqcount, | start_time, | ROW_NUMBER() OVER ( | PARTITION BY start_time | ORDER BY | reqcount DESC | ) AS rownum_2 | FROM | ( | SELECT | uri, | reqcount, | start_time, | ROW_NUMBER() OVER ( | PARTITION BY start_time, bucket_id | ORDER BY | reqcount DESC | ) AS rownum_1 | FROM T | ) | WHERE | rownum_1 <= 100000 | ) | WHERE | rownum_2 <= 100000 |) |""".stripMargin util.verifyExecPlan(sql) } {code} For example, we expect both outer and inner TopN could use without rowNumber optimization in the above queries, however inner TopN is not as we expected. The logical plan and physical plan as following, we could find even though the rowNumber field is projected out after inner topN, inner topN still with rowNumber. !image-2021-03-30-16-03-09-876.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)