This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new e7eae6665 fix: unignore input_file_name Spark SQL tests for 
native_datafusion (#3458)
e7eae6665 is described below

commit e7eae6665e9c5bdc6a7949afc2411d7fff9f5d88
Author: Andy Grove <[email protected]>
AuthorDate: Fri Feb 13 12:46:47 2026 -0700

    fix: unignore input_file_name Spark SQL tests for native_datafusion (#3458)
    
    * fix: unignore input_file_name Spark SQL tests for native_datafusion
    
    The native_datafusion scan now correctly falls back to Spark's
    FileSourceScanExec when metadata columns (like input_file_name) are
    present, so the 3 input_file_name tests no longer need to be ignored.
    
    For ExtractPythonUDFsSuite, the issue was that the test's collect
    pattern didn't match CometNativeScanExec. Fixed by adding
    CometNativeScanExec to the collect and dataFilters match blocks.
    
    Closes #3312
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
    
    * fix: restore IgnoreComet.scala in 3.5.8 Spark SQL test diff
    
    The previous commit accidentally removed the IgnoreComet.scala file
    creation from the diff, causing 94 compilation errors when applied
    to Spark 3.5.8.
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
    
    * fix: fall back scan when plan uses input_file_name expressions
    
    CometScanExec does not populate InputFileBlockHolder (the thread-local
    that Spark's FileScanRDD sets), so input_file_name(),
    input_file_block_start(), and input_file_block_length() return empty
    or default values when Comet replaces the scan. Detect these
    expressions in the plan and fall back to Spark's FileSourceScanExec.
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
    
    ---------
    
    Co-authored-by: Claude Opus 4.6 <[email protected]>
---
 dev/diffs/3.5.8.diff                               | 78 +++-------------------
 .../org/apache/comet/rules/CometScanRule.scala     | 27 ++++++--
 2 files changed, 31 insertions(+), 74 deletions(-)

diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff
index beef44549..d2d72e9d6 100644
--- a/dev/diffs/3.5.8.diff
+++ b/dev/diffs/3.5.8.diff
@@ -238,20 +238,6 @@ index e5494726695..00937f025c2 100644
    }
  
    test("A cached table preserves the partitioning and ordering of its cached 
SparkPlan") {
-diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
-index 9e8d77c53f3..855e3ada7d1 100644
---- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
-+++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
-@@ -790,7 +790,8 @@ class ColumnExpressionSuite extends QueryTest with 
SharedSparkSession {
-     }
-   }
- 
--  test("input_file_name, input_file_block_start, input_file_block_length - 
FileScanRDD") {
-+  test("input_file_name, input_file_block_start, input_file_block_length - 
FileScanRDD",
-+    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312";))
 {
-     withTempPath { dir =>
-       val data = sparkContext.parallelize(0 to 10).toDF("id")
-       data.write.parquet(dir.getCanonicalPath)
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
 index 6f3090d8908..c08a60fb0c2 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -1084,20 +1070,6 @@ index 04702201f82..5ee11f83ecf 100644
        }
        assert(exchanges.size === 1)
      }
-diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
-index 9f8e979e3fb..3bc9dab8023 100644
---- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
-+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
-@@ -87,7 +87,8 @@ class UDFSuite extends QueryTest with SharedSparkSession {
-     spark.catalog.dropTempView("tmp_table")
-   }
- 
--  test("SPARK-8005 input_file_name") {
-+  test("SPARK-8005 input_file_name",
-+    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312";))
 {
-     withTempPath { dir =>
-       val data = sparkContext.parallelize(0 to 10, 2).toDF("id")
-       data.write.parquet(dir.getCanonicalPath)
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
 index d269290e616..13726a31e07 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
@@ -2504,42 +2476,32 @@ index 5cdbdc27b32..307fba16578 100644
        spark.range(10).selectExpr("id", "id % 3 as p")
          .write.partitionBy("p").saveAsTable("testDataForScan")
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
-index 0ab8691801d..7b81f3a8f6d 100644
+index 0ab8691801d..b18a5bea944 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
-@@ -17,7 +17,9 @@
- 
+@@ -18,6 +18,7 @@
  package org.apache.spark.sql.execution.python
  
-+import org.apache.spark.sql.IgnoreCometNativeDataFusion
  import org.apache.spark.sql.catalyst.plans.logical.{ArrowEvalPython, 
BatchEvalPython, Limit, LocalLimit}
 +import org.apache.spark.sql.comet._
  import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan, 
SparkPlanTest}
  import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
  import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
-@@ -93,7 +95,8 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
-     assert(arrowEvalNodes.size == 2)
-   }
- 
--  test("Python UDF should not break column pruning/filter pushdown -- Parquet 
V1") {
-+  test("Python UDF should not break column pruning/filter pushdown -- Parquet 
V1",
-+    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312";))
 {
-     withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
-       withTempPath { f =>
-         spark.range(10).select($"id".as("a"), $"id".as("b"))
-@@ -108,6 +111,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
+@@ -108,6 +109,8 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
  
            val scanNodes = query.queryExecution.executedPlan.collect {
              case scan: FileSourceScanExec => scan
 +            case scan: CometScanExec => scan
++            case scan: CometNativeScanExec => scan
            }
            assert(scanNodes.length == 1)
            assert(scanNodes.head.output.map(_.name) == Seq("a"))
-@@ -120,11 +124,16 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
+@@ -120,11 +123,18 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
  
            val scanNodes = query.queryExecution.executedPlan.collect {
              case scan: FileSourceScanExec => scan
 +            case scan: CometScanExec => scan
++            case scan: CometNativeScanExec => scan
            }
            assert(scanNodes.length == 1)
            // $"a" is not null and $"a" > 1
@@ -2548,13 +2510,14 @@ index 0ab8691801d..7b81f3a8f6d 100644
 +          val dataFilters = scanNodes.head match {
 +            case scan: FileSourceScanExec => scan.dataFilters
 +            case scan: CometScanExec => scan.dataFilters
++            case scan: CometNativeScanExec => scan.dataFilters
 +          }
 +          assert(dataFilters.length == 2)
 +          assert(dataFilters.flatMap(_.references.map(_.name)).distinct == 
Seq("a"))
          }
        }
      }
-@@ -145,6 +154,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
+@@ -145,6 +155,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
  
            val scanNodes = query.queryExecution.executedPlan.collect {
              case scan: BatchScanExec => scan
@@ -2562,7 +2525,7 @@ index 0ab8691801d..7b81f3a8f6d 100644
            }
            assert(scanNodes.length == 1)
            assert(scanNodes.head.output.map(_.name) == Seq("a"))
-@@ -157,6 +167,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
+@@ -157,6 +168,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
  
            val scanNodes = query.queryExecution.executedPlan.collect {
              case scan: BatchScanExec => scan
@@ -3243,29 +3206,6 @@ index de3b1ffccf0..2a76d127093 100644
  
    override def beforeEach(): Unit = {
      super.beforeEach()
-diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
-index f3be79f9022..b4b1ea8dbc4 100644
---- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
-+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
-@@ -34,7 +34,7 @@ import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectIn
- import org.apache.hadoop.io.{LongWritable, Writable}
- 
- import org.apache.spark.{SparkException, SparkFiles, TestUtils}
--import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
-+import org.apache.spark.sql.{AnalysisException, IgnoreCometNativeDataFusion, 
QueryTest, Row}
- import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
- import org.apache.spark.sql.catalyst.plans.logical.Project
- import org.apache.spark.sql.execution.WholeStageCodegenExec
-@@ -448,7 +448,8 @@ class HiveUDFSuite extends QueryTest with 
TestHiveSingleton with SQLTestUtils {
-     }
-   }
- 
--  test("SPARK-11522 select input_file_name from non-parquet table") {
-+  test("SPARK-11522 select input_file_name from non-parquet table",
-+    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312";))
 {
- 
-     withTempDir { tempDir =>
- 
 diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
 index 6160c3e5f6c..0956d7d9edc 100644
 --- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala 
b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index 50abb2608..bb37515ab 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -28,7 +28,7 @@ import scala.jdk.CollectionConverters._
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.{Attribute, 
DynamicPruningExpression, Expression, GenericInternalRow, PlanExpression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
DynamicPruningExpression, Expression, GenericInternalRow, InputFileBlockLength, 
InputFileBlockStart, InputFileName, PlanExpression}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.{sideBySide, ArrayBasedMapData, 
GenericArrayData, MetadataColumnHelper}
 import 
org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefaultValues
@@ -110,7 +110,9 @@ case class CometScanRule(session: SparkSession)
       metadataTableSuffix.exists(suffix => 
scanExec.table.name().endsWith(suffix))
     }
 
-    def transformScan(plan: SparkPlan): SparkPlan = plan match {
+    val fullPlan = plan
+
+    def transformScan(scanNode: SparkPlan): SparkPlan = scanNode match {
       case scan if !CometConf.COMET_NATIVE_SCAN_ENABLED.get(conf) =>
         withInfo(scan, "Comet Scan is not enabled")
 
@@ -119,7 +121,7 @@ case class CometScanRule(session: SparkSession)
 
       // data source V1
       case scanExec: FileSourceScanExec =>
-        transformV1Scan(scanExec)
+        transformV1Scan(fullPlan, scanExec)
 
       // data source V2
       case scanExec: BatchScanExec =>
@@ -135,7 +137,7 @@ case class CometScanRule(session: SparkSession)
     }
   }
 
-  private def transformV1Scan(scanExec: FileSourceScanExec): SparkPlan = {
+  private def transformV1Scan(plan: SparkPlan, scanExec: FileSourceScanExec): 
SparkPlan = {
 
     if (COMET_DPP_FALLBACK_ENABLED.get() &&
       scanExec.partitionFilters.exists(isDynamicPruningFilter)) {
@@ -170,7 +172,7 @@ case class CometScanRule(session: SparkSession)
             nativeIcebergCompatScan(session, scanExec, r, hadoopConf)
               .getOrElse(scanExec)
           case SCAN_NATIVE_DATAFUSION =>
-            nativeDataFusionScan(session, scanExec, r, 
hadoopConf).getOrElse(scanExec)
+            nativeDataFusionScan(plan, session, scanExec, r, 
hadoopConf).getOrElse(scanExec)
           case SCAN_NATIVE_ICEBERG_COMPAT =>
             nativeIcebergCompatScan(session, scanExec, r, 
hadoopConf).getOrElse(scanExec)
         }
@@ -181,6 +183,7 @@ case class CometScanRule(session: SparkSession)
   }
 
   private def nativeDataFusionScan(
+      plan: SparkPlan,
       session: SparkSession,
       scanExec: FileSourceScanExec,
       r: HadoopFsRelation,
@@ -196,6 +199,20 @@ case class CometScanRule(session: SparkSession)
       withInfo(scanExec, "Native DataFusion scan does not support metadata 
columns")
       return None
     }
+    // input_file_name, input_file_block_start, and input_file_block_length 
read from
+    // InputFileBlockHolder, a thread-local set by Spark's FileScanRDD. The 
native DataFusion
+    // scan does not use FileScanRDD, so these expressions would return 
empty/default values.
+    if (plan.exists(node =>
+        node.expressions.exists(_.exists {
+          case _: InputFileName | _: InputFileBlockStart | _: 
InputFileBlockLength => true
+          case _ => false
+        }))) {
+      withInfo(
+        scanExec,
+        "Native DataFusion scan is not compatible with input_file_name, " +
+          "input_file_block_start, or input_file_block_length")
+      return None
+    }
     if 
(ShimFileFormat.findRowIndexColumnIndexInSchema(scanExec.requiredSchema) >= 0) {
       withInfo(scanExec, "Native DataFusion scan does not support row index 
generation")
       return None


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to