This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new b37c98599b3 [FLINK-37610] Check if `ORDER BY` keys exists before accessing b37c98599b3 is described below commit b37c98599b34aa62ee0da4edda6ba422f73766ed Author: Bonnie Varghese <bvargh...@confluent.io> AuthorDate: Thu Apr 10 00:53:32 2025 -0700 [FLINK-37610] Check if `ORDER BY` keys exists before accessing - If the `ORDER BY` clause is not specified in an Over agg, it leads to an IndexOutOfBoundsException - This commit adds a check to ensure `ORDER BY` fields exist before accessing them --- .../FlinkChangelogModeInferenceProgram.scala | 26 ++++++++++++---------- .../plan/stream/sql/agg/OverAggregateTest.xml | 23 +++++++++++++++++++ .../plan/stream/sql/agg/OverAggregateTest.scala | 13 +++++++++++ 3 files changed, 50 insertions(+), 12 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala index a569d5b64c2..8b128fccb71 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala @@ -309,19 +309,21 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti val builder = ModifyKindSet .newBuilder() .addContainedKind(ModifyKind.INSERT) + val groups = over.logicWindow.groups - // All aggregates are computed over the same window and order by is supported for only 1 field - val orderKeyIndex = - over.logicWindow.groups.get(0).orderKeys.getFieldCollations.get(0).getFieldIndex - val orderKeyType = over.logicWindow.getRowType.getFieldList.get(orderKeyIndex).getType - if ( - 
!FlinkTypeFactory.isRowtimeIndicatorType(orderKeyType) - && !FlinkTypeFactory.isProctimeIndicatorType(orderKeyType) - ) { - // Only non row-time/proc-time sort can support UPDATES - builder.addContainedKind(ModifyKind.UPDATE) - builder.addContainedKind(ModifyKind.DELETE) - overRequiredTrait = ModifyKindSetTrait.ALL_CHANGES + if (!groups.isEmpty && !groups.get(0).orderKeys.getFieldCollations.isEmpty) { + // All aggregates are computed over the same window and order by is supported for only 1 field + val orderKeyIndex = groups.get(0).orderKeys.getFieldCollations.get(0).getFieldIndex + val orderKeyType = over.logicWindow.getRowType.getFieldList.get(orderKeyIndex).getType + if ( + !FlinkTypeFactory.isRowtimeIndicatorType(orderKeyType) + && !FlinkTypeFactory.isProctimeIndicatorType(orderKeyType) + ) { + // Only non row-time/proc-time sort can support UPDATES + builder.addContainedKind(ModifyKind.UPDATE) + builder.addContainedKind(ModifyKind.DELETE) + overRequiredTrait = ModifyKindSetTrait.ALL_CHANGES + } } val children = visitChildren(over, overRequiredTrait) val providedTrait = new ModifyKindSetTrait(builder.build()) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.xml index 72e3bffb228..d28e2818592 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.xml @@ -526,4 +526,27 @@ Calc(select=[c, w0$o0 AS cnt1, CASE((w0$o0 > 0), w0$o1, null:INTEGER) AS cnt2]) ]]> </Resource> </TestCase> + <TestCase name="testWithoutOrderByClause"> + <Resource name="sql"> + <![CDATA[ +SELECT c, + COUNT(a) OVER (PARTITION BY c) AS cnt1 +FROM MyTable + ]]> + </Resource> + <Resource 
name="ast"> + <![CDATA[ +LogicalProject(c=[$2], cnt1=[COUNT($0) OVER (PARTITION BY $2)]) ++- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +]]> + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ +Calc(select=[c, w0$o0 AS $1]) ++- OverAggregate(partitionBy=[c], orderBy=[], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[a, b, c, proctime, rowtime, COUNT(a) AS w0$o0]) + +- Exchange(distribution=[hash[c]]) + +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) +]]> + </Resource> + </TestCase> </Root> diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.scala index beb16551af9..86c9284481f 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.scala @@ -467,4 +467,17 @@ class OverAggregateTest extends TableTestBase { |) |""".stripMargin) } + + @Test + def testWithoutOrderByClause(): Unit = { + val sql = + """ + |SELECT c, + | COUNT(a) OVER (PARTITION BY c) AS cnt1 + |FROM MyTable + """.stripMargin + + util.verifyExecPlan(sql) + } + }