This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new e7eae6665 fix: unignore input_file_name Spark SQL tests for
native_datafusion (#3458)
e7eae6665 is described below
commit e7eae6665e9c5bdc6a7949afc2411d7fff9f5d88
Author: Andy Grove <[email protected]>
AuthorDate: Fri Feb 13 12:46:47 2026 -0700
fix: unignore input_file_name Spark SQL tests for native_datafusion (#3458)
* fix: unignore input_file_name Spark SQL tests for native_datafusion
The native_datafusion scan now correctly falls back to Spark's
FileSourceScanExec when metadata columns (like input_file_name) are
present, so the 3 input_file_name tests no longer need to be ignored.
For ExtractPythonUDFsSuite, the issue was that the test's collect
pattern didn't match CometNativeScanExec. Fixed by adding
CometNativeScanExec to the collect and dataFilters match blocks.
Closes #3312
Co-Authored-By: Claude Opus 4.6 <[email protected]>
* fix: restore IgnoreComet.scala in 3.5.8 Spark SQL test diff
The previous commit accidentally removed the IgnoreComet.scala file
creation from the diff, causing 94 compilation errors when applied
to Spark 3.5.8.
Co-Authored-By: Claude Opus 4.6 <[email protected]>
* fix: fall back scan when plan uses input_file_name expressions
CometScanExec does not populate InputFileBlockHolder (the thread-local
that Spark's FileScanRDD sets), so input_file_name(),
input_file_block_start(), and input_file_block_length() return empty
or default values when Comet replaces the scan. Detect these
expressions in the plan and fall back to Spark's FileSourceScanExec.
Co-Authored-By: Claude Opus 4.6 <[email protected]>
---------
Co-authored-by: Claude Opus 4.6 <[email protected]>
---
dev/diffs/3.5.8.diff | 78 +++-------------------
.../org/apache/comet/rules/CometScanRule.scala | 27 ++++++--
2 files changed, 31 insertions(+), 74 deletions(-)
diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff
index beef44549..d2d72e9d6 100644
--- a/dev/diffs/3.5.8.diff
+++ b/dev/diffs/3.5.8.diff
@@ -238,20 +238,6 @@ index e5494726695..00937f025c2 100644
}
test("A cached table preserves the partitioning and ordering of its cached
SparkPlan") {
-diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
-index 9e8d77c53f3..855e3ada7d1 100644
---- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
-+++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
-@@ -790,7 +790,8 @@ class ColumnExpressionSuite extends QueryTest with
SharedSparkSession {
- }
- }
-
-- test("input_file_name, input_file_block_start, input_file_block_length -
FileScanRDD") {
-+ test("input_file_name, input_file_block_start, input_file_block_length -
FileScanRDD",
-+
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312"))
{
- withTempPath { dir =>
- val data = sparkContext.parallelize(0 to 10).toDF("id")
- data.write.parquet(dir.getCanonicalPath)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index 6f3090d8908..c08a60fb0c2 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -1084,20 +1070,6 @@ index 04702201f82..5ee11f83ecf 100644
}
assert(exchanges.size === 1)
}
-diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
-index 9f8e979e3fb..3bc9dab8023 100644
---- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
-+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
-@@ -87,7 +87,8 @@ class UDFSuite extends QueryTest with SharedSparkSession {
- spark.catalog.dropTempView("tmp_table")
- }
-
-- test("SPARK-8005 input_file_name") {
-+ test("SPARK-8005 input_file_name",
-+
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312"))
{
- withTempPath { dir =>
- val data = sparkContext.parallelize(0 to 10, 2).toDF("id")
- data.write.parquet(dir.getCanonicalPath)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
index d269290e616..13726a31e07 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
@@ -2504,42 +2476,32 @@ index 5cdbdc27b32..307fba16578 100644
spark.range(10).selectExpr("id", "id % 3 as p")
.write.partitionBy("p").saveAsTable("testDataForScan")
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
-index 0ab8691801d..7b81f3a8f6d 100644
+index 0ab8691801d..b18a5bea944 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
-@@ -17,7 +17,9 @@
-
+@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.python
-+import org.apache.spark.sql.IgnoreCometNativeDataFusion
import org.apache.spark.sql.catalyst.plans.logical.{ArrowEvalPython,
BatchEvalPython, Limit, LocalLimit}
+import org.apache.spark.sql.comet._
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan,
SparkPlanTest}
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
-@@ -93,7 +95,8 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
- assert(arrowEvalNodes.size == 2)
- }
-
-- test("Python UDF should not break column pruning/filter pushdown -- Parquet
V1") {
-+ test("Python UDF should not break column pruning/filter pushdown -- Parquet
V1",
-+
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312"))
{
- withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
- withTempPath { f =>
- spark.range(10).select($"id".as("a"), $"id".as("b"))
-@@ -108,6 +111,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
+@@ -108,6 +109,8 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
val scanNodes = query.queryExecution.executedPlan.collect {
case scan: FileSourceScanExec => scan
+ case scan: CometScanExec => scan
++ case scan: CometNativeScanExec => scan
}
assert(scanNodes.length == 1)
assert(scanNodes.head.output.map(_.name) == Seq("a"))
-@@ -120,11 +124,16 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
+@@ -120,11 +123,18 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
val scanNodes = query.queryExecution.executedPlan.collect {
case scan: FileSourceScanExec => scan
+ case scan: CometScanExec => scan
++ case scan: CometNativeScanExec => scan
}
assert(scanNodes.length == 1)
// $"a" is not null and $"a" > 1
@@ -2548,13 +2510,14 @@ index 0ab8691801d..7b81f3a8f6d 100644
+ val dataFilters = scanNodes.head match {
+ case scan: FileSourceScanExec => scan.dataFilters
+ case scan: CometScanExec => scan.dataFilters
++ case scan: CometNativeScanExec => scan.dataFilters
+ }
+ assert(dataFilters.length == 2)
+ assert(dataFilters.flatMap(_.references.map(_.name)).distinct ==
Seq("a"))
}
}
}
-@@ -145,6 +154,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
+@@ -145,6 +155,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
val scanNodes = query.queryExecution.executedPlan.collect {
case scan: BatchScanExec => scan
@@ -2562,7 +2525,7 @@ index 0ab8691801d..7b81f3a8f6d 100644
}
assert(scanNodes.length == 1)
assert(scanNodes.head.output.map(_.name) == Seq("a"))
-@@ -157,6 +167,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
+@@ -157,6 +168,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
val scanNodes = query.queryExecution.executedPlan.collect {
case scan: BatchScanExec => scan
@@ -3243,29 +3206,6 @@ index de3b1ffccf0..2a76d127093 100644
override def beforeEach(): Unit = {
super.beforeEach()
-diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
-index f3be79f9022..b4b1ea8dbc4 100644
----
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
-+++
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
-@@ -34,7 +34,7 @@ import
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectIn
- import org.apache.hadoop.io.{LongWritable, Writable}
-
- import org.apache.spark.{SparkException, SparkFiles, TestUtils}
--import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
-+import org.apache.spark.sql.{AnalysisException, IgnoreCometNativeDataFusion,
QueryTest, Row}
- import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
- import org.apache.spark.sql.catalyst.plans.logical.Project
- import org.apache.spark.sql.execution.WholeStageCodegenExec
-@@ -448,7 +448,8 @@ class HiveUDFSuite extends QueryTest with
TestHiveSingleton with SQLTestUtils {
- }
- }
-
-- test("SPARK-11522 select input_file_name from non-parquet table") {
-+ test("SPARK-11522 select input_file_name from non-parquet table",
-+
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312"))
{
-
- withTempDir { tempDir =>
-
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 6160c3e5f6c..0956d7d9edc 100644
---
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index 50abb2608..bb37515ab 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -28,7 +28,7 @@ import scala.jdk.CollectionConverters._
import org.apache.hadoop.conf.Configuration
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.{Attribute,
DynamicPruningExpression, Expression, GenericInternalRow, PlanExpression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute,
DynamicPruningExpression, Expression, GenericInternalRow, InputFileBlockLength,
InputFileBlockStart, InputFileName, PlanExpression}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.{sideBySide, ArrayBasedMapData,
GenericArrayData, MetadataColumnHelper}
import
org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefaultValues
@@ -110,7 +110,9 @@ case class CometScanRule(session: SparkSession)
metadataTableSuffix.exists(suffix =>
scanExec.table.name().endsWith(suffix))
}
- def transformScan(plan: SparkPlan): SparkPlan = plan match {
+ val fullPlan = plan
+
+ def transformScan(scanNode: SparkPlan): SparkPlan = scanNode match {
case scan if !CometConf.COMET_NATIVE_SCAN_ENABLED.get(conf) =>
withInfo(scan, "Comet Scan is not enabled")
@@ -119,7 +121,7 @@ case class CometScanRule(session: SparkSession)
// data source V1
case scanExec: FileSourceScanExec =>
- transformV1Scan(scanExec)
+ transformV1Scan(fullPlan, scanExec)
// data source V2
case scanExec: BatchScanExec =>
@@ -135,7 +137,7 @@ case class CometScanRule(session: SparkSession)
}
}
- private def transformV1Scan(scanExec: FileSourceScanExec): SparkPlan = {
+ private def transformV1Scan(plan: SparkPlan, scanExec: FileSourceScanExec):
SparkPlan = {
if (COMET_DPP_FALLBACK_ENABLED.get() &&
scanExec.partitionFilters.exists(isDynamicPruningFilter)) {
@@ -170,7 +172,7 @@ case class CometScanRule(session: SparkSession)
nativeIcebergCompatScan(session, scanExec, r, hadoopConf)
.getOrElse(scanExec)
case SCAN_NATIVE_DATAFUSION =>
- nativeDataFusionScan(session, scanExec, r,
hadoopConf).getOrElse(scanExec)
+ nativeDataFusionScan(plan, session, scanExec, r,
hadoopConf).getOrElse(scanExec)
case SCAN_NATIVE_ICEBERG_COMPAT =>
nativeIcebergCompatScan(session, scanExec, r,
hadoopConf).getOrElse(scanExec)
}
@@ -181,6 +183,7 @@ case class CometScanRule(session: SparkSession)
}
private def nativeDataFusionScan(
+ plan: SparkPlan,
session: SparkSession,
scanExec: FileSourceScanExec,
r: HadoopFsRelation,
@@ -196,6 +199,20 @@ case class CometScanRule(session: SparkSession)
withInfo(scanExec, "Native DataFusion scan does not support metadata
columns")
return None
}
+ // input_file_name, input_file_block_start, and input_file_block_length read from
+ // InputFileBlockHolder, a thread-local set by Spark's FileScanRDD. The native DataFusion
+ // scan does not use FileScanRDD, so these expressions would return empty/default values.
+ if (plan.exists(node =>
+ node.expressions.exists(_.exists {
+ case _: InputFileName | _: InputFileBlockStart | _:
InputFileBlockLength => true
+ case _ => false
+ }))) {
+ withInfo(
+ scanExec,
+ "Native DataFusion scan is not compatible with input_file_name, " +
+ "input_file_block_start, or input_file_block_length")
+ return None
+ }
if
(ShimFileFormat.findRowIndexColumnIndexInSchema(scanExec.requiredSchema) >= 0) {
withInfo(scanExec, "Native DataFusion scan does not support row index
generation")
return None
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]