alamb commented on code in PR #4251:
URL: https://github.com/apache/arrow-datafusion/pull/4251#discussion_r1026723379
##########
datafusion/core/src/config.rs:
##########
@@ -218,6 +221,11 @@ impl BuiltInConfigs {
configuration setting '{}'.", OPT_COALESCE_BATCHES),
4096,
),
+ ConfigDefinition::new_bool(
Review Comment:
👍 100% for an option that defaults to Off
##########
datafusion/core/src/datasource/mod.rs:
##########
@@ -175,3 +182,87 @@ fn get_col_stats(
})
.collect()
}
+
+/// Specialized copy of Schema::try_merge that supports merging fields that have different,
+/// but compatible, data types
+pub(crate) fn try_merge_schemas(
Review Comment:
I think adding `coerce` to the name of this function would make it faster to
understand what it does
```suggestion
pub(crate) fn try_merge_and_coerce_schemas(
```
##########
datafusion/core/tests/sql/parquet_schema.rs:
##########
@@ -99,6 +99,100 @@ async fn schema_merge_ignores_metadata_by_default() {
assert_no_metadata(&actual);
}
+#[tokio::test]
+async fn merge_compatible_schemas() -> Result<()> {
+ let tmp_dir = TempDir::new()?;
+
+ let schema1 = Schema::new_with_metadata(
+ vec![
+ Field::new("a", DataType::Int8, false),
+ Field::new("b", DataType::Int16, false),
+ Field::new("c", DataType::Int32, false),
+ ],
+ HashMap::new(),
+ );
+
+ let schema2 = Schema::new_with_metadata(
+ vec![
+ Field::new("a", DataType::Int64, false),
+ Field::new("b", DataType::Int32, false),
+ Field::new("c", DataType::Int16, false),
+ ],
+ HashMap::new(),
+ );
+
+ create_parquet_file(tmp_dir.path(), "t1.parquet", &schema1).await?;
+ create_parquet_file(tmp_dir.path(), "t2.parquet", &schema2).await?;
+
+ let ctx = SessionContext::default();
+
+ // with type coercion disabled
+ let err = ctx
+ .register_parquet(
+ "test",
+ tmp_dir.path().to_str().unwrap(),
+ ParquetReadOptions::default().with_coerce_types(false),
+ )
+ .await
+ .expect_err("should fail");
+ assert!(err
+ .to_string()
+        .contains("Schema merge failed due to different, but compatible, data types"));
+
+ // with type coercion enabled
+ ctx.register_parquet(
+ "test",
+ tmp_dir.path().to_str().unwrap(),
+ ParquetReadOptions::default().with_coerce_types(true),
+ )
+ .await?;
+
+ let df = ctx.table("test")?;
+
+ let expected_schema = Schema::new_with_metadata(
+ vec![
+ Field::new("a", DataType::Int64, false),
+ Field::new("b", DataType::Int32, false),
+ Field::new("c", DataType::Int32, false),
+ ],
+ HashMap::new(),
+ );
+
+ let actual_schema: Schema = df.schema().into();
+ assert_eq!(expected_schema, actual_schema);
+
+ let batches = df.collect().await?;
+ for batch in batches {
+ assert_eq!(&expected_schema, batch.schema().as_ref());
+ assert!(batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<Int64Array>()
+ .is_some());
+ assert!(batch
+ .column(1)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .is_some());
+ assert!(batch
+ .column(2)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .is_some());
+ }
+
+ Ok(())
+}
+
+async fn create_parquet_file(path: &Path, filename: &str, schema: &Schema) -> Result<()> {
+ let ctx = SessionContext::default();
+ let options = CsvReadOptions::new().schema(schema);
+ ctx.register_csv("t", "tests/example.csv", options).await?;
+ let t = ctx.table("t")?;
+ let path = path.join(filename);
+ t.write_parquet(path.to_str().unwrap(), None).await
Review Comment:
this is very clever
##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -1204,7 +1204,7 @@ mod tests {
let schema = Schema::new(vec![
Field::new("c1", DataType::Utf8, true),
Field::new("c2", DataType::Int64, true),
- Field::new("c3", DataType::Int8, true),
+ Field::new("c3", DataType::Date32, true),
Review Comment:
```suggestion
// use different type to verify type coercion
Field::new("c3", DataType::Date32, true),
```
##########
datafusion/core/src/physical_plan/file_format/mod.rs:
##########
@@ -736,7 +754,7 @@ mod tests {
]));
let file_schema_3 =
-            Arc::new(Schema::new(vec![Field::new("c3", DataType::Float32, true)]));
+            Arc::new(Schema::new(vec![Field::new("c3", DataType::Date32, true)]));
Review Comment:
```suggestion
// use different type to verify type coercion
            Arc::new(Schema::new(vec![Field::new("c3", DataType::Date32, true)]));
```
##########
datafusion/core/tests/sql/parquet_schema.rs:
##########
@@ -99,6 +99,100 @@ async fn schema_merge_ignores_metadata_by_default() {
assert_no_metadata(&actual);
}
+#[tokio::test]
+async fn merge_compatible_schemas() -> Result<()> {
+ let tmp_dir = TempDir::new()?;
+
+ let schema1 = Schema::new_with_metadata(
+ vec![
+ Field::new("a", DataType::Int8, false),
+ Field::new("b", DataType::Int16, false),
+ Field::new("c", DataType::Int32, false),
+ ],
+ HashMap::new(),
+ );
+
+ let schema2 = Schema::new_with_metadata(
+ vec![
+ Field::new("a", DataType::Int64, false),
+ Field::new("b", DataType::Int32, false),
+ Field::new("c", DataType::Int16, false),
+ ],
+ HashMap::new(),
+ );
+
+ create_parquet_file(tmp_dir.path(), "t1.parquet", &schema1).await?;
+ create_parquet_file(tmp_dir.path(), "t2.parquet", &schema2).await?;
+
+ let ctx = SessionContext::default();
+
+ // with type coercion disabled
+ let err = ctx
+ .register_parquet(
+ "test",
+ tmp_dir.path().to_str().unwrap(),
+ ParquetReadOptions::default().with_coerce_types(false),
+ )
+ .await
+ .expect_err("should fail");
+ assert!(err
+ .to_string()
+        .contains("Schema merge failed due to different, but compatible, data types"));
+
+ // with type coercion enabled
+ ctx.register_parquet(
+ "test",
+ tmp_dir.path().to_str().unwrap(),
+ ParquetReadOptions::default().with_coerce_types(true),
+ )
+ .await?;
+
+ let df = ctx.table("test")?;
+
+ let expected_schema = Schema::new_with_metadata(
+ vec![
+ Field::new("a", DataType::Int64, false),
+ Field::new("b", DataType::Int32, false),
+ Field::new("c", DataType::Int32, false),
+ ],
+ HashMap::new(),
+ );
+
+ let actual_schema: Schema = df.schema().into();
+ assert_eq!(expected_schema, actual_schema);
+
+ let batches = df.collect().await?;
Review Comment:
there is also the `arrow_typeof` function added by @waitingkuo, so you could
verify the expected schema with a query like
```sql
select arrow_typeof(a), arrow_typeof(b), arrow_typeof(c) from test;
```
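For example, something along these lines (untested sketch; it assumes the
`test` table registered above and uses `pretty_format_batches` from the
`arrow` crate):
```rust
// Hypothetical sketch: verify the coerced schema via SQL instead of
// downcasting each column. Assumes `ctx` has the `test` table registered
// with coercion enabled, as in the test above.
use arrow::util::pretty::pretty_format_batches;

let batches = ctx
    .sql("select arrow_typeof(a), arrow_typeof(b), arrow_typeof(c) from test limit 1")
    .await?
    .collect()
    .await?;
// arrow_typeof returns the Arrow type name as a string, so the rendered
// output should mention the coerced types.
let output = pretty_format_batches(&batches)?.to_string();
assert!(output.contains("Int64"));
assert!(output.contains("Int32"));
```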
##########
datafusion/core/src/datasource/file_format/avro.rs:
##########
@@ -37,7 +37,18 @@ use crate::physical_plan::Statistics;
pub const DEFAULT_AVRO_EXTENSION: &str = ".avro";
/// Avro `FileFormat` implementation.
#[derive(Default, Debug)]
-pub struct AvroFormat;
+pub struct AvroFormat {
+ /// Specify whether compatible types can be coerced when merging schemas
+ coerce_types: bool,
+}
+
+impl AvroFormat {
+ /// Specify whether compatible types can be coerced when merging schemas
+ pub fn with_coerce_types(mut self, coerce_types: bool) -> Self {
Review Comment:
I suggest we consider adding `coerce_types` as a new parameter in the
`infer_schema` call rather than adding it as a field on each format.
While that would technically be a breaking API change, I don't think this
low-level API is used all that often, and the upgrade would be straightforward.
I think such a change would require less plumbing code, would make the
existence of the schema merge feature clearer, and would give a single place
to document it.
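Something like this (rough sketch from memory, so the parameter name and the
exact shape of today's `infer_schema` are assumptions):
```rust
// Illustrative sketch: pass the flag to FileFormat::infer_schema instead of
// storing it as a field on every format implementation.
// (imports and the other FileFormat methods elided)
#[async_trait]
pub trait FileFormat: Send + Sync + fmt::Debug {
    /// Infer the common schema of the provided objects, optionally widening
    /// fields with different but compatible types while merging.
    async fn infer_schema(
        &self,
        store: &Arc<dyn ObjectStore>,
        objects: &[ObjectMeta],
        coerce_types: bool,
    ) -> Result<SchemaRef>;

    // ...remaining trait methods unchanged
}
```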
##########
datafusion/core/src/datasource/mod.rs:
##########
@@ -175,3 +182,87 @@ fn get_col_stats(
})
.collect()
}
+
+/// Specialized copy of Schema::try_merge that supports merging fields that
have different,
+/// but compatible, data types
+pub(crate) fn try_merge_schemas(
+ schemas: impl IntoIterator<Item = Schema>,
+ merge_compatible_types: bool,
+) -> Result<Schema> {
+ let mut metadata = HashMap::new();
+ let mut fields: Vec<Field> = vec![];
+ for schema in schemas {
+ for (key, value) in &schema.metadata {
+ if let Some(old_val) = metadata.get(key) {
+ if old_val != value {
+                    return Err(DataFusionError::ArrowError(ArrowError::SchemaError(
+                        format!(
+                            "Fail to merge schema due to conflicting metadata. \
+                             Key '{}' has different values '{}' and '{}'",
+ key, old_val, value
+ ),
+ )));
+ }
+ }
+ metadata.insert(key.to_owned(), value.to_owned());
+ }
+ for field in &schema.fields {
+ if let Some((i, merge_field)) =
+ fields.iter().find_position(|f| f.name() == field.name())
+ {
+ if merge_field.data_type() != field.data_type() {
+ if let Some(new_type) =
+                        get_wider_type(merge_field.data_type(), field.data_type())
+ {
+ if &new_type != merge_field.data_type() {
+ if merge_compatible_types {
+                                fields[i] = merge_field.clone().with_data_type(new_type);
+ } else {
+ return Err(DataFusionError::Execution(format!(
+                                    "Schema merge failed due to different, but compatible, data types for field '{}' ({} vs {}). \
+                                     Set '{}=true' (or call with_coerce_types(true) on reader options) \
+ to enable merging this field",
+ field.name(),
+ merge_field.data_type(),
+ field.data_type(),
+ OPT_FILE_FORMAT_COERCE_TYPES
+ )));
+ }
+ }
+ } else {
+                        return Err(DataFusionError::SchemaError(SchemaMergeError {
+ field_name: field.name().to_owned(),
+ data_types: (
+ field.data_type().clone(),
+ merge_field.data_type().clone(),
+ ),
+ }));
+ }
+ }
+ } else {
+ // first time seeing this field
+ fields.push(field.clone());
+ }
+ }
+ }
+ Ok(Schema::new_with_metadata(fields, metadata))
+}
+
+pub(crate) fn get_wider_type(t1: &DataType, t2: &DataType) -> Option<DataType> {
Review Comment:
There are also some popular coercions (like `Utf8` -> `Int*`) that are not
covered here.
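For example (illustrative only, based on the rules as written above):
```rust
// Illustrative only: with the rules above, a common pairing such as
// Utf8 vs Int64 yields no merged type, even though DataFusion's comparison
// coercion rules can resolve it to a numeric type.
assert_eq!(get_wider_type(&DataType::Utf8, &DataType::Int64), None);
```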
##########
datafusion/core/src/physical_plan/file_format/mod.rs:
##########
@@ -736,7 +754,7 @@ mod tests {
]));
let file_schema_3 =
-            Arc::new(Schema::new(vec![Field::new("c3", DataType::Float32, true)]));
+            Arc::new(Schema::new(vec![Field::new("c3", DataType::Date32, true)]));
Review Comment:
Otherwise I might "fix" that kind of thing in a cleanup PR 😆
##########
datafusion/core/src/datasource/mod.rs:
##########
@@ -175,3 +182,87 @@ fn get_col_stats(
})
.collect()
}
+
+/// Specialized copy of Schema::try_merge that supports merging fields that have different,
+/// but compatible, data types
+pub(crate) fn try_merge_schemas(
+ schemas: impl IntoIterator<Item = Schema>,
+ merge_compatible_types: bool,
+) -> Result<Schema> {
+ let mut metadata = HashMap::new();
+ let mut fields: Vec<Field> = vec![];
+ for schema in schemas {
+ for (key, value) in &schema.metadata {
+ if let Some(old_val) = metadata.get(key) {
+ if old_val != value {
+                    return Err(DataFusionError::ArrowError(ArrowError::SchemaError(
+                        format!(
+                            "Fail to merge schema due to conflicting metadata. \
+                             Key '{}' has different values '{}' and '{}'",
+ key, old_val, value
+ ),
+ )));
+ }
+ }
+ metadata.insert(key.to_owned(), value.to_owned());
+ }
+ for field in &schema.fields {
+ if let Some((i, merge_field)) =
+ fields.iter().find_position(|f| f.name() == field.name())
+ {
+ if merge_field.data_type() != field.data_type() {
+ if let Some(new_type) =
+                        get_wider_type(merge_field.data_type(), field.data_type())
+ {
+ if &new_type != merge_field.data_type() {
+ if merge_compatible_types {
+                                fields[i] = merge_field.clone().with_data_type(new_type);
+ } else {
+ return Err(DataFusionError::Execution(format!(
+                                    "Schema merge failed due to different, but compatible, data types for field '{}' ({} vs {}). \
+                                     Set '{}=true' (or call with_coerce_types(true) on reader options) \
+ to enable merging this field",
+ field.name(),
+ merge_field.data_type(),
+ field.data_type(),
+ OPT_FILE_FORMAT_COERCE_TYPES
+ )));
+ }
+ }
+ } else {
+                        return Err(DataFusionError::SchemaError(SchemaMergeError {
+ field_name: field.name().to_owned(),
+ data_types: (
+ field.data_type().clone(),
+ merge_field.data_type().clone(),
+ ),
+ }));
+ }
+ }
+ } else {
+ // first time seeing this field
+ fields.push(field.clone());
+ }
+ }
+ }
+ Ok(Schema::new_with_metadata(fields, metadata))
+}
+
+pub(crate) fn get_wider_type(t1: &DataType, t2: &DataType) -> Option<DataType> {
Review Comment:
I would recommend using `comparison_coercion` here, as I think it is a
superset of these rules and will remain consistent with the rest of the
codebase over time:
https://github.com/apache/arrow-datafusion/blob/master/datafusion/expr/src/type_coercion/binary.rs#L185
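e.g. something like this (sketch only; it assumes `comparison_coercion` is
exported from `datafusion_expr`):
```rust
// Hypothetical sketch: delegate to the shared coercion rules instead of
// maintaining a parallel list of widening cases here.
use datafusion_expr::type_coercion::binary::comparison_coercion;

pub(crate) fn get_wider_type(t1: &DataType, t2: &DataType) -> Option<DataType> {
    // comparison_coercion returns the common type both inputs can be cast
    // to, or None when the types are incompatible.
    comparison_coercion(t1, t2)
}
```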