This is an automated email from the ASF dual-hosted git repository.

lgbo-ustc pushed a commit to branch bug_group_limit_empty_offsets
in repository https://gitbox.apache.org/repos/asf/gluten.git

commit 2e134bb09d31f88e956a0e1d64d716358bcdd9a1
Author: lgbo-ustc <[email protected]>
AuthorDate: Thu May 28 20:01:51 2026 +0800

    [CH] Fix group limit first array result offset
    
    RowNumGroupArraySorted writes aggregate results into a newly created 
ColumnArray. For the first output row, the array offsets vector can be empty, 
but insertResultInto read result_array_offsets.back() before appending the 
first offset. Calling back() on an empty offsets vector is undefined behavior 
and can crash when aggregate top-k writes its first result.
    
    Treat an empty offsets vector as having a previous offset of 0 instead of 
reading back(), then append the next cumulative offset. Add a ClickHouse 
backend regression test that forces row_number top-k through the aggregate 
group limit path and validates the first array result row against vanilla 
Spark.
---
 .../GlutenClickHouseTPCHSaltNullParquetSuite.scala | 29 ++++++++++++++++++++++
 .../AggregateFunctions/GroupLimitFunctions.cpp     |  3 ++-
 2 files changed, 31 insertions(+), 1 deletion(-)

diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
index 0539f721e6..9a305cc883 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
@@ -3045,6 +3045,35 @@ class GlutenClickHouseTPCHSaltNullParquetSuite
 
   }
 
+  test("row number aggregate topk handles first array result offset") {
+    withSQLConf(
+      (CHConfig.runtimeSettings("enable_window_group_limit_to_aggregate"), 
"true"),
+      
(CHConfig.runtimeSettings("window.aggregate_topk_high_cardinality_threshold"), 
"2.0")
+    ) {
+      spark.sql("drop table if exists test_win_top_first_offset")
+      spark.sql("create table test_win_top_first_offset (a string, b int) 
using parquet")
+      spark.sql("insert into test_win_top_first_offset values ('a', 2), ('a', 
1)")
+
+      compareResultsAgainstVanillaSpark(
+        """
+          |select * from (
+          |  select a, b, row_number() over (partition by a order by b) as r
+          |  from test_win_top_first_offset
+          |) where r <= 1
+          |""".stripMargin,
+        compareResult = true,
+        df => {
+          val aggregateGroupLimit = 
collectWithSubqueries(df.queryExecution.executedPlan) {
+            case e: CHAggregateGroupLimitExecTransformer => e
+          }
+          assert(aggregateGroupLimit.nonEmpty)
+        }
+      )
+
+      spark.sql("drop table if exists test_win_top_first_offset")
+    }
+  }
+
   test("GLUTEN-7905 get topk of window by window") {
     withSQLConf(
       (CHConfig.runtimeSettings("enable_window_group_limit_to_aggregate"), 
"true"),
diff --git a/cpp-ch/local-engine/AggregateFunctions/GroupLimitFunctions.cpp 
b/cpp-ch/local-engine/AggregateFunctions/GroupLimitFunctions.cpp
index 20ef632ee6..032bf6f686 100644
--- a/cpp-ch/local-engine/AggregateFunctions/GroupLimitFunctions.cpp
+++ b/cpp-ch/local-engine/AggregateFunctions/GroupLimitFunctions.cpp
@@ -142,7 +142,8 @@ public:
 
         sortAndLimit(max_elements, sort_orders);
 
-        result_array_offsets.push_back(result_array_offsets.back() + 
values.size());
+        const auto previous_offset = result_array_offsets.empty() ? 0 : 
result_array_offsets.back();
+        result_array_offsets.push_back(previous_offset + values.size());
 
         if (values.empty())
             return;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to