This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 58cf6e176 fix: fall back to Spark when Parquet field ID matching is 
enabled in native_datafusion (#3415)
58cf6e176 is described below

commit 58cf6e176e04c884c00a3e99df77ca6a33ff349d
Author: Andy Grove <[email protected]>
AuthorDate: Fri Feb 6 08:34:24 2026 -0700

    fix: fall back to Spark when Parquet field ID matching is enabled in 
native_datafusion (#3415)
---
 dev/diffs/3.5.8.diff                               | 73 ----------------------
 .../org/apache/comet/rules/CometScanRule.scala     |  6 ++
 2 files changed, 6 insertions(+), 73 deletions(-)

diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff
index d538d1161..72c41e4f8 100644
--- a/dev/diffs/3.5.8.diff
+++ b/dev/diffs/3.5.8.diff
@@ -2065,79 +2065,6 @@ index 07e2849ce6f..3e73645b638 100644
      val extraOptions = Map[String, String](
        ParquetOutputFormat.WRITER_VERSION -> 
ParquetProperties.WriterVersion.PARQUET_2_0.toString
      )
-diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala
-index 5e01d3f447c..284d6657d4f 100644
---- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala
-+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala
-@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet
- import scala.collection.JavaConverters._
- 
- import org.apache.spark.SparkException
--import org.apache.spark.sql.{QueryTest, Row}
-+import org.apache.spark.sql.{IgnoreCometNativeDataFusion, QueryTest, Row}
- import org.apache.spark.sql.internal.SQLConf
- import org.apache.spark.sql.test.SharedSparkSession
- import org.apache.spark.sql.types.{ArrayType, IntegerType, MapType, Metadata, 
MetadataBuilder, StringType, StructType}
-@@ -30,7 +30,8 @@ class ParquetFieldIdIOSuite extends QueryTest with 
ParquetTest with SharedSparkS
-   private def withId(id: Int): Metadata =
-     new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, 
id).build()
- 
--  test("Parquet reads infer fields using field ids correctly") {
-+  test("Parquet reads infer fields using field ids correctly",
-+    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3316"))
 {
-     withTempDir { dir =>
-       val readSchema =
-         new StructType()
-@@ -78,7 +79,8 @@ class ParquetFieldIdIOSuite extends QueryTest with 
ParquetTest with SharedSparkS
-     }
-   }
- 
--  test("absence of field ids") {
-+  test("absence of field ids",
-+    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3316"))
 {
-     withTempDir { dir =>
-       val readSchema =
-         new StructType()
-@@ -107,7 +109,8 @@ class ParquetFieldIdIOSuite extends QueryTest with 
ParquetTest with SharedSparkS
-     }
-   }
- 
--  test("SPARK-38094: absence of field ids: reading nested schema") {
-+  test("SPARK-38094: absence of field ids: reading nested schema",
-+    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3316"))
 {
-     withTempDir { dir =>
-       // now with nested schema/complex type
-       val readSchema =
-@@ -136,7 +139,8 @@ class ParquetFieldIdIOSuite extends QueryTest with 
ParquetTest with SharedSparkS
-     }
-   }
- 
--  test("multiple id matches") {
-+  test("multiple id matches",
-+    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3316"))
 {
-     withTempDir { dir =>
-       val readSchema =
-         new StructType()
-@@ -163,7 +167,8 @@ class ParquetFieldIdIOSuite extends QueryTest with 
ParquetTest with SharedSparkS
-     }
-   }
- 
--  test("read parquet file without ids") {
-+  test("read parquet file without ids",
-+    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3316"))
 {
-     withTempDir { dir =>
-       val readSchema =
-         new StructType()
-@@ -196,7 +201,8 @@ class ParquetFieldIdIOSuite extends QueryTest with 
ParquetTest with SharedSparkS
-     }
-   }
- 
--  test("global read/write flag should work correctly") {
-+  test("global read/write flag should work correctly",
-+    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3316"))
 {
-     withTempDir { dir =>
-       val readSchema =
-         new StructType()
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 index 8e88049f51e..49f2001dc6b 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala 
b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index 4be2fe501..29555a61e 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -35,6 +35,7 @@ import 
org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefa
 import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec}
 import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
+import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
@@ -201,6 +202,11 @@ case class CometScanRule(session: SparkSession) extends 
Rule[SparkPlan] with Com
       withInfo(scanExec, "Native DataFusion scan does not support row index 
generation")
       return None
     }
+    if 
(session.sessionState.conf.getConf(SQLConf.PARQUET_FIELD_ID_READ_ENABLED) &&
+      ParquetUtils.hasFieldIds(scanExec.requiredSchema)) {
+      withInfo(scanExec, "Native DataFusion scan does not support Parquet 
field ID matching")
+      return None
+    }
     if (!isSchemaSupported(scanExec, SCAN_NATIVE_DATAFUSION, r)) {
       return None
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to