This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b37c98599b3 [FLINK-37610] Check if `ORDER BY` keys exist before 
accessing
b37c98599b3 is described below

commit b37c98599b34aa62ee0da4edda6ba422f73766ed
Author: Bonnie Varghese <bvargh...@confluent.io>
AuthorDate: Thu Apr 10 00:53:32 2025 -0700

    [FLINK-37610] Check if `ORDER BY` keys exist before accessing
    
    - If `ORDER BY` clause is not specified in an Over agg, it would lead to
      IndexOutOfBoundsException
    - This commit adds a check to ensure `ORDER BY` fields exist before 
accessing them
---
 .../FlinkChangelogModeInferenceProgram.scala       | 26 ++++++++++++----------
 .../plan/stream/sql/agg/OverAggregateTest.xml      | 23 +++++++++++++++++++
 .../plan/stream/sql/agg/OverAggregateTest.scala    | 13 +++++++++++
 3 files changed, 50 insertions(+), 12 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
index a569d5b64c2..8b128fccb71 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
@@ -309,19 +309,21 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
         val builder = ModifyKindSet
           .newBuilder()
           .addContainedKind(ModifyKind.INSERT)
+        val groups = over.logicWindow.groups
 
-        // All aggregates are computed over the same window and order by is 
supported for only 1 field
-        val orderKeyIndex =
-          
over.logicWindow.groups.get(0).orderKeys.getFieldCollations.get(0).getFieldIndex
-        val orderKeyType = 
over.logicWindow.getRowType.getFieldList.get(orderKeyIndex).getType
-        if (
-          !FlinkTypeFactory.isRowtimeIndicatorType(orderKeyType)
-          && !FlinkTypeFactory.isProctimeIndicatorType(orderKeyType)
-        ) {
-          // Only non row-time/proc-time sort can support UPDATES
-          builder.addContainedKind(ModifyKind.UPDATE)
-          builder.addContainedKind(ModifyKind.DELETE)
-          overRequiredTrait = ModifyKindSetTrait.ALL_CHANGES
+        if (!groups.isEmpty && 
!groups.get(0).orderKeys.getFieldCollations.isEmpty) {
+          // All aggregates are computed over the same window and order by is 
supported for only 1 field
+          val orderKeyIndex = 
groups.get(0).orderKeys.getFieldCollations.get(0).getFieldIndex
+          val orderKeyType = 
over.logicWindow.getRowType.getFieldList.get(orderKeyIndex).getType
+          if (
+            !FlinkTypeFactory.isRowtimeIndicatorType(orderKeyType)
+            && !FlinkTypeFactory.isProctimeIndicatorType(orderKeyType)
+          ) {
+            // Only non row-time/proc-time sort can support UPDATES
+            builder.addContainedKind(ModifyKind.UPDATE)
+            builder.addContainedKind(ModifyKind.DELETE)
+            overRequiredTrait = ModifyKindSetTrait.ALL_CHANGES
+          }
         }
         val children = visitChildren(over, overRequiredTrait)
         val providedTrait = new ModifyKindSetTrait(builder.build())
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.xml
index 72e3bffb228..d28e2818592 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.xml
@@ -526,4 +526,27 @@ Calc(select=[c, w0$o0 AS cnt1, CASE((w0$o0 > 0), w0$o1, 
null:INTEGER) AS cnt2])
 ]]>
     </Resource>
   </TestCase>
+       <TestCase name="testWithoutOrderByClause">
+               <Resource name="sql">
+                       <![CDATA[
+SELECT c,
+    COUNT(a) OVER (PARTITION BY c) AS cnt1
+FROM MyTable
+      ]]>
+               </Resource>
+               <Resource name="ast">
+                       <![CDATA[
+LogicalProject(c=[$2], cnt1=[COUNT($0) OVER (PARTITION BY $2)])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+               </Resource>
+               <Resource name="optimized exec plan">
+                       <![CDATA[
+Calc(select=[c, w0$o0 AS $1])
++- OverAggregate(partitionBy=[c], orderBy=[], window=[ RANG BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING], select=[a, b, c, proctime, rowtime, 
COUNT(a) AS w0$o0])
+   +- Exchange(distribution=[hash[c]])
+      +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
+]]>
+               </Resource>
+       </TestCase>
 </Root>
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.scala
index beb16551af9..86c9284481f 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.scala
@@ -467,4 +467,17 @@ class OverAggregateTest extends TableTestBase {
                            |)
                            |""".stripMargin)
   }
+
+  @Test
+  def testWithoutOrderByClause(): Unit = {
+    val sql =
+      """
+        |SELECT c,
+        |    COUNT(a) OVER (PARTITION BY c) AS cnt1
+        |FROM MyTable
+      """.stripMargin
+
+    util.verifyExecPlan(sql)
+  }
+
 }

Reply via email to