alamb commented on code in PR #4251:
URL: https://github.com/apache/arrow-datafusion/pull/4251#discussion_r1026723379


##########
datafusion/core/src/config.rs:
##########
@@ -218,6 +221,11 @@ impl BuiltInConfigs {
                           configuration setting '{}'.", OPT_COALESCE_BATCHES),
                  4096,
             ),
+            ConfigDefinition::new_bool(

Review Comment:
   👍  100% for an option that defaults to Off



##########
datafusion/core/src/datasource/mod.rs:
##########
@@ -175,3 +182,87 @@ fn get_col_stats(
         })
         .collect()
 }
+
+/// Specialized copy of Schema::try_merge that supports merging fields that 
have different,
+/// but compatible, data types
+pub(crate) fn try_merge_schemas(

Review Comment:
   I think adding `coerce` to the name of this function might make it faster to 
understand what it does
   
   ```suggestion
   pub(crate) fn try_merge_and_coerce_schemas(
   ```



##########
datafusion/core/tests/sql/parquet_schema.rs:
##########
@@ -99,6 +99,100 @@ async fn schema_merge_ignores_metadata_by_default() {
     assert_no_metadata(&actual);
 }
 
+#[tokio::test]
+async fn merge_compatible_schemas() -> Result<()> {
+    let tmp_dir = TempDir::new()?;
+
+    let schema1 = Schema::new_with_metadata(
+        vec![
+            Field::new("a", DataType::Int8, false),
+            Field::new("b", DataType::Int16, false),
+            Field::new("c", DataType::Int32, false),
+        ],
+        HashMap::new(),
+    );
+
+    let schema2 = Schema::new_with_metadata(
+        vec![
+            Field::new("a", DataType::Int64, false),
+            Field::new("b", DataType::Int32, false),
+            Field::new("c", DataType::Int16, false),
+        ],
+        HashMap::new(),
+    );
+
+    create_parquet_file(tmp_dir.path(), "t1.parquet", &schema1).await?;
+    create_parquet_file(tmp_dir.path(), "t2.parquet", &schema2).await?;
+
+    let ctx = SessionContext::default();
+
+    // with type coercion disabled
+    let err = ctx
+        .register_parquet(
+            "test",
+            tmp_dir.path().to_str().unwrap(),
+            ParquetReadOptions::default().with_coerce_types(false),
+        )
+        .await
+        .expect_err("should fail");
+    assert!(err
+        .to_string()
+        .contains("Schema merge failed due to different, but compatible, data 
types"));
+
+    // with type coercion enabled
+    ctx.register_parquet(
+        "test",
+        tmp_dir.path().to_str().unwrap(),
+        ParquetReadOptions::default().with_coerce_types(true),
+    )
+    .await?;
+
+    let df = ctx.table("test")?;
+
+    let expected_schema = Schema::new_with_metadata(
+        vec![
+            Field::new("a", DataType::Int64, false),
+            Field::new("b", DataType::Int32, false),
+            Field::new("c", DataType::Int32, false),
+        ],
+        HashMap::new(),
+    );
+
+    let actual_schema: Schema = df.schema().into();
+    assert_eq!(expected_schema, actual_schema);
+
+    let batches = df.collect().await?;
+    for batch in batches {
+        assert_eq!(&expected_schema, batch.schema().as_ref());
+        assert!(batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .is_some());
+        assert!(batch
+            .column(1)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .is_some());
+        assert!(batch
+            .column(2)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .is_some());
+    }
+
+    Ok(())
+}
+
+async fn create_parquet_file(path: &Path, filename: &str, schema: &Schema) -> 
Result<()> {
+    let ctx = SessionContext::default();
+    let options = CsvReadOptions::new().schema(schema);
+    ctx.register_csv("t", "tests/example.csv", options).await?;
+    let t = ctx.table("t")?;
+    let path = path.join(filename);
+    t.write_parquet(path.to_str().unwrap(), None).await

Review Comment:
   this is very clever



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -1204,7 +1204,7 @@ mod tests {
         let schema = Schema::new(vec![
             Field::new("c1", DataType::Utf8, true),
             Field::new("c2", DataType::Int64, true),
-            Field::new("c3", DataType::Int8, true),
+            Field::new("c3", DataType::Date32, true),

Review Comment:
   ```suggestion
               // use different type to verify type coercion
               Field::new("c3", DataType::Date32, true),
   ```



##########
datafusion/core/src/physical_plan/file_format/mod.rs:
##########
@@ -736,7 +754,7 @@ mod tests {
         ]));
 
         let file_schema_3 =
-            Arc::new(Schema::new(vec![Field::new("c3", DataType::Float32, 
true)]));
+            Arc::new(Schema::new(vec![Field::new("c3", DataType::Date32, 
true)]));

Review Comment:
   ```suggestion
               // use different type to verify type coercion
               Arc::new(Schema::new(vec![Field::new("c3", DataType::Date32, 
true)]));
   ```



##########
datafusion/core/tests/sql/parquet_schema.rs:
##########
@@ -99,6 +99,100 @@ async fn schema_merge_ignores_metadata_by_default() {
     assert_no_metadata(&actual);
 }
 
+#[tokio::test]
+async fn merge_compatible_schemas() -> Result<()> {
+    let tmp_dir = TempDir::new()?;
+
+    let schema1 = Schema::new_with_metadata(
+        vec![
+            Field::new("a", DataType::Int8, false),
+            Field::new("b", DataType::Int16, false),
+            Field::new("c", DataType::Int32, false),
+        ],
+        HashMap::new(),
+    );
+
+    let schema2 = Schema::new_with_metadata(
+        vec![
+            Field::new("a", DataType::Int64, false),
+            Field::new("b", DataType::Int32, false),
+            Field::new("c", DataType::Int16, false),
+        ],
+        HashMap::new(),
+    );
+
+    create_parquet_file(tmp_dir.path(), "t1.parquet", &schema1).await?;
+    create_parquet_file(tmp_dir.path(), "t2.parquet", &schema2).await?;
+
+    let ctx = SessionContext::default();
+
+    // with type coercion disabled
+    let err = ctx
+        .register_parquet(
+            "test",
+            tmp_dir.path().to_str().unwrap(),
+            ParquetReadOptions::default().with_coerce_types(false),
+        )
+        .await
+        .expect_err("should fail");
+    assert!(err
+        .to_string()
+        .contains("Schema merge failed due to different, but compatible, data 
types"));
+
+    // with type coercion enabled
+    ctx.register_parquet(
+        "test",
+        tmp_dir.path().to_str().unwrap(),
+        ParquetReadOptions::default().with_coerce_types(true),
+    )
+    .await?;
+
+    let df = ctx.table("test")?;
+
+    let expected_schema = Schema::new_with_metadata(
+        vec![
+            Field::new("a", DataType::Int64, false),
+            Field::new("b", DataType::Int32, false),
+            Field::new("c", DataType::Int32, false),
+        ],
+        HashMap::new(),
+    );
+
+    let actual_schema: Schema = df.schema().into();
+    assert_eq!(expected_schema, actual_schema);
+
+    let batches = df.collect().await?;

Review Comment:
   There is also the `arrow_typeof` function added by @waitingkuo, so you 
could verify the expected schema with a query like
   
   ```sql
   select arrow_typeof(a), arrow_typeof(b), arrow_typeof(c) from test;
   ```



##########
datafusion/core/src/datasource/file_format/avro.rs:
##########
@@ -37,7 +37,18 @@ use crate::physical_plan::Statistics;
 pub const DEFAULT_AVRO_EXTENSION: &str = ".avro";
 /// Avro `FileFormat` implementation.
 #[derive(Default, Debug)]
-pub struct AvroFormat;
+pub struct AvroFormat {
+    /// Specify whether compatible types can be coerced when merging schemas
+    coerce_types: bool,
+}
+
+impl AvroFormat {
+    /// Specify whether compatible types can be coerced when merging schemas
+    pub fn with_coerce_types(mut self, coerce_types: bool) -> Self {

Review Comment:
   I suggest we consider adding `coerce_types` as a new parameter in the 
`infer_schema` call rather than adding it as a field on each format.
   
   While that would technically be a breaking API change, I don't think this 
low-level API is used all that often, and the API upgrade would be 
straightforward. I think such a change would require less plumbing code, would 
make the existence of the schema merge feature clearer, and would allow a 
single place to document it.



##########
datafusion/core/src/datasource/mod.rs:
##########
@@ -175,3 +182,87 @@ fn get_col_stats(
         })
         .collect()
 }
+
+/// Specialized copy of Schema::try_merge that supports merging fields that 
have different,
+/// but compatible, data types
+pub(crate) fn try_merge_schemas(
+    schemas: impl IntoIterator<Item = Schema>,
+    merge_compatible_types: bool,
+) -> Result<Schema> {
+    let mut metadata = HashMap::new();
+    let mut fields: Vec<Field> = vec![];
+    for schema in schemas {
+        for (key, value) in &schema.metadata {
+            if let Some(old_val) = metadata.get(key) {
+                if old_val != value {
+                    return 
Err(DataFusionError::ArrowError(ArrowError::SchemaError(
+                        format!(
+                            "Fail to merge schema due to conflicting metadata. 
\
+                                     Key '{}' has different values '{}' and 
'{}'",
+                            key, old_val, value
+                        ),
+                    )));
+                }
+            }
+            metadata.insert(key.to_owned(), value.to_owned());
+        }
+        for field in &schema.fields {
+            if let Some((i, merge_field)) =
+                fields.iter().find_position(|f| f.name() == field.name())
+            {
+                if merge_field.data_type() != field.data_type() {
+                    if let Some(new_type) =
+                        get_wider_type(merge_field.data_type(), 
field.data_type())
+                    {
+                        if &new_type != merge_field.data_type() {
+                            if merge_compatible_types {
+                                fields[i] = 
merge_field.clone().with_data_type(new_type);
+                            } else {
+                                return Err(DataFusionError::Execution(format!(
+                                    "Schema merge failed due to different, but 
compatible, data types for field '{}' ({} vs {}). \
+                                    Set '{}=true' (or call 
with_coerce_types(true) on reader options) \
+                                    to enable merging this field",
+                                    field.name(),
+                                    merge_field.data_type(),
+                                    field.data_type(),
+                                    OPT_FILE_FORMAT_COERCE_TYPES
+                                )));
+                            }
+                        }
+                    } else {
+                        return 
Err(DataFusionError::SchemaError(SchemaMergeError {
+                            field_name: field.name().to_owned(),
+                            data_types: (
+                                field.data_type().clone(),
+                                merge_field.data_type().clone(),
+                            ),
+                        }));
+                    }
+                }
+            } else {
+                // first time seeing this field
+                fields.push(field.clone());
+            }
+        }
+    }
+    Ok(Schema::new_with_metadata(fields, metadata))
+}
+
+pub(crate) fn get_wider_type(t1: &DataType, t2: &DataType) -> Option<DataType> 
{

Review Comment:
   There are also some popular coercions (like UTF8 -> Int*) that are not 
covered here



##########
datafusion/core/src/physical_plan/file_format/mod.rs:
##########
@@ -736,7 +754,7 @@ mod tests {
         ]));
 
         let file_schema_3 =
-            Arc::new(Schema::new(vec![Field::new("c3", DataType::Float32, 
true)]));
+            Arc::new(Schema::new(vec![Field::new("c3", DataType::Date32, 
true)]));

Review Comment:
   Otherwise I might "fix" that kind of thing in a cleanup PR 😆 



##########
datafusion/core/src/datasource/mod.rs:
##########
@@ -175,3 +182,87 @@ fn get_col_stats(
         })
         .collect()
 }
+
+/// Specialized copy of Schema::try_merge that supports merging fields that 
have different,
+/// but compatible, data types
+pub(crate) fn try_merge_schemas(
+    schemas: impl IntoIterator<Item = Schema>,
+    merge_compatible_types: bool,
+) -> Result<Schema> {
+    let mut metadata = HashMap::new();
+    let mut fields: Vec<Field> = vec![];
+    for schema in schemas {
+        for (key, value) in &schema.metadata {
+            if let Some(old_val) = metadata.get(key) {
+                if old_val != value {
+                    return 
Err(DataFusionError::ArrowError(ArrowError::SchemaError(
+                        format!(
+                            "Fail to merge schema due to conflicting metadata. 
\
+                                     Key '{}' has different values '{}' and 
'{}'",
+                            key, old_val, value
+                        ),
+                    )));
+                }
+            }
+            metadata.insert(key.to_owned(), value.to_owned());
+        }
+        for field in &schema.fields {
+            if let Some((i, merge_field)) =
+                fields.iter().find_position(|f| f.name() == field.name())
+            {
+                if merge_field.data_type() != field.data_type() {
+                    if let Some(new_type) =
+                        get_wider_type(merge_field.data_type(), 
field.data_type())
+                    {
+                        if &new_type != merge_field.data_type() {
+                            if merge_compatible_types {
+                                fields[i] = 
merge_field.clone().with_data_type(new_type);
+                            } else {
+                                return Err(DataFusionError::Execution(format!(
+                                    "Schema merge failed due to different, but 
compatible, data types for field '{}' ({} vs {}). \
+                                    Set '{}=true' (or call 
with_coerce_types(true) on reader options) \
+                                    to enable merging this field",
+                                    field.name(),
+                                    merge_field.data_type(),
+                                    field.data_type(),
+                                    OPT_FILE_FORMAT_COERCE_TYPES
+                                )));
+                            }
+                        }
+                    } else {
+                        return 
Err(DataFusionError::SchemaError(SchemaMergeError {
+                            field_name: field.name().to_owned(),
+                            data_types: (
+                                field.data_type().clone(),
+                                merge_field.data_type().clone(),
+                            ),
+                        }));
+                    }
+                }
+            } else {
+                // first time seeing this field
+                fields.push(field.clone());
+            }
+        }
+    }
+    Ok(Schema::new_with_metadata(fields, metadata))
+}
+
+pub(crate) fn get_wider_type(t1: &DataType, t2: &DataType) -> Option<DataType> 
{

Review Comment:
   I would recommend using `comparison_coercion` here as I think it is a 
superset of these rules and will remain consistent with the rest of the 
codebase over time
   
   
https://github.com/apache/arrow-datafusion/blob/master/datafusion/expr/src/type_coercion/binary.rs#L185



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to