nevi-me commented on a change in pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#discussion_r700593118



##########
File path: datafusion/src/avro_to_arrow/mod.rs
##########
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module contains utilities to manipulate avro metadata.
+
+#[cfg(feature = "avro")]

Review comment:
       It should be possible to move this feature flag to 
`datafusion/src/lib.rs` so you don't include it for each module here

##########
File path: datafusion/src/avro_to_arrow/schema.rs
##########
@@ -0,0 +1,452 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::datatypes::{DataType, IntervalUnit, Schema, TimeUnit};
+use crate::error::{DataFusionError, Result};
+use arrow::datatypes::Field;
+use avro_rs::schema::Name;
+use avro_rs::types::Value;
+use avro_rs::Schema as AvroSchema;
+use std::collections::BTreeMap;
+use std::convert::TryFrom;
+
+/// Converts an avro schema to an arrow schema
+pub fn to_arrow_schema(avro_schema: &avro_rs::Schema) -> Result<Schema> {
+    let mut schema_fields = vec![];
+    match avro_schema {
+        AvroSchema::Record { fields, .. } => {
+            for field in fields {
+                schema_fields.push(schema_to_field_with_props(
+                    &field.schema,
+                    Some(&field.name),
+                    false,
+                    Some(&external_props(&field.schema)),
+                )?)
+            }
+        }
+        schema => schema_fields.push(schema_to_field(schema, Some(""), 
false)?),
+    }
+
+    let schema = Schema::new(schema_fields);
+    Ok(schema)
+}
+
+fn schema_to_field(
+    schema: &avro_rs::Schema,
+    name: Option<&str>,
+    nullable: bool,
+) -> Result<Field> {
+    schema_to_field_with_props(schema, name, nullable, 
Some(&Default::default()))
+}
+
+fn schema_to_field_with_props(
+    schema: &AvroSchema,
+    name: Option<&str>,
+    nullable: bool,
+    props: Option<&BTreeMap<String, String>>,
+) -> Result<Field> {
+    let mut nullable = nullable;
+    let field_type: DataType = match schema {
+        AvroSchema::Null => DataType::Null,
+        AvroSchema::Boolean => DataType::Boolean,
+        AvroSchema::Int => DataType::Int32,
+        AvroSchema::Long => DataType::Int64,
+        AvroSchema::Float => DataType::Float32,
+        AvroSchema::Double => DataType::Float64,
+        AvroSchema::Bytes => DataType::Binary,
+        AvroSchema::String => DataType::Utf8,
+        AvroSchema::Array(item_schema) => DataType::List(Box::new(
+            schema_to_field_with_props(item_schema, None, false, None)?,
+        )),
+        AvroSchema::Map(value_schema) => {
+            let value_field =
+                schema_to_field_with_props(value_schema, Some("value"), false, 
None)?;
+            DataType::Dictionary(
+                Box::new(DataType::Utf8),
+                Box::new(value_field.data_type().clone()),
+            )
+        }
+        AvroSchema::Union(us) => {
+            // If there are only two variants and one of them is null, set the 
other type as the field data type
+            let has_nullable = us.find_schema(&Value::Null).is_some();
+            let sub_schemas = us.variants();
+            if has_nullable && sub_schemas.len() == 2 {
+                nullable = true;
+                if let Some(schema) = sub_schemas
+                    .iter()
+                    .find(|&schema| !matches!(schema, AvroSchema::Null))
+                {
+                    schema_to_field_with_props(schema, None, has_nullable, 
None)?
+                        .data_type()
+                        .clone()
+                } else {
+                    return Err(DataFusionError::AvroError(
+                        avro_rs::Error::GetUnionDuplicate,
+                    ));
+                }
+            } else {
+                let fields = sub_schemas
+                    .iter()
+                    .map(|s| schema_to_field_with_props(s, None, has_nullable, 
None))
+                    .collect::<Result<Vec<Field>>>()?;
+                DataType::Union(fields)
+            }
+        }
+        AvroSchema::Record { name, fields, .. } => {
+            let fields: Result<Vec<Field>> = fields
+                .iter()
+                .map(|field| {
+                    let mut props = BTreeMap::new();
+                    if let Some(doc) = &field.doc {
+                        props.insert("doc".to_string(), doc.clone());
+                    }
+                    /*if let Some(aliases) = fields.aliases {
+                        props.insert("aliases", aliases);
+                    }*/
+                    schema_to_field_with_props(
+                        &field.schema,
+                        Some(&format!("{}.{}", name.fullname(None), 
field.name)),
+                        false,
+                        Some(&props),
+                    )
+                })
+                .collect();
+            DataType::Struct(fields?)
+        }
+        AvroSchema::Enum { symbols, name, .. } => {

Review comment:
       This is cool 👍 

##########
File path: datafusion/src/datasource/mod.rs
##########
@@ -17,6 +17,7 @@
 
 //! DataFusion data sources
 
+pub mod avro;

Review comment:
       Yes, then the feature flags in the `avro` module can be removed

##########
File path: datafusion/src/avro_to_arrow/schema.rs
##########
@@ -0,0 +1,452 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::datatypes::{DataType, IntervalUnit, Schema, TimeUnit};
+use crate::error::{DataFusionError, Result};
+use arrow::datatypes::Field;
+use avro_rs::schema::Name;
+use avro_rs::types::Value;
+use avro_rs::Schema as AvroSchema;
+use std::collections::BTreeMap;
+use std::convert::TryFrom;
+
+/// Converts an avro schema to an arrow schema
+pub fn to_arrow_schema(avro_schema: &avro_rs::Schema) -> Result<Schema> {
+    let mut schema_fields = vec![];
+    match avro_schema {
+        AvroSchema::Record { fields, .. } => {
+            for field in fields {
+                schema_fields.push(schema_to_field_with_props(
+                    &field.schema,
+                    Some(&field.name),
+                    false,
+                    Some(&external_props(&field.schema)),
+                )?)
+            }
+        }
+        schema => schema_fields.push(schema_to_field(schema, Some(""), 
false)?),
+    }
+
+    let schema = Schema::new(schema_fields);
+    Ok(schema)
+}
+
+fn schema_to_field(
+    schema: &avro_rs::Schema,
+    name: Option<&str>,
+    nullable: bool,
+) -> Result<Field> {
+    schema_to_field_with_props(schema, name, nullable, 
Some(&Default::default()))

Review comment:
       You could pass `None` as the props, and then only set the metadata if 
there are actual properties.
   
   We can adjust this when moving this module to arrow-rs though

##########
File path: datafusion/src/avro_to_arrow/schema.rs
##########
@@ -0,0 +1,452 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::datatypes::{DataType, IntervalUnit, Schema, TimeUnit};
+use crate::error::{DataFusionError, Result};
+use arrow::datatypes::Field;
+use avro_rs::schema::Name;
+use avro_rs::types::Value;
+use avro_rs::Schema as AvroSchema;
+use std::collections::BTreeMap;
+use std::convert::TryFrom;
+
+/// Converts an avro schema to an arrow schema
+pub fn to_arrow_schema(avro_schema: &avro_rs::Schema) -> Result<Schema> {
+    let mut schema_fields = vec![];
+    match avro_schema {
+        AvroSchema::Record { fields, .. } => {
+            for field in fields {
+                schema_fields.push(schema_to_field_with_props(
+                    &field.schema,
+                    Some(&field.name),
+                    false,
+                    Some(&external_props(&field.schema)),
+                )?)
+            }
+        }
+        schema => schema_fields.push(schema_to_field(schema, Some(""), 
false)?),
+    }
+
+    let schema = Schema::new(schema_fields);
+    Ok(schema)
+}
+
+fn schema_to_field(
+    schema: &avro_rs::Schema,
+    name: Option<&str>,
+    nullable: bool,
+) -> Result<Field> {
+    schema_to_field_with_props(schema, name, nullable, 
Some(&Default::default()))
+}
+
+fn schema_to_field_with_props(
+    schema: &AvroSchema,
+    name: Option<&str>,
+    nullable: bool,
+    props: Option<&BTreeMap<String, String>>,
+) -> Result<Field> {
+    let mut nullable = nullable;
+    let field_type: DataType = match schema {
+        AvroSchema::Null => DataType::Null,
+        AvroSchema::Boolean => DataType::Boolean,
+        AvroSchema::Int => DataType::Int32,
+        AvroSchema::Long => DataType::Int64,
+        AvroSchema::Float => DataType::Float32,
+        AvroSchema::Double => DataType::Float64,
+        AvroSchema::Bytes => DataType::Binary,
+        AvroSchema::String => DataType::Utf8,
+        AvroSchema::Array(item_schema) => DataType::List(Box::new(
+            schema_to_field_with_props(item_schema, None, false, None)?,
+        )),
+        AvroSchema::Map(value_schema) => {
+            let value_field =
+                schema_to_field_with_props(value_schema, Some("value"), false, 
None)?;
+            DataType::Dictionary(
+                Box::new(DataType::Utf8),
+                Box::new(value_field.data_type().clone()),
+            )
+        }
+        AvroSchema::Union(us) => {
+            // If there are only two variants and one of them is null, set the 
other type as the field data type
+            let has_nullable = us.find_schema(&Value::Null).is_some();
+            let sub_schemas = us.variants();
+            if has_nullable && sub_schemas.len() == 2 {
+                nullable = true;
+                if let Some(schema) = sub_schemas
+                    .iter()
+                    .find(|&schema| !matches!(schema, AvroSchema::Null))
+                {
+                    schema_to_field_with_props(schema, None, has_nullable, 
None)?
+                        .data_type()
+                        .clone()
+                } else {
+                    return Err(DataFusionError::AvroError(
+                        avro_rs::Error::GetUnionDuplicate,
+                    ));
+                }
+            } else {
+                let fields = sub_schemas
+                    .iter()
+                    .map(|s| schema_to_field_with_props(s, None, has_nullable, 
None))
+                    .collect::<Result<Vec<Field>>>()?;
+                DataType::Union(fields)
+            }
+        }
+        AvroSchema::Record { name, fields, .. } => {
+            let fields: Result<Vec<Field>> = fields
+                .iter()
+                .map(|field| {
+                    let mut props = BTreeMap::new();
+                    if let Some(doc) = &field.doc {
+                        props.insert("doc".to_string(), doc.clone());
+                    }
+                    /*if let Some(aliases) = fields.aliases {
+                        props.insert("aliases", aliases);
+                    }*/
+                    schema_to_field_with_props(
+                        &field.schema,
+                        Some(&format!("{}.{}", name.fullname(None), 
field.name)),
+                        false,
+                        Some(&props),
+                    )
+                })
+                .collect();
+            DataType::Struct(fields?)
+        }
+        AvroSchema::Enum { symbols, name, .. } => {
+            return Ok(Field::new_dict(
+                &name.fullname(None),
+                index_type(symbols.len()),
+                false,
+                0,
+                false,
+            ))
+        }
+        AvroSchema::Fixed { size, .. } => DataType::FixedSizeBinary(*size as 
i32),
+        AvroSchema::Decimal {
+            precision, scale, ..
+        } => DataType::Decimal(*precision, *scale),
+        AvroSchema::Uuid => DataType::Utf8,

Review comment:
       This should be a binary field, ideally `DataType::FixedSizeBinary(16)`, 
because you otherwise lose the type information (that you're dealing with a UUID).
   
   Extension arrays will also be helpful in future for this. UUID is common 
enough across different data sources (SQL, Parquet) that we might want to 
preserve its properties.

##########
File path: datafusion/src/datasource/avro.rs
##########
@@ -0,0 +1,434 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Line-delimited Avro data source
+//!
+//! This data source allows Line-delimited Avro records or files to be used as 
input for queries.
+//!
+
+use std::{
+    any::Any,
+    io::{Read, Seek},
+    sync::{Arc, Mutex},
+};
+
+use arrow::datatypes::SchemaRef;
+
+use crate::physical_plan::avro::{AvroExec, AvroReadOptions};
+use crate::{
+    datasource::{Source, TableProvider},
+    error::{DataFusionError, Result},
+    physical_plan::{common, ExecutionPlan},
+};
+
+use super::datasource::Statistics;
+
+trait SeekRead: Read + Seek {}
+
+impl<T: Seek + Read> SeekRead for T {}
+
+/// Represents a  line-delimited Avro file with a provided schema

Review comment:
       nit: is it line-delimited? I'd expect that from text sources, not binary 
ones

##########
File path: datafusion/src/avro_to_arrow/mod.rs
##########
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module contains utilities to manipulate avro metadata.
+
+#[cfg(feature = "avro")]
+mod arrow_array_reader;
+#[cfg(feature = "avro")]
+mod reader;
+#[cfg(feature = "avro")]
+mod schema;
+
+use crate::arrow::datatypes::Schema;
+use crate::error::Result;
+#[cfg(feature = "avro")]
+pub use reader::{Reader, ReaderBuilder};
+use std::io::{Read, Seek};
+
+#[cfg(feature = "avro")]
+/// Infer Avro schema given a reader
+pub fn infer_avro_schema_from_reader<R: Read + Seek>(reader: &mut R) -> 
Result<Schema> {

Review comment:
       nit: is it inference, or reading the schema from the file/input?

##########
File path: datafusion/src/physical_plan/expressions/nth_value.rs
##########
@@ -78,7 +78,7 @@ impl NthValue {
     }
 
     /// Create a new NTH_VALUE window aggregate function
-    pub fn nth_value(
+    pub fn value(

Review comment:
       If no decision is made here, we could defer the clippy changes until 
they land on stable

##########
File path: datafusion/src/physical_plan/datetime_expressions.rs
##########
@@ -236,14 +236,8 @@ pub fn date_trunc(args: &[ColumnarValue]) -> 
Result<ColumnarValue> {
     let f = |x: Option<i64>| x.map(|x| date_trunc_single(granularity, 
x)).transpose();
 
     Ok(match array {
-        ColumnarValue::Scalar(scalar) => {

Review comment:
       is this change related to the avro addition?

##########
File path: datafusion/src/avro_to_arrow/schema.rs
##########
@@ -0,0 +1,452 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::datatypes::{DataType, IntervalUnit, Schema, TimeUnit};
+use crate::error::{DataFusionError, Result};
+use arrow::datatypes::Field;
+use avro_rs::schema::Name;
+use avro_rs::types::Value;
+use avro_rs::Schema as AvroSchema;
+use std::collections::BTreeMap;
+use std::convert::TryFrom;
+
+/// Converts an avro schema to an arrow schema
+pub fn to_arrow_schema(avro_schema: &avro_rs::Schema) -> Result<Schema> {
+    let mut schema_fields = vec![];
+    match avro_schema {
+        AvroSchema::Record { fields, .. } => {
+            for field in fields {
+                schema_fields.push(schema_to_field_with_props(
+                    &field.schema,
+                    Some(&field.name),
+                    false,
+                    Some(&external_props(&field.schema)),
+                )?)
+            }
+        }
+        schema => schema_fields.push(schema_to_field(schema, Some(""), 
false)?),
+    }
+
+    let schema = Schema::new(schema_fields);
+    Ok(schema)
+}
+
+fn schema_to_field(
+    schema: &avro_rs::Schema,
+    name: Option<&str>,
+    nullable: bool,
+) -> Result<Field> {
+    schema_to_field_with_props(schema, name, nullable, 
Some(&Default::default()))
+}
+
+fn schema_to_field_with_props(
+    schema: &AvroSchema,
+    name: Option<&str>,
+    nullable: bool,
+    props: Option<&BTreeMap<String, String>>,
+) -> Result<Field> {
+    let mut nullable = nullable;
+    let field_type: DataType = match schema {
+        AvroSchema::Null => DataType::Null,
+        AvroSchema::Boolean => DataType::Boolean,
+        AvroSchema::Int => DataType::Int32,
+        AvroSchema::Long => DataType::Int64,
+        AvroSchema::Float => DataType::Float32,
+        AvroSchema::Double => DataType::Float64,
+        AvroSchema::Bytes => DataType::Binary,
+        AvroSchema::String => DataType::Utf8,
+        AvroSchema::Array(item_schema) => DataType::List(Box::new(
+            schema_to_field_with_props(item_schema, None, false, None)?,
+        )),
+        AvroSchema::Map(value_schema) => {
+            let value_field =
+                schema_to_field_with_props(value_schema, Some("value"), false, 
None)?;
+            DataType::Dictionary(
+                Box::new(DataType::Utf8),
+                Box::new(value_field.data_type().clone()),
+            )
+        }
+        AvroSchema::Union(us) => {
+            // If there are only two variants and one of them is null, set the 
other type as the field data type
+            let has_nullable = us.find_schema(&Value::Null).is_some();
+            let sub_schemas = us.variants();
+            if has_nullable && sub_schemas.len() == 2 {
+                nullable = true;
+                if let Some(schema) = sub_schemas
+                    .iter()
+                    .find(|&schema| !matches!(schema, AvroSchema::Null))
+                {
+                    schema_to_field_with_props(schema, None, has_nullable, 
None)?
+                        .data_type()
+                        .clone()
+                } else {
+                    return Err(DataFusionError::AvroError(
+                        avro_rs::Error::GetUnionDuplicate,
+                    ));
+                }
+            } else {
+                let fields = sub_schemas
+                    .iter()
+                    .map(|s| schema_to_field_with_props(s, None, has_nullable, 
None))
+                    .collect::<Result<Vec<Field>>>()?;
+                DataType::Union(fields)
+            }
+        }
+        AvroSchema::Record { name, fields, .. } => {
+            let fields: Result<Vec<Field>> = fields
+                .iter()
+                .map(|field| {
+                    let mut props = BTreeMap::new();
+                    if let Some(doc) = &field.doc {
+                        props.insert("doc".to_string(), doc.clone());
+                    }
+                    /*if let Some(aliases) = fields.aliases {
+                        props.insert("aliases", aliases);
+                    }*/
+                    schema_to_field_with_props(
+                        &field.schema,
+                        Some(&format!("{}.{}", name.fullname(None), 
field.name)),
+                        false,
+                        Some(&props),
+                    )
+                })
+                .collect();
+            DataType::Struct(fields?)
+        }
+        AvroSchema::Enum { symbols, name, .. } => {
+            return Ok(Field::new_dict(
+                &name.fullname(None),
+                index_type(symbols.len()),
+                false,
+                0,
+                false,
+            ))
+        }
+        AvroSchema::Fixed { size, .. } => DataType::FixedSizeBinary(*size as 
i32),
+        AvroSchema::Decimal {
+            precision, scale, ..
+        } => DataType::Decimal(*precision, *scale),
+        AvroSchema::Uuid => DataType::Utf8,
+        AvroSchema::Date => DataType::Date32,
+        AvroSchema::TimeMillis => DataType::Time32(TimeUnit::Millisecond),
+        AvroSchema::TimeMicros => DataType::Time64(TimeUnit::Microsecond),
+        AvroSchema::TimestampMillis => 
DataType::Timestamp(TimeUnit::Millisecond, None),
+        AvroSchema::TimestampMicros => 
DataType::Timestamp(TimeUnit::Microsecond, None),
+        AvroSchema::Duration => DataType::Duration(TimeUnit::Millisecond),
+    };
+
+    let data_type = field_type.clone();
+    let name = name.unwrap_or_else(|| default_field_name(&data_type));
+
+    let mut field = Field::new(name, field_type, nullable);
+    field.set_metadata(props.cloned());
+    Ok(field)
+}
+
+fn default_field_name(dt: &DataType) -> &str {
+    match dt {
+        DataType::Null => "null",
+        DataType::Boolean => "bit",
+        DataType::Int8 => "tinyint",
+        DataType::Int16 => "smallint",
+        DataType::Int32 => "int",
+        DataType::Int64 => "bigint",
+        DataType::UInt8 => "uint1",
+        DataType::UInt16 => "uint2",
+        DataType::UInt32 => "uint4",
+        DataType::UInt64 => "uint8",
+        DataType::Float16 => "float2",
+        DataType::Float32 => "float4",
+        DataType::Float64 => "float8",
+        DataType::Date32 => "dateday",
+        DataType::Date64 => "datemilli",
+        DataType::Time32(tu) | DataType::Time64(tu) => match tu {
+            TimeUnit::Second => "timesec",
+            TimeUnit::Millisecond => "timemilli",
+            TimeUnit::Microsecond => "timemicro",
+            TimeUnit::Nanosecond => "timenano",
+        },
+        DataType::Timestamp(tu, tz) => {
+            if tz.is_some() {
+                match tu {
+                    TimeUnit::Second => "timestampsectz",
+                    TimeUnit::Millisecond => "timestampmillitz",
+                    TimeUnit::Microsecond => "timestampmicrotz",
+                    TimeUnit::Nanosecond => "timestampnanotz",
+                }
+            } else {
+                match tu {
+                    TimeUnit::Second => "timestampsec",
+                    TimeUnit::Millisecond => "timestampmilli",
+                    TimeUnit::Microsecond => "timestampmicro",
+                    TimeUnit::Nanosecond => "timestampnano",
+                }
+            }
+        }
+        DataType::Duration(_) => "duration",
+        DataType::Interval(unit) => match unit {
+            IntervalUnit::YearMonth => "intervalyear",
+            IntervalUnit::DayTime => "intervalmonth",
+        },
+        DataType::Binary => "varbinary",
+        DataType::FixedSizeBinary(_) => "fixedsizebinary",
+        DataType::LargeBinary => "largevarbinary",
+        DataType::Utf8 => "varchar",
+        DataType::LargeUtf8 => "largevarchar",
+        DataType::List(_) => "list",
+        DataType::FixedSizeList(_, _) => "fixed_size_list",
+        DataType::LargeList(_) => "largelist",
+        DataType::Struct(_) => "struct",
+        DataType::Union(_) => "union",
+        DataType::Dictionary(_, _) => "map",
+        DataType::Decimal(_, _) => "decimal",
+    }
+}
+
+fn index_type(len: usize) -> DataType {
+    if len <= usize::from(u8::MAX) {
+        DataType::Int8
+    } else if len <= usize::from(u16::MAX) {
+        DataType::Int16
+    } else if usize::try_from(u32::MAX).map(|i| len < i).unwrap_or(false) {
+        DataType::Int32
+    } else {
+        DataType::Int64
+    }
+}
+
+fn external_props(schema: &AvroSchema) -> BTreeMap<String, String> {
+    let mut props = BTreeMap::new();
+    match &schema {
+        AvroSchema::Record {
+            doc: Some(ref doc), ..
+        }
+        | AvroSchema::Enum {
+            doc: Some(ref doc), ..
+        } => {
+            props.insert("doc".to_string(), doc.clone());

Review comment:
       I haven't read through the Avro spec yet, so this might be a silly 
question.
   
   Is this embedding the avro schema in the Arrow schema? If so, 👍 as this 
would in future become useful if we pass a record batch to an Avro writer, that 
would use this schema to preserve some metadata about the originally read Avro 
source.
   
   If the above is true, a future enhancement could be using a well-known 
prefix, like `AVRO::doc`

##########
File path: datafusion/src/physical_plan/mod.rs
##########
@@ -570,6 +570,7 @@ pub trait Accumulator: Send + Sync + Debug {
 pub mod aggregates;
 pub mod analyze;
 pub mod array_expressions;
+pub mod avro;

Review comment:
       This should also be feature-flagged so that the flags inside its file 
can be removed. There's no use compiling the avro module if the flag is 
disabled.

##########
File path: datafusion/src/avro_to_arrow/schema.rs
##########
@@ -0,0 +1,452 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::datatypes::{DataType, IntervalUnit, Schema, TimeUnit};
+use crate::error::{DataFusionError, Result};
+use arrow::datatypes::Field;
+use avro_rs::schema::Name;
+use avro_rs::types::Value;
+use avro_rs::Schema as AvroSchema;
+use std::collections::BTreeMap;
+use std::convert::TryFrom;
+
+/// Converts an avro schema to an arrow schema
+pub fn to_arrow_schema(avro_schema: &avro_rs::Schema) -> Result<Schema> {
+    let mut schema_fields = vec![];
+    match avro_schema {
+        AvroSchema::Record { fields, .. } => {
+            for field in fields {
+                schema_fields.push(schema_to_field_with_props(
+                    &field.schema,
+                    Some(&field.name),
+                    false,
+                    Some(&external_props(&field.schema)),
+                )?)
+            }
+        }
+        schema => schema_fields.push(schema_to_field(schema, Some(""), 
false)?),
+    }
+
+    let schema = Schema::new(schema_fields);
+    Ok(schema)
+}
+
+fn schema_to_field(
+    schema: &avro_rs::Schema,
+    name: Option<&str>,
+    nullable: bool,
+) -> Result<Field> {
+    schema_to_field_with_props(schema, name, nullable, 
Some(&Default::default()))
+}
+
+fn schema_to_field_with_props(
+    schema: &AvroSchema,
+    name: Option<&str>,
+    nullable: bool,
+    props: Option<&BTreeMap<String, String>>,
+) -> Result<Field> {
+    let mut nullable = nullable;
+    let field_type: DataType = match schema {
+        AvroSchema::Null => DataType::Null,
+        AvroSchema::Boolean => DataType::Boolean,
+        AvroSchema::Int => DataType::Int32,
+        AvroSchema::Long => DataType::Int64,
+        AvroSchema::Float => DataType::Float32,
+        AvroSchema::Double => DataType::Float64,
+        AvroSchema::Bytes => DataType::Binary,
+        AvroSchema::String => DataType::Utf8,
+        AvroSchema::Array(item_schema) => DataType::List(Box::new(
+            schema_to_field_with_props(item_schema, None, false, None)?,
+        )),
+        AvroSchema::Map(value_schema) => {
+            let value_field =
+                schema_to_field_with_props(value_schema, Some("value"), false, 
None)?;
+            DataType::Dictionary(
+                Box::new(DataType::Utf8),
+                Box::new(value_field.data_type().clone()),
+            )
+        }
+        AvroSchema::Union(us) => {
+            // If there are only two variants and one of them is null, set the 
other type as the field data type
+            let has_nullable = us.find_schema(&Value::Null).is_some();
+            let sub_schemas = us.variants();
+            if has_nullable && sub_schemas.len() == 2 {
+                nullable = true;
+                if let Some(schema) = sub_schemas
+                    .iter()
+                    .find(|&schema| !matches!(schema, AvroSchema::Null))
+                {
+                    schema_to_field_with_props(schema, None, has_nullable, 
None)?
+                        .data_type()
+                        .clone()
+                } else {
+                    return Err(DataFusionError::AvroError(
+                        avro_rs::Error::GetUnionDuplicate,
+                    ));
+                }
+            } else {
+                let fields = sub_schemas
+                    .iter()
+                    .map(|s| schema_to_field_with_props(s, None, has_nullable, 
None))
+                    .collect::<Result<Vec<Field>>>()?;
+                DataType::Union(fields)
+            }
+        }
+        AvroSchema::Record { name, fields, .. } => {
+            let fields: Result<Vec<Field>> = fields
+                .iter()
+                .map(|field| {
+                    let mut props = BTreeMap::new();
+                    if let Some(doc) = &field.doc {
+                        props.insert("doc".to_string(), doc.clone());
+                    }
+                    /*if let Some(aliases) = fields.aliases {
+                        props.insert("aliases", aliases);
+                    }*/
+                    schema_to_field_with_props(
+                        &field.schema,
+                        Some(&format!("{}.{}", name.fullname(None), 
field.name)),
+                        false,
+                        Some(&props),
+                    )
+                })
+                .collect();
+            DataType::Struct(fields?)
+        }
+        AvroSchema::Enum { symbols, name, .. } => {
+            return Ok(Field::new_dict(
+                &name.fullname(None),
+                index_type(symbols.len()),
+                false,
+                0,
+                false,
+            ))
+        }
+        AvroSchema::Fixed { size, .. } => DataType::FixedSizeBinary(*size as 
i32),
+        AvroSchema::Decimal {
+            precision, scale, ..
+        } => DataType::Decimal(*precision, *scale),
+        AvroSchema::Uuid => DataType::Utf8,
+        AvroSchema::Date => DataType::Date32,
+        AvroSchema::TimeMillis => DataType::Time32(TimeUnit::Millisecond),
+        AvroSchema::TimeMicros => DataType::Time64(TimeUnit::Microsecond),
+        AvroSchema::TimestampMillis => 
DataType::Timestamp(TimeUnit::Millisecond, None),
+        AvroSchema::TimestampMicros => 
DataType::Timestamp(TimeUnit::Microsecond, None),
+        AvroSchema::Duration => DataType::Duration(TimeUnit::Millisecond),
+    };
+
+    let data_type = field_type.clone();
+    let name = name.unwrap_or_else(|| default_field_name(&data_type));
+
+    let mut field = Field::new(name, field_type, nullable);
+    field.set_metadata(props.cloned());
+    Ok(field)
+}
+
+fn default_field_name(dt: &DataType) -> &str {
+    match dt {
+        DataType::Null => "null",
+        DataType::Boolean => "bit",
+        DataType::Int8 => "tinyint",
+        DataType::Int16 => "smallint",
+        DataType::Int32 => "int",
+        DataType::Int64 => "bigint",
+        DataType::UInt8 => "uint1",
+        DataType::UInt16 => "uint2",
+        DataType::UInt32 => "uint4",
+        DataType::UInt64 => "uint8",
+        DataType::Float16 => "float2",
+        DataType::Float32 => "float4",
+        DataType::Float64 => "float8",
+        DataType::Date32 => "dateday",
+        DataType::Date64 => "datemilli",
+        DataType::Time32(tu) | DataType::Time64(tu) => match tu {
+            TimeUnit::Second => "timesec",
+            TimeUnit::Millisecond => "timemilli",
+            TimeUnit::Microsecond => "timemicro",
+            TimeUnit::Nanosecond => "timenano",
+        },
+        DataType::Timestamp(tu, tz) => {
+            if tz.is_some() {
+                match tu {
+                    TimeUnit::Second => "timestampsectz",
+                    TimeUnit::Millisecond => "timestampmillitz",
+                    TimeUnit::Microsecond => "timestampmicrotz",
+                    TimeUnit::Nanosecond => "timestampnanotz",
+                }
+            } else {
+                match tu {
+                    TimeUnit::Second => "timestampsec",
+                    TimeUnit::Millisecond => "timestampmilli",
+                    TimeUnit::Microsecond => "timestampmicro",
+                    TimeUnit::Nanosecond => "timestampnano",
+                }
+            }
+        }
+        DataType::Duration(_) => "duration",
+        DataType::Interval(unit) => match unit {
+            IntervalUnit::YearMonth => "intervalyear",
+            IntervalUnit::DayTime => "intervalmonth",
+        },
+        DataType::Binary => "varbinary",
+        DataType::FixedSizeBinary(_) => "fixedsizebinary",
+        DataType::LargeBinary => "largevarbinary",
+        DataType::Utf8 => "varchar",
+        DataType::LargeUtf8 => "largevarchar",
+        DataType::List(_) => "list",
+        DataType::FixedSizeList(_, _) => "fixed_size_list",
+        DataType::LargeList(_) => "largelist",
+        DataType::Struct(_) => "struct",
+        DataType::Union(_) => "union",
+        DataType::Dictionary(_, _) => "map",
+        DataType::Decimal(_, _) => "decimal",
+    }
+}
+
+fn index_type(len: usize) -> DataType {
+    if len <= usize::from(u8::MAX) {
+        DataType::Int8
+    } else if len <= usize::from(u16::MAX) {
+        DataType::Int16
+    } else if usize::try_from(u32::MAX).map(|i| len < i).unwrap_or(false) {
+        DataType::Int32
+    } else {
+        DataType::Int64
+    }
+}
+
+fn external_props(schema: &AvroSchema) -> BTreeMap<String, String> {
+    let mut props = BTreeMap::new();
+    match &schema {
+        AvroSchema::Record {
+            doc: Some(ref doc), ..
+        }
+        | AvroSchema::Enum {
+            doc: Some(ref doc), ..
+        } => {
+            props.insert("doc".to_string(), doc.clone());
+        }
+        _ => {}
+    }
+    match &schema {
+        AvroSchema::Record {
+            name:
+                Name {
+                    aliases: Some(aliases),
+                    namespace,
+                    ..
+                },
+            ..
+        }
+        | AvroSchema::Enum {
+            name:
+                Name {
+                    aliases: Some(aliases),
+                    namespace,
+                    ..
+                },
+            ..
+        }
+        | AvroSchema::Fixed {
+            name:
+                Name {
+                    aliases: Some(aliases),
+                    namespace,
+                    ..
+                },
+            ..
+        } => {
+            let aliases: Vec<String> = aliases
+                .iter()
+                .map(|alias| aliased(alias, namespace.as_deref(), None))
+                .collect();
+            props.insert("aliases".to_string(), format!("[{}]", 
aliases.join(",")));
+        }
+        _ => {}
+    }
+    props
+}
+
+#[allow(dead_code)]
+fn get_metadata(

Review comment:
       `get_metadata` doesn't look like it's used anywhere (hence the 
`#[allow(dead_code)]`). Is this intentional?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to