This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new e23d94628 chore: run Spark 3.4 tests with `native_datafusion` scan
(#3722)
e23d94628 is described below
commit e23d94628254839f2194eaecf3c5bc9f9b664dde
Author: Andy Grove <[email protected]>
AuthorDate: Mon Mar 23 06:29:33 2026 -0700
chore: run Spark 3.4 tests with `native_datafusion` scan (#3722)
---
.github/workflows/spark_sql_test.yml | 1 +
dev/diffs/3.4.3.diff | 273 +++++++++++++++------
dev/diffs/4.0.1.diff | 456 ++++++++++++++++++++++++++++-------
3 files changed, 579 insertions(+), 151 deletions(-)
diff --git a/.github/workflows/spark_sql_test.yml
b/.github/workflows/spark_sql_test.yml
index 4d777cda8..3a763d321 100644
--- a/.github/workflows/spark_sql_test.yml
+++ b/.github/workflows/spark_sql_test.yml
@@ -126,6 +126,7 @@ jobs:
# - native_iceberg_compat: Spark 3.5 only
config:
- {spark-short: '3.4', spark-full: '3.4.3', java: 11, scan-impl:
'auto'}
+ - {spark-short: '3.4', spark-full: '3.4.3', java: 11, scan-impl:
'native_datafusion'}
- {spark-short: '3.5', spark-full: '3.5.8', java: 11, scan-impl:
'auto'}
- {spark-short: '3.5', spark-full: '3.5.8', java: 11, scan-impl:
'native_datafusion'}
- {spark-short: '4.0', spark-full: '4.0.1', java: 17, scan-impl:
'auto'}
diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff
index 8738b3813..e40328536 100644
--- a/dev/diffs/3.4.3.diff
+++ b/dev/diffs/3.4.3.diff
@@ -1,5 +1,5 @@
diff --git a/pom.xml b/pom.xml
-index d3544881af1..9c16099090c 100644
+index d3544881af1..377683b10c5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -148,6 +148,8 @@
@@ -417,7 +417,7 @@ index daef11ae4d6..9f3cc9181f2 100644
assert(exchanges.size == 2)
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
-index f33432ddb6f..1925aac8d97 100644
+index f33432ddb6f..7d758d2481f 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
@@ -478,7 +478,17 @@ index f33432ddb6f..1925aac8d97 100644
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key ->
"true") {
val df = sql(
""" WITH v as (
-@@ -1729,6 +1737,8 @@ abstract class DynamicPartitionPruningV1Suite extends
DynamicPartitionPruningDat
+@@ -1698,7 +1706,8 @@ abstract class DynamicPartitionPruningV1Suite extends
DynamicPartitionPruningDat
+ * Check the static scan metrics with and without DPP
+ */
+ test("static scan metrics",
+- DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
++ DisableAdaptiveExecution("DPP in AQE must reuse broadcast"),
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3442"))
{
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
+ SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
+ SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
+@@ -1729,6 +1738,8 @@ abstract class DynamicPartitionPruningV1Suite extends
DynamicPartitionPruningDat
case s: BatchScanExec =>
// we use f1 col for v2 tables due to schema pruning
s.output.exists(_.exists(_.argString(maxFields =
100).contains("f1")))
@@ -513,7 +523,7 @@ index a6b295578d6..91acca4306f 100644
test("SPARK-35884: Explain Formatted") {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
-index 2796b1cf154..52438178a0e 100644
+index 2796b1cf154..d628f44e4ee 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.TestingUDT.{IntervalUDT,
NullData, NullUDT}
@@ -524,7 +534,17 @@ index 2796b1cf154..52438178a0e 100644
import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.datasources.FilePartition
-@@ -815,6 +816,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -499,7 +500,8 @@ class FileBasedDataSourceSuite extends QueryTest
+ }
+
+ Seq("parquet", "orc").foreach { format =>
+- test(s"Spark native readers should respect spark.sql.caseSensitive -
${format}") {
++ test(s"Spark native readers should respect spark.sql.caseSensitive -
${format}",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3760"))
{
+ withTempDir { dir =>
+ val tableName = s"spark_25132_${format}_native"
+ val tableDir = dir.getCanonicalPath + s"/$tableName"
+@@ -815,6 +817,7 @@ class FileBasedDataSourceSuite extends QueryTest
assert(bJoinExec.isEmpty)
val smJoinExec = collect(joinedDF.queryExecution.executedPlan) {
case smJoin: SortMergeJoinExec => smJoin
@@ -532,7 +552,7 @@ index 2796b1cf154..52438178a0e 100644
}
assert(smJoinExec.nonEmpty)
}
-@@ -875,6 +877,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -875,6 +878,7 @@ class FileBasedDataSourceSuite extends QueryTest
val fileScan = df.queryExecution.executedPlan collectFirst {
case BatchScanExec(_, f: FileScan, _, _, _, _, _, _, _) => f
@@ -540,7 +560,7 @@ index 2796b1cf154..52438178a0e 100644
}
assert(fileScan.nonEmpty)
assert(fileScan.get.partitionFilters.nonEmpty)
-@@ -916,6 +919,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -916,6 +920,7 @@ class FileBasedDataSourceSuite extends QueryTest
val fileScan = df.queryExecution.executedPlan collectFirst {
case BatchScanExec(_, f: FileScan, _, _, _, _, _, _, _) => f
@@ -548,7 +568,7 @@ index 2796b1cf154..52438178a0e 100644
}
assert(fileScan.nonEmpty)
assert(fileScan.get.partitionFilters.isEmpty)
-@@ -1100,6 +1104,9 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -1100,6 +1105,9 @@ class FileBasedDataSourceSuite extends QueryTest
val filters = df.queryExecution.executedPlan.collect {
case f: FileSourceScanLike => f.dataFilters
case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters
@@ -981,7 +1001,7 @@ index 48ad10992c5..51d1ee65422 100644
extensions.injectColumnar(session =>
MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule()))
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
-index 18123a4d6ec..fbe4c766eee 100644
+index 18123a4d6ec..0fe185baa33 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
@@ -17,6 +17,8 @@
@@ -1983,10 +2003,18 @@ index 07e2849ce6f..3e73645b638 100644
ParquetOutputFormat.WRITER_VERSION ->
ParquetProperties.WriterVersion.PARQUET_2_0.toString
)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
-index 104b4e416cd..37ea65081e4 100644
+index 104b4e416cd..d865077684f 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
-@@ -1096,7 +1096,11 @@ abstract class ParquetFilterSuite extends QueryTest
with ParquetTest with Shared
+@@ -38,6 +38,7 @@ import org.apache.parquet.schema.MessageType
+
+ import org.apache.spark.{SparkConf, SparkException}
+ import org.apache.spark.sql._
++import org.apache.spark.sql.{IgnoreCometNativeDataFusion,
IgnoreCometNativeScan}
+ import org.apache.spark.sql.catalyst.dsl.expressions._
+ import org.apache.spark.sql.catalyst.expressions._
+ import org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints
+@@ -1096,7 +1097,11 @@ abstract class ParquetFilterSuite extends QueryTest
with ParquetTest with Shared
// When a filter is pushed to Parquet, Parquet can apply it to
every row.
// So, we can check the number of rows returned from the Parquet
// to make sure our filter pushdown work.
@@ -1999,7 +2027,7 @@ index 104b4e416cd..37ea65081e4 100644
}
}
}
-@@ -1499,7 +1503,8 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
+@@ -1499,7 +1504,8 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
}
}
@@ -2009,7 +2037,7 @@ index 104b4e416cd..37ea65081e4 100644
import testImplicits._
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true",
-@@ -1581,7 +1586,11 @@ abstract class ParquetFilterSuite extends QueryTest
with ParquetTest with Shared
+@@ -1581,7 +1587,11 @@ abstract class ParquetFilterSuite extends QueryTest
with ParquetTest with Shared
// than the total length but should not be a single record.
// Note that, if record level filtering is enabled, it should be a
single record.
// If no filter is pushed down to Parquet, it should be the total
length of data.
@@ -2022,7 +2050,7 @@ index 104b4e416cd..37ea65081e4 100644
}
}
}
-@@ -1608,7 +1617,11 @@ abstract class ParquetFilterSuite extends QueryTest
with ParquetTest with Shared
+@@ -1608,7 +1618,11 @@ abstract class ParquetFilterSuite extends QueryTest
with ParquetTest with Shared
// than the total length but should not be a single record.
// Note that, if record level filtering is enabled, it should be a
single record.
// If no filter is pushed down to Parquet, it should be the total
length of data.
@@ -2035,7 +2063,17 @@ index 104b4e416cd..37ea65081e4 100644
}
}
}
-@@ -1744,7 +1757,8 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
+@@ -1700,7 +1714,8 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
+ (attr, value) => sources.StringContains(attr, value))
+ }
+
+- test("filter pushdown - StringPredicate") {
++ test("filter pushdown - StringPredicate",
++ IgnoreCometNativeDataFusion("cannot be pushed down")) {
+ import testImplicits._
+ // keep() should take effect on StartsWith/EndsWith/Contains
+ Seq(
+@@ -1744,7 +1759,8 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
}
}
@@ -2045,7 +2083,17 @@ index 104b4e416cd..37ea65081e4 100644
val schema = StructType(Seq(
StructField("a", IntegerType, nullable = false)
))
-@@ -1985,7 +1999,8 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
+@@ -1934,7 +1950,8 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
+ }
+ }
+
+- test("SPARK-25207: exception when duplicate fields in case-insensitive
mode") {
++ test("SPARK-25207: exception when duplicate fields in case-insensitive
mode",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3760"))
{
+ withTempPath { dir =>
+ val count = 10
+ val tableName = "spark_25207"
+@@ -1985,7 +2002,8 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
}
}
@@ -2055,7 +2103,7 @@ index 104b4e416cd..37ea65081e4 100644
// block 1:
// null count min
max
// page-0 0 0
99
-@@ -2045,7 +2060,8 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
+@@ -2045,7 +2063,8 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
}
}
@@ -2065,7 +2113,7 @@ index 104b4e416cd..37ea65081e4 100644
withTempPath { dir =>
val path = dir.getCanonicalPath
spark.range(100).selectExpr("id * 2 AS id")
-@@ -2277,7 +2293,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
+@@ -2277,7 +2296,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
assert(pushedParquetFilters.exists(_.getClass === filterClass),
s"${pushedParquetFilters.map(_.getClass).toList} did not contain
${filterClass}.")
@@ -2078,7 +2126,7 @@ index 104b4e416cd..37ea65081e4 100644
} else {
assert(selectedFilters.isEmpty, "There is filter pushed down")
}
-@@ -2337,7 +2357,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
+@@ -2337,7 +2360,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
assert(pushedParquetFilters.exists(_.getClass === filterClass),
s"${pushedParquetFilters.map(_.getClass).toList} did not contain
${filterClass}.")
@@ -2092,10 +2140,38 @@ index 104b4e416cd..37ea65081e4 100644
case _ =>
throw new AnalysisException("Can not match ParquetTable in the
query.")
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
-index 8670d95c65e..b624c3811dd 100644
+index 8670d95c65e..c7ba51f770f 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
-@@ -1335,7 +1335,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest
with SharedSparkSession
+@@ -41,6 +41,7 @@ import org.apache.parquet.schema.{MessageType,
MessageTypeParser}
+
+ import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, TestUtils}
+ import org.apache.spark.sql._
++import org.apache.spark.sql.IgnoreCometNativeDataFusion
+ import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection}
+ import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow,
UnsafeRow}
+ import org.apache.spark.sql.catalyst.util.DateTimeUtils
+@@ -1064,7 +1065,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest
with SharedSparkSession
+ }
+ }
+
+- test("SPARK-35640: read binary as timestamp should throw schema
incompatible error") {
++ test("SPARK-35640: read binary as timestamp should throw schema
incompatible error",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720"))
{
+ val data = (1 to 4).map(i => Tuple1(i.toString))
+ val readSchema = StructType(Seq(StructField("_1",
DataTypes.TimestampType)))
+
+@@ -1075,7 +1077,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest
with SharedSparkSession
+ }
+ }
+
+- test("SPARK-35640: int as long should throw schema incompatible error") {
++ test("SPARK-35640: int as long should throw schema incompatible error",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720"))
{
+ val data = (1 to 4).map(i => Tuple1(i))
+ val readSchema = StructType(Seq(StructField("_1", DataTypes.LongType)))
+
+@@ -1335,7 +1338,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest
with SharedSparkSession
}
}
@@ -2106,10 +2182,28 @@ index 8670d95c65e..b624c3811dd 100644
checkAnswer(
// "fruit" column in this file is encoded using
DELTA_LENGTH_BYTE_ARRAY.
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
-index 29cb224c878..44837aa953b 100644
+index 29cb224c878..ee5a87fa200 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
-@@ -978,7 +978,8 @@ abstract class ParquetQuerySuite extends QueryTest with
ParquetTest with SharedS
+@@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
+
+ import org.apache.spark.{DebugFilesystem, SparkConf, SparkException}
+ import org.apache.spark.sql._
++import org.apache.spark.sql.IgnoreCometNativeDataFusion
+ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+ import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
+ import org.apache.spark.sql.catalyst.util.ArrayData
+@@ -185,7 +186,8 @@ abstract class ParquetQuerySuite extends QueryTest with
ParquetTest with SharedS
+ }
+ }
+
+- test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") {
++ test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720"))
{
+ val data = (1 to 1000).map { i =>
+ val ts = new java.sql.Timestamp(i)
+ Row(ts)
+@@ -978,7 +980,8 @@ abstract class ParquetQuerySuite extends QueryTest with
ParquetTest with SharedS
}
}
@@ -2119,7 +2213,17 @@ index 29cb224c878..44837aa953b 100644
withAllParquetReaders {
withTempPath { path =>
// Repeated values for dictionary encoding.
-@@ -1047,7 +1048,8 @@ abstract class ParquetQuerySuite extends QueryTest with
ParquetTest with SharedS
+@@ -1031,7 +1034,8 @@ abstract class ParquetQuerySuite extends QueryTest with
ParquetTest with SharedS
+ testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96")
+ }
+
+- test("SPARK-34212 Parquet should read decimals correctly") {
++ test("SPARK-34212 Parquet should read decimals correctly",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720"))
{
+ def readParquet(schema: String, path: File): DataFrame = {
+ spark.read.schema(schema).parquet(path.toString)
+ }
+@@ -1047,7 +1051,8 @@ abstract class ParquetQuerySuite extends QueryTest with
ParquetTest with SharedS
checkAnswer(readParquet(schema, path), df)
}
@@ -2129,7 +2233,7 @@ index 29cb224c878..44837aa953b 100644
val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)"
checkAnswer(readParquet(schema1, path), df)
val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)"
-@@ -1069,7 +1071,8 @@ abstract class ParquetQuerySuite extends QueryTest with
ParquetTest with SharedS
+@@ -1069,7 +1074,8 @@ abstract class ParquetQuerySuite extends QueryTest with
ParquetTest with SharedS
val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c,
CAST('1.2' AS BINARY) d")
df.write.parquet(path.toString)
@@ -2139,7 +2243,17 @@ index 29cb224c878..44837aa953b 100644
checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00"))
checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null))
checkAnswer(readParquet("b DECIMAL(11, 1)", path), sql("SELECT
123456.0"))
-@@ -1128,7 +1131,7 @@ abstract class ParquetQuerySuite extends QueryTest with
ParquetTest with SharedS
+@@ -1113,7 +1119,8 @@ abstract class ParquetQuerySuite extends QueryTest with
ParquetTest with SharedS
+ }
+ }
+
+- test("row group skipping doesn't overflow when reading into larger type") {
++ test("row group skipping doesn't overflow when reading into larger type",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720"))
{
+ withTempPath { path =>
+ Seq(0).toDF("a").write.parquet(path.toString)
+ // The vectorized and non-vectorized readers will produce different
exceptions, we don't need
+@@ -1128,7 +1135,7 @@ abstract class ParquetQuerySuite extends QueryTest with
ParquetTest with SharedS
.where(s"a < ${Long.MaxValue}")
.collect()
}
@@ -2244,14 +2358,14 @@ index 5c0b7def039..151184bc98c 100644
assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size,
s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " +
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
-index bf5c51b89bb..ca22370ca3b 100644
+index bf5c51b89bb..4e2f0bdb389 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -27,6 +27,7 @@ import
org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.Type._
import org.apache.spark.SparkException
-+import org.apache.spark.sql.IgnoreComet
++import org.apache.spark.sql.{IgnoreComet, IgnoreCometNativeDataFusion}
import org.apache.spark.sql.catalyst.ScalaReflection
import
org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
import org.apache.spark.sql.functions.desc
@@ -2265,6 +2379,26 @@ index bf5c51b89bb..ca22370ca3b 100644
withTempPath { dir =>
val e = testSchemaMismatch(dir.getCanonicalPath,
vectorizedReaderEnabled = false)
val expectedMessage = "Encountered error while reading file"
+@@ -1026,7 +1028,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
+ }
+ }
+
+- test("schema mismatch failure error message for parquet vectorized reader")
{
++ test("schema mismatch failure error message for parquet vectorized reader",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720"))
{
+ withTempPath { dir =>
+ val e = testSchemaMismatch(dir.getCanonicalPath,
vectorizedReaderEnabled = true)
+ assert(e.getCause.isInstanceOf[SparkException])
+@@ -1067,7 +1070,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
+ }
+ }
+
+- test("SPARK-45604: schema mismatch failure error on timestamp_ntz to
array<timestamp_ntz>") {
++ test("SPARK-45604: schema mismatch failure error on timestamp_ntz to
array<timestamp_ntz>",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720"))
{
+ import testImplicits._
+
+ withTempPath { dir =>
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
index 3a0bd35cb70..b28f06a757f 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
@@ -2314,7 +2448,7 @@ index 26e61c6b58d..cb09d7e116a 100644
spark.range(10).selectExpr("id", "id % 3 as p")
.write.partitionBy("p").saveAsTable("testDataForScan")
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
-index 0ab8691801d..d9125f658ad 100644
+index 0ab8691801d..b18a5bea944 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
@@ -18,6 +18,7 @@
@@ -2325,19 +2459,21 @@ index 0ab8691801d..d9125f658ad 100644
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan,
SparkPlanTest}
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
-@@ -108,6 +109,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
+@@ -108,6 +109,8 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
val scanNodes = query.queryExecution.executedPlan.collect {
case scan: FileSourceScanExec => scan
+ case scan: CometScanExec => scan
++ case scan: CometNativeScanExec => scan
}
assert(scanNodes.length == 1)
assert(scanNodes.head.output.map(_.name) == Seq("a"))
-@@ -120,11 +122,16 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
+@@ -120,11 +123,18 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
val scanNodes = query.queryExecution.executedPlan.collect {
case scan: FileSourceScanExec => scan
+ case scan: CometScanExec => scan
++ case scan: CometNativeScanExec => scan
}
assert(scanNodes.length == 1)
// $"a" is not null and $"a" > 1
@@ -2346,13 +2482,14 @@ index 0ab8691801d..d9125f658ad 100644
+ val dataFilters = scanNodes.head match {
+ case scan: FileSourceScanExec => scan.dataFilters
+ case scan: CometScanExec => scan.dataFilters
++ case scan: CometNativeScanExec => scan.dataFilters
+ }
+ assert(dataFilters.length == 2)
+ assert(dataFilters.flatMap(_.references.map(_.name)).distinct ==
Seq("a"))
}
}
}
-@@ -145,6 +152,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
+@@ -145,6 +155,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
val scanNodes = query.queryExecution.executedPlan.collect {
case scan: BatchScanExec => scan
@@ -2360,7 +2497,7 @@ index 0ab8691801d..d9125f658ad 100644
}
assert(scanNodes.length == 1)
assert(scanNodes.head.output.map(_.name) == Seq("a"))
-@@ -157,6 +165,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
+@@ -157,6 +168,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
val scanNodes = query.queryExecution.executedPlan.collect {
case scan: BatchScanExec => scan
@@ -2804,7 +2941,7 @@ index abe606ad9c1..2d930b64cca 100644
val tblTargetName = "tbl_target"
val tblSourceQualified = s"default.$tblSourceName"
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
-index dd55fcfe42c..a1d390c93d0 100644
+index dd55fcfe42c..99bc018008a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -27,6 +27,7 @@ import scala.concurrent.duration._
@@ -2823,46 +2960,42 @@ index dd55fcfe42c..a1d390c93d0 100644
import org.apache.spark.sql.execution.FilterExec
import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution
import org.apache.spark.sql.execution.datasources.DataSourceUtils
-@@ -118,7 +120,7 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with
SQLTestUtilsBase with
- }
+@@ -119,6 +121,34 @@ private[sql] trait SQLTestUtils extends SparkFunSuite
with SQLTestUtilsBase with
override protected def test(testName: String, testTags: Tag*)(testFun: =>
Any)
-- (implicit pos: Position): Unit = {
-+ (implicit pos: Position): Unit = {
+ (implicit pos: Position): Unit = {
++ // Check Comet skip tags first, before DisableAdaptiveExecution handling
++ if (isCometEnabled && testTags.exists(_.isInstanceOf[IgnoreComet])) {
++ ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun)
++ return
++ }
++ if (isCometEnabled) {
++ val cometScanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get(conf)
++ val isNativeIcebergCompat = cometScanImpl ==
CometConf.SCAN_NATIVE_ICEBERG_COMPAT ||
++ cometScanImpl == CometConf.SCAN_AUTO
++ val isNativeDataFusion = cometScanImpl ==
CometConf.SCAN_NATIVE_DATAFUSION ||
++ cometScanImpl == CometConf.SCAN_AUTO
++ if (isNativeIcebergCompat &&
++ testTags.exists(_.isInstanceOf[IgnoreCometNativeIcebergCompat])) {
++ ignore(testName + " (disabled for NATIVE_ICEBERG_COMPAT)", testTags:
_*)(testFun)
++ return
++ }
++ if (isNativeDataFusion &&
++ testTags.exists(_.isInstanceOf[IgnoreCometNativeDataFusion])) {
++ ignore(testName + " (disabled for NATIVE_DATAFUSION)", testTags:
_*)(testFun)
++ return
++ }
++ if ((isNativeDataFusion || isNativeIcebergCompat) &&
++ testTags.exists(_.isInstanceOf[IgnoreCometNativeScan])) {
++ ignore(testName + " (disabled for NATIVE_DATAFUSION and
NATIVE_ICEBERG_COMPAT)",
++ testTags: _*)(testFun)
++ return
++ }
++ }
if (testTags.exists(_.isInstanceOf[DisableAdaptiveExecution])) {
super.test(testName, testTags: _*) {
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
-@@ -126,7 +128,28 @@ private[sql] trait SQLTestUtils extends SparkFunSuite
with SQLTestUtilsBase with
- }
- }
- } else {
-- super.test(testName, testTags: _*)(testFun)
-+ if (isCometEnabled && testTags.exists(_.isInstanceOf[IgnoreComet])) {
-+ ignore(testName + " (disabled when Comet is on)", testTags:
_*)(testFun)
-+ } else {
-+ val cometScanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get(conf)
-+ val isNativeIcebergCompat = cometScanImpl ==
CometConf.SCAN_NATIVE_ICEBERG_COMPAT ||
-+ cometScanImpl == CometConf.SCAN_AUTO
-+ val isNativeDataFusion = cometScanImpl ==
CometConf.SCAN_NATIVE_DATAFUSION ||
-+ cometScanImpl == CometConf.SCAN_AUTO
-+ if (isCometEnabled && isNativeIcebergCompat &&
-+ testTags.exists(_.isInstanceOf[IgnoreCometNativeIcebergCompat])) {
-+ ignore(testName + " (disabled for NATIVE_ICEBERG_COMPAT)",
testTags: _*)(testFun)
-+ } else if (isCometEnabled && isNativeDataFusion &&
-+ testTags.exists(_.isInstanceOf[IgnoreCometNativeDataFusion])) {
-+ ignore(testName + " (disabled for NATIVE_DATAFUSION)", testTags:
_*)(testFun)
-+ } else if (isCometEnabled && (isNativeDataFusion ||
isNativeIcebergCompat) &&
-+ testTags.exists(_.isInstanceOf[IgnoreCometNativeScan])) {
-+ ignore(testName + " (disabled for NATIVE_DATAFUSION and
NATIVE_ICEBERG_COMPAT)",
-+ testTags: _*)(testFun)
-+ } else {
-+ super.test(testName, testTags: _*)(testFun)
-+ }
-+ }
- }
- }
-
-@@ -242,6 +265,29 @@ private[sql] trait SQLTestUtilsBase
+@@ -242,6 +272,29 @@ private[sql] trait SQLTestUtilsBase
protected override def _sqlContext: SQLContext = self.spark.sqlContext
}
@@ -2892,7 +3025,7 @@ index dd55fcfe42c..a1d390c93d0 100644
protected override def withSQLConf(pairs: (String, String)*)(f: => Unit):
Unit = {
SparkSession.setActiveSession(spark)
super.withSQLConf(pairs: _*)(f)
-@@ -434,6 +480,8 @@ private[sql] trait SQLTestUtilsBase
+@@ -434,6 +487,8 @@ private[sql] trait SQLTestUtilsBase
val schema = df.schema
val withoutFilters = df.queryExecution.executedPlan.transform {
case FilterExec(_, child) => child
diff --git a/dev/diffs/4.0.1.diff b/dev/diffs/4.0.1.diff
index a41ff3bbd..407807cac 100644
--- a/dev/diffs/4.0.1.diff
+++ b/dev/diffs/4.0.1.diff
@@ -39,7 +39,7 @@ index 6c51bd4ff2e..e72ec1d26e2 100644
withSpark(sc) { sc =>
TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
diff --git a/pom.xml b/pom.xml
-index 22922143fc3..7c56e5e8641 100644
+index 22922143fc3..97332f7e6ac 100644
--- a/pom.xml
+++ b/pom.xml
@@ -148,6 +148,8 @@
@@ -574,7 +574,7 @@ index 81713c777bc..b5f92ed9742 100644
assert(exchanges.size == 2)
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
-index 2c24cc7d570..21d36ebc6f5 100644
+index 2c24cc7d570..3311e6e3773 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
@@ -615,7 +615,17 @@ index 2c24cc7d570..21d36ebc6f5 100644
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key ->
"false",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
withTable("large", "dimTwo", "dimThree") {
-@@ -1215,7 +1221,8 @@ abstract class DynamicPartitionPruningSuiteBase
+@@ -1151,7 +1157,8 @@ abstract class DynamicPartitionPruningSuiteBase
+ }
+ }
+
+- test("join key with multiple references on the filtering plan") {
++ test("join key with multiple references on the filtering plan",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728"))
{
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key ->
"true",
+ SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key ->
AQEPropagateEmptyRelation.ruleName,
+ SQLConf.ANSI_ENABLED.key -> "false" // ANSI mode doesn't support
"String + String"
+@@ -1215,7 +1222,8 @@ abstract class DynamicPartitionPruningSuiteBase
}
test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " +
@@ -625,7 +635,15 @@ index 2c24cc7d570..21d36ebc6f5 100644
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key ->
"true") {
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
val df = sql(
-@@ -1424,7 +1431,8 @@ abstract class DynamicPartitionPruningSuiteBase
+@@ -1330,6 +1338,7 @@ abstract class DynamicPartitionPruningSuiteBase
+ }
+
+ test("Subquery reuse across the whole plan",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728"),
+ DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
+ SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
+@@ -1424,7 +1433,8 @@ abstract class DynamicPartitionPruningSuiteBase
}
}
@@ -635,7 +653,7 @@ index 2c24cc7d570..21d36ebc6f5 100644
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key ->
"true") {
val df = sql(
""" WITH v as (
-@@ -1455,7 +1463,8 @@ abstract class DynamicPartitionPruningSuiteBase
+@@ -1455,7 +1465,8 @@ abstract class DynamicPartitionPruningSuiteBase
}
}
@@ -645,7 +663,17 @@ index 2c24cc7d570..21d36ebc6f5 100644
val df = sql(
"""
|SELECT s.store_id, f.product_id
-@@ -1730,6 +1739,8 @@ abstract class DynamicPartitionPruningV1Suite extends
DynamicPartitionPruningDat
+@@ -1699,7 +1710,8 @@ abstract class DynamicPartitionPruningV1Suite extends
DynamicPartitionPruningDat
+ * Check the static scan metrics with and without DPP
+ */
+ test("static scan metrics",
+- DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
++ DisableAdaptiveExecution("DPP in AQE must reuse broadcast"),
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311"))
{
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
+ SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
+ SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
+@@ -1730,6 +1742,8 @@ abstract class DynamicPartitionPruningV1Suite extends
DynamicPartitionPruningDat
case s: BatchScanExec =>
// we use f1 col for v2 tables due to schema pruning
s.output.exists(_.exists(_.argString(maxFields =
100).contains("f1")))
@@ -680,18 +708,51 @@ index 9c90e0105a4..fadf2f0f698 100644
test("SPARK-35884: Explain Formatted") {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
-index 9c529d14221..2f1bc3880fd 100644
+index 9c529d14221..6cfd87ad864 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
-@@ -33,6 +33,7 @@ import
org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha
+@@ -33,6 +33,8 @@ import
org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha
import
org.apache.spark.sql.catalyst.expressions.IntegralLiteralTestUtils.{negativeInt,
positiveInt}
import org.apache.spark.sql.catalyst.plans.logical.Filter
import org.apache.spark.sql.catalyst.types.DataTypeUtils
-+import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec,
CometSortMergeJoinExec}
++import org.apache.spark.sql.catalyst.util.quietly
++import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec,
CometScanExec, CometSortMergeJoinExec}
import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.datasources.FilePartition
-@@ -967,6 +968,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -203,7 +205,11 @@ class FileBasedDataSourceSuite extends QueryTest
+ }
+
+ allFileBasedDataSources.foreach { format =>
+- testQuietly(s"Enabling/disabling ignoreMissingFiles using $format") {
++ val ignoreMissingTags: Seq[org.scalatest.Tag] = if (format == "parquet") {
++ Seq(IgnoreCometNativeDataFusion(
++ "https://github.com/apache/datafusion-comet/issues/3728"))
++ } else Seq.empty
++ test(s"Enabling/disabling ignoreMissingFiles using $format",
ignoreMissingTags: _*) { quietly {
+ def testIgnoreMissingFiles(options: Map[String, String]): Unit = {
+ withTempDir { dir =>
+ val basePath = dir.getCanonicalPath
+@@ -263,7 +269,7 @@ class FileBasedDataSourceSuite extends QueryTest
+ }
+ }
+ }
+- }
++ }}
+ }
+
+ Seq("json", "orc").foreach { format =>
+@@ -651,7 +657,8 @@ class FileBasedDataSourceSuite extends QueryTest
+ }
+
+ Seq("parquet", "orc").foreach { format =>
+- test(s"Spark native readers should respect spark.sql.caseSensitive -
${format}") {
++ test(s"Spark native readers should respect spark.sql.caseSensitive -
${format}",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311"))
{
+ withTempDir { dir =>
+ val tableName = s"spark_25132_${format}_native"
+ val tableDir = dir.getCanonicalPath + s"/$tableName"
+@@ -967,6 +974,7 @@ class FileBasedDataSourceSuite extends QueryTest
assert(bJoinExec.isEmpty)
val smJoinExec = collect(joinedDF.queryExecution.executedPlan) {
case smJoin: SortMergeJoinExec => smJoin
@@ -699,7 +760,7 @@ index 9c529d14221..2f1bc3880fd 100644
}
assert(smJoinExec.nonEmpty)
}
-@@ -1027,6 +1029,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -1027,6 +1035,7 @@ class FileBasedDataSourceSuite extends QueryTest
val fileScan = df.queryExecution.executedPlan collectFirst {
case BatchScanExec(_, f: FileScan, _, _, _, _) => f
@@ -707,7 +768,7 @@ index 9c529d14221..2f1bc3880fd 100644
}
assert(fileScan.nonEmpty)
assert(fileScan.get.partitionFilters.nonEmpty)
-@@ -1068,6 +1071,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -1068,6 +1077,7 @@ class FileBasedDataSourceSuite extends QueryTest
val fileScan = df.queryExecution.executedPlan collectFirst {
case BatchScanExec(_, f: FileScan, _, _, _, _) => f
@@ -715,11 +776,22 @@ index 9c529d14221..2f1bc3880fd 100644
}
assert(fileScan.nonEmpty)
assert(fileScan.get.partitionFilters.isEmpty)
-@@ -1252,6 +1256,8 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -1241,7 +1251,8 @@ class FileBasedDataSourceSuite extends QueryTest
+ }
+ }
+
+- test("SPARK-41017: filter pushdown with nondeterministic predicates") {
++ test("SPARK-41017: filter pushdown with nondeterministic predicates",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728"))
{
+ withTempPath { path =>
+ val pathStr = path.getCanonicalPath
+ spark.range(10).write.parquet(pathStr)
+@@ -1252,6 +1263,9 @@ class FileBasedDataSourceSuite extends QueryTest
val filters = df.queryExecution.executedPlan.collect {
case f: FileSourceScanLike => f.dataFilters
case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters
+ case b: CometScanExec => b.dataFilters
++ case b: CometNativeScanExec => b.dataFilters
+ case b: CometBatchScanExec =>
b.scan.asInstanceOf[FileScan].dataFilters
}.flatten
assert(filters.contains(GreaterThan(scan.logicalPlan.output.head,
Literal(5L))))
@@ -1253,10 +1325,10 @@ index 0df7f806272..92390bd819f 100644
test("non-matching optional group") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
-index 2e33f6505ab..47fa031add5 100644
+index 2e33f6505ab..d0f84e8c44d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
-@@ -23,10 +23,11 @@ import org.apache.spark.SparkRuntimeException
+@@ -23,12 +23,14 @@ import org.apache.spark.SparkRuntimeException
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Join,
LogicalPlan, Project, Sort, Union}
@@ -1268,8 +1340,11 @@ index 2e33f6505ab..47fa031add5 100644
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
import org.apache.spark.sql.execution.joins.{BaseJoinExec,
BroadcastHashJoinExec, BroadcastNestedLoopJoinExec}
import org.apache.spark.sql.internal.SQLConf
++import org.apache.spark.sql.IgnoreCometNativeDataFusion
import org.apache.spark.sql.test.SharedSparkSession
-@@ -1529,6 +1530,12 @@ class SubquerySuite extends QueryTest
+
+ class SubquerySuite extends QueryTest
+@@ -1529,6 +1531,12 @@ class SubquerySuite extends QueryTest
fs.inputRDDs().forall(
_.asInstanceOf[FileScanRDD].filePartitions.forall(
_.files.forall(_.urlEncodedPath.contains("p=0"))))
@@ -1282,7 +1357,17 @@ index 2e33f6505ab..47fa031add5 100644
case _ => false
})
}
-@@ -2094,7 +2101,7 @@ class SubquerySuite extends QueryTest
+@@ -2035,7 +2043,8 @@ class SubquerySuite extends QueryTest
+ }
+ }
+
+- test("Subquery reuse across the whole plan") {
++ test("Subquery reuse across the whole plan",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728"))
{
+ withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
+ SQLConf.OPTIMIZE_ONE_ROW_RELATION_SUBQUERY.key -> "false") {
+ val df = sql(
+@@ -2094,7 +2103,7 @@ class SubquerySuite extends QueryTest
df.collect()
val exchanges = collect(df.queryExecution.executedPlan) {
@@ -1291,7 +1376,13 @@ index 2e33f6505ab..47fa031add5 100644
}
assert(exchanges.size === 1)
}
-@@ -2678,18 +2685,26 @@ class SubquerySuite extends QueryTest
+@@ -2674,22 +2683,31 @@ class SubquerySuite extends QueryTest
+ }
+ }
+
+- test("SPARK-43402: FileSourceScanExec supports push down data filter with
scalar subquery") {
++ test("SPARK-43402: FileSourceScanExec supports push down data filter with
scalar subquery",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728"))
{
def checkFileSourceScan(query: String, answer: Seq[Row]): Unit = {
val df = sql(query)
checkAnswer(df, answer)
@@ -1821,6 +1912,20 @@ index 47679ed7865..9ffbaecb98e 100644
}.length == hashAggCount)
assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s
}.length == sortAggCount)
}
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
+index 77a988f340e..1acc534064e 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
+@@ -1061,7 +1061,8 @@ abstract class SQLViewSuite extends QueryTest with
SQLTestUtils {
+ }
+ }
+
+- test("alter temporary view should follow current storeAnalyzedPlanForView
config") {
++ test("alter temporary view should follow current storeAnalyzedPlanForView
config",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728"))
{
+ withTable("t") {
+ Seq(2, 3, 1).toDF("c1").write.format("parquet").saveAsTable("t")
+ withView("v1") {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
index aed11badb71..1a365b5aacf 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
@@ -2512,22 +2617,23 @@ index 272be70f9fe..06957694002 100644
assert(collect(initialExecutedPlan) {
case i: InMemoryTableScanLike => i
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
-index 0a0b23d1e60..5685926250f 100644
+index 0a0b23d1e60..dcc9c141315 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.Concat
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.Expand
import org.apache.spark.sql.catalyst.types.DataTypeUtils
-+import org.apache.spark.sql.comet.CometScanExec
++import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.functions._
-@@ -868,6 +869,7 @@ abstract class SchemaPruningSuite
+@@ -868,6 +869,8 @@ abstract class SchemaPruningSuite
val fileSourceScanSchemata =
collect(df.queryExecution.executedPlan) {
case scan: FileSourceScanExec => scan.requiredSchema
+ case scan: CometScanExec => scan.requiredSchema
++ case scan: CometNativeScanExec => scan.requiredSchema
}
assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size,
s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " +
@@ -2621,10 +2727,18 @@ index cd6f41b4ef4..4b6a17344bc 100644
ParquetOutputFormat.WRITER_VERSION ->
ParquetProperties.WriterVersion.PARQUET_2_0.toString
)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
-index 6080a5e8e4b..9aa8f49a62b 100644
+index 6080a5e8e4b..dc64436164f 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
-@@ -1102,7 +1102,11 @@ abstract class ParquetFilterSuite extends QueryTest
with ParquetTest with Shared
+@@ -38,6 +38,7 @@ import org.apache.parquet.schema.MessageType
+
+ import org.apache.spark.{SparkConf, SparkException, SparkRuntimeException}
+ import org.apache.spark.sql._
++import org.apache.spark.sql.{IgnoreCometNativeDataFusion,
IgnoreCometNativeScan}
+ import org.apache.spark.sql.catalyst.dsl.expressions._
+ import org.apache.spark.sql.catalyst.expressions._
+ import org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints
+@@ -1102,7 +1103,11 @@ abstract class ParquetFilterSuite extends QueryTest
with ParquetTest with Shared
// When a filter is pushed to Parquet, Parquet can apply it to
every row.
// So, we can check the number of rows returned from the Parquet
// to make sure our filter pushdown work.
@@ -2637,7 +2751,7 @@ index 6080a5e8e4b..9aa8f49a62b 100644
}
}
}
-@@ -1505,7 +1509,8 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
+@@ -1505,7 +1510,8 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
}
}
@@ -2647,7 +2761,7 @@ index 6080a5e8e4b..9aa8f49a62b 100644
import testImplicits._
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true",
-@@ -1587,7 +1592,11 @@ abstract class ParquetFilterSuite extends QueryTest
with ParquetTest with Shared
+@@ -1587,7 +1593,11 @@ abstract class ParquetFilterSuite extends QueryTest
with ParquetTest with Shared
// than the total length but should not be a single record.
// Note that, if record level filtering is enabled, it should be a
single record.
// If no filter is pushed down to Parquet, it should be the total
length of data.
@@ -2660,7 +2774,7 @@ index 6080a5e8e4b..9aa8f49a62b 100644
}
}
}
-@@ -1614,7 +1623,11 @@ abstract class ParquetFilterSuite extends QueryTest
with ParquetTest with Shared
+@@ -1614,7 +1624,11 @@ abstract class ParquetFilterSuite extends QueryTest
with ParquetTest with Shared
// than the total length but should not be a single record.
// Note that, if record level filtering is enabled, it should be a
single record.
// If no filter is pushed down to Parquet, it should be the total
length of data.
@@ -2673,7 +2787,7 @@ index 6080a5e8e4b..9aa8f49a62b 100644
}
}
}
-@@ -1706,7 +1719,7 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
+@@ -1706,7 +1720,7 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
(attr, value) => sources.StringContains(attr, value))
}
@@ -2682,7 +2796,7 @@ index 6080a5e8e4b..9aa8f49a62b 100644
import testImplicits._
// keep() should take effect on StartsWith/EndsWith/Contains
Seq(
-@@ -1750,7 +1763,8 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
+@@ -1750,7 +1764,8 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
}
}
@@ -2692,7 +2806,17 @@ index 6080a5e8e4b..9aa8f49a62b 100644
val schema = StructType(Seq(
StructField("a", IntegerType, nullable = false)
))
-@@ -1993,7 +2007,8 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
+@@ -1940,7 +1955,8 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
+ }
+ }
+
+- test("SPARK-25207: exception when duplicate fields in case-insensitive
mode") {
++ test("SPARK-25207: exception when duplicate fields in case-insensitive
mode",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311"))
{
+ withTempPath { dir =>
+ val count = 10
+ val tableName = "spark_25207"
+@@ -1993,7 +2009,8 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
}
}
@@ -2702,7 +2826,7 @@ index 6080a5e8e4b..9aa8f49a62b 100644
// block 1:
// null count min
max
// page-0 0 0
99
-@@ -2053,7 +2068,8 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
+@@ -2053,7 +2070,8 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
}
}
@@ -2712,7 +2836,7 @@ index 6080a5e8e4b..9aa8f49a62b 100644
withTempPath { dir =>
val path = dir.getCanonicalPath
spark.range(100).selectExpr("id * 2 AS id")
-@@ -2305,7 +2321,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
+@@ -2305,7 +2323,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
assert(pushedParquetFilters.exists(_.getClass === filterClass),
s"${pushedParquetFilters.map(_.getClass).toList} did not contain
${filterClass}.")
@@ -2725,7 +2849,7 @@ index 6080a5e8e4b..9aa8f49a62b 100644
} else {
assert(selectedFilters.isEmpty, "There is filter pushed down")
}
-@@ -2368,7 +2388,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
+@@ -2368,7 +2390,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
assert(pushedParquetFilters.exists(_.getClass === filterClass),
s"${pushedParquetFilters.map(_.getClass).toList} did not contain
${filterClass}.")
@@ -2739,10 +2863,28 @@ index 6080a5e8e4b..9aa8f49a62b 100644
case _ => assert(false, "Can not match ParquetTable in the query.")
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
-index 4474ec1fd42..97910c4fc3a 100644
+index 4474ec1fd42..05fa0257c82 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
-@@ -1344,7 +1344,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest
with SharedSparkSession
+@@ -39,6 +39,7 @@ import org.apache.parquet.schema.{MessageType,
MessageTypeParser}
+
+ import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, TestUtils}
+ import org.apache.spark.sql._
++import org.apache.spark.sql.IgnoreCometNativeDataFusion
+ import org.apache.spark.sql.catalyst.InternalRow
+ import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow,
UnsafeRow}
+ import org.apache.spark.sql.catalyst.util.DateTimeUtils
+@@ -1059,7 +1060,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest
with SharedSparkSession
+ }
+ }
+
+- test("SPARK-35640: read binary as timestamp should throw schema
incompatible error") {
++ test("SPARK-35640: read binary as timestamp should throw schema
incompatible error",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720"))
{
+ val data = (1 to 4).map(i => Tuple1(i.toString))
+ val readSchema = StructType(Seq(StructField("_1",
DataTypes.TimestampType)))
+
+@@ -1344,7 +1346,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest
with SharedSparkSession
}
}
@@ -2753,10 +2895,38 @@ index 4474ec1fd42..97910c4fc3a 100644
checkAnswer(
// "fruit" column in this file is encoded using
DELTA_LENGTH_BYTE_ARRAY.
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
-index bba71f1c48d..38c60ee2584 100644
+index bba71f1c48d..0b52574e0f3 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
-@@ -996,7 +996,11 @@ abstract class ParquetQuerySuite extends QueryTest with
ParquetTest with SharedS
+@@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
+
+ import org.apache.spark.{DebugFilesystem, SparkConf, SparkException}
+ import org.apache.spark.sql._
++import org.apache.spark.sql.IgnoreCometNativeDataFusion
+ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+ import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
+ import org.apache.spark.sql.catalyst.util.ArrayData
+@@ -185,7 +186,8 @@ abstract class ParquetQuerySuite extends QueryTest with
ParquetTest with SharedS
+ }
+ }
+
+- test("SPARK-47447: read TimestampLTZ as TimestampNTZ") {
++ test("SPARK-47447: read TimestampLTZ as TimestampNTZ",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720"))
{
+ val providedSchema = StructType(Seq(StructField("time", TimestampNTZType,
false)))
+
+ Seq("INT96", "TIMESTAMP_MICROS", "TIMESTAMP_MILLIS").foreach { tsType =>
+@@ -318,7 +320,8 @@ abstract class ParquetQuerySuite extends QueryTest with
ParquetTest with SharedS
+ }
+ }
+
+- test("Enabling/disabling ignoreCorruptFiles") {
++ test("Enabling/disabling ignoreCorruptFiles",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728"))
{
+ def testIgnoreCorruptFiles(options: Map[String, String]): Unit = {
+ withTempDir { dir =>
+ val basePath = dir.getCanonicalPath
+@@ -996,7 +999,11 @@ abstract class ParquetQuerySuite extends QueryTest with
ParquetTest with SharedS
Seq(Some("A"), Some("A"), None).toDF().repartition(1)
.write.parquet(path.getAbsolutePath)
val df = spark.read.parquet(path.getAbsolutePath)
@@ -2769,7 +2939,17 @@ index bba71f1c48d..38c60ee2584 100644
}
}
}
-@@ -1060,7 +1064,8 @@ abstract class ParquetQuerySuite extends QueryTest with
ParquetTest with SharedS
+@@ -1042,7 +1049,8 @@ abstract class ParquetQuerySuite extends QueryTest with
ParquetTest with SharedS
+ testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96")
+ }
+
+- test("SPARK-34212 Parquet should read decimals correctly") {
++ test("SPARK-34212 Parquet should read decimals correctly",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720"))
{
+ def readParquet(schema: String, path: File): DataFrame = {
+ spark.read.schema(schema).parquet(path.toString)
+ }
+@@ -1060,7 +1068,8 @@ abstract class ParquetQuerySuite extends QueryTest with
ParquetTest with SharedS
checkAnswer(readParquet(schema2, path), df)
}
@@ -2779,7 +2959,7 @@ index bba71f1c48d..38c60ee2584 100644
val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)"
checkAnswer(readParquet(schema1, path), df)
val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)"
-@@ -1084,7 +1089,8 @@ abstract class ParquetQuerySuite extends QueryTest with
ParquetTest with SharedS
+@@ -1084,7 +1093,8 @@ abstract class ParquetQuerySuite extends QueryTest with
ParquetTest with SharedS
val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c,
CAST('1.2' AS BINARY) d")
df.write.parquet(path.toString)
@@ -2789,6 +2969,16 @@ index bba71f1c48d..38c60ee2584 100644
checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00"))
checkAnswer(readParquet("a DECIMAL(11, 2)", path), sql("SELECT 1.00"))
checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null))
+@@ -1131,7 +1141,8 @@ abstract class ParquetQuerySuite extends QueryTest with
ParquetTest with SharedS
+ }
+ }
+
+- test("row group skipping doesn't overflow when reading into larger type") {
++ test("row group skipping doesn't overflow when reading into larger type",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720"))
{
+ withTempPath { path =>
+ Seq(0).toDF("a").write.parquet(path.toString)
+ withAllParquetReaders {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala
index 30503af0fab..1491f4bc2d5 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala
@@ -2909,7 +3099,7 @@ index 5c0b7def039..151184bc98c 100644
assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size,
s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " +
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
-index 0acb21f3e6f..3a7bb73f03c 100644
+index 0acb21f3e6f..1f9c3fd13fc 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -27,7 +27,7 @@ import
org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
@@ -2917,7 +3107,7 @@ index 0acb21f3e6f..3a7bb73f03c 100644
import org.apache.spark.SparkException
-import org.apache.spark.sql.{AnalysisException, Row}
-+import org.apache.spark.sql.{AnalysisException, IgnoreComet, Row}
++import org.apache.spark.sql.{AnalysisException, IgnoreComet,
IgnoreCometNativeDataFusion, Row}
import org.apache.spark.sql.catalyst.expressions.Cast.toSQLType
import
org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
import org.apache.spark.sql.functions.desc
@@ -2931,10 +3121,39 @@ index 0acb21f3e6f..3a7bb73f03c 100644
withTempPath { dir =>
val e = testSchemaMismatch(dir.getCanonicalPath,
vectorizedReaderEnabled = false)
val expectedMessage = "Encountered error while reading file"
+@@ -1046,7 +1047,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
+ }
+ }
+
+- test("schema mismatch failure error message for parquet vectorized reader")
{
++ test("schema mismatch failure error message for parquet vectorized reader",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720"))
{
+ withTempPath { dir =>
+ val e = testSchemaMismatch(dir.getCanonicalPath,
vectorizedReaderEnabled = true)
+
assert(e.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException])
+@@ -1079,7 +1081,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
+ }
+ }
+
+- test("SPARK-45604: schema mismatch failure error on timestamp_ntz to
array<timestamp_ntz>") {
++ test("SPARK-45604: schema mismatch failure error on timestamp_ntz to
array<timestamp_ntz>",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720"))
{
+ import testImplicits._
+
+ withTempPath { dir =>
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala
-index 09ed6955a51..236a4e99824 100644
+index 09ed6955a51..6f9174c9545 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala
+@@ -24,7 +24,7 @@ import
org.apache.parquet.format.converter.ParquetMetadataConverter
+ import org.apache.parquet.hadoop.{ParquetFileReader, ParquetOutputFormat}
+
+ import org.apache.spark.SparkException
+-import org.apache.spark.sql.{DataFrame, QueryTest, Row}
++import org.apache.spark.sql.{DataFrame, IgnoreCometNativeDataFusion,
QueryTest, Row}
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+ import
org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
+ import org.apache.spark.sql.functions.col
@@ -65,7 +65,9 @@ class ParquetTypeWideningSuite
withClue(
s"with dictionary encoding '$dictionaryEnabled' with timestamp rebase
mode " +
@@ -2955,7 +3174,7 @@ index 09ed6955a51..236a4e99824 100644
}
}
-@@ -190,7 +192,8 @@ class ParquetTypeWideningSuite
+@@ -190,10 +192,16 @@ class ParquetTypeWideningSuite
(Seq("1", "2", Short.MinValue.toString), ShortType, DoubleType),
(Seq("1", "2", Int.MinValue.toString), IntegerType, DoubleType),
(Seq("1.23", "10.34"), FloatType, DoubleType),
@@ -2963,8 +3182,67 @@ index 09ed6955a51..236a4e99824 100644
+ // TODO: Comet cannot handle older than "1582-10-15"
+ (Seq("2020-01-01", "2020-01-02"/* , "1312-02-27" */), DateType,
TimestampNTZType)
)
++ wideningTags: Seq[org.scalatest.Tag] =
++ if (fromType == DateType && toType == TimestampNTZType) {
++ Seq(IgnoreCometNativeDataFusion(
++ "https://github.com/apache/datafusion-comet/issues/3728"))
++ } else Seq.empty
}
- test(s"parquet widening conversion $fromType -> $toType") {
+- test(s"parquet widening conversion $fromType -> $toType") {
++ test(s"parquet widening conversion $fromType -> $toType", wideningTags: _*)
{
+ checkAllParquetReaders(values, fromType, toType, expectError = false)
+ }
+
+@@ -231,7 +239,8 @@ class ParquetTypeWideningSuite
+ (Seq("2020-01-01", "2020-01-02", "1312-02-27"), DateType, TimestampType)
+ )
+ }
+- test(s"unsupported parquet conversion $fromType -> $toType") {
++ test(s"unsupported parquet conversion $fromType -> $toType",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728"))
{
+ checkAllParquetReaders(values, fromType, toType, expectError = true)
+ }
+
+@@ -257,7 +266,8 @@ class ParquetTypeWideningSuite
+ (Seq("1", "2"), LongType, DecimalType(LongDecimal.precision, 1))
+ )
+ }
+- test(s"unsupported parquet conversion $fromType -> $toType") {
++ test(s"unsupported parquet conversion $fromType -> $toType",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728"))
{
+ checkAllParquetReaders(values, fromType, toType,
+ expectError =
+ // parquet-mr allows reading decimals into a smaller precision decimal
type without
+@@ -271,7 +281,8 @@ class ParquetTypeWideningSuite
+ (Seq("2020-01-01", "2020-01-02", "1312-02-27"), TimestampNTZType,
DateType))
+ outputTimestampType <- ParquetOutputTimestampType.values
+ }
+- test(s"unsupported parquet timestamp conversion $fromType
($outputTimestampType) -> $toType") {
++ test(s"unsupported parquet timestamp conversion $fromType
($outputTimestampType) -> $toType",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728"))
{
+ withSQLConf(
+ SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
outputTimestampType.toString,
+ SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key ->
LegacyBehaviorPolicy.CORRECTED.toString
+@@ -291,7 +302,8 @@ class ParquetTypeWideningSuite
+ Seq(7 -> 5, 10 -> 5, 20 -> 5, 12 -> 10, 20 -> 10, 22 -> 20)
+ }
+ test(
+- s"parquet decimal precision change Decimal($fromPrecision, 2) ->
Decimal($toPrecision, 2)") {
++ s"parquet decimal precision change Decimal($fromPrecision, 2) ->
Decimal($toPrecision, 2)",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728"))
{
+ checkAllParquetReaders(
+ values = Seq("1.23", "10.34"),
+ fromType = DecimalType(fromPrecision, 2),
+@@ -322,7 +334,8 @@ class ParquetTypeWideningSuite
+ Seq((5, 2) -> (6, 4), (10, 4) -> (12, 7), (20, 5) -> (22, 8))
+ }
+ test(s"parquet decimal precision and scale change Decimal($fromPrecision,
$fromScale) -> " +
+- s"Decimal($toPrecision, $toScale)"
++ s"Decimal($toPrecision, $toScale)",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728")
+ ) {
+ checkAllParquetReaders(
+ values = Seq("1.23", "10.34"),
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala
index 458b5dfc0f4..d209f3c85bc 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala
@@ -3038,18 +3316,29 @@ index 0dd90925d3c..7d53ec845ef 100644
spark.range(10).selectExpr("id", "id % 3 as p")
.write.partitionBy("p").saveAsTable("testDataForScan")
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
-index 0ab8691801d..d9125f658ad 100644
+index 0ab8691801d..f1c4b3d92b1 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
-@@ -18,6 +18,7 @@
+@@ -18,6 +18,8 @@
package org.apache.spark.sql.execution.python
import org.apache.spark.sql.catalyst.plans.logical.{ArrowEvalPython,
BatchEvalPython, Limit, LocalLimit}
++import org.apache.spark.sql.IgnoreCometNativeDataFusion
+import org.apache.spark.sql.comet._
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan,
SparkPlanTest}
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
-@@ -108,6 +109,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
+@@ -93,7 +95,8 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
+ assert(arrowEvalNodes.size == 2)
+ }
+
+- test("Python UDF should not break column pruning/filter pushdown -- Parquet
V1") {
++ test("Python UDF should not break column pruning/filter pushdown -- Parquet
V1",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311"))
{
+ withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
+ withTempPath { f =>
+ spark.range(10).select($"id".as("a"), $"id".as("b"))
+@@ -108,6 +111,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
val scanNodes = query.queryExecution.executedPlan.collect {
case scan: FileSourceScanExec => scan
@@ -3057,7 +3346,7 @@ index 0ab8691801d..d9125f658ad 100644
}
assert(scanNodes.length == 1)
assert(scanNodes.head.output.map(_.name) == Seq("a"))
-@@ -120,11 +122,16 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
+@@ -120,11 +124,16 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
val scanNodes = query.queryExecution.executedPlan.collect {
case scan: FileSourceScanExec => scan
@@ -3076,7 +3365,7 @@ index 0ab8691801d..d9125f658ad 100644
}
}
}
-@@ -145,6 +152,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
+@@ -145,6 +154,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
val scanNodes = query.queryExecution.executedPlan.collect {
case scan: BatchScanExec => scan
@@ -3084,7 +3373,7 @@ index 0ab8691801d..d9125f658ad 100644
}
assert(scanNodes.length == 1)
assert(scanNodes.head.output.map(_.name) == Seq("a"))
-@@ -157,6 +165,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
+@@ -157,6 +167,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
val scanNodes = query.queryExecution.executedPlan.collect {
case scan: BatchScanExec => scan
@@ -3495,7 +3784,7 @@ index 86c4e49f6f6..2e639e5f38d 100644
val tblTargetName = "tbl_target"
val tblSourceQualified = s"default.$tblSourceName"
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
-index f0f3f94b811..c9d0ecfec41 100644
+index f0f3f94b811..f77b54dcef9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -27,13 +27,14 @@ import scala.jdk.CollectionConverters._
@@ -3522,37 +3811,42 @@ index f0f3f94b811..c9d0ecfec41 100644
import org.apache.spark.sql.execution.FilterExec
import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution
import org.apache.spark.sql.execution.datasources.DataSourceUtils
-@@ -128,7 +130,28 @@ private[sql] trait SQLTestUtils extends SparkFunSuite
with SQLTestUtilsBase with
- }
- }
- } else {
-- super.test(testName, testTags: _*)(testFun)
-+ if (isCometEnabled && testTags.exists(_.isInstanceOf[IgnoreComet])) {
-+ ignore(testName + " (disabled when Comet is on)", testTags:
_*)(testFun)
-+ } else {
-+ val cometScanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get(conf)
-+ val isNativeIcebergCompat = cometScanImpl ==
CometConf.SCAN_NATIVE_ICEBERG_COMPAT ||
-+ cometScanImpl == CometConf.SCAN_AUTO
-+ val isNativeDataFusion = cometScanImpl ==
CometConf.SCAN_NATIVE_DATAFUSION ||
-+ cometScanImpl == CometConf.SCAN_AUTO
-+ if (isCometEnabled && isNativeIcebergCompat &&
-+ testTags.exists(_.isInstanceOf[IgnoreCometNativeIcebergCompat])) {
-+ ignore(testName + " (disabled for NATIVE_ICEBERG_COMPAT)",
testTags: _*)(testFun)
-+ } else if (isCometEnabled && isNativeDataFusion &&
-+ testTags.exists(_.isInstanceOf[IgnoreCometNativeDataFusion])) {
-+ ignore(testName + " (disabled for NATIVE_DATAFUSION)", testTags:
_*)(testFun)
-+ } else if (isCometEnabled && (isNativeDataFusion ||
isNativeIcebergCompat) &&
-+ testTags.exists(_.isInstanceOf[IgnoreCometNativeScan])) {
-+ ignore(testName + " (disabled for NATIVE_DATAFUSION and
NATIVE_ICEBERG_COMPAT)",
-+ testTags: _*)(testFun)
-+ } else {
-+ super.test(testName, testTags: _*)(testFun)
-+ }
-+ }
- }
- }
+@@ -121,6 +123,34 @@ private[sql] trait SQLTestUtils extends SparkFunSuite
with SQLTestUtilsBase with
-@@ -248,8 +271,24 @@ private[sql] trait SQLTestUtilsBase
+ override protected def test(testName: String, testTags: Tag*)(testFun: =>
Any)
+ (implicit pos: Position): Unit = {
++ // Check Comet skip tags first, before DisableAdaptiveExecution handling
++ if (isCometEnabled && testTags.exists(_.isInstanceOf[IgnoreComet])) {
++ ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun)
++ return
++ }
++ if (isCometEnabled) {
++ val cometScanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get(conf)
++ val isNativeIcebergCompat = cometScanImpl ==
CometConf.SCAN_NATIVE_ICEBERG_COMPAT ||
++ cometScanImpl == CometConf.SCAN_AUTO
++ val isNativeDataFusion = cometScanImpl ==
CometConf.SCAN_NATIVE_DATAFUSION ||
++ cometScanImpl == CometConf.SCAN_AUTO
++ if (isNativeIcebergCompat &&
++ testTags.exists(_.isInstanceOf[IgnoreCometNativeIcebergCompat])) {
++ ignore(testName + " (disabled for NATIVE_ICEBERG_COMPAT)", testTags:
_*)(testFun)
++ return
++ }
++ if (isNativeDataFusion &&
++ testTags.exists(_.isInstanceOf[IgnoreCometNativeDataFusion])) {
++ ignore(testName + " (disabled for NATIVE_DATAFUSION)", testTags:
_*)(testFun)
++ return
++ }
++ if ((isNativeDataFusion || isNativeIcebergCompat) &&
++ testTags.exists(_.isInstanceOf[IgnoreCometNativeScan])) {
++ ignore(testName + " (disabled for NATIVE_DATAFUSION and
NATIVE_ICEBERG_COMPAT)",
++ testTags: _*)(testFun)
++ return
++ }
++ }
+ if (testTags.exists(_.isInstanceOf[DisableAdaptiveExecution])) {
+ super.test(testName, testTags: _*) {
+ withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+@@ -248,8 +278,24 @@ private[sql] trait SQLTestUtilsBase
override protected def converter: ColumnNodeToExpressionConverter =
self.spark.converter
}
@@ -3577,7 +3871,7 @@ index f0f3f94b811..c9d0ecfec41 100644
super.withSQLConf(pairs: _*)(f)
}
-@@ -451,6 +490,8 @@ private[sql] trait SQLTestUtilsBase
+@@ -451,6 +497,8 @@ private[sql] trait SQLTestUtilsBase
val schema = df.schema
val withoutFilters = df.queryExecution.executedPlan.transform {
case FilterExec(_, child) => child
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]