This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 6123956485 Add Decimal type support to arrow-avro  (#7832)
6123956485 is described below

commit 6123956485d86e5d306589d272ef5858b06c31d4
Author: Connor Sanders <[email protected]>
AuthorDate: Tue Jul 1 14:10:29 2025 -0500

    Add Decimal type support to arrow-avro  (#7832)
    
    # Which issue does this PR close?
    
    - Part of https://github.com/apache/arrow-rs/issues/4886
    
    - Related to https://github.com/apache/arrow-rs/pull/6965
    
    # Rationale for this change
    
    This PR addresses a feature gap by introducing support for the Avro
    `decimal` logical type, which is currently unimplemented as indicated by
    the `test_decimal_logical_type_not_implemented` test case. The `decimal`
    type is crucial for handling precise numerical data common in financial
    and scientific applications, making this a necessary addition for
    broader Avro compatibility.
    
    # What changes are included in this PR?
    
    This PR introduces the necessary changes to both parse and decode the
    Avro `decimal` logical type into the corresponding Arrow `Decimal128` or
    `Decimal256` data types.
    
    The main changes are:
    1.  **Schema Parsing (`codec.rs`):**
        * Implemented the logic within `make_data_type` to correctly parse the
          `decimal` logical type from the Avro schema.
        * The `Codec` enum's `Decimal` variant now correctly stores the
          precision, scale, and optional fixed size from the schema's
          attributes.
    
    2.  **Decoding Logic (`record.rs`):**
        * Added `Decoder::Decimal128` and `Decoder::Decimal256` variants to
          handle decoding of decimal values from both `bytes` and `fixed` Avro
          types.
        * The implementation correctly handles sign extension for negative
          numbers to ensure accurate representation in Arrow's decimal arrays.
    
    # Are these changes tested?
    
    This PR includes comprehensive tests to validate the new functionality:
    * The existing `test_decimal_logical_type_not_implemented` test has been
      replaced with concrete test cases.
    * Added unit tests in `record.rs` (`test_decimal_decoding_fixed256`,
      `test_decimal_decoding_fixed128`,
      `test_decimal_decoding_bytes_with_nulls`, etc.) to cover various
      scenarios, including:
        * Decoding from Avro `fixed` and `bytes` primitive types.
        * Handling different precisions to select between `Decimal128` and
          `Decimal256`.
        * Correctly processing null values within decimal arrays.
    
    # Are there any user-facing changes?
    
    N/A
---
 arrow-avro/Cargo.toml             |   2 +-
 arrow-avro/benches/avro_reader.rs |  12 +-
 arrow-avro/src/codec.rs           |  80 +++++++++++--
 arrow-avro/src/reader/record.rs   | 240 ++++++++++++++++++++++++++++++++++++--
 4 files changed, 312 insertions(+), 22 deletions(-)

diff --git a/arrow-avro/Cargo.toml b/arrow-avro/Cargo.toml
index 24297f4a7e..c60413c593 100644
--- a/arrow-avro/Cargo.toml
+++ b/arrow-avro/Cargo.toml
@@ -53,7 +53,7 @@ crc = { version = "3.0", optional = true }
 
 [dev-dependencies]
 rand = { version = "0.9", default-features = false, features = ["std", 
"std_rng", "thread_rng"] }
-criterion = { version = "0.5", default-features = false }
+criterion = { version = "0.6.0", default-features = false }
 tempfile = "3.3"
 arrow = { workspace = true }
 
diff --git a/arrow-avro/benches/avro_reader.rs 
b/arrow-avro/benches/avro_reader.rs
index 7b1a5afff8..bea69b1491 100644
--- a/arrow-avro/benches/avro_reader.rs
+++ b/arrow-avro/benches/avro_reader.rs
@@ -163,7 +163,7 @@ fn bench_array_creation(c: &mut Criterion) {
                 )
                 .unwrap();
 
-                criterion::black_box(batch)
+                std::hint::black_box(batch)
             })
         });
 
@@ -187,7 +187,7 @@ fn bench_array_creation(c: &mut Criterion) {
                 )
                 .unwrap();
 
-                criterion::black_box(batch)
+                std::hint::black_box(batch)
             })
         });
     }
@@ -214,7 +214,7 @@ fn bench_string_operations(c: &mut Criterion) {
                 for i in 0..rows {
                     sum_len += string_array.value(i).len();
                 }
-                criterion::black_box(sum_len)
+                std::hint::black_box(sum_len)
             })
         });
 
@@ -224,7 +224,7 @@ fn bench_string_operations(c: &mut Criterion) {
                 for i in 0..rows {
                     sum_len += string_view_array.value(i).len();
                 }
-                criterion::black_box(sum_len)
+                std::hint::black_box(sum_len)
             })
         });
     }
@@ -246,7 +246,7 @@ fn bench_avro_reader(c: &mut Criterion) {
             b.iter(|| {
                 let options = ReadOptions::default();
                 let batch = read_avro_test_file(file_path, &options).unwrap();
-                criterion::black_box(batch)
+                std::hint::black_box(batch)
             })
         });
 
@@ -254,7 +254,7 @@ fn bench_avro_reader(c: &mut Criterion) {
             b.iter(|| {
                 let options = ReadOptions::default().with_utf8view(true);
                 let batch = read_avro_test_file(file_path, &options).unwrap();
-                criterion::black_box(batch)
+                std::hint::black_box(batch)
             })
         });
     }
diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs
index caac390f3d..0f9fe9e6cd 100644
--- a/arrow-avro/src/codec.rs
+++ b/arrow-avro/src/codec.rs
@@ -16,8 +16,10 @@
 // under the License.
 
 use crate::schema::{Attributes, ComplexType, PrimitiveType, Record, Schema, 
TypeName};
+use arrow_schema::DataType::{Decimal128, Decimal256};
 use arrow_schema::{
-    ArrowError, DataType, Field, FieldRef, Fields, IntervalUnit, 
SchemaBuilder, SchemaRef, TimeUnit,
+    ArrowError, DataType, Field, FieldRef, Fields, IntervalUnit, 
SchemaBuilder, SchemaRef,
+    TimeUnit, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE,
 };
 use std::borrow::Cow;
 use std::collections::HashMap;
@@ -192,6 +194,13 @@ pub enum Codec {
     /// Represents Avro fixed type, maps to Arrow's FixedSizeBinary data type
     /// The i32 parameter indicates the fixed binary size
     Fixed(i32),
+    /// Represents Avro decimal type, maps to Arrow's Decimal128 or Decimal256 
data types
+    ///
+    /// The fields are `(precision, scale, fixed_size)`.
+    /// - `precision` (`usize`): Total number of digits.
+    /// - `scale` (`Option<usize>`): Number of fractional digits.
+    /// - `fixed_size` (`Option<usize>`): Size in bytes if backed by a `fixed` 
type, otherwise `None`.
+    Decimal(usize, Option<usize>, Option<usize>),
     /// Represents Avro Uuid type, a FixedSizeBinary with a length of 16
     Uuid,
     /// Represents Avro array type, maps to Arrow's List data type
@@ -227,6 +236,22 @@ impl Codec {
             }
             Self::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
             Self::Fixed(size) => DataType::FixedSizeBinary(*size),
+            Self::Decimal(precision, scale, size) => {
+                let p = *precision as u8;
+                let s = scale.unwrap_or(0) as i8;
+                let too_large_for_128 = match *size {
+                    Some(sz) => sz > 16,
+                    None => {
+                        (p as usize) > DECIMAL128_MAX_PRECISION as usize
+                            || (s as usize) > DECIMAL128_MAX_SCALE as usize
+                    }
+                };
+                if too_large_for_128 {
+                    Decimal256(p, s)
+                } else {
+                    Decimal128(p, s)
+                }
+            }
             Self::Uuid => DataType::FixedSizeBinary(16),
             Self::List(f) => {
                 
DataType::List(Arc::new(f.field_with_name(Field::LIST_FIELD_DEFAULT_NAME)))
@@ -267,6 +292,32 @@ impl From<PrimitiveType> for Codec {
     }
 }
 
+fn parse_decimal_attributes(
+    attributes: &Attributes,
+    fallback_size: Option<usize>,
+    precision_required: bool,
+) -> Result<(usize, usize, Option<usize>), ArrowError> {
+    let precision = attributes
+        .additional
+        .get("precision")
+        .and_then(|v| v.as_u64())
+        .or(if precision_required { None } else { Some(10) })
+        .ok_or_else(|| ArrowError::ParseError("Decimal requires 
precision".to_string()))?
+        as usize;
+    let scale = attributes
+        .additional
+        .get("scale")
+        .and_then(|v| v.as_u64())
+        .unwrap_or(0) as usize;
+    let size = attributes
+        .additional
+        .get("size")
+        .and_then(|v| v.as_u64())
+        .map(|s| s as usize)
+        .or(fallback_size);
+    Ok((precision, scale, size))
+}
+
 impl Codec {
     /// Converts a string codec to use Utf8View if requested
     ///
@@ -412,7 +463,6 @@ fn make_data_type<'a>(
                 let size = f.size.try_into().map_err(|e| {
                     ArrowError::ParseError(format!("Overflow converting size 
to i32: {e}"))
                 })?;
-
                 let field = AvroDataType {
                     nullability: None,
                     metadata: f.attributes.field_metadata(),
@@ -443,11 +493,27 @@ fn make_data_type<'a>(
 
             // https://avro.apache.org/docs/1.11.1/specification/#logical-types
             match (t.attributes.logical_type, &mut field.codec) {
-                (Some("decimal"), c @ Codec::Fixed(_)) => {
-                    return Err(ArrowError::NotYetImplemented(
-                        "Decimals are not currently supported".to_string(),
-                    ))
-                }
+                (Some("decimal"), c) => match *c {
+                    Codec::Fixed(sz_val) => {
+                        let (prec, sc, size_opt) =
+                            parse_decimal_attributes(&t.attributes, 
Some(sz_val as usize), true)?;
+                        let final_sz = if let Some(sz_actual) = size_opt {
+                            sz_actual
+                        } else {
+                            sz_val as usize
+                        };
+                        *c = Codec::Decimal(prec, Some(sc), Some(final_sz));
+                    }
+                    Codec::Binary => {
+                        let (prec, sc, _) = 
parse_decimal_attributes(&t.attributes, None, false)?;
+                        *c = Codec::Decimal(prec, Some(sc), None);
+                    }
+                    _ => {
+                        return Err(ArrowError::SchemaError(format!(
+                            "Decimal logical type can only be backed by Fixed 
or Bytes, found {c:?}"
+                        )))
+                    }
+                },
                 (Some("date"), c @ Codec::Int32) => *c = Codec::Date32,
                 (Some("time-millis"), c @ Codec::Int32) => *c = 
Codec::TimeMillis,
                 (Some("time-micros"), c @ Codec::Int64) => *c = 
Codec::TimeMicros,
diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs
index 6d1a9f751a..e542e458f0 100644
--- a/arrow-avro/src/reader/record.rs
+++ b/arrow-avro/src/reader/record.rs
@@ -21,17 +21,21 @@ use crate::reader::cursor::AvroCursor;
 use crate::reader::header::Header;
 use crate::reader::ReadOptions;
 use crate::schema::*;
+use arrow_array::builder::{Decimal128Builder, Decimal256Builder};
 use arrow_array::types::*;
 use arrow_array::*;
 use arrow_buffer::*;
 use arrow_schema::{
     ArrowError, DataType, Field as ArrowField, FieldRef, Fields, Schema as 
ArrowSchema, SchemaRef,
+    DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION,
 };
 use std::cmp::Ordering;
 use std::collections::HashMap;
 use std::io::Read;
 use std::sync::Arc;
 
+const DEFAULT_CAPACITY: usize = 1024;
+
 /// Decodes avro encoded data into [`RecordBatch`]
 pub struct RecordDecoder {
     schema: SchemaRef,
@@ -123,6 +127,8 @@ enum Decoder {
         Box<Decoder>,
     ),
     Fixed(i32, Vec<u8>),
+    Decimal128(usize, Option<usize>, Option<usize>, Decimal128Builder),
+    Decimal256(usize, Option<usize>, Option<usize>, Decimal256Builder),
     Nullable(Nullability, NullBufferBuilder, Box<Decoder>),
 }
 
@@ -159,6 +165,45 @@ impl Decoder {
                 Self::TimestampMicros(*is_utc, 
Vec::with_capacity(DEFAULT_CAPACITY))
             }
             Codec::Fixed(sz) => Self::Fixed(*sz, 
Vec::with_capacity(DEFAULT_CAPACITY)),
+            Codec::Decimal(precision, scale, size) => {
+                let p = *precision;
+                let s = *scale;
+                let sz = *size;
+                let prec = p as u8;
+                let scl = s.unwrap_or(0) as i8;
+                match (sz, p) {
+                    (Some(fixed_size), _) if fixed_size <= 16 => {
+                        let builder =
+                            
Decimal128Builder::new().with_precision_and_scale(prec, scl)?;
+                        return Ok(Self::Decimal128(p, s, sz, builder));
+                    }
+                    (Some(fixed_size), _) if fixed_size <= 32 => {
+                        let builder =
+                            
Decimal256Builder::new().with_precision_and_scale(prec, scl)?;
+                        return Ok(Self::Decimal256(p, s, sz, builder));
+                    }
+                    (Some(fixed_size), _) => {
+                        return Err(ArrowError::ParseError(format!(
+                            "Unsupported decimal size: {fixed_size:?}"
+                        )));
+                    }
+                    (None, p) if p <= DECIMAL128_MAX_PRECISION as usize => {
+                        let builder =
+                            
Decimal128Builder::new().with_precision_and_scale(prec, scl)?;
+                        Self::Decimal128(p, s, sz, builder)
+                    }
+                    (None, p) if p <= DECIMAL256_MAX_PRECISION as usize => {
+                        let builder =
+                            
Decimal256Builder::new().with_precision_and_scale(prec, scl)?;
+                        Self::Decimal256(p, s, sz, builder)
+                    }
+                    (None, _) => {
+                        return Err(ArrowError::ParseError(format!(
+                            "Decimal precision {p} exceeds maximum supported"
+                        )));
+                    }
+                }
+            }
             Codec::Interval => return nyi("decoding interval"),
             Codec::List(item) => {
                 let decoder = Self::try_new(item)?;
@@ -199,7 +244,6 @@ impl Decoder {
             }
             Codec::Uuid => Self::Fixed(16, 
Vec::with_capacity(DEFAULT_CAPACITY)),
         };
-
         Ok(match data_type.nullability() {
             Some(nullability) => Self::Nullable(
                 nullability,
@@ -233,10 +277,12 @@ impl Decoder {
             Self::Map(_, _koff, moff, _, _) => {
                 moff.push_length(0);
             }
-            Self::Nullable(_, _, _) => unreachable!("Nulls cannot be nested"),
             Self::Fixed(sz, accum) => {
                 accum.extend(std::iter::repeat(0u8).take(*sz as usize));
             }
+            Self::Decimal128(_, _, _, builder) => builder.append_value(0),
+            Self::Decimal256(_, _, _, builder) => 
builder.append_value(i256::ZERO),
+            Self::Nullable(_, _, _) => unreachable!("Nulls cannot be nested"),
         }
     }
 
@@ -279,6 +325,30 @@ impl Decoder {
                 })?;
                 moff.push_length(newly_added);
             }
+            Self::Fixed(sz, accum) => {
+                let fx = buf.get_fixed(*sz as usize)?;
+                accum.extend_from_slice(fx);
+            }
+            Self::Decimal128(_, _, size, builder) => {
+                let raw = if let Some(s) = size {
+                    buf.get_fixed(*s)?
+                } else {
+                    buf.get_bytes()?
+                };
+                let ext = sign_extend_to::<16>(raw)?;
+                let val = i128::from_be_bytes(ext);
+                builder.append_value(val);
+            }
+            Self::Decimal256(_, _, size, builder) => {
+                let raw = if let Some(s) = size {
+                    buf.get_fixed(*s)?
+                } else {
+                    buf.get_bytes()?
+                };
+                let ext = sign_extend_to::<32>(raw)?;
+                let val = i256::from_be_bytes(ext);
+                builder.append_value(val);
+            }
             Self::Nullable(nullability, nulls, e) => {
                 let is_valid = buf.get_bool()? == matches!(nullability, 
Nullability::NullFirst);
                 nulls.append(is_valid);
@@ -287,10 +357,6 @@ impl Decoder {
                     false => e.append_null(),
                 }
             }
-            Self::Fixed(sz, accum) => {
-                let fx = buf.get_fixed(*sz as usize)?;
-                accum.extend_from_slice(fx);
-            }
         }
         Ok(())
     }
@@ -334,7 +400,6 @@ impl Decoder {
                 let offsets = flush_offsets(offsets);
                 let values = flush_values(values);
                 let array = StringArray::new(offsets, values.into(), 
nulls.clone());
-
                 let values: Vec<&str> = (0..array.len())
                     .map(|i| {
                         if array.is_valid(i) {
@@ -398,6 +463,24 @@ impl Decoder {
                     .map_err(|e| ArrowError::ParseError(e.to_string()))?;
                 Arc::new(arr)
             }
+            Self::Decimal128(precision, scale, _, builder) => {
+                let mut b = std::mem::take(builder);
+                let (_, vals, _) = b.finish().into_parts();
+                let scl = scale.unwrap_or(0);
+                let dec = Decimal128Array::new(vals, nulls)
+                    .with_precision_and_scale(*precision as u8, scl as i8)
+                    .map_err(|e| ArrowError::ParseError(e.to_string()))?;
+                Arc::new(dec)
+            }
+            Self::Decimal256(precision, scale, _, builder) => {
+                let mut b = std::mem::take(builder);
+                let (_, vals, _) = b.finish().into_parts();
+                let scl = scale.unwrap_or(0);
+                let dec = Decimal256Array::new(vals, nulls)
+                    .with_precision_and_scale(*precision as u8, scl as i8)
+                    .map_err(|e| ArrowError::ParseError(e.to_string()))?;
+                Arc::new(dec)
+            }
         })
     }
 }
@@ -466,7 +549,30 @@ fn flush_primitive<T: ArrowPrimitiveType>(
     PrimitiveArray::new(flush_values(values).into(), nulls)
 }
 
-const DEFAULT_CAPACITY: usize = 1024;
+/// Sign extends a byte slice to a fixed-size array of N bytes.
+/// This is done by filling the leading bytes with 0x00 for positive numbers
+/// or 0xFF for negative numbers.
+#[inline]
+fn sign_extend_to<const N: usize>(raw: &[u8]) -> Result<[u8; N], ArrowError> {
+    if raw.len() > N {
+        return Err(ArrowError::ParseError(format!(
+            "Cannot extend a slice of length {} to {} bytes.",
+            raw.len(),
+            N
+        )));
+    }
+    let mut arr = [0u8; N];
+    let pad_len = N - raw.len();
+    // Determine the byte to use for padding based on the sign bit of the raw 
data.
+    let extension_byte = if raw.is_empty() || (raw[0] & 0x80 == 0) {
+        0x00
+    } else {
+        0xFF
+    };
+    arr[..pad_len].fill(extension_byte);
+    arr[pad_len..].copy_from_slice(raw);
+    Ok(arr)
+}
 
 #[cfg(test)]
 mod tests {
@@ -732,4 +838,122 @@ mod tests {
         assert_eq!(list_arr.len(), 1);
         assert_eq!(list_arr.value_length(0), 0);
     }
+
+    #[test]
+    fn test_decimal_decoding_fixed256() {
+        let dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(32)));
+        let mut decoder = Decoder::try_new(&dt).unwrap();
+        let row1 = [
+            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 
0x00, 0x00, 0x00,
+            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 
0x00, 0x00, 0x00,
+            0x00, 0x00, 0x30, 0x39,
+        ];
+        let row2 = [
+            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 
0xFF, 0xFF, 0xFF,
+            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 
0xFF, 0xFF, 0xFF,
+            0xFF, 0xFF, 0xFF, 0x85,
+        ];
+        let mut data = Vec::new();
+        data.extend_from_slice(&row1);
+        data.extend_from_slice(&row2);
+        let mut cursor = AvroCursor::new(&data);
+        decoder.decode(&mut cursor).unwrap();
+        decoder.decode(&mut cursor).unwrap();
+        let arr = decoder.flush(None).unwrap();
+        let dec = arr.as_any().downcast_ref::<Decimal256Array>().unwrap();
+        assert_eq!(dec.len(), 2);
+        assert_eq!(dec.value_as_string(0), "123.45");
+        assert_eq!(dec.value_as_string(1), "-1.23");
+    }
+
+    #[test]
+    fn test_decimal_decoding_fixed128() {
+        let dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(16)));
+        let mut decoder = Decoder::try_new(&dt).unwrap();
+        let row1 = [
+            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 
0x00, 0x00, 0x00,
+            0x30, 0x39,
+        ];
+        let row2 = [
+            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 
0xFF, 0xFF, 0xFF,
+            0xFF, 0x85,
+        ];
+        let mut data = Vec::new();
+        data.extend_from_slice(&row1);
+        data.extend_from_slice(&row2);
+        let mut cursor = AvroCursor::new(&data);
+        decoder.decode(&mut cursor).unwrap();
+        decoder.decode(&mut cursor).unwrap();
+        let arr = decoder.flush(None).unwrap();
+        let dec = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
+        assert_eq!(dec.len(), 2);
+        assert_eq!(dec.value_as_string(0), "123.45");
+        assert_eq!(dec.value_as_string(1), "-1.23");
+    }
+
+    #[test]
+    fn test_decimal_decoding_bytes_with_nulls() {
+        let dt = avro_from_codec(Codec::Decimal(4, Some(1), None));
+        let inner = Decoder::try_new(&dt).unwrap();
+        let mut decoder = Decoder::Nullable(
+            Nullability::NullSecond,
+            NullBufferBuilder::new(DEFAULT_CAPACITY),
+            Box::new(inner),
+        );
+        let mut data = Vec::new();
+        data.extend_from_slice(&encode_avro_int(0));
+        data.extend_from_slice(&encode_avro_bytes(&[0x04, 0xD2]));
+        data.extend_from_slice(&encode_avro_int(1));
+        data.extend_from_slice(&encode_avro_int(0));
+        data.extend_from_slice(&encode_avro_bytes(&[0xFB, 0x2E]));
+        let mut cursor = AvroCursor::new(&data);
+        decoder.decode(&mut cursor).unwrap(); // row1
+        decoder.decode(&mut cursor).unwrap(); // row2
+        decoder.decode(&mut cursor).unwrap(); // row3
+        let arr = decoder.flush(None).unwrap();
+        let dec_arr = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
+        assert_eq!(dec_arr.len(), 3);
+        assert!(dec_arr.is_valid(0));
+        assert!(!dec_arr.is_valid(1));
+        assert!(dec_arr.is_valid(2));
+        assert_eq!(dec_arr.value_as_string(0), "123.4");
+        assert_eq!(dec_arr.value_as_string(2), "-123.4");
+    }
+
+    #[test]
+    fn test_decimal_decoding_bytes_with_nulls_fixed_size() {
+        let dt = avro_from_codec(Codec::Decimal(6, Some(2), Some(16)));
+        let inner = Decoder::try_new(&dt).unwrap();
+        let mut decoder = Decoder::Nullable(
+            Nullability::NullSecond,
+            NullBufferBuilder::new(DEFAULT_CAPACITY),
+            Box::new(inner),
+        );
+        let row1 = [
+            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 
0x00, 0x00, 0x01,
+            0xE2, 0x40,
+        ];
+        let row3 = [
+            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 
0xFF, 0xFF, 0xFE,
+            0x1D, 0xC0,
+        ];
+        let mut data = Vec::new();
+        data.extend_from_slice(&encode_avro_int(0));
+        data.extend_from_slice(&row1);
+        data.extend_from_slice(&encode_avro_int(1));
+        data.extend_from_slice(&encode_avro_int(0));
+        data.extend_from_slice(&row3);
+        let mut cursor = AvroCursor::new(&data);
+        decoder.decode(&mut cursor).unwrap();
+        decoder.decode(&mut cursor).unwrap();
+        decoder.decode(&mut cursor).unwrap();
+        let arr = decoder.flush(None).unwrap();
+        let dec_arr = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
+        assert_eq!(dec_arr.len(), 3);
+        assert!(dec_arr.is_valid(0));
+        assert!(!dec_arr.is_valid(1));
+        assert!(dec_arr.is_valid(2));
+        assert_eq!(dec_arr.value_as_string(0), "1234.56");
+        assert_eq!(dec_arr.value_as_string(2), "-1234.56");
+    }
 }

Reply via email to