This is an automated email from the ASF dual-hosted git repository.
comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 67e1efca0 feat: enable iceberg compat tests, more tests for complex
types (#1550)
67e1efca0 is described below
commit 67e1efca0556d81320926bbb22eecebbff9497c4
Author: Oleks V <[email protected]>
AuthorDate: Sun Mar 30 13:45:16 2025 -0700
feat: enable iceberg compat tests, more tests for complex types (#1550)
* feat: enable iceberg compat tests, more tests for complex types
---
.../apache/comet/CometSparkSessionExtensions.scala | 29 ++++----
.../org/apache/comet/serde/QueryPlanSerde.scala | 16 ++--
.../org/apache/spark/sql/comet/CometScanExec.scala | 3 +-
.../apache/comet/exec/CometNativeReaderSuite.scala | 87 ++++++++++++++++++----
.../apache/comet/parquet/ParquetReadSuite.scala | 30 ++++++--
5 files changed, 122 insertions(+), 43 deletions(-)
diff --git
a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index 735363e72..f0d24d673 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -374,13 +374,13 @@ class CometSparkSessionExtensions
// Comet JVM + native scan for V1 and V2
case op if isCometScan(op) =>
- val nativeOp = QueryPlanSerde.operator2Proto(op).get
- CometScanWrapper(nativeOp, op)
+ val nativeOp = QueryPlanSerde.operator2Proto(op)
+ CometScanWrapper(nativeOp.get, op)
case op if shouldApplySparkToColumnar(conf, op) =>
val cometOp = CometSparkToColumnarExec(op)
- val nativeOp = QueryPlanSerde.operator2Proto(cometOp).get
- CometScanWrapper(nativeOp, cometOp)
+ val nativeOp = QueryPlanSerde.operator2Proto(cometOp)
+ CometScanWrapper(nativeOp.get, cometOp)
case op: ProjectExec =>
val newOp = transform1(op)
@@ -488,7 +488,7 @@ class CometSparkSessionExtensions
val child = op.child
val modes = aggExprs.map(_.mode).distinct
- if (!modes.isEmpty && modes.size != 1) {
+ if (modes.nonEmpty && modes.size != 1) {
// This shouldn't happen as all aggregation expressions should
share the same mode.
// Fallback to Spark nevertheless here.
op
@@ -496,7 +496,7 @@ class CometSparkSessionExtensions
// For a final mode HashAggregate, we only need to transform the
HashAggregate
// if there is Comet partial aggregation.
val sparkFinalMode = {
- !modes.isEmpty && modes.head == Final &&
findCometPartialAgg(child).isEmpty
+ modes.nonEmpty && modes.head == Final &&
findCometPartialAgg(child).isEmpty
}
if (sparkFinalMode) {
@@ -510,7 +510,7 @@ class CometSparkSessionExtensions
// distinct aggregate functions or only have group by, the
aggExprs is empty and
// modes is empty too. If aggExprs is not empty, we need to
verify all the
// aggregates have the same mode.
- assert(modes.length == 1 || modes.length == 0)
+ assert(modes.length == 1 || modes.isEmpty)
CometHashAggregateExec(
nativeOp,
op,
@@ -519,7 +519,7 @@ class CometSparkSessionExtensions
aggExprs,
resultExpressions,
child.output,
- if (modes.nonEmpty) Some(modes.head) else None,
+ modes.headOption,
child,
SerializedPlan(None))
case None =>
@@ -530,7 +530,7 @@ class CometSparkSessionExtensions
case op: ShuffledHashJoinExec
if CometConf.COMET_EXEC_HASH_JOIN_ENABLED.get(conf) &&
- op.children.forall(isCometNative(_)) =>
+ op.children.forall(isCometNative) =>
val newOp = transform1(op)
newOp match {
case Some(nativeOp) =>
@@ -564,7 +564,7 @@ class CometSparkSessionExtensions
case op: BroadcastHashJoinExec
if CometConf.COMET_EXEC_BROADCAST_HASH_JOIN_ENABLED.get(conf) &&
- op.children.forall(isCometNative(_)) =>
+ op.children.forall(isCometNative) =>
val newOp = transform1(op)
newOp match {
case Some(nativeOp) =>
@@ -1278,7 +1278,7 @@ object CometSparkSessionExtensions extends Logging {
op.isInstanceOf[CometBatchScanExec] || op.isInstanceOf[CometScanExec]
}
- private def shouldApplySparkToColumnar(conf: SQLConf, op: SparkPlan):
Boolean = {
+ def shouldApplySparkToColumnar(conf: SQLConf, op: SparkPlan): Boolean = {
// Only consider converting leaf nodes to columnar currently, so that all
the following
// operators can have a chance to be converted to columnar. Leaf operators
that output
// columnar batches, such as Spark's vectorized readers, will also be
converted to native
@@ -1329,10 +1329,9 @@ object CometSparkSessionExtensions extends Logging {
org.apache.spark.SPARK_VERSION >= "4.0"
}
- def usingDataFusionParquetExec(conf: SQLConf): Boolean = {
- CometConf.COMET_NATIVE_SCAN_IMPL.get(conf) ==
CometConf.SCAN_NATIVE_ICEBERG_COMPAT ||
- CometConf.COMET_NATIVE_SCAN_IMPL.get(conf) ==
CometConf.SCAN_NATIVE_DATAFUSION
- }
+ def usingDataFusionParquetExec(conf: SQLConf): Boolean =
+ Seq(CometConf.SCAN_NATIVE_ICEBERG_COMPAT,
CometConf.SCAN_NATIVE_DATAFUSION).contains(
+ CometConf.COMET_NATIVE_SCAN_IMPL.get(conf))
/**
* Whether we should override Spark memory configuration for Comet. This
only returns true when
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index a8a3df0c1..863bf8a3e 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -46,7 +46,7 @@ import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.comet.CometConf
-import org.apache.comet.CometSparkSessionExtensions.{isCometScan, withInfo}
+import org.apache.comet.CometSparkSessionExtensions.{isCometScan,
usingDataFusionParquetExec, withInfo}
import org.apache.comet.expressions._
import org.apache.comet.serde.ExprOuterClass.{AggExpr, DataType =>
ProtoDataType, Expr, ScalarFunc}
import org.apache.comet.serde.ExprOuterClass.DataType._
@@ -68,9 +68,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde
with CometExprShim
true
case s: StructType if allowComplex =>
s.fields.map(_.dataType).forall(supportedDataType(_, allowComplex))
- // TODO: Add nested array and iceberg compat support
- // case a: ArrayType if allowComplex =>
- // supportedDataType(a.elementType)
+ case a: ArrayType if allowComplex =>
+ supportedDataType(a.elementType, allowComplex)
case dt =>
emitWarning(s"unsupported Spark data type: $dt")
false
@@ -2695,7 +2694,14 @@ object QueryPlanSerde extends Logging with
ShimQueryPlanSerde with CometExprShim
case op
if isCometSink(op) && op.output.forall(a =>
- supportedDataType(a.dataType, allowComplex = true)) =>
+ supportedDataType(
+ a.dataType,
+ // Complex types are supported if
+ // - Native datafusion reader enabled (experimental) OR
+ // - conversion from Parquet/JSON enabled
+ allowComplex =
+ usingDataFusionParquetExec(conf) ||
CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED
+ .get(conf) ||
CometConf.COMET_CONVERT_FROM_JSON_ENABLED.get(conf))) =>
// These operators are source of Comet native execution chain
val scanBuilder = OperatorOuterClass.Scan.newBuilder()
val source = op.simpleStringWithNodeId()
diff --git
a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
index eeca78678..80ef4f851 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
@@ -490,8 +490,7 @@ object CometScanExec extends DataTypeSupport {
// TODO add map
dt match {
case s: StructType => s.fields.map(_.dataType).forall(isTypeSupported)
- // TODO: Add nested array and iceberg compat support
- // case a: ArrayType => isTypeSupported(a.elementType)
+ case a: ArrayType => isTypeSupported(a.elementType)
case _ => false
}
} else {
diff --git
a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala
b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala
index 18841f0e7..671f858cf 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala
@@ -31,19 +31,17 @@ import org.apache.comet.CometConf
class CometNativeReaderSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
override protected def test(testName: String, testTags: Tag*)(testFun: =>
Any)(implicit
pos: Position): Unit = {
- // TODO: Enable Iceberg compat tests
- Seq(CometConf.SCAN_NATIVE_DATAFUSION /*,
CometConf.SCAN_NATIVE_ICEBERG_COMPAT*/ ).foreach(
- scan =>
- super.test(s"$testName - $scan", testTags: _*) {
- withSQLConf(
- CometConf.COMET_EXEC_ENABLED.key -> "true",
- SQLConf.USE_V1_SOURCE_LIST.key -> "parquet",
- CometConf.COMET_ENABLED.key -> "true",
- CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "false",
- CometConf.COMET_NATIVE_SCAN_IMPL.key -> scan) {
- testFun
- }
- })
+ Seq(CometConf.SCAN_NATIVE_DATAFUSION,
CometConf.SCAN_NATIVE_ICEBERG_COMPAT).foreach(scan =>
+ super.test(s"$testName - $scan", testTags: _*) {
+ withSQLConf(
+ CometConf.COMET_EXEC_ENABLED.key -> "true",
+ SQLConf.USE_V1_SOURCE_LIST.key -> "parquet",
+ CometConf.COMET_ENABLED.key -> "true",
+ CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "false",
+ CometConf.COMET_NATIVE_SCAN_IMPL.key -> scan) {
+ testFun
+ }
+ })
}
test("native reader - read simple STRUCT fields") {
@@ -63,4 +61,67 @@ class CometNativeReaderSuite extends CometTestBase with
AdaptiveSparkPlanHelper
|""".stripMargin,
"select arr from tbl")
}
+
+ /*
+ native reader - read STRUCT of ARRAY fields - native_datafusion *** FAILED
*** (191 milliseconds)
+ org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in
stage 29.0 failed 1 times,
+ most recent failure: Lost task 1.0 in stage 29.0 (TID 35) (192.168.4.142
executor driver):
+ org.apache.comet.CometNativeException: called `Result::unwrap()` on an `Err`
value:
+ InvalidArgumentError("Incorrect datatype for StructArray field \"col\",
expected List(Field { name: \"item\",
+ data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false,
metadata: {} })
+ got List(Field { name: \"item\", data_type: Int32, nullable: true, dict_id:
0, dict_is_ordered: false, metadata: {} })")
+
+
+ native reader - read STRUCT of ARRAY fields - native_iceberg_compat ***
FAILED *** (82 milliseconds)
+ org.apache.spark.SparkException: Job aborted due to stage failure:
+ Task 1 in stage 31.0 failed 1 times, most recent failure: Lost task 1.0 in
stage 31.0 (TID 39) (192.168.4.142 executor driver):
+ org.apache.comet.CometNativeException: called `Result::unwrap()` on an `Err`
value: InvalidArgumentError("Incorrect datatype for StructArray field \"col\",
+ expected List(Field { name: \"item\", data_type: Int32, nullable: false,
dict_id: 0, dict_is_ordered: false, metadata: {} })
+ got List(Field { name: \"item\", data_type: Int32, nullable: true, dict_id:
0, dict_is_ordered: false, metadata: {} })")
+
+ */
+ ignore("native reader - read STRUCT of ARRAY fields") {
+ testSingleLineQuery(
+ """
+ |select named_struct('col', arr) c0 from
+ |(
+ | select array(1, 2, 3) as arr union all
+ | select array(2, 3, 4) as arr
+ |)
+ |""".stripMargin,
+ "select c0 from tbl")
+ }
+
+ test("native reader - read ARRAY of ARRAY fields") {
+ testSingleLineQuery(
+ """
+ |select array(arr0, arr1) c0 from
+ |(
+ | select array(1, 2, 3) as arr0, array(2, 3, 4) as arr1
+ |)
+ |""".stripMargin,
+ "select c0 from tbl")
+ }
+
+ test("native reader - read ARRAY of STRUCT fields") {
+ testSingleLineQuery(
+ """
+ |select array(str0, str1) c0 from
+ |(
+ | select named_struct('a', 1, 'b', 'n') str0, named_struct('a', 2,
'b', 'w') str1
+ |)
+ |""".stripMargin,
+ "select c0 from tbl")
+ }
+
+ test("native reader - read STRUCT of STRUCT fields") {
+ testSingleLineQuery(
+ """
+ |select named_struct('a', str0, 'b', str1) c0 from
+ |(
+ | select named_struct('a', 1, 'b', 'n') str0, named_struct('c', 2,
'd', 'w') str1
+ |)
+ |""".stripMargin,
+ "select c0 from tbl")
+ }
}
diff --git
a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index a6526e5fe..f5d0cbb42 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -46,6 +46,7 @@ import org.apache.spark.unsafe.types.UTF8String
import com.google.common.primitives.UnsignedLong
import org.apache.comet.{CometConf, CometSparkSessionExtensions}
+import org.apache.comet.CometConf.SCAN_NATIVE_ICEBERG_COMPAT
import org.apache.comet.CometSparkSessionExtensions.{isSpark40Plus,
usingDataFusionParquetExec}
abstract class ParquetReadSuite extends CometTestBase {
@@ -86,7 +87,7 @@ abstract class ParquetReadSuite extends CometTestBase {
// note that native_datafusion does not use CometScanExec so we need not
include that in
// the check
val usingNativeIcebergCompat =
- (CometConf.COMET_NATIVE_SCAN_IMPL.get() ==
CometConf.SCAN_NATIVE_ICEBERG_COMPAT)
+ CometConf.COMET_NATIVE_SCAN_IMPL.get() ==
CometConf.SCAN_NATIVE_ICEBERG_COMPAT
Seq(
NullType -> false,
BooleanType -> true,
@@ -98,7 +99,9 @@ abstract class ParquetReadSuite extends CometTestBase {
DoubleType -> true,
BinaryType -> true,
StringType -> true,
- ArrayType(TimestampType) -> false,
+ // Timestamp here is arbitrary — just a concrete element type to form an
ArrayType
+ // Any other type would work
+ ArrayType(TimestampType) -> usingNativeIcebergCompat,
StructType(
Seq(
StructField("f1", DecimalType.SYSTEM_DEFAULT),
@@ -119,13 +122,24 @@ abstract class ParquetReadSuite extends CometTestBase {
}
test("unsupported Spark schema") {
- Seq(
- Seq(StructField("f1", IntegerType), StructField("f2", BooleanType)) ->
true,
- Seq(StructField("f1", IntegerType), StructField("f2",
ArrayType(IntegerType))) -> false,
- Seq(
- StructField("f1", MapType(keyType = LongType, valueType = StringType)),
- StructField("f2", ArrayType(DoubleType))) -> false).foreach { case
(schema, expected) =>
+ val schemaDDLs =
+ Seq("f1 int, f2 boolean", "f1 int, f2 array<int>", "f1 map<long,
string>, f2 array<double>")
+ .map(s => StructType.fromDDL(s))
+
+ // Arrays are supported for the native iceberg-compat scan and for Parquet V1
+ val cometScanExecSupported =
+ if
(sys.env.get("COMET_PARQUET_SCAN_IMPL").contains(SCAN_NATIVE_ICEBERG_COMPAT) &&
this
+ .isInstanceOf[ParquetReadV1Suite])
+ Seq(true, true, false)
+ else Seq(true, false, false)
+
+ val cometBatchScanExecSupported = Seq(true, false, false)
+
+ schemaDDLs.zip(cometScanExecSupported).foreach { case (schema, expected) =>
assert(CometScanExec.isSchemaSupported(StructType(schema)) == expected)
+ }
+
+ schemaDDLs.zip(cometBatchScanExecSupported).foreach { case (schema,
expected) =>
assert(CometBatchScanExec.isSchemaSupported(StructType(schema)) ==
expected)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]