Andy created FLINK-22038:
----------------------------
Summary: Update TopN node to be without rowNumber if rowNumber
field is projected out after TopN
Key: FLINK-22038
URL: https://issues.apache.org/jira/browse/FLINK-22038
Project: Flink
Issue Type: Improvement
Components: Table SQL / Planner
Reporter: Andy
Attachments: image-2021-03-30-16-03-09-876.png
As described in the documentation
[sql_queries|https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql/queries.html#no-ranking-output-optimization],
one way to improve the performance of TopN is to omit the rownum field from
the outer SELECT clause of the Top-N query.
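For reference, a minimal sketch of the pattern the documentation describes,
written against the table T used in the test below. The alias rownum and the
limit of 100 are illustrative choices, not taken from the documentation:
{code:sql}
-- The outer SELECT lists only the payload columns; rownum is used
-- solely in the WHERE filter and is not part of the output.
SELECT uri, reqcount, start_time
FROM (
  SELECT
    uri,
    reqcount,
    start_time,
    ROW_NUMBER() OVER (
      PARTITION BY start_time
      ORDER BY reqcount DESC
    ) AS rownum
  FROM T
)
WHERE rownum <= 100
{code}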
However, some queries produce an unexpected plan even though they follow
the instructions in the documentation.
{code:java}
@Test
def testRowNumberFiltered(): Unit = {
  // Register source table T with four columns.
  util.addDataStream[(String, Long, Long, Long)](
    "T", 'uri, 'reqcount, 'start_time, 'bucket_id)
  // Nested Top-N: neither rownum_1 nor rownum_2 is in the final projection.
  val sql =
    """
      |SELECT
      |  uri,
      |  reqcount,
      |  start_time
      |FROM
      |  (
      |    SELECT
      |      uri,
      |      reqcount,
      |      rownum_2,
      |      start_time
      |    FROM
      |      (
      |        SELECT
      |          uri,
      |          reqcount,
      |          start_time,
      |          ROW_NUMBER() OVER (
      |            PARTITION BY start_time
      |            ORDER BY
      |              reqcount DESC
      |          ) AS rownum_2
      |        FROM
      |          (
      |            SELECT
      |              uri,
      |              reqcount,
      |              start_time,
      |              ROW_NUMBER() OVER (
      |                PARTITION BY start_time, bucket_id
      |                ORDER BY
      |                  reqcount DESC
      |              ) AS rownum_1
      |            FROM T
      |          )
      |        WHERE
      |          rownum_1 <= 100000
      |      )
      |    WHERE
      |      rownum_2 <= 100000
      |  )
      |""".stripMargin
  util.verifyExecPlan(sql)
}
{code}
For example, we expect both the outer and the inner TopN in the above query to
use the without-rowNumber optimization; however, the inner TopN does not.
The logical plan and physical plan are shown below. Even though the rowNumber
field is projected out after the inner TopN, the inner TopN is still planned
with rowNumber.
!image-2021-03-30-16-03-09-876.png!
--
This message was sent by Atlassian Jira
(v8.3.4#803005)