This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new d3b2007ba fix: enable native_datafusion Spark SQL tests for #3320, 
#3401, #3719 (#3718)
d3b2007ba is described below

commit d3b2007baeb54497d4e4ed0ba5ef0607bb4c91c8
Author: Andy Grove <[email protected]>
AuthorDate: Wed Mar 18 07:53:10 2026 -0600

    fix: enable native_datafusion Spark SQL tests for #3320, #3401, #3719 
(#3718)
---
 dev/diffs/3.5.8.diff                               | 429 ++++++++++++++-------
 docs/source/contributor-guide/parquet_scans.md     |   7 +-
 native/Cargo.lock                                  |   8 +-
 native/core/src/errors.rs                          |  52 ++-
 native/spark-expr/src/error.rs                     |  24 ++
 .../sql/comet/shims/ShimSparkErrorConverter.scala  |   6 +
 .../sql/comet/shims/ShimSparkErrorConverter.scala  |   6 +
 .../sql/comet/shims/ShimSparkErrorConverter.scala  |   6 +
 8 files changed, 396 insertions(+), 142 deletions(-)

diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff
index 138e729f9..db495f1e2 100644
--- a/dev/diffs/3.5.8.diff
+++ b/dev/diffs/3.5.8.diff
@@ -1,5 +1,5 @@
 diff --git a/pom.xml b/pom.xml
-index edd2ad57880..77a975ea48f 100644
+index edd2ad57880..837b95d1ada 100644
 --- a/pom.xml
 +++ b/pom.xml
 @@ -152,6 +152,8 @@
@@ -93,23 +93,22 @@ index 27ae10b3d59..78e69902dfd 100644
 +  }
  }
 diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
-index db587dd9868..33802f29253 100644
+index db587dd9868..aac7295a53d 100644
 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
 +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
 @@ -18,6 +18,7 @@
  package org.apache.spark.sql.execution
  
  import org.apache.spark.annotation.DeveloperApi
-+import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
++import org.apache.spark.sql.comet.CometScanExec
  import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, 
QueryStageExec}
  import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
  import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
-@@ -67,6 +68,8 @@ private[execution] object SparkPlanInfo {
+@@ -67,6 +68,7 @@ private[execution] object SparkPlanInfo {
      // dump the file scan metadata (e.g file path) to event log
      val metadata = plan match {
        case fileScan: FileSourceScanExec => fileScan.metadata
 +      case cometScan: CometScanExec => cometScan.metadata
-+      case nativeScan: CometNativeScanExec => nativeScan.metadata
        case _ => Map[String, String]()
      }
      new SparkPlanInfo(
@@ -239,6 +238,20 @@ index e5494726695..00937f025c2 100644
    }
  
    test("A cached table preserves the partitioning and ordering of its cached 
SparkPlan") {
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
+index 9e8d77c53f3..855e3ada7d1 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
+@@ -790,7 +790,8 @@ class ColumnExpressionSuite extends QueryTest with 
SharedSparkSession {
+     }
+   }
+ 
+-  test("input_file_name, input_file_block_start, input_file_block_length - 
FileScanRDD") {
++  test("input_file_name, input_file_block_start, input_file_block_length - 
FileScanRDD",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312"))
 {
+     withTempPath { dir =>
+       val data = sparkContext.parallelize(0 to 10).toDF("id")
+       data.write.parquet(dir.getCanonicalPath)
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
 index 6f3090d8908..c08a60fb0c2 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -397,14 +410,14 @@ index c4fb4fa943c..a04b23870a8 100644
      assert(exchanges.size == 2)
    }
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
-index f33432ddb6f..4acdf7e9cfb 100644
+index f33432ddb6f..42eb9fd1cb7 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
  import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, 
Expression}
  import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode._
  import org.apache.spark.sql.catalyst.plans.ExistenceJoin
-+import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
++import org.apache.spark.sql.comet.CometScanExec
  import org.apache.spark.sql.connector.catalog.{InMemoryTableCatalog, 
InMemoryTableWithV2FilterCatalog}
  import org.apache.spark.sql.execution._
  import org.apache.spark.sql.execution.adaptive._
@@ -448,22 +461,40 @@ index f33432ddb6f..4acdf7e9cfb 100644
      withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> 
"true") {
        val df = sql(
          """ WITH v as (
-@@ -1729,6 +1736,10 @@ abstract class DynamicPartitionPruningV1Suite extends 
DynamicPartitionPruningDat
+@@ -1698,7 +1705,8 @@ abstract class DynamicPartitionPruningV1Suite extends 
DynamicPartitionPruningDat
+    * Check the static scan metrics with and without DPP
+    */
+   test("static scan metrics",
+-    DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
++    DisableAdaptiveExecution("DPP in AQE must reuse broadcast"),
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3313"))
 {
+     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
+       SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
+       SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
+@@ -1729,6 +1737,8 @@ abstract class DynamicPartitionPruningV1Suite extends 
DynamicPartitionPruningDat
                case s: BatchScanExec =>
                  // we use f1 col for v2 tables due to schema pruning
                  s.output.exists(_.exists(_.argString(maxFields = 
100).contains("f1")))
 +              case s: CometScanExec =>
-+                s.output.exists(_.exists(_.argString(maxFields = 
100).contains("fid")))
-+              case s: CometNativeScanExec =>
 +                s.output.exists(_.exists(_.argString(maxFields = 
100).contains("fid")))
                case _ => false
              }
            assert(scanOption.isDefined)
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
-index a206e97c353..fea1149b67d 100644
+index a206e97c353..79813d8e259 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
-@@ -467,7 +467,8 @@ class ExplainSuite extends ExplainSuiteHelper with 
DisableAdaptiveExecutionSuite
+@@ -280,7 +280,8 @@ class ExplainSuite extends ExplainSuiteHelper with 
DisableAdaptiveExecutionSuite
+     }
+   }
+ 
+-  test("explain formatted - check presence of subquery in case of DPP") {
++  test("explain formatted - check presence of subquery in case of DPP",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3313"))
 {
+     withTable("df1", "df2") {
+       withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
+         SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
+@@ -467,7 +468,8 @@ class ExplainSuite extends ExplainSuiteHelper with 
DisableAdaptiveExecutionSuite
      }
    }
  
@@ -473,7 +504,7 @@ index a206e97c353..fea1149b67d 100644
      withTempDir { dir =>
        Seq("parquet", "orc", "csv", "json").foreach { fmt =>
          val basePath = dir.getCanonicalPath + "/" + fmt
-@@ -545,7 +546,9 @@ class ExplainSuite extends ExplainSuiteHelper with 
DisableAdaptiveExecutionSuite
+@@ -545,7 +547,9 @@ class ExplainSuite extends ExplainSuiteHelper with 
DisableAdaptiveExecutionSuite
    }
  }
  
@@ -485,10 +516,18 @@ index a206e97c353..fea1149b67d 100644
  
    test("SPARK-35884: Explain Formatted") {
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
-index 93275487f29..ca79ad8b6d9 100644
+index 93275487f29..510e3087e0f 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
-@@ -33,6 +33,7 @@ import 
org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha
+@@ -23,6 +23,7 @@ import java.nio.file.{Files, StandardOpenOption}
+ 
+ import scala.collection.mutable
+ 
++import org.apache.comet.CometConf
+ import org.apache.hadoop.conf.Configuration
+ import org.apache.hadoop.fs.{LocalFileSystem, Path}
+ 
+@@ -33,6 +34,7 @@ import 
org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha
  import 
org.apache.spark.sql.catalyst.expressions.IntegralLiteralTestUtils.{negativeInt,
 positiveInt}
  import org.apache.spark.sql.catalyst.plans.logical.Filter
  import org.apache.spark.sql.catalyst.types.DataTypeUtils
@@ -496,7 +535,16 @@ index 93275487f29..ca79ad8b6d9 100644
  import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode}
  import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
  import org.apache.spark.sql.execution.datasources.FilePartition
-@@ -639,7 +640,8 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -250,6 +252,8 @@ class FileBasedDataSourceSuite extends QueryTest
+               case "" => "_LEGACY_ERROR_TEMP_2062"
+               case _ => "_LEGACY_ERROR_TEMP_2055"
+             }
++            // native_datafusion Parquet scan cannot throw a 
SparkFileNotFoundException
++            assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != 
CometConf.SCAN_NATIVE_DATAFUSION)
+             checkErrorMatchPVals(
+               exception = intercept[SparkException] {
+                 testIgnoreMissingFiles(options)
+@@ -639,7 +643,8 @@ class FileBasedDataSourceSuite extends QueryTest
    }
  
    Seq("parquet", "orc").foreach { format =>
@@ -506,7 +554,7 @@ index 93275487f29..ca79ad8b6d9 100644
        withTempDir { dir =>
          val tableName = s"spark_25132_${format}_native"
          val tableDir = dir.getCanonicalPath + s"/$tableName"
-@@ -955,6 +957,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -955,6 +960,7 @@ class FileBasedDataSourceSuite extends QueryTest
              assert(bJoinExec.isEmpty)
              val smJoinExec = collect(joinedDF.queryExecution.executedPlan) {
                case smJoin: SortMergeJoinExec => smJoin
@@ -514,7 +562,7 @@ index 93275487f29..ca79ad8b6d9 100644
              }
              assert(smJoinExec.nonEmpty)
            }
-@@ -1015,6 +1018,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -1015,6 +1021,7 @@ class FileBasedDataSourceSuite extends QueryTest
  
            val fileScan = df.queryExecution.executedPlan collectFirst {
              case BatchScanExec(_, f: FileScan, _, _, _, _) => f
@@ -522,7 +570,7 @@ index 93275487f29..ca79ad8b6d9 100644
            }
            assert(fileScan.nonEmpty)
            assert(fileScan.get.partitionFilters.nonEmpty)
-@@ -1056,6 +1060,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -1056,6 +1063,7 @@ class FileBasedDataSourceSuite extends QueryTest
  
            val fileScan = df.queryExecution.executedPlan collectFirst {
              case BatchScanExec(_, f: FileScan, _, _, _, _) => f
@@ -530,7 +578,7 @@ index 93275487f29..ca79ad8b6d9 100644
            }
            assert(fileScan.nonEmpty)
            assert(fileScan.get.partitionFilters.isEmpty)
-@@ -1240,6 +1245,9 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -1240,6 +1248,9 @@ class FileBasedDataSourceSuite extends QueryTest
            val filters = df.queryExecution.executedPlan.collect {
              case f: FileSourceScanLike => f.dataFilters
              case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters
@@ -930,73 +978,6 @@ index 3cf2bfd17ab..49728c35c42 100644
      withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
        SQLConf.ANSI_ENABLED.key -> "true") {
        withTable("t") {
-diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
-index fa1a64460fc..134f0db1fb8 100644
---- a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
-+++ b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
-@@ -17,6 +17,8 @@
- 
- package org.apache.spark.sql
- 
-+import org.apache.comet.CometConf
-+
- import org.apache.spark.{SPARK_DOC_ROOT, SparkIllegalArgumentException, 
SparkRuntimeException}
- import org.apache.spark.sql.catalyst.expressions.Cast._
- import org.apache.spark.sql.execution.FormattedMode
-@@ -178,29 +180,31 @@ class StringFunctionsSuite extends QueryTest with 
SharedSparkSession {
-   }
- 
-   test("string regex_replace / regex_extract") {
--    val df = Seq(
--      ("100-200", "(\\d+)-(\\d+)", "300"),
--      ("100-200", "(\\d+)-(\\d+)", "400"),
--      ("100-200", "(\\d+)", "400")).toDF("a", "b", "c")
-+    withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") {
-+      val df = Seq(
-+        ("100-200", "(\\d+)-(\\d+)", "300"),
-+        ("100-200", "(\\d+)-(\\d+)", "400"),
-+        ("100-200", "(\\d+)", "400")).toDF("a", "b", "c")
- 
--    checkAnswer(
--      df.select(
--        regexp_replace($"a", "(\\d+)", "num"),
--        regexp_replace($"a", $"b", $"c"),
--        regexp_extract($"a", "(\\d+)-(\\d+)", 1)),
--      Row("num-num", "300", "100") :: Row("num-num", "400", "100") ::
--        Row("num-num", "400-400", "100") :: Nil)
--
--    // for testing the mutable state of the expression in code gen.
--    // This is a hack way to enable the codegen, thus the codegen is enable 
by default,
--    // it will still use the interpretProjection if projection followed by a 
LocalRelation,
--    // hence we add a filter operator.
--    // See the optimizer rule `ConvertToLocalRelation`
--    checkAnswer(
--      df.filter("isnotnull(a)").selectExpr(
--        "regexp_replace(a, b, c)",
--        "regexp_extract(a, b, 1)"),
--      Row("300", "100") :: Row("400", "100") :: Row("400-400", "100") :: Nil)
-+      checkAnswer(
-+        df.select(
-+          regexp_replace($"a", "(\\d+)", "num"),
-+          regexp_replace($"a", $"b", $"c"),
-+          regexp_extract($"a", "(\\d+)-(\\d+)", 1)),
-+        Row("num-num", "300", "100") :: Row("num-num", "400", "100") ::
-+          Row("num-num", "400-400", "100") :: Nil)
-+
-+      // for testing the mutable state of the expression in code gen.
-+      // This is a hack way to enable the codegen, thus the codegen is enable 
by default,
-+      // it will still use the interpretProjection if projection followed by 
a LocalRelation,
-+      // hence we add a filter operator.
-+      // See the optimizer rule `ConvertToLocalRelation`
-+      checkAnswer(
-+        df.filter("isnotnull(a)").selectExpr(
-+          "regexp_replace(a, b, c)",
-+          "regexp_extract(a, b, 1)"),
-+        Row("300", "100") :: Row("400", "100") :: Row("400-400", "100") :: 
Nil)
-+    }
-   }
- 
-   test("non-matching optional group") {
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
 index 04702201f82..5ee11f83ecf 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -1036,6 +1017,20 @@ index 04702201f82..5ee11f83ecf 100644
        }
        assert(exchanges.size === 1)
      }
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+index 9f8e979e3fb..3bc9dab8023 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+@@ -87,7 +87,8 @@ class UDFSuite extends QueryTest with SharedSparkSession {
+     spark.catalog.dropTempView("tmp_table")
+   }
+ 
+-  test("SPARK-8005 input_file_name") {
++  test("SPARK-8005 input_file_name",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312"))
 {
+     withTempPath { dir =>
+       val data = sparkContext.parallelize(0 to 10, 2).toDF("id")
+       data.write.parquet(dir.getCanonicalPath)
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
 index d269290e616..13726a31e07 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
@@ -1100,18 +1095,31 @@ index d269290e616..13726a31e07 100644
                }
              }
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
-index cfc8b2cc845..c4be7eb3731 100644
+index cfc8b2cc845..b7c234e1437 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
-@@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer
+@@ -19,8 +19,9 @@ package org.apache.spark.sql.connector
+ import scala.collection.mutable.ArrayBuffer
+ 
  import org.apache.spark.SparkConf
- import org.apache.spark.sql.{AnalysisException, QueryTest}
+-import org.apache.spark.sql.{AnalysisException, QueryTest}
++import org.apache.spark.sql.{AnalysisException, IgnoreCometNativeDataFusion, 
QueryTest}
  import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 +import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
  import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, 
Table, TableCapability}
  import org.apache.spark.sql.connector.read.ScanBuilder
  import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-@@ -184,7 +185,11 @@ class FileDataSourceV2FallBackSuite extends QueryTest 
with SharedSparkSession {
+@@ -152,7 +153,8 @@ class FileDataSourceV2FallBackSuite extends QueryTest with 
SharedSparkSession {
+     }
+   }
+ 
+-  test("Fallback Parquet V2 to V1") {
++  test("Fallback Parquet V2 to V1",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315"))
 {
+     Seq("parquet", classOf[ParquetDataSourceV2].getCanonicalName).foreach { 
format =>
+       withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> format) {
+         val commands = ArrayBuffer.empty[(String, LogicalPlan)]
+@@ -184,7 +186,11 @@ class FileDataSourceV2FallBackSuite extends QueryTest 
with SharedSparkSession {
              val df = spark.read.format(format).load(path.getCanonicalPath)
              checkAnswer(df, inputData.toDF())
              assert(
@@ -1375,6 +1383,28 @@ index 47679ed7865..9ffbaecb98e 100644
      }.length == hashAggCount)
      assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s 
}.length == sortAggCount)
    }
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
+index a1147c16cc8..c7a29496328 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
+@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
+ 
+ import org.apache.spark.{SparkArithmeticException, SparkException, 
SparkFileNotFoundException}
+ import org.apache.spark.sql._
++import org.apache.spark.sql.IgnoreCometNativeDataFusion
+ import org.apache.spark.sql.catalyst.TableIdentifier
+ import org.apache.spark.sql.catalyst.expressions.{Add, Alias, Divide}
+ import org.apache.spark.sql.catalyst.parser.ParseException
+@@ -968,7 +969,8 @@ abstract class SQLViewSuite extends QueryTest with 
SQLTestUtils {
+     }
+   }
+ 
+-  test("alter temporary view should follow current storeAnalyzedPlanForView 
config") {
++  test("alter temporary view should follow current storeAnalyzedPlanForView 
config",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3314"))
 {
+     withTable("t") {
+       Seq(2, 3, 1).toDF("c1").write.format("parquet").saveAsTable("t")
+       withView("v1") {
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
 index eec396b2e39..bf3f1c769d6 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
@@ -1969,7 +1999,7 @@ index 07e2849ce6f..3e73645b638 100644
        ParquetOutputFormat.WRITER_VERSION -> 
ParquetProperties.WriterVersion.PARQUET_2_0.toString
      )
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
-index 8e88049f51e..6150a556f9b 100644
+index 8e88049f51e..b713ccddfcb 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 @@ -1095,7 +1095,11 @@ abstract class ParquetFilterSuite extends QueryTest 
with ParquetTest with Shared
@@ -2058,7 +2088,7 @@ index 8e88049f51e..6150a556f9b 100644
      val schema = StructType(Seq(
        StructField("a", IntegerType, nullable = false)
      ))
-@@ -1952,8 +1968,14 @@ abstract class ParquetFilterSuite extends QueryTest 
with ParquetTest with Shared
+@@ -1952,8 +1968,17 @@ abstract class ParquetFilterSuite extends QueryTest 
with ParquetTest with Shared
            val e = intercept[SparkException] {
              sql(s"select a from $tableName where b > 0").collect()
            }
@@ -2066,16 +2096,19 @@ index 8e88049f51e..6150a556f9b 100644
 -            """Found duplicate field(s) "B": [B, b] in case-insensitive 
mode"""))
 +          assert(e.getCause.isInstanceOf[RuntimeException])
 +          val msg = e.getCause.getMessage
-+          // native_datafusion produces a different error message for 
duplicate fields
++          // native_datafusion converts DataFusion's "Unable to get field 
named" error
++          // to _LEGACY_ERROR_TEMP_2093 but with a lowercase field name ("b" 
vs "B")
++          // because DataFusion resolves field names case-insensitively
 +          assert(
 +            msg.contains(
 +              """Found duplicate field(s) "B": [B, b] in case-insensitive 
mode""") ||
-+              msg.contains("Unable to get field named"),
++              msg.contains(
++                """Found duplicate field(s) "b": [B, b] in case-insensitive 
mode"""),
 +            s"Unexpected error message: $msg")
          }
  
          withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
-@@ -1984,7 +2006,8 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
+@@ -1984,7 +2009,8 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
      }
    }
  
@@ -2085,7 +2118,7 @@ index 8e88049f51e..6150a556f9b 100644
      // block 1:
      //                      null count  min                                   
    max
      // page-0                         0  0                                    
     99
-@@ -2044,7 +2067,8 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
+@@ -2044,7 +2070,8 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
      }
    }
  
@@ -2095,7 +2128,7 @@ index 8e88049f51e..6150a556f9b 100644
      withTempPath { dir =>
        val path = dir.getCanonicalPath
        spark.range(100).selectExpr("id * 2 AS id")
-@@ -2276,7 +2300,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
+@@ -2276,7 +2303,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
            assert(pushedParquetFilters.exists(_.getClass === filterClass),
              s"${pushedParquetFilters.map(_.getClass).toList} did not contain 
${filterClass}.")
  
@@ -2108,7 +2141,7 @@ index 8e88049f51e..6150a556f9b 100644
          } else {
            assert(selectedFilters.isEmpty, "There is filter pushed down")
          }
-@@ -2336,7 +2364,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
+@@ -2336,7 +2367,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
            assert(pushedParquetFilters.exists(_.getClass === filterClass),
              s"${pushedParquetFilters.map(_.getClass).toList} did not contain 
${filterClass}.")
  
@@ -2122,7 +2155,7 @@ index 8e88049f51e..6150a556f9b 100644
          case _ =>
            throw new AnalysisException("Can not match ParquetTable in the 
query.")
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
-index 8ed9ef1630e..f312174b182 100644
+index 8ed9ef1630e..a865928c1b2 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
 @@ -1064,7 +1064,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
@@ -2131,7 +2164,7 @@ index 8ed9ef1630e..f312174b182 100644
  
 -  test("SPARK-35640: read binary as timestamp should throw schema 
incompatible error") {
 +  test("SPARK-35640: read binary as timestamp should throw schema 
incompatible error",
-+    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311"))
 {
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720"))
 {
      val data = (1 to 4).map(i => Tuple1(i.toString))
      val readSchema = StructType(Seq(StructField("_1", 
DataTypes.TimestampType)))
  
@@ -2141,7 +2174,7 @@ index 8ed9ef1630e..f312174b182 100644
  
 -  test("SPARK-35640: int as long should throw schema incompatible error") {
 +  test("SPARK-35640: int as long should throw schema incompatible error",
-+    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311"))
 {
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720"))
 {
      val data = (1 to 4).map(i => Tuple1(i))
      val readSchema = StructType(Seq(StructField("_1", DataTypes.LongType)))
  
@@ -2156,7 +2189,7 @@ index 8ed9ef1630e..f312174b182 100644
        checkAnswer(
          // "fruit" column in this file is encoded using 
DELTA_LENGTH_BYTE_ARRAY.
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
-index f6472ba3d9d..ce39ebb52e6 100644
+index f6472ba3d9d..7f00caf5063 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
 @@ -185,7 +185,8 @@ abstract class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedS
@@ -2165,7 +2198,7 @@ index f6472ba3d9d..ce39ebb52e6 100644
  
 -  test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") {
 +  test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ",
-+    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311"))
 {
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720"))
 {
      val data = (1 to 1000).map { i =>
        val ts = new java.sql.Timestamp(i)
        Row(ts)
@@ -2215,7 +2248,7 @@ index f6472ba3d9d..ce39ebb52e6 100644
  
 -  test("row group skipping doesn't overflow when reading into larger type") {
 +  test("row group skipping doesn't overflow when reading into larger type",
-+    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311"))
 {
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720"))
 {
      withTempPath { path =>
        Seq(0).toDF("a").write.parquet(path.toString)
        // The vectorized and non-vectorized readers will produce different 
exceptions, we don't need
@@ -2414,32 +2447,42 @@ index 5cdbdc27b32..307fba16578 100644
        spark.range(10).selectExpr("id", "id % 3 as p")
          .write.partitionBy("p").saveAsTable("testDataForScan")
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
-index 0ab8691801d..b18a5bea944 100644
+index 0ab8691801d..7b81f3a8f6d 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
-@@ -18,6 +18,7 @@
+@@ -17,7 +17,9 @@
+ 
  package org.apache.spark.sql.execution.python
  
++import org.apache.spark.sql.IgnoreCometNativeDataFusion
  import org.apache.spark.sql.catalyst.plans.logical.{ArrowEvalPython, 
BatchEvalPython, Limit, LocalLimit}
 +import org.apache.spark.sql.comet._
  import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan, 
SparkPlanTest}
  import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
  import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
-@@ -108,6 +109,8 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
+@@ -93,7 +95,8 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
+     assert(arrowEvalNodes.size == 2)
+   }
+ 
+-  test("Python UDF should not break column pruning/filter pushdown -- Parquet 
V1") {
++  test("Python UDF should not break column pruning/filter pushdown -- Parquet 
V1",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312"))
 {
+     withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
+       withTempPath { f =>
+         spark.range(10).select($"id".as("a"), $"id".as("b"))
+@@ -108,6 +111,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
  
            val scanNodes = query.queryExecution.executedPlan.collect {
              case scan: FileSourceScanExec => scan
 +            case scan: CometScanExec => scan
-+            case scan: CometNativeScanExec => scan
            }
            assert(scanNodes.length == 1)
            assert(scanNodes.head.output.map(_.name) == Seq("a"))
-@@ -120,11 +123,18 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
+@@ -120,11 +124,16 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
  
            val scanNodes = query.queryExecution.executedPlan.collect {
              case scan: FileSourceScanExec => scan
 +            case scan: CometScanExec => scan
-+            case scan: CometNativeScanExec => scan
            }
            assert(scanNodes.length == 1)
            // $"a" is not null and $"a" > 1
@@ -2448,14 +2491,13 @@ index 0ab8691801d..b18a5bea944 100644
 +          val dataFilters = scanNodes.head match {
 +            case scan: FileSourceScanExec => scan.dataFilters
 +            case scan: CometScanExec => scan.dataFilters
-+            case scan: CometNativeScanExec => scan.dataFilters
 +          }
 +          assert(dataFilters.length == 2)
 +          assert(dataFilters.flatMap(_.references.map(_.name)).distinct == 
Seq("a"))
          }
        }
      }
-@@ -145,6 +155,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
+@@ -145,6 +154,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
  
            val scanNodes = query.queryExecution.executedPlan.collect {
              case scan: BatchScanExec => scan
@@ -2463,7 +2505,7 @@ index 0ab8691801d..b18a5bea944 100644
            }
            assert(scanNodes.length == 1)
            assert(scanNodes.head.output.map(_.name) == Seq("a"))
-@@ -157,6 +168,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
+@@ -157,6 +167,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
  
            val scanNodes = query.queryExecution.executedPlan.collect {
              case scan: BatchScanExec => scan
@@ -2488,7 +2530,7 @@ index d083cac48ff..3c11bcde807 100644
    import testImplicits._
  
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
-index 746f289c393..7a6a88a9fce 100644
+index 746f289c393..5b9e31c1fa6 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
 @@ -19,16 +19,19 @@ package org.apache.spark.sql.sources
@@ -2513,7 +2555,7 @@ index 746f289c393..7a6a88a9fce 100644
  import org.apache.spark.sql.execution.joins.SortMergeJoinExec
  import org.apache.spark.sql.functions._
  import org.apache.spark.sql.internal.SQLConf
-@@ -102,12 +105,22 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
+@@ -102,12 +105,20 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
      }
    }
  
@@ -2523,7 +2565,6 @@ index 746f289c393..7a6a88a9fce 100644
 +    val fileScan = collect(plan) {
 +      case f: FileSourceScanExec => f
 +      case f: CometScanExec => f
-+      case f: CometNativeScanExec => f
 +    }
      assert(fileScan.nonEmpty, plan)
      fileScan.head
@@ -2532,13 +2573,12 @@ index 746f289c393..7a6a88a9fce 100644
 +  private def getBucketScan(plan: SparkPlan): Boolean = getFileScan(plan) 
match {
 +    case fs: FileSourceScanExec => fs.bucketedScan
 +    case bs: CometScanExec => bs.bucketedScan
-+    case ns: CometNativeScanExec => ns.bucketedScan
 +  }
 +
    // To verify if the bucket pruning works, this function checks two 
conditions:
    //   1) Check if the pruned buckets (before filtering) are empty.
    //   2) Verify the final result is the same as the expected one
-@@ -156,7 +169,8 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
+@@ -156,7 +167,8 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
            val planWithoutBucketedScan = 
bucketedDataFrame.filter(filterCondition)
              .queryExecution.executedPlan
            val fileScan = getFileScan(planWithoutBucketedScan)
@@ -2548,7 +2588,7 @@ index 746f289c393..7a6a88a9fce 100644
  
            val bucketColumnType = 
bucketedDataFrame.schema.apply(bucketColumnIndex).dataType
            val rowsWithInvalidBuckets = fileScan.execute().filter(row => {
-@@ -452,28 +466,54 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
+@@ -452,28 +464,54 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
          val joinOperator = if 
(joined.sqlContext.conf.adaptiveExecutionEnabled) {
            val executedPlan =
              
joined.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
@@ -2611,7 +2651,14 @@ index 746f289c393..7a6a88a9fce 100644
            s"expected sort in the right child to be $sortRight but 
found\n${joinOperator.right}")
  
          // check the output partitioning
-@@ -836,11 +876,11 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
+@@ -831,16 +869,17 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
+     }
+   }
+ 
+-  test("disable bucketing when the output doesn't contain all bucketing 
columns") {
++  test("disable bucketing when the output doesn't contain all bucketing 
columns",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319"))
 {
+     withTable("bucketed_table") {
        df1.write.format("parquet").bucketBy(8, 
"i").saveAsTable("bucketed_table")
  
        val scanDF = spark.table("bucketed_table").select("j")
@@ -2625,7 +2672,7 @@ index 746f289c393..7a6a88a9fce 100644
        checkAnswer(aggDF, df1.groupBy("j").agg(max("k")))
      }
    }
-@@ -895,7 +935,10 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
+@@ -895,7 +934,10 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
    }
  
    test("SPARK-29655 Read bucketed tables obeys spark.sql.shuffle.partitions") 
{
@@ -2636,7 +2683,7 @@ index 746f289c393..7a6a88a9fce 100644
        SQLConf.SHUFFLE_PARTITIONS.key -> "5",
        SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7")  {
        val bucketSpec = Some(BucketSpec(6, Seq("i", "j"), Nil))
-@@ -914,7 +957,10 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
+@@ -914,7 +956,10 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
    }
  
    test("SPARK-32767 Bucket join should work if SHUFFLE_PARTITIONS larger than 
bucket number") {
@@ -2647,7 +2694,7 @@ index 746f289c393..7a6a88a9fce 100644
        SQLConf.SHUFFLE_PARTITIONS.key -> "9",
        SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "10")  {
  
-@@ -944,7 +990,10 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
+@@ -944,7 +989,10 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
    }
  
    test("bucket coalescing eliminates shuffle") {
@@ -2658,7 +2705,17 @@ index 746f289c393..7a6a88a9fce 100644
        SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true",
        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
        // The side with bucketedTableTestSpec1 will be coalesced to have 4 
output partitions.
-@@ -1029,15 +1078,24 @@ abstract class BucketedReadSuite extends QueryTest 
with SQLTestUtils with Adapti
+@@ -1013,7 +1061,8 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
+     }
+   }
+ 
+-  test("bucket coalescing is applied when join expressions match with 
partitioning expressions") {
++  test("bucket coalescing is applied when join expressions match with 
partitioning expressions",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319"))
 {
+     withTable("t1", "t2", "t3") {
+       df1.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("t1")
+       df2.write.format("parquet").bucketBy(4, "i", "j").saveAsTable("t2")
+@@ -1029,15 +1078,21 @@ abstract class BucketedReadSuite extends QueryTest 
with SQLTestUtils with Adapti
            Seq(true, false).foreach { aqeEnabled =>
              withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> 
aqeEnabled.toString) {
                val plan = sql(query).queryExecution.executedPlan
@@ -2669,7 +2726,6 @@ index 746f289c393..7a6a88a9fce 100644
                val scans = collect(plan) {
                  case f: FileSourceScanExec if 
f.optionalNumCoalescedBuckets.isDefined => f
 +                case b: CometScanExec if 
b.optionalNumCoalescedBuckets.isDefined => b
-+                case b: CometNativeScanExec if 
b.optionalNumCoalescedBuckets.isDefined => b
                }
                if (expectedCoalescedNumBuckets.isDefined) {
                  assert(scans.length == 1)
@@ -2679,8 +2735,6 @@ index 746f289c393..7a6a88a9fce 100644
 +                    assert(f.optionalNumCoalescedBuckets == 
expectedCoalescedNumBuckets)
 +                  case b: CometScanExec =>
 +                    assert(b.optionalNumCoalescedBuckets == 
expectedCoalescedNumBuckets)
-+                  case b: CometNativeScanExec =>
-+                    assert(b.optionalNumCoalescedBuckets == 
expectedCoalescedNumBuckets)
 +                }
                } else {
                  assert(scans.isEmpty)
@@ -2710,18 +2764,20 @@ index 6f897a9c0b7..b0723634f68 100644
  
    protected override lazy val sql = spark.sql _
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
-index d675503a8ba..f220892396e 100644
+index d675503a8ba..c386a8cb686 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
-@@ -18,6 +18,7 @@
+@@ -17,7 +17,8 @@
+ 
  package org.apache.spark.sql.sources
  
- import org.apache.spark.sql.QueryTest
-+import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
+-import org.apache.spark.sql.QueryTest
++import org.apache.spark.sql.{IgnoreCometNativeDataFusion, QueryTest}
++import org.apache.spark.sql.comet.CometScanExec
  import org.apache.spark.sql.execution.FileSourceScanExec
  import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, 
DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
  import org.apache.spark.sql.internal.SQLConf
-@@ -68,7 +69,11 @@ abstract class DisableUnnecessaryBucketedScanSuite
+@@ -68,7 +69,10 @@ abstract class DisableUnnecessaryBucketedScanSuite
  
      def checkNumBucketedScan(query: String, expectedNumBucketedScan: Int): 
Unit = {
        val plan = sql(query).queryExecution.executedPlan
@@ -2729,11 +2785,60 @@ index d675503a8ba..f220892396e 100644
 +      val bucketedScan = collect(plan) {
 +        case s: FileSourceScanExec if s.bucketedScan => s
 +        case s: CometScanExec if s.bucketedScan => s
-+        case s: CometNativeScanExec if s.bucketedScan => s
 +      }
        assert(bucketedScan.length == expectedNumBucketedScan)
      }
  
+@@ -83,7 +87,8 @@ abstract class DisableUnnecessaryBucketedScanSuite
+     }
+   }
+ 
+-  test("SPARK-32859: disable unnecessary bucketed table scan - basic test") {
++  test("SPARK-32859: disable unnecessary bucketed table scan - basic test",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319"))
 {
+     withTable("t1", "t2", "t3") {
+       df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1")
+       df2.write.format("parquet").bucketBy(8, "i").saveAsTable("t2")
+@@ -124,7 +129,8 @@ abstract class DisableUnnecessaryBucketedScanSuite
+     }
+   }
+ 
+-  test("SPARK-32859: disable unnecessary bucketed table scan - multiple joins 
test") {
++  test("SPARK-32859: disable unnecessary bucketed table scan - multiple joins 
test",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319"))
 {
+     withTable("t1", "t2", "t3") {
+       df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1")
+       df2.write.format("parquet").bucketBy(8, "i").saveAsTable("t2")
+@@ -167,7 +173,8 @@ abstract class DisableUnnecessaryBucketedScanSuite
+     }
+   }
+ 
+-  test("SPARK-32859: disable unnecessary bucketed table scan - multiple 
bucketed columns test") {
++  test("SPARK-32859: disable unnecessary bucketed table scan - multiple 
bucketed columns test",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319"))
 {
+     withTable("t1", "t2", "t3") {
+       df1.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("t1")
+       df2.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("t2")
+@@ -198,7 +205,8 @@ abstract class DisableUnnecessaryBucketedScanSuite
+     }
+   }
+ 
+-  test("SPARK-32859: disable unnecessary bucketed table scan - other 
operators test") {
++  test("SPARK-32859: disable unnecessary bucketed table scan - other 
operators test",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319"))
 {
+     withTable("t1", "t2", "t3") {
+       df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1")
+       df2.write.format("parquet").bucketBy(8, "i").saveAsTable("t2")
+@@ -239,7 +247,8 @@ abstract class DisableUnnecessaryBucketedScanSuite
+     }
+   }
+ 
+-  test("Aggregates with no groupby over tables having 1 BUCKET, return 
multiple rows") {
++  test("Aggregates with no groupby over tables having 1 BUCKET, return 
multiple rows",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319"))
 {
+     withTable("t1") {
+       withSQLConf(SQLConf.AUTO_BUCKETED_SCAN_ENABLED.key -> "true") {
+         sql(
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
 index 7f6fa2a123e..c778b4e2c48 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -2868,6 +2973,39 @@ index aad91601758..201083bd621 100644
        })
    }
  
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+index b5cf13a9c12..ac17603fb7f 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+@@ -36,7 +36,7 @@ import org.scalatestplus.mockito.MockitoSugar
+ 
+ import org.apache.spark.{SparkException, TestUtils}
+ import org.apache.spark.internal.Logging
+-import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, 
Row, SaveMode}
++import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, 
IgnoreCometNativeDataFusion, Row, SaveMode}
+ import org.apache.spark.sql.catalyst.InternalRow
+ import org.apache.spark.sql.catalyst.expressions.{Literal, Rand, Randn, 
Shuffle, Uuid}
+ import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef, 
CTERelationRef, LocalRelation}
+@@ -660,7 +660,8 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging wi
+     )
+   }
+ 
+-  test("SPARK-41198: input row calculation with CTE") {
++  test("SPARK-41198: input row calculation with CTE",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315"))
 {
+     withTable("parquet_tbl", "parquet_streaming_tbl") {
+       spark.range(0, 10).selectExpr("id AS col1", "id AS col2")
+         .write.format("parquet").saveAsTable("parquet_tbl")
+@@ -712,7 +713,8 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging wi
+     }
+   }
+ 
+-  test("SPARK-41199: input row calculation with mixed-up of DSv1 and DSv2 
streaming sources") {
++  test("SPARK-41199: input row calculation with mixed-up of DSv1 and DSv2 
streaming sources",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315"))
 {
+     withTable("parquet_streaming_tbl") {
+       val streamInput = MemoryStream[Int]
+       val streamDf = streamInput.toDF().selectExpr("value AS key", "value AS 
value_stream")
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala
 index 8f099c31e6b..ce4b7ad25b3 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala
@@ -3111,6 +3249,29 @@ index de3b1ffccf0..2a76d127093 100644
  
    override def beforeEach(): Unit = {
      super.beforeEach()
+diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+index f3be79f9022..b4b1ea8dbc4 100644
+--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
++++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+@@ -34,7 +34,7 @@ import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectIn
+ import org.apache.hadoop.io.{LongWritable, Writable}
+ 
+ import org.apache.spark.{SparkException, SparkFiles, TestUtils}
+-import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
++import org.apache.spark.sql.{AnalysisException, IgnoreCometNativeDataFusion, 
QueryTest, Row}
+ import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
+ import org.apache.spark.sql.catalyst.plans.logical.Project
+ import org.apache.spark.sql.execution.WholeStageCodegenExec
+@@ -448,7 +448,8 @@ class HiveUDFSuite extends QueryTest with 
TestHiveSingleton with SQLTestUtils {
+     }
+   }
+ 
+-  test("SPARK-11522 select input_file_name from non-parquet table") {
++  test("SPARK-11522 select input_file_name from non-parquet table",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312"))
 {
+ 
+     withTempDir { tempDir =>
+ 
 diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
 index 6160c3e5f6c..0956d7d9edc 100644
 --- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
diff --git a/docs/source/contributor-guide/parquet_scans.md 
b/docs/source/contributor-guide/parquet_scans.md
index 2a10bb111..833eda75e 100644
--- a/docs/source/contributor-guide/parquet_scans.md
+++ b/docs/source/contributor-guide/parquet_scans.md
@@ -63,9 +63,10 @@ cause Comet to fall back to Spark.
   The `native_datafusion` scan does not use Spark's `FileScanRDD`, so these 
functions cannot populate their values.
 - No support for `ignoreMissingFiles` or `ignoreCorruptFiles` being set to 
`true`
 - No support for duplicate field names in case-insensitive mode. When the 
required or data schema contains
-  field names that differ only by case (e.g., `B` and `b`), Comet falls back 
to Spark. Note that duplicates
-  in the physical Parquet file that are not reflected in the table schema 
cannot be detected at plan time,
-  so DataFusion may produce a different error message than Spark in that case.
+  field names that differ only by case (e.g., `B` and `b`), Comet falls back 
to Spark. Duplicates
+  in the physical Parquet file that are not reflected in the table schema 
cannot be detected at plan time;
+  in that case DataFusion will throw a `SparkRuntimeException` with error 
class `_LEGACY_ERROR_TEMP_2093`,
+  matching Spark's behavior.
 
 The `native_iceberg_compat` scan has the following additional limitation that 
may produce incorrect results
 without falling back to Spark:
diff --git a/native/Cargo.lock b/native/Cargo.lock
index 05b673346..230fc2a53 100644
--- a/native/Cargo.lock
+++ b/native/Cargo.lock
@@ -1826,7 +1826,7 @@ dependencies = [
 
 [[package]]
 name = "datafusion-comet"
-version = "0.14.0"
+version = "0.15.0"
 dependencies = [
  "arrow",
  "assertables",
@@ -1901,7 +1901,7 @@ dependencies = [
 
 [[package]]
 name = "datafusion-comet-objectstore-hdfs"
-version = "0.14.0"
+version = "0.15.0"
 dependencies = [
  "async-trait",
  "bytes",
@@ -1915,7 +1915,7 @@ dependencies = [
 
 [[package]]
 name = "datafusion-comet-proto"
-version = "0.14.0"
+version = "0.15.0"
 dependencies = [
  "prost",
  "prost-build",
@@ -1923,7 +1923,7 @@ dependencies = [
 
 [[package]]
 name = "datafusion-comet-spark-expr"
-version = "0.14.0"
+version = "0.15.0"
 dependencies = [
  "arrow",
  "base64",
diff --git a/native/core/src/errors.rs b/native/core/src/errors.rs
index d4582da63..2886df29d 100644
--- a/native/core/src/errors.rs
+++ b/native/core/src/errors.rs
@@ -436,13 +436,15 @@ fn throw_exception(env: &mut JNIEnv, error: &CometError, 
backtrace: Option<Strin
             // Handle direct SparkError - serialize to JSON
             CometError::Spark(spark_error) => throw_spark_error_as_json(env, 
spark_error),
             _ => {
-                // Check for file-not-found errors that may arrive through 
other wrapping paths
                 let error_msg = error.to_string();
+                // Check for file-not-found errors that may arrive through 
other wrapping paths
                 if error_msg.contains("not found")
                     && error_msg.contains("No such file or directory")
                 {
                     let spark_error = SparkError::FileNotFound { message: 
error_msg };
                     throw_spark_error_as_json(env, &spark_error)
+                } else if let Some(spark_error) = 
try_convert_duplicate_field_error(&error_msg) {
+                    throw_spark_error_as_json(env, &spark_error)
                 } else {
                     let exception = error.to_exception();
                     match backtrace {
@@ -474,6 +476,54 @@ fn throw_spark_error_as_json(
     )
 }
 
+/// Try to convert a DataFusion "Unable to get field named" error into a 
SparkError.
+/// DataFusion produces this error when reading Parquet files with duplicate 
field names
+/// in case-insensitive mode. For example, if a Parquet file has columns "B" 
and "b",
+/// DataFusion may deduplicate them and report: Unable to get field named "b". 
Valid
+/// fields: ["A", "B"]. When the requested field has a case-insensitive match 
among the
+/// valid fields, we convert this to Spark's _LEGACY_ERROR_TEMP_2093 error.
+fn try_convert_duplicate_field_error(error_msg: &str) -> Option<SparkError> {
+    // Match: Schema error: Unable to get field named "X". Valid fields: [...]
+    lazy_static! {
+        static ref FIELD_RE: Regex =
+            Regex::new(r#"Unable to get field named "([^"]+)"\. Valid fields: 
\[(.+)\]"#).unwrap();
+    }
+    if let Some(caps) = FIELD_RE.captures(error_msg) {
+        let requested_field = caps.get(1)?.as_str();
+        let requested_lower = requested_field.to_lowercase();
+        // Parse field names from the Valid fields list: ["A", "B"] or [A, B, 
b]
+        let valid_fields_raw = caps.get(2)?.as_str();
+        let all_fields: Vec<String> = valid_fields_raw
+            .split(',')
+            .map(|s| s.trim().trim_matches('"').to_string())
+            .collect();
+        // Find fields that match case-insensitively
+        let mut matched: Vec<String> = all_fields
+            .into_iter()
+            .filter(|f| f.to_lowercase() == requested_lower)
+            .collect();
+        // Need at least one case-insensitive match to treat this as a 
duplicate field error.
+        // DataFusion may deduplicate columns case-insensitively, so the valid 
fields list
+        // might contain only one variant (e.g. "B" when file has both "B" and 
"b").
+        // If requested field differs from the match, both existed in the 
original file.
+        if matched.is_empty() {
+            return None;
+        }
+        // Add the requested field name if it's not already in the list 
(different case)
+        if !matched.iter().any(|f| f == requested_field) {
+            matched.push(requested_field.to_string());
+        }
+        let required_field_name = requested_field.to_string();
+        let matched_fields = format!("[{}]", matched.join(", "));
+        Some(SparkError::DuplicateFieldCaseInsensitive {
+            required_field_name,
+            matched_fields,
+        })
+    } else {
+        None
+    }
+}
+
 #[derive(Debug, Error)]
 enum StacktraceError {
     #[error("Unable to initialize message: {0}")]
diff --git a/native/spark-expr/src/error.rs b/native/spark-expr/src/error.rs
index 592ed8b44..9633dc98d 100644
--- a/native/spark-expr/src/error.rs
+++ b/native/spark-expr/src/error.rs
@@ -169,6 +169,12 @@ pub enum SparkError {
     #[error("{message}")]
     FileNotFound { message: String },
 
+    #[error("[_LEGACY_ERROR_TEMP_2093] Found duplicate field(s) 
\"{required_field_name}\": [{matched_fields}] in case-insensitive mode")]
+    DuplicateFieldCaseInsensitive {
+        required_field_name: String,
+        matched_fields: String,
+    },
+
     #[error("ArrowError: {0}.")]
     Arrow(Arc<ArrowError>),
 
@@ -240,6 +246,7 @@ impl SparkError {
             SparkError::DatatypeCannotOrder { .. } => "DatatypeCannotOrder",
             SparkError::ScalarSubqueryTooManyRows => 
"ScalarSubqueryTooManyRows",
             SparkError::FileNotFound { .. } => "FileNotFound",
+            SparkError::DuplicateFieldCaseInsensitive { .. } => 
"DuplicateFieldCaseInsensitive",
             SparkError::Arrow(_) => "Arrow",
             SparkError::Internal(_) => "Internal",
         }
@@ -430,6 +437,15 @@ impl SparkError {
                     "message": message,
                 })
             }
+            SparkError::DuplicateFieldCaseInsensitive {
+                required_field_name,
+                matched_fields,
+            } => {
+                serde_json::json!({
+                    "requiredFieldName": required_field_name,
+                    "matchedOrcFields": matched_fields,
+                })
+            }
             SparkError::Arrow(e) => {
                 serde_json::json!({
                     "message": e.to_string(),
@@ -499,6 +515,11 @@ impl SparkError {
             // FileNotFound - will be converted to SparkFileNotFoundException 
by the shim
             SparkError::FileNotFound { .. } => 
"org/apache/spark/SparkException",
 
+            // DuplicateFieldCaseInsensitive - converted to 
SparkRuntimeException by the shim
+            SparkError::DuplicateFieldCaseInsensitive { .. } => {
+                "org/apache/spark/SparkRuntimeException"
+            }
+
             // Generic errors
             SparkError::Arrow(_) | SparkError::Internal(_) => 
"org/apache/spark/SparkException",
         }
@@ -574,6 +595,9 @@ impl SparkError {
             // File not found
             SparkError::FileNotFound { .. } => Some("_LEGACY_ERROR_TEMP_2055"),
 
+            // Duplicate field in case-insensitive mode
+            SparkError::DuplicateFieldCaseInsensitive { .. } => 
Some("_LEGACY_ERROR_TEMP_2093"),
+
             // Generic errors (no error class)
             SparkError::Arrow(_) | SparkError::Internal(_) => None,
         }
diff --git 
a/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
 
b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
index da65b1eb4..e4ec9e006 100644
--- 
a/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
+++ 
b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
@@ -251,6 +251,12 @@ trait ShimSparkErrorConverter {
           QueryExecutionErrors
             .intervalArithmeticOverflowError("Interval arithmetic overflow", 
"", sqlCtx(context)))
 
+      case "DuplicateFieldCaseInsensitive" =>
+        Some(
+          QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError(
+            params("requiredFieldName").toString,
+            params("matchedOrcFields").toString))
+
       case "FileNotFound" =>
         val msg = params("message").toString
         // Extract file path from native error message and format like Hadoop's
diff --git 
a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
 
b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
index ae21d1276..41f461100 100644
--- 
a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
+++ 
b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
@@ -247,6 +247,12 @@ trait ShimSparkErrorConverter {
           QueryExecutionErrors
             .intervalArithmeticOverflowError("Interval arithmetic overflow", 
"", sqlCtx(context)))
 
+      case "DuplicateFieldCaseInsensitive" =>
+        Some(
+          QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError(
+            params("requiredFieldName").toString,
+            params("matchedOrcFields").toString))
+
       case "FileNotFound" =>
         val msg = params("message").toString
         // Extract file path from native error message and format like Hadoop's
diff --git 
a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
 
b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
index 01d4eac4b..f906db140 100644
--- 
a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
+++ 
b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
@@ -258,6 +258,12 @@ trait ShimSparkErrorConverter {
           
QueryExecutionErrors.withoutSuggestionIntervalArithmeticOverflowError(
             context.headOption.orNull))
 
+      case "DuplicateFieldCaseInsensitive" =>
+        Some(
+          QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError(
+            params("requiredFieldName").toString,
+            params("matchedOrcFields").toString))
+
       case "FileNotFound" =>
         val msg = params("message").toString
         // Extract file path from native error message and format like Hadoop's


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to