This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 1b344def1 fix: Support `auto` scan mode with Spark 4.0.0 (#1975)
1b344def1 is described below

commit 1b344def1be595f8de5d86994dddb029ef68f20a
Author: Andy Grove <agr...@apache.org>
AuthorDate: Mon Aug 25 18:12:14 2025 -0600

    fix: Support `auto` scan mode with Spark 4.0.0 (#1975)
---
 .../org/apache/spark/sql/comet/util/Utils.scala    |   9 +-
 .../org/apache/comet/shims/CometTypeShim.scala     |  25 ++++
 .../org/apache/comet/shims/CometTypeShim.scala     |  26 ++++
 dev/diffs/4.0.0.diff                               | 134 ++++++++++++++++++---
 .../org/apache/comet/rules/CometScanRule.scala     |  36 +++---
 5 files changed, 195 insertions(+), 35 deletions(-)

diff --git a/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala 
b/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala
index 3041ea2c8..a72208db2 100644
--- a/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala
+++ b/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala
@@ -39,9 +39,10 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.io.{ChunkedByteBuffer, 
ChunkedByteBufferOutputStream}
 
+import org.apache.comet.shims.CometTypeShim
 import org.apache.comet.vector.CometVector
 
-object Utils {
+object Utils extends CometTypeShim {
   def getConfPath(confFileName: String): String = {
     sys.env
       .get("COMET_CONF_DIR")
@@ -124,7 +125,8 @@ object Utils {
       case LongType => new ArrowType.Int(8 * 8, true)
       case FloatType => new 
ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)
       case DoubleType => new 
ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)
-      case StringType => ArrowType.Utf8.INSTANCE
+      case _: StringType => ArrowType.Utf8.INSTANCE
+      case dt if isStringCollationType(dt) => ArrowType.Utf8.INSTANCE
       case BinaryType => ArrowType.Binary.INSTANCE
       case DecimalType.Fixed(precision, scale) => new 
ArrowType.Decimal(precision, scale, 128)
       case DateType => new ArrowType.Date(DateUnit.DAY)
@@ -138,7 +140,8 @@ object Utils {
       case TimestampNTZType =>
         new ArrowType.Timestamp(TimeUnit.MICROSECOND, null)
       case _ =>
-        throw new UnsupportedOperationException(s"Unsupported data type: 
${dt.catalogString}")
+        throw new UnsupportedOperationException(
+          s"Unsupported data type: [${dt.getClass.getName}] 
${dt.catalogString}")
     }
 
   /** Maps field from Spark to Arrow. NOTE: timeZoneId required for 
TimestampType */
diff --git 
a/common/src/main/spark-3.x/org/apache/comet/shims/CometTypeShim.scala 
b/common/src/main/spark-3.x/org/apache/comet/shims/CometTypeShim.scala
new file mode 100644
index 000000000..14e0881e0
--- /dev/null
+++ b/common/src/main/spark-3.x/org/apache/comet/shims/CometTypeShim.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.comet.shims
+
+import org.apache.spark.sql.types.DataType
+
+trait CometTypeShim {
+    def isStringCollationType(dt: DataType): Boolean = false
+}
\ No newline at end of file
diff --git 
a/common/src/main/spark-4.0/org/apache/comet/shims/CometTypeShim.scala 
b/common/src/main/spark-4.0/org/apache/comet/shims/CometTypeShim.scala
new file mode 100644
index 000000000..a2d64b3ea
--- /dev/null
+++ b/common/src/main/spark-4.0/org/apache/comet/shims/CometTypeShim.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.comet.shims
+
+import org.apache.spark.sql.internal.types.StringTypeWithCollation
+import org.apache.spark.sql.types.DataType
+
+trait CometTypeShim {
+    def isStringCollationType(dt: DataType): Boolean = 
dt.isInstanceOf[StringTypeWithCollation]
+}
\ No newline at end of file
diff --git a/dev/diffs/4.0.0.diff b/dev/diffs/4.0.0.diff
index 7d2dc790d..4a4b958c4 100644
--- a/dev/diffs/4.0.0.diff
+++ b/dev/diffs/4.0.0.diff
@@ -700,10 +700,10 @@ index 9c529d14221..069b7c5adeb 100644
          }
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala
 new file mode 100644
-index 00000000000..5eb3fa17ca8
+index 00000000000..5691536c114
 --- /dev/null
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala
-@@ -0,0 +1,43 @@
+@@ -0,0 +1,45 @@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
@@ -732,6 +732,8 @@ index 00000000000..5eb3fa17ca8
 + * Tests with this tag will be ignored when Comet is enabled (e.g., via 
`ENABLE_COMET`).
 + */
 +case class IgnoreComet(reason: String) extends Tag("DisableComet")
++case class IgnoreCometNativeIcebergCompat(reason: String) extends 
Tag("DisableComet")
++case class IgnoreCometNativeDataFusion(reason: String) extends 
Tag("DisableComet")
 +case class IgnoreCometNativeScan(reason: String) extends Tag("DisableComet")
 +
 +/**
@@ -1277,11 +1279,26 @@ index 2e33f6505ab..e1e93ab3bad 100644
      }
  
      withTable("t1", "t2") {
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala
+index fee375db10a..8c2c24e2c5f 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala
+@@ -33,7 +33,9 @@ import org.apache.spark.sql.types._
+ import org.apache.spark.types.variant._
+ import org.apache.spark.unsafe.types.{UTF8String, VariantVal}
+ 
+-class VariantShreddingSuite extends QueryTest with SharedSparkSession with 
ParquetTest {
++class VariantShreddingSuite extends QueryTest with SharedSparkSession with 
ParquetTest
++    // TODO enable tests once 
https://github.com/apache/datafusion-comet/issues/2209 is fixed
++    with IgnoreCometSuite {
+   def parseJson(s: String): VariantVal = {
+     val v = VariantBuilder.parseJson(s, false)
+     new VariantVal(v.getValue, v.getMetadata)
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala
-index 11e9547dfc5..df5678c8d82 100644
+index 11e9547dfc5..be9ae40ab3d 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala
-@@ -20,7 +20,7 @@ package org.apache.spark.sql.collation
+@@ -20,10 +20,11 @@ package org.apache.spark.sql.collation
  import scala.jdk.CollectionConverters.MapHasAsJava
  
  import org.apache.spark.SparkException
@@ -1290,7 +1307,21 @@ index 11e9547dfc5..df5678c8d82 100644
  import org.apache.spark.sql.catalyst.ExtendedAnalysisException
  import org.apache.spark.sql.catalyst.expressions._
  import org.apache.spark.sql.catalyst.util.CollationFactory
-@@ -1505,7 +1505,9 @@ class CollationSuite extends DatasourceV2SQLBase with 
AdaptiveSparkPlanHelper {
++import org.apache.spark.sql.comet.{CometBroadcastHashJoinExec, 
CometHashJoinExec, CometSortMergeJoinExec}
+ import org.apache.spark.sql.connector.{DatasourceV2SQLBase, 
FakeV2ProviderWithCustomSchema}
+ import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTable}
+ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
+@@ -55,7 +56,9 @@ class CollationSuite extends DatasourceV2SQLBase with 
AdaptiveSparkPlanHelper {
+     assert(
+       collectFirst(queryPlan) {
+         case _: SortMergeJoinExec => assert(isSortMergeForced)
++        case _: CometSortMergeJoinExec => assert(isSortMergeForced)
+         case _: HashJoin => assert(!isSortMergeForced)
++        case _: CometHashJoinExec | _: CometBroadcastHashJoinExec => 
assert(!isSortMergeForced)
+       }.nonEmpty
+     )
+   }
+@@ -1505,7 +1508,9 @@ class CollationSuite extends DatasourceV2SQLBase with 
AdaptiveSparkPlanHelper {
      }
    }
  
@@ -1301,7 +1332,23 @@ index 11e9547dfc5..df5678c8d82 100644
      val t1 = "T_1"
      val t2 = "T_2"
  
-@@ -1815,7 +1817,9 @@ class CollationSuite extends DatasourceV2SQLBase with 
AdaptiveSparkPlanHelper {
+@@ -1611,6 +1616,7 @@ class CollationSuite extends DatasourceV2SQLBase with 
AdaptiveSparkPlanHelper {
+             } else {
+               assert(!collectFirst(queryPlan) {
+                 case b: BroadcastHashJoinExec => b.leftKeys.head
++                case b: CometBroadcastHashJoinExec => b.leftKeys.head
+               }.head.isInstanceOf[ArrayTransform])
+             }
+           }
+@@ -1676,6 +1682,7 @@ class CollationSuite extends DatasourceV2SQLBase with 
AdaptiveSparkPlanHelper {
+             } else {
+               assert(!collectFirst(queryPlan) {
+                 case b: BroadcastHashJoinExec => b.leftKeys.head
++                case b: CometBroadcastHashJoinExec => b.leftKeys.head
+               }.head.isInstanceOf[ArrayTransform])
+             }
+           }
+@@ -1815,7 +1822,9 @@ class CollationSuite extends DatasourceV2SQLBase with 
AdaptiveSparkPlanHelper {
      }
    }
  
@@ -2636,10 +2683,23 @@ index 22839d3f0d2..7e66d100e90 100644
        checkAnswer(
          // "fruit" column in this file is encoded using 
DELTA_LENGTH_BYTE_ARRAY.
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
-index bba71f1c48d..4f33ce4b3f2 100644
+index bba71f1c48d..38c60ee2584 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
-@@ -1060,7 +1060,8 @@ abstract class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedS
+@@ -996,7 +996,11 @@ abstract class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedS
+         Seq(Some("A"), Some("A"), None).toDF().repartition(1)
+           .write.parquet(path.getAbsolutePath)
+         val df = spark.read.parquet(path.getAbsolutePath)
+-        checkAnswer(stripSparkFilter(df.where("NOT (value <=> 'A')")), df)
++        // Similar to Spark's vectorized reader, Comet doesn't do row-level 
filtering but relies
++        // on Spark to apply the data filters after columnar batches are 
returned
++        if (!isCometEnabled) {
++          checkAnswer(stripSparkFilter(df.where("NOT (value <=> 'A')")), df)
++        }
+       }
+     }
+   }
+@@ -1060,7 +1064,8 @@ abstract class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedS
          checkAnswer(readParquet(schema2, path), df)
        }
  
@@ -2649,7 +2709,7 @@ index bba71f1c48d..4f33ce4b3f2 100644
          val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)"
          checkAnswer(readParquet(schema1, path), df)
          val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)"
-@@ -1084,7 +1085,8 @@ abstract class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedS
+@@ -1084,7 +1089,8 @@ abstract class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedS
        val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, 
CAST('1.2' AS BINARY) d")
        df.write.parquet(path.toString)
  
@@ -3330,27 +3390,34 @@ index 86c4e49f6f6..2e639e5f38d 100644
      val tblTargetName = "tbl_target"
      val tblSourceQualified = s"default.$tblSourceName"
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
-index f0f3f94b811..486a436afb2 100644
+index f0f3f94b811..d64e4e54e22 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
-@@ -33,7 +33,7 @@ import org.scalatest.{BeforeAndAfterAll, Suite, Tag}
+@@ -27,13 +27,14 @@ import scala.jdk.CollectionConverters._
+ import scala.language.implicitConversions
+ import scala.util.control.NonFatal
+ 
++import org.apache.comet.CometConf
+ import org.apache.hadoop.fs.Path
+ import org.scalactic.source.Position
+ import org.scalatest.{BeforeAndAfterAll, Suite, Tag}
  import org.scalatest.concurrent.Eventually
  
  import org.apache.spark.SparkFunSuite
 -import org.apache.spark.sql.{AnalysisException, Row}
-+import org.apache.spark.sql.{AnalysisException, IgnoreComet, Row}
++import org.apache.spark.sql.{AnalysisException, IgnoreComet, 
IgnoreCometNativeDataFusion, IgnoreCometNativeIcebergCompat, 
IgnoreCometNativeScan, Row}
  import org.apache.spark.sql.catalyst.FunctionIdentifier
  import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
  import org.apache.spark.sql.catalyst.catalog.SessionCatalog.DEFAULT_DATABASE
-@@ -42,6 +42,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTestBase
+@@ -42,6 +43,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTestBase
  import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
  import org.apache.spark.sql.catalyst.util._
  import org.apache.spark.sql.classic.{ClassicConversions, ColumnConversions, 
ColumnNodeToExpressionConverter, DataFrame, Dataset, SparkSession, SQLImplicits}
-+import org.apache.spark.sql.comet._
++import org.apache.spark.sql.comet.{CometFilterExec, CometProjectExec}
  import org.apache.spark.sql.execution.FilterExec
  import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution
  import org.apache.spark.sql.execution.datasources.DataSourceUtils
-@@ -128,7 +129,11 @@ private[sql] trait SQLTestUtils extends SparkFunSuite 
with SQLTestUtilsBase with
+@@ -128,7 +130,28 @@ private[sql] trait SQLTestUtils extends SparkFunSuite 
with SQLTestUtilsBase with
          }
        }
      } else {
@@ -3358,12 +3425,29 @@ index f0f3f94b811..486a436afb2 100644
 +      if (isCometEnabled && testTags.exists(_.isInstanceOf[IgnoreComet])) {
 +        ignore(testName + " (disabled when Comet is on)", testTags: 
_*)(testFun)
 +      } else {
-+        super.test(testName, testTags: _*)(testFun)
++        val cometScanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get(conf)
++        val isNativeIcebergCompat = cometScanImpl == 
CometConf.SCAN_NATIVE_ICEBERG_COMPAT ||
++          cometScanImpl == CometConf.SCAN_AUTO
++        val isNativeDataFusion = cometScanImpl == 
CometConf.SCAN_NATIVE_DATAFUSION ||
++          cometScanImpl == CometConf.SCAN_AUTO
++        if (isCometEnabled && isNativeIcebergCompat &&
++          testTags.exists(_.isInstanceOf[IgnoreCometNativeIcebergCompat])) {
++          ignore(testName + " (disabled for NATIVE_ICEBERG_COMPAT)", 
testTags: _*)(testFun)
++        } else if (isCometEnabled && isNativeDataFusion &&
++          testTags.exists(_.isInstanceOf[IgnoreCometNativeDataFusion])) {
++          ignore(testName + " (disabled for NATIVE_DATAFUSION)", testTags: 
_*)(testFun)
++        } else if (isCometEnabled && (isNativeDataFusion || 
isNativeIcebergCompat) &&
++          testTags.exists(_.isInstanceOf[IgnoreCometNativeScan])) {
++          ignore(testName + " (disabled for NATIVE_DATAFUSION and 
NATIVE_ICEBERG_COMPAT)",
++            testTags: _*)(testFun)
++        } else {
++          super.test(testName, testTags: _*)(testFun)
++        }
 +      }
      }
    }
  
-@@ -248,8 +253,33 @@ private[sql] trait SQLTestUtilsBase
+@@ -248,8 +271,33 @@ private[sql] trait SQLTestUtilsBase
      override protected def converter: ColumnNodeToExpressionConverter = 
self.spark.converter
    }
  
@@ -3397,7 +3481,7 @@ index f0f3f94b811..486a436afb2 100644
      super.withSQLConf(pairs: _*)(f)
    }
  
-@@ -451,6 +481,8 @@ private[sql] trait SQLTestUtilsBase
+@@ -451,6 +499,8 @@ private[sql] trait SQLTestUtilsBase
      val schema = df.schema
      val withoutFilters = df.queryExecution.executedPlan.transform {
        case FilterExec(_, child) => child
@@ -3509,6 +3593,20 @@ index 4b27082e188..09f591dfed3 100644
        Utils.withContextClassLoader(Utils.getSparkClassLoader) {
          withUserDefinedFunction(udfInfo.funcName -> false) {
            val sparkClassLoader = Thread.currentThread().getContextClassLoader
+diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
+index cc7bb193731..06555d48da7 100644
+--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
++++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
+@@ -818,7 +818,8 @@ class InsertSuite extends QueryTest with TestHiveSingleton 
with BeforeAndAfter
+     }
+   }
+ 
+-  test("SPARK-30201 HiveOutputWriter standardOI should use 
ObjectInspectorCopyOption.DEFAULT") {
++  test("SPARK-30201 HiveOutputWriter standardOI should use 
ObjectInspectorCopyOption.DEFAULT",
++      IgnoreComet("Comet does not support reading non UTF-8 strings")) {
+     withTable("t1", "t2") {
+       withTempDir { dir =>
+         val file = new File(dir, "test.hex")
 diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
 index b67370f6eb9..746b3974b29 100644
 --- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala 
b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index aad74064f..e1511b289 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -34,16 +34,17 @@ import 
org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
-import org.apache.comet.{CometConf, CometSparkSessionExtensions, 
DataTypeSupport}
+import org.apache.comet.{CometConf, DataTypeSupport}
 import org.apache.comet.CometConf._
 import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, 
isCometScanEnabled, withInfo, withInfos}
 import org.apache.comet.DataTypeSupport.isComplexType
 import org.apache.comet.parquet.{CometParquetScan, SupportsComet}
+import org.apache.comet.shims.CometTypeShim
 
 /**
  * Spark physical optimizer rule for replacing Spark scans with Comet scans.
  */
-case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
+case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with 
CometTypeShim {
 
   private lazy val showTransformations = 
CometConf.COMET_EXPLAIN_TRANSFORMATIONS.get()
 
@@ -286,10 +287,6 @@ case class CometScanRule(session: SparkSession) extends 
Rule[SparkPlan] {
 
     val fallbackReasons = new ListBuffer[String]()
 
-    if (CometSparkSessionExtensions.isSpark40Plus) {
-      fallbackReasons += s"$SCAN_NATIVE_ICEBERG_COMPAT  is not implemented for 
Spark 4.0.0"
-    }
-
     // native_iceberg_compat only supports local filesystem and S3
     if (!scanExec.relation.inputFiles
         .forall(path => path.startsWith("file://") || 
path.startsWith("s3a://"))) {
@@ -302,21 +299,28 @@ case class CometScanRule(session: SparkSession) extends 
Rule[SparkPlan] {
     val partitionSchemaSupported =
       typeChecker.isSchemaSupported(partitionSchema, fallbackReasons)
 
-    def hasMapsContainingStructs(dataType: DataType): Boolean = {
+    def hasUnsupportedType(dataType: DataType): Boolean = {
       dataType match {
-        case s: StructType => s.exists(field => 
hasMapsContainingStructs(field.dataType))
-        case a: ArrayType => hasMapsContainingStructs(a.elementType)
-        case m: MapType => isComplexType(m.keyType) || 
isComplexType(m.valueType)
+        case s: StructType => s.exists(field => 
hasUnsupportedType(field.dataType))
+        case a: ArrayType => hasUnsupportedType(a.elementType)
+        case m: MapType =>
+          // maps containing complex types are not supported
+          isComplexType(m.keyType) || isComplexType(m.valueType) ||
+          hasUnsupportedType(m.keyType) || hasUnsupportedType(m.valueType)
+        case dt => isStringCollationType(dt)
+        case _: StringType =>
+          // we only support `case object StringType` and not other instances 
of `class StringType`
+          dataType != StringType
         case _ => false
       }
     }
 
     val knownIssues =
-      scanExec.requiredSchema.exists(field => 
hasMapsContainingStructs(field.dataType)) ||
-        partitionSchema.exists(field => 
hasMapsContainingStructs(field.dataType))
+      scanExec.requiredSchema.exists(field => 
hasUnsupportedType(field.dataType)) ||
+        partitionSchema.exists(field => hasUnsupportedType(field.dataType))
 
     if (knownIssues) {
-      fallbackReasons += "There are known issues with maps containing structs 
when using " +
+      fallbackReasons += "Schema contains data types that are not supported by 
" +
         s"$SCAN_NATIVE_ICEBERG_COMPAT"
     }
 
@@ -339,7 +343,7 @@ case class CometScanRule(session: SparkSession) extends 
Rule[SparkPlan] {
 
 }
 
-case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport {
+case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport with 
CometTypeShim {
 
   // this class is intended to be used with a specific scan impl
   assert(scanImpl != CometConf.SCAN_AUTO)
@@ -357,6 +361,10 @@ case class CometScanTypeChecker(scanImpl: String) extends 
DataTypeSupport {
         false
       case _: StructType | _: ArrayType | _: MapType if scanImpl == 
CometConf.SCAN_NATIVE_COMET =>
         false
+      case dt if isStringCollationType(dt) =>
+        // we don't need specific support for collation in scans, but this
+        // is a convenient place to force the whole query to fall back to 
Spark for now
+        false
       case s: StructType if s.fields.isEmpty =>
         false
       case _ =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to