This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 5361d2ee09 fix: avro_to_arrow: Handle avro nested nullable struct (union) (#7663)
5361d2ee09 is described below

commit 5361d2ee090156eed4a1d91f6530695588a9405a
Author: Samrose <[email protected]>
AuthorDate: Wed Oct 4 09:59:09 2023 -0700

    fix: avro_to_arrow: Handle avro nested nullable struct (union) (#7663)
    
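    Corrects handling of nullable structs, which Avro encodes as a union of
    `null` and a record, including when such unions are nested inside other
    records or used as array items.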
    
    Signed-off-by: 🐼 Samrose Ahmed 🐼 <[email protected]>
---
 datafusion/core/Cargo.toml                         |   1 +
 .../datasource/avro_to_arrow/arrow_array_reader.rs | 669 +++++++++++++++++++--
 .../core/src/datasource/avro_to_arrow/schema.rs    |  60 +-
 datafusion/sqllogictest/test_files/avro.slt        |   8 +-
 4 files changed, 670 insertions(+), 68 deletions(-)

diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index d84d6a13c3..1db5d55baf 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -109,6 +109,7 @@ rand_distr = "0.4.3"
 regex = "1.5.4"
 rstest = "0.18.0"
 rust_decimal = { version = "1.27.0", features = ["tokio-pg"] }
+serde_json = "1"
 test-utils = { path = "../../test-utils" }
 thiserror = "1.0.37"
 tokio-postgres = "0.7.7"
diff --git a/datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs 
b/datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs
index f983e26d48..fd91ea1cc5 100644
--- a/datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs
+++ b/datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs
@@ -82,7 +82,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
                 fields, mut lookup, ..
             }) => {
                 for field in fields {
-                    Self::child_schema_lookup(&field.schema, &mut lookup)?;
+                    Self::child_schema_lookup(&field.name, &field.schema, &mut 
lookup)?;
                 }
                 Ok(lookup)
             }
@@ -93,27 +93,51 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
     }
 
     fn child_schema_lookup<'b>(
+        parent_field_name: &str,
         schema: &AvroSchema,
         schema_lookup: &'b mut BTreeMap<String, usize>,
     ) -> Result<&'b BTreeMap<String, usize>> {
         match schema {
-            AvroSchema::Record(RecordSchema {
-                name,
-                fields,
-                lookup,
-                ..
-            }) => {
+            AvroSchema::Union(us) => {
+                let has_nullable = us
+                    .find_schema_with_known_schemata::<apache_avro::Schema>(
+                        &Value::Null,
+                        None,
+                        &None,
+                    )
+                    .is_some();
+                let sub_schemas = us.variants();
+                if has_nullable && sub_schemas.len() == 2 {
+                    if let Some(sub_schema) =
+                        sub_schemas.iter().find(|&s| !matches!(s, 
AvroSchema::Null))
+                    {
+                        Self::child_schema_lookup(
+                            parent_field_name,
+                            sub_schema,
+                            schema_lookup,
+                        )?;
+                    }
+                }
+            }
+            AvroSchema::Record(RecordSchema { fields, lookup, .. }) => {
                 lookup.iter().for_each(|(field_name, pos)| {
                     schema_lookup
-                        .insert(format!("{}.{}", name.fullname(None), 
field_name), *pos);
+                        .insert(format!("{}.{}", parent_field_name, 
field_name), *pos);
                 });
 
                 for field in fields {
-                    Self::child_schema_lookup(&field.schema, schema_lookup)?;
+                    let sub_parent_field_name =
+                        format!("{}.{}", parent_field_name, field.name);
+                    Self::child_schema_lookup(
+                        &sub_parent_field_name,
+                        &field.schema,
+                        schema_lookup,
+                    )?;
                 }
             }
             AvroSchema::Array(schema) => {
-                Self::child_schema_lookup(schema, schema_lookup)?;
+                let sub_parent_field_name = format!("{}.element", 
parent_field_name);
+                Self::child_schema_lookup(&sub_parent_field_name, schema, 
schema_lookup)?;
             }
             _ => (),
         }
@@ -147,7 +171,8 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
 
         let rows = rows.iter().collect::<Vec<&Vec<(String, Value)>>>();
         let projection = self.projection.clone().unwrap_or_default();
-        let arrays = self.build_struct_array(&rows, self.schema.fields(), 
&projection);
+        let arrays =
+            self.build_struct_array(&rows, "", self.schema.fields(), 
&projection);
         let projected_fields = if projection.is_empty() {
             self.schema.fields().clone()
         } else {
@@ -305,6 +330,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
 
         for row in rows {
             if let Some(value) = self.field_lookup(col_name, row) {
+                let value = maybe_resolve_union(value);
                 // value can be an array or a scalar
                 let vals: Vec<Option<String>> = if let Value::String(v) = 
value {
                     vec![Some(v.to_string())]
@@ -444,6 +470,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
     /// Build a nested GenericListArray from a list of unnested `Value`s
     fn build_nested_list_array<OffsetSize: OffsetSizeTrait>(
         &self,
+        parent_field_name: &str,
         rows: &[&Value],
         list_field: &Field,
     ) -> ArrowResult<ArrayRef> {
@@ -530,13 +557,19 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
                 .collect::<LargeStringArray>()
                 .into_data(),
             DataType::List(field) => {
-                let child =
-                    self.build_nested_list_array::<i32>(&flatten_values(rows), 
field)?;
+                let child = self.build_nested_list_array::<i32>(
+                    parent_field_name,
+                    &flatten_values(rows),
+                    field,
+                )?;
                 child.to_data()
             }
             DataType::LargeList(field) => {
-                let child =
-                    self.build_nested_list_array::<i64>(&flatten_values(rows), 
field)?;
+                let child = self.build_nested_list_array::<i64>(
+                    parent_field_name,
+                    &flatten_values(rows),
+                    field,
+                )?;
                 child.to_data()
             }
             DataType::Struct(fields) => {
@@ -554,16 +587,22 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
                 let null_struct_array = vec![("null".to_string(), 
Value::Null)];
                 let rows: Vec<&Vec<(String, Value)>> = rows
                     .iter()
+                    .map(|v| maybe_resolve_union(v))
                     .flat_map(|row| {
                         if let Value::Array(values) = row {
-                            values.iter().for_each(|_| {
-                                bit_util::set_bit(&mut null_buffer, 
struct_index);
-                                struct_index += 1;
-                            });
                             values
                                 .iter()
+                                .map(maybe_resolve_union)
                                 .map(|v| match v {
-                                    Value::Record(record) => record,
+                                    Value::Record(record) => {
+                                        bit_util::set_bit(&mut null_buffer, 
struct_index);
+                                        struct_index += 1;
+                                        record
+                                    }
+                                    Value::Null => {
+                                        struct_index += 1;
+                                        &null_struct_array
+                                    }
                                     other => panic!("expected Record, got 
{other:?}"),
                                 })
                                 .collect::<Vec<&Vec<(String, Value)>>>()
@@ -573,7 +612,11 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
                         }
                     })
                     .collect();
-                let arrays = self.build_struct_array(&rows, fields, &[])?;
+
+                let sub_parent_field_name =
+                    format!("{}.{}", parent_field_name, list_field.name());
+                let arrays =
+                    self.build_struct_array(&rows, &sub_parent_field_name, 
fields, &[])?;
                 let data_type = DataType::Struct(fields.clone());
                 ArrayDataBuilder::new(data_type)
                     .len(rows.len())
@@ -610,6 +653,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
     fn build_struct_array(
         &self,
         rows: RecordSlice,
+        parent_field_name: &str,
         struct_fields: &Fields,
         projection: &[String],
     ) -> ArrowResult<Vec<ArrayRef>> {
@@ -617,78 +661,83 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
             .iter()
             .filter(|field| projection.is_empty() || 
projection.contains(field.name()))
             .map(|field| {
+                let field_path = if parent_field_name.is_empty() {
+                    field.name().to_string()
+                } else {
+                    format!("{}.{}", parent_field_name, field.name())
+                };
                 let arr = match field.data_type() {
                     DataType::Null => Arc::new(NullArray::new(rows.len())) as 
ArrayRef,
-                    DataType::Boolean => self.build_boolean_array(rows, 
field.name()),
+                    DataType::Boolean => self.build_boolean_array(rows, 
&field_path),
                     DataType::Float64 => {
-                        self.build_primitive_array::<Float64Type>(rows, 
field.name())
+                        self.build_primitive_array::<Float64Type>(rows, 
&field_path)
                     }
                     DataType::Float32 => {
-                        self.build_primitive_array::<Float32Type>(rows, 
field.name())
+                        self.build_primitive_array::<Float32Type>(rows, 
&field_path)
                     }
                     DataType::Int64 => {
-                        self.build_primitive_array::<Int64Type>(rows, 
field.name())
+                        self.build_primitive_array::<Int64Type>(rows, 
&field_path)
                     }
                     DataType::Int32 => {
-                        self.build_primitive_array::<Int32Type>(rows, 
field.name())
+                        self.build_primitive_array::<Int32Type>(rows, 
&field_path)
                     }
                     DataType::Int16 => {
-                        self.build_primitive_array::<Int16Type>(rows, 
field.name())
+                        self.build_primitive_array::<Int16Type>(rows, 
&field_path)
                     }
                     DataType::Int8 => {
-                        self.build_primitive_array::<Int8Type>(rows, 
field.name())
+                        self.build_primitive_array::<Int8Type>(rows, 
&field_path)
                     }
                     DataType::UInt64 => {
-                        self.build_primitive_array::<UInt64Type>(rows, 
field.name())
+                        self.build_primitive_array::<UInt64Type>(rows, 
&field_path)
                     }
                     DataType::UInt32 => {
-                        self.build_primitive_array::<UInt32Type>(rows, 
field.name())
+                        self.build_primitive_array::<UInt32Type>(rows, 
&field_path)
                     }
                     DataType::UInt16 => {
-                        self.build_primitive_array::<UInt16Type>(rows, 
field.name())
+                        self.build_primitive_array::<UInt16Type>(rows, 
&field_path)
                     }
                     DataType::UInt8 => {
-                        self.build_primitive_array::<UInt8Type>(rows, 
field.name())
+                        self.build_primitive_array::<UInt8Type>(rows, 
&field_path)
                     }
                     // TODO: this is incomplete
                     DataType::Timestamp(unit, _) => match unit {
                         TimeUnit::Second => self
                             .build_primitive_array::<TimestampSecondType>(
                                 rows,
-                                field.name(),
+                                &field_path,
                             ),
                         TimeUnit::Microsecond => self
                             .build_primitive_array::<TimestampMicrosecondType>(
                                 rows,
-                                field.name(),
+                                &field_path,
                             ),
                         TimeUnit::Millisecond => self
                             .build_primitive_array::<TimestampMillisecondType>(
                                 rows,
-                                field.name(),
+                                &field_path,
                             ),
                         TimeUnit::Nanosecond => self
                             .build_primitive_array::<TimestampNanosecondType>(
                                 rows,
-                                field.name(),
+                                &field_path,
                             ),
                     },
                     DataType::Date64 => {
-                        self.build_primitive_array::<Date64Type>(rows, 
field.name())
+                        self.build_primitive_array::<Date64Type>(rows, 
&field_path)
                     }
                     DataType::Date32 => {
-                        self.build_primitive_array::<Date32Type>(rows, 
field.name())
+                        self.build_primitive_array::<Date32Type>(rows, 
&field_path)
                     }
                     DataType::Time64(unit) => match unit {
                         TimeUnit::Microsecond => self
                             .build_primitive_array::<Time64MicrosecondType>(
                                 rows,
-                                field.name(),
+                                &field_path,
                             ),
                         TimeUnit::Nanosecond => self
                             .build_primitive_array::<Time64NanosecondType>(
                                 rows,
-                                field.name(),
+                                &field_path,
                             ),
                         t => {
                             return Err(ArrowError::SchemaError(format!(
@@ -698,14 +747,11 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
                     },
                     DataType::Time32(unit) => match unit {
                         TimeUnit::Second => self
-                            .build_primitive_array::<Time32SecondType>(
-                                rows,
-                                field.name(),
-                            ),
+                            .build_primitive_array::<Time32SecondType>(rows, 
&field_path),
                         TimeUnit::Millisecond => self
                             .build_primitive_array::<Time32MillisecondType>(
                                 rows,
-                                field.name(),
+                                &field_path,
                             ),
                         t => {
                             return Err(ArrowError::SchemaError(format!(
@@ -716,7 +762,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
                     DataType::Utf8 | DataType::LargeUtf8 => Arc::new(
                         rows.iter()
                             .map(|row| {
-                                let maybe_value = 
self.field_lookup(field.name(), row);
+                                let maybe_value = 
self.field_lookup(&field_path, row);
                                 match maybe_value {
                                     None => Ok(None),
                                     Some(v) => resolve_string(v),
@@ -728,7 +774,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
                     DataType::Binary | DataType::LargeBinary => Arc::new(
                         rows.iter()
                             .map(|row| {
-                                let maybe_value = 
self.field_lookup(field.name(), row);
+                                let maybe_value = 
self.field_lookup(&field_path, row);
                                 maybe_value.and_then(resolve_bytes)
                             })
                             .collect::<BinaryArray>(),
@@ -737,7 +783,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
                     DataType::FixedSizeBinary(ref size) => {
                         
Arc::new(FixedSizeBinaryArray::try_from_sparse_iter_with_size(
                             rows.iter().map(|row| {
-                                let maybe_value = 
self.field_lookup(field.name(), row);
+                                let maybe_value = 
self.field_lookup(&field_path, row);
                                 maybe_value.and_then(|v| resolve_fixed(v, 
*size as usize))
                             }),
                             *size,
@@ -746,18 +792,19 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
                     DataType::List(ref list_field) => {
                         match list_field.data_type() {
                             DataType::Dictionary(ref key_ty, _) => {
-                                self.build_wrapped_list_array(rows, 
field.name(), key_ty)?
+                                self.build_wrapped_list_array(rows, 
&field_path, key_ty)?
                             }
                             _ => {
                                 // extract rows by name
                                 let extracted_rows = rows
                                     .iter()
                                     .map(|row| {
-                                        self.field_lookup(field.name(), row)
+                                        self.field_lookup(&field_path, row)
                                             .unwrap_or(&Value::Null)
                                     })
                                     .collect::<Vec<&Value>>();
                                 self.build_nested_list_array::<i32>(
+                                    &field_path,
                                     &extracted_rows,
                                     list_field,
                                 )?
@@ -767,7 +814,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
                     DataType::Dictionary(ref key_ty, ref val_ty) => self
                         .build_string_dictionary_array(
                             rows,
-                            field.name(),
+                            &field_path,
                             key_ty,
                             val_ty,
                         )?,
@@ -775,21 +822,31 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
                         let len = rows.len();
                         let num_bytes = bit_util::ceil(len, 8);
                         let mut null_buffer = 
MutableBuffer::from_len_zeroed(num_bytes);
+                        let empty_vec = vec![];
                         let struct_rows = rows
                             .iter()
                             .enumerate()
-                            .map(|(i, row)| (i, 
self.field_lookup(field.name(), row)))
+                            .map(|(i, row)| (i, self.field_lookup(&field_path, 
row)))
                             .map(|(i, v)| {
-                                if let Some(Value::Record(value)) = v {
-                                    bit_util::set_bit(&mut null_buffer, i);
-                                    value
-                                } else {
-                                    panic!("expected struct got {v:?}");
+                                let v = v.map(maybe_resolve_union);
+                                match v {
+                                    Some(Value::Record(value)) => {
+                                        bit_util::set_bit(&mut null_buffer, i);
+                                        value
+                                    }
+                                    None | Some(Value::Null) => &empty_vec,
+                                    other => {
+                                        panic!("expected struct got 
{other:?}");
+                                    }
                                 }
                             })
                             .collect::<Vec<&Vec<(String, Value)>>>();
-                        let arrays =
-                            self.build_struct_array(&struct_rows, fields, 
&[])?;
+                        let arrays = self.build_struct_array(
+                            &struct_rows,
+                            &field_path,
+                            fields,
+                            &[],
+                        )?;
                         // construct a struct array's data in order to set 
null buffer
                         let data_type = DataType::Struct(fields.clone());
                         let data = ArrayDataBuilder::new(data_type)
@@ -1019,6 +1076,7 @@ mod test {
     use crate::arrow::datatypes::{Field, TimeUnit};
     use crate::datasource::avro_to_arrow::{Reader, ReaderBuilder};
     use arrow::datatypes::DataType;
+    use datafusion_common::assert_batches_eq;
     use datafusion_common::cast::{
         as_int32_array, as_int64_array, as_list_array, 
as_timestamp_microsecond_array,
     };
@@ -1079,7 +1137,7 @@ mod test {
         let a_array = as_list_array(batch.column(col_id_index)).unwrap();
         assert_eq!(
             *a_array.data_type(),
-            DataType::List(Arc::new(Field::new("bigint", DataType::Int64, 
true)))
+            DataType::List(Arc::new(Field::new("element", DataType::Int64, 
true)))
         );
         let array = a_array.value(0);
         assert_eq!(*array.data_type(), DataType::Int64);
@@ -1101,6 +1159,497 @@ mod test {
         assert_eq!(batch.num_rows(), 3);
     }
 
+    #[test]
+    fn test_complex_list() {
+        let schema = apache_avro::Schema::parse_str(
+            r#"
+            {
+              "type": "record",
+              "name": "r1",
+              "fields": [
+                {
+                  "name": "headers",
+                  "type": ["null", {
+                        "type": "array",
+                        "items": ["null",{
+                            "name":"r2",
+                            "type": "record",
+                            "fields":[
+                                {"name":"name", "type": ["null", "string"], 
"default": null},
+                                {"name":"value", "type": ["null", "string"], 
"default": null}
+                            ]
+                        }]
+                    }],
+                    "default": null
+                }
+              ]
+            }"#,
+        )
+        .unwrap();
+        let r1 = apache_avro::to_value(serde_json::json!({
+            "headers": [
+                {
+                    "name": "a",
+                    "value": "b"
+                }
+            ]
+        }))
+        .unwrap()
+        .resolve(&schema)
+        .unwrap();
+
+        let mut w = apache_avro::Writer::new(&schema, vec![]);
+        w.append(r1).unwrap();
+        let bytes = w.into_inner().unwrap();
+
+        let mut reader = ReaderBuilder::new()
+            .read_schema()
+            .with_batch_size(2)
+            .build(std::io::Cursor::new(bytes))
+            .unwrap();
+
+        let batch = reader.next().unwrap().unwrap();
+        assert_eq!(batch.num_rows(), 1);
+        assert_eq!(batch.num_columns(), 1);
+        let expected = [
+            "+-----------------------+",
+            "| headers               |",
+            "+-----------------------+",
+            "| [{name: a, value: b}] |",
+            "+-----------------------+",
+        ];
+        assert_batches_eq!(expected, &[batch]);
+    }
+
+    #[test]
+    fn test_complex_struct() {
+        let schema = apache_avro::Schema::parse_str(
+            r#"
+        {
+          "type": "record",
+          "name": "r1",
+          "fields": [
+            {
+              "name": "dns",
+              "type": [
+                "null",
+                {
+                  "type": "record",
+                  "name": "r13",
+                  "fields": [
+                    {
+                      "name": "answers",
+                      "type": [
+                        "null",
+                        {
+                          "type": "array",
+                          "items": [
+                            "null",
+                            {
+                              "type": "record",
+                              "name": "r292",
+                              "fields": [
+                                {
+                                  "name": "class",
+                                  "type": ["null", "string"],
+                                  "default": null
+                                },
+                                {
+                                  "name": "data",
+                                  "type": ["null", "string"],
+                                  "default": null
+                                },
+                                {
+                                  "name": "name",
+                                  "type": ["null", "string"],
+                                  "default": null
+                                },
+                                {
+                                  "name": "ttl",
+                                  "type": ["null", "long"],
+                                  "default": null
+                                },
+                                {
+                                  "name": "type",
+                                  "type": ["null", "string"],
+                                  "default": null
+                                }
+                              ]
+                            }
+                          ]
+                        }
+                      ],
+                      "default": null
+                    },
+                    {
+                      "name": "header_flags",
+                      "type": [
+                        "null",
+                        {
+                          "type": "array",
+                          "items": ["null", "string"]
+                        }
+                      ],
+                      "default": null
+                    },
+                    {
+                      "name": "id",
+                      "type": ["null", "string"],
+                      "default": null
+                    },
+                    {
+                      "name": "op_code",
+                      "type": ["null", "string"],
+                      "default": null
+                    },
+                    {
+                      "name": "question",
+                      "type": [
+                        "null",
+                        {
+                          "type": "record",
+                          "name": "r288",
+                          "fields": [
+                            {
+                              "name": "class",
+                              "type": ["null", "string"],
+                              "default": null
+                            },
+                            {
+                              "name": "name",
+                              "type": ["null", "string"],
+                              "default": null
+                            },
+                            {
+                              "name": "registered_domain",
+                              "type": ["null", "string"],
+                              "default": null
+                            },
+                            {
+                              "name": "subdomain",
+                              "type": ["null", "string"],
+                              "default": null
+                            },
+                            {
+                              "name": "top_level_domain",
+                              "type": ["null", "string"],
+                              "default": null
+                            },
+                            {
+                              "name": "type",
+                              "type": ["null", "string"],
+                              "default": null
+                            }
+                          ]
+                        }
+                      ],
+                      "default": null
+                    },
+                    {
+                      "name": "resolved_ip",
+                      "type": [
+                        "null",
+                        {
+                          "type": "array",
+                          "items": ["null", "string"]
+                        }
+                      ],
+                      "default": null
+                    },
+                    {
+                      "name": "response_code",
+                      "type": ["null", "string"],
+                      "default": null
+                    },
+                    {
+                      "name": "type",
+                      "type": ["null", "string"],
+                      "default": null
+                    }
+                  ]
+                }
+              ],
+              "default": null
+            }
+          ]
+        }"#,
+        )
+        .unwrap();
+
+        let jv1 = serde_json::json!({
+          "dns": {
+            "answers": [
+                {
+                    "data": "CHNlY3VyaXR5BnVidW50dQMjb20AAAEAAQAAAAgABLl9vic=",
+                    "type": "1"
+                },
+                {
+                    "data": "CHNlY3VyaXR5BnVidW50dQNjb20AAAEAABAAAAgABLl9viQ=",
+                    "type": "1"
+                },
+                {
+                    "data": "CHNlT3VyaXR5BnVidW50dQNjb20AAAEAAQAAAAgABFu9Wyc=",
+                    "type": "1"
+                }
+            ],
+            "question": {
+                "name": "security.ubuntu.com",
+                "type": "A"
+            },
+            "resolved_ip": [
+                "67.43.156.1",
+                "67.43.156.2",
+                "67.43.156.3"
+            ],
+            "response_code": "0"
+          }
+        });
+        let r1 = apache_avro::to_value(jv1)
+            .unwrap()
+            .resolve(&schema)
+            .unwrap();
+
+        let mut w = apache_avro::Writer::new(&schema, vec![]);
+        w.append(r1).unwrap();
+        let bytes = w.into_inner().unwrap();
+
+        let mut reader = ReaderBuilder::new()
+            .read_schema()
+            .with_batch_size(1)
+            .build(std::io::Cursor::new(bytes))
+            .unwrap();
+
+        let batch = reader.next().unwrap().unwrap();
+        assert_eq!(batch.num_rows(), 1);
+        assert_eq!(batch.num_columns(), 1);
+
+        let expected = [
+            
"+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
+            "| dns                                                             
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+            
"+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
+            "| {answers: [{class: , data: 
CHNlY3VyaXR5BnVidW50dQMjb20AAAEAAQAAAAgABLl9vic=, name: , ttl: , type: 1}, 
{class: , data: CHNlY3VyaXR5BnVidW50dQNjb20AAAEAABAAAAgABLl9viQ=, name: , ttl: 
, type: 1}, {class: , data: CHNlT3VyaXR5BnVidW50dQNjb20AAAEAAQAAAAgABFu9Wyc=, 
name: , ttl: , type: 1}], header_flags: , id: , op_code: , question: {class: , 
name: security.ubuntu.com, registered_domain: , subdomain: , top_level_domain: 
, type: A}, resolved_ip: [67.43.156.1, 67.43.156.2, 67.43.15 [...]
+            
"+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
+        ];
+        assert_batches_eq!(expected, &[batch]);
+    }
+
+    #[test]
+    fn test_deep_nullable_struct() {
+        let schema = apache_avro::Schema::parse_str(
+            r#"
+            {
+                "type": "record",
+                "name": "r1",
+                "fields": [
+                  {
+                    "name": "col1",
+                    "type": [
+                      "null",
+                      {
+                        "type": "record",
+                        "name": "r2",
+                        "fields": [
+                          {
+                            "name": "col2",
+                            "type": [
+                              "null",
+                              {
+                                "type": "record",
+                                "name": "r3",
+                                "fields": [
+                                  {
+                                    "name": "col3",
+                                    "type": [
+                                      "null",
+                                      {
+                                        "type": "record",
+                                        "name": "r4",
+                                        "fields": [
+                                          {
+                                            "name": "col4",
+                                            "type": [
+                                              "null",
+                                              {
+                                                "type": "record",
+                                                "name": "r5",
+                                                "fields": [
+                                                  {
+                                                    "name": "col5",
+                                                    "type": ["null", "string"]
+                                                  }
+                                                ]
+                                              }
+                                            ]
+                                          }
+                                        ]
+                                      }
+                                    ]
+                                  }
+                                ]
+                              }
+                            ]
+                          }
+                        ]
+                      }
+                    ]
+                  }
+                ]
+              }
+            "#,
+        )
+        .unwrap();
+        let r1 = apache_avro::to_value(serde_json::json!({
+            "col1": {
+                "col2": {
+                    "col3": {
+                        "col4": {
+                            "col5": "hello"
+                        }
+                    }
+                }
+            }
+        }))
+        .unwrap()
+        .resolve(&schema)
+        .unwrap();
+        let r2 = apache_avro::to_value(serde_json::json!({
+            "col1": {
+                "col2": {
+                    "col3": {
+                        "col4": {
+                            "col5": null
+                        }
+                    }
+                }
+            }
+        }))
+        .unwrap()
+        .resolve(&schema)
+        .unwrap();
+        let r3 = apache_avro::to_value(serde_json::json!({
+            "col1": {
+                "col2": {
+                    "col3": null
+                }
+            }
+        }))
+        .unwrap()
+        .resolve(&schema)
+        .unwrap();
+        let r4 = apache_avro::to_value(serde_json::json!({
+            "col1": null
+        }))
+        .unwrap()
+        .resolve(&schema)
+        .unwrap();
+
+        let mut w = apache_avro::Writer::new(&schema, vec![]);
+        w.append(r1).unwrap();
+        w.append(r2).unwrap();
+        w.append(r3).unwrap();
+        w.append(r4).unwrap();
+        let bytes = w.into_inner().unwrap();
+
+        let mut reader = ReaderBuilder::new()
+            .read_schema()
+            .with_batch_size(4)
+            .build(std::io::Cursor::new(bytes))
+            .unwrap();
+
+        let batch = reader.next().unwrap().unwrap();
+
+        let expected = [
+            "+---------------------------------------+",
+            "| col1                                  |",
+            "+---------------------------------------+",
+            "| {col2: {col3: {col4: {col5: hello}}}} |",
+            "| {col2: {col3: {col4: {col5: }}}}      |",
+            "| {col2: {col3: }}                      |",
+            "|                                       |",
+            "+---------------------------------------+",
+        ];
+        assert_batches_eq!(expected, &[batch]);
+    }
+
+    #[test]
+    fn test_avro_nullable_struct() {
+        let schema = apache_avro::Schema::parse_str(
+            r#"
+            {
+              "type": "record",
+              "name": "r1",
+              "fields": [
+                {
+                  "name": "col1",
+                  "type": [
+                    "null",
+                    {
+                      "type": "record",
+                      "name": "r2",
+                      "fields": [
+                        {
+                          "name": "col2",
+                          "type": ["null", "string"]
+                        }
+                      ]
+                    }
+                  ],
+                  "default": null
+                }
+              ]
+            }"#,
+        )
+        .unwrap();
+        let r1 = apache_avro::to_value(serde_json::json!({
+            "col1": null
+        }))
+        .unwrap()
+        .resolve(&schema)
+        .unwrap();
+        let r2 = apache_avro::to_value(serde_json::json!({
+            "col1": {
+                "col2": "hello"
+            }
+        }))
+        .unwrap()
+        .resolve(&schema)
+        .unwrap();
+        let r3 = apache_avro::to_value(serde_json::json!({
+            "col1": {
+                "col2": null
+            }
+        }))
+        .unwrap()
+        .resolve(&schema)
+        .unwrap();
+
+        let mut w = apache_avro::Writer::new(&schema, vec![]);
+        w.append(r1).unwrap();
+        w.append(r2).unwrap();
+        w.append(r3).unwrap();
+        let bytes = w.into_inner().unwrap();
+
+        let mut reader = ReaderBuilder::new()
+            .read_schema()
+            .with_batch_size(3)
+            .build(std::io::Cursor::new(bytes))
+            .unwrap();
+        let batch = reader.next().unwrap().unwrap();
+        assert_eq!(batch.num_rows(), 3);
+        assert_eq!(batch.num_columns(), 1);
+
+        let expected = [
+            "+---------------+",
+            "| col1          |",
+            "+---------------+",
+            "|               |",
+            "| {col2: hello} |",
+            "| {col2: }      |",
+            "+---------------+",
+        ];
+        assert_batches_eq!(expected, &[batch]);
+    }
+
     #[test]
     fn test_avro_iterator() {
         let reader = build_reader("alltypes_plain.avro", 5);
diff --git a/datafusion/core/src/datasource/avro_to_arrow/schema.rs 
b/datafusion/core/src/datasource/avro_to_arrow/schema.rs
index f15e378cc6..761e6b6268 100644
--- a/datafusion/core/src/datasource/avro_to_arrow/schema.rs
+++ b/datafusion/core/src/datasource/avro_to_arrow/schema.rs
@@ -35,7 +35,7 @@ pub fn to_arrow_schema(avro_schema: &apache_avro::Schema) -> 
Result<Schema> {
                 schema_fields.push(schema_to_field_with_props(
                     &field.schema,
                     Some(&field.name),
-                    false,
+                    field.is_nullable(),
                     Some(external_props(&field.schema)),
                 )?)
             }
@@ -73,7 +73,7 @@ fn schema_to_field_with_props(
         AvroSchema::Bytes => DataType::Binary,
         AvroSchema::String => DataType::Utf8,
         AvroSchema::Array(item_schema) => DataType::List(Arc::new(
-            schema_to_field_with_props(item_schema, None, false, None)?,
+            schema_to_field_with_props(item_schema, Some("element"), false, 
None)?,
         )),
         AvroSchema::Map(value_schema) => {
             let value_field =
@@ -116,7 +116,7 @@ fn schema_to_field_with_props(
                 DataType::Union(UnionFields::new(type_ids, fields), 
UnionMode::Dense)
             }
         }
-        AvroSchema::Record(RecordSchema { name, fields, .. }) => {
+        AvroSchema::Record(RecordSchema { fields, .. }) => {
             let fields: Result<_> = fields
                 .iter()
                 .map(|field| {
@@ -129,7 +129,7 @@ fn schema_to_field_with_props(
                     }*/
                     schema_to_field_with_props(
                         &field.schema,
-                        Some(&format!("{}.{}", name.fullname(None), 
field.name)),
+                        Some(&field.name),
                         false,
                         Some(props),
                     )
@@ -442,6 +442,58 @@ mod test {
         assert_eq!(arrow_schema.unwrap(), expected);
     }
 
+    #[test]
+    fn test_nested_schema() {
+        let avro_schema = apache_avro::Schema::parse_str(
+            r#"
+            {
+              "type": "record",
+              "name": "r1",
+              "fields": [
+                {
+                  "name": "col1",
+                  "type": [
+                    "null",
+                    {
+                      "type": "record",
+                      "name": "r2",
+                      "fields": [
+                        {
+                          "name": "col2",
+                          "type": "string"
+                        },
+                        {
+                          "name": "col3",
+                          "type": ["null", "string"],
+                          "default": null
+                        }
+                      ]
+                    }
+                  ],
+                  "default": null
+                }
+              ]
+            }"#,
+        )
+        .unwrap();
+        // should not use Avro Record names.
+        let expected_arrow_schema = Schema::new(vec![Field::new(
+            "col1",
+            arrow::datatypes::DataType::Struct(
+                vec![
+                    Field::new("col2", Utf8, false),
+                    Field::new("col3", Utf8, true),
+                ]
+                .into(),
+            ),
+            true,
+        )]);
+        assert_eq!(
+            to_arrow_schema(&avro_schema).unwrap(),
+            expected_arrow_schema
+        );
+    }
+
     #[test]
     fn test_non_record_schema() {
         let arrow_schema = to_arrow_schema(&AvroSchema::String);
diff --git a/datafusion/sqllogictest/test_files/avro.slt 
b/datafusion/sqllogictest/test_files/avro.slt
index 3309cd1cf6..5cd268e8ef 100644
--- a/datafusion/sqllogictest/test_files/avro.slt
+++ b/datafusion/sqllogictest/test_files/avro.slt
@@ -225,11 +225,11 @@ SELECT id, CAST(string_col AS varchar) FROM 
alltypes_plain_multi_files
 1  1
 
 # test avro nested records
-query ??
-SELECT f1, f2 FROM nested_records
+query ????
+SELECT f1, f2, f3, f4 FROM nested_records
 ----
-{ns2.record2.f1_1: aaa, ns2.record2.f1_2: 10, ns2.record2.f1_3: 
{ns3.record3.f1_3_1: 3.14}} [{ns4.record4.f2_1: true, ns4.record4.f2_2: 1.2}, 
{ns4.record4.f2_1: true, ns4.record4.f2_2: 2.2}]
-{ns2.record2.f1_1: bbb, ns2.record2.f1_2: 20, ns2.record2.f1_3: 
{ns3.record3.f1_3_1: 3.14}} [{ns4.record4.f2_1: false, ns4.record4.f2_2: 10.2}]
+{f1_1: aaa, f1_2: 10, f1_3: {f1_3_1: 3.14}} [{f2_1: true, f2_2: 1.2}, {f2_1: 
true, f2_2: 2.2}] {f3_1: xyz} [{f4_1: 200}, ]
+{f1_1: bbb, f1_2: 20, f1_3: {f1_3_1: 3.14}} [{f2_1: false, f2_2: 10.2}] NULL 
[, {f4_1: 300}]
 
 # test avro enum
 query TTT


Reply via email to