This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new d3b2007ba fix: enable native_datafusion Spark SQL tests for #3320,
#3401, #3719 (#3718)
d3b2007ba is described below
commit d3b2007baeb54497d4e4ed0ba5ef0607bb4c91c8
Author: Andy Grove <[email protected]>
AuthorDate: Wed Mar 18 07:53:10 2026 -0600
fix: enable native_datafusion Spark SQL tests for #3320, #3401, #3719
(#3718)
---
dev/diffs/3.5.8.diff | 429 ++++++++++++++-------
docs/source/contributor-guide/parquet_scans.md | 7 +-
native/Cargo.lock | 8 +-
native/core/src/errors.rs | 52 ++-
native/spark-expr/src/error.rs | 24 ++
.../sql/comet/shims/ShimSparkErrorConverter.scala | 6 +
.../sql/comet/shims/ShimSparkErrorConverter.scala | 6 +
.../sql/comet/shims/ShimSparkErrorConverter.scala | 6 +
8 files changed, 396 insertions(+), 142 deletions(-)
diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff
index 138e729f9..db495f1e2 100644
--- a/dev/diffs/3.5.8.diff
+++ b/dev/diffs/3.5.8.diff
@@ -1,5 +1,5 @@
diff --git a/pom.xml b/pom.xml
-index edd2ad57880..77a975ea48f 100644
+index edd2ad57880..837b95d1ada 100644
--- a/pom.xml
+++ b/pom.xml
@@ -152,6 +152,8 @@
@@ -93,23 +93,22 @@ index 27ae10b3d59..78e69902dfd 100644
+ }
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
-index db587dd9868..33802f29253 100644
+index db587dd9868..aac7295a53d 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution
import org.apache.spark.annotation.DeveloperApi
-+import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
++import org.apache.spark.sql.comet.CometScanExec
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec,
QueryStageExec}
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
-@@ -67,6 +68,8 @@ private[execution] object SparkPlanInfo {
+@@ -67,6 +68,7 @@ private[execution] object SparkPlanInfo {
// dump the file scan metadata (e.g file path) to event log
val metadata = plan match {
case fileScan: FileSourceScanExec => fileScan.metadata
+ case cometScan: CometScanExec => cometScan.metadata
-+ case nativeScan: CometNativeScanExec => nativeScan.metadata
case _ => Map[String, String]()
}
new SparkPlanInfo(
@@ -239,6 +238,20 @@ index e5494726695..00937f025c2 100644
}
test("A cached table preserves the partitioning and ordering of its cached
SparkPlan") {
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
+index 9e8d77c53f3..855e3ada7d1 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
+@@ -790,7 +790,8 @@ class ColumnExpressionSuite extends QueryTest with
SharedSparkSession {
+ }
+ }
+
+- test("input_file_name, input_file_block_start, input_file_block_length -
FileScanRDD") {
++ test("input_file_name, input_file_block_start, input_file_block_length -
FileScanRDD",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312"))
{
+ withTempPath { dir =>
+ val data = sparkContext.parallelize(0 to 10).toDF("id")
+ data.write.parquet(dir.getCanonicalPath)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index 6f3090d8908..c08a60fb0c2 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -397,14 +410,14 @@ index c4fb4fa943c..a04b23870a8 100644
assert(exchanges.size == 2)
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
-index f33432ddb6f..4acdf7e9cfb 100644
+index f33432ddb6f..42eb9fd1cb7 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression,
Expression}
import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode._
import org.apache.spark.sql.catalyst.plans.ExistenceJoin
-+import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
++import org.apache.spark.sql.comet.CometScanExec
import org.apache.spark.sql.connector.catalog.{InMemoryTableCatalog,
InMemoryTableWithV2FilterCatalog}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive._
@@ -448,22 +461,40 @@ index f33432ddb6f..4acdf7e9cfb 100644
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key ->
"true") {
val df = sql(
""" WITH v as (
-@@ -1729,6 +1736,10 @@ abstract class DynamicPartitionPruningV1Suite extends
DynamicPartitionPruningDat
+@@ -1698,7 +1705,8 @@ abstract class DynamicPartitionPruningV1Suite extends
DynamicPartitionPruningDat
+ * Check the static scan metrics with and without DPP
+ */
+ test("static scan metrics",
+- DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
++ DisableAdaptiveExecution("DPP in AQE must reuse broadcast"),
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3313"))
{
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
+ SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
+ SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
+@@ -1729,6 +1737,8 @@ abstract class DynamicPartitionPruningV1Suite extends
DynamicPartitionPruningDat
case s: BatchScanExec =>
// we use f1 col for v2 tables due to schema pruning
s.output.exists(_.exists(_.argString(maxFields =
100).contains("f1")))
+ case s: CometScanExec =>
-+ s.output.exists(_.exists(_.argString(maxFields =
100).contains("fid")))
-+ case s: CometNativeScanExec =>
+ s.output.exists(_.exists(_.argString(maxFields =
100).contains("fid")))
case _ => false
}
assert(scanOption.isDefined)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
-index a206e97c353..fea1149b67d 100644
+index a206e97c353..79813d8e259 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
-@@ -467,7 +467,8 @@ class ExplainSuite extends ExplainSuiteHelper with
DisableAdaptiveExecutionSuite
+@@ -280,7 +280,8 @@ class ExplainSuite extends ExplainSuiteHelper with
DisableAdaptiveExecutionSuite
+ }
+ }
+
+- test("explain formatted - check presence of subquery in case of DPP") {
++ test("explain formatted - check presence of subquery in case of DPP",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3313"))
{
+ withTable("df1", "df2") {
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
+ SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
+@@ -467,7 +468,8 @@ class ExplainSuite extends ExplainSuiteHelper with
DisableAdaptiveExecutionSuite
}
}
@@ -473,7 +504,7 @@ index a206e97c353..fea1149b67d 100644
withTempDir { dir =>
Seq("parquet", "orc", "csv", "json").foreach { fmt =>
val basePath = dir.getCanonicalPath + "/" + fmt
-@@ -545,7 +546,9 @@ class ExplainSuite extends ExplainSuiteHelper with
DisableAdaptiveExecutionSuite
+@@ -545,7 +547,9 @@ class ExplainSuite extends ExplainSuiteHelper with
DisableAdaptiveExecutionSuite
}
}
@@ -485,10 +516,18 @@ index a206e97c353..fea1149b67d 100644
test("SPARK-35884: Explain Formatted") {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
-index 93275487f29..ca79ad8b6d9 100644
+index 93275487f29..510e3087e0f 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
-@@ -33,6 +33,7 @@ import
org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha
+@@ -23,6 +23,7 @@ import java.nio.file.{Files, StandardOpenOption}
+
+ import scala.collection.mutable
+
++import org.apache.comet.CometConf
+ import org.apache.hadoop.conf.Configuration
+ import org.apache.hadoop.fs.{LocalFileSystem, Path}
+
+@@ -33,6 +34,7 @@ import
org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha
import
org.apache.spark.sql.catalyst.expressions.IntegralLiteralTestUtils.{negativeInt,
positiveInt}
import org.apache.spark.sql.catalyst.plans.logical.Filter
import org.apache.spark.sql.catalyst.types.DataTypeUtils
@@ -496,7 +535,16 @@ index 93275487f29..ca79ad8b6d9 100644
import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.datasources.FilePartition
-@@ -639,7 +640,8 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -250,6 +252,8 @@ class FileBasedDataSourceSuite extends QueryTest
+ case "" => "_LEGACY_ERROR_TEMP_2062"
+ case _ => "_LEGACY_ERROR_TEMP_2055"
+ }
++ // native_datafusion Parquet scan cannot throw a
SparkFileNotFoundException
++ assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() !=
CometConf.SCAN_NATIVE_DATAFUSION)
+ checkErrorMatchPVals(
+ exception = intercept[SparkException] {
+ testIgnoreMissingFiles(options)
+@@ -639,7 +643,8 @@ class FileBasedDataSourceSuite extends QueryTest
}
Seq("parquet", "orc").foreach { format =>
@@ -506,7 +554,7 @@ index 93275487f29..ca79ad8b6d9 100644
withTempDir { dir =>
val tableName = s"spark_25132_${format}_native"
val tableDir = dir.getCanonicalPath + s"/$tableName"
-@@ -955,6 +957,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -955,6 +960,7 @@ class FileBasedDataSourceSuite extends QueryTest
assert(bJoinExec.isEmpty)
val smJoinExec = collect(joinedDF.queryExecution.executedPlan) {
case smJoin: SortMergeJoinExec => smJoin
@@ -514,7 +562,7 @@ index 93275487f29..ca79ad8b6d9 100644
}
assert(smJoinExec.nonEmpty)
}
-@@ -1015,6 +1018,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -1015,6 +1021,7 @@ class FileBasedDataSourceSuite extends QueryTest
val fileScan = df.queryExecution.executedPlan collectFirst {
case BatchScanExec(_, f: FileScan, _, _, _, _) => f
@@ -522,7 +570,7 @@ index 93275487f29..ca79ad8b6d9 100644
}
assert(fileScan.nonEmpty)
assert(fileScan.get.partitionFilters.nonEmpty)
-@@ -1056,6 +1060,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -1056,6 +1063,7 @@ class FileBasedDataSourceSuite extends QueryTest
val fileScan = df.queryExecution.executedPlan collectFirst {
case BatchScanExec(_, f: FileScan, _, _, _, _) => f
@@ -530,7 +578,7 @@ index 93275487f29..ca79ad8b6d9 100644
}
assert(fileScan.nonEmpty)
assert(fileScan.get.partitionFilters.isEmpty)
-@@ -1240,6 +1245,9 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -1240,6 +1248,9 @@ class FileBasedDataSourceSuite extends QueryTest
val filters = df.queryExecution.executedPlan.collect {
case f: FileSourceScanLike => f.dataFilters
case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters
@@ -930,73 +978,6 @@ index 3cf2bfd17ab..49728c35c42 100644
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
SQLConf.ANSI_ENABLED.key -> "true") {
withTable("t") {
-diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
-index fa1a64460fc..134f0db1fb8 100644
---- a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
-+++ b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
-@@ -17,6 +17,8 @@
-
- package org.apache.spark.sql
-
-+import org.apache.comet.CometConf
-+
- import org.apache.spark.{SPARK_DOC_ROOT, SparkIllegalArgumentException,
SparkRuntimeException}
- import org.apache.spark.sql.catalyst.expressions.Cast._
- import org.apache.spark.sql.execution.FormattedMode
-@@ -178,29 +180,31 @@ class StringFunctionsSuite extends QueryTest with
SharedSparkSession {
- }
-
- test("string regex_replace / regex_extract") {
-- val df = Seq(
-- ("100-200", "(\\d+)-(\\d+)", "300"),
-- ("100-200", "(\\d+)-(\\d+)", "400"),
-- ("100-200", "(\\d+)", "400")).toDF("a", "b", "c")
-+ withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") {
-+ val df = Seq(
-+ ("100-200", "(\\d+)-(\\d+)", "300"),
-+ ("100-200", "(\\d+)-(\\d+)", "400"),
-+ ("100-200", "(\\d+)", "400")).toDF("a", "b", "c")
-
-- checkAnswer(
-- df.select(
-- regexp_replace($"a", "(\\d+)", "num"),
-- regexp_replace($"a", $"b", $"c"),
-- regexp_extract($"a", "(\\d+)-(\\d+)", 1)),
-- Row("num-num", "300", "100") :: Row("num-num", "400", "100") ::
-- Row("num-num", "400-400", "100") :: Nil)
--
-- // for testing the mutable state of the expression in code gen.
-- // This is a hack way to enable the codegen, thus the codegen is enable
by default,
-- // it will still use the interpretProjection if projection followed by a
LocalRelation,
-- // hence we add a filter operator.
-- // See the optimizer rule `ConvertToLocalRelation`
-- checkAnswer(
-- df.filter("isnotnull(a)").selectExpr(
-- "regexp_replace(a, b, c)",
-- "regexp_extract(a, b, 1)"),
-- Row("300", "100") :: Row("400", "100") :: Row("400-400", "100") :: Nil)
-+ checkAnswer(
-+ df.select(
-+ regexp_replace($"a", "(\\d+)", "num"),
-+ regexp_replace($"a", $"b", $"c"),
-+ regexp_extract($"a", "(\\d+)-(\\d+)", 1)),
-+ Row("num-num", "300", "100") :: Row("num-num", "400", "100") ::
-+ Row("num-num", "400-400", "100") :: Nil)
-+
-+ // for testing the mutable state of the expression in code gen.
-+ // This is a hack way to enable the codegen, thus the codegen is enable
by default,
-+ // it will still use the interpretProjection if projection followed by
a LocalRelation,
-+ // hence we add a filter operator.
-+ // See the optimizer rule `ConvertToLocalRelation`
-+ checkAnswer(
-+ df.filter("isnotnull(a)").selectExpr(
-+ "regexp_replace(a, b, c)",
-+ "regexp_extract(a, b, 1)"),
-+ Row("300", "100") :: Row("400", "100") :: Row("400-400", "100") ::
Nil)
-+ }
- }
-
- test("non-matching optional group") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 04702201f82..5ee11f83ecf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -1036,6 +1017,20 @@ index 04702201f82..5ee11f83ecf 100644
}
assert(exchanges.size === 1)
}
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+index 9f8e979e3fb..3bc9dab8023 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+@@ -87,7 +87,8 @@ class UDFSuite extends QueryTest with SharedSparkSession {
+ spark.catalog.dropTempView("tmp_table")
+ }
+
+- test("SPARK-8005 input_file_name") {
++ test("SPARK-8005 input_file_name",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312"))
{
+ withTempPath { dir =>
+ val data = sparkContext.parallelize(0 to 10, 2).toDF("id")
+ data.write.parquet(dir.getCanonicalPath)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
index d269290e616..13726a31e07 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
@@ -1100,18 +1095,31 @@ index d269290e616..13726a31e07 100644
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
-index cfc8b2cc845..c4be7eb3731 100644
+index cfc8b2cc845..b7c234e1437 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
-@@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer
+@@ -19,8 +19,9 @@ package org.apache.spark.sql.connector
+ import scala.collection.mutable.ArrayBuffer
+
import org.apache.spark.SparkConf
- import org.apache.spark.sql.{AnalysisException, QueryTest}
+-import org.apache.spark.sql.{AnalysisException, QueryTest}
++import org.apache.spark.sql.{AnalysisException, IgnoreCometNativeDataFusion,
QueryTest}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite,
Table, TableCapability}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-@@ -184,7 +185,11 @@ class FileDataSourceV2FallBackSuite extends QueryTest
with SharedSparkSession {
+@@ -152,7 +153,8 @@ class FileDataSourceV2FallBackSuite extends QueryTest with
SharedSparkSession {
+ }
+ }
+
+- test("Fallback Parquet V2 to V1") {
++ test("Fallback Parquet V2 to V1",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315"))
{
+ Seq("parquet", classOf[ParquetDataSourceV2].getCanonicalName).foreach {
format =>
+ withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> format) {
+ val commands = ArrayBuffer.empty[(String, LogicalPlan)]
+@@ -184,7 +186,11 @@ class FileDataSourceV2FallBackSuite extends QueryTest
with SharedSparkSession {
val df = spark.read.format(format).load(path.getCanonicalPath)
checkAnswer(df, inputData.toDF())
assert(
@@ -1375,6 +1383,28 @@ index 47679ed7865..9ffbaecb98e 100644
}.length == hashAggCount)
assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s
}.length == sortAggCount)
}
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
+index a1147c16cc8..c7a29496328 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
+@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
+
+ import org.apache.spark.{SparkArithmeticException, SparkException,
SparkFileNotFoundException}
+ import org.apache.spark.sql._
++import org.apache.spark.sql.IgnoreCometNativeDataFusion
+ import org.apache.spark.sql.catalyst.TableIdentifier
+ import org.apache.spark.sql.catalyst.expressions.{Add, Alias, Divide}
+ import org.apache.spark.sql.catalyst.parser.ParseException
+@@ -968,7 +969,8 @@ abstract class SQLViewSuite extends QueryTest with
SQLTestUtils {
+ }
+ }
+
+- test("alter temporary view should follow current storeAnalyzedPlanForView
config") {
++ test("alter temporary view should follow current storeAnalyzedPlanForView
config",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3314"))
{
+ withTable("t") {
+ Seq(2, 3, 1).toDF("c1").write.format("parquet").saveAsTable("t")
+ withView("v1") {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
index eec396b2e39..bf3f1c769d6 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
@@ -1969,7 +1999,7 @@ index 07e2849ce6f..3e73645b638 100644
ParquetOutputFormat.WRITER_VERSION ->
ParquetProperties.WriterVersion.PARQUET_2_0.toString
)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
-index 8e88049f51e..6150a556f9b 100644
+index 8e88049f51e..b713ccddfcb 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -1095,7 +1095,11 @@ abstract class ParquetFilterSuite extends QueryTest
with ParquetTest with Shared
@@ -2058,7 +2088,7 @@ index 8e88049f51e..6150a556f9b 100644
val schema = StructType(Seq(
StructField("a", IntegerType, nullable = false)
))
-@@ -1952,8 +1968,14 @@ abstract class ParquetFilterSuite extends QueryTest
with ParquetTest with Shared
+@@ -1952,8 +1968,17 @@ abstract class ParquetFilterSuite extends QueryTest
with ParquetTest with Shared
val e = intercept[SparkException] {
sql(s"select a from $tableName where b > 0").collect()
}
@@ -2066,16 +2096,19 @@ index 8e88049f51e..6150a556f9b 100644
- """Found duplicate field(s) "B": [B, b] in case-insensitive
mode"""))
+ assert(e.getCause.isInstanceOf[RuntimeException])
+ val msg = e.getCause.getMessage
-+ // native_datafusion produces a different error message for
duplicate fields
++ // native_datafusion converts DataFusion's "Unable to get field
named" error
++ // to _LEGACY_ERROR_TEMP_2093 but with a lowercase field name ("b"
vs "B")
++ // because DataFusion resolves field names case-insensitively
+ assert(
+ msg.contains(
+ """Found duplicate field(s) "B": [B, b] in case-insensitive
mode""") ||
-+ msg.contains("Unable to get field named"),
++ msg.contains(
++ """Found duplicate field(s) "b": [B, b] in case-insensitive
mode"""),
+ s"Unexpected error message: $msg")
}
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
-@@ -1984,7 +2006,8 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
+@@ -1984,7 +2009,8 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
}
}
@@ -2085,7 +2118,7 @@ index 8e88049f51e..6150a556f9b 100644
// block 1:
// null count min
max
// page-0 0 0
99
-@@ -2044,7 +2067,8 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
+@@ -2044,7 +2070,8 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
}
}
@@ -2095,7 +2128,7 @@ index 8e88049f51e..6150a556f9b 100644
withTempPath { dir =>
val path = dir.getCanonicalPath
spark.range(100).selectExpr("id * 2 AS id")
-@@ -2276,7 +2300,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
+@@ -2276,7 +2303,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
assert(pushedParquetFilters.exists(_.getClass === filterClass),
s"${pushedParquetFilters.map(_.getClass).toList} did not contain
${filterClass}.")
@@ -2108,7 +2141,7 @@ index 8e88049f51e..6150a556f9b 100644
} else {
assert(selectedFilters.isEmpty, "There is filter pushed down")
}
-@@ -2336,7 +2364,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
+@@ -2336,7 +2367,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
assert(pushedParquetFilters.exists(_.getClass === filterClass),
s"${pushedParquetFilters.map(_.getClass).toList} did not contain
${filterClass}.")
@@ -2122,7 +2155,7 @@ index 8e88049f51e..6150a556f9b 100644
case _ =>
throw new AnalysisException("Can not match ParquetTable in the
query.")
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
-index 8ed9ef1630e..f312174b182 100644
+index 8ed9ef1630e..a865928c1b2 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -1064,7 +1064,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest
with SharedSparkSession
@@ -2131,7 +2164,7 @@ index 8ed9ef1630e..f312174b182 100644
- test("SPARK-35640: read binary as timestamp should throw schema
incompatible error") {
+ test("SPARK-35640: read binary as timestamp should throw schema
incompatible error",
-+
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311"))
{
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720"))
{
val data = (1 to 4).map(i => Tuple1(i.toString))
val readSchema = StructType(Seq(StructField("_1",
DataTypes.TimestampType)))
@@ -2141,7 +2174,7 @@ index 8ed9ef1630e..f312174b182 100644
- test("SPARK-35640: int as long should throw schema incompatible error") {
+ test("SPARK-35640: int as long should throw schema incompatible error",
-+
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311"))
{
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720"))
{
val data = (1 to 4).map(i => Tuple1(i))
val readSchema = StructType(Seq(StructField("_1", DataTypes.LongType)))
@@ -2156,7 +2189,7 @@ index 8ed9ef1630e..f312174b182 100644
checkAnswer(
// "fruit" column in this file is encoded using
DELTA_LENGTH_BYTE_ARRAY.
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
-index f6472ba3d9d..ce39ebb52e6 100644
+index f6472ba3d9d..7f00caf5063 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -185,7 +185,8 @@ abstract class ParquetQuerySuite extends QueryTest with
ParquetTest with SharedS
@@ -2165,7 +2198,7 @@ index f6472ba3d9d..ce39ebb52e6 100644
- test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") {
+ test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ",
-+
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311"))
{
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720"))
{
val data = (1 to 1000).map { i =>
val ts = new java.sql.Timestamp(i)
Row(ts)
@@ -2215,7 +2248,7 @@ index f6472ba3d9d..ce39ebb52e6 100644
- test("row group skipping doesn't overflow when reading into larger type") {
+ test("row group skipping doesn't overflow when reading into larger type",
-+
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311"))
{
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720"))
{
withTempPath { path =>
Seq(0).toDF("a").write.parquet(path.toString)
// The vectorized and non-vectorized readers will produce different
exceptions, we don't need
@@ -2414,32 +2447,42 @@ index 5cdbdc27b32..307fba16578 100644
spark.range(10).selectExpr("id", "id % 3 as p")
.write.partitionBy("p").saveAsTable("testDataForScan")
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
-index 0ab8691801d..b18a5bea944 100644
+index 0ab8691801d..7b81f3a8f6d 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
-@@ -18,6 +18,7 @@
+@@ -17,7 +17,9 @@
+
package org.apache.spark.sql.execution.python
++import org.apache.spark.sql.IgnoreCometNativeDataFusion
import org.apache.spark.sql.catalyst.plans.logical.{ArrowEvalPython,
BatchEvalPython, Limit, LocalLimit}
+import org.apache.spark.sql.comet._
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan,
SparkPlanTest}
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
-@@ -108,6 +109,8 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
+@@ -93,7 +95,8 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
+ assert(arrowEvalNodes.size == 2)
+ }
+
+- test("Python UDF should not break column pruning/filter pushdown -- Parquet
V1") {
++ test("Python UDF should not break column pruning/filter pushdown -- Parquet
V1",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312"))
{
+ withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
+ withTempPath { f =>
+ spark.range(10).select($"id".as("a"), $"id".as("b"))
+@@ -108,6 +111,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
val scanNodes = query.queryExecution.executedPlan.collect {
case scan: FileSourceScanExec => scan
+ case scan: CometScanExec => scan
-+ case scan: CometNativeScanExec => scan
}
assert(scanNodes.length == 1)
assert(scanNodes.head.output.map(_.name) == Seq("a"))
-@@ -120,11 +123,18 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
+@@ -120,11 +124,16 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
val scanNodes = query.queryExecution.executedPlan.collect {
case scan: FileSourceScanExec => scan
+ case scan: CometScanExec => scan
-+ case scan: CometNativeScanExec => scan
}
assert(scanNodes.length == 1)
// $"a" is not null and $"a" > 1
@@ -2448,14 +2491,13 @@ index 0ab8691801d..b18a5bea944 100644
+ val dataFilters = scanNodes.head match {
+ case scan: FileSourceScanExec => scan.dataFilters
+ case scan: CometScanExec => scan.dataFilters
-+ case scan: CometNativeScanExec => scan.dataFilters
+ }
+ assert(dataFilters.length == 2)
+ assert(dataFilters.flatMap(_.references.map(_.name)).distinct ==
Seq("a"))
}
}
}
-@@ -145,6 +155,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
+@@ -145,6 +154,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
val scanNodes = query.queryExecution.executedPlan.collect {
case scan: BatchScanExec => scan
@@ -2463,7 +2505,7 @@ index 0ab8691801d..b18a5bea944 100644
}
assert(scanNodes.length == 1)
assert(scanNodes.head.output.map(_.name) == Seq("a"))
-@@ -157,6 +168,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
+@@ -157,6 +167,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
val scanNodes = query.queryExecution.executedPlan.collect {
case scan: BatchScanExec => scan
@@ -2488,7 +2530,7 @@ index d083cac48ff..3c11bcde807 100644
import testImplicits._
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
-index 746f289c393..7a6a88a9fce 100644
+index 746f289c393..5b9e31c1fa6 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -19,16 +19,19 @@ package org.apache.spark.sql.sources
@@ -2513,7 +2555,7 @@ index 746f289c393..7a6a88a9fce 100644
import org.apache.spark.sql.execution.joins.SortMergeJoinExec
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
-@@ -102,12 +105,22 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
+@@ -102,12 +105,20 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
}
}
@@ -2523,7 +2565,6 @@ index 746f289c393..7a6a88a9fce 100644
+ val fileScan = collect(plan) {
+ case f: FileSourceScanExec => f
+ case f: CometScanExec => f
-+ case f: CometNativeScanExec => f
+ }
assert(fileScan.nonEmpty, plan)
fileScan.head
@@ -2532,13 +2573,12 @@ index 746f289c393..7a6a88a9fce 100644
+ private def getBucketScan(plan: SparkPlan): Boolean = getFileScan(plan)
match {
+ case fs: FileSourceScanExec => fs.bucketedScan
+ case bs: CometScanExec => bs.bucketedScan
-+ case ns: CometNativeScanExec => ns.bucketedScan
+ }
+
// To verify if the bucket pruning works, this function checks two
conditions:
// 1) Check if the pruned buckets (before filtering) are empty.
// 2) Verify the final result is the same as the expected one
-@@ -156,7 +169,8 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
+@@ -156,7 +167,8 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
val planWithoutBucketedScan =
bucketedDataFrame.filter(filterCondition)
.queryExecution.executedPlan
val fileScan = getFileScan(planWithoutBucketedScan)
@@ -2548,7 +2588,7 @@ index 746f289c393..7a6a88a9fce 100644
val bucketColumnType =
bucketedDataFrame.schema.apply(bucketColumnIndex).dataType
val rowsWithInvalidBuckets = fileScan.execute().filter(row => {
-@@ -452,28 +466,54 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
+@@ -452,28 +464,54 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
val joinOperator = if
(joined.sqlContext.conf.adaptiveExecutionEnabled) {
val executedPlan =
joined.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
@@ -2611,7 +2651,14 @@ index 746f289c393..7a6a88a9fce 100644
s"expected sort in the right child to be $sortRight but
found\n${joinOperator.right}")
// check the output partitioning
-@@ -836,11 +876,11 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
+@@ -831,16 +869,17 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
+ }
+ }
+
+- test("disable bucketing when the output doesn't contain all bucketing
columns") {
++ test("disable bucketing when the output doesn't contain all bucketing
columns",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319"))
{
+ withTable("bucketed_table") {
df1.write.format("parquet").bucketBy(8,
"i").saveAsTable("bucketed_table")
val scanDF = spark.table("bucketed_table").select("j")
@@ -2625,7 +2672,7 @@ index 746f289c393..7a6a88a9fce 100644
checkAnswer(aggDF, df1.groupBy("j").agg(max("k")))
}
}
-@@ -895,7 +935,10 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
+@@ -895,7 +934,10 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
}
test("SPARK-29655 Read bucketed tables obeys spark.sql.shuffle.partitions")
{
@@ -2636,7 +2683,7 @@ index 746f289c393..7a6a88a9fce 100644
SQLConf.SHUFFLE_PARTITIONS.key -> "5",
SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7") {
val bucketSpec = Some(BucketSpec(6, Seq("i", "j"), Nil))
-@@ -914,7 +957,10 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
+@@ -914,7 +956,10 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
}
test("SPARK-32767 Bucket join should work if SHUFFLE_PARTITIONS larger than
bucket number") {
@@ -2647,7 +2694,7 @@ index 746f289c393..7a6a88a9fce 100644
SQLConf.SHUFFLE_PARTITIONS.key -> "9",
SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "10") {
-@@ -944,7 +990,10 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
+@@ -944,7 +989,10 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
}
test("bucket coalescing eliminates shuffle") {
@@ -2658,7 +2705,17 @@ index 746f289c393..7a6a88a9fce 100644
SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true",
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
// The side with bucketedTableTestSpec1 will be coalesced to have 4
output partitions.
-@@ -1029,15 +1078,24 @@ abstract class BucketedReadSuite extends QueryTest
with SQLTestUtils with Adapti
+@@ -1013,7 +1061,8 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
+ }
+ }
+
+- test("bucket coalescing is applied when join expressions match with
partitioning expressions") {
++ test("bucket coalescing is applied when join expressions match with
partitioning expressions",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319"))
{
+ withTable("t1", "t2", "t3") {
+ df1.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("t1")
+ df2.write.format("parquet").bucketBy(4, "i", "j").saveAsTable("t2")
+@@ -1029,15 +1078,21 @@ abstract class BucketedReadSuite extends QueryTest
with SQLTestUtils with Adapti
Seq(true, false).foreach { aqeEnabled =>
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key ->
aqeEnabled.toString) {
val plan = sql(query).queryExecution.executedPlan
@@ -2669,7 +2726,6 @@ index 746f289c393..7a6a88a9fce 100644
val scans = collect(plan) {
case f: FileSourceScanExec if
f.optionalNumCoalescedBuckets.isDefined => f
+ case b: CometScanExec if
b.optionalNumCoalescedBuckets.isDefined => b
-+ case b: CometNativeScanExec if
b.optionalNumCoalescedBuckets.isDefined => b
}
if (expectedCoalescedNumBuckets.isDefined) {
assert(scans.length == 1)
@@ -2679,8 +2735,6 @@ index 746f289c393..7a6a88a9fce 100644
+ assert(f.optionalNumCoalescedBuckets ==
expectedCoalescedNumBuckets)
+ case b: CometScanExec =>
+ assert(b.optionalNumCoalescedBuckets ==
expectedCoalescedNumBuckets)
-+ case b: CometNativeScanExec =>
-+ assert(b.optionalNumCoalescedBuckets ==
expectedCoalescedNumBuckets)
+ }
} else {
assert(scans.isEmpty)
@@ -2710,18 +2764,20 @@ index 6f897a9c0b7..b0723634f68 100644
protected override lazy val sql = spark.sql _
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
-index d675503a8ba..f220892396e 100644
+index d675503a8ba..c386a8cb686 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
-@@ -18,6 +18,7 @@
+@@ -17,7 +17,8 @@
+
package org.apache.spark.sql.sources
- import org.apache.spark.sql.QueryTest
-+import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
+-import org.apache.spark.sql.QueryTest
++import org.apache.spark.sql.{IgnoreCometNativeDataFusion, QueryTest}
++import org.apache.spark.sql.comet.CometScanExec
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper,
DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
import org.apache.spark.sql.internal.SQLConf
-@@ -68,7 +69,11 @@ abstract class DisableUnnecessaryBucketedScanSuite
+@@ -68,7 +69,10 @@ abstract class DisableUnnecessaryBucketedScanSuite
def checkNumBucketedScan(query: String, expectedNumBucketedScan: Int):
Unit = {
val plan = sql(query).queryExecution.executedPlan
@@ -2729,11 +2785,60 @@ index d675503a8ba..f220892396e 100644
+ val bucketedScan = collect(plan) {
+ case s: FileSourceScanExec if s.bucketedScan => s
+ case s: CometScanExec if s.bucketedScan => s
-+ case s: CometNativeScanExec if s.bucketedScan => s
+ }
assert(bucketedScan.length == expectedNumBucketedScan)
}
+@@ -83,7 +87,8 @@ abstract class DisableUnnecessaryBucketedScanSuite
+ }
+ }
+
+- test("SPARK-32859: disable unnecessary bucketed table scan - basic test") {
++ test("SPARK-32859: disable unnecessary bucketed table scan - basic test",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319"))
{
+ withTable("t1", "t2", "t3") {
+ df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1")
+ df2.write.format("parquet").bucketBy(8, "i").saveAsTable("t2")
+@@ -124,7 +129,8 @@ abstract class DisableUnnecessaryBucketedScanSuite
+ }
+ }
+
+- test("SPARK-32859: disable unnecessary bucketed table scan - multiple joins
test") {
++ test("SPARK-32859: disable unnecessary bucketed table scan - multiple joins
test",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319"))
{
+ withTable("t1", "t2", "t3") {
+ df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1")
+ df2.write.format("parquet").bucketBy(8, "i").saveAsTable("t2")
+@@ -167,7 +173,8 @@ abstract class DisableUnnecessaryBucketedScanSuite
+ }
+ }
+
+- test("SPARK-32859: disable unnecessary bucketed table scan - multiple
bucketed columns test") {
++ test("SPARK-32859: disable unnecessary bucketed table scan - multiple
bucketed columns test",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319"))
{
+ withTable("t1", "t2", "t3") {
+ df1.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("t1")
+ df2.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("t2")
+@@ -198,7 +205,8 @@ abstract class DisableUnnecessaryBucketedScanSuite
+ }
+ }
+
+- test("SPARK-32859: disable unnecessary bucketed table scan - other
operators test") {
++ test("SPARK-32859: disable unnecessary bucketed table scan - other
operators test",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319"))
{
+ withTable("t1", "t2", "t3") {
+ df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1")
+ df2.write.format("parquet").bucketBy(8, "i").saveAsTable("t2")
+@@ -239,7 +247,8 @@ abstract class DisableUnnecessaryBucketedScanSuite
+ }
+ }
+
+- test("Aggregates with no groupby over tables having 1 BUCKET, return
multiple rows") {
++ test("Aggregates with no groupby over tables having 1 BUCKET, return
multiple rows",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319"))
{
+ withTable("t1") {
+ withSQLConf(SQLConf.AUTO_BUCKETED_SCAN_ENABLED.key -> "true") {
+ sql(
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 7f6fa2a123e..c778b4e2c48 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -2868,6 +2973,39 @@ index aad91601758..201083bd621 100644
})
}
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+index b5cf13a9c12..ac17603fb7f 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+@@ -36,7 +36,7 @@ import org.scalatestplus.mockito.MockitoSugar
+
+ import org.apache.spark.{SparkException, TestUtils}
+ import org.apache.spark.internal.Logging
+-import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset,
Row, SaveMode}
++import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset,
IgnoreCometNativeDataFusion, Row, SaveMode}
+ import org.apache.spark.sql.catalyst.InternalRow
+ import org.apache.spark.sql.catalyst.expressions.{Literal, Rand, Randn,
Shuffle, Uuid}
+ import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef,
CTERelationRef, LocalRelation}
+@@ -660,7 +660,8 @@ class StreamingQuerySuite extends StreamTest with
BeforeAndAfter with Logging wi
+ )
+ }
+
+- test("SPARK-41198: input row calculation with CTE") {
++ test("SPARK-41198: input row calculation with CTE",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315"))
{
+ withTable("parquet_tbl", "parquet_streaming_tbl") {
+ spark.range(0, 10).selectExpr("id AS col1", "id AS col2")
+ .write.format("parquet").saveAsTable("parquet_tbl")
+@@ -712,7 +713,8 @@ class StreamingQuerySuite extends StreamTest with
BeforeAndAfter with Logging wi
+ }
+ }
+
+- test("SPARK-41199: input row calculation with mixed-up of DSv1 and DSv2
streaming sources") {
++ test("SPARK-41199: input row calculation with mixed-up of DSv1 and DSv2
streaming sources",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315"))
{
+ withTable("parquet_streaming_tbl") {
+ val streamInput = MemoryStream[Int]
+ val streamDf = streamInput.toDF().selectExpr("value AS key", "value AS
value_stream")
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala
index 8f099c31e6b..ce4b7ad25b3 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala
@@ -3111,6 +3249,29 @@ index de3b1ffccf0..2a76d127093 100644
override def beforeEach(): Unit = {
super.beforeEach()
+diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+index f3be79f9022..b4b1ea8dbc4 100644
+---
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
++++
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+@@ -34,7 +34,7 @@ import
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectIn
+ import org.apache.hadoop.io.{LongWritable, Writable}
+
+ import org.apache.spark.{SparkException, SparkFiles, TestUtils}
+-import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
++import org.apache.spark.sql.{AnalysisException, IgnoreCometNativeDataFusion,
QueryTest, Row}
+ import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
+ import org.apache.spark.sql.catalyst.plans.logical.Project
+ import org.apache.spark.sql.execution.WholeStageCodegenExec
+@@ -448,7 +448,8 @@ class HiveUDFSuite extends QueryTest with
TestHiveSingleton with SQLTestUtils {
+ }
+ }
+
+- test("SPARK-11522 select input_file_name from non-parquet table") {
++ test("SPARK-11522 select input_file_name from non-parquet table",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312"))
{
+
+ withTempDir { tempDir =>
+
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 6160c3e5f6c..0956d7d9edc 100644
---
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
diff --git a/docs/source/contributor-guide/parquet_scans.md
b/docs/source/contributor-guide/parquet_scans.md
index 2a10bb111..833eda75e 100644
--- a/docs/source/contributor-guide/parquet_scans.md
+++ b/docs/source/contributor-guide/parquet_scans.md
@@ -63,9 +63,10 @@ cause Comet to fall back to Spark.
The `native_datafusion` scan does not use Spark's `FileScanRDD`, so these
functions cannot populate their values.
- No support for `ignoreMissingFiles` or `ignoreCorruptFiles` being set to
`true`
- No support for duplicate field names in case-insensitive mode. When the
required or data schema contains
- field names that differ only by case (e.g., `B` and `b`), Comet falls back
to Spark. Note that duplicates
- in the physical Parquet file that are not reflected in the table schema
cannot be detected at plan time,
- so DataFusion may produce a different error message than Spark in that case.
+ field names that differ only by case (e.g., `B` and `b`), Comet falls back
to Spark. Duplicates
+ in the physical Parquet file that are not reflected in the table schema
cannot be detected at plan time;
+ in that case Comet converts the native error into a `SparkRuntimeException`
+ with error class `_LEGACY_ERROR_TEMP_2093`, matching Spark's behavior.
The `native_iceberg_compat` scan has the following additional limitation that
may produce incorrect results
without falling back to Spark:
diff --git a/native/Cargo.lock b/native/Cargo.lock
index 05b673346..230fc2a53 100644
--- a/native/Cargo.lock
+++ b/native/Cargo.lock
@@ -1826,7 +1826,7 @@ dependencies = [
[[package]]
name = "datafusion-comet"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"arrow",
"assertables",
@@ -1901,7 +1901,7 @@ dependencies = [
[[package]]
name = "datafusion-comet-objectstore-hdfs"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"async-trait",
"bytes",
@@ -1915,7 +1915,7 @@ dependencies = [
[[package]]
name = "datafusion-comet-proto"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"prost",
"prost-build",
@@ -1923,7 +1923,7 @@ dependencies = [
[[package]]
name = "datafusion-comet-spark-expr"
-version = "0.14.0"
+version = "0.15.0"
dependencies = [
"arrow",
"base64",
diff --git a/native/core/src/errors.rs b/native/core/src/errors.rs
index d4582da63..2886df29d 100644
--- a/native/core/src/errors.rs
+++ b/native/core/src/errors.rs
@@ -436,13 +436,15 @@ fn throw_exception(env: &mut JNIEnv, error: &CometError,
backtrace: Option<Strin
// Handle direct SparkError - serialize to JSON
CometError::Spark(spark_error) => throw_spark_error_as_json(env,
spark_error),
_ => {
- // Check for file-not-found errors that may arrive through
other wrapping paths
let error_msg = error.to_string();
+ // Check for file-not-found errors that may arrive through
other wrapping paths
if error_msg.contains("not found")
&& error_msg.contains("No such file or directory")
{
let spark_error = SparkError::FileNotFound { message:
error_msg };
throw_spark_error_as_json(env, &spark_error)
+ } else if let Some(spark_error) =
try_convert_duplicate_field_error(&error_msg) {
+ throw_spark_error_as_json(env, &spark_error)
} else {
let exception = error.to_exception();
match backtrace {
@@ -474,6 +476,54 @@ fn throw_spark_error_as_json(
)
}
+/// Try to convert a DataFusion "Unable to get field named" error into a
SparkError.
+/// DataFusion produces this error when reading Parquet files with duplicate
field names
+/// in case-insensitive mode. For example, if a Parquet file has columns "A", "B",
+/// and "b", DataFusion may deduplicate them and report: Unable to get field named
+/// "b". Valid fields: ["A", "B"]. When the requested field has a case-insensitive
+/// match among the valid fields, we convert this to Spark's
+/// _LEGACY_ERROR_TEMP_2093 error.
+fn try_convert_duplicate_field_error(error_msg: &str) -> Option<SparkError> {
+ // Match: Schema error: Unable to get field named "X". Valid fields: [...]
+ lazy_static! {
+ static ref FIELD_RE: Regex =
+ Regex::new(r#"Unable to get field named "([^"]+)"\. Valid fields:
\[(.+)\]"#).unwrap();
+ }
+ if let Some(caps) = FIELD_RE.captures(error_msg) {
+ let requested_field = caps.get(1)?.as_str();
+ let requested_lower = requested_field.to_lowercase();
+ // Parse field names from the Valid fields list: ["A", "B"] or [A, B,
b]
+ let valid_fields_raw = caps.get(2)?.as_str();
+ let all_fields: Vec<String> = valid_fields_raw
+ .split(',')
+ .map(|s| s.trim().trim_matches('"').to_string())
+ .collect();
+ // Find fields that match case-insensitively
+ let mut matched: Vec<String> = all_fields
+ .into_iter()
+ .filter(|f| f.to_lowercase() == requested_lower)
+ .collect();
+ // Need at least one case-insensitive match to treat this as a
duplicate field error.
+ // DataFusion may deduplicate columns case-insensitively, so the valid
fields list
+ // might contain only one variant (e.g. "B" when file has both "B" and
"b").
+ // If requested field differs from the match, both existed in the
original file.
+ if matched.is_empty() {
+ return None;
+ }
+ // Add the requested field name if it's not already in the list
(different case)
+ if !matched.iter().any(|f| f == requested_field) {
+ matched.push(requested_field.to_string());
+ }
+ let required_field_name = requested_field.to_string();
+ let matched_fields = format!("[{}]", matched.join(", "));
+ Some(SparkError::DuplicateFieldCaseInsensitive {
+ required_field_name,
+ matched_fields,
+ })
+ } else {
+ None
+ }
+}
+
#[derive(Debug, Error)]
enum StacktraceError {
#[error("Unable to initialize message: {0}")]
diff --git a/native/spark-expr/src/error.rs b/native/spark-expr/src/error.rs
index 592ed8b44..9633dc98d 100644
--- a/native/spark-expr/src/error.rs
+++ b/native/spark-expr/src/error.rs
@@ -169,6 +169,12 @@ pub enum SparkError {
#[error("{message}")]
FileNotFound { message: String },
+ #[error("[_LEGACY_ERROR_TEMP_2093] Found duplicate field(s)
\"{required_field_name}\": [{matched_fields}] in case-insensitive mode")]
+ DuplicateFieldCaseInsensitive {
+ required_field_name: String,
+ matched_fields: String,
+ },
+
#[error("ArrowError: {0}.")]
Arrow(Arc<ArrowError>),
@@ -240,6 +246,7 @@ impl SparkError {
SparkError::DatatypeCannotOrder { .. } => "DatatypeCannotOrder",
SparkError::ScalarSubqueryTooManyRows =>
"ScalarSubqueryTooManyRows",
SparkError::FileNotFound { .. } => "FileNotFound",
+ SparkError::DuplicateFieldCaseInsensitive { .. } =>
"DuplicateFieldCaseInsensitive",
SparkError::Arrow(_) => "Arrow",
SparkError::Internal(_) => "Internal",
}
@@ -430,6 +437,15 @@ impl SparkError {
"message": message,
})
}
+ SparkError::DuplicateFieldCaseInsensitive {
+ required_field_name,
+ matched_fields,
+ } => {
+ serde_json::json!({
+ "requiredFieldName": required_field_name,
+ "matchedOrcFields": matched_fields,
+ })
+ }
SparkError::Arrow(e) => {
serde_json::json!({
"message": e.to_string(),
@@ -499,6 +515,11 @@ impl SparkError {
// FileNotFound - will be converted to SparkFileNotFoundException
by the shim
SparkError::FileNotFound { .. } =>
"org/apache/spark/SparkException",
+ // DuplicateFieldCaseInsensitive - converted to
SparkRuntimeException by the shim
+ SparkError::DuplicateFieldCaseInsensitive { .. } => {
+ "org/apache/spark/SparkRuntimeException"
+ }
+
// Generic errors
SparkError::Arrow(_) | SparkError::Internal(_) =>
"org/apache/spark/SparkException",
}
@@ -574,6 +595,9 @@ impl SparkError {
// File not found
SparkError::FileNotFound { .. } => Some("_LEGACY_ERROR_TEMP_2055"),
+ // Duplicate field in case-insensitive mode
+ SparkError::DuplicateFieldCaseInsensitive { .. } =>
Some("_LEGACY_ERROR_TEMP_2093"),
+
// Generic errors (no error class)
SparkError::Arrow(_) | SparkError::Internal(_) => None,
}
diff --git
a/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
index da65b1eb4..e4ec9e006 100644
---
a/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
+++
b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
@@ -251,6 +251,12 @@ trait ShimSparkErrorConverter {
QueryExecutionErrors
.intervalArithmeticOverflowError("Interval arithmetic overflow",
"", sqlCtx(context)))
+ case "DuplicateFieldCaseInsensitive" =>
+ Some(
+ QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError(
+ params("requiredFieldName").toString,
+ params("matchedOrcFields").toString))
+
case "FileNotFound" =>
val msg = params("message").toString
// Extract file path from native error message and format like Hadoop's
diff --git
a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
index ae21d1276..41f461100 100644
---
a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
+++
b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
@@ -247,6 +247,12 @@ trait ShimSparkErrorConverter {
QueryExecutionErrors
.intervalArithmeticOverflowError("Interval arithmetic overflow",
"", sqlCtx(context)))
+ case "DuplicateFieldCaseInsensitive" =>
+ Some(
+ QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError(
+ params("requiredFieldName").toString,
+ params("matchedOrcFields").toString))
+
case "FileNotFound" =>
val msg = params("message").toString
// Extract file path from native error message and format like Hadoop's
diff --git
a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
index 01d4eac4b..f906db140 100644
---
a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
+++
b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala
@@ -258,6 +258,12 @@ trait ShimSparkErrorConverter {
QueryExecutionErrors.withoutSuggestionIntervalArithmeticOverflowError(
context.headOption.orNull))
+ case "DuplicateFieldCaseInsensitive" =>
+ Some(
+ QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError(
+ params("requiredFieldName").toString,
+ params("matchedOrcFields").toString))
+
case "FileNotFound" =>
val msg = params("message").toString
// Extract file path from native error message and format like Hadoop's
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]