This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 58cf6e176 fix: fall back to Spark when Parquet field ID matching is
enabled in native_datafusion (#3415)
58cf6e176 is described below
commit 58cf6e176e04c884c00a3e99df77ca6a33ff349d
Author: Andy Grove <[email protected]>
AuthorDate: Fri Feb 6 08:34:24 2026 -0700
fix: fall back to Spark when Parquet field ID matching is enabled in
native_datafusion (#3415)
---
dev/diffs/3.5.8.diff | 73 ----------------------
.../org/apache/comet/rules/CometScanRule.scala | 6 ++
2 files changed, 6 insertions(+), 73 deletions(-)
diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff
index d538d1161..72c41e4f8 100644
--- a/dev/diffs/3.5.8.diff
+++ b/dev/diffs/3.5.8.diff
@@ -2065,79 +2065,6 @@ index 07e2849ce6f..3e73645b638 100644
val extraOptions = Map[String, String](
ParquetOutputFormat.WRITER_VERSION ->
ParquetProperties.WriterVersion.PARQUET_2_0.toString
)
-diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala
-index 5e01d3f447c..284d6657d4f 100644
---- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala
-+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala
-@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet
- import scala.collection.JavaConverters._
-
- import org.apache.spark.SparkException
--import org.apache.spark.sql.{QueryTest, Row}
-+import org.apache.spark.sql.{IgnoreCometNativeDataFusion, QueryTest, Row}
- import org.apache.spark.sql.internal.SQLConf
- import org.apache.spark.sql.test.SharedSparkSession
- import org.apache.spark.sql.types.{ArrayType, IntegerType, MapType, Metadata,
MetadataBuilder, StringType, StructType}
-@@ -30,7 +30,8 @@ class ParquetFieldIdIOSuite extends QueryTest with
ParquetTest with SharedSparkS
- private def withId(id: Int): Metadata =
- new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY,
id).build()
-
-- test("Parquet reads infer fields using field ids correctly") {
-+ test("Parquet reads infer fields using field ids correctly",
-+
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3316"))
{
- withTempDir { dir =>
- val readSchema =
- new StructType()
-@@ -78,7 +79,8 @@ class ParquetFieldIdIOSuite extends QueryTest with
ParquetTest with SharedSparkS
- }
- }
-
-- test("absence of field ids") {
-+ test("absence of field ids",
-+
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3316"))
{
- withTempDir { dir =>
- val readSchema =
- new StructType()
-@@ -107,7 +109,8 @@ class ParquetFieldIdIOSuite extends QueryTest with
ParquetTest with SharedSparkS
- }
- }
-
-- test("SPARK-38094: absence of field ids: reading nested schema") {
-+ test("SPARK-38094: absence of field ids: reading nested schema",
-+
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3316"))
{
- withTempDir { dir =>
- // now with nested schema/complex type
- val readSchema =
-@@ -136,7 +139,8 @@ class ParquetFieldIdIOSuite extends QueryTest with
ParquetTest with SharedSparkS
- }
- }
-
-- test("multiple id matches") {
-+ test("multiple id matches",
-+
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3316"))
{
- withTempDir { dir =>
- val readSchema =
- new StructType()
-@@ -163,7 +167,8 @@ class ParquetFieldIdIOSuite extends QueryTest with
ParquetTest with SharedSparkS
- }
- }
-
-- test("read parquet file without ids") {
-+ test("read parquet file without ids",
-+
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3316"))
{
- withTempDir { dir =>
- val readSchema =
- new StructType()
-@@ -196,7 +201,8 @@ class ParquetFieldIdIOSuite extends QueryTest with
ParquetTest with SharedSparkS
- }
- }
-
-- test("global read/write flag should work correctly") {
-+ test("global read/write flag should work correctly",
-+
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3316"))
{
- withTempDir { dir =>
- val readSchema =
- new StructType()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 8e88049f51e..49f2001dc6b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index 4be2fe501..29555a61e 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefa
import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec}
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
+import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
@@ -201,6 +202,11 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
withInfo(scanExec, "Native DataFusion scan does not support row index generation")
return None
}
+ if (session.sessionState.conf.getConf(SQLConf.PARQUET_FIELD_ID_READ_ENABLED) &&
+ ParquetUtils.hasFieldIds(scanExec.requiredSchema)) {
+ withInfo(scanExec, "Native DataFusion scan does not support Parquet field ID matching")
+ return None
+ }
if (!isSchemaSupported(scanExec, SCAN_NATIVE_DATAFUSION, r)) {
return None
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]