This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 67c9f9bb83 fix: Resolve Avro RecordEncoder bugs related to nullable Struct fields and Union type ids (#8935)
67c9f9bb83 is described below
commit 67c9f9bb8351d4fefd65603079a2f5f21d5a3087
Author: Connor Sanders <[email protected]>
AuthorDate: Thu Dec 11 13:15:45 2025 -0600
fix: Resolve Avro RecordEncoder bugs related to nullable Struct fields and Union type ids (#8935)
# Which issue does this PR close?
- Closes https://github.com/apache/arrow-rs/issues/8934
# Rationale for this change
The `arrow-avro` writer currently fails on two classes of *valid* Arrow
inputs:
1. **Nullable `Struct` with non‑nullable children + row‑wise sliced
encoding**
When encoding a `RecordBatch` row‑by‑row, a nullable `Struct` field
whose child is non‑nullable can cause the writer to error with `Invalid
argument error: Avro site '{field}' is non-nullable, but array contains
nulls`, even when the parent `Struct` is null at that row and the child
value should be ignored.
2. **Dense `UnionArray` with non‑zero, non‑consecutive type ids**
A dense `UnionArray` whose `UnionFields` use type ids such as `2` and
`5` will currently fail with a `SchemaError("Binding and field
mismatch")`, even though this layout is valid per Arrow’s union
semantics.
This PR updates the `RecordEncoder` to resolve both of these issues and
better respect Arrow’s struct/union semantics.
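For illustration, here is a minimal sketch (not part of this change) of the two previously failing layouts; the values and type ids mirror the regression tests added below:

```rust
use std::sync::Arc;
use arrow_array::{ArrayRef, Int32Array, StringArray, StructArray, UnionArray};
use arrow_buffer::{Buffer, NullBuffer};
use arrow_schema::{DataType, Field, Fields, UnionFields};

// 1. Nullable Struct with a non-nullable child. The child's null at row 0
//    is masked by the parent's null bitmap, which is valid Arrow data.
let fields = Fields::from(vec![Field::new("id", DataType::Int32, false)]);
let ids: ArrayRef = Arc::new(Int32Array::from(vec![None, Some(1)]));
let nullable_struct = StructArray::new(
    fields,
    vec![ids],
    Some(NullBuffer::from(vec![false, true])), // parent is null at row 0
);
// Row-wise slicing previously tripped the child null-count check here.
let _row0 = nullable_struct.slice(0, 1);

// 2. Dense union whose type ids are 2 and 5 rather than 0 and 1.
let union_fields = UnionFields::new(
    vec![2, 5],
    vec![
        Field::new("v_str", DataType::Utf8, true),
        Field::new("v_int", DataType::Int32, true),
    ],
);
let _union = UnionArray::try_new(
    union_fields,
    Buffer::from_slice_ref([2_i8, 5]).into(),
    Some(Buffer::from_slice_ref([0_i32, 0]).into()),
    vec![
        Arc::new(StringArray::from(vec!["hello"])) as ArrayRef,
        Arc::new(Int32Array::from(vec![10])) as ArrayRef,
    ],
)
.unwrap();
```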
# What changes are included in this PR?
This PR touches only the `arrow-avro` writer implementation,
specifically `arrow-avro/src/writer/encoder.rs` and
`arrow-avro/src/writer/mod.rs`.
**1. Fix nullable struct + non‑nullable child handling**
* Adjusts the `RecordEncoder` / `StructEncoder` path so that **child
field null validation is masked by the parent `Struct`’s null bitmap**.
* For rows where the parent `Struct` value is null, the encoder now
**skips encoding the non‑nullable children** for that row, instead of
treating any child‑side nulls as a violation of the Avro site’s
nullability.
* This ensures that row‑wise encoding of a sliced `RecordBatch`, like
the one in the issue’s reproducing test, now succeeds without triggering
`Invalid argument error: Avro site '{field}' is non-nullable, but array
contains nulls`.
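Conceptually, the masking rule works as in the following sketch (illustrative only; names do not match the encoder internals exactly):

```rust
use arrow_buffer::NullBuffer;

// Hypothetical helper showing the per-row rule: a null parent suppresses
// all child encoding, so masked child nulls can never fail validation.
fn encode_struct_row(parent_nulls: Option<&NullBuffer>, row: usize) {
    if let Some(nulls) = parent_nulls {
        if nulls.is_null(row) {
            // Parent Struct is null at this row: emit the null branch and
            // skip the children entirely, even non-nullable ones.
            return;
        }
    }
    // ... encode each child field at `row` ...
}
```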
**2. Support dense unions with non‑zero, non‑consecutive type ids**
* Updates the union encoding path (`UnionEncoder`) so that it no longer
assumes Arrow dense union type IDs are `0..N-1`.
* The encoder now **builds an explicit mapping from Arrow `type_ids` (as
declared in `UnionFields`) to Avro union branch indices**, and uses this
mapping when:
* constructing the union’s Avro schema binding, and
* writing out the branch index and value for each union element.
* As a result, dense unions with type ids such as `2` and `5` now encode
successfully, matching Arrow’s union semantics, which require type ids to
be consistent with `UnionFields` but not contiguous or zero‑based.
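The mapping is a simple sparse lookup table; a sketch of the idea, mirroring the `type_id_to_encoder_index` field added in the diff below:

```rust
use arrow_schema::UnionFields;

// For UnionFields with type ids [2, 5] this returns
// [None, None, Some(0), None, None, Some(1)], so type id 5 resolves to
// Avro branch index 1 regardless of the gap in the ids.
fn type_id_to_branch(fields: &UnionFields) -> Vec<Option<usize>> {
    let max_type_id = fields.iter().map(|(tid, _)| tid).max().unwrap_or(0);
    let mut map = vec![None; (max_type_id + 1) as usize];
    for (branch, (type_id, _field)) in fields.iter().enumerate() {
        map[type_id as usize] = Some(branch);
    }
    map
}
```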
**3. Regression tests for both bugs**
Adds targeted regression tests under `arrow-avro/src/writer/mod.rs`’s
test module to validate the fixes:
1. **`test_nullable_struct_with_nonnullable_field_sliced_encoding`**
* Builds the nullable `Struct` + non‑nullable child scenario from the
issue.
* Encodes the `RecordBatch` one row at a time via `WriterBuilder::new(schema).with_fingerprint_strategy(FingerprintStrategy::Id(1)).build::<_, AvroSoeFormat>(...)` and asserts all rows encode successfully.
2. **`test_nullable_struct_with_decimal_and_timestamp_sliced`**
* Constructs a `RecordBatch` containing nullable `Struct` fields
populated with `Decimal128` and `TimestampMicrosecond` types to verify
encoding of complex nested data.
* Encodes the `RecordBatch` one row at a time using `AvroSoeFormat` and
`FingerprintStrategy::Id(1)`, asserting that each sliced row encodes
successfully.
3. **`non_nullable_child_in_nullable_struct_should_encode_per_row`**
* Builds a test case with a nullable `Struct` column containing a
non-nullable child field, alongside a timestamp column.
* Slices a single row from the batch and writes it via `AvroSoeFormat`,
asserting that `writer.write` returns `Ok` to confirm the fix for sliced
encoding constraints.
4. **`test_union_nonzero_type_ids`**
* Constructs a dense `UnionArray` whose `UnionFields` use type ids `[2,
5]` and a mix of string/int values.
* Encodes via `AvroWriter` and asserts that writing and finishing the
writer both succeed without error.
Together these tests reproduce the failures described in #8934 and
confirm that the new encoder behavior handles them correctly.
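The row-wise encoding pattern these tests exercise boils down to the following (sketch; `schema` and `batch` as constructed in the tests):

```rust
let mut sink = Vec::new();
let mut writer = WriterBuilder::new(schema)
    .with_fingerprint_strategy(FingerprintStrategy::Id(1))
    .build::<_, AvroSoeFormat>(&mut sink)?;
for row in 0..batch.num_rows() {
    // Each single-row slice must encode independently of its neighbors.
    writer.write(&batch.slice(row, 1))?;
}
writer.finish()?;
```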
# Are these changes tested?
Yes.
* New unit tests are added for both regression scenarios (nullable
struct + non‑nullable child, and dense union with non‑zero &
non‑consecutive type ids).
* Existing writer / reader integration tests (round‑trip tests, nested
record tests, etc.) continue to pass unchanged, confirming that the
crate’s previously tested behavior and public API remain intact.
# Are there any user-facing changes?
1. **Behavioral change (bug fix):**
* Previously, valid and supported Arrow inputs could cause the Avro
writer to error or panic in the two scenarios described above.
* After this change, those inputs encode successfully and produce Avro
output consistent with the generated or provided Avro schema.
2. **APIs and configuration:**
* No public APIs, types, or configuration options are added, removed, or
renamed.
* The on‑wire Avro representation for already‑supported layouts is
unchanged; the encoder now simply accepts valid Arrow layouts that
previously failed.
The change is strictly a non‑breaking, backwards‑compatible bug fix that
makes the `arrow-avro` writer behave as expected.
---
arrow-avro/src/writer/encoder.rs | 391 +++++++++++++++++----------------------
arrow-avro/src/writer/mod.rs | 235 ++++++++++++++++++++++-
2 files changed, 399 insertions(+), 227 deletions(-)
diff --git a/arrow-avro/src/writer/encoder.rs b/arrow-avro/src/writer/encoder.rs
index 79aee4fae0..c638c2b73f 100644
--- a/arrow-avro/src/writer/encoder.rs
+++ b/arrow-avro/src/writer/encoder.rs
@@ -205,13 +205,13 @@ fn write_optional_index<W: Write + ?Sized>(
}
#[derive(Debug, Clone)]
-enum NullState {
+enum NullState<'a> {
NonNullable,
NullableNoNulls {
union_value_byte: u8,
},
Nullable {
- nulls: NullBuffer,
+ nulls: &'a NullBuffer,
null_order: Nullability,
},
}
@@ -221,13 +221,12 @@ enum NullState {
/// - Carries the per-site nullability **state** as a single enum that enforces invariants
pub(crate) struct FieldEncoder<'a> {
encoder: Encoder<'a>,
- null_state: NullState,
+ null_state: NullState<'a>,
}
impl<'a> FieldEncoder<'a> {
fn make_encoder(
array: &'a dyn Array,
- field: &Field,
plan: &FieldPlan,
nullability: Option<Nullability>,
) -> Result<Self, ArrowError> {
@@ -563,61 +562,48 @@ impl<'a> FieldEncoder<'a> {
.as_any()
.downcast_ref::<UnionArray>()
.ok_or_else(|| ArrowError::SchemaError("Expected UnionArray".into()))?;
-
Encoder::Union(Box::new(UnionEncoder::try_new(arr, bindings)?))
}
FieldPlan::RunEndEncoded {
values_nullability,
value_plan,
} => {
- let dt = array.data_type();
- let values_field = match dt {
- DataType::RunEndEncoded(_re_field, v_field) => v_field.as_ref(),
- other => {
- return Err(ArrowError::SchemaError(format!(
- "Avro RunEndEncoded site requires Arrow
DataType::RunEndEncoded, found: {other:?}"
- )));
- }
- };
// Helper closure to build a typed RunEncodedEncoder<R>
let build = |run_arr_any: &'a dyn Array| -> Result<Encoder<'a>, ArrowError> {
if let Some(arr) = run_arr_any.as_any().downcast_ref::<RunArray<Int16Type>>() {
- let values_enc = prepare_value_site_encoder(
- arr.values().as_ref(),
- values_field,
- *values_nullability,
- value_plan.as_ref(),
- )?;
return Ok(Encoder::RunEncoded16(Box::new(RunEncodedEncoder::<
Int16Type,
>::new(
- arr, values_enc
+ arr,
+ FieldEncoder::make_encoder(
+ arr.values().as_ref(),
+ value_plan.as_ref(),
+ *values_nullability,
+ )?,
))));
}
if let Some(arr) = run_arr_any.as_any().downcast_ref::<RunArray<Int32Type>>() {
- let values_enc = prepare_value_site_encoder(
- arr.values().as_ref(),
- values_field,
- *values_nullability,
- value_plan.as_ref(),
- )?;
return Ok(Encoder::RunEncoded32(Box::new(RunEncodedEncoder::<
Int32Type,
>::new(
- arr, values_enc
+ arr,
+ FieldEncoder::make_encoder(
+ arr.values().as_ref(),
+ value_plan.as_ref(),
+ *values_nullability,
+ )?,
))));
}
if let Some(arr) = run_arr_any.as_any().downcast_ref::<RunArray<Int64Type>>() {
- let values_enc = prepare_value_site_encoder(
- arr.values().as_ref(),
- values_field,
- *values_nullability,
- value_plan.as_ref(),
- )?;
return Ok(Encoder::RunEncoded64(Box::new(RunEncodedEncoder::<
Int64Type,
>::new(
- arr, values_enc
+ arr,
+ FieldEncoder::make_encoder(
+ arr.values().as_ref(),
+ value_plan.as_ref(),
+ *values_nullability,
+ )?,
))));
}
Err(ArrowError::SchemaError(
@@ -629,29 +615,19 @@ impl<'a> FieldEncoder<'a> {
}
};
// Compute the effective null state from writer-declared nullability and data nulls.
- let null_state = match (nullability, array.null_count() > 0) {
- (None, false) => NullState::NonNullable,
- (None, true) => {
- return Err(ArrowError::InvalidArgumentError(format!(
- "Avro site '{}' is non-nullable, but array contains nulls",
- field.name()
- )));
- }
- (Some(order), false) => {
- // Optimization: drop any bitmap; emit a constant "value" branch byte.
- NullState::NullableNoNulls {
- union_value_byte: union_value_branch_byte(order, false),
+ let null_state = match nullability {
+ None => NullState::NonNullable,
+ Some(null_order) => {
+ match array.nulls() {
+ Some(nulls) if array.null_count() > 0 => {
+ NullState::Nullable { nulls, null_order }
+ }
+ _ => NullState::NullableNoNulls {
+ // Nullable site with no null buffer for this view
+ union_value_byte: union_value_branch_byte(null_order, false),
+ },
}
}
- (Some(null_order), true) => {
- let Some(nulls) = array.nulls().cloned() else {
- return Err(ArrowError::InvalidArgumentError(format!(
- "Array for Avro site '{}' reports nulls but has no
null buffer",
- field.name()
- )));
- };
- NullState::Nullable { nulls, null_order }
- }
};
Ok(Self {
encoder,
@@ -797,8 +773,6 @@ impl RecordEncoder {
&'a self,
batch: &'a RecordBatch,
) -> Result<Vec<FieldEncoder<'a>>, ArrowError> {
- let schema_binding = batch.schema();
- let fields = schema_binding.fields();
let arrays = batch.columns();
let mut out = Vec::with_capacity(self.columns.len());
for col_plan in self.columns.iter() {
@@ -806,7 +780,6 @@ impl RecordEncoder {
let array = arrays.get(arrow_index).ok_or_else(|| {
ArrowError::SchemaError(format!("Column index {arrow_index}
out of range"))
})?;
- let field = fields[arrow_index].as_ref();
#[cfg(not(feature = "avro_custom_types"))]
let site_nullability = match &col_plan.plan {
FieldPlan::RunEndEncoded { .. } => None,
@@ -814,13 +787,11 @@ impl RecordEncoder {
};
#[cfg(feature = "avro_custom_types")]
let site_nullability = col_plan.nullability;
- let encoder = prepare_value_site_encoder(
+ out.push(FieldEncoder::make_encoder(
array.as_ref(),
- field,
- site_nullability,
&col_plan.plan,
- )?;
- out.push(encoder);
+ site_nullability,
+ )?);
}
Ok(out)
}
@@ -1348,39 +1319,14 @@ impl<'a> MapEncoder<'a> {
)));
}
};
-
- let entries_struct_fields = match map.data_type() {
- DataType::Map(entries, _) => match entries.data_type() {
- DataType::Struct(fs) => fs,
- other => {
- return Err(ArrowError::SchemaError(format!(
- "Arrow Map entries must be Struct, found: {other:?}"
- )));
- }
- },
- _ => {
- return Err(ArrowError::SchemaError(
- "Expected MapArray with DataType::Map".into(),
- ));
- }
- };
-
- let v_idx = find_map_value_field_index(entries_struct_fields).ok_or_else(|| {
- ArrowError::SchemaError("Map entries struct missing value field".into())
- })?;
- let value_field = entries_struct_fields[v_idx].as_ref();
-
- let values_enc = prepare_value_site_encoder(
- map.values().as_ref(),
- value_field,
- values_nullability,
- value_plan,
- )?;
-
Ok(Self {
map,
keys: keys_kind,
- values: values_enc,
+ values: FieldEncoder::make_encoder(
+ map.values().as_ref(),
+ value_plan,
+ values_nullability,
+ )?,
keys_offset: keys_arr.offset(),
values_offset: map.values().offset(),
})
@@ -1452,6 +1398,7 @@ impl EnumEncoder<'_> {
struct UnionEncoder<'a> {
encoders: Vec<FieldEncoder<'a>>,
array: &'a UnionArray,
+ type_id_to_encoder_index: Vec<Option<usize>>,
}
impl<'a> UnionEncoder<'a> {
@@ -1459,7 +1406,6 @@ impl<'a> UnionEncoder<'a> {
let DataType::Union(fields, UnionMode::Dense) = array.data_type() else {
return Err(ArrowError::SchemaError("Expected Dense UnionArray".into()));
};
-
if fields.len() != field_bindings.len() {
return Err(ArrowError::SchemaError(format!(
"Mismatched number of union branches between Arrow array ({})
and encoding plan ({})",
@@ -1467,37 +1413,44 @@ impl<'a> UnionEncoder<'a> {
field_bindings.len()
)));
}
+ let max_type_id = fields.iter().map(|(tid, _)| tid).max().unwrap_or(0);
+ let mut type_id_to_encoder_index: Vec<Option<usize>> =
+ vec![None; (max_type_id + 1) as usize];
let mut encoders = Vec::with_capacity(fields.len());
- for (type_id, field_ref) in fields.iter() {
+ for (i, (type_id, _)) in fields.iter().enumerate() {
let binding = field_bindings
- .get(type_id as usize)
+ .get(i)
.ok_or_else(|| ArrowError::SchemaError("Binding and field mismatch".to_string()))?;
-
- let child = array.child(type_id).as_ref();
-
- let encoder = prepare_value_site_encoder(
- child,
- field_ref.as_ref(),
- binding.nullability,
+ encoders.push(FieldEncoder::make_encoder(
+ array.child(type_id).as_ref(),
&binding.plan,
- )?;
- encoders.push(encoder);
+ binding.nullability,
+ )?);
+ type_id_to_encoder_index[type_id as usize] = Some(i);
}
- Ok(Self { encoders, array })
+ Ok(Self {
+ encoders,
+ array,
+ type_id_to_encoder_index,
+ })
}
fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
+ // SAFETY: `idx` is always in bounds because:
+ // 1. The encoder is called from `RecordEncoder::encode`, which iterates over `0..batch.num_rows()`
+ // 2. `self.array` is a column from the same batch, so its length equals `batch.num_rows()`
+ // 3. `type_ids()` returns a buffer with exactly `self.array.len()` entries (one per logical element)
let type_id = self.array.type_ids()[idx];
- let branch_index = type_id as usize;
- write_int(out, type_id as i32)?;
- let child_row = self.array.value_offset(idx);
-
- let encoder = self
- .encoders
- .get_mut(branch_index)
+ let encoder_index = self
+ .type_id_to_encoder_index
+ .get(type_id as usize)
+ .and_then(|opt| *opt)
.ok_or_else(|| ArrowError::SchemaError(format!("Invalid type_id {type_id}")))?;
-
- encoder.encode(out, child_row)
+ write_int(out, encoder_index as i32)?;
+ let encoder = self.encoders.get_mut(encoder_index).ok_or_else(|| {
+ ArrowError::SchemaError(format!("Invalid encoder index
{encoder_index}"))
+ })?;
+ encoder.encode(out, self.array.value_offset(idx))
}
}
@@ -1510,23 +1463,16 @@ impl<'a> StructEncoder<'a> {
array: &'a StructArray,
field_bindings: &[FieldBinding],
) -> Result<Self, ArrowError> {
- let DataType::Struct(fields) = array.data_type() else {
- return Err(ArrowError::SchemaError("Expected Struct".into()));
- };
let mut encoders = Vec::with_capacity(field_bindings.len());
for field_binding in field_bindings {
let idx = field_binding.arrow_index;
let column = array.columns().get(idx).ok_or_else(|| {
ArrowError::SchemaError(format!("Struct child index {idx} out
of range"))
})?;
- let field = fields.get(idx).ok_or_else(|| {
- ArrowError::SchemaError(format!("Struct child index {idx} out of range"))
- })?;
- let encoder = prepare_value_site_encoder(
+ let encoder = FieldEncoder::make_encoder(
column.as_ref(),
- field,
- field_binding.nullability,
&field_binding.plan,
+ field_binding.nullability,
)?;
encoders.push(encoder);
}
@@ -1583,24 +1529,13 @@ impl<'a, O: OffsetSizeTrait> ListEncoder<'a, O> {
items_nullability: Option<Nullability>,
item_plan: &FieldPlan,
) -> Result<Self, ArrowError> {
- let child_field = match list.data_type() {
- DataType::List(field) => field.as_ref(),
- DataType::LargeList(field) => field.as_ref(),
- _ => {
- return Err(ArrowError::SchemaError(
- "Expected List or LargeList for ListEncoder".into(),
- ));
- }
- };
- let values_enc = prepare_value_site_encoder(
- list.values().as_ref(),
- child_field,
- items_nullability,
- item_plan,
- )?;
Ok(Self {
list,
- values: values_enc,
+ values: FieldEncoder::make_encoder(
+ list.values().as_ref(),
+ item_plan,
+ items_nullability,
+ )?,
values_offset: list.values().offset(),
})
}
@@ -1647,24 +1582,13 @@ impl<'a, O: OffsetSizeTrait> ListViewEncoder<'a, O> {
items_nullability: Option<Nullability>,
item_plan: &FieldPlan,
) -> Result<Self, ArrowError> {
- let child_field = match list.data_type() {
- DataType::ListView(field) => field.as_ref(),
- DataType::LargeListView(field) => field.as_ref(),
- _ => {
- return Err(ArrowError::SchemaError(
- "Expected ListView or LargeListView for
ListViewEncoder".into(),
- ));
- }
- };
- let values_enc = prepare_value_site_encoder(
- list.values().as_ref(),
- child_field,
- items_nullability,
- item_plan,
- )?;
Ok(Self {
list,
- values: values_enc,
+ values: FieldEncoder::make_encoder(
+ list.values().as_ref(),
+ item_plan,
+ items_nullability,
+ )?,
values_offset: list.values().offset(),
})
}
@@ -1701,23 +1625,13 @@ impl<'a> FixedSizeListEncoder<'a> {
items_nullability: Option<Nullability>,
item_plan: &FieldPlan,
) -> Result<Self, ArrowError> {
- let child_field = match list.data_type() {
- DataType::FixedSizeList(field, _len) => field.as_ref(),
- _ => {
- return Err(ArrowError::SchemaError(
- "Expected FixedSizeList for FixedSizeListEncoder".into(),
- ));
- }
- };
- let values_enc = prepare_value_site_encoder(
- list.values().as_ref(),
- child_field,
- items_nullability,
- item_plan,
- )?;
Ok(Self {
list,
- values: values_enc,
+ values: FieldEncoder::make_encoder(
+ list.values().as_ref(),
+ item_plan,
+ items_nullability,
+ )?,
values_offset: list.values().offset(),
elem_len: list.value_length() as usize,
})
@@ -1735,16 +1649,6 @@ impl<'a> FixedSizeListEncoder<'a> {
}
}
-fn prepare_value_site_encoder<'a>(
- values_array: &'a dyn Array,
- value_field: &Field,
- nullability: Option<Nullability>,
- plan: &FieldPlan,
-) -> Result<FieldEncoder<'a>, ArrowError> {
- // Effective nullability is computed here from the writer-declared site nullability and data.
- FieldEncoder::make_encoder(values_array, value_field, plan, nullability)
-}
-
/// Avro `fixed` encoder for Arrow `FixedSizeBinaryArray`.
/// Spec: a fixed is encoded as exactly `size` bytes, with no length prefix.
struct FixedEncoder<'a>(&'a FixedSizeBinaryArray);
@@ -2049,8 +1953,7 @@ mod tests {
plan: &FieldPlan,
nullability: Option<Nullability>,
) -> Vec<u8> {
- let field = Field::new("f", array.data_type().clone(), true);
- let mut enc = FieldEncoder::make_encoder(array, &field, plan, nullability).unwrap();
+ let mut enc = FieldEncoder::make_encoder(array, plan, nullability).unwrap();
let mut out = Vec::new();
for i in 0..array.len() {
enc.encode(&mut out, i).unwrap();
@@ -2381,9 +2284,7 @@ mod tests {
FixedSizeBinaryArray::try_new(10, arrow_buffer::Buffer::from(vec![0u8; 10]), None)
.unwrap();
let plan = FieldPlan::Uuid;
-
- let field = Field::new("f", arr.data_type().clone(), true);
- let mut enc = FieldEncoder::make_encoder(&arr, &field, &plan, None).unwrap();
+ let mut enc = FieldEncoder::make_encoder(&arr, &plan, None).unwrap();
let mut out = Vec::new();
let err = enc.encode(&mut out, 0).unwrap_err();
match err {
@@ -2749,8 +2650,7 @@ mod tests {
#[test]
fn duration_encoder_year_month_rejects_negative() {
let arr: PrimitiveArray<IntervalYearMonthType> = vec![-1i32].into();
- let field = Field::new("f", DataType::Interval(IntervalUnit::YearMonth), true);
- let mut enc = FieldEncoder::make_encoder(&arr, &field, &FieldPlan::Scalar, None).unwrap();
+ let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Scalar, None).unwrap();
let mut out = Vec::new();
let err = enc.encode(&mut out, 0).unwrap_err();
match err {
@@ -2777,8 +2677,7 @@ mod tests {
fn duration_encoder_day_time_rejects_negative() {
let bad = IntervalDayTimeType::make_value(-1, 0);
let arr: PrimitiveArray<IntervalDayTimeType> = vec![bad].into();
- let field = Field::new("f", DataType::Interval(IntervalUnit::DayTime), true);
- let mut enc = FieldEncoder::make_encoder(&arr, &field, &FieldPlan::Scalar, None).unwrap();
+ let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Scalar, None).unwrap();
let mut out = Vec::new();
let err = enc.encode(&mut out, 0).unwrap_err();
match err {
@@ -2805,8 +2704,7 @@ mod tests {
fn duration_encoder_month_day_nano_rejects_non_ms_multiple() {
let bad = IntervalMonthDayNanoType::make_value(0, 0, 1);
let arr: PrimitiveArray<IntervalMonthDayNanoType> = vec![bad].into();
- let field = Field::new("f", DataType::Interval(IntervalUnit::MonthDayNano), true);
- let mut enc = FieldEncoder::make_encoder(&arr, &field, &FieldPlan::Scalar, None).unwrap();
+ let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Scalar, None).unwrap();
let mut out = Vec::new();
let err = enc.encode(&mut out, 0).unwrap_err();
match err {
@@ -2854,8 +2752,7 @@ mod tests {
let nanos = ((u64::from(u32::MAX) + 1) * 1_000_000) as i64;
let v = IntervalMonthDayNanoType::make_value(0, 0, nanos);
let arr: PrimitiveArray<IntervalMonthDayNanoType> = vec![v].into();
- let field = Field::new("f", DataType::Interval(IntervalUnit::MonthDayNano), true);
- let mut enc = FieldEncoder::make_encoder(&arr, &field, &FieldPlan::Scalar, None).unwrap();
+ let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Scalar, None).unwrap();
let mut out = Vec::new();
let err = enc.encode(&mut out, 0).unwrap_err();
match err {
@@ -3007,9 +2904,7 @@ mod tests {
// Time32(Second) must encode as Avro time-millis (ms since midnight).
let arr: arrow_array::PrimitiveArray<arrow_array::types::Time32SecondType> =
vec![0i32, 1, -2, 12_345].into();
-
let got = encode_all(&arr, &FieldPlan::Scalar, None);
-
let mut expected = Vec::new();
for secs in [0i32, 1, -2, 12_345] {
let millis = (secs as i64) * 1000;
@@ -3022,16 +2917,8 @@ mod tests {
fn time32_seconds_to_millis_overflow() {
// Choose a value that will overflow i32 when multiplied by 1000.
let overflow_secs: i32 = i32::MAX / 1000 + 1;
- let arr: arrow_array::PrimitiveArray<arrow_array::types::Time32SecondType> =
- vec![overflow_secs].into();
-
- let field = arrow_schema::Field::new(
- "f",
- arrow_schema::DataType::Time32(arrow_schema::TimeUnit::Second),
- true,
- );
- let mut enc = FieldEncoder::make_encoder(&arr, &field, &FieldPlan::Scalar, None).unwrap();
-
+ let arr: PrimitiveArray<Time32SecondType> = vec![overflow_secs].into();
+ let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Scalar, None).unwrap();
let mut out = Vec::new();
let err = enc.encode(&mut out, 0).unwrap_err();
match err {
@@ -3048,11 +2935,8 @@ mod tests {
#[test]
fn timestamp_seconds_to_millis_encoder() {
// Timestamp(Second) must encode as Avro timestamp-millis (ms since epoch).
- let arr: arrow_array::PrimitiveArray<arrow_array::types::TimestampSecondType> =
- vec![0i64, 1, -1, 1_234_567_890].into();
-
+ let arr: PrimitiveArray<TimestampSecondType> = vec![0i64, 1, -1, 1_234_567_890].into();
let got = encode_all(&arr, &FieldPlan::Scalar, None);
-
let mut expected = Vec::new();
for secs in [0i64, 1, -1, 1_234_567_890] {
let millis = secs * 1000;
@@ -3065,16 +2949,8 @@ mod tests {
fn timestamp_seconds_to_millis_overflow() {
// Overflow i64 when multiplied by 1000.
let overflow_secs: i64 = i64::MAX / 1000 + 1;
- let arr: arrow_array::PrimitiveArray<arrow_array::types::TimestampSecondType> =
- vec![overflow_secs].into();
-
- let field = arrow_schema::Field::new(
- "f",
- arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Second, None),
- true,
- );
- let mut enc = FieldEncoder::make_encoder(&arr, &field, &FieldPlan::Scalar, None).unwrap();
-
+ let arr: PrimitiveArray<TimestampSecondType> = vec![overflow_secs].into();
+ let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Scalar, None).unwrap();
let mut out = Vec::new();
let err = enc.encode(&mut out, 0).unwrap_err();
match err {
@@ -3090,15 +2966,80 @@ mod tests {
#[test]
fn timestamp_nanos_encoder() {
- let arr: arrow_array::PrimitiveArray<arrow_array::types::TimestampNanosecondType> =
- vec![0i64, 1, -1, 123].into();
-
+ let arr: PrimitiveArray<TimestampNanosecondType> = vec![0i64, 1, -1, 123].into();
let got = encode_all(&arr, &FieldPlan::Scalar, None);
-
let mut expected = Vec::new();
for ns in [0i64, 1, -1, 123] {
expected.extend_from_slice(&avro_long_bytes(ns));
}
assert_bytes_eq(&got, &expected);
}
+
+ #[test]
+ fn union_encoder_string_int_nonzero_type_ids() {
+ let strings = StringArray::from(vec!["hello", "world"]);
+ let ints = Int32Array::from(vec![10, 20, 30]);
+ let union_fields = UnionFields::new(
+ vec![2, 5],
+ vec![
+ Field::new("v_str", DataType::Utf8, true),
+ Field::new("v_int", DataType::Int32, true),
+ ],
+ );
+ let type_ids = Buffer::from_slice_ref([2_i8, 5, 5, 2, 5]);
+ let offsets = Buffer::from_slice_ref([0_i32, 0, 1, 1, 2]);
+ let union_array = UnionArray::try_new(
+ union_fields,
+ type_ids.into(),
+ Some(offsets.into()),
+ vec![Arc::new(strings), Arc::new(ints)],
+ )
+ .unwrap();
+ let plan = FieldPlan::Union {
+ bindings: vec![
+ FieldBinding {
+ arrow_index: 0,
+ nullability: None,
+ plan: FieldPlan::Scalar,
+ },
+ FieldBinding {
+ arrow_index: 1,
+ nullability: None,
+ plan: FieldPlan::Scalar,
+ },
+ ],
+ };
+ let got = encode_all(&union_array, &plan, None);
+ let mut expected = Vec::new();
+ expected.extend(avro_long_bytes(0));
+ expected.extend(avro_len_prefixed_bytes(b"hello"));
+ expected.extend(avro_long_bytes(1));
+ expected.extend(avro_long_bytes(10));
+ expected.extend(avro_long_bytes(1));
+ expected.extend(avro_long_bytes(20));
+ expected.extend(avro_long_bytes(0));
+ expected.extend(avro_len_prefixed_bytes(b"world"));
+ expected.extend(avro_long_bytes(1));
+ expected.extend(avro_long_bytes(30));
+ assert_bytes_eq(&got, &expected);
+ }
+
+ #[test]
+ fn nullable_state_with_null_buffer_and_zero_nulls() {
+ let values = vec![1i32, 2, 3];
+ let arr = Int32Array::from_iter_values_with_nulls(values, Some(NullBuffer::new_valid(3)));
+ assert_eq!(arr.null_count(), 0);
+ assert!(arr.nulls().is_some());
+ let plan = FieldPlan::Scalar;
+ let enc = FieldEncoder::make_encoder(&arr, &plan, Some(Nullability::NullFirst)).unwrap();
+ match enc.null_state {
+ NullState::NullableNoNulls { union_value_byte } => {
+ assert_eq!(
+ union_value_byte,
+ union_value_branch_byte(Nullability::NullFirst, false)
+ );
+ }
+ other => panic!("expected NullableNoNulls, got {other:?}"),
+ }
+ }
}
diff --git a/arrow-avro/src/writer/mod.rs b/arrow-avro/src/writer/mod.rs
index 231c9846f9..9b3eea1d6f 100644
--- a/arrow-avro/src/writer/mod.rs
+++ b/arrow-avro/src/writer/mod.rs
@@ -409,10 +409,11 @@ mod tests {
};
use arrow_array::{
Array, ArrayRef, BinaryArray, Date32Array, Int32Array, PrimitiveArray,
RecordBatch,
- StructArray, UnionArray,
+ StringArray, StructArray, UnionArray,
};
#[cfg(feature = "avro_custom_types")]
- use arrow_array::{Int16Array, Int64Array, RunArray, StringArray};
+ use arrow_array::{Int16Array, Int64Array, RunArray};
+ use arrow_schema::UnionMode;
#[cfg(not(feature = "avro_custom_types"))]
use arrow_schema::{DataType, Field, Schema};
#[cfg(feature = "avro_custom_types")]
@@ -491,6 +492,236 @@ mod tests {
Ok(())
}
+ #[test]
+ fn test_nullable_struct_with_nonnullable_field_sliced_encoding() {
+ use arrow_array::{ArrayRef, Int32Array, StringArray, StructArray};
+ use arrow_buffer::NullBuffer;
+ use arrow_schema::{DataType, Field, Fields, Schema};
+ use std::sync::Arc;
+ let inner_fields = Fields::from(vec![
+ Field::new("id", DataType::Int32, false), // non-nullable
+ Field::new("name", DataType::Utf8, true), // nullable
+ ]);
+ let inner_struct_type = DataType::Struct(inner_fields.clone());
+ let schema = Schema::new(vec![
+ Field::new("before", inner_struct_type.clone(), true), // nullable
struct
+ Field::new("after", inner_struct_type.clone(), true), // nullable
struct
+ Field::new("op", DataType::Utf8, false), //
non-nullable
+ ]);
+ let before_ids = Int32Array::from(vec![None, None]);
+ let before_names = StringArray::from(vec![None::<&str>, None]);
+ let before_struct = StructArray::new(
+ inner_fields.clone(),
+ vec![
+ Arc::new(before_ids) as ArrayRef,
+ Arc::new(before_names) as ArrayRef,
+ ],
+ Some(NullBuffer::from(vec![false, false])),
+ );
+ let after_ids = Int32Array::from(vec![1, 2]); // non-nullable, no nulls
+ let after_names = StringArray::from(vec![Some("Alice"), Some("Bob")]);
+ let after_struct = StructArray::new(
+ inner_fields.clone(),
+ vec![
+ Arc::new(after_ids) as ArrayRef,
+ Arc::new(after_names) as ArrayRef,
+ ],
+ Some(NullBuffer::from(vec![true, true])),
+ );
+ let op_col = StringArray::from(vec!["r", "r"]);
+ let batch = RecordBatch::try_new(
+ Arc::new(schema.clone()),
+ vec![
+ Arc::new(before_struct) as ArrayRef,
+ Arc::new(after_struct) as ArrayRef,
+ Arc::new(op_col) as ArrayRef,
+ ],
+ )
+ .expect("failed to create test batch");
+ let mut sink = Vec::new();
+ let mut writer = WriterBuilder::new(schema)
+ .with_fingerprint_strategy(FingerprintStrategy::Id(1))
+ .build::<_, AvroSoeFormat>(&mut sink)
+ .expect("failed to create writer");
+ for row_idx in 0..batch.num_rows() {
+ let single_row = batch.slice(row_idx, 1);
+ let after_col = single_row.column(1);
+ assert_eq!(
+ after_col.null_count(),
+ 0,
+ "after column should have no nulls in sliced row"
+ );
+ writer
+ .write(&single_row)
.unwrap_or_else(|e| panic!("Failed to encode row {row_idx}: {e}"));
+ }
+ writer.finish().expect("failed to finish writer");
+ assert!(!sink.is_empty(), "encoded output should not be empty");
+ }
+
+ #[test]
+ fn test_nullable_struct_with_decimal_and_timestamp_sliced() {
+ use arrow_array::{
+ ArrayRef, Decimal128Array, Int32Array, StringArray, StructArray,
+ TimestampMicrosecondArray,
+ };
+ use arrow_buffer::NullBuffer;
+ use arrow_schema::{DataType, Field, Fields, Schema};
+ use std::sync::Arc;
+ let row_fields = Fields::from(vec![
+ Field::new("id", DataType::Int32, false),
+ Field::new("name", DataType::Utf8, true),
+ Field::new("category", DataType::Utf8, true),
+ Field::new("price", DataType::Decimal128(10, 2), true),
+ Field::new("stock_quantity", DataType::Int32, true),
+ Field::new(
+ "created_at",
+ DataType::Timestamp(TimeUnit::Microsecond, None),
+ true,
+ ),
+ ]);
+ let row_struct_type = DataType::Struct(row_fields.clone());
+ let schema = Schema::new(vec![
+ Field::new("before", row_struct_type.clone(), true),
+ Field::new("after", row_struct_type.clone(), true),
+ Field::new("op", DataType::Utf8, false),
+ ]);
+ let before_struct = StructArray::new_null(row_fields.clone(), 2);
+ let ids = Int32Array::from(vec![1, 2]);
+ let names = StringArray::from(vec![Some("Widget"), Some("Gadget")]);
+ let categories = StringArray::from(vec![Some("Electronics"), Some("Electronics")]);
+ let prices = Decimal128Array::from(vec![Some(1999), Some(2999)])
+ .with_precision_and_scale(10, 2)
+ .unwrap();
+ let quantities = Int32Array::from(vec![Some(100), Some(50)]);
+ let timestamps = TimestampMicrosecondArray::from(vec![
+ Some(1700000000000000i64),
+ Some(1700000001000000i64),
+ ]);
+ let after_struct = StructArray::new(
+ row_fields.clone(),
+ vec![
+ Arc::new(ids) as ArrayRef,
+ Arc::new(names) as ArrayRef,
+ Arc::new(categories) as ArrayRef,
+ Arc::new(prices) as ArrayRef,
+ Arc::new(quantities) as ArrayRef,
+ Arc::new(timestamps) as ArrayRef,
+ ],
+ Some(NullBuffer::from(vec![true, true])),
+ );
+ let op_col = StringArray::from(vec!["r", "r"]);
+ let batch = RecordBatch::try_new(
+ Arc::new(schema.clone()),
+ vec![
+ Arc::new(before_struct) as ArrayRef,
+ Arc::new(after_struct) as ArrayRef,
+ Arc::new(op_col) as ArrayRef,
+ ],
+ )
+ .expect("failed to create products batch");
+ let mut sink = Vec::new();
+ let mut writer = WriterBuilder::new(schema)
+ .with_fingerprint_strategy(FingerprintStrategy::Id(1))
+ .build::<_, AvroSoeFormat>(&mut sink)
+ .expect("failed to create writer");
+ // Encode row by row
+ for row_idx in 0..batch.num_rows() {
+ let single_row = batch.slice(row_idx, 1);
+ writer
+ .write(&single_row)
.unwrap_or_else(|e| panic!("Failed to encode product row {row_idx}: {e}"));
+ }
+ writer.finish().expect("failed to finish writer");
+ assert!(!sink.is_empty());
+ }
+
+ #[test]
+ fn non_nullable_child_in_nullable_struct_should_encode_per_row() {
+ use arrow_array::{
+ ArrayRef, Int32Array, Int64Array, RecordBatch, StringArray, StructArray,
+ };
+ use arrow_schema::{DataType, Field, Fields, Schema};
+ use std::sync::Arc;
+ let row_fields = Fields::from(vec![
+ Field::new("id", DataType::Int32, false),
+ Field::new("name", DataType::Utf8, true),
+ ]);
+ let row_struct_dt = DataType::Struct(row_fields.clone());
+ let before: ArrayRef = Arc::new(StructArray::new_null(row_fields.clone(), 1));
+ let id_col: ArrayRef = Arc::new(Int32Array::from(vec![1]));
+ let name_col: ArrayRef = Arc::new(StringArray::from(vec![None::<&str>]));
+ let after: ArrayRef = Arc::new(StructArray::new(
+ row_fields.clone(),
+ vec![id_col, name_col],
+ None,
+ ));
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("before", row_struct_dt.clone(), true),
+ Field::new("after", row_struct_dt, true),
+ Field::new("op", DataType::Utf8, false),
+ Field::new("ts_ms", DataType::Int64, false),
+ ]));
+ let op = Arc::new(StringArray::from(vec!["r"])) as ArrayRef;
+ let ts_ms = Arc::new(Int64Array::from(vec![1732900000000_i64])) as ArrayRef;
+ let batch = RecordBatch::try_new(schema.clone(), vec![before, after, op, ts_ms]).unwrap();
+ let mut buf = Vec::new();
+ let mut writer = WriterBuilder::new(schema.as_ref().clone())
+ .build::<_, AvroSoeFormat>(&mut buf)
+ .unwrap();
+ let single = batch.slice(0, 1);
+ let res = writer.write(&single);
+ assert!(
+ res.is_ok(),
+ "expected to encode successfully, got: {:?}",
+ res.err()
+ );
+ }
+
+ #[test]
+ fn test_union_nonzero_type_ids() -> Result<(), ArrowError> {
+ use arrow_array::UnionArray;
+ use arrow_buffer::Buffer;
+ use arrow_schema::UnionFields;
+ let union_fields = UnionFields::new(
+ vec![2, 5],
+ vec![
+ Field::new("v_str", DataType::Utf8, true),
+ Field::new("v_int", DataType::Int32, true),
+ ],
+ );
+ let strings = StringArray::from(vec!["hello", "world"]);
+ let ints = Int32Array::from(vec![10, 20, 30]);
+ let type_ids = Buffer::from_slice_ref([2_i8, 5, 5, 2, 5]);
+ let offsets = Buffer::from_slice_ref([0_i32, 0, 1, 1, 2]);
+ let union_array = UnionArray::try_new(
+ union_fields.clone(),
+ type_ids.into(),
+ Some(offsets.into()),
+ vec![Arc::new(strings) as ArrayRef, Arc::new(ints) as ArrayRef],
+ )?;
+ let schema = Schema::new(vec![Field::new(
+ "union_col",
+ DataType::Union(union_fields, UnionMode::Dense),
+ false,
+ )]);
+ let batch = RecordBatch::try_new(
+ Arc::new(schema.clone()),
+ vec![Arc::new(union_array) as ArrayRef],
+ )?;
+ let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
+ assert!(
+ writer.write(&batch).is_ok(),
+ "Expected no error from writing"
+ );
+ assert!(
+ writer.finish().is_ok(),
+ "Expected no error from finishing writer"
+ );
+ Ok(())
+ }
+
#[test]
fn test_stream_writer_with_id_fingerprint_rt() -> Result<(), ArrowError> {
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);