This is an automated email from the ASF dual-hosted git repository.

andygrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 7cf04c8e3 fix: prevent native sort crash for Struct(Map(...)) keys 
(#4157)
7cf04c8e3 is described below

commit 7cf04c8e39d557ec895a6489b5eefd6076d648ac
Author: ChenChen Lai <[email protected]>
AuthorDate: Fri May 8 07:11:33 2026 +0800

    fix: prevent native sort crash for Struct(Map(...)) keys (#4157)
---
 .../org/apache/spark/sql/comet/operators.scala     | 17 +++++----
 .../apache/comet/exec/CometAggregateSuite.scala    | 44 ++++++++++++++++++++++
 2 files changed, 54 insertions(+), 7 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala 
b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index 32b0ac7d7..f315aae6e 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -43,7 +43,7 @@ import 
org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashJoin, 
ShuffledHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{ArrayType, BooleanType, ByteType, DataType, 
DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, 
ShortType, StringType, TimestampNTZType, TimestampType}
+import org.apache.spark.sql.types.{ArrayType, BooleanType, ByteType, DataType, 
DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, 
ShortType, StringType, StructType, TimestampNTZType, TimestampType}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.SerializableConfiguration
 import org.apache.spark.util.io.ChunkedByteBuffer
@@ -1404,6 +1404,13 @@ case class CometUnionExec(
 
 trait CometBaseAggregate {
 
+  private def containsMapType(dt: DataType): Boolean = dt match {
+    case _: MapType => true
+    case StructType(fields) => fields.exists(f => containsMapType(f.dataType))
+    case ArrayType(elementType, _) => containsMapType(elementType)
+    case _ => false
+  }
+
   def doConvert(
       aggregate: BaseAggregateExec,
       builder: Operator.Builder,
@@ -1434,12 +1441,8 @@ trait CometBaseAggregate {
       return None
     }
 
-    if (groupingExpressions.exists(expr =>
-        expr.dataType match {
-          case _: MapType => true
-          case _ => false
-        })) {
-      withInfo(aggregate, "Grouping on map types is not supported")
+    if (groupingExpressions.exists(expr => containsMapType(expr.dataType))) {
+      withInfo(aggregate, "Grouping on map-containing types is not supported")
       return None
     }
 
diff --git 
a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala 
b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
index f3639c8fe..756efcd10 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.types.{DataTypes, StructField, 
StructType}
 
 import org.apache.comet.CometConf
 import org.apache.comet.CometConf.COMET_EXEC_STRICT_FLOATING_POINT
+import org.apache.comet.CometSparkSessionExtensions.isSpark41Plus
 import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator, 
ParquetGenerator, SchemaGenOptions}
 
 /**
@@ -408,6 +409,33 @@ class CometAggregateSuite extends CometTestBase with 
AdaptiveSparkPlanHelper {
     }
   }
 
+  test("grouping on struct containing map should fallback to Spark") {
+    assume(isSpark41Plus, "Spark 4.1+ supports grouping on map-containing 
types")
+    withSQLConf(
+      CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true",
+      CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
+      val query =
+        """SELECT col1.data['key']
+          |FROM VALUES
+          |  (NAMED_STRUCT('data', MAP('key', 'value', 'num', '42'))),
+          |  (NAMED_STRUCT('data', MAP('key', 'other', 'num', '7')))
+          |t (col1)
+          |GROUP BY col1
+          |HAVING col1.data['num'] IS NOT NULL
+          |ORDER BY col1.data['key']
+          |""".stripMargin
+
+      val (_, cometPlan) =
+        checkSparkAnswerAndFallbackReason(
+          query,
+          "Grouping on map-containing types is not supported")
+
+      assert(
+        stripAQEPlan(cometPlan).collect { case s: CometHashAggregateExec => s 
}.isEmpty,
+        "Expected aggregate to fall back to Spark for grouping on 
Struct(Map(...))")
+    }
+  }
+
   test("simple SUM, COUNT, MIN, MAX, AVG with non-distinct + null group keys") 
{
     Seq(true, false).foreach { dictionaryEnabled =>
       withParquetTable(
@@ -2059,4 +2087,20 @@ class CometAggregateSuite extends CometTestBase with 
AdaptiveSparkPlanHelper {
     sparkPlan.collect { case s: CometHashAggregateExec => s }.size
   }
 
+  test("group by array of map falls back to Spark (issue #4123)") {
+    assume(isSpark41Plus, "Spark 4.1+ supports grouping on map-containing 
types")
+    withSQLConf(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true") {
+      checkSparkAnswerAndFallbackReason(
+        """SELECT a, COUNT(*)
+          |FROM VALUES
+          |  (ARRAY(MAP('x', 10))),
+          |  (ARRAY(MAP('y', 20))),
+          |  (ARRAY(MAP('x', 10)))
+          |t (a)
+          |GROUP BY a
+          |""".stripMargin,
+        "Grouping on map-containing types is not supported")
+    }
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to