This is an automated email from the ASF dual-hosted git repository. agrove pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push: new 1b344def1 fix: Support `auto` scan mode with Spark 4.0.0 (#1975) 1b344def1 is described below commit 1b344def1be595f8de5d86994dddb029ef68f20a Author: Andy Grove <agr...@apache.org> AuthorDate: Mon Aug 25 18:12:14 2025 -0600 fix: Support `auto` scan mode with Spark 4.0.0 (#1975) --- .../org/apache/spark/sql/comet/util/Utils.scala | 9 +- .../org/apache/comet/shims/CometTypeShim.scala | 25 ++++ .../org/apache/comet/shims/CometTypeShim.scala | 26 ++++ dev/diffs/4.0.0.diff | 134 ++++++++++++++++++--- .../org/apache/comet/rules/CometScanRule.scala | 36 +++--- 5 files changed, 195 insertions(+), 35 deletions(-) diff --git a/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala b/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala index 3041ea2c8..a72208db2 100644 --- a/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala +++ b/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala @@ -39,9 +39,10 @@ import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream} +import org.apache.comet.shims.CometTypeShim import org.apache.comet.vector.CometVector -object Utils { +object Utils extends CometTypeShim { def getConfPath(confFileName: String): String = { sys.env .get("COMET_CONF_DIR") @@ -124,7 +125,8 @@ object Utils { case LongType => new ArrowType.Int(8 * 8, true) case FloatType => new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE) case DoubleType => new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE) - case StringType => ArrowType.Utf8.INSTANCE + case _: StringType => ArrowType.Utf8.INSTANCE + case dt if isStringCollationType(dt) => ArrowType.Utf8.INSTANCE case BinaryType => ArrowType.Binary.INSTANCE case DecimalType.Fixed(precision, scale) => new ArrowType.Decimal(precision, scale, 128) case DateType => new ArrowType.Date(DateUnit.DAY) @@ -138,7 +140,8 @@ object Utils { case TimestampNTZType => new ArrowType.Timestamp(TimeUnit.MICROSECOND, null) case _ => - throw new UnsupportedOperationException(s"Unsupported data type: ${dt.catalogString}") + throw new UnsupportedOperationException( + s"Unsupported data type: [${dt.getClass.getName}] ${dt.catalogString}") } /** Maps field from Spark to Arrow. NOTE: timeZoneId required for TimestampType */ diff --git a/common/src/main/spark-3.x/org/apache/comet/shims/CometTypeShim.scala b/common/src/main/spark-3.x/org/apache/comet/shims/CometTypeShim.scala new file mode 100644 index 000000000..14e0881e0 --- /dev/null +++ b/common/src/main/spark-3.x/org/apache/comet/shims/CometTypeShim.scala @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.comet.shims + +import org.apache.spark.sql.types.DataType + +trait CometTypeShim { + def isStringCollationType(dt: DataType): Boolean = false +} \ No newline at end of file diff --git a/common/src/main/spark-4.0/org/apache/comet/shims/CometTypeShim.scala b/common/src/main/spark-4.0/org/apache/comet/shims/CometTypeShim.scala new file mode 100644 index 000000000..a2d64b3ea --- /dev/null +++ b/common/src/main/spark-4.0/org/apache/comet/shims/CometTypeShim.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.comet.shims + +import org.apache.spark.sql.internal.types.StringTypeWithCollation +import org.apache.spark.sql.types.DataType + +trait CometTypeShim { + def isStringCollationType(dt: DataType): Boolean = dt.isInstanceOf[StringTypeWithCollation] +} \ No newline at end of file diff --git a/dev/diffs/4.0.0.diff b/dev/diffs/4.0.0.diff index 7d2dc790d..4a4b958c4 100644 --- a/dev/diffs/4.0.0.diff +++ b/dev/diffs/4.0.0.diff @@ -700,10 +700,10 @@ index 9c529d14221..069b7c5adeb 100644 } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala new file mode 100644 -index 00000000000..5eb3fa17ca8 +index 00000000000..5691536c114 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala -@@ -0,0 +1,43 @@ +@@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with @@ -732,6 +732,8 @@ index 00000000000..5eb3fa17ca8 + * Tests with this tag will be ignored when Comet is enabled (e.g., via `ENABLE_COMET`). + */ +case class IgnoreComet(reason: String) extends Tag("DisableComet") ++case class IgnoreCometNativeIcebergCompat(reason: String) extends Tag("DisableComet") ++case class IgnoreCometNativeDataFusion(reason: String) extends Tag("DisableComet") +case class IgnoreCometNativeScan(reason: String) extends Tag("DisableComet") + +/** @@ -1277,11 +1279,26 @@ index 2e33f6505ab..e1e93ab3bad 100644 } withTable("t1", "t2") { +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala +index fee375db10a..8c2c24e2c5f 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala +@@ -33,7 +33,9 @@ import org.apache.spark.sql.types._ + import org.apache.spark.types.variant._ + import org.apache.spark.unsafe.types.{UTF8String, VariantVal} + +-class VariantShreddingSuite extends QueryTest with SharedSparkSession with ParquetTest { ++class VariantShreddingSuite extends QueryTest with SharedSparkSession with ParquetTest ++ // TODO enable tests once https://github.com/apache/datafusion-comet/issues/2209 is fixed ++ with IgnoreCometSuite { + def parseJson(s: String): VariantVal = { + val v = VariantBuilder.parseJson(s, false) + new VariantVal(v.getValue, v.getMetadata) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala -index 11e9547dfc5..df5678c8d82 100644 +index 11e9547dfc5..be9ae40ab3d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala -@@ -20,7 +20,7 @@ package org.apache.spark.sql.collation +@@ -20,10 +20,11 @@ package org.apache.spark.sql.collation import scala.jdk.CollectionConverters.MapHasAsJava import org.apache.spark.SparkException @@ -1290,7 +1307,21 @@ index 11e9547dfc5..df5678c8d82 100644 import org.apache.spark.sql.catalyst.ExtendedAnalysisException import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util.CollationFactory -@@ -1505,7 +1505,9 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper { ++import org.apache.spark.sql.comet.{CometBroadcastHashJoinExec, CometHashJoinExec, CometSortMergeJoinExec} + import org.apache.spark.sql.connector.{DatasourceV2SQLBase, FakeV2ProviderWithCustomSchema} + import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTable} + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper +@@ -55,7 +56,9 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper { + assert( + collectFirst(queryPlan) { + case _: SortMergeJoinExec => assert(isSortMergeForced) ++ case _: CometSortMergeJoinExec => assert(isSortMergeForced) + case _: HashJoin => assert(!isSortMergeForced) ++ case _: CometHashJoinExec | _: CometBroadcastHashJoinExec => assert(!isSortMergeForced) + }.nonEmpty + ) + } +@@ -1505,7 +1508,9 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper { } } @@ -1301,7 +1332,23 @@ index 11e9547dfc5..df5678c8d82 100644 val t1 = "T_1" val t2 = "T_2" -@@ -1815,7 +1817,9 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper { +@@ -1611,6 +1616,7 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper { + } else { + assert(!collectFirst(queryPlan) { + case b: BroadcastHashJoinExec => b.leftKeys.head ++ case b: CometBroadcastHashJoinExec => b.leftKeys.head + }.head.isInstanceOf[ArrayTransform]) + } + } +@@ -1676,6 +1682,7 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper { + } else { + assert(!collectFirst(queryPlan) { + case b: BroadcastHashJoinExec => b.leftKeys.head ++ case b: CometBroadcastHashJoinExec => b.leftKeys.head + }.head.isInstanceOf[ArrayTransform]) + } + } +@@ -1815,7 +1822,9 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper { } } @@ -2636,10 +2683,23 @@ index 22839d3f0d2..7e66d100e90 100644 checkAnswer( // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -index bba71f1c48d..4f33ce4b3f2 100644 +index bba71f1c48d..38c60ee2584 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -@@ -1060,7 +1060,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -996,7 +996,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS + Seq(Some("A"), Some("A"), None).toDF().repartition(1) + .write.parquet(path.getAbsolutePath) + val df = spark.read.parquet(path.getAbsolutePath) +- checkAnswer(stripSparkFilter(df.where("NOT (value <=> 'A')")), df) ++ // Similar to Spark's vectorized reader, Comet doesn't do row-level filtering but relies ++ // on Spark to apply the data filters after columnar batches are returned ++ if (!isCometEnabled) { ++ checkAnswer(stripSparkFilter(df.where("NOT (value <=> 'A')")), df) ++ } + } + } + } +@@ -1060,7 +1064,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS checkAnswer(readParquet(schema2, path), df) } @@ -2649,7 +2709,7 @@ index bba71f1c48d..4f33ce4b3f2 100644 val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)" checkAnswer(readParquet(schema1, path), df) val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)" -@@ -1084,7 +1085,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1084,7 +1089,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d") df.write.parquet(path.toString) @@ -3330,27 +3390,34 @@ index 86c4e49f6f6..2e639e5f38d 100644 val tblTargetName = "tbl_target" val tblSourceQualified = s"default.$tblSourceName" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala -index f0f3f94b811..486a436afb2 100644 +index f0f3f94b811..d64e4e54e22 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala -@@ -33,7 +33,7 @@ import org.scalatest.{BeforeAndAfterAll, Suite, Tag} +@@ -27,13 +27,14 @@ import scala.jdk.CollectionConverters._ + import scala.language.implicitConversions + import scala.util.control.NonFatal + ++import org.apache.comet.CometConf + import org.apache.hadoop.fs.Path + import org.scalactic.source.Position + import org.scalatest.{BeforeAndAfterAll, Suite, Tag} import org.scalatest.concurrent.Eventually import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.{AnalysisException, Row} -+import org.apache.spark.sql.{AnalysisException, IgnoreComet, Row} ++import org.apache.spark.sql.{AnalysisException, IgnoreComet, IgnoreCometNativeDataFusion, IgnoreCometNativeIcebergCompat, IgnoreCometNativeScan, Row} import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.catalog.SessionCatalog.DEFAULT_DATABASE -@@ -42,6 +42,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTestBase +@@ -42,6 +43,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTestBase import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.classic.{ClassicConversions, ColumnConversions, ColumnNodeToExpressionConverter, DataFrame, Dataset, SparkSession, SQLImplicits} -+import org.apache.spark.sql.comet._ ++import org.apache.spark.sql.comet.{CometFilterExec, CometProjectExec} import org.apache.spark.sql.execution.FilterExec import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution import org.apache.spark.sql.execution.datasources.DataSourceUtils -@@ -128,7 +129,11 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with +@@ -128,7 +130,28 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with } } } else { @@ -3358,12 +3425,29 @@ index f0f3f94b811..486a436afb2 100644 + if (isCometEnabled && testTags.exists(_.isInstanceOf[IgnoreComet])) { + ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun) + } else { -+ super.test(testName, testTags: _*)(testFun) ++ val cometScanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get(conf) ++ val isNativeIcebergCompat = cometScanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT || ++ cometScanImpl == CometConf.SCAN_AUTO ++ val isNativeDataFusion = cometScanImpl == CometConf.SCAN_NATIVE_DATAFUSION || ++ cometScanImpl == CometConf.SCAN_AUTO ++ if (isCometEnabled && isNativeIcebergCompat && ++ testTags.exists(_.isInstanceOf[IgnoreCometNativeIcebergCompat])) { ++ ignore(testName + " (disabled for NATIVE_ICEBERG_COMPAT)", testTags: _*)(testFun) ++ } else if (isCometEnabled && isNativeDataFusion && ++ testTags.exists(_.isInstanceOf[IgnoreCometNativeDataFusion])) { ++ ignore(testName + " (disabled for NATIVE_DATAFUSION)", testTags: _*)(testFun) ++ } else if (isCometEnabled && (isNativeDataFusion || isNativeIcebergCompat) && ++ testTags.exists(_.isInstanceOf[IgnoreCometNativeScan])) { ++ ignore(testName + " (disabled for NATIVE_DATAFUSION and NATIVE_ICEBERG_COMPAT)", ++ testTags: _*)(testFun) ++ } else { ++ super.test(testName, testTags: _*)(testFun) ++ } + } } } -@@ -248,8 +253,33 @@ private[sql] trait SQLTestUtilsBase +@@ -248,8 +271,33 @@ private[sql] trait SQLTestUtilsBase override protected def converter: ColumnNodeToExpressionConverter = self.spark.converter } @@ -3397,7 +3481,7 @@ index f0f3f94b811..486a436afb2 100644 super.withSQLConf(pairs: _*)(f) } -@@ -451,6 +481,8 @@ private[sql] trait SQLTestUtilsBase +@@ -451,6 +499,8 @@ private[sql] trait SQLTestUtilsBase val schema = df.schema val withoutFilters = df.queryExecution.executedPlan.transform { case FilterExec(_, child) => child @@ -3509,6 +3593,20 @@ index 4b27082e188..09f591dfed3 100644 Utils.withContextClassLoader(Utils.getSparkClassLoader) { withUserDefinedFunction(udfInfo.funcName -> false) { val sparkClassLoader = Thread.currentThread().getContextClassLoader +diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala +index cc7bb193731..06555d48da7 100644 +--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ++++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala +@@ -818,7 +818,8 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter + } + } + +- test("SPARK-30201 HiveOutputWriter standardOI should use ObjectInspectorCopyOption.DEFAULT") { ++ test("SPARK-30201 HiveOutputWriter standardOI should use ObjectInspectorCopyOption.DEFAULT", ++ IgnoreComet("Comet does not support reading non UTF-8 strings")) { + withTable("t1", "t2") { + withTempDir { dir => + val file = new File(dir, "test.hex") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala index b67370f6eb9..746b3974b29 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index aad74064f..e1511b289 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -34,16 +34,17 @@ import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ -import org.apache.comet.{CometConf, CometSparkSessionExtensions, DataTypeSupport} +import org.apache.comet.{CometConf, DataTypeSupport} import org.apache.comet.CometConf._ import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, isCometScanEnabled, withInfo, withInfos} import org.apache.comet.DataTypeSupport.isComplexType import org.apache.comet.parquet.{CometParquetScan, SupportsComet} +import org.apache.comet.shims.CometTypeShim /** * Spark physical optimizer rule for replacing Spark scans with Comet scans. */ -case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] { +case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with CometTypeShim { private lazy val showTransformations = CometConf.COMET_EXPLAIN_TRANSFORMATIONS.get() @@ -286,10 +287,6 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] { val fallbackReasons = new ListBuffer[String]() - if (CometSparkSessionExtensions.isSpark40Plus) { - fallbackReasons += s"$SCAN_NATIVE_ICEBERG_COMPAT is not implemented for Spark 4.0.0" - } - // native_iceberg_compat only supports local filesystem and S3 if (!scanExec.relation.inputFiles .forall(path => path.startsWith("file://") || path.startsWith("s3a://"))) { @@ -302,21 +299,28 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] { val partitionSchemaSupported = typeChecker.isSchemaSupported(partitionSchema, fallbackReasons) - def hasMapsContainingStructs(dataType: DataType): Boolean = { + def hasUnsupportedType(dataType: DataType): Boolean = { dataType match { - case s: StructType => s.exists(field => hasMapsContainingStructs(field.dataType)) - case a: ArrayType => hasMapsContainingStructs(a.elementType) - case m: MapType => isComplexType(m.keyType) || isComplexType(m.valueType) + case s: StructType => s.exists(field => hasUnsupportedType(field.dataType)) + case a: ArrayType => hasUnsupportedType(a.elementType) + case m: MapType => + // maps containing complex types are not supported + isComplexType(m.keyType) || isComplexType(m.valueType) || + hasUnsupportedType(m.keyType) || hasUnsupportedType(m.valueType) + case dt => isStringCollationType(dt) + case _: StringType => + // we only support `case object StringType` and not other instances of `class StringType` + dataType != StringType case _ => false } } val knownIssues = - scanExec.requiredSchema.exists(field => hasMapsContainingStructs(field.dataType)) || - partitionSchema.exists(field => hasMapsContainingStructs(field.dataType)) + scanExec.requiredSchema.exists(field => hasUnsupportedType(field.dataType)) || + partitionSchema.exists(field => hasUnsupportedType(field.dataType)) if (knownIssues) { - fallbackReasons += "There are known issues with maps containing structs when using " + + fallbackReasons += "Schema contains data types that are not supported by " + s"$SCAN_NATIVE_ICEBERG_COMPAT" } @@ -339,7 +343,7 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] { } -case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport { +case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport with CometTypeShim { // this class is intended to be used with a specific scan impl assert(scanImpl != CometConf.SCAN_AUTO) @@ -357,6 +361,10 @@ case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport { false case _: StructType | _: ArrayType | _: MapType if scanImpl == CometConf.SCAN_NATIVE_COMET => false + case dt if isStringCollationType(dt) => + // we don't need specific support for collation in scans, but this + // is a convenient place to force the whole query to fall back to Spark for now + false case s: StructType if s.fields.isEmpty => false case _ => --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org