This is an automated email from the ASF dual-hosted git repository.
andygrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 7cf04c8e3 fix: prevent native sort crash for Struct(Map(...)) keys (#4157)
7cf04c8e3 is described below
commit 7cf04c8e39d557ec895a6489b5eefd6076d648ac
Author: ChenChen Lai <[email protected]>
AuthorDate: Fri May 8 07:11:33 2026 +0800
fix: prevent native sort crash for Struct(Map(...)) keys (#4157)
---
.../org/apache/spark/sql/comet/operators.scala | 17 +++++----
.../apache/comet/exec/CometAggregateSuite.scala | 44 ++++++++++++++++++++++
2 files changed, 54 insertions(+), 7 deletions(-)
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index 32b0ac7d7..f315aae6e 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -43,7 +43,7 @@ import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashJoin,
ShuffledHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{ArrayType, BooleanType, ByteType, DataType,
DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType,
ShortType, StringType, TimestampNTZType, TimestampType}
+import org.apache.spark.sql.types.{ArrayType, BooleanType, ByteType, DataType,
DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType,
ShortType, StringType, StructType, TimestampNTZType, TimestampType}
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.SerializableConfiguration
import org.apache.spark.util.io.ChunkedByteBuffer
@@ -1404,6 +1404,13 @@ case class CometUnionExec(
trait CometBaseAggregate {
+ private def containsMapType(dt: DataType): Boolean = dt match {
+ case _: MapType => true
+ case StructType(fields) => fields.exists(f => containsMapType(f.dataType))
+ case ArrayType(elementType, _) => containsMapType(elementType)
+ case _ => false
+ }
+
def doConvert(
aggregate: BaseAggregateExec,
builder: Operator.Builder,
@@ -1434,12 +1441,8 @@ trait CometBaseAggregate {
return None
}
- if (groupingExpressions.exists(expr =>
- expr.dataType match {
- case _: MapType => true
- case _ => false
- })) {
- withInfo(aggregate, "Grouping on map types is not supported")
+ if (groupingExpressions.exists(expr => containsMapType(expr.dataType))) {
+ withInfo(aggregate, "Grouping on map-containing types is not supported")
return None
}
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
index f3639c8fe..756efcd10 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.comet.CometConf
import org.apache.comet.CometConf.COMET_EXEC_STRICT_FLOATING_POINT
+import org.apache.comet.CometSparkSessionExtensions.isSpark41Plus
import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator,
ParquetGenerator, SchemaGenOptions}
/**
@@ -408,6 +409,33 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
}
+ test("grouping on struct containing map should fallback to Spark") {
+ assume(isSpark41Plus, "Spark 4.1+ supports grouping on map-containing
types")
+ withSQLConf(
+ CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true",
+ CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
+ val query =
+ """SELECT col1.data['key']
+ |FROM VALUES
+ | (NAMED_STRUCT('data', MAP('key', 'value', 'num', '42'))),
+ | (NAMED_STRUCT('data', MAP('key', 'other', 'num', '7')))
+ |t (col1)
+ |GROUP BY col1
+ |HAVING col1.data['num'] IS NOT NULL
+ |ORDER BY col1.data['key']
+ |""".stripMargin
+
+ val (_, cometPlan) =
+ checkSparkAnswerAndFallbackReason(
+ query,
+ "Grouping on map-containing types is not supported")
+
+ assert(
+ stripAQEPlan(cometPlan).collect { case s: CometHashAggregateExec => s
}.isEmpty,
+ "Expected aggregate to fall back to Spark for grouping on
Struct(Map(...))")
+ }
+ }
+
test("simple SUM, COUNT, MIN, MAX, AVG with non-distinct + null group keys")
{
Seq(true, false).foreach { dictionaryEnabled =>
withParquetTable(
@@ -2059,4 +2087,20 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
sparkPlan.collect { case s: CometHashAggregateExec => s }.size
}
+ test("group by array of map falls back to Spark (issue #4123)") {
+ assume(isSpark41Plus, "Spark 4.1+ supports grouping on map-containing
types")
+ withSQLConf(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true") {
+ checkSparkAnswerAndFallbackReason(
+ """SELECT a, COUNT(*)
+ |FROM VALUES
+ | (ARRAY(MAP('x', 10))),
+ | (ARRAY(MAP('y', 20))),
+ | (ARRAY(MAP('x', 10)))
+ |t (a)
+ |GROUP BY a
+ |""".stripMargin,
+ "Grouping on map-containing types is not supported")
+ }
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]