This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
     new 17a36bcfe feat: Add experimental auto mode for `COMET_PARQUET_SCAN_IMPL` (#1747)
17a36bcfe is described below

commit 17a36bcfecd401d43df11f276bfd6b9259a9fa5d
Author: Andy Grove <agr...@apache.org>
AuthorDate: Fri Jun 13 13:42:22 2025 -0600

    feat: Add experimental auto mode for `COMET_PARQUET_SCAN_IMPL` (#1747)
---
 .github/workflows/spark_sql_test_native_auto.yml   |  71 ++++++++++++++
 .../main/scala/org/apache/comet/CometConf.scala    |  10 +-
 docs/source/user-guide/compatibility.md            |  11 +++
 docs/source/user-guide/configs.md                  |   2 +-
 docs/templates/compatibility-template.md           |  11 +++
 .../org/apache/comet/rules/CometScanRule.scala     |  61 +++++++++++-
 .../org/apache/comet/CometExpressionSuite.scala    | 107 ++++++++++++---------
 .../org/apache/comet/exec/CometExecSuite.scala     |  14 ++-
 .../scala/org/apache/spark/sql/CometTestBase.scala |   4 +
 9 files changed, 233 insertions(+), 58 deletions(-)

diff --git a/.github/workflows/spark_sql_test_native_auto.yml b/.github/workflows/spark_sql_test_native_auto.yml
new file mode 100644
index 000000000..bc2c278b6
--- /dev/null
+++ b/.github/workflows/spark_sql_test_native_auto.yml
@@ -0,0 +1,71 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: Spark SQL Tests (native_auto)
+
+concurrency:
+  group: ${{ github.repository }}-${{ github.head_ref || github.sha }}-${{ github.workflow }}
+  cancel-in-progress: true
+
+on:
+  # manual trigger
+  # https://docs.github.com/en/actions/managing-workflow-runs/manually-running-a-workflow
+  workflow_dispatch:
+
+env:
+  RUST_VERSION: stable
+
+jobs:
+  spark-sql-catalyst-native-auto:
+    strategy:
+      matrix:
+        os: [ubuntu-24.04]
+        java-version: [11]
+        spark-version: [{short: '3.4', full: '3.4.3'}, {short: '3.5', full: '3.5.5'}]
+        module:
+          - {name: "catalyst", args1: "catalyst/test", args2: ""}
+          - {name: "sql/core-1", args1: "", args2: "sql/testOnly * -- -l org.apache.spark.tags.ExtendedSQLTest -l org.apache.spark.tags.SlowSQLTest"}
+          - {name: "sql/core-2", args1: "", args2: "sql/testOnly * -- -n org.apache.spark.tags.ExtendedSQLTest"}
+          - {name: "sql/core-3", args1: "", args2: "sql/testOnly * -- -n org.apache.spark.tags.SlowSQLTest"}
+          - {name: "sql/hive-1", args1: "", args2: "hive/testOnly * -- -l org.apache.spark.tags.ExtendedHiveTest -l org.apache.spark.tags.SlowHiveTest"}
+          - {name: "sql/hive-2", args1: "", args2: "hive/testOnly * -- -n org.apache.spark.tags.ExtendedHiveTest"}
+          - {name: "sql/hive-3", args1: "", args2: "hive/testOnly * -- -n org.apache.spark.tags.SlowHiveTest"}
+      fail-fast: false
+    name: spark-sql-native-auto-${{ matrix.module.name }}/${{ matrix.os }}/spark-${{ matrix.spark-version.full }}/java-${{ matrix.java-version }}
+    runs-on: ${{ matrix.os }}
+    container:
+      image: amd64/rust
+    steps:
+      - uses: actions/checkout@v4
+      - name: Setup Rust & Java toolchain
+        uses: ./.github/actions/setup-builder
+        with:
+          rust-version: ${{env.RUST_VERSION}}
+          jdk-version: ${{ matrix.java-version }}
+      - name: Setup Spark
+        uses: ./.github/actions/setup-spark-builder
+        with:
+          spark-version: ${{ matrix.spark-version.full }}
+          spark-short-version: ${{ matrix.spark-version.short }}
+      - name: Run Spark tests
+        run: |
+          cd apache-spark
+          rm -rf /root/.m2/repository/org/apache/parquet  # somehow parquet cache requires cleanups
+          ENABLE_COMET=true ENABLE_COMET_SHUFFLE=true COMET_PARQUET_SCAN_IMPL=auto build/sbt ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
+        env:
+          LC_ALL: "C.UTF-8"

diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 317303eb7..5736a1447 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -86,6 +86,7 @@ object CometConf extends ShimCometConf {
   val SCAN_NATIVE_COMET = "native_comet"
   val SCAN_NATIVE_DATAFUSION = "native_datafusion"
   val SCAN_NATIVE_ICEBERG_COMPAT = "native_iceberg_compat"
+  val SCAN_AUTO = "auto"
 
   val COMET_NATIVE_SCAN_IMPL: ConfigEntry[String] = conf("spark.comet.scan.impl")
     .doc(
@@ -95,11 +96,12 @@ object CometConf extends ShimCometConf {
         "parquet file reader and native column decoding. Supports simple types only " +
         s"'$SCAN_NATIVE_DATAFUSION' is a fully native implementation of scan based on DataFusion" +
         s"'$SCAN_NATIVE_ICEBERG_COMPAT' is a native implementation that exposes apis to read " +
-        "parquet columns natively.")
+        s"parquet columns natively. $SCAN_AUTO chooses the best scan.")
     .internal()
     .stringConf
     .transform(_.toLowerCase(Locale.ROOT))
-    .checkValues(Set(SCAN_NATIVE_COMET, SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT))
+    .checkValues(
+      Set(SCAN_NATIVE_COMET, SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT, SCAN_AUTO))
     .createWithDefault(sys.env
       .getOrElse("COMET_PARQUET_SCAN_IMPL", SCAN_NATIVE_COMET)
       .toLowerCase(Locale.ROOT))
@@ -587,8 +589,8 @@ object CometConf extends ShimCometConf {
   val COMET_SCAN_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
     conf("spark.comet.scan.allowIncompatible")
       .doc(
-        "Comet is not currently fully compatible with Spark for all datatypes. " +
-          s"Set this config to true to allow them anyway. $COMPAT_GUIDE.")
+        "Some Comet scan implementations are not currently fully compatible with Spark for " +
+          s"all datatypes. Set this config to true to allow them anyway. $COMPAT_GUIDE.")
       .booleanConf
       .createWithDefault(false)
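[Editorial note: for anyone who wants to try the new mode, here is a minimal sketch, not part of the commit, of opting in at runtime. It assumes a running `SparkSession` named `spark` with Comet on the classpath; the key and value constants come from the `CometConf` change above.]

```scala
import org.apache.comet.CometConf

// Equivalent to spark.conf.set("spark.comet.scan.impl", "auto").
// The config is internal and defaults to native_comet unless the
// COMET_PARQUET_SCAN_IMPL environment variable overrides it.
spark.conf.set(CometConf.COMET_NATIVE_SCAN_IMPL.key, CometConf.SCAN_AUTO)
```

The CI workflow above takes the environment-variable route instead, exporting `COMET_PARQUET_SCAN_IMPL=auto` before invoking `build/sbt`.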
diff --git a/docs/source/user-guide/compatibility.md b/docs/source/user-guide/compatibility.md
index 39cd3a058..961209075 100644
--- a/docs/source/user-guide/compatibility.md
+++ b/docs/source/user-guide/compatibility.md
@@ -50,6 +50,8 @@ implementation:
 
 The new scans currently have the following limitations:
 
+Issues common to both `native_datafusion` and `native_iceberg_compat`:
+
 - When reading Parquet files written by systems other than Spark that contain columns with the logical types `UINT_8`
   or `UINT_16`, Comet will produce different results than Spark because Spark does not preserve or understand these
   logical types. Arrow-based readers, such as DataFusion and Comet do respect these types and read the data as unsigned
@@ -58,12 +60,21 @@ types (regardless of the logical type). This behavior can be disabled by setting
   `spark.comet.scan.allowIncompatible=true`.
 - There is a known performance issue when pushing filters down to Parquet. See the [Comet Tuning Guide] for more
   information.
+- Reading maps containing complex types can result in errors or incorrect results [#1754]
+- `PARQUET_FIELD_ID_READ_ENABLED` is not respected [#1758]
 - There are failures in the Spark SQL test suite when enabling these new scans (tracking issues: [#1542] and [#1545]).
 - No support for default values that are nested types (e.g., maps, arrays, structs). Literal default values are
   supported.
 - Setting Spark configs `ignoreMissingFiles` or `ignoreCorruptFiles` to `true` is not compatible with
   `native_datafusion` scan.
 
+Issues specific to `native_datafusion`:
+
+- Bucketed scans are not supported
+- No support for row indexes
+
 [#1545]: https://github.com/apache/datafusion-comet/issues/1545
 [#1542]: https://github.com/apache/datafusion-comet/issues/1542
+[#1754]: https://github.com/apache/datafusion-comet/issues/1754
+[#1758]: https://github.com/apache/datafusion-comet/issues/1758
 [Comet Tuning Guide]: tuning.md
 
 ## ANSI mode
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 517ce960c..f27830320 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -83,7 +83,7 @@ Comet provides the following configuration settings.
 | spark.comet.parquet.read.parallel.io.enabled | Whether to enable Comet's parallel reader for Parquet files. The parallel reader reads ranges of consecutive data in a file in parallel. It is faster for large files and row groups but uses more resources. | true |
 | spark.comet.parquet.read.parallel.io.thread-pool.size | The maximum number of parallel threads the parallel reader will use in a single executor. For executors configured with a smaller number of cores, use a smaller number. | 16 |
 | spark.comet.regexp.allowIncompatible | Comet is not currently fully compatible with Spark for all regular expressions. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
-| spark.comet.scan.allowIncompatible | Comet is not currently fully compatible with Spark for all datatypes. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
+| spark.comet.scan.allowIncompatible | Some Comet scan implementations are not currently fully compatible with Spark for all datatypes. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
 | spark.comet.scan.enabled | Whether to enable native scans. When this is turned on, Spark will use Comet to read supported data sources (currently only Parquet is supported natively). Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. | true |
 | spark.comet.scan.preFetch.enabled | Whether to enable pre-fetching feature of CometScan. | false |
 | spark.comet.scan.preFetch.threadNum | The number of threads running pre-fetching for CometScan. Effective if spark.comet.scan.preFetch.enabled is enabled. Note that more pre-fetching threads means more memory requirement to store pre-fetched row groups. | 2 |
diff --git a/docs/templates/compatibility-template.md b/docs/templates/compatibility-template.md
index e304d933f..ebb0431e2 100644
--- a/docs/templates/compatibility-template.md
+++ b/docs/templates/compatibility-template.md
@@ -50,6 +50,8 @@ implementation:
 
 The new scans currently have the following limitations:
 
+Issues common to both `native_datafusion` and `native_iceberg_compat`:
+
 - When reading Parquet files written by systems other than Spark that contain columns with the logical types `UINT_8`
   or `UINT_16`, Comet will produce different results than Spark because Spark does not preserve or understand these
   logical types. Arrow-based readers, such as DataFusion and Comet do respect these types and read the data as unsigned
@@ -58,12 +60,21 @@ The new scans currently have the following limitations:
   `spark.comet.scan.allowIncompatible=true`.
 - There is a known performance issue when pushing filters down to Parquet. See the [Comet Tuning Guide] for more
   information.
+- Reading maps containing complex types can result in errors or incorrect results [#1754]
+- `PARQUET_FIELD_ID_READ_ENABLED` is not respected [#1758]
 - There are failures in the Spark SQL test suite when enabling these new scans (tracking issues: [#1542] and [#1545]).
 - No support for default values that are nested types (e.g., maps, arrays, structs). Literal default values are
   supported.
 - Setting Spark configs `ignoreMissingFiles` or `ignoreCorruptFiles` to `true` is not compatible with
   `native_datafusion` scan.
 
+Issues specific to `native_datafusion`:
+
+- Bucketed scans are not supported
+- No support for row indexes
+
 [#1545]: https://github.com/apache/datafusion-comet/issues/1545
 [#1542]: https://github.com/apache/datafusion-comet/issues/1542
+[#1754]: https://github.com/apache/datafusion-comet/issues/1754
+[#1758]: https://github.com/apache/datafusion-comet/issues/1758
 [Comet Tuning Guide]: tuning.md
 
 ## ANSI mode
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index a33c6a2ad..483791809 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -105,8 +105,14 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
       return withInfos(scanExec, fallbackReasons.toSet)
     }
 
-    val scanImpl = COMET_NATIVE_SCAN_IMPL.get()
-    if (scanImpl == CometConf.SCAN_NATIVE_DATAFUSION && !COMET_EXEC_ENABLED.get()) {
+    var scanImpl = COMET_NATIVE_SCAN_IMPL.get()
+
+    // if scan is auto then pick the best available scan
+    if (scanImpl == SCAN_AUTO) {
+      scanImpl = selectScan(scanExec, r.partitionSchema)
+    }
+
+    if (scanImpl == SCAN_NATIVE_DATAFUSION && !COMET_EXEC_ENABLED.get()) {
       fallbackReasons +=
         s"Full native scan disabled because ${COMET_EXEC_ENABLED.key} disabled"
       return withInfos(scanExec, fallbackReasons.toSet)
@@ -251,6 +257,57 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
     }
   }
 
+  private def selectScan(scanExec: FileSourceScanExec, partitionSchema: StructType): String = {
+    // TODO these checks are not yet exhaustive. For example, native_iceberg_compat does
+    // not support reading from S3
+
+    val fallbackReasons = new ListBuffer[String]()
+
+    val typeChecker = CometScanTypeChecker(SCAN_NATIVE_ICEBERG_COMPAT)
+    val schemaSupported =
+      typeChecker.isSchemaSupported(scanExec.requiredSchema, fallbackReasons)
+    val partitionSchemaSupported =
+      typeChecker.isSchemaSupported(partitionSchema, fallbackReasons)
+
+    def isComplexType(dt: DataType): Boolean = dt match {
+      case _: StructType | _: ArrayType | _: MapType => true
+      case _ => false
+    }
+
+    def hasMapsContainingStructs(dataType: DataType): Boolean = {
+      dataType match {
+        case s: StructType => s.exists(field => hasMapsContainingStructs(field.dataType))
+        case a: ArrayType => hasMapsContainingStructs(a.elementType)
+        case m: MapType => isComplexType(m.keyType) || isComplexType(m.valueType)
+        case _ => false
+      }
+    }
+
+    val knownIssues =
+      scanExec.requiredSchema.exists(field => hasMapsContainingStructs(field.dataType)) ||
+        partitionSchema.exists(field => hasMapsContainingStructs(field.dataType))
+
+    if (knownIssues) {
+      fallbackReasons += "There are known issues with maps containing structs when using " +
+        s"$SCAN_NATIVE_ICEBERG_COMPAT"
+    }
+
+    val cometExecEnabled = COMET_EXEC_ENABLED.get()
+    if (!cometExecEnabled) {
+      fallbackReasons += s"$SCAN_NATIVE_ICEBERG_COMPAT requires ${COMET_EXEC_ENABLED.key}=true"
+    }
+
+    if (cometExecEnabled && schemaSupported && partitionSchemaSupported && !knownIssues) {
+      logInfo(s"Auto scan mode selecting $SCAN_NATIVE_ICEBERG_COMPAT")
+      SCAN_NATIVE_ICEBERG_COMPAT
+    } else {
+      logInfo(
+        s"Auto scan mode falling back to $SCAN_NATIVE_COMET due to " +
+          s"${fallbackReasons.mkString(", ")}")
+      SCAN_NATIVE_COMET
+    }
+  }
+
 }
 
 case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport {
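[Editorial note: the maps-of-complex-types check that drives the fallback above is compact, so here is a self-contained restatement of the two helpers from `selectScan`, evaluated against a sample schema. The schema and field names are hypothetical, not from the commit.]

```scala
import org.apache.spark.sql.types._

// Same logic as the helpers added to CometScanRule above, runnable standalone.
def isComplexType(dt: DataType): Boolean = dt match {
  case _: StructType | _: ArrayType | _: MapType => true
  case _ => false
}

def hasMapsContainingStructs(dt: DataType): Boolean = dt match {
  case s: StructType => s.exists(field => hasMapsContainingStructs(field.dataType))
  case a: ArrayType => hasMapsContainingStructs(a.elementType)
  case m: MapType => isComplexType(m.keyType) || isComplexType(m.valueType)
  case _ => false
}

// Hypothetical schema: a map whose values are structs hits the known issue,
// so auto mode would fall back to native_comet for this scan.
val schema = StructType(
  Seq(
    StructField("id", LongType),
    StructField("attrs", MapType(StringType, StructType(Seq(StructField("x", IntegerType)))))))

assert(schema.exists(field => hasMapsContainingStructs(field.dataType)))
```

A schema whose maps hold only primitive keys and values, e.g. `MapType(StringType, StringType)`, would pass this check and remain eligible for `native_iceberg_compat`.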
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 5a0c51eef..c5451e27f 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -26,6 +26,9 @@ import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
 import scala.util.Random
 
+import org.scalactic.source.Position
+import org.scalatest.Tag
+
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.{CometTestBase, DataFrame, Row}
 import org.apache.spark.sql.catalyst.optimizer.SimplifyExtractValueOps
@@ -44,6 +47,15 @@ import org.apache.comet.testing.{DataGenOptions, ParquetGenerator}
 class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   import testImplicits._
 
+  override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
+      pos: Position): Unit = {
+    super.test(testName, testTags: _*) {
+      withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_AUTO) {
+        testFun
+      }
+    }
+  }
+
   test("compare true/false to negative zero") {
     Seq(false, true).foreach { dictionary =>
       withSQLConf("parquet.enable.dictionary" -> dictionary.toString) {
@@ -193,11 +205,11 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     Seq(true, false).foreach { dictionaryEnabled =>
       withTempDir { dir =>
         val path = new Path(dir.toURI.toString, "test.parquet")
+        makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)
         withSQLConf(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "false") {
-          makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)
-        }
-        withParquetTable(path.toString, "tbl") {
-          checkSparkAnswerAndOperator("select * FROM tbl WHERE _2 > 100")
+          withParquetTable(path.toString, "tbl") {
+            checkSparkAnswerAndOperator("select * FROM tbl WHERE _2 > 100")
+          }
         }
       }
     }
@@ -1372,7 +1384,8 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
         -128,
         128,
         randomSize = 100)
-      withSQLConf(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "true") {
+      // this test requires native_comet scan due to unsigned u8/u16 issue
+      withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) {
         withParquetTable(path.toString, "tbl") {
           for (s <- Seq(-5, -1, 0, 1, 5, -1000, 1000, -323, -308, 308, -15, 15, -16, 16, null)) {
@@ -1429,7 +1442,8 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     Seq(true, false).foreach { dictionaryEnabled =>
       withTempDir { dir =>
         val path = new Path(dir.toURI.toString, "hex.parquet")
-        withSQLConf(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "true") {
+        // this test requires native_comet scan due to unsigned u8/u16 issue
+        withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) {
           makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)
           withParquetTable(path.toString, "tbl") {
             checkSparkAnswerAndOperator(
@@ -2639,46 +2653,47 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   }
 
   test("test integral divide") {
-    // https://github.com/apache/datafusion-comet/issues/1441
-    assume(!usingDataSourceExec)
-    Seq(true, false).foreach { dictionaryEnabled =>
-      withTempDir { dir =>
-        val path1 = new Path(dir.toURI.toString, "test1.parquet")
-        val path2 = new Path(dir.toURI.toString, "test2.parquet")
-        makeParquetFileAllTypes(
-          path1,
-          dictionaryEnabled = dictionaryEnabled,
-          0,
-          0,
-          randomSize = 10000)
-        makeParquetFileAllTypes(
-          path2,
-          dictionaryEnabled = dictionaryEnabled,
-          0,
-          0,
-          randomSize = 10000)
-        withParquetTable(path1.toString, "tbl1") {
-          withParquetTable(path2.toString, "tbl2") {
-            checkSparkAnswerAndOperator("""
-              |select
-              | t1._2 div t2._2, div(t1._2, t2._2),
-              | t1._3 div t2._3, div(t1._3, t2._3),
-              | t1._4 div t2._4, div(t1._4, t2._4),
-              | t1._5 div t2._5, div(t1._5, t2._5),
-              | t1._9 div t2._9, div(t1._9, t2._9),
-              | t1._10 div t2._10, div(t1._10, t2._10),
-              | t1._11 div t2._11, div(t1._11, t2._11)
-              | from tbl1 t1 join tbl2 t2 on t1._id = t2._id
-              | order by t1._id""".stripMargin)
-
-            checkSparkAnswerAndOperator("""
-              |select
-              | t1._12 div t2._12, div(t1._12, t2._12),
-              | t1._15 div t2._15, div(t1._15, t2._15),
-              | t1._16 div t2._16, div(t1._16, t2._16),
-              | t1._17 div t2._17, div(t1._17, t2._17)
-              | from tbl1 t1 join tbl2 t2 on t1._id = t2._id
-              | order by t1._id""".stripMargin)
+    // this test requires native_comet scan due to unsigned u8/u16 issue
+    withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) {
+      Seq(true, false).foreach { dictionaryEnabled =>
+        withTempDir { dir =>
+          val path1 = new Path(dir.toURI.toString, "test1.parquet")
+          val path2 = new Path(dir.toURI.toString, "test2.parquet")
+          makeParquetFileAllTypes(
+            path1,
+            dictionaryEnabled = dictionaryEnabled,
+            0,
+            0,
+            randomSize = 10000)
+          makeParquetFileAllTypes(
+            path2,
+            dictionaryEnabled = dictionaryEnabled,
+            0,
+            0,
+            randomSize = 10000)
+          withParquetTable(path1.toString, "tbl1") {
+            withParquetTable(path2.toString, "tbl2") {
+              checkSparkAnswerAndOperator("""
+                |select
+                | t1._2 div t2._2, div(t1._2, t2._2),
+                | t1._3 div t2._3, div(t1._3, t2._3),
+                | t1._4 div t2._4, div(t1._4, t2._4),
+                | t1._5 div t2._5, div(t1._5, t2._5),
+                | t1._9 div t2._9, div(t1._9, t2._9),
+                | t1._10 div t2._10, div(t1._10, t2._10),
+                | t1._11 div t2._11, div(t1._11, t2._11)
+                | from tbl1 t1 join tbl2 t2 on t1._id = t2._id
+                | order by t1._id""".stripMargin)
+
+              checkSparkAnswerAndOperator("""
+                |select
+                | t1._12 div t2._12, div(t1._12, t2._12),
+                | t1._15 div t2._15, div(t1._15, t2._15),
+                | t1._16 div t2._16, div(t1._16, t2._16),
+                | t1._17 div t2._17, div(t1._17, t2._17)
+                | from tbl1 t1 join tbl2 t2 on t1._id = t2._id
+                | order by t1._id""".stripMargin)
+            }
           }
         }
       }
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index d6e887b75..5743e6a34 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -59,7 +59,9 @@ class CometExecSuite extends CometTestBase {
   override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
       pos: Position): Unit = {
     super.test(testName, testTags: _*) {
-      withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
+      withSQLConf(
+        CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
+        CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_AUTO) {
         testFun
       }
     }
@@ -544,10 +546,12 @@ class CometExecSuite extends CometTestBase {
   }
 
   test("Comet native metrics: scan") {
-    // https://github.com/apache/datafusion-comet/issues/1441
-    assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_ICEBERG_COMPAT)
-
-    withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "true") {
+    withSQLConf(
+      CometConf.COMET_EXEC_ENABLED.key -> "true",
+      // TODO: update this test to work with native_iceberg_compat/auto,
+      // scan is set to native_comet for now as a workaround
+      // https://github.com/apache/datafusion-comet/issues/1882
+      CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) {
       withTempDir { dir =>
         val path = new Path(dir.toURI.toString, "native-scan.parquet")
         makeParquetFileAllTypes(path, dictionaryEnabled = true, 10000)
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
index fa3a28438..b5d4d3512 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -80,6 +80,10 @@ abstract class CometTestBase
     conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
     conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true")
     conf.set(CometConf.COMET_NATIVE_SCAN_ENABLED.key, "true")
+    // set the scan impl to SCAN_NATIVE_COMET because many tests are implemented
+    // with the assumption that this is the default and would need updating if we
+    // change the default
+    conf.set(CometConf.COMET_NATIVE_SCAN_IMPL.key, CometConf.SCAN_NATIVE_COMET)
     conf.set(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key, "true")
     conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "2g")
     conf.set(CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key, "true")
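[Editorial note: putting the pieces together, a hedged end-to-end sketch, not part of the commit, of a local session using the experimental mode. The two Comet config keys appear in the diffs above; the plugin class is Comet's standard entry point, the Comet jar on the classpath and the Parquet path are assumptions.]

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .master("local[*]")
  // assumes the Comet jar is available to the driver and executors
  .config("spark.plugins", "org.apache.comet.CometPlugin")
  // native_iceberg_compat is only auto-selected when Comet exec is enabled
  .config("spark.comet.exec.enabled", "true")
  .config("spark.comet.scan.impl", "auto")
  .getOrCreate()

// Scans of supported schemas should use native_iceberg_compat; schemas with
// known issues (e.g. maps containing structs) fall back to native_comet, and
// CometScanRule logs the selection and any fallback reasons at INFO level.
spark.read.parquet("/tmp/example.parquet").show()
```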