This is an automated email from the ASF dual-hosted git repository.

kazuyukitanimura pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new a9868943e feat: make parquet native scan schema case insensitive (#1575)
a9868943e is described below

commit a9868943eec399185fe546cd9bd184c43ca0d757
Author: Zhen Wang <[email protected]>
AuthorDate: Sat Mar 29 01:36:52 2025 +0800

    feat: make parquet native scan schema case insensitive (#1575)
    
    ## Which issue does this PR close?
    
    Part of #1574.
    
    ## Rationale for this change
    
    The data schema of a Spark parquet datasource scan may not match the actual schema of the file. This can cause the `projection/filter` to not behave as expected.
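    
    For illustration only (not part of this change): a minimal arrow-rs sketch of the mismatch, assuming the table declares column `A` while the parquet file was written with column `a`, so a case-sensitive lookup finds nothing:
    
    ```rust
    use arrow_schema::{DataType, Field, Schema};
    
    fn main() {
        // Hypothetical schema that Spark asks the native scan to produce.
        let required_schema = Schema::new(vec![Field::new("A", DataType::Int64, true)]);
        // Schema actually stored in the parquet file.
        let file_schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
    
        // `Fields::find` compares names case sensitively, so the required column is
        // not found in the file schema and the projection/filter can misbehave.
        let required_name = required_schema.field(0).name();
        assert!(file_schema.fields().find(required_name).is_none());
    }
    ```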
    
    ## What changes are included in this PR?
    
    This PR makes the matching between the `data schema` and the `file schema` case insensitive in the schema adapter, but it does not affect `pruning_predicate` or `page_pruning_predicate`.
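    
    As a simplified sketch of the comparison this PR adds (the real change inlines it in three places in `schema_adapter.rs`; `find_field_index` below is a hypothetical helper, not code from this commit):
    
    ```rust
    use arrow_schema::Fields;
    
    /// Find the index of `name` in `fields`, ignoring case unless `case_sensitive` is set.
    fn find_field_index(fields: &Fields, name: &str, case_sensitive: bool) -> Option<usize> {
        fields
            .iter()
            .enumerate()
            .find(|(_, f)| {
                if case_sensitive {
                    f.name() == name
                } else {
                    f.name().to_lowercase() == name.to_lowercase()
                }
            })
            .map(|(idx, _)| idx)
    }
    ```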
    
    ## How are these changes tested?
    
    Added a unit test.
---
 native/core/src/parquet/parquet_exec.rs            |  1 +
 native/core/src/parquet/parquet_support.rs         |  4 ++
 native/core/src/parquet/schema_adapter.rs          | 51 ++++++++++++++++------
 .../apache/comet/parquet/ParquetReadSuite.scala    | 19 ++++++++
 4 files changed, 62 insertions(+), 13 deletions(-)

diff --git a/native/core/src/parquet/parquet_exec.rs b/native/core/src/parquet/parquet_exec.rs
index 85a3d023c..a39ee259f 100644
--- a/native/core/src/parquet/parquet_exec.rs
+++ b/native/core/src/parquet/parquet_exec.rs
@@ -110,6 +110,7 @@ fn get_options(session_timezone: &str) -> (TableParquetOptions, SparkParquetOpti
     let mut spark_parquet_options =
         SparkParquetOptions::new(EvalMode::Legacy, session_timezone, false);
     spark_parquet_options.allow_cast_unsigned_ints = true;
+    spark_parquet_options.case_sensitive = false;
     (table_parquet_options, spark_parquet_options)
 }
 
diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs
index 3d09a046a..ee830b795 100644
--- a/native/core/src/parquet/parquet_support.rs
+++ b/native/core/src/parquet/parquet_support.rs
@@ -64,6 +64,8 @@ pub struct SparkParquetOptions {
     pub use_decimal_128: bool,
     /// Whether to read dates/timestamps that were written in the legacy hybrid Julian + Gregorian calendar as it is. If false, throw exceptions instead. If the spark type is TimestampNTZ, this should be true.
     pub use_legacy_date_timestamp_or_ntz: bool,
+    /// Whether schema field names are case sensitive
+    pub case_sensitive: bool,
 }
 
 impl SparkParquetOptions {
@@ -76,6 +78,7 @@ impl SparkParquetOptions {
             is_adapting_schema: false,
             use_decimal_128: false,
             use_legacy_date_timestamp_or_ntz: false,
+            case_sensitive: false,
         }
     }
 
@@ -88,6 +91,7 @@ impl SparkParquetOptions {
             is_adapting_schema: false,
             use_decimal_128: false,
             use_legacy_date_timestamp_or_ntz: false,
+            case_sensitive: false,
         }
     }
 }
diff --git a/native/core/src/parquet/schema_adapter.rs b/native/core/src/parquet/schema_adapter.rs
index e9d1ff640..a387ec6f5 100644
--- a/native/core/src/parquet/schema_adapter.rs
+++ b/native/core/src/parquet/schema_adapter.rs
@@ -84,7 +84,20 @@ impl SchemaAdapter for SparkSchemaAdapter {
     /// Panics if index is not in range for the table schema
    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
         let field = self.required_schema.field(index);
-        Some(file_schema.fields.find(field.name())?.0)
+        Some(
+            file_schema
+                .fields
+                .iter()
+                .enumerate()
+                .find(|(_, b)| {
+                    if self.parquet_options.case_sensitive {
+                        b.name() == field.name()
+                    } else {
+                        b.name().to_lowercase() == field.name().to_lowercase()
+                    }
+                })?
+                .0,
+        )
     }
 
     /// Creates a `SchemaMapping` for casting or mapping the columns from the
@@ -104,8 +117,18 @@ impl SchemaAdapter for SparkSchemaAdapter {
        let mut field_mappings = vec![None; self.required_schema.fields().len()];
 
         for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
-            if let Some((table_idx, _table_field)) =
-                self.required_schema.fields().find(file_field.name())
+            if let Some((table_idx, _table_field)) = self
+                .required_schema
+                .fields()
+                .iter()
+                .enumerate()
+                .find(|(_, b)| {
+                    if self.parquet_options.case_sensitive {
+                        b.name() == file_field.name()
+                    } else {
+                        b.name().to_lowercase() == file_field.name().to_lowercase()
+                    }
+                })
             {
                 field_mappings[table_idx] = Some(projection.len());
                 projection.push(file_idx);
@@ -234,16 +257,18 @@ impl SchemaMapper for SchemaMapping {
             .zip(batch_cols.iter())
             .flat_map(|(field, batch_col)| {
                 self.table_schema
-                    // try to get the same field from the table schema that we have stored in self
-                    .field_with_name(field.name())
-                    // and if we don't have it, that's fine, ignore it. This may occur when we've
-                    // created an external table whose fields are a subset of the fields in this
-                    // file, then tried to read data from the file into this table. If that is the
-                    // case here, it's fine to ignore because we don't care about this field
-                    // anyways
-                    .ok()
+                    .fields()
+                    .iter()
+                    .enumerate()
+                    .find(|(_, b)| {
+                        if self.parquet_options.case_sensitive {
+                            b.name() == field.name()
+                        } else {
+                            b.name().to_lowercase() == field.name().to_lowercase()
+                        }
+                    })
                     // but if we do have it,
-                    .map(|table_field| {
+                    .map(|(_, table_field)| {
                        // try to cast it into the correct output type. we don't want to ignore this
                         // error, though, so it's propagated.
                         spark_parquet_convert(
@@ -253,7 +278,7 @@ impl SchemaMapper for SchemaMapping {
                         )?
                         .into_array(batch_col.len())
                         // and if that works, return the field and column.
-                        .map(|new_col| (new_col, table_field.clone()))
+                        .map(|new_col| (new_col, table_field.as_ref().clone()))
                     })
             })
             .collect::<Result<Vec<_>, _>>()?
diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index 6e9b731d4..a6526e5fe 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -1460,6 +1460,25 @@ class ParquetReadV1Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper {
           v1 = Some("parquet"))
     }
   }
+
+  test("test V1 parquet native scan -- case insensitive") {
+    withTempPath { path =>
+      spark.range(10).toDF("a").write.parquet(path.toString)
+      Seq(CometConf.SCAN_NATIVE_DATAFUSION, CometConf.SCAN_NATIVE_ICEBERG_COMPAT).foreach(
+        scanMode => {
+          withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanMode) {
+            withTable("test") {
+              sql("create table test (A long) using parquet options (path '" + path + "')")
+              val df = sql("select A from test")
+              checkSparkAnswer(df)
+              // TODO: pushed-down filters do not use the schema adapter in DataFusion, which will cause an empty result
+              // val df = sql("select * from test where A > 5")
+              // checkSparkAnswer(df)
+            }
+          }
+        })
+    }
+  }
 }
 
class ParquetReadV2Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
