This is an automated email from the ASF dual-hosted git repository.
xudong963 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 76a7789ace Refactor file schema type coercions (#15268)
76a7789ace is described below
commit 76a7789ace33ced54c973fa0d5fc9d1866e1bf19
Author: xudong.w <[email protected]>
AuthorDate: Tue Mar 18 21:55:09 2025 +0800
Refactor file schema type coercions (#15268)
* Refactor file schema type coercions
* resolve comments
* keep old api and add deprecated
* resolve comments
---
datafusion/datasource-parquet/src/file_format.rs | 116 ++++++++++++++++++++++-
datafusion/datasource-parquet/src/opener.rs | 15 +--
2 files changed, 115 insertions(+), 16 deletions(-)
diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs
index 232dd2fbe3..cb4cfc142a 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -465,7 +465,114 @@ impl FileFormat for ParquetFormat {
}
}
+/// Apply necessary schema type coercions to make file schema match table schema.
+///
+/// This function performs two main types of transformations in a single pass:
+/// 1. Binary types to string types conversion - Converts binary data types to their
+/// corresponding string types when the table schema expects string data
+/// 2. Regular to view types conversion - Converts standard string/binary types to
+/// view types when the table schema uses view types
+///
+/// # Arguments
+/// * `table_schema` - The table schema containing the desired types
+/// * `file_schema` - The file schema to be transformed
+///
+/// # Returns
+/// * `Some(Schema)` - If any transformations were applied, returns the transformed schema
+/// * `None` - If no transformations were needed
+pub fn apply_file_schema_type_coercions(
+ table_schema: &Schema,
+ file_schema: &Schema,
+) -> Option<Schema> {
+ let mut needs_view_transform = false;
+ let mut needs_string_transform = false;
+
+ // Create a mapping of table field names to their data types for fast lookup
+ // and simultaneously check if we need any transformations
+ let table_fields: HashMap<_, _> = table_schema
+ .fields()
+ .iter()
+ .map(|f| {
+ let dt = f.data_type();
+ // Check if we need view type transformation
+ if matches!(dt, &DataType::Utf8View | &DataType::BinaryView) {
+ needs_view_transform = true;
+ }
+ // Check if we need string type transformation
+ if matches!(
+ dt,
+ &DataType::Utf8 | &DataType::LargeUtf8 | &DataType::Utf8View
+ ) {
+ needs_string_transform = true;
+ }
+
+ (f.name(), dt)
+ })
+ .collect();
+
+ // Early return if no transformation needed
+ if !needs_view_transform && !needs_string_transform {
+ return None;
+ }
+
+ let transformed_fields: Vec<Arc<Field>> = file_schema
+ .fields()
+ .iter()
+ .map(|field| {
+ let field_name = field.name();
+ let field_type = field.data_type();
+
+ // Look up the corresponding field type in the table schema
+ if let Some(table_type) = table_fields.get(field_name) {
+ match (table_type, field_type) {
+ // table schema uses string type, coerce the file schema to use string type
+ (
+ &DataType::Utf8,
+ DataType::Binary | DataType::LargeBinary |
DataType::BinaryView,
+ ) => {
+ return field_with_new_type(field, DataType::Utf8);
+ }
+ // table schema uses large string type, coerce the file schema to use large string type
+ (
+ &DataType::LargeUtf8,
+ DataType::Binary | DataType::LargeBinary |
DataType::BinaryView,
+ ) => {
+ return field_with_new_type(field, DataType::LargeUtf8);
+ }
+ // table schema uses string view type, coerce the file schema to use view type
+ (
+ &DataType::Utf8View,
+ DataType::Binary | DataType::LargeBinary |
DataType::BinaryView,
+ ) => {
+ return field_with_new_type(field, DataType::Utf8View);
+ }
+ // Handle view type conversions
+ (&DataType::Utf8View, DataType::Utf8 |
DataType::LargeUtf8) => {
+ return field_with_new_type(field, DataType::Utf8View);
+ }
+ (&DataType::BinaryView, DataType::Binary |
DataType::LargeBinary) => {
+ return field_with_new_type(field,
DataType::BinaryView);
+ }
+ _ => {}
+ }
+ }
+
+ // If no transformation is needed, keep the original field
+ Arc::clone(field)
+ })
+ .collect();
+
+ Some(Schema::new_with_metadata(
+ transformed_fields,
+ file_schema.metadata.clone(),
+ ))
+}
+
/// Coerces the file schema if the table schema uses a view type.
+#[deprecated(
+ since = "47.0.0",
+ note = "Use `apply_file_schema_type_coercions` instead"
+)]
pub fn coerce_file_schema_to_view_type(
table_schema: &Schema,
file_schema: &Schema,
@@ -515,6 +622,10 @@ pub fn coerce_file_schema_to_view_type(
/// If the table schema uses a string type, coerce the file schema to use a string type.
///
/// See [ParquetFormat::binary_as_string] for details
+#[deprecated(
+ since = "47.0.0",
+ note = "Use `apply_file_schema_type_coercions` instead"
+)]
pub fn coerce_file_schema_to_string_type(
table_schema: &Schema,
file_schema: &Schema,
@@ -718,11 +829,8 @@ pub fn statistics_from_parquet_meta_calc(
file_metadata.schema_descr(),
file_metadata.key_value_metadata(),
)?;
- if let Some(merged) = coerce_file_schema_to_string_type(&table_schema,
&file_schema) {
- file_schema = merged;
- }
- if let Some(merged) = coerce_file_schema_to_view_type(&table_schema,
&file_schema) {
+ if let Some(merged) = apply_file_schema_type_coercions(&table_schema,
&file_schema) {
file_schema = merged;
}
diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index 3c623f558e..8257a796b6 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -19,14 +19,11 @@
use std::sync::Arc;
-use crate::file_format::{
- coerce_file_schema_to_string_type, coerce_file_schema_to_view_type,
-};
use crate::page_filter::PagePruningAccessPlanFilter;
use crate::row_group_filter::RowGroupAccessPlanFilter;
use crate::{
- row_filter, should_enable_page_index, ParquetAccessPlan,
ParquetFileMetrics,
- ParquetFileReaderFactory,
+ apply_file_schema_type_coercions, row_filter, should_enable_page_index,
+ ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory,
};
use datafusion_datasource::file_meta::FileMeta;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
@@ -131,14 +128,8 @@ impl FileOpener for ParquetOpener {
ArrowReaderMetadata::load_async(&mut reader,
options.clone()).await?;
let mut schema = Arc::clone(metadata.schema());
- if let Some(merged) =
- coerce_file_schema_to_string_type(&table_schema, &schema)
- {
- schema = Arc::new(merged);
- }
-
// read with view types
- if let Some(merged) =
coerce_file_schema_to_view_type(&table_schema, &schema)
+ if let Some(merged) =
apply_file_schema_type_coercions(&table_schema, &schema)
{
schema = Arc::new(merged);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]