This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 6123956485 Add Decimal type support to arrow-avro (#7832)
6123956485 is described below
commit 6123956485d86e5d306589d272ef5858b06c31d4
Author: Connor Sanders <[email protected]>
AuthorDate: Tue Jul 1 14:10:29 2025 -0500
Add Decimal type support to arrow-avro (#7832)
# Which issue does this PR close?
- Part of https://github.com/apache/arrow-rs/issues/4886
- Related to https://github.com/apache/arrow-rs/pull/6965
# Rationale for this change
This PR addresses a feature gap by introducing support for the Avro
`decimal` logical type, which is currently unimplemented as indicated by
the `test_decimal_logical_type_not_implemented` test case. The `decimal`
type is crucial for handling precise numerical data common in financial
and scientific applications, making this a necessary addition for
broader Avro compatibility.
# What changes are included in this PR?
This PR introduces the necessary changes to both parse and decode the
Avro `decimal` logical type into the corresponding Arrow `Decimal128` or
`Decimal256` data types.
The main changes are:
1. **Schema Parsing (`codec.rs`):**
* Implemented the logic within `make_data_type` to correctly parse the
`decimal` logical type from the Avro schema.
* The `Codec` enum gains a `Decimal` variant that stores the precision,
scale, and optional fixed size (in bytes) parsed from the schema's attributes.
2. **Decoding Logic (`record.rs`):**
* Added `Decoder::Decimal128` and `Decoder::Decimal256` variants to
handle decoding of decimal values from both `bytes` and `fixed` Avro
types.
* The implementation correctly handles sign extension for negative
numbers to ensure accurate representation in Arrow's decimal arrays.
# Are these changes tested?
This PR includes comprehensive tests to validate the new functionality:
* The existing `test_decimal_logical_type_not_implemented` test has been
replaced with concrete test cases.
* Added unit tests in `record.rs` (`test_decimal_decoding_fixed256`,
`test_decimal_decoding_fixed128`,
`test_decimal_decoding_bytes_with_nulls`, etc.) to cover various
scenarios, including:
* Decoding from the Avro `fixed` (complex) and `bytes` (primitive) types.
* Selecting between `Decimal128` and `Decimal256` based on the `fixed`
size or, for `bytes`-backed decimals, the declared precision.
* Correctly processing null values within decimal arrays.
# Are there any user-facing changes?
N/A
---
arrow-avro/Cargo.toml | 2 +-
arrow-avro/benches/avro_reader.rs | 12 +-
arrow-avro/src/codec.rs | 80 +++++++++++--
arrow-avro/src/reader/record.rs | 240 ++++++++++++++++++++++++++++++++++++--
4 files changed, 312 insertions(+), 22 deletions(-)
diff --git a/arrow-avro/Cargo.toml b/arrow-avro/Cargo.toml
index 24297f4a7e..c60413c593 100644
--- a/arrow-avro/Cargo.toml
+++ b/arrow-avro/Cargo.toml
@@ -53,7 +53,7 @@ crc = { version = "3.0", optional = true }
[dev-dependencies]
rand = { version = "0.9", default-features = false, features = ["std",
"std_rng", "thread_rng"] }
-criterion = { version = "0.5", default-features = false }
+criterion = { version = "0.6.0", default-features = false }
tempfile = "3.3"
arrow = { workspace = true }
diff --git a/arrow-avro/benches/avro_reader.rs
b/arrow-avro/benches/avro_reader.rs
index 7b1a5afff8..bea69b1491 100644
--- a/arrow-avro/benches/avro_reader.rs
+++ b/arrow-avro/benches/avro_reader.rs
@@ -163,7 +163,7 @@ fn bench_array_creation(c: &mut Criterion) {
)
.unwrap();
- criterion::black_box(batch)
+ std::hint::black_box(batch)
})
});
@@ -187,7 +187,7 @@ fn bench_array_creation(c: &mut Criterion) {
)
.unwrap();
- criterion::black_box(batch)
+ std::hint::black_box(batch)
})
});
}
@@ -214,7 +214,7 @@ fn bench_string_operations(c: &mut Criterion) {
for i in 0..rows {
sum_len += string_array.value(i).len();
}
- criterion::black_box(sum_len)
+ std::hint::black_box(sum_len)
})
});
@@ -224,7 +224,7 @@ fn bench_string_operations(c: &mut Criterion) {
for i in 0..rows {
sum_len += string_view_array.value(i).len();
}
- criterion::black_box(sum_len)
+ std::hint::black_box(sum_len)
})
});
}
@@ -246,7 +246,7 @@ fn bench_avro_reader(c: &mut Criterion) {
b.iter(|| {
let options = ReadOptions::default();
let batch = read_avro_test_file(file_path, &options).unwrap();
- criterion::black_box(batch)
+ std::hint::black_box(batch)
})
});
@@ -254,7 +254,7 @@ fn bench_avro_reader(c: &mut Criterion) {
b.iter(|| {
let options = ReadOptions::default().with_utf8view(true);
let batch = read_avro_test_file(file_path, &options).unwrap();
- criterion::black_box(batch)
+ std::hint::black_box(batch)
})
});
}
diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs
index caac390f3d..0f9fe9e6cd 100644
--- a/arrow-avro/src/codec.rs
+++ b/arrow-avro/src/codec.rs
@@ -16,8 +16,10 @@
// under the License.
use crate::schema::{Attributes, ComplexType, PrimitiveType, Record, Schema,
TypeName};
+use arrow_schema::DataType::{Decimal128, Decimal256};
use arrow_schema::{
- ArrowError, DataType, Field, FieldRef, Fields, IntervalUnit,
SchemaBuilder, SchemaRef, TimeUnit,
+ ArrowError, DataType, Field, FieldRef, Fields, IntervalUnit,
SchemaBuilder, SchemaRef,
+ TimeUnit, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE,
};
use std::borrow::Cow;
use std::collections::HashMap;
@@ -192,6 +194,13 @@ pub enum Codec {
/// Represents Avro fixed type, maps to Arrow's FixedSizeBinary data type
/// The i32 parameter indicates the fixed binary size
Fixed(i32),
+ /// Represents Avro decimal type, maps to Arrow's Decimal128 or Decimal256
data types
+ ///
+ /// The fields are `(precision, scale, fixed_size)`.
+ /// - `precision` (`usize`): Total number of digits.
+ /// - `scale` (`Option<usize>`): Number of fractional digits.
+ /// - `fixed_size` (`Option<usize>`): Size in bytes if backed by a `fixed`
type, otherwise `None`.
+ Decimal(usize, Option<usize>, Option<usize>),
/// Represents Avro Uuid type, a FixedSizeBinary with a length of 16
Uuid,
/// Represents Avro array type, maps to Arrow's List data type
@@ -227,6 +236,22 @@ impl Codec {
}
Self::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
Self::Fixed(size) => DataType::FixedSizeBinary(*size),
+ Self::Decimal(precision, scale, size) => {
+ let p = *precision as u8;
+ let s = scale.unwrap_or(0) as i8;
+ let too_large_for_128 = match *size {
+ Some(sz) => sz > 16,
+ None => {
+ (p as usize) > DECIMAL128_MAX_PRECISION as usize
+ || (s as usize) > DECIMAL128_MAX_SCALE as usize
+ }
+ };
+ if too_large_for_128 {
+ Decimal256(p, s)
+ } else {
+ Decimal128(p, s)
+ }
+ }
Self::Uuid => DataType::FixedSizeBinary(16),
Self::List(f) => {
DataType::List(Arc::new(f.field_with_name(Field::LIST_FIELD_DEFAULT_NAME)))
@@ -267,6 +292,32 @@ impl From<PrimitiveType> for Codec {
}
}
+fn parse_decimal_attributes(
+ attributes: &Attributes,
+ fallback_size: Option<usize>,
+ precision_required: bool,
+) -> Result<(usize, usize, Option<usize>), ArrowError> {
+ let precision = attributes
+ .additional
+ .get("precision")
+ .and_then(|v| v.as_u64())
+ .or(if precision_required { None } else { Some(10) })
+ .ok_or_else(|| ArrowError::ParseError("Decimal requires
precision".to_string()))?
+ as usize;
+ let scale = attributes
+ .additional
+ .get("scale")
+ .and_then(|v| v.as_u64())
+ .unwrap_or(0) as usize;
+ let size = attributes
+ .additional
+ .get("size")
+ .and_then(|v| v.as_u64())
+ .map(|s| s as usize)
+ .or(fallback_size);
+ Ok((precision, scale, size))
+}
+
impl Codec {
/// Converts a string codec to use Utf8View if requested
///
@@ -412,7 +463,6 @@ fn make_data_type<'a>(
let size = f.size.try_into().map_err(|e| {
ArrowError::ParseError(format!("Overflow converting size
to i32: {e}"))
})?;
-
let field = AvroDataType {
nullability: None,
metadata: f.attributes.field_metadata(),
@@ -443,11 +493,27 @@ fn make_data_type<'a>(
// https://avro.apache.org/docs/1.11.1/specification/#logical-types
match (t.attributes.logical_type, &mut field.codec) {
- (Some("decimal"), c @ Codec::Fixed(_)) => {
- return Err(ArrowError::NotYetImplemented(
- "Decimals are not currently supported".to_string(),
- ))
- }
+ (Some("decimal"), c) => match *c {
+ Codec::Fixed(sz_val) => {
+ let (prec, sc, size_opt) =
+ parse_decimal_attributes(&t.attributes,
Some(sz_val as usize), true)?;
+ let final_sz = if let Some(sz_actual) = size_opt {
+ sz_actual
+ } else {
+ sz_val as usize
+ };
+ *c = Codec::Decimal(prec, Some(sc), Some(final_sz));
+ }
+ Codec::Binary => {
+ let (prec, sc, _) =
parse_decimal_attributes(&t.attributes, None, false)?;
+ *c = Codec::Decimal(prec, Some(sc), None);
+ }
+ _ => {
+ return Err(ArrowError::SchemaError(format!(
+ "Decimal logical type can only be backed by Fixed
or Bytes, found {c:?}"
+ )))
+ }
+ },
(Some("date"), c @ Codec::Int32) => *c = Codec::Date32,
(Some("time-millis"), c @ Codec::Int32) => *c =
Codec::TimeMillis,
(Some("time-micros"), c @ Codec::Int64) => *c =
Codec::TimeMicros,
diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs
index 6d1a9f751a..e542e458f0 100644
--- a/arrow-avro/src/reader/record.rs
+++ b/arrow-avro/src/reader/record.rs
@@ -21,17 +21,21 @@ use crate::reader::cursor::AvroCursor;
use crate::reader::header::Header;
use crate::reader::ReadOptions;
use crate::schema::*;
+use arrow_array::builder::{Decimal128Builder, Decimal256Builder};
use arrow_array::types::*;
use arrow_array::*;
use arrow_buffer::*;
use arrow_schema::{
ArrowError, DataType, Field as ArrowField, FieldRef, Fields, Schema as
ArrowSchema, SchemaRef,
+ DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION,
};
use std::cmp::Ordering;
use std::collections::HashMap;
use std::io::Read;
use std::sync::Arc;
+const DEFAULT_CAPACITY: usize = 1024;
+
/// Decodes avro encoded data into [`RecordBatch`]
pub struct RecordDecoder {
schema: SchemaRef,
@@ -123,6 +127,8 @@ enum Decoder {
Box<Decoder>,
),
Fixed(i32, Vec<u8>),
+ Decimal128(usize, Option<usize>, Option<usize>, Decimal128Builder),
+ Decimal256(usize, Option<usize>, Option<usize>, Decimal256Builder),
Nullable(Nullability, NullBufferBuilder, Box<Decoder>),
}
@@ -159,6 +165,45 @@ impl Decoder {
Self::TimestampMicros(*is_utc,
Vec::with_capacity(DEFAULT_CAPACITY))
}
Codec::Fixed(sz) => Self::Fixed(*sz,
Vec::with_capacity(DEFAULT_CAPACITY)),
+ Codec::Decimal(precision, scale, size) => {
+ let p = *precision;
+ let s = *scale;
+ let sz = *size;
+ let prec = p as u8;
+ let scl = s.unwrap_or(0) as i8;
+ match (sz, p) {
+ (Some(fixed_size), _) if fixed_size <= 16 => {
+ let builder =
+
Decimal128Builder::new().with_precision_and_scale(prec, scl)?;
+ return Ok(Self::Decimal128(p, s, sz, builder));
+ }
+ (Some(fixed_size), _) if fixed_size <= 32 => {
+ let builder =
+
Decimal256Builder::new().with_precision_and_scale(prec, scl)?;
+ return Ok(Self::Decimal256(p, s, sz, builder));
+ }
+ (Some(fixed_size), _) => {
+ return Err(ArrowError::ParseError(format!(
+ "Unsupported decimal size: {fixed_size:?}"
+ )));
+ }
+ (None, p) if p <= DECIMAL128_MAX_PRECISION as usize => {
+ let builder =
+
Decimal128Builder::new().with_precision_and_scale(prec, scl)?;
+ Self::Decimal128(p, s, sz, builder)
+ }
+ (None, p) if p <= DECIMAL256_MAX_PRECISION as usize => {
+ let builder =
+
Decimal256Builder::new().with_precision_and_scale(prec, scl)?;
+ Self::Decimal256(p, s, sz, builder)
+ }
+ (None, _) => {
+ return Err(ArrowError::ParseError(format!(
+ "Decimal precision {p} exceeds maximum supported"
+ )));
+ }
+ }
+ }
Codec::Interval => return nyi("decoding interval"),
Codec::List(item) => {
let decoder = Self::try_new(item)?;
@@ -199,7 +244,6 @@ impl Decoder {
}
Codec::Uuid => Self::Fixed(16,
Vec::with_capacity(DEFAULT_CAPACITY)),
};
-
Ok(match data_type.nullability() {
Some(nullability) => Self::Nullable(
nullability,
@@ -233,10 +277,12 @@ impl Decoder {
Self::Map(_, _koff, moff, _, _) => {
moff.push_length(0);
}
- Self::Nullable(_, _, _) => unreachable!("Nulls cannot be nested"),
Self::Fixed(sz, accum) => {
accum.extend(std::iter::repeat(0u8).take(*sz as usize));
}
+ Self::Decimal128(_, _, _, builder) => builder.append_value(0),
+ Self::Decimal256(_, _, _, builder) =>
builder.append_value(i256::ZERO),
+ Self::Nullable(_, _, _) => unreachable!("Nulls cannot be nested"),
}
}
@@ -279,6 +325,30 @@ impl Decoder {
})?;
moff.push_length(newly_added);
}
+ Self::Fixed(sz, accum) => {
+ let fx = buf.get_fixed(*sz as usize)?;
+ accum.extend_from_slice(fx);
+ }
+ Self::Decimal128(_, _, size, builder) => {
+ let raw = if let Some(s) = size {
+ buf.get_fixed(*s)?
+ } else {
+ buf.get_bytes()?
+ };
+ let ext = sign_extend_to::<16>(raw)?;
+ let val = i128::from_be_bytes(ext);
+ builder.append_value(val);
+ }
+ Self::Decimal256(_, _, size, builder) => {
+ let raw = if let Some(s) = size {
+ buf.get_fixed(*s)?
+ } else {
+ buf.get_bytes()?
+ };
+ let ext = sign_extend_to::<32>(raw)?;
+ let val = i256::from_be_bytes(ext);
+ builder.append_value(val);
+ }
Self::Nullable(nullability, nulls, e) => {
let is_valid = buf.get_bool()? == matches!(nullability,
Nullability::NullFirst);
nulls.append(is_valid);
@@ -287,10 +357,6 @@ impl Decoder {
false => e.append_null(),
}
}
- Self::Fixed(sz, accum) => {
- let fx = buf.get_fixed(*sz as usize)?;
- accum.extend_from_slice(fx);
- }
}
Ok(())
}
@@ -334,7 +400,6 @@ impl Decoder {
let offsets = flush_offsets(offsets);
let values = flush_values(values);
let array = StringArray::new(offsets, values.into(),
nulls.clone());
-
let values: Vec<&str> = (0..array.len())
.map(|i| {
if array.is_valid(i) {
@@ -398,6 +463,24 @@ impl Decoder {
.map_err(|e| ArrowError::ParseError(e.to_string()))?;
Arc::new(arr)
}
+ Self::Decimal128(precision, scale, _, builder) => {
+ let mut b = std::mem::take(builder);
+ let (_, vals, _) = b.finish().into_parts();
+ let scl = scale.unwrap_or(0);
+ let dec = Decimal128Array::new(vals, nulls)
+ .with_precision_and_scale(*precision as u8, scl as i8)
+ .map_err(|e| ArrowError::ParseError(e.to_string()))?;
+ Arc::new(dec)
+ }
+ Self::Decimal256(precision, scale, _, builder) => {
+ let mut b = std::mem::take(builder);
+ let (_, vals, _) = b.finish().into_parts();
+ let scl = scale.unwrap_or(0);
+ let dec = Decimal256Array::new(vals, nulls)
+ .with_precision_and_scale(*precision as u8, scl as i8)
+ .map_err(|e| ArrowError::ParseError(e.to_string()))?;
+ Arc::new(dec)
+ }
})
}
}
@@ -466,7 +549,30 @@ fn flush_primitive<T: ArrowPrimitiveType>(
PrimitiveArray::new(flush_values(values).into(), nulls)
}
-const DEFAULT_CAPACITY: usize = 1024;
+/// Sign extends a byte slice to a fixed-size array of N bytes.
+/// This is done by filling the leading bytes with 0x00 for positive numbers
+/// or 0xFF for negative numbers.
+#[inline]
+fn sign_extend_to<const N: usize>(raw: &[u8]) -> Result<[u8; N], ArrowError> {
+ if raw.len() > N {
+ return Err(ArrowError::ParseError(format!(
+ "Cannot extend a slice of length {} to {} bytes.",
+ raw.len(),
+ N
+ )));
+ }
+ let mut arr = [0u8; N];
+ let pad_len = N - raw.len();
+ // Determine the byte to use for padding based on the sign bit of the raw
data.
+ let extension_byte = if raw.is_empty() || (raw[0] & 0x80 == 0) {
+ 0x00
+ } else {
+ 0xFF
+ };
+ arr[..pad_len].fill(extension_byte);
+ arr[pad_len..].copy_from_slice(raw);
+ Ok(arr)
+}
#[cfg(test)]
mod tests {
@@ -732,4 +838,122 @@ mod tests {
assert_eq!(list_arr.len(), 1);
assert_eq!(list_arr.value_length(0), 0);
}
+
+ #[test]
+ fn test_decimal_decoding_fixed256() {
+ let dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(32)));
+ let mut decoder = Decoder::try_new(&dt).unwrap();
+ let row1 = [
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00,
+ 0x00, 0x00, 0x30, 0x39,
+ ];
+ let row2 = [
+ 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
0xFF, 0xFF, 0xFF,
+ 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
0xFF, 0xFF, 0xFF,
+ 0xFF, 0xFF, 0xFF, 0x85,
+ ];
+ let mut data = Vec::new();
+ data.extend_from_slice(&row1);
+ data.extend_from_slice(&row2);
+ let mut cursor = AvroCursor::new(&data);
+ decoder.decode(&mut cursor).unwrap();
+ decoder.decode(&mut cursor).unwrap();
+ let arr = decoder.flush(None).unwrap();
+ let dec = arr.as_any().downcast_ref::<Decimal256Array>().unwrap();
+ assert_eq!(dec.len(), 2);
+ assert_eq!(dec.value_as_string(0), "123.45");
+ assert_eq!(dec.value_as_string(1), "-1.23");
+ }
+
+ #[test]
+ fn test_decimal_decoding_fixed128() {
+ let dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(16)));
+ let mut decoder = Decoder::try_new(&dt).unwrap();
+ let row1 = [
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00,
+ 0x30, 0x39,
+ ];
+ let row2 = [
+ 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
0xFF, 0xFF, 0xFF,
+ 0xFF, 0x85,
+ ];
+ let mut data = Vec::new();
+ data.extend_from_slice(&row1);
+ data.extend_from_slice(&row2);
+ let mut cursor = AvroCursor::new(&data);
+ decoder.decode(&mut cursor).unwrap();
+ decoder.decode(&mut cursor).unwrap();
+ let arr = decoder.flush(None).unwrap();
+ let dec = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
+ assert_eq!(dec.len(), 2);
+ assert_eq!(dec.value_as_string(0), "123.45");
+ assert_eq!(dec.value_as_string(1), "-1.23");
+ }
+
+ #[test]
+ fn test_decimal_decoding_bytes_with_nulls() {
+ let dt = avro_from_codec(Codec::Decimal(4, Some(1), None));
+ let inner = Decoder::try_new(&dt).unwrap();
+ let mut decoder = Decoder::Nullable(
+ Nullability::NullSecond,
+ NullBufferBuilder::new(DEFAULT_CAPACITY),
+ Box::new(inner),
+ );
+ let mut data = Vec::new();
+ data.extend_from_slice(&encode_avro_int(0));
+ data.extend_from_slice(&encode_avro_bytes(&[0x04, 0xD2]));
+ data.extend_from_slice(&encode_avro_int(1));
+ data.extend_from_slice(&encode_avro_int(0));
+ data.extend_from_slice(&encode_avro_bytes(&[0xFB, 0x2E]));
+ let mut cursor = AvroCursor::new(&data);
+ decoder.decode(&mut cursor).unwrap(); // row1
+ decoder.decode(&mut cursor).unwrap(); // row2
+ decoder.decode(&mut cursor).unwrap(); // row3
+ let arr = decoder.flush(None).unwrap();
+ let dec_arr = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
+ assert_eq!(dec_arr.len(), 3);
+ assert!(dec_arr.is_valid(0));
+ assert!(!dec_arr.is_valid(1));
+ assert!(dec_arr.is_valid(2));
+ assert_eq!(dec_arr.value_as_string(0), "123.4");
+ assert_eq!(dec_arr.value_as_string(2), "-123.4");
+ }
+
+ #[test]
+ fn test_decimal_decoding_bytes_with_nulls_fixed_size() {
+ let dt = avro_from_codec(Codec::Decimal(6, Some(2), Some(16)));
+ let inner = Decoder::try_new(&dt).unwrap();
+ let mut decoder = Decoder::Nullable(
+ Nullability::NullSecond,
+ NullBufferBuilder::new(DEFAULT_CAPACITY),
+ Box::new(inner),
+ );
+ let row1 = [
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x01,
+ 0xE2, 0x40,
+ ];
+ let row3 = [
+ 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
0xFF, 0xFF, 0xFE,
+ 0x1D, 0xC0,
+ ];
+ let mut data = Vec::new();
+ data.extend_from_slice(&encode_avro_int(0));
+ data.extend_from_slice(&row1);
+ data.extend_from_slice(&encode_avro_int(1));
+ data.extend_from_slice(&encode_avro_int(0));
+ data.extend_from_slice(&row3);
+ let mut cursor = AvroCursor::new(&data);
+ decoder.decode(&mut cursor).unwrap();
+ decoder.decode(&mut cursor).unwrap();
+ decoder.decode(&mut cursor).unwrap();
+ let arr = decoder.flush(None).unwrap();
+ let dec_arr = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
+ assert_eq!(dec_arr.len(), 3);
+ assert!(dec_arr.is_valid(0));
+ assert!(!dec_arr.is_valid(1));
+ assert!(dec_arr.is_valid(2));
+ assert_eq!(dec_arr.value_as_string(0), "1234.56");
+ assert_eq!(dec_arr.value_as_string(2), "-1234.56");
+ }
}