This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new e23d94628 chore: run Spark 3.4 tests with `native_datafusion` scan 
(#3722)
e23d94628 is described below

commit e23d94628254839f2194eaecf3c5bc9f9b664dde
Author: Andy Grove <[email protected]>
AuthorDate: Mon Mar 23 06:29:33 2026 -0700

    chore: run Spark 3.4 tests with `native_datafusion` scan (#3722)
---
 .github/workflows/spark_sql_test.yml |   1 +
 dev/diffs/3.4.3.diff                 | 273 +++++++++++++++------
 dev/diffs/4.0.1.diff                 | 456 ++++++++++++++++++++++++++++-------
 3 files changed, 579 insertions(+), 151 deletions(-)

diff --git a/.github/workflows/spark_sql_test.yml 
b/.github/workflows/spark_sql_test.yml
index 4d777cda8..3a763d321 100644
--- a/.github/workflows/spark_sql_test.yml
+++ b/.github/workflows/spark_sql_test.yml
@@ -126,6 +126,7 @@ jobs:
         # - native_iceberg_compat: Spark 3.5 only
         config:
           - {spark-short: '3.4', spark-full: '3.4.3', java: 11, scan-impl: 
'auto'}
+          - {spark-short: '3.4', spark-full: '3.4.3', java: 11, scan-impl: 
'native_datafusion'}
           - {spark-short: '3.5', spark-full: '3.5.8', java: 11, scan-impl: 
'auto'}
           - {spark-short: '3.5', spark-full: '3.5.8', java: 11, scan-impl: 
'native_datafusion'}
           - {spark-short: '4.0', spark-full: '4.0.1', java: 17, scan-impl: 
'auto'}
diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff
index 8738b3813..e40328536 100644
--- a/dev/diffs/3.4.3.diff
+++ b/dev/diffs/3.4.3.diff
@@ -1,5 +1,5 @@
 diff --git a/pom.xml b/pom.xml
-index d3544881af1..9c16099090c 100644
+index d3544881af1..377683b10c5 100644
 --- a/pom.xml
 +++ b/pom.xml
 @@ -148,6 +148,8 @@
@@ -417,7 +417,7 @@ index daef11ae4d6..9f3cc9181f2 100644
      assert(exchanges.size == 2)
    }
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
-index f33432ddb6f..1925aac8d97 100644
+index f33432ddb6f..7d758d2481f 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
@@ -478,7 +478,17 @@ index f33432ddb6f..1925aac8d97 100644
      withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> 
"true") {
        val df = sql(
          """ WITH v as (
-@@ -1729,6 +1737,8 @@ abstract class DynamicPartitionPruningV1Suite extends 
DynamicPartitionPruningDat
+@@ -1698,7 +1706,8 @@ abstract class DynamicPartitionPruningV1Suite extends 
DynamicPartitionPruningDat
+    * Check the static scan metrics with and without DPP
+    */
+   test("static scan metrics",
+-    DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
++    DisableAdaptiveExecution("DPP in AQE must reuse broadcast"),
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3442"))
 {
+     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
+       SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
+       SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
+@@ -1729,6 +1738,8 @@ abstract class DynamicPartitionPruningV1Suite extends 
DynamicPartitionPruningDat
                case s: BatchScanExec =>
                  // we use f1 col for v2 tables due to schema pruning
                  s.output.exists(_.exists(_.argString(maxFields = 
100).contains("f1")))
@@ -513,7 +523,7 @@ index a6b295578d6..91acca4306f 100644
  
    test("SPARK-35884: Explain Formatted") {
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
-index 2796b1cf154..52438178a0e 100644
+index 2796b1cf154..d628f44e4ee 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
 @@ -33,6 +33,7 @@ import org.apache.spark.sql.TestingUDT.{IntervalUDT, 
NullData, NullUDT}
@@ -524,7 +534,17 @@ index 2796b1cf154..52438178a0e 100644
  import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode}
  import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
  import org.apache.spark.sql.execution.datasources.FilePartition
-@@ -815,6 +816,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -499,7 +500,8 @@ class FileBasedDataSourceSuite extends QueryTest
+   }
+ 
+   Seq("parquet", "orc").foreach { format =>
+-    test(s"Spark native readers should respect spark.sql.caseSensitive - 
${format}") {
++    test(s"Spark native readers should respect spark.sql.caseSensitive - 
${format}",
++      
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3760"))
 {
+       withTempDir { dir =>
+         val tableName = s"spark_25132_${format}_native"
+         val tableDir = dir.getCanonicalPath + s"/$tableName"
+@@ -815,6 +817,7 @@ class FileBasedDataSourceSuite extends QueryTest
              assert(bJoinExec.isEmpty)
              val smJoinExec = collect(joinedDF.queryExecution.executedPlan) {
                case smJoin: SortMergeJoinExec => smJoin
@@ -532,7 +552,7 @@ index 2796b1cf154..52438178a0e 100644
              }
              assert(smJoinExec.nonEmpty)
            }
-@@ -875,6 +877,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -875,6 +878,7 @@ class FileBasedDataSourceSuite extends QueryTest
  
            val fileScan = df.queryExecution.executedPlan collectFirst {
              case BatchScanExec(_, f: FileScan, _, _, _, _, _, _, _) => f
@@ -540,7 +560,7 @@ index 2796b1cf154..52438178a0e 100644
            }
            assert(fileScan.nonEmpty)
            assert(fileScan.get.partitionFilters.nonEmpty)
-@@ -916,6 +919,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -916,6 +920,7 @@ class FileBasedDataSourceSuite extends QueryTest
  
            val fileScan = df.queryExecution.executedPlan collectFirst {
              case BatchScanExec(_, f: FileScan, _, _, _, _, _, _, _) => f
@@ -548,7 +568,7 @@ index 2796b1cf154..52438178a0e 100644
            }
            assert(fileScan.nonEmpty)
            assert(fileScan.get.partitionFilters.isEmpty)
-@@ -1100,6 +1104,9 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -1100,6 +1105,9 @@ class FileBasedDataSourceSuite extends QueryTest
            val filters = df.queryExecution.executedPlan.collect {
              case f: FileSourceScanLike => f.dataFilters
              case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters
@@ -981,7 +1001,7 @@ index 48ad10992c5..51d1ee65422 100644
          extensions.injectColumnar(session =>
            MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule())) 
}
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
-index 18123a4d6ec..fbe4c766eee 100644
+index 18123a4d6ec..0fe185baa33 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
 @@ -17,6 +17,8 @@
@@ -1983,10 +2003,18 @@ index 07e2849ce6f..3e73645b638 100644
        ParquetOutputFormat.WRITER_VERSION -> 
ParquetProperties.WriterVersion.PARQUET_2_0.toString
      )
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
-index 104b4e416cd..37ea65081e4 100644
+index 104b4e416cd..d865077684f 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
-@@ -1096,7 +1096,11 @@ abstract class ParquetFilterSuite extends QueryTest 
with ParquetTest with Shared
+@@ -38,6 +38,7 @@ import org.apache.parquet.schema.MessageType
+ 
+ import org.apache.spark.{SparkConf, SparkException}
+ import org.apache.spark.sql._
++import org.apache.spark.sql.{IgnoreCometNativeDataFusion, 
IgnoreCometNativeScan}
+ import org.apache.spark.sql.catalyst.dsl.expressions._
+ import org.apache.spark.sql.catalyst.expressions._
+ import org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints
+@@ -1096,7 +1097,11 @@ abstract class ParquetFilterSuite extends QueryTest 
with ParquetTest with Shared
            // When a filter is pushed to Parquet, Parquet can apply it to 
every row.
            // So, we can check the number of rows returned from the Parquet
            // to make sure our filter pushdown work.
@@ -1999,7 +2027,7 @@ index 104b4e416cd..37ea65081e4 100644
          }
        }
      }
-@@ -1499,7 +1503,8 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
+@@ -1499,7 +1504,8 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
      }
    }
  
@@ -2009,7 +2037,7 @@ index 104b4e416cd..37ea65081e4 100644
      import testImplicits._
  
      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true",
-@@ -1581,7 +1586,11 @@ abstract class ParquetFilterSuite extends QueryTest 
with ParquetTest with Shared
+@@ -1581,7 +1587,11 @@ abstract class ParquetFilterSuite extends QueryTest 
with ParquetTest with Shared
            // than the total length but should not be a single record.
            // Note that, if record level filtering is enabled, it should be a 
single record.
            // If no filter is pushed down to Parquet, it should be the total 
length of data.
@@ -2022,7 +2050,7 @@ index 104b4e416cd..37ea65081e4 100644
          }
        }
      }
-@@ -1608,7 +1617,11 @@ abstract class ParquetFilterSuite extends QueryTest 
with ParquetTest with Shared
+@@ -1608,7 +1618,11 @@ abstract class ParquetFilterSuite extends QueryTest 
with ParquetTest with Shared
          // than the total length but should not be a single record.
          // Note that, if record level filtering is enabled, it should be a 
single record.
          // If no filter is pushed down to Parquet, it should be the total 
length of data.
@@ -2035,7 +2063,17 @@ index 104b4e416cd..37ea65081e4 100644
        }
      }
    }
-@@ -1744,7 +1757,8 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
+@@ -1700,7 +1714,8 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
+       (attr, value) => sources.StringContains(attr, value))
+   }
+ 
+-  test("filter pushdown - StringPredicate") {
++  test("filter pushdown - StringPredicate",
++    IgnoreCometNativeDataFusion("cannot be pushed down")) {
+     import testImplicits._
+     // keep() should take effect on StartsWith/EndsWith/Contains
+     Seq(
+@@ -1744,7 +1759,8 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
      }
    }
  
@@ -2045,7 +2083,17 @@ index 104b4e416cd..37ea65081e4 100644
      val schema = StructType(Seq(
        StructField("a", IntegerType, nullable = false)
      ))
-@@ -1985,7 +1999,8 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
+@@ -1934,7 +1950,8 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
+     }
+   }
+ 
+-  test("SPARK-25207: exception when duplicate fields in case-insensitive 
mode") {
++  test("SPARK-25207: exception when duplicate fields in case-insensitive 
mode",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3760"))
 {
+     withTempPath { dir =>
+       val count = 10
+       val tableName = "spark_25207"
+@@ -1985,7 +2002,8 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
      }
    }
  
@@ -2055,7 +2103,7 @@ index 104b4e416cd..37ea65081e4 100644
      // block 1:
      //                      null count  min                                   
    max
      // page-0                         0  0                                    
     99
-@@ -2045,7 +2060,8 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
+@@ -2045,7 +2063,8 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
      }
    }
  
@@ -2065,7 +2113,7 @@ index 104b4e416cd..37ea65081e4 100644
      withTempPath { dir =>
        val path = dir.getCanonicalPath
        spark.range(100).selectExpr("id * 2 AS id")
-@@ -2277,7 +2293,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
+@@ -2277,7 +2296,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
            assert(pushedParquetFilters.exists(_.getClass === filterClass),
              s"${pushedParquetFilters.map(_.getClass).toList} did not contain 
${filterClass}.")
  
@@ -2078,7 +2126,7 @@ index 104b4e416cd..37ea65081e4 100644
          } else {
            assert(selectedFilters.isEmpty, "There is filter pushed down")
          }
-@@ -2337,7 +2357,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
+@@ -2337,7 +2360,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
            assert(pushedParquetFilters.exists(_.getClass === filterClass),
              s"${pushedParquetFilters.map(_.getClass).toList} did not contain 
${filterClass}.")
  
@@ -2092,10 +2140,38 @@ index 104b4e416cd..37ea65081e4 100644
          case _ =>
            throw new AnalysisException("Can not match ParquetTable in the 
query.")
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
-index 8670d95c65e..b624c3811dd 100644
+index 8670d95c65e..c7ba51f770f 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
-@@ -1335,7 +1335,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
+@@ -41,6 +41,7 @@ import org.apache.parquet.schema.{MessageType, 
MessageTypeParser}
+ 
+ import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, TestUtils}
+ import org.apache.spark.sql._
++import org.apache.spark.sql.IgnoreCometNativeDataFusion
+ import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection}
+ import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, 
UnsafeRow}
+ import org.apache.spark.sql.catalyst.util.DateTimeUtils
+@@ -1064,7 +1065,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
+     }
+   }
+ 
+-  test("SPARK-35640: read binary as timestamp should throw schema 
incompatible error") {
++  test("SPARK-35640: read binary as timestamp should throw schema 
incompatible error",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720"))
 {
+     val data = (1 to 4).map(i => Tuple1(i.toString))
+     val readSchema = StructType(Seq(StructField("_1", 
DataTypes.TimestampType)))
+ 
+@@ -1075,7 +1077,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
+     }
+   }
+ 
+-  test("SPARK-35640: int as long should throw schema incompatible error") {
++  test("SPARK-35640: int as long should throw schema incompatible error",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720"))
 {
+     val data = (1 to 4).map(i => Tuple1(i))
+     val readSchema = StructType(Seq(StructField("_1", DataTypes.LongType)))
+ 
+@@ -1335,7 +1338,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
      }
    }
  
@@ -2106,10 +2182,28 @@ index 8670d95c65e..b624c3811dd 100644
        checkAnswer(
          // "fruit" column in this file is encoded using 
DELTA_LENGTH_BYTE_ARRAY.
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
-index 29cb224c878..44837aa953b 100644
+index 29cb224c878..ee5a87fa200 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
-@@ -978,7 +978,8 @@ abstract class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedS
+@@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
+ 
+ import org.apache.spark.{DebugFilesystem, SparkConf, SparkException}
+ import org.apache.spark.sql._
++import org.apache.spark.sql.IgnoreCometNativeDataFusion
+ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+ import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
+ import org.apache.spark.sql.catalyst.util.ArrayData
+@@ -185,7 +186,8 @@ abstract class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedS
+     }
+   }
+ 
+-  test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") {
++  test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720"))
 {
+     val data = (1 to 1000).map { i =>
+       val ts = new java.sql.Timestamp(i)
+       Row(ts)
+@@ -978,7 +980,8 @@ abstract class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedS
      }
    }
  
@@ -2119,7 +2213,17 @@ index 29cb224c878..44837aa953b 100644
      withAllParquetReaders {
        withTempPath { path =>
          // Repeated values for dictionary encoding.
-@@ -1047,7 +1048,8 @@ abstract class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedS
+@@ -1031,7 +1034,8 @@ abstract class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedS
+     testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96")
+   }
+ 
+-  test("SPARK-34212 Parquet should read decimals correctly") {
++  test("SPARK-34212 Parquet should read decimals correctly",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720"))
 {
+     def readParquet(schema: String, path: File): DataFrame = {
+       spark.read.schema(schema).parquet(path.toString)
+     }
+@@ -1047,7 +1051,8 @@ abstract class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedS
          checkAnswer(readParquet(schema, path), df)
        }
  
@@ -2129,7 +2233,7 @@ index 29cb224c878..44837aa953b 100644
          val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)"
          checkAnswer(readParquet(schema1, path), df)
          val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)"
-@@ -1069,7 +1071,8 @@ abstract class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedS
+@@ -1069,7 +1074,8 @@ abstract class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedS
        val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, 
CAST('1.2' AS BINARY) d")
        df.write.parquet(path.toString)
  
@@ -2139,7 +2243,17 @@ index 29cb224c878..44837aa953b 100644
          checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00"))
          checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null))
          checkAnswer(readParquet("b DECIMAL(11, 1)", path), sql("SELECT 
123456.0"))
-@@ -1128,7 +1131,7 @@ abstract class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedS
+@@ -1113,7 +1119,8 @@ abstract class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedS
+     }
+   }
+ 
+-  test("row group skipping doesn't overflow when reading into larger type") {
++  test("row group skipping doesn't overflow when reading into larger type",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720"))
 {
+     withTempPath { path =>
+       Seq(0).toDF("a").write.parquet(path.toString)
+       // The vectorized and non-vectorized readers will produce different 
exceptions, we don't need
+@@ -1128,7 +1135,7 @@ abstract class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedS
              .where(s"a < ${Long.MaxValue}")
              .collect()
          }
@@ -2244,14 +2358,14 @@ index 5c0b7def039..151184bc98c 100644
      assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size,
        s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " +
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
-index bf5c51b89bb..ca22370ca3b 100644
+index bf5c51b89bb..4e2f0bdb389 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
 @@ -27,6 +27,7 @@ import 
org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
  import org.apache.parquet.schema.Type._
  
  import org.apache.spark.SparkException
-+import org.apache.spark.sql.IgnoreComet
++import org.apache.spark.sql.{IgnoreComet, IgnoreCometNativeDataFusion}
  import org.apache.spark.sql.catalyst.ScalaReflection
  import 
org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
  import org.apache.spark.sql.functions.desc
@@ -2265,6 +2379,26 @@ index bf5c51b89bb..ca22370ca3b 100644
      withTempPath { dir =>
        val e = testSchemaMismatch(dir.getCanonicalPath, 
vectorizedReaderEnabled = false)
        val expectedMessage = "Encountered error while reading file"
+@@ -1026,7 +1028,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
+     }
+   }
+ 
+-  test("schema mismatch failure error message for parquet vectorized reader") 
{
++  test("schema mismatch failure error message for parquet vectorized reader",
++      
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720"))
 {
+     withTempPath { dir =>
+       val e = testSchemaMismatch(dir.getCanonicalPath, 
vectorizedReaderEnabled = true)
+       assert(e.getCause.isInstanceOf[SparkException])
+@@ -1067,7 +1070,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
+     }
+   }
+ 
+-  test("SPARK-45604: schema mismatch failure error on timestamp_ntz to 
array<timestamp_ntz>") {
++  test("SPARK-45604: schema mismatch failure error on timestamp_ntz to 
array<timestamp_ntz>",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720"))
 {
+     import testImplicits._
+ 
+     withTempPath { dir =>
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
 index 3a0bd35cb70..b28f06a757f 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
@@ -2314,7 +2448,7 @@ index 26e61c6b58d..cb09d7e116a 100644
        spark.range(10).selectExpr("id", "id % 3 as p")
          .write.partitionBy("p").saveAsTable("testDataForScan")
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
-index 0ab8691801d..d9125f658ad 100644
+index 0ab8691801d..b18a5bea944 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
 @@ -18,6 +18,7 @@
@@ -2325,19 +2459,21 @@ index 0ab8691801d..d9125f658ad 100644
  import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan, 
SparkPlanTest}
  import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
  import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
-@@ -108,6 +109,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
+@@ -108,6 +109,8 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
  
            val scanNodes = query.queryExecution.executedPlan.collect {
              case scan: FileSourceScanExec => scan
 +            case scan: CometScanExec => scan
++            case scan: CometNativeScanExec => scan
            }
            assert(scanNodes.length == 1)
            assert(scanNodes.head.output.map(_.name) == Seq("a"))
-@@ -120,11 +122,16 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
+@@ -120,11 +123,18 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
  
            val scanNodes = query.queryExecution.executedPlan.collect {
              case scan: FileSourceScanExec => scan
 +            case scan: CometScanExec => scan
++            case scan: CometNativeScanExec => scan
            }
            assert(scanNodes.length == 1)
            // $"a" is not null and $"a" > 1
@@ -2346,13 +2482,14 @@ index 0ab8691801d..d9125f658ad 100644
 +          val dataFilters = scanNodes.head match {
 +            case scan: FileSourceScanExec => scan.dataFilters
 +            case scan: CometScanExec => scan.dataFilters
++            case scan: CometNativeScanExec => scan.dataFilters
 +          }
 +          assert(dataFilters.length == 2)
 +          assert(dataFilters.flatMap(_.references.map(_.name)).distinct == 
Seq("a"))
          }
        }
      }
-@@ -145,6 +152,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
+@@ -145,6 +155,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
  
            val scanNodes = query.queryExecution.executedPlan.collect {
              case scan: BatchScanExec => scan
@@ -2360,7 +2497,7 @@ index 0ab8691801d..d9125f658ad 100644
            }
            assert(scanNodes.length == 1)
            assert(scanNodes.head.output.map(_.name) == Seq("a"))
-@@ -157,6 +165,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
+@@ -157,6 +168,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
  
            val scanNodes = query.queryExecution.executedPlan.collect {
              case scan: BatchScanExec => scan
@@ -2804,7 +2941,7 @@ index abe606ad9c1..2d930b64cca 100644
      val tblTargetName = "tbl_target"
      val tblSourceQualified = s"default.$tblSourceName"
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
-index dd55fcfe42c..a1d390c93d0 100644
+index dd55fcfe42c..99bc018008a 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
 @@ -27,6 +27,7 @@ import scala.concurrent.duration._
@@ -2823,46 +2960,42 @@ index dd55fcfe42c..a1d390c93d0 100644
  import org.apache.spark.sql.execution.FilterExec
  import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution
  import org.apache.spark.sql.execution.datasources.DataSourceUtils
-@@ -118,7 +120,7 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with 
SQLTestUtilsBase with
-   }
+@@ -119,6 +121,34 @@ private[sql] trait SQLTestUtils extends SparkFunSuite 
with SQLTestUtilsBase with
  
    override protected def test(testName: String, testTags: Tag*)(testFun: => 
Any)
--      (implicit pos: Position): Unit = {
-+                             (implicit pos: Position): Unit = {
+       (implicit pos: Position): Unit = {
++    // Check Comet skip tags first, before DisableAdaptiveExecution handling
++    if (isCometEnabled && testTags.exists(_.isInstanceOf[IgnoreComet])) {
++      ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun)
++      return
++    }
++    if (isCometEnabled) {
++      val cometScanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get(conf)
++      val isNativeIcebergCompat = cometScanImpl == 
CometConf.SCAN_NATIVE_ICEBERG_COMPAT ||
++        cometScanImpl == CometConf.SCAN_AUTO
++      val isNativeDataFusion = cometScanImpl == 
CometConf.SCAN_NATIVE_DATAFUSION ||
++        cometScanImpl == CometConf.SCAN_AUTO
++      if (isNativeIcebergCompat &&
++        testTags.exists(_.isInstanceOf[IgnoreCometNativeIcebergCompat])) {
++        ignore(testName + " (disabled for NATIVE_ICEBERG_COMPAT)", testTags: 
_*)(testFun)
++        return
++      }
++      if (isNativeDataFusion &&
++        testTags.exists(_.isInstanceOf[IgnoreCometNativeDataFusion])) {
++        ignore(testName + " (disabled for NATIVE_DATAFUSION)", testTags: 
_*)(testFun)
++        return
++      }
++      if ((isNativeDataFusion || isNativeIcebergCompat) &&
++        testTags.exists(_.isInstanceOf[IgnoreCometNativeScan])) {
++        ignore(testName + " (disabled for NATIVE_DATAFUSION and 
NATIVE_ICEBERG_COMPAT)",
++          testTags: _*)(testFun)
++        return
++      }
++    }
      if (testTags.exists(_.isInstanceOf[DisableAdaptiveExecution])) {
        super.test(testName, testTags: _*) {
          withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
-@@ -126,7 +128,28 @@ private[sql] trait SQLTestUtils extends SparkFunSuite 
with SQLTestUtilsBase with
-         }
-       }
-     } else {
--      super.test(testName, testTags: _*)(testFun)
-+      if (isCometEnabled && testTags.exists(_.isInstanceOf[IgnoreComet])) {
-+        ignore(testName + " (disabled when Comet is on)", testTags: 
_*)(testFun)
-+      } else {
-+        val cometScanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get(conf)
-+        val isNativeIcebergCompat = cometScanImpl == 
CometConf.SCAN_NATIVE_ICEBERG_COMPAT ||
-+          cometScanImpl == CometConf.SCAN_AUTO
-+        val isNativeDataFusion = cometScanImpl == 
CometConf.SCAN_NATIVE_DATAFUSION ||
-+          cometScanImpl == CometConf.SCAN_AUTO
-+        if (isCometEnabled && isNativeIcebergCompat &&
-+          testTags.exists(_.isInstanceOf[IgnoreCometNativeIcebergCompat])) {
-+          ignore(testName + " (disabled for NATIVE_ICEBERG_COMPAT)", 
testTags: _*)(testFun)
-+        } else if (isCometEnabled && isNativeDataFusion &&
-+          testTags.exists(_.isInstanceOf[IgnoreCometNativeDataFusion])) {
-+          ignore(testName + " (disabled for NATIVE_DATAFUSION)", testTags: 
_*)(testFun)
-+        } else if (isCometEnabled && (isNativeDataFusion || 
isNativeIcebergCompat) &&
-+          testTags.exists(_.isInstanceOf[IgnoreCometNativeScan])) {
-+          ignore(testName + " (disabled for NATIVE_DATAFUSION and 
NATIVE_ICEBERG_COMPAT)",
-+            testTags: _*)(testFun)
-+        } else {
-+          super.test(testName, testTags: _*)(testFun)
-+        }
-+      }
-     }
-   }
- 
-@@ -242,6 +265,29 @@ private[sql] trait SQLTestUtilsBase
+@@ -242,6 +272,29 @@ private[sql] trait SQLTestUtilsBase
      protected override def _sqlContext: SQLContext = self.spark.sqlContext
    }
  
@@ -2892,7 +3025,7 @@ index dd55fcfe42c..a1d390c93d0 100644
    protected override def withSQLConf(pairs: (String, String)*)(f: => Unit): 
Unit = {
      SparkSession.setActiveSession(spark)
      super.withSQLConf(pairs: _*)(f)
-@@ -434,6 +480,8 @@ private[sql] trait SQLTestUtilsBase
+@@ -434,6 +487,8 @@ private[sql] trait SQLTestUtilsBase
      val schema = df.schema
      val withoutFilters = df.queryExecution.executedPlan.transform {
        case FilterExec(_, child) => child
diff --git a/dev/diffs/4.0.1.diff b/dev/diffs/4.0.1.diff
index a41ff3bbd..407807cac 100644
--- a/dev/diffs/4.0.1.diff
+++ b/dev/diffs/4.0.1.diff
@@ -39,7 +39,7 @@ index 6c51bd4ff2e..e72ec1d26e2 100644
        withSpark(sc) { sc =>
          TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
 diff --git a/pom.xml b/pom.xml
-index 22922143fc3..7c56e5e8641 100644
+index 22922143fc3..97332f7e6ac 100644
 --- a/pom.xml
 +++ b/pom.xml
 @@ -148,6 +148,8 @@
@@ -574,7 +574,7 @@ index 81713c777bc..b5f92ed9742 100644
      assert(exchanges.size == 2)
    }
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
-index 2c24cc7d570..21d36ebc6f5 100644
+index 2c24cc7d570..3311e6e3773 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
@@ -615,7 +615,17 @@ index 2c24cc7d570..21d36ebc6f5 100644
      withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> 
"false",
        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
        withTable("large", "dimTwo", "dimThree") {
-@@ -1215,7 +1221,8 @@ abstract class DynamicPartitionPruningSuiteBase
+@@ -1151,7 +1157,8 @@ abstract class DynamicPartitionPruningSuiteBase
+     }
+   }
+ 
+-  test("join key with multiple references on the filtering plan") {
++  test("join key with multiple references on the filtering plan",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728"))
 {
+     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> 
"true",
+       SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> 
AQEPropagateEmptyRelation.ruleName,
+       SQLConf.ANSI_ENABLED.key -> "false" // ANSI mode doesn't support 
"String + String"
+@@ -1215,7 +1222,8 @@ abstract class DynamicPartitionPruningSuiteBase
    }
  
    test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " +
@@ -625,7 +635,15 @@ index 2c24cc7d570..21d36ebc6f5 100644
      withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> 
"true") {
        withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
          val df = sql(
-@@ -1424,7 +1431,8 @@ abstract class DynamicPartitionPruningSuiteBase
+@@ -1330,6 +1338,7 @@ abstract class DynamicPartitionPruningSuiteBase
+   }
+ 
+   test("Subquery reuse across the whole plan",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728"),
+     DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
+     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
+       SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
+@@ -1424,7 +1433,8 @@ abstract class DynamicPartitionPruningSuiteBase
      }
    }
  
@@ -635,7 +653,7 @@ index 2c24cc7d570..21d36ebc6f5 100644
      withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> 
"true") {
        val df = sql(
          """ WITH v as (
-@@ -1455,7 +1463,8 @@ abstract class DynamicPartitionPruningSuiteBase
+@@ -1455,7 +1465,8 @@ abstract class DynamicPartitionPruningSuiteBase
      }
    }
  
@@ -645,7 +663,17 @@ index 2c24cc7d570..21d36ebc6f5 100644
      val df = sql(
        """
          |SELECT s.store_id, f.product_id
-@@ -1730,6 +1739,8 @@ abstract class DynamicPartitionPruningV1Suite extends 
DynamicPartitionPruningDat
+@@ -1699,7 +1710,8 @@ abstract class DynamicPartitionPruningV1Suite extends 
DynamicPartitionPruningDat
+    * Check the static scan metrics with and without DPP
+    */
+   test("static scan metrics",
+-    DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
++    DisableAdaptiveExecution("DPP in AQE must reuse broadcast"),
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311"))
 {
+     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
+       SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
+       SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
+@@ -1730,6 +1742,8 @@ abstract class DynamicPartitionPruningV1Suite extends 
DynamicPartitionPruningDat
                case s: BatchScanExec =>
                  // we use f1 col for v2 tables due to schema pruning
                  s.output.exists(_.exists(_.argString(maxFields = 
100).contains("f1")))
@@ -680,18 +708,51 @@ index 9c90e0105a4..fadf2f0f698 100644
  
    test("SPARK-35884: Explain Formatted") {
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
-index 9c529d14221..2f1bc3880fd 100644
+index 9c529d14221..6cfd87ad864 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
-@@ -33,6 +33,7 @@ import 
org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha
+@@ -33,6 +33,8 @@ import 
org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha
  import 
org.apache.spark.sql.catalyst.expressions.IntegralLiteralTestUtils.{negativeInt,
 positiveInt}
  import org.apache.spark.sql.catalyst.plans.logical.Filter
  import org.apache.spark.sql.catalyst.types.DataTypeUtils
-+import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec, 
CometSortMergeJoinExec}
++import org.apache.spark.sql.catalyst.util.quietly
++import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, 
CometScanExec, CometSortMergeJoinExec}
  import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode}
  import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
  import org.apache.spark.sql.execution.datasources.FilePartition
-@@ -967,6 +968,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -203,7 +205,11 @@ class FileBasedDataSourceSuite extends QueryTest
+   }
+ 
+   allFileBasedDataSources.foreach { format =>
+-    testQuietly(s"Enabling/disabling ignoreMissingFiles using $format") {
++    val ignoreMissingTags: Seq[org.scalatest.Tag] = if (format == "parquet") {
++      Seq(IgnoreCometNativeDataFusion(
++        "https://github.com/apache/datafusion-comet/issues/3728"))
++    } else Seq.empty
++    test(s"Enabling/disabling ignoreMissingFiles using $format", 
ignoreMissingTags: _*) { quietly {
+       def testIgnoreMissingFiles(options: Map[String, String]): Unit = {
+         withTempDir { dir =>
+           val basePath = dir.getCanonicalPath
+@@ -263,7 +269,7 @@ class FileBasedDataSourceSuite extends QueryTest
+           }
+         }
+       }
+-    }
++    }}
+   }
+ 
+   Seq("json", "orc").foreach { format =>
+@@ -651,7 +657,8 @@ class FileBasedDataSourceSuite extends QueryTest
+   }
+ 
+   Seq("parquet", "orc").foreach { format =>
+-    test(s"Spark native readers should respect spark.sql.caseSensitive - 
${format}") {
++    test(s"Spark native readers should respect spark.sql.caseSensitive - 
${format}",
++      
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311"))
 {
+       withTempDir { dir =>
+         val tableName = s"spark_25132_${format}_native"
+         val tableDir = dir.getCanonicalPath + s"/$tableName"
+@@ -967,6 +974,7 @@ class FileBasedDataSourceSuite extends QueryTest
              assert(bJoinExec.isEmpty)
              val smJoinExec = collect(joinedDF.queryExecution.executedPlan) {
                case smJoin: SortMergeJoinExec => smJoin
@@ -699,7 +760,7 @@ index 9c529d14221..2f1bc3880fd 100644
              }
              assert(smJoinExec.nonEmpty)
            }
-@@ -1027,6 +1029,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -1027,6 +1035,7 @@ class FileBasedDataSourceSuite extends QueryTest
  
            val fileScan = df.queryExecution.executedPlan collectFirst {
              case BatchScanExec(_, f: FileScan, _, _, _, _) => f
@@ -707,7 +768,7 @@ index 9c529d14221..2f1bc3880fd 100644
            }
            assert(fileScan.nonEmpty)
            assert(fileScan.get.partitionFilters.nonEmpty)
-@@ -1068,6 +1071,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -1068,6 +1077,7 @@ class FileBasedDataSourceSuite extends QueryTest
  
            val fileScan = df.queryExecution.executedPlan collectFirst {
              case BatchScanExec(_, f: FileScan, _, _, _, _) => f
@@ -715,11 +776,22 @@ index 9c529d14221..2f1bc3880fd 100644
            }
            assert(fileScan.nonEmpty)
            assert(fileScan.get.partitionFilters.isEmpty)
-@@ -1252,6 +1256,8 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -1241,7 +1251,8 @@ class FileBasedDataSourceSuite extends QueryTest
+     }
+   }
+ 
+-  test("SPARK-41017: filter pushdown with nondeterministic predicates") {
++  test("SPARK-41017: filter pushdown with nondeterministic predicates",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728"))
 {
+     withTempPath { path =>
+       val pathStr = path.getCanonicalPath
+       spark.range(10).write.parquet(pathStr)
+@@ -1252,6 +1263,9 @@ class FileBasedDataSourceSuite extends QueryTest
            val filters = df.queryExecution.executedPlan.collect {
              case f: FileSourceScanLike => f.dataFilters
              case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters
 +            case b: CometScanExec => b.dataFilters
++            case b: CometNativeScanExec => b.dataFilters
 +            case b: CometBatchScanExec => 
b.scan.asInstanceOf[FileScan].dataFilters
            }.flatten
            assert(filters.contains(GreaterThan(scan.logicalPlan.output.head, 
Literal(5L))))
@@ -1253,10 +1325,10 @@ index 0df7f806272..92390bd819f 100644
  
    test("non-matching optional group") {
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
-index 2e33f6505ab..47fa031add5 100644
+index 2e33f6505ab..d0f84e8c44d 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
-@@ -23,10 +23,11 @@ import org.apache.spark.SparkRuntimeException
+@@ -23,12 +23,14 @@ import org.apache.spark.SparkRuntimeException
  import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
  import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi}
  import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Join, 
LogicalPlan, Project, Sort, Union}
@@ -1268,8 +1340,11 @@ index 2e33f6505ab..47fa031add5 100644
 +import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
  import org.apache.spark.sql.execution.joins.{BaseJoinExec, 
BroadcastHashJoinExec, BroadcastNestedLoopJoinExec}
  import org.apache.spark.sql.internal.SQLConf
++import org.apache.spark.sql.IgnoreCometNativeDataFusion
  import org.apache.spark.sql.test.SharedSparkSession
-@@ -1529,6 +1530,12 @@ class SubquerySuite extends QueryTest
+ 
+ class SubquerySuite extends QueryTest
+@@ -1529,6 +1531,12 @@ class SubquerySuite extends QueryTest
              fs.inputRDDs().forall(
                _.asInstanceOf[FileScanRDD].filePartitions.forall(
                  _.files.forall(_.urlEncodedPath.contains("p=0"))))
@@ -1282,7 +1357,17 @@ index 2e33f6505ab..47fa031add5 100644
          case _ => false
        })
      }
-@@ -2094,7 +2101,7 @@ class SubquerySuite extends QueryTest
+@@ -2035,7 +2043,8 @@ class SubquerySuite extends QueryTest
+     }
+   }
+ 
+-  test("Subquery reuse across the whole plan") {
++  test("Subquery reuse across the whole plan",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728"))
 {
+     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
+       SQLConf.OPTIMIZE_ONE_ROW_RELATION_SUBQUERY.key -> "false") {
+       val df = sql(
+@@ -2094,7 +2103,7 @@ class SubquerySuite extends QueryTest
  
        df.collect()
        val exchanges = collect(df.queryExecution.executedPlan) {
@@ -1291,7 +1376,13 @@ index 2e33f6505ab..47fa031add5 100644
        }
        assert(exchanges.size === 1)
      }
-@@ -2678,18 +2685,26 @@ class SubquerySuite extends QueryTest
+@@ -2674,22 +2683,31 @@ class SubquerySuite extends QueryTest
+     }
+   }
+ 
+-  test("SPARK-43402: FileSourceScanExec supports push down data filter with 
scalar subquery") {
++  test("SPARK-43402: FileSourceScanExec supports push down data filter with 
scalar subquery",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728"))
 {
      def checkFileSourceScan(query: String, answer: Seq[Row]): Unit = {
        val df = sql(query)
        checkAnswer(df, answer)
@@ -1821,6 +1912,20 @@ index 47679ed7865..9ffbaecb98e 100644
      }.length == hashAggCount)
      assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s 
}.length == sortAggCount)
    }
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
+index 77a988f340e..1acc534064e 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
+@@ -1061,7 +1061,8 @@ abstract class SQLViewSuite extends QueryTest with 
SQLTestUtils {
+     }
+   }
+ 
+-  test("alter temporary view should follow current storeAnalyzedPlanForView 
config") {
++  test("alter temporary view should follow current storeAnalyzedPlanForView 
config",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728"))
 {
+     withTable("t") {
+       Seq(2, 3, 1).toDF("c1").write.format("parquet").saveAsTable("t")
+       withView("v1") {
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
 index aed11badb71..1a365b5aacf 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
@@ -2512,22 +2617,23 @@ index 272be70f9fe..06957694002 100644
          assert(collect(initialExecutedPlan) {
            case i: InMemoryTableScanLike => i
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
-index 0a0b23d1e60..5685926250f 100644
+index 0a0b23d1e60..dcc9c141315 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
 @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.Concat
  import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
  import org.apache.spark.sql.catalyst.plans.logical.Expand
  import org.apache.spark.sql.catalyst.types.DataTypeUtils
-+import org.apache.spark.sql.comet.CometScanExec
++import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
  import org.apache.spark.sql.execution.FileSourceScanExec
  import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
  import org.apache.spark.sql.functions._
-@@ -868,6 +869,7 @@ abstract class SchemaPruningSuite
+@@ -868,6 +869,8 @@ abstract class SchemaPruningSuite
      val fileSourceScanSchemata =
        collect(df.queryExecution.executedPlan) {
          case scan: FileSourceScanExec => scan.requiredSchema
 +        case scan: CometScanExec => scan.requiredSchema
++        case scan: CometNativeScanExec => scan.requiredSchema
        }
      assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size,
        s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " +
@@ -2621,10 +2727,18 @@ index cd6f41b4ef4..4b6a17344bc 100644
        ParquetOutputFormat.WRITER_VERSION -> 
ParquetProperties.WriterVersion.PARQUET_2_0.toString
      )
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
-index 6080a5e8e4b..9aa8f49a62b 100644
+index 6080a5e8e4b..dc64436164f 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
-@@ -1102,7 +1102,11 @@ abstract class ParquetFilterSuite extends QueryTest 
with ParquetTest with Shared
+@@ -38,6 +38,7 @@ import org.apache.parquet.schema.MessageType
+ 
+ import org.apache.spark.{SparkConf, SparkException, SparkRuntimeException}
+ import org.apache.spark.sql._
++import org.apache.spark.sql.{IgnoreCometNativeDataFusion, 
IgnoreCometNativeScan}
+ import org.apache.spark.sql.catalyst.dsl.expressions._
+ import org.apache.spark.sql.catalyst.expressions._
+ import org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints
+@@ -1102,7 +1103,11 @@ abstract class ParquetFilterSuite extends QueryTest 
with ParquetTest with Shared
            // When a filter is pushed to Parquet, Parquet can apply it to 
every row.
            // So, we can check the number of rows returned from the Parquet
            // to make sure our filter pushdown work.
@@ -2637,7 +2751,7 @@ index 6080a5e8e4b..9aa8f49a62b 100644
          }
        }
      }
-@@ -1505,7 +1509,8 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
+@@ -1505,7 +1510,8 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
      }
    }
  
@@ -2647,7 +2761,7 @@ index 6080a5e8e4b..9aa8f49a62b 100644
      import testImplicits._
  
      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true",
-@@ -1587,7 +1592,11 @@ abstract class ParquetFilterSuite extends QueryTest 
with ParquetTest with Shared
+@@ -1587,7 +1593,11 @@ abstract class ParquetFilterSuite extends QueryTest 
with ParquetTest with Shared
            // than the total length but should not be a single record.
            // Note that, if record level filtering is enabled, it should be a 
single record.
            // If no filter is pushed down to Parquet, it should be the total 
length of data.
@@ -2660,7 +2774,7 @@ index 6080a5e8e4b..9aa8f49a62b 100644
          }
        }
      }
-@@ -1614,7 +1623,11 @@ abstract class ParquetFilterSuite extends QueryTest 
with ParquetTest with Shared
+@@ -1614,7 +1624,11 @@ abstract class ParquetFilterSuite extends QueryTest 
with ParquetTest with Shared
          // than the total length but should not be a single record.
          // Note that, if record level filtering is enabled, it should be a 
single record.
          // If no filter is pushed down to Parquet, it should be the total 
length of data.
@@ -2673,7 +2787,7 @@ index 6080a5e8e4b..9aa8f49a62b 100644
        }
      }
    }
-@@ -1706,7 +1719,7 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
+@@ -1706,7 +1720,7 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
        (attr, value) => sources.StringContains(attr, value))
    }
  
@@ -2682,7 +2796,7 @@ index 6080a5e8e4b..9aa8f49a62b 100644
      import testImplicits._
      // keep() should take effect on StartsWith/EndsWith/Contains
      Seq(
-@@ -1750,7 +1763,8 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
+@@ -1750,7 +1764,8 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
      }
    }
  
@@ -2692,7 +2806,17 @@ index 6080a5e8e4b..9aa8f49a62b 100644
      val schema = StructType(Seq(
        StructField("a", IntegerType, nullable = false)
      ))
-@@ -1993,7 +2007,8 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
+@@ -1940,7 +1955,8 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
+     }
+   }
+ 
+-  test("SPARK-25207: exception when duplicate fields in case-insensitive 
mode") {
++  test("SPARK-25207: exception when duplicate fields in case-insensitive 
mode",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311"))
 {
+     withTempPath { dir =>
+       val count = 10
+       val tableName = "spark_25207"
+@@ -1993,7 +2009,8 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
      }
    }
  
@@ -2702,7 +2826,7 @@ index 6080a5e8e4b..9aa8f49a62b 100644
      // block 1:
      //                      null count  min                                   
    max
      // page-0                         0  0                                    
     99
-@@ -2053,7 +2068,8 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
+@@ -2053,7 +2070,8 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
      }
    }
  
@@ -2712,7 +2836,7 @@ index 6080a5e8e4b..9aa8f49a62b 100644
      withTempPath { dir =>
        val path = dir.getCanonicalPath
        spark.range(100).selectExpr("id * 2 AS id")
-@@ -2305,7 +2321,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
+@@ -2305,7 +2323,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
            assert(pushedParquetFilters.exists(_.getClass === filterClass),
              s"${pushedParquetFilters.map(_.getClass).toList} did not contain 
${filterClass}.")
  
@@ -2725,7 +2849,7 @@ index 6080a5e8e4b..9aa8f49a62b 100644
          } else {
            assert(selectedFilters.isEmpty, "There is filter pushed down")
          }
-@@ -2368,7 +2388,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
+@@ -2368,7 +2390,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
            assert(pushedParquetFilters.exists(_.getClass === filterClass),
              s"${pushedParquetFilters.map(_.getClass).toList} did not contain 
${filterClass}.")
  
@@ -2739,10 +2863,28 @@ index 6080a5e8e4b..9aa8f49a62b 100644
          case _ => assert(false, "Can not match ParquetTable in the query.")
        }
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
-index 4474ec1fd42..97910c4fc3a 100644
+index 4474ec1fd42..05fa0257c82 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
-@@ -1344,7 +1344,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
+@@ -39,6 +39,7 @@ import org.apache.parquet.schema.{MessageType, 
MessageTypeParser}
+ 
+ import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, TestUtils}
+ import org.apache.spark.sql._
++import org.apache.spark.sql.IgnoreCometNativeDataFusion
+ import org.apache.spark.sql.catalyst.InternalRow
+ import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, 
UnsafeRow}
+ import org.apache.spark.sql.catalyst.util.DateTimeUtils
+@@ -1059,7 +1060,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
+     }
+   }
+ 
+-  test("SPARK-35640: read binary as timestamp should throw schema 
incompatible error") {
++  test("SPARK-35640: read binary as timestamp should throw schema 
incompatible error",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720"))
 {
+     val data = (1 to 4).map(i => Tuple1(i.toString))
+     val readSchema = StructType(Seq(StructField("_1", 
DataTypes.TimestampType)))
+ 
+@@ -1344,7 +1346,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
      }
    }
  
@@ -2753,10 +2895,38 @@ index 4474ec1fd42..97910c4fc3a 100644
        checkAnswer(
          // "fruit" column in this file is encoded using 
DELTA_LENGTH_BYTE_ARRAY.
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
-index bba71f1c48d..38c60ee2584 100644
+index bba71f1c48d..0b52574e0f3 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
-@@ -996,7 +996,11 @@ abstract class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedS
+@@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
+ 
+ import org.apache.spark.{DebugFilesystem, SparkConf, SparkException}
+ import org.apache.spark.sql._
++import org.apache.spark.sql.IgnoreCometNativeDataFusion
+ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+ import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
+ import org.apache.spark.sql.catalyst.util.ArrayData
+@@ -185,7 +186,8 @@ abstract class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedS
+     }
+   }
+ 
+-  test("SPARK-47447: read TimestampLTZ as TimestampNTZ") {
++  test("SPARK-47447: read TimestampLTZ as TimestampNTZ",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720"))
 {
+     val providedSchema = StructType(Seq(StructField("time", TimestampNTZType, 
false)))
+ 
+     Seq("INT96", "TIMESTAMP_MICROS", "TIMESTAMP_MILLIS").foreach { tsType =>
+@@ -318,7 +320,8 @@ abstract class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedS
+     }
+   }
+ 
+-  test("Enabling/disabling ignoreCorruptFiles") {
++  test("Enabling/disabling ignoreCorruptFiles",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728"))
 {
+     def testIgnoreCorruptFiles(options: Map[String, String]): Unit = {
+       withTempDir { dir =>
+         val basePath = dir.getCanonicalPath
+@@ -996,7 +999,11 @@ abstract class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedS
          Seq(Some("A"), Some("A"), None).toDF().repartition(1)
            .write.parquet(path.getAbsolutePath)
          val df = spark.read.parquet(path.getAbsolutePath)
@@ -2769,7 +2939,17 @@ index bba71f1c48d..38c60ee2584 100644
        }
      }
    }
-@@ -1060,7 +1064,8 @@ abstract class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedS
+@@ -1042,7 +1049,8 @@ abstract class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedS
+     testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96")
+   }
+ 
+-  test("SPARK-34212 Parquet should read decimals correctly") {
++  test("SPARK-34212 Parquet should read decimals correctly",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720"))
 {
+     def readParquet(schema: String, path: File): DataFrame = {
+       spark.read.schema(schema).parquet(path.toString)
+     }
+@@ -1060,7 +1068,8 @@ abstract class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedS
          checkAnswer(readParquet(schema2, path), df)
        }
  
@@ -2779,7 +2959,7 @@ index bba71f1c48d..38c60ee2584 100644
          val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)"
          checkAnswer(readParquet(schema1, path), df)
          val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)"
-@@ -1084,7 +1089,8 @@ abstract class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedS
+@@ -1084,7 +1093,8 @@ abstract class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedS
        val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, 
CAST('1.2' AS BINARY) d")
        df.write.parquet(path.toString)
  
@@ -2789,6 +2969,16 @@ index bba71f1c48d..38c60ee2584 100644
          checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00"))
          checkAnswer(readParquet("a DECIMAL(11, 2)", path), sql("SELECT 1.00"))
          checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null))
+@@ -1131,7 +1141,8 @@ abstract class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedS
+     }
+   }
+ 
+-  test("row group skipping doesn't overflow when reading into larger type") {
++  test("row group skipping doesn't overflow when reading into larger type",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720"))
 {
+     withTempPath { path =>
+       Seq(0).toDF("a").write.parquet(path.toString)
+       withAllParquetReaders {
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala
 index 30503af0fab..1491f4bc2d5 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala
@@ -2909,7 +3099,7 @@ index 5c0b7def039..151184bc98c 100644
      assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size,
        s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " +
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
-index 0acb21f3e6f..3a7bb73f03c 100644
+index 0acb21f3e6f..1f9c3fd13fc 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
 @@ -27,7 +27,7 @@ import 
org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
@@ -2917,7 +3107,7 @@ index 0acb21f3e6f..3a7bb73f03c 100644
  
  import org.apache.spark.SparkException
 -import org.apache.spark.sql.{AnalysisException, Row}
-+import org.apache.spark.sql.{AnalysisException, IgnoreComet, Row}
++import org.apache.spark.sql.{AnalysisException, IgnoreComet, 
IgnoreCometNativeDataFusion, Row}
  import org.apache.spark.sql.catalyst.expressions.Cast.toSQLType
  import 
org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
  import org.apache.spark.sql.functions.desc
@@ -2931,10 +3121,39 @@ index 0acb21f3e6f..3a7bb73f03c 100644
      withTempPath { dir =>
        val e = testSchemaMismatch(dir.getCanonicalPath, 
vectorizedReaderEnabled = false)
        val expectedMessage = "Encountered error while reading file"
+@@ -1046,7 +1047,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
+     }
+   }
+ 
+-  test("schema mismatch failure error message for parquet vectorized reader") 
{
++  test("schema mismatch failure error message for parquet vectorized reader",
++      
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720"))
 {
+     withTempPath { dir =>
+       val e = testSchemaMismatch(dir.getCanonicalPath, 
vectorizedReaderEnabled = true)
+       
assert(e.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException])
+@@ -1079,7 +1081,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
+     }
+   }
+ 
+-  test("SPARK-45604: schema mismatch failure error on timestamp_ntz to 
array<timestamp_ntz>") {
++  test("SPARK-45604: schema mismatch failure error on timestamp_ntz to 
array<timestamp_ntz>",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720"))
 {
+     import testImplicits._
+ 
+     withTempPath { dir =>
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala
-index 09ed6955a51..236a4e99824 100644
+index 09ed6955a51..6f9174c9545 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala
+@@ -24,7 +24,7 @@ import 
org.apache.parquet.format.converter.ParquetMetadataConverter
+ import org.apache.parquet.hadoop.{ParquetFileReader, ParquetOutputFormat}
+ 
+ import org.apache.spark.SparkException
+-import org.apache.spark.sql.{DataFrame, QueryTest, Row}
++import org.apache.spark.sql.{DataFrame, IgnoreCometNativeDataFusion, 
QueryTest, Row}
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+ import 
org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
+ import org.apache.spark.sql.functions.col
 @@ -65,7 +65,9 @@ class ParquetTypeWideningSuite
      withClue(
        s"with dictionary encoding '$dictionaryEnabled' with timestamp rebase 
mode " +
@@ -2955,7 +3174,7 @@ index 09ed6955a51..236a4e99824 100644
      }
    }
  
-@@ -190,7 +192,8 @@ class ParquetTypeWideningSuite
+@@ -190,10 +192,16 @@ class ParquetTypeWideningSuite
        (Seq("1", "2", Short.MinValue.toString), ShortType, DoubleType),
        (Seq("1", "2", Int.MinValue.toString), IntegerType, DoubleType),
        (Seq("1.23", "10.34"), FloatType, DoubleType),
@@ -2963,8 +3182,67 @@ index 09ed6955a51..236a4e99824 100644
 +      // TODO: Comet cannot handle older than "1582-10-15"
 +      (Seq("2020-01-01", "2020-01-02"/* , "1312-02-27" */), DateType, 
TimestampNTZType)
      )
++    wideningTags: Seq[org.scalatest.Tag] =
++      if (fromType == DateType && toType == TimestampNTZType) {
++        Seq(IgnoreCometNativeDataFusion(
++          "https://github.com/apache/datafusion-comet/issues/3728"))
++      } else Seq.empty
    }
-   test(s"parquet widening conversion $fromType -> $toType") {
+-  test(s"parquet widening conversion $fromType -> $toType") {
++  test(s"parquet widening conversion $fromType -> $toType", wideningTags: _*) 
{
+     checkAllParquetReaders(values, fromType, toType, expectError = false)
+   }
+ 
+@@ -231,7 +239,8 @@ class ParquetTypeWideningSuite
+       (Seq("2020-01-01", "2020-01-02", "1312-02-27"), DateType, TimestampType)
+     )
+   }
+-  test(s"unsupported parquet conversion $fromType -> $toType") {
++  test(s"unsupported parquet conversion $fromType -> $toType",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728"))
 {
+     checkAllParquetReaders(values, fromType, toType, expectError = true)
+   }
+ 
+@@ -257,7 +266,8 @@ class ParquetTypeWideningSuite
+       (Seq("1", "2"), LongType, DecimalType(LongDecimal.precision, 1))
+     )
+   }
+-  test(s"unsupported parquet conversion $fromType -> $toType") {
++  test(s"unsupported parquet conversion $fromType -> $toType",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728"))
 {
+     checkAllParquetReaders(values, fromType, toType,
+       expectError =
+       // parquet-mr allows reading decimals into a smaller precision decimal 
type without
+@@ -271,7 +281,8 @@ class ParquetTypeWideningSuite
+       (Seq("2020-01-01", "2020-01-02", "1312-02-27"), TimestampNTZType, 
DateType))
+     outputTimestampType <- ParquetOutputTimestampType.values
+   }
+-  test(s"unsupported parquet timestamp conversion $fromType 
($outputTimestampType) -> $toType") {
++  test(s"unsupported parquet timestamp conversion $fromType 
($outputTimestampType) -> $toType",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728"))
 {
+     withSQLConf(
+       SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> 
outputTimestampType.toString,
+       SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> 
LegacyBehaviorPolicy.CORRECTED.toString
+@@ -291,7 +302,8 @@ class ParquetTypeWideningSuite
+       Seq(7 -> 5, 10 -> 5, 20 -> 5, 12 -> 10, 20 -> 10, 22 -> 20)
+   }
+   test(
+-    s"parquet decimal precision change Decimal($fromPrecision, 2) -> 
Decimal($toPrecision, 2)") {
++    s"parquet decimal precision change Decimal($fromPrecision, 2) -> 
Decimal($toPrecision, 2)",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728"))
 {
+     checkAllParquetReaders(
+       values = Seq("1.23", "10.34"),
+       fromType = DecimalType(fromPrecision, 2),
+@@ -322,7 +334,8 @@ class ParquetTypeWideningSuite
+       Seq((5, 2) -> (6, 4), (10, 4) -> (12, 7), (20, 5) -> (22, 8))
+   }
+   test(s"parquet decimal precision and scale change Decimal($fromPrecision, 
$fromScale) -> " +
+-    s"Decimal($toPrecision, $toScale)"
++    s"Decimal($toPrecision, $toScale)",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728")
+   ) {
+     checkAllParquetReaders(
+       values = Seq("1.23", "10.34"),
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala
 index 458b5dfc0f4..d209f3c85bc 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala
@@ -3038,18 +3316,29 @@ index 0dd90925d3c..7d53ec845ef 100644
        spark.range(10).selectExpr("id", "id % 3 as p")
          .write.partitionBy("p").saveAsTable("testDataForScan")
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
-index 0ab8691801d..d9125f658ad 100644
+index 0ab8691801d..f1c4b3d92b1 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
-@@ -18,6 +18,7 @@
+@@ -18,6 +18,8 @@
  package org.apache.spark.sql.execution.python
  
  import org.apache.spark.sql.catalyst.plans.logical.{ArrowEvalPython, 
BatchEvalPython, Limit, LocalLimit}
++import org.apache.spark.sql.IgnoreCometNativeDataFusion
 +import org.apache.spark.sql.comet._
  import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan, 
SparkPlanTest}
  import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
  import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
-@@ -108,6 +109,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
+@@ -93,7 +95,8 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
+     assert(arrowEvalNodes.size == 2)
+   }
+ 
+-  test("Python UDF should not break column pruning/filter pushdown -- Parquet 
V1") {
++  test("Python UDF should not break column pruning/filter pushdown -- Parquet 
V1",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311"))
 {
+     withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
+       withTempPath { f =>
+         spark.range(10).select($"id".as("a"), $"id".as("b"))
+@@ -108,6 +111,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
  
            val scanNodes = query.queryExecution.executedPlan.collect {
              case scan: FileSourceScanExec => scan
@@ -3057,7 +3346,7 @@ index 0ab8691801d..d9125f658ad 100644
            }
            assert(scanNodes.length == 1)
            assert(scanNodes.head.output.map(_.name) == Seq("a"))
-@@ -120,11 +122,16 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
+@@ -120,11 +124,16 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
  
            val scanNodes = query.queryExecution.executedPlan.collect {
              case scan: FileSourceScanExec => scan
@@ -3076,7 +3365,7 @@ index 0ab8691801d..d9125f658ad 100644
          }
        }
      }
-@@ -145,6 +152,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
+@@ -145,6 +154,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
  
            val scanNodes = query.queryExecution.executedPlan.collect {
              case scan: BatchScanExec => scan
@@ -3084,7 +3373,7 @@ index 0ab8691801d..d9125f658ad 100644
            }
            assert(scanNodes.length == 1)
            assert(scanNodes.head.output.map(_.name) == Seq("a"))
-@@ -157,6 +165,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
+@@ -157,6 +167,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
  
            val scanNodes = query.queryExecution.executedPlan.collect {
              case scan: BatchScanExec => scan
@@ -3495,7 +3784,7 @@ index 86c4e49f6f6..2e639e5f38d 100644
      val tblTargetName = "tbl_target"
      val tblSourceQualified = s"default.$tblSourceName"
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
-index f0f3f94b811..c9d0ecfec41 100644
+index f0f3f94b811..f77b54dcef9 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
 @@ -27,13 +27,14 @@ import scala.jdk.CollectionConverters._
@@ -3522,37 +3811,42 @@ index f0f3f94b811..c9d0ecfec41 100644
  import org.apache.spark.sql.execution.FilterExec
  import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution
  import org.apache.spark.sql.execution.datasources.DataSourceUtils
-@@ -128,7 +130,28 @@ private[sql] trait SQLTestUtils extends SparkFunSuite 
with SQLTestUtilsBase with
-         }
-       }
-     } else {
--      super.test(testName, testTags: _*)(testFun)
-+      if (isCometEnabled && testTags.exists(_.isInstanceOf[IgnoreComet])) {
-+        ignore(testName + " (disabled when Comet is on)", testTags: 
_*)(testFun)
-+      } else {
-+        val cometScanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get(conf)
-+        val isNativeIcebergCompat = cometScanImpl == 
CometConf.SCAN_NATIVE_ICEBERG_COMPAT ||
-+          cometScanImpl == CometConf.SCAN_AUTO
-+        val isNativeDataFusion = cometScanImpl == 
CometConf.SCAN_NATIVE_DATAFUSION ||
-+          cometScanImpl == CometConf.SCAN_AUTO
-+        if (isCometEnabled && isNativeIcebergCompat &&
-+          testTags.exists(_.isInstanceOf[IgnoreCometNativeIcebergCompat])) {
-+          ignore(testName + " (disabled for NATIVE_ICEBERG_COMPAT)", 
testTags: _*)(testFun)
-+        } else if (isCometEnabled && isNativeDataFusion &&
-+          testTags.exists(_.isInstanceOf[IgnoreCometNativeDataFusion])) {
-+          ignore(testName + " (disabled for NATIVE_DATAFUSION)", testTags: 
_*)(testFun)
-+        } else if (isCometEnabled && (isNativeDataFusion || 
isNativeIcebergCompat) &&
-+          testTags.exists(_.isInstanceOf[IgnoreCometNativeScan])) {
-+          ignore(testName + " (disabled for NATIVE_DATAFUSION and 
NATIVE_ICEBERG_COMPAT)",
-+            testTags: _*)(testFun)
-+        } else {
-+          super.test(testName, testTags: _*)(testFun)
-+        }
-+      }
-     }
-   }
+@@ -121,6 +123,34 @@ private[sql] trait SQLTestUtils extends SparkFunSuite 
with SQLTestUtilsBase with
  
-@@ -248,8 +271,24 @@ private[sql] trait SQLTestUtilsBase
+   override protected def test(testName: String, testTags: Tag*)(testFun: => 
Any)
+       (implicit pos: Position): Unit = {
++    // Check Comet skip tags first, before DisableAdaptiveExecution handling
++    if (isCometEnabled && testTags.exists(_.isInstanceOf[IgnoreComet])) {
++      ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun)
++      return
++    }
++    if (isCometEnabled) {
++      val cometScanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get(conf)
++      val isNativeIcebergCompat = cometScanImpl == 
CometConf.SCAN_NATIVE_ICEBERG_COMPAT ||
++        cometScanImpl == CometConf.SCAN_AUTO
++      val isNativeDataFusion = cometScanImpl == 
CometConf.SCAN_NATIVE_DATAFUSION ||
++        cometScanImpl == CometConf.SCAN_AUTO
++      if (isNativeIcebergCompat &&
++        testTags.exists(_.isInstanceOf[IgnoreCometNativeIcebergCompat])) {
++        ignore(testName + " (disabled for NATIVE_ICEBERG_COMPAT)", testTags: 
_*)(testFun)
++        return
++      }
++      if (isNativeDataFusion &&
++        testTags.exists(_.isInstanceOf[IgnoreCometNativeDataFusion])) {
++        ignore(testName + " (disabled for NATIVE_DATAFUSION)", testTags: 
_*)(testFun)
++        return
++      }
++      if ((isNativeDataFusion || isNativeIcebergCompat) &&
++        testTags.exists(_.isInstanceOf[IgnoreCometNativeScan])) {
++        ignore(testName + " (disabled for NATIVE_DATAFUSION and 
NATIVE_ICEBERG_COMPAT)",
++          testTags: _*)(testFun)
++        return
++      }
++    }
+     if (testTags.exists(_.isInstanceOf[DisableAdaptiveExecution])) {
+       super.test(testName, testTags: _*) {
+         withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+@@ -248,8 +278,24 @@ private[sql] trait SQLTestUtilsBase
      override protected def converter: ColumnNodeToExpressionConverter = 
self.spark.converter
    }
  
@@ -3577,7 +3871,7 @@ index f0f3f94b811..c9d0ecfec41 100644
      super.withSQLConf(pairs: _*)(f)
    }
  
-@@ -451,6 +490,8 @@ private[sql] trait SQLTestUtilsBase
+@@ -451,6 +497,8 @@ private[sql] trait SQLTestUtilsBase
      val schema = df.schema
      val withoutFilters = df.queryExecution.executedPlan.transform {
        case FilterExec(_, child) => child


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to