This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 291e6e575c Add arrow-avro support for Impala Nullability (#7954)
291e6e575c is described below

commit 291e6e575c727a98ee52b617da0c8de64a821e09
Author: Veronica Manchola <[email protected]>
AuthorDate: Mon Jul 21 11:20:14 2025 -0400

    Add arrow-avro support for Impala Nullability (#7954)
    
    # Which issue does this PR close?
    
    - Part of https://github.com/apache/arrow-rs/issues/4886
    - Related to https://github.com/apache/arrow-rs/pull/6965
    
    # Rationale for this change
    
    This change introduces support for Avro files generated by systems like
    Impala, which have a specific convention for representing nullable
    fields. In Avro, nullability is typically represented by a union of a
    type and a `null` type. This PR updates the Avro reader to correctly
    interpret these schemas, ensuring proper handling of nullable data and
    improving interoperability with Impala-generated data.
    
    # What changes are included in this PR?
    
    This pull request introduces several changes to support Impala-style
    nullability in the Avro reader:
    - The Avro schema parser has been updated to recognize unions where
    `null` is the second type (e.g., `['type', 'null']`) as a nullable field.
    - Logic has been added to handle this nullability convention during Avro
    decoding.
    - New tests are included to verify that Avro files using this
    nullability format are read correctly, and that strict mode rejects
    them with a schema error.
    
    # Are these changes tested?
    
    Yes, I added new test cases covering these changes to the tests named:
    `test_nonnullable_impala`, `test_nonnullable_impala_strict`,
    `test_nullable_impala` and `test_nullable_impala_strict`.
    
    # Are there any user-facing changes?
    
    N/A
    
    ---------
    
    Co-authored-by: Connor Sanders <[email protected]>
---
 arrow-avro/src/codec.rs         | 126 +++++++++++--
 arrow-avro/src/reader/mod.rs    | 391 +++++++++++++++++++++++++++++++++++++++-
 arrow-avro/src/reader/record.rs |  36 ++--
 3 files changed, 508 insertions(+), 45 deletions(-)

diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs
index 88b30a6d49..bd265503d7 100644
--- a/arrow-avro/src/codec.rs
+++ b/arrow-avro/src/codec.rs
@@ -148,7 +148,7 @@ impl<'a> TryFrom<&Schema<'a>> for AvroField {
         match schema {
             Schema::Complex(ComplexType::Record(r)) => {
                 let mut resolver = Resolver::default();
-                let data_type = make_data_type(schema, None, &mut resolver, 
false)?;
+                let data_type = make_data_type(schema, None, &mut resolver, 
false, false)?;
                 Ok(AvroField {
                     data_type,
                     name: r.name.to_string(),
@@ -161,6 +161,60 @@ impl<'a> TryFrom<&Schema<'a>> for AvroField {
     }
 }
 
+/// Builder for an [`AvroField`]
+#[derive(Debug)]
+pub struct AvroFieldBuilder<'a> {
+    schema: &'a Schema<'a>,
+    use_utf8view: bool,
+    strict_mode: bool,
+}
+
+impl<'a> AvroFieldBuilder<'a> {
+    /// Creates a new [`AvroFieldBuilder`]
+    pub fn new(schema: &'a Schema<'a>) -> Self {
+        Self {
+            schema,
+            use_utf8view: false,
+            strict_mode: false,
+        }
+    }
+
+    /// Enable or disable Utf8View support
+    pub fn with_utf8view(mut self, use_utf8view: bool) -> Self {
+        self.use_utf8view = use_utf8view;
+        self
+    }
+
+    /// Enable or disable strict mode.
+    pub fn with_strict_mode(mut self, strict_mode: bool) -> Self {
+        self.strict_mode = strict_mode;
+        self
+    }
+
+    /// Build an [`AvroField`] from the builder
+    pub fn build(self) -> Result<AvroField, ArrowError> {
+        match self.schema {
+            Schema::Complex(ComplexType::Record(r)) => {
+                let mut resolver = Resolver::default();
+                let data_type = make_data_type(
+                    self.schema,
+                    None,
+                    &mut resolver,
+                    self.use_utf8view,
+                    self.strict_mode,
+                )?;
+                Ok(AvroField {
+                    name: r.name.to_string(),
+                    data_type,
+                })
+            }
+            _ => Err(ArrowError::ParseError(format!(
+                "Expected a Record schema to build an AvroField, but got {:?}",
+                self.schema
+            ))),
+        }
+    }
+}
 /// An Avro encoding
 ///
 /// <https://avro.apache.org/docs/1.11.1/specification/#encodings>
@@ -409,6 +463,7 @@ fn make_data_type<'a>(
     namespace: Option<&'a str>,
     resolver: &mut Resolver<'a>,
     use_utf8view: bool,
+    strict_mode: bool,
 ) -> Result<AvroDataType, ArrowError> {
     match schema {
         Schema::TypeName(TypeName::Primitive(p)) => {
@@ -428,12 +483,20 @@ fn make_data_type<'a>(
                 .position(|x| x == 
&Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)));
             match (f.len() == 2, null) {
                 (true, Some(0)) => {
-                    let mut field = make_data_type(&f[1], namespace, resolver, 
use_utf8view)?;
+                    let mut field =
+                        make_data_type(&f[1], namespace, resolver, 
use_utf8view, strict_mode)?;
                     field.nullability = Some(Nullability::NullFirst);
                     Ok(field)
                 }
                 (true, Some(1)) => {
-                    let mut field = make_data_type(&f[0], namespace, resolver, 
use_utf8view)?;
+                    if strict_mode {
+                        return Err(ArrowError::SchemaError(
+                            "Found Avro union of the form ['T','null'], which 
is disallowed in strict_mode"
+                                .to_string(),
+                        ));
+                    }
+                    let mut field =
+                        make_data_type(&f[0], namespace, resolver, 
use_utf8view, strict_mode)?;
                     field.nullability = Some(Nullability::NullSecond);
                     Ok(field)
                 }
@@ -456,6 +519,7 @@ fn make_data_type<'a>(
                                 namespace,
                                 resolver,
                                 use_utf8view,
+                                strict_mode,
                             )?,
                         })
                     })
@@ -469,8 +533,13 @@ fn make_data_type<'a>(
                 Ok(field)
             }
             ComplexType::Array(a) => {
-                let mut field =
-                    make_data_type(a.items.as_ref(), namespace, resolver, 
use_utf8view)?;
+                let mut field = make_data_type(
+                    a.items.as_ref(),
+                    namespace,
+                    resolver,
+                    use_utf8view,
+                    strict_mode,
+                )?;
                 Ok(AvroDataType {
                     nullability: None,
                     metadata: a.attributes.field_metadata(),
@@ -535,7 +604,8 @@ fn make_data_type<'a>(
                 Ok(field)
             }
             ComplexType::Map(m) => {
-                let val = make_data_type(&m.values, namespace, resolver, 
use_utf8view)?;
+                let val =
+                    make_data_type(&m.values, namespace, resolver, 
use_utf8view, strict_mode)?;
                 Ok(AvroDataType {
                     nullability: None,
                     metadata: m.attributes.field_metadata(),
@@ -549,6 +619,7 @@ fn make_data_type<'a>(
                 namespace,
                 resolver,
                 use_utf8view,
+                strict_mode,
             )?;
 
             // https://avro.apache.org/docs/1.11.1/specification/#logical-types
@@ -630,7 +701,7 @@ mod tests {
         let schema = create_schema_with_logical_type(PrimitiveType::Int, 
"date");
 
         let mut resolver = Resolver::default();
-        let result = make_data_type(&schema, None, &mut resolver, 
false).unwrap();
+        let result = make_data_type(&schema, None, &mut resolver, false, 
false).unwrap();
 
         assert!(matches!(result.codec, Codec::Date32));
     }
@@ -640,7 +711,7 @@ mod tests {
         let schema = create_schema_with_logical_type(PrimitiveType::Int, 
"time-millis");
 
         let mut resolver = Resolver::default();
-        let result = make_data_type(&schema, None, &mut resolver, 
false).unwrap();
+        let result = make_data_type(&schema, None, &mut resolver, false, 
false).unwrap();
 
         assert!(matches!(result.codec, Codec::TimeMillis));
     }
@@ -650,7 +721,7 @@ mod tests {
         let schema = create_schema_with_logical_type(PrimitiveType::Long, 
"time-micros");
 
         let mut resolver = Resolver::default();
-        let result = make_data_type(&schema, None, &mut resolver, 
false).unwrap();
+        let result = make_data_type(&schema, None, &mut resolver, false, 
false).unwrap();
 
         assert!(matches!(result.codec, Codec::TimeMicros));
     }
@@ -660,7 +731,7 @@ mod tests {
         let schema = create_schema_with_logical_type(PrimitiveType::Long, 
"timestamp-millis");
 
         let mut resolver = Resolver::default();
-        let result = make_data_type(&schema, None, &mut resolver, 
false).unwrap();
+        let result = make_data_type(&schema, None, &mut resolver, false, 
false).unwrap();
 
         assert!(matches!(result.codec, Codec::TimestampMillis(true)));
     }
@@ -670,7 +741,7 @@ mod tests {
         let schema = create_schema_with_logical_type(PrimitiveType::Long, 
"timestamp-micros");
 
         let mut resolver = Resolver::default();
-        let result = make_data_type(&schema, None, &mut resolver, 
false).unwrap();
+        let result = make_data_type(&schema, None, &mut resolver, false, 
false).unwrap();
 
         assert!(matches!(result.codec, Codec::TimestampMicros(true)));
     }
@@ -680,7 +751,7 @@ mod tests {
         let schema = create_schema_with_logical_type(PrimitiveType::Long, 
"local-timestamp-millis");
 
         let mut resolver = Resolver::default();
-        let result = make_data_type(&schema, None, &mut resolver, 
false).unwrap();
+        let result = make_data_type(&schema, None, &mut resolver, false, 
false).unwrap();
 
         assert!(matches!(result.codec, Codec::TimestampMillis(false)));
     }
@@ -690,7 +761,7 @@ mod tests {
         let schema = create_schema_with_logical_type(PrimitiveType::Long, 
"local-timestamp-micros");
 
         let mut resolver = Resolver::default();
-        let result = make_data_type(&schema, None, &mut resolver, 
false).unwrap();
+        let result = make_data_type(&schema, None, &mut resolver, false, 
false).unwrap();
 
         assert!(matches!(result.codec, Codec::TimestampMicros(false)));
     }
@@ -745,7 +816,7 @@ mod tests {
         let schema = create_schema_with_logical_type(PrimitiveType::Int, 
"custom-type");
 
         let mut resolver = Resolver::default();
-        let result = make_data_type(&schema, None, &mut resolver, 
false).unwrap();
+        let result = make_data_type(&schema, None, &mut resolver, false, 
false).unwrap();
 
         assert_eq!(
             result.metadata.get("logicalType"),
@@ -758,7 +829,7 @@ mod tests {
         let schema = 
Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
 
         let mut resolver = Resolver::default();
-        let result = make_data_type(&schema, None, &mut resolver, 
true).unwrap();
+        let result = make_data_type(&schema, None, &mut resolver, true, 
false).unwrap();
 
         assert!(matches!(result.codec, Codec::Utf8View));
     }
@@ -768,7 +839,7 @@ mod tests {
         let schema = 
Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
 
         let mut resolver = Resolver::default();
-        let result = make_data_type(&schema, None, &mut resolver, 
false).unwrap();
+        let result = make_data_type(&schema, None, &mut resolver, false, 
false).unwrap();
 
         assert!(matches!(result.codec, Codec::Utf8));
     }
@@ -796,7 +867,7 @@ mod tests {
         let schema = Schema::Complex(ComplexType::Record(record));
 
         let mut resolver = Resolver::default();
-        let result = make_data_type(&schema, None, &mut resolver, 
true).unwrap();
+        let result = make_data_type(&schema, None, &mut resolver, true, 
false).unwrap();
 
         if let Codec::Struct(fields) = &result.codec {
             let first_field_codec = &fields[0].data_type().codec;
@@ -805,4 +876,25 @@ mod tests {
             panic!("Expected Struct codec");
         }
     }
+
+    #[test]
+    fn test_union_with_strict_mode() {
+        let schema = Schema::Union(vec![
+            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
+            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
+        ]);
+
+        let mut resolver = Resolver::default();
+        let result = make_data_type(&schema, None, &mut resolver, false, true);
+
+        assert!(result.is_err());
+        match result {
+            Err(ArrowError::SchemaError(msg)) => {
+                assert!(msg.contains(
+                    "Found Avro union of the form ['T','null'], which is 
disallowed in strict_mode"
+                ));
+            }
+            _ => panic!("Expected SchemaError"),
+        }
+    }
 }
diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs
index 5059e41ff0..3bc7d94b7c 100644
--- a/arrow-avro/src/reader/mod.rs
+++ b/arrow-avro/src/reader/mod.rs
@@ -86,7 +86,7 @@
 //! ```
 //!
 
-use crate::codec::AvroField;
+use crate::codec::AvroFieldBuilder;
 use crate::schema::Schema as AvroSchema;
 use arrow_array::{RecordBatch, RecordBatchReader};
 use arrow_schema::{ArrowError, SchemaRef};
@@ -221,12 +221,11 @@ impl ReaderBuilder {
     }
 
     fn make_record_decoder(&self, schema: &AvroSchema<'_>) -> 
Result<RecordDecoder, ArrowError> {
-        let root_field = AvroField::try_from(schema)?;
-        RecordDecoder::try_new_with_options(
-            root_field.data_type(),
-            self.utf8_view,
-            self.strict_mode,
-        )
+        let root_field = AvroFieldBuilder::new(schema)
+            .with_utf8view(self.utf8_view)
+            .with_strict_mode(self.strict_mode)
+            .build()?;
+        RecordDecoder::try_new_with_options(root_field.data_type(), 
self.utf8_view)
     }
 
     fn build_impl<R: BufRead>(self, reader: &mut R) -> Result<(Header, 
Decoder), ArrowError> {
@@ -395,8 +394,12 @@ mod test {
     use crate::compression::CompressionCodec;
     use crate::reader::record::RecordDecoder;
     use crate::reader::vlq::VLQDecoder;
-    use crate::reader::{read_header, Decoder, ReaderBuilder};
+    use crate::reader::{read_header, Decoder, Reader, ReaderBuilder};
     use crate::test_util::arrow_test_data;
+    use arrow_array::builder::{
+        Float64Builder, Int32Builder, ListBuilder, MapBuilder, StringBuilder, 
StructBuilder,
+    };
+
     use arrow_array::types::{Int32Type, IntervalMonthDayNanoType};
     use arrow_array::*;
     use arrow_schema::{ArrowError, DataType, Field, IntervalUnit, Schema};
@@ -422,6 +425,19 @@ mod test {
         arrow::compute::concat_batches(&schema, &batches).unwrap()
     }
 
+    fn read_file_strict(
+        path: &str,
+        batch_size: usize,
+        utf8_view: bool,
+    ) -> Result<Reader<BufReader<File>>, ArrowError> {
+        let file = File::open(path).unwrap();
+        ReaderBuilder::new()
+            .with_batch_size(batch_size)
+            .with_utf8_view(utf8_view)
+            .with_strict_mode(true)
+            .build(BufReader::new(file))
+    }
+
     fn decode_stream<S: Stream<Item = Bytes> + Unpin>(
         mut decoder: Decoder,
         mut input: S,
@@ -857,4 +873,363 @@ mod test {
         .unwrap();
         assert_eq!(&expected_uuid_array, uuid_array);
     }
+
+    #[test]
+    fn test_nonnullable_impala() {
+        let file = arrow_test_data("avro/nonnullable.impala.avro");
+        let id = Int64Array::from(vec![Some(8)]);
+        let mut int_array_builder = ListBuilder::new(Int32Builder::new());
+        {
+            let vb = int_array_builder.values();
+            vb.append_value(-1);
+        }
+        int_array_builder.append(true); // finalize one sub-list
+        let int_array = int_array_builder.finish();
+        let mut iaa_builder = 
ListBuilder::new(ListBuilder::new(Int32Builder::new()));
+        {
+            let inner_list_builder = iaa_builder.values();
+            {
+                let vb = inner_list_builder.values();
+                vb.append_value(-1);
+                vb.append_value(-2);
+            }
+            inner_list_builder.append(true);
+            inner_list_builder.append(true);
+        }
+        iaa_builder.append(true);
+        let int_array_array = iaa_builder.finish();
+        use arrow_array::builder::MapFieldNames;
+        let field_names = MapFieldNames {
+            entry: "entries".to_string(),
+            key: "key".to_string(),
+            value: "value".to_string(),
+        };
+        let mut int_map_builder =
+            MapBuilder::new(Some(field_names), StringBuilder::new(), 
Int32Builder::new());
+        {
+            let (keys, vals) = int_map_builder.entries();
+            keys.append_value("k1");
+            vals.append_value(-1);
+        }
+        int_map_builder.append(true).unwrap(); // finalize map for row 0
+        let int_map = int_map_builder.finish();
+        let field_names2 = MapFieldNames {
+            entry: "entries".to_string(),
+            key: "key".to_string(),
+            value: "value".to_string(),
+        };
+        let mut ima_builder = ListBuilder::new(MapBuilder::new(
+            Some(field_names2),
+            StringBuilder::new(),
+            Int32Builder::new(),
+        ));
+        {
+            let map_builder = ima_builder.values();
+            map_builder.append(true).unwrap();
+            {
+                let (keys, vals) = map_builder.entries();
+                keys.append_value("k1");
+                vals.append_value(1);
+            }
+            map_builder.append(true).unwrap();
+            map_builder.append(true).unwrap();
+            map_builder.append(true).unwrap();
+        }
+        ima_builder.append(true);
+        let int_map_array_ = ima_builder.finish();
+        let mut nested_sb = StructBuilder::new(
+            vec![
+                Arc::new(Field::new("a", DataType::Int32, true)),
+                Arc::new(Field::new(
+                    "B",
+                    DataType::List(Arc::new(Field::new("item", 
DataType::Int32, true))),
+                    true,
+                )),
+                Arc::new(Field::new(
+                    "c",
+                    DataType::Struct(
+                        vec![Field::new(
+                            "D",
+                            DataType::List(Arc::new(Field::new(
+                                "item",
+                                DataType::List(Arc::new(Field::new(
+                                    "item",
+                                    DataType::Struct(
+                                        vec![
+                                            Field::new("e", DataType::Int32, 
true),
+                                            Field::new("f", DataType::Utf8, 
true),
+                                        ]
+                                        .into(),
+                                    ),
+                                    true,
+                                ))),
+                                true,
+                            ))),
+                            true,
+                        )]
+                        .into(),
+                    ),
+                    true,
+                )),
+                Arc::new(Field::new(
+                    "G",
+                    DataType::Map(
+                        Arc::new(Field::new(
+                            "entries",
+                            DataType::Struct(
+                                vec![
+                                    Field::new("key", DataType::Utf8, false),
+                                    Field::new(
+                                        "value",
+                                        DataType::Struct(
+                                            vec![Field::new(
+                                                "h",
+                                                DataType::Struct(
+                                                    vec![Field::new(
+                                                        "i",
+                                                        
DataType::List(Arc::new(Field::new(
+                                                            "item",
+                                                            DataType::Float64,
+                                                            true,
+                                                        ))),
+                                                        true,
+                                                    )]
+                                                    .into(),
+                                                ),
+                                                true,
+                                            )]
+                                            .into(),
+                                        ),
+                                        true,
+                                    ),
+                                ]
+                                .into(),
+                            ),
+                            false,
+                        )),
+                        false,
+                    ),
+                    true,
+                )),
+            ],
+            vec![
+                Box::new(Int32Builder::new()),
+                Box::new(ListBuilder::new(Int32Builder::new())),
+                {
+                    let d_field = Field::new(
+                        "D",
+                        DataType::List(Arc::new(Field::new(
+                            "item",
+                            DataType::List(Arc::new(Field::new(
+                                "item",
+                                DataType::Struct(
+                                    vec![
+                                        Field::new("e", DataType::Int32, true),
+                                        Field::new("f", DataType::Utf8, true),
+                                    ]
+                                    .into(),
+                                ),
+                                true,
+                            ))),
+                            true,
+                        ))),
+                        true,
+                    );
+                    Box::new(StructBuilder::new(
+                        vec![Arc::new(d_field)],
+                        vec![Box::new({
+                            let ef_struct_builder = StructBuilder::new(
+                                vec![
+                                    Arc::new(Field::new("e", DataType::Int32, 
true)),
+                                    Arc::new(Field::new("f", DataType::Utf8, 
true)),
+                                ],
+                                vec![
+                                    Box::new(Int32Builder::new()),
+                                    Box::new(StringBuilder::new()),
+                                ],
+                            );
+                            let list_of_ef = 
ListBuilder::new(ef_struct_builder);
+                            ListBuilder::new(list_of_ef)
+                        })],
+                    ))
+                },
+                {
+                    let map_field_names = MapFieldNames {
+                        entry: "entries".to_string(),
+                        key: "key".to_string(),
+                        value: "value".to_string(),
+                    };
+                    let i_list_builder = 
ListBuilder::new(Float64Builder::new());
+                    let h_struct = StructBuilder::new(
+                        vec![Arc::new(Field::new(
+                            "i",
+                            DataType::List(Arc::new(Field::new("item", 
DataType::Float64, true))),
+                            true,
+                        ))],
+                        vec![Box::new(i_list_builder)],
+                    );
+                    let g_value_builder = StructBuilder::new(
+                        vec![Arc::new(Field::new(
+                            "h",
+                            DataType::Struct(
+                                vec![Field::new(
+                                    "i",
+                                    DataType::List(Arc::new(Field::new(
+                                        "item",
+                                        DataType::Float64,
+                                        true,
+                                    ))),
+                                    true,
+                                )]
+                                .into(),
+                            ),
+                            true,
+                        ))],
+                        vec![Box::new(h_struct)],
+                    );
+                    Box::new(MapBuilder::new(
+                        Some(map_field_names),
+                        StringBuilder::new(),
+                        g_value_builder,
+                    ))
+                },
+            ],
+        );
+        nested_sb.append(true);
+        {
+            let a_builder = 
nested_sb.field_builder::<Int32Builder>(0).unwrap();
+            a_builder.append_value(-1);
+        }
+        {
+            let b_builder = nested_sb
+                .field_builder::<ListBuilder<Int32Builder>>(1)
+                .unwrap();
+            {
+                let vb = b_builder.values();
+                vb.append_value(-1);
+            }
+            b_builder.append(true);
+        }
+        {
+            let c_struct_builder = 
nested_sb.field_builder::<StructBuilder>(2).unwrap();
+            c_struct_builder.append(true);
+            let d_list_builder = c_struct_builder
+                .field_builder::<ListBuilder<ListBuilder<StructBuilder>>>(0)
+                .unwrap();
+            {
+                let sub_list_builder = d_list_builder.values();
+                {
+                    let ef_struct = sub_list_builder.values();
+                    ef_struct.append(true);
+                    {
+                        let e_b = 
ef_struct.field_builder::<Int32Builder>(0).unwrap();
+                        e_b.append_value(-1);
+                        let f_b = 
ef_struct.field_builder::<StringBuilder>(1).unwrap();
+                        f_b.append_value("nonnullable");
+                    }
+                    sub_list_builder.append(true);
+                }
+                d_list_builder.append(true);
+            }
+        }
+        {
+            let g_map_builder = nested_sb
+                .field_builder::<MapBuilder<StringBuilder, StructBuilder>>(3)
+                .unwrap();
+            g_map_builder.append(true).unwrap();
+        }
+        let nested_struct = nested_sb.finish();
+        let expected = RecordBatch::try_from_iter_with_nullable([
+            ("ID", Arc::new(id) as Arc<dyn Array>, true),
+            ("Int_Array", Arc::new(int_array), true),
+            ("int_array_array", Arc::new(int_array_array), true),
+            ("Int_Map", Arc::new(int_map), true),
+            ("int_map_array", Arc::new(int_map_array_), true),
+            ("nested_Struct", Arc::new(nested_struct), true),
+        ])
+        .unwrap();
+        let batch_large = read_file(&file, 8, false);
+        assert_eq!(batch_large, expected, "Mismatch for batch_size=8");
+        let batch_small = read_file(&file, 3, false);
+        assert_eq!(batch_small, expected, "Mismatch for batch_size=3");
+    }
+
+    #[test]
+    fn test_nonnullable_impala_strict() {
+        let file = arrow_test_data("avro/nonnullable.impala.avro");
+        let err = read_file_strict(&file, 8, false).unwrap_err();
+        assert!(err.to_string().contains(
+            "Found Avro union of the form ['T','null'], which is disallowed in 
strict_mode"
+        ));
+    }
+
+    #[test]
+    fn test_nullable_impala() {
+        let file = arrow_test_data("avro/nullable.impala.avro");
+        let batch1 = read_file(&file, 3, false);
+        let batch2 = read_file(&file, 8, false);
+        assert_eq!(batch1, batch2);
+        let batch = batch1;
+        assert_eq!(batch.num_rows(), 7);
+        let id_array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .expect("id column should be an Int64Array");
+        let expected_ids = [1, 2, 3, 4, 5, 6, 7];
+        for (i, &expected_id) in expected_ids.iter().enumerate() {
+            assert_eq!(id_array.value(i), expected_id, "Mismatch in id at row 
{i}",);
+        }
+        let int_array = batch
+            .column(1)
+            .as_any()
+            .downcast_ref::<ListArray>()
+            .expect("int_array column should be a ListArray");
+        {
+            let offsets = int_array.value_offsets();
+            let start = offsets[0] as usize;
+            let end = offsets[1] as usize;
+            let values = int_array
+                .values()
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .expect("Values of int_array should be an Int32Array");
+            let row0: Vec<Option<i32>> = (start..end).map(|i| 
Some(values.value(i))).collect();
+            assert_eq!(
+                row0,
+                vec![Some(1), Some(2), Some(3)],
+                "Mismatch in int_array row 0"
+            );
+        }
+        let nested_struct = batch
+            .column(5)
+            .as_any()
+            .downcast_ref::<StructArray>()
+            .expect("nested_struct column should be a StructArray");
+        let a_array = nested_struct
+            .column_by_name("A")
+            .expect("Field A should exist in nested_struct")
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .expect("Field A should be an Int32Array");
+        assert_eq!(a_array.value(0), 1, "Mismatch in nested_struct.A at row 
0");
+        assert!(
+            !a_array.is_valid(1),
+            "Expected null in nested_struct.A at row 1"
+        );
+        assert!(
+            !a_array.is_valid(3),
+            "Expected null in nested_struct.A at row 3"
+        );
+        assert_eq!(a_array.value(6), 7, "Mismatch in nested_struct.A at row 
6");
+    }
+
+    #[test]
+    fn test_nullable_impala_strict() {
+        let file = arrow_test_data("avro/nullable.impala.avro");
+        let err = read_file_strict(&file, 8, false).unwrap_err();
+        assert!(err.to_string().contains(
+            "Found Avro union of the form ['T','null'], which is disallowed in 
strict_mode"
+        ));
+    }
 }
diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs
index 2ef382a226..180afcd2d8 100644
--- a/arrow-avro/src/reader/record.rs
+++ b/arrow-avro/src/reader/record.rs
@@ -43,7 +43,6 @@ const DEFAULT_CAPACITY: usize = 1024;
 pub(crate) struct RecordDecoderBuilder<'a> {
     data_type: &'a AvroDataType,
     use_utf8view: bool,
-    strict_mode: bool,
 }
 
 impl<'a> RecordDecoderBuilder<'a> {
@@ -51,7 +50,6 @@ impl<'a> RecordDecoderBuilder<'a> {
         Self {
             data_type,
             use_utf8view: false,
-            strict_mode: false,
         }
     }
 
@@ -60,14 +58,9 @@ impl<'a> RecordDecoderBuilder<'a> {
         self
     }
 
-    pub(crate) fn with_strict_mode(mut self, strict_mode: bool) -> Self {
-        self.strict_mode = strict_mode;
-        self
-    }
-
     /// Builds the `RecordDecoder`.
     pub(crate) fn build(self) -> Result<RecordDecoder, ArrowError> {
-        RecordDecoder::try_new_with_options(self.data_type, self.use_utf8view, 
self.strict_mode)
+        RecordDecoder::try_new_with_options(self.data_type, self.use_utf8view)
     }
 }
 
@@ -77,7 +70,6 @@ pub(crate) struct RecordDecoder {
     schema: SchemaRef,
     fields: Vec<Decoder>,
     use_utf8view: bool,
-    strict_mode: bool,
 }
 
 impl RecordDecoder {
@@ -90,7 +82,6 @@ impl RecordDecoder {
     pub(crate) fn try_new(data_type: &AvroDataType) -> Result<Self, 
ArrowError> {
         RecordDecoderBuilder::new(data_type)
             .with_utf8_view(true)
-            .with_strict_mode(true)
             .build()
     }
 
@@ -109,14 +100,12 @@ impl RecordDecoder {
     pub(crate) fn try_new_with_options(
         data_type: &AvroDataType,
         use_utf8view: bool,
-        strict_mode: bool,
     ) -> Result<Self, ArrowError> {
         match Decoder::try_new(data_type)? {
             Decoder::Record(fields, encodings) => Ok(Self {
                 schema: Arc::new(ArrowSchema::new(fields)),
                 fields: encodings,
                 use_utf8view,
-                strict_mode,
             }),
             encoding => Err(ArrowError::ParseError(format!(
                 "Expected record got {encoding:?}"
@@ -331,7 +320,6 @@ impl Decoder {
             }
             Self::Array(_, offsets, e) => {
                 offsets.push_length(0);
-                e.append_null();
             }
             Self::Record(_, e) => e.iter_mut().for_each(|e| e.append_null()),
             Self::Map(_, _koff, moff, _, _) => {
@@ -344,7 +332,10 @@ impl Decoder {
             Self::Decimal256(_, _, _, builder) => 
builder.append_value(i256::ZERO),
             Self::Enum(indices, _) => indices.push(0),
             Self::Duration(builder) => builder.append_null(),
-            Self::Nullable(_, _, _) => unreachable!("Nulls cannot be nested"),
+            Self::Nullable(_, null_buffer, inner) => {
+                null_buffer.append(false);
+                inner.append_null();
+            }
         }
     }
 
@@ -431,12 +422,17 @@ impl Decoder {
                 let nanos = (millis as i64) * 1_000_000;
                 builder.append_value(IntervalMonthDayNano::new(months as i32, 
days as i32, nanos));
             }
-            Self::Nullable(nullability, nulls, e) => {
-                let is_valid = buf.get_bool()? == matches!(nullability, 
Nullability::NullFirst);
-                nulls.append(is_valid);
-                match is_valid {
-                    true => e.decode(buf)?,
-                    false => e.append_null(),
+            Self::Nullable(order, nb, encoding) => {
+                let branch = buf.read_vlq()?;
+                let is_not_null = match *order {
+                    Nullability::NullFirst => branch != 0,
+                    Nullability::NullSecond => branch == 0,
+                };
+                nb.append(is_not_null);
+                if is_not_null {
+                    encoding.decode(buf)?;
+                } else {
+                    encoding.append_null();
                 }
             }
         }


Reply via email to