This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new c10fe77540 Add tests and fixes for schema resolution bug (#9237)
c10fe77540 is described below
commit c10fe77540e04ab9f12afec7d28b43da7083b20b
Author: Connor Sanders <[email protected]>
AuthorDate: Mon Jan 26 15:50:31 2026 -0600
Add tests and fixes for schema resolution bug (#9237)
# Which issue does this PR close?
- Closes #9231.
# Rationale for this change
Avro schema resolution allows a reader schema to represent “nullable”
values using a two-branch union (`["null", T]` or `[T, "null"]`) while
still reading data written with the non-union schema `T` (i.e. without
union discriminants in the encoded data).
In `arrow-avro`, resolving a non-union writer type against a reader
union (notably for array/list item schemas like `items: ["null",
"int"]`) could incorrectly treat the encoded stream as a union and
attempt to decode a union discriminant. This would misalign decoding and
could surface as `ParseError("bad varint")` for certain files (see
#9231).
# What changes are included in this PR?
- Fix schema resolution when the *writer* schema is non-union and the
*reader* schema is a union:
- Special-case two-branch unions containing `null`, treating them as
“nullable” (and capturing whether `null` is first or second), and resolving
against the non-null branch.
- Improve matching for general reader unions by attempting to resolve
against each union variant, preferring a direct match, and constructing
the appropriate union resolution mapping for the selected branch.
- Ensure promotions are represented at the union-resolution level
(avoiding nested promotion resolution on the selected union child).
- Add regression coverage for the bug and the fixed behavior:
- `test_resolve_array_writer_nonunion_items_reader_nullable_items`
(schema resolution / codec)
- `test_array_decoding_writer_nonunion_items_reader_nullable_items`
(record decoding; ensures correct byte consumption and decoded values)
- `test_bad_varint_bug_nullable_array_items` (end-to-end reader
regression using a small Avro fixture)
- Add a small compressed Avro fixture under
`arrow-avro/test/data/bad-varint-bug.avro.gz` used by the regression
test.
# Are these changes tested?
Yes. This PR adds targeted unit/integration tests that reproduce the
prior failure mode and validate correct schema resolution and decoding
for nullable-union array items.
# Are there any user-facing changes?
Yes (bug fix): reading Avro files with arrays whose element type is
represented as a nullable union in the reader schema (e.g. `items:
["null", "int"]`) now succeeds instead of failing with `ParseError("bad
varint")`. No public API changes are intended.
---------
Co-authored-by: Mikhail Zabaluev <[email protected]>
---
arrow-avro/src/codec.rs | 118 ++++++++++++++++++++++++----
arrow-avro/src/reader/mod.rs | 42 ++++++++++
arrow-avro/src/reader/record.rs | 48 +++++++++++
arrow-avro/test/data/bad-varint-bug.avro.gz | Bin 0 -> 254 bytes
4 files changed, 191 insertions(+), 17 deletions(-)
diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs
index 04ef87d7ef..365037b2f4 100644
--- a/arrow-avro/src/codec.rs
+++ b/arrow-avro/src/codec.rs
@@ -1529,23 +1529,71 @@ impl<'a> Maker<'a> {
Ok(dt)
}
(writer_non_union, Schema::Union(reader_variants)) => {
- let promo = self.find_best_promotion(
- writer_non_union,
- reader_variants.as_slice(),
- namespace,
- );
- let Some((reader_index, promotion)) = promo else {
- return Err(ArrowError::SchemaError(
- "Writer schema does not match any reader union
branch".to_string(),
- ));
- };
- let mut dt = self.parse_type(reader_schema, namespace)?;
- dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
- writer_to_reader: Arc::from(vec![Some((reader_index,
promotion))]),
- writer_is_union: false,
- reader_is_union: true,
- }));
- Ok(dt)
+ if let Some((nullability, non_null_branch)) =
+ nullable_union_variants(reader_variants)
+ {
+ let mut dt = self.resolve_type(writer_non_union,
non_null_branch, namespace)?;
+ let non_null_idx = match nullability {
+ Nullability::NullFirst => 1,
+ Nullability::NullSecond => 0,
+ };
+ #[cfg(feature = "avro_custom_types")]
+ Self::propagate_nullability_into_ree(&mut dt, nullability);
+ dt.nullability = Some(nullability);
+ let promotion = Self::coercion_from(&dt);
+ dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
+ writer_to_reader: Arc::from(vec![Some((non_null_idx,
promotion))]),
+ writer_is_union: false,
+ reader_is_union: true,
+ }));
+ Ok(dt)
+ } else {
+ let mut best_match: Option<(usize, AvroDataType,
Promotion)> = None;
+ for (i, variant) in reader_variants.iter().enumerate() {
+ if let Ok(resolved_dt) =
+ self.resolve_type(writer_non_union, variant,
namespace)
+ {
+ let promotion = Self::coercion_from(&resolved_dt);
+ if promotion == Promotion::Direct {
+ best_match = Some((i, resolved_dt, promotion));
+ break;
+ } else if best_match.is_none() {
+ best_match = Some((i, resolved_dt, promotion));
+ }
+ }
+ }
+ let Some((match_idx, match_dt, promotion)) = best_match
else {
+ return Err(ArrowError::SchemaError(
+ "Writer schema does not match any reader union
branch".to_string(),
+ ));
+ };
+ let mut children =
Vec::with_capacity(reader_variants.len());
+ let mut match_dt = Some(match_dt);
+ for (i, variant) in reader_variants.iter().enumerate() {
+ if i == match_idx {
+ if let Some(mut dt) = match_dt.take() {
+ if matches!(dt.resolution,
Some(ResolutionInfo::Promotion(_))) {
+ dt.resolution = None;
+ }
+ children.push(dt);
+ }
+ } else {
+ children.push(self.parse_type(variant,
namespace)?);
+ }
+ }
+ let union_fields = build_union_fields(&children)?;
+ let mut dt = AvroDataType::new(
+ Codec::Union(children.into(), union_fields,
UnionMode::Dense),
+ Default::default(),
+ None,
+ );
+ dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
+ writer_to_reader: Arc::from(vec![Some((match_idx,
promotion))]),
+ writer_is_union: false,
+ reader_is_union: true,
+ }));
+ Ok(dt)
+ }
}
(
Schema::Complex(ComplexType::Array(writer_array)),
@@ -2926,6 +2974,42 @@ mod tests {
}
}
+ #[test]
+ fn test_resolve_array_writer_nonunion_items_reader_nullable_items() {
+ let writer_schema = Schema::Complex(ComplexType::Array(Array {
+ items:
Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))),
+ attributes: Attributes::default(),
+ }));
+ let reader_schema = Schema::Complex(ComplexType::Array(Array {
+ items: Box::new(mk_union(vec![
+ Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
+ Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
+ ])),
+ attributes: Attributes::default(),
+ }));
+ let mut maker = Maker::new(false, false);
+ let dt = maker
+ .make_data_type(&writer_schema, Some(&reader_schema), None)
+ .unwrap();
+ if let Codec::List(inner) = dt.codec() {
+ assert_eq!(inner.nullability(), Some(Nullability::NullFirst));
+ assert!(matches!(inner.codec(), Codec::Int32));
+ match inner.resolution.as_ref() {
+ Some(ResolutionInfo::Union(info)) => {
+ assert!(!info.writer_is_union, "writer should be
non-union");
+ assert!(info.reader_is_union, "reader should be union");
+ assert_eq!(
+ info.writer_to_reader.as_ref(),
+ &[Some((1, Promotion::Direct))]
+ );
+ }
+ other => panic!("expected Union resolution, got {other:?}"),
+ }
+ } else {
+ panic!("expected List codec");
+ }
+ }
+
#[test]
fn test_resolve_fixed_success_name_and_size_match_and_alias() {
let writer_schema = Schema::Complex(ComplexType::Fixed(Fixed {
diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs
index b5750cc6a3..b040aa9d42 100644
--- a/arrow-avro/src/reader/mod.rs
+++ b/arrow-avro/src/reader/mod.rs
@@ -9081,4 +9081,46 @@ mod test {
"entire RecordBatch mismatch (schema, all columns, all rows)"
);
}
+
+ #[test]
+ fn test_bad_varint_bug_nullable_array_items() {
+ use flate2::read::GzDecoder;
+ use std::io::Read;
+ let manifest_dir = env!("CARGO_MANIFEST_DIR");
+ let gz_path =
format!("{manifest_dir}/test/data/bad-varint-bug.avro.gz");
+ let gz_file = File::open(&gz_path).expect("test file should exist");
+ let mut decoder = GzDecoder::new(gz_file);
+ let mut avro_bytes = Vec::new();
+ decoder
+ .read_to_end(&mut avro_bytes)
+ .expect("should decompress");
+ let reader_arrow_schema = Schema::new(vec![Field::new(
+ "int_array",
+ DataType::List(Arc::new(Field::new("element", DataType::Int32,
true))),
+ true,
+ )])
+ .with_metadata(HashMap::from([("avro.name".into(), "table".into())]));
+ let reader_schema = AvroSchema::try_from(&reader_arrow_schema)
+ .expect("should convert Arrow schema to Avro");
+ let mut reader = ReaderBuilder::new()
+ .with_reader_schema(reader_schema)
+ .build(Cursor::new(avro_bytes))
+ .expect("should build reader");
+ let batch = reader
+ .next()
+ .expect("should have one batch")
+ .expect("reading should succeed without bad varint error");
+ assert_eq!(batch.num_rows(), 1);
+ let list_col = batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<ListArray>()
+ .expect("should be ListArray");
+ assert_eq!(list_col.len(), 1);
+ let values = list_col.values();
+ let int_values = values.as_primitive::<Int32Type>();
+ assert_eq!(int_values.len(), 2);
+ assert_eq!(int_values.value(0), 1);
+ assert_eq!(int_values.value(1), 2);
+ }
}
diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs
index 648baa60c7..ec69e7788c 100644
--- a/arrow-avro/src/reader/record.rs
+++ b/arrow-avro/src/reader/record.rs
@@ -2897,6 +2897,54 @@ mod tests {
assert_eq!(list_arr.value_length(0), 0);
}
+ #[test]
+ fn test_array_decoding_writer_nonunion_items_reader_nullable_items() {
+ use crate::schema::Array;
+ let writer_schema = Schema::Complex(ComplexType::Array(Array {
+ items:
Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))),
+ attributes: Attributes::default(),
+ }));
+ let reader_schema = Schema::Complex(ComplexType::Array(Array {
+ items: Box::new(Schema::Union(vec![
+ Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
+ Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
+ ])),
+ attributes: Attributes::default(),
+ }));
+ let dt = resolved_root_datatype(writer_schema, reader_schema, false,
false);
+ if let Codec::List(inner) = dt.codec() {
+ assert_eq!(
+ inner.nullability(),
+ Some(Nullability::NullFirst),
+ "items should be nullable"
+ );
+ } else {
+ panic!("expected List codec");
+ }
+ let mut decoder = Decoder::try_new(&dt).unwrap();
+ let mut data = encode_avro_long(2);
+ data.extend(encode_avro_int(10));
+ data.extend(encode_avro_int(20));
+ data.extend(encode_avro_long(0));
+ let mut cursor = AvroCursor::new(&data);
+ decoder.decode(&mut cursor).unwrap();
+ assert_eq!(
+ cursor.position(),
+ data.len(),
+ "all bytes should be consumed"
+ );
+ let array = decoder.flush(None).unwrap();
+ let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
+ assert_eq!(list_arr.len(), 1, "one list/row");
+ assert_eq!(list_arr.value_length(0), 2, "two items in the list");
+ let values = list_arr.values().as_primitive::<Int32Type>();
+ assert_eq!(values.len(), 2);
+ assert_eq!(values.value(0), 10);
+ assert_eq!(values.value(1), 20);
+ assert!(!values.is_null(0));
+ assert!(!values.is_null(1));
+ }
+
#[test]
fn test_decimal_decoding_fixed256() {
let dt = avro_from_codec(Codec::Decimal(50, Some(2), Some(32)));
diff --git a/arrow-avro/test/data/bad-varint-bug.avro.gz
b/arrow-avro/test/data/bad-varint-bug.avro.gz
new file mode 100644
index 0000000000..7feef49fd7
Binary files /dev/null and b/arrow-avro/test/data/bad-varint-bug.avro.gz differ