This is an automated email from the ASF dual-hosted git repository.

comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 67e1efca0 feat: enable iceberg compat tests, more tests for complex 
types (#1550)
67e1efca0 is described below

commit 67e1efca0556d81320926bbb22eecebbff9497c4
Author: Oleks V <[email protected]>
AuthorDate: Sun Mar 30 13:45:16 2025 -0700

    feat: enable iceberg compat tests, more tests for complex types (#1550)
    
    * feat: enable iceberg compat tests, more tests for complex types
---
 .../apache/comet/CometSparkSessionExtensions.scala | 29 ++++----
 .../org/apache/comet/serde/QueryPlanSerde.scala    | 16 ++--
 .../org/apache/spark/sql/comet/CometScanExec.scala |  3 +-
 .../apache/comet/exec/CometNativeReaderSuite.scala | 87 ++++++++++++++++++----
 .../apache/comet/parquet/ParquetReadSuite.scala    | 30 ++++++--
 5 files changed, 122 insertions(+), 43 deletions(-)

diff --git 
a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala 
b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index 735363e72..f0d24d673 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -374,13 +374,13 @@ class CometSparkSessionExtensions
 
         // Comet JVM + native scan for V1 and V2
         case op if isCometScan(op) =>
-          val nativeOp = QueryPlanSerde.operator2Proto(op).get
-          CometScanWrapper(nativeOp, op)
+          val nativeOp = QueryPlanSerde.operator2Proto(op)
+          CometScanWrapper(nativeOp.get, op)
 
         case op if shouldApplySparkToColumnar(conf, op) =>
           val cometOp = CometSparkToColumnarExec(op)
-          val nativeOp = QueryPlanSerde.operator2Proto(cometOp).get
-          CometScanWrapper(nativeOp, cometOp)
+          val nativeOp = QueryPlanSerde.operator2Proto(cometOp)
+          CometScanWrapper(nativeOp.get, cometOp)
 
         case op: ProjectExec =>
           val newOp = transform1(op)
@@ -488,7 +488,7 @@ class CometSparkSessionExtensions
           val child = op.child
           val modes = aggExprs.map(_.mode).distinct
 
-          if (!modes.isEmpty && modes.size != 1) {
+          if (modes.nonEmpty && modes.size != 1) {
             // This shouldn't happen as all aggregation expressions should 
share the same mode.
             // Fallback to Spark nevertheless here.
             op
@@ -496,7 +496,7 @@ class CometSparkSessionExtensions
             // For a final mode HashAggregate, we only need to transform the 
HashAggregate
             // if there is Comet partial aggregation.
             val sparkFinalMode = {
-              !modes.isEmpty && modes.head == Final && 
findCometPartialAgg(child).isEmpty
+              modes.nonEmpty && modes.head == Final && 
findCometPartialAgg(child).isEmpty
             }
 
             if (sparkFinalMode) {
@@ -510,7 +510,7 @@ class CometSparkSessionExtensions
                   // distinct aggregate functions or only have group by, the 
aggExprs is empty and
                   // modes is empty too. If aggExprs is not empty, we need to 
verify all the
                   // aggregates have the same mode.
-                  assert(modes.length == 1 || modes.length == 0)
+                  assert(modes.length == 1 || modes.isEmpty)
                   CometHashAggregateExec(
                     nativeOp,
                     op,
@@ -519,7 +519,7 @@ class CometSparkSessionExtensions
                     aggExprs,
                     resultExpressions,
                     child.output,
-                    if (modes.nonEmpty) Some(modes.head) else None,
+                    modes.headOption,
                     child,
                     SerializedPlan(None))
                 case None =>
@@ -530,7 +530,7 @@ class CometSparkSessionExtensions
 
         case op: ShuffledHashJoinExec
             if CometConf.COMET_EXEC_HASH_JOIN_ENABLED.get(conf) &&
-              op.children.forall(isCometNative(_)) =>
+              op.children.forall(isCometNative) =>
           val newOp = transform1(op)
           newOp match {
             case Some(nativeOp) =>
@@ -564,7 +564,7 @@ class CometSparkSessionExtensions
 
         case op: BroadcastHashJoinExec
             if CometConf.COMET_EXEC_BROADCAST_HASH_JOIN_ENABLED.get(conf) &&
-              op.children.forall(isCometNative(_)) =>
+              op.children.forall(isCometNative) =>
           val newOp = transform1(op)
           newOp match {
             case Some(nativeOp) =>
@@ -1278,7 +1278,7 @@ object CometSparkSessionExtensions extends Logging {
     op.isInstanceOf[CometBatchScanExec] || op.isInstanceOf[CometScanExec]
   }
 
-  private def shouldApplySparkToColumnar(conf: SQLConf, op: SparkPlan): 
Boolean = {
+  def shouldApplySparkToColumnar(conf: SQLConf, op: SparkPlan): Boolean = {
     // Only consider converting leaf nodes to columnar currently, so that all 
the following
     // operators can have a chance to be converted to columnar. Leaf operators 
that output
     // columnar batches, such as Spark's vectorized readers, will also be 
converted to native
@@ -1329,10 +1329,9 @@ object CometSparkSessionExtensions extends Logging {
     org.apache.spark.SPARK_VERSION >= "4.0"
   }
 
-  def usingDataFusionParquetExec(conf: SQLConf): Boolean = {
-    CometConf.COMET_NATIVE_SCAN_IMPL.get(conf) == 
CometConf.SCAN_NATIVE_ICEBERG_COMPAT ||
-    CometConf.COMET_NATIVE_SCAN_IMPL.get(conf) == 
CometConf.SCAN_NATIVE_DATAFUSION
-  }
+  def usingDataFusionParquetExec(conf: SQLConf): Boolean =
+    Seq(CometConf.SCAN_NATIVE_ICEBERG_COMPAT, 
CometConf.SCAN_NATIVE_DATAFUSION).contains(
+      CometConf.COMET_NATIVE_SCAN_IMPL.get(conf))
 
   /**
    * Whether we should override Spark memory configuration for Comet. This 
only returns true when
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala 
b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index a8a3df0c1..863bf8a3e 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -46,7 +46,7 @@ import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 import org.apache.comet.CometConf
-import org.apache.comet.CometSparkSessionExtensions.{isCometScan, withInfo}
+import org.apache.comet.CometSparkSessionExtensions.{isCometScan, 
usingDataFusionParquetExec, withInfo}
 import org.apache.comet.expressions._
 import org.apache.comet.serde.ExprOuterClass.{AggExpr, DataType => 
ProtoDataType, Expr, ScalarFunc}
 import org.apache.comet.serde.ExprOuterClass.DataType._
@@ -68,9 +68,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde 
with CometExprShim
       true
     case s: StructType if allowComplex =>
       s.fields.map(_.dataType).forall(supportedDataType(_, allowComplex))
-    // TODO: Add nested array and iceberg compat support
-    // case a: ArrayType if allowComplex =>
-    //  supportedDataType(a.elementType)
+    case a: ArrayType if allowComplex =>
+      supportedDataType(a.elementType, allowComplex)
     case dt =>
       emitWarning(s"unsupported Spark data type: $dt")
       false
@@ -2695,7 +2694,14 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde with CometExprShim
 
       case op
           if isCometSink(op) && op.output.forall(a =>
-            supportedDataType(a.dataType, allowComplex = true)) =>
+            supportedDataType(
+              a.dataType,
+              // Complex types are supported if
+              // - Native datafusion reader enabled (experimental) OR
+              // - conversion from Parquet/JSON enabled
+              allowComplex =
+                usingDataFusionParquetExec(conf) || 
CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED
+                  .get(conf) || 
CometConf.COMET_CONVERT_FROM_JSON_ENABLED.get(conf))) =>
         // These operators are source of Comet native execution chain
         val scanBuilder = OperatorOuterClass.Scan.newBuilder()
         val source = op.simpleStringWithNodeId()
diff --git 
a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala 
b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
index eeca78678..80ef4f851 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
@@ -490,8 +490,7 @@ object CometScanExec extends DataTypeSupport {
       // TODO add map
       dt match {
         case s: StructType => s.fields.map(_.dataType).forall(isTypeSupported)
-        // TODO: Add nested array and iceberg compat support
-        // case a: ArrayType => isTypeSupported(a.elementType)
+        case a: ArrayType => isTypeSupported(a.elementType)
         case _ => false
       }
     } else {
diff --git 
a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala 
b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala
index 18841f0e7..671f858cf 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala
@@ -31,19 +31,17 @@ import org.apache.comet.CometConf
 class CometNativeReaderSuite extends CometTestBase with 
AdaptiveSparkPlanHelper {
   override protected def test(testName: String, testTags: Tag*)(testFun: => 
Any)(implicit
       pos: Position): Unit = {
-    // TODO: Enable Iceberg compat tests
-    Seq(CometConf.SCAN_NATIVE_DATAFUSION /*, 
CometConf.SCAN_NATIVE_ICEBERG_COMPAT*/ ).foreach(
-      scan =>
-        super.test(s"$testName - $scan", testTags: _*) {
-          withSQLConf(
-            CometConf.COMET_EXEC_ENABLED.key -> "true",
-            SQLConf.USE_V1_SOURCE_LIST.key -> "parquet",
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "false",
-            CometConf.COMET_NATIVE_SCAN_IMPL.key -> scan) {
-            testFun
-          }
-        })
+    Seq(CometConf.SCAN_NATIVE_DATAFUSION, 
CometConf.SCAN_NATIVE_ICEBERG_COMPAT).foreach(scan =>
+      super.test(s"$testName - $scan", testTags: _*) {
+        withSQLConf(
+          CometConf.COMET_EXEC_ENABLED.key -> "true",
+          SQLConf.USE_V1_SOURCE_LIST.key -> "parquet",
+          CometConf.COMET_ENABLED.key -> "true",
+          CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "false",
+          CometConf.COMET_NATIVE_SCAN_IMPL.key -> scan) {
+          testFun
+        }
+      })
   }
 
   test("native reader - read simple STRUCT fields") {
@@ -63,4 +61,67 @@ class CometNativeReaderSuite extends CometTestBase with 
AdaptiveSparkPlanHelper
         |""".stripMargin,
       "select arr from tbl")
   }
+
+  /*
+  native reader - read STRUCT of ARRAY fields - native_datafusion *** FAILED 
*** (191 milliseconds)
+  org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in 
stage 29.0 failed 1 times,
+  most recent failure: Lost task 1.0 in stage 29.0 (TID 35) (192.168.4.142 
executor driver):
+  org.apache.comet.CometNativeException: called `Result::unwrap()` on an `Err` 
value:
+  InvalidArgumentError("Incorrect datatype for StructArray field \"col\", 
expected List(Field { name: \"item\",
+  data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, 
metadata: {} })
+  got List(Field { name: \"item\", data_type: Int32, nullable: true, dict_id: 
0, dict_is_ordered: false, metadata: {} })")
+
+
+  native reader - read STRUCT of ARRAY fields - native_iceberg_compat *** 
FAILED *** (82 milliseconds)
+  org.apache.spark.SparkException: Job aborted due to stage failure:
+  Task 1 in stage 31.0 failed 1 times, most recent failure: Lost task 1.0 in 
stage 31.0 (TID 39) (192.168.4.142 executor driver):
+  org.apache.comet.CometNativeException: called `Result::unwrap()` on an `Err` 
value: InvalidArgumentError("Incorrect datatype for StructArray field \"col\",
+  expected List(Field { name: \"item\", data_type: Int32, nullable: false, 
dict_id: 0, dict_is_ordered: false, metadata: {} })
+  got List(Field { name: \"item\", data_type: Int32, nullable: true, dict_id: 
0, dict_is_ordered: false, metadata: {} })")
+
+   */
+  ignore("native reader - read STRUCT of ARRAY fields") {
+    testSingleLineQuery(
+      """
+        |select named_struct('col', arr) c0 from
+        |(
+        |  select array(1, 2, 3) as arr union all
+        |  select array(2, 3, 4) as arr
+        |)
+        |""".stripMargin,
+      "select c0 from tbl")
+  }
+
+  test("native reader - read ARRAY of ARRAY fields") {
+    testSingleLineQuery(
+      """
+        |select array(arr0, arr1) c0 from
+        |(
+        |  select array(1, 2, 3) as arr0, array(2, 3, 4) as arr1
+        |)
+        |""".stripMargin,
+      "select c0 from tbl")
+  }
+
+  test("native reader - read ARRAY of STRUCT fields") {
+    testSingleLineQuery(
+      """
+        |select array(str0, str1) c0 from
+        |(
+        |  select named_struct('a', 1, 'b', 'n') str0, named_struct('a', 2, 
'b', 'w') str1
+        |)
+        |""".stripMargin,
+      "select c0 from tbl")
+  }
+
+  test("native reader - read STRUCT of STRUCT fields") {
+    testSingleLineQuery(
+      """
+        |select named_struct('a', str0, 'b', str1) c0 from
+        |(
+        |  select named_struct('a', 1, 'b', 'n') str0, named_struct('c', 2, 
'd', 'w') str1
+        |)
+        |""".stripMargin,
+      "select c0 from tbl")
+  }
 }
diff --git 
a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala 
b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index a6526e5fe..f5d0cbb42 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -46,6 +46,7 @@ import org.apache.spark.unsafe.types.UTF8String
 import com.google.common.primitives.UnsignedLong
 
 import org.apache.comet.{CometConf, CometSparkSessionExtensions}
+import org.apache.comet.CometConf.SCAN_NATIVE_ICEBERG_COMPAT
 import org.apache.comet.CometSparkSessionExtensions.{isSpark40Plus, 
usingDataFusionParquetExec}
 
 abstract class ParquetReadSuite extends CometTestBase {
@@ -86,7 +87,7 @@ abstract class ParquetReadSuite extends CometTestBase {
     // note that native_datafusion does not use CometScanExec so we need not 
include that in
     // the check
     val usingNativeIcebergCompat =
-      (CometConf.COMET_NATIVE_SCAN_IMPL.get() == 
CometConf.SCAN_NATIVE_ICEBERG_COMPAT)
+      CometConf.COMET_NATIVE_SCAN_IMPL.get() == 
CometConf.SCAN_NATIVE_ICEBERG_COMPAT
     Seq(
       NullType -> false,
       BooleanType -> true,
@@ -98,7 +99,9 @@ abstract class ParquetReadSuite extends CometTestBase {
       DoubleType -> true,
       BinaryType -> true,
       StringType -> true,
-      ArrayType(TimestampType) -> false,
+      // Timestamp here is arbitrary; it is just one concrete element type for
+      // ArrayType. Any other element type works equally well.
+      ArrayType(TimestampType) -> usingNativeIcebergCompat,
       StructType(
         Seq(
           StructField("f1", DecimalType.SYSTEM_DEFAULT),
@@ -119,13 +122,24 @@ abstract class ParquetReadSuite extends CometTestBase {
   }
 
   test("unsupported Spark schema") {
-    Seq(
-      Seq(StructField("f1", IntegerType), StructField("f2", BooleanType)) -> 
true,
-      Seq(StructField("f1", IntegerType), StructField("f2", 
ArrayType(IntegerType))) -> false,
-      Seq(
-        StructField("f1", MapType(keyType = LongType, valueType = StringType)),
-        StructField("f2", ArrayType(DoubleType))) -> false).foreach { case 
(schema, expected) =>
+    val schemaDDLs =
+      Seq("f1 int, f2 boolean", "f1 int, f2 array<int>", "f1 map<long, 
string>, f2 array<double>")
+        .map(s => StructType.fromDDL(s))
+
+    // Arrays are supported by the native iceberg-compat scan and by Parquet V1
+    val cometScanExecSupported =
+      if 
(sys.env.get("COMET_PARQUET_SCAN_IMPL").contains(SCAN_NATIVE_ICEBERG_COMPAT) && 
this
+          .isInstanceOf[ParquetReadV1Suite])
+        Seq(true, true, false)
+      else Seq(true, false, false)
+
+    val cometBatchScanExecSupported = Seq(true, false, false)
+
+    schemaDDLs.zip(cometScanExecSupported).foreach { case (schema, expected) =>
       assert(CometScanExec.isSchemaSupported(StructType(schema)) == expected)
+    }
+
+    schemaDDLs.zip(cometBatchScanExecSupported).foreach { case (schema, 
expected) =>
       assert(CometBatchScanExec.isSchemaSupported(StructType(schema)) == 
expected)
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to