This is an automated email from the ASF dual-hosted git repository.

xudong963 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 76a7789ace Refactor file schema type coercions (#15268)
76a7789ace is described below

commit 76a7789ace33ced54c973fa0d5fc9d1866e1bf19
Author: xudong.w <[email protected]>
AuthorDate: Tue Mar 18 21:55:09 2025 +0800

    Refactor file schema type coercions (#15268)
    
    * Refactor file schema type coercions
    
    * resolve comments
    
    * keep old api and add deprecated
    
    * resolve comments
---
 datafusion/datasource-parquet/src/file_format.rs | 116 ++++++++++++++++++++++-
 datafusion/datasource-parquet/src/opener.rs      |  15 +--
 2 files changed, 115 insertions(+), 16 deletions(-)

diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs
index 232dd2fbe3..cb4cfc142a 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -465,7 +465,114 @@ impl FileFormat for ParquetFormat {
     }
 }
 
+/// Apply necessary schema type coercions to make file schema match table schema.
+///
+/// This function performs two main types of transformations in a single pass:
+/// 1. Binary types to string types conversion - Converts binary data types to their
+///    corresponding string types when the table schema expects string data
+/// 2. Regular to view types conversion - Converts standard string/binary types to
+///    view types when the table schema uses view types
+///
+/// # Arguments
+/// * `table_schema` - The table schema containing the desired types
+/// * `file_schema` - The file schema to be transformed
+///
+/// # Returns
+/// * `Some(Schema)` - If any transformations were applied, returns the transformed schema
+/// * `None` - If no transformations were needed
+pub fn apply_file_schema_type_coercions(
+    table_schema: &Schema,
+    file_schema: &Schema,
+) -> Option<Schema> {
+    let mut needs_view_transform = false;
+    let mut needs_string_transform = false;
+
+    // Create a mapping of table field names to their data types for fast lookup
+    // and simultaneously check if we need any transformations
+    let table_fields: HashMap<_, _> = table_schema
+        .fields()
+        .iter()
+        .map(|f| {
+            let dt = f.data_type();
+            // Check if we need view type transformation
+            if matches!(dt, &DataType::Utf8View | &DataType::BinaryView) {
+                needs_view_transform = true;
+            }
+            // Check if we need string type transformation
+            if matches!(
+                dt,
+                &DataType::Utf8 | &DataType::LargeUtf8 | &DataType::Utf8View
+            ) {
+                needs_string_transform = true;
+            }
+
+            (f.name(), dt)
+        })
+        .collect();
+
+    // Early return if no transformation needed
+    if !needs_view_transform && !needs_string_transform {
+        return None;
+    }
+
+    let transformed_fields: Vec<Arc<Field>> = file_schema
+        .fields()
+        .iter()
+        .map(|field| {
+            let field_name = field.name();
+            let field_type = field.data_type();
+
+            // Look up the corresponding field type in the table schema
+            if let Some(table_type) = table_fields.get(field_name) {
+                match (table_type, field_type) {
+                    // table schema uses string type, coerce the file schema to use string type
+                    (
+                        &DataType::Utf8,
+                        DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
+                    ) => {
+                        return field_with_new_type(field, DataType::Utf8);
+                    }
+                    // table schema uses large string type, coerce the file schema to use large string type
+                    (
+                        &DataType::LargeUtf8,
+                        DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
+                    ) => {
+                        return field_with_new_type(field, DataType::LargeUtf8);
+                    }
+                    // table schema uses string view type, coerce the file schema to use view type
+                    (
+                        &DataType::Utf8View,
+                        DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
+                    ) => {
+                        return field_with_new_type(field, DataType::Utf8View);
+                    }
+                    // Handle view type conversions
+                    (&DataType::Utf8View, DataType::Utf8 | DataType::LargeUtf8) => {
+                        return field_with_new_type(field, DataType::Utf8View);
+                    }
+                    (&DataType::BinaryView, DataType::Binary | DataType::LargeBinary) => {
+                        return field_with_new_type(field, DataType::BinaryView);
+                    }
+                    _ => {}
+                }
+            }
+
+            // If no transformation is needed, keep the original field
+            Arc::clone(field)
+        })
+        .collect();
+
+    Some(Schema::new_with_metadata(
+        transformed_fields,
+        file_schema.metadata.clone(),
+    ))
+}
+
 /// Coerces the file schema if the table schema uses a view type.
+#[deprecated(
+    since = "47.0.0",
+    note = "Use `apply_file_schema_type_coercions` instead"
+)]
 pub fn coerce_file_schema_to_view_type(
     table_schema: &Schema,
     file_schema: &Schema,
@@ -515,6 +622,10 @@ pub fn coerce_file_schema_to_view_type(
 /// If the table schema uses a string type, coerce the file schema to use a string type.
 ///
 /// See [ParquetFormat::binary_as_string] for details
+#[deprecated(
+    since = "47.0.0",
+    note = "Use `apply_file_schema_type_coercions` instead"
+)]
 pub fn coerce_file_schema_to_string_type(
     table_schema: &Schema,
     file_schema: &Schema,
@@ -718,11 +829,8 @@ pub fn statistics_from_parquet_meta_calc(
         file_metadata.schema_descr(),
         file_metadata.key_value_metadata(),
     )?;
-    if let Some(merged) = coerce_file_schema_to_string_type(&table_schema, &file_schema) {
-        file_schema = merged;
-    }
 
-    if let Some(merged) = coerce_file_schema_to_view_type(&table_schema, &file_schema) {
+    if let Some(merged) = apply_file_schema_type_coercions(&table_schema, &file_schema) {
         file_schema = merged;
     }
 
diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index 3c623f558e..8257a796b6 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -19,14 +19,11 @@
 
 use std::sync::Arc;
 
-use crate::file_format::{
-    coerce_file_schema_to_string_type, coerce_file_schema_to_view_type,
-};
 use crate::page_filter::PagePruningAccessPlanFilter;
 use crate::row_group_filter::RowGroupAccessPlanFilter;
 use crate::{
-    row_filter, should_enable_page_index, ParquetAccessPlan, ParquetFileMetrics,
-    ParquetFileReaderFactory,
+    apply_file_schema_type_coercions, row_filter, should_enable_page_index,
+    ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory,
 };
 use datafusion_datasource::file_meta::FileMeta;
 use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
@@ -131,14 +128,8 @@ impl FileOpener for ParquetOpener {
                 ArrowReaderMetadata::load_async(&mut reader, options.clone()).await?;
             let mut schema = Arc::clone(metadata.schema());
 
-            if let Some(merged) =
-                coerce_file_schema_to_string_type(&table_schema, &schema)
-            {
-                schema = Arc::new(merged);
-            }
-
             // read with view types
-            if let Some(merged) = coerce_file_schema_to_view_type(&table_schema, &schema)
+            if let Some(merged) = apply_file_schema_type_coercions(&table_schema, &schema)
             {
                 schema = Arc::new(merged);
             }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to