This is an automated email from the ASF dual-hosted git repository.

mbrobbel pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 74c0386494 Add support for run-end encoded (REE) arrays in arrow-avro (#8584)
74c0386494 is described below

commit 74c03864948c5699b2a0e47010d6282bc636f373
Author: Connor Sanders <[email protected]>
AuthorDate: Mon Oct 13 04:50:06 2025 -0500

    Add support for run-end encoded (REE) arrays in arrow-avro (#8584)
    
    # Which issue does this PR close?
    
    - Part of #4886
    
    # Rationale for this change
    
    Arrow has a first‑class **Run‑End Encoded (REE)** data type that
    efficiently represents consecutive repeated values by storing *run ends*
    (indices) alongside the *values* array. Adding REE support to
    `arrow-avro` lets users read/write Arrow REE arrays to Avro without
    inflating them, preserving size and performance characteristics across
    serialization boundaries.
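    
    For illustration, a minimal sketch of what such an array looks like on the
    Arrow side (the `RunArray` constructor shown here is the same one exercised by
    the tests added in this PR; the concrete values are made up):
    
    ```rust
    use arrow_array::{types::Int32Type, Array, Int32Array, RunArray};
    
    fn example_ree() -> RunArray<Int32Type> {
        // Logical values [1, 1, 1, 2, 2, 2, 2, 3] stored as three runs:
        // `run_ends` holds cumulative (1-based) end indices, `values` one entry per run.
        let run_ends = Int32Array::from(vec![3, 7, 8]);
        let values = Int32Array::from(vec![1, 2, 3]);
        let ree = RunArray::try_new(&run_ends, &values).expect("valid run ends");
        assert_eq!(ree.len(), 8); // logical length, not the number of runs
        ree
    }
    ```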
    
    # What changes are included in this PR?
    
    - **New Avro codec for REE**: Introduces
    `Codec::RunEndEncoded(Arc<AvroDataType>, u8)` and maps it to Arrow’s
    `DataType::RunEndEncoded`. The run‑end index bit width must be one of
    16/32/64, and the generated Arrow fields use the standard child names
    `run_ends` (non‑nullable) and `values`.
    - **Schema parsing & validation**: Recognizes the Avro logical type
    `arrow.run-end-encoded` and *requires* the attribute
    `arrow.runEndIndexBits` (one of 16, 32, 64). Missing or invalid values
    yield clear parse errors. A sketch of the resulting schema fragment follows this list.
    - **Nullability propagation**: When REE appears inside nullable unions,
    nullability is “bubbled” into the `values` branch so Avro JSON
    generation models nullability correctly.
    - **Union integration**: `From<&Codec> for UnionFieldKind` is updated so
    REE defers to the inner value codec, ensuring unions of REE types
    resolve as expected.
    - **Feature wiring / dependency**: Enables REE handling behind the
    existing `avro_custom_types` feature, which now also enables the optional
    `arrow-select` dependency (`avro_custom_types = ["dep:arrow-select"]`).
    - **Reader/Writer updates**: Enhances the encoder/reader paths to
    round‑trip REE arrays end‑to‑end.
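    
    For reference, a rough sketch of the Avro schema fragment this produces for a
    non-null Int32 REE column. The attribute names come from this change; the
    surrounding field shape (name, nesting) is only an approximation of the
    writer's output:
    
    ```rust
    use serde_json::{json, Value};
    
    fn example_ree_avro_field() -> Value {
        // Hypothetical field named "x"; only the logical type and the
        // run-end width attribute are prescribed by this PR.
        json!({
            "name": "x",
            "type": {
                "type": "int",
                "logicalType": "arrow.run-end-encoded",
                "arrow.runEndIndexBits": 32
            }
        })
    }
    ```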
    
    # Are these changes tested?
    
    Yes. This commit adds **end‑to‑end tests** that round‑trip REE arrays
    with run‑end index types **Int16**, **Int32**, and **Int64** through the
    Avro reader/writer to validate schema, encoding, and decoding.
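    
    A condensed sketch of that round trip, adapted from the new tests in
    `writer/mod.rs` (the `arrow_avro` module paths are assumptions, since the crate
    is not yet published):
    
    ```rust
    use arrow_array::RecordBatch;
    use arrow_schema::{ArrowError, Schema};
    use std::io::Cursor;
    // Assumed module paths; these may change before the crate is published.
    use arrow_avro::reader::ReaderBuilder;
    use arrow_avro::writer::AvroWriter;
    
    fn roundtrip(schema: Schema, batch: &RecordBatch) -> Result<Vec<RecordBatch>, ArrowError> {
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema)?;
        writer.write(batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        // Decode the Avro container back into Arrow record batches.
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        reader.collect::<Result<Vec<_>, _>>()
    }
    ```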
    
    # Are there any user-facing changes?
    
    N/A since `arrow-avro` is not public yet.
    
    ---------
    
    Co-authored-by: Matthijs Brobbel <[email protected]>
---
 arrow-avro/Cargo.toml            |   3 +-
 arrow-avro/src/codec.rs          |  59 ++++++
 arrow-avro/src/reader/record.rs  | 350 +++++++++++++++++++++++++++++++++
 arrow-avro/src/schema.rs         |  50 ++++-
 arrow-avro/src/writer/encoder.rs | 200 ++++++++++++++++++-
 arrow-avro/src/writer/mod.rs     | 407 +++++++++++++++++++++++++++++++++++++++
 6 files changed, 1050 insertions(+), 19 deletions(-)

diff --git a/arrow-avro/Cargo.toml b/arrow-avro/Cargo.toml
index 374cc896d5..dc59d337a0 100644
--- a/arrow-avro/Cargo.toml
+++ b/arrow-avro/Cargo.toml
@@ -43,12 +43,13 @@ canonical_extension_types = ["arrow-schema/canonical_extension_types"]
 md5 = ["dep:md5"]
 sha256 = ["dep:sha2"]
 small_decimals = []
-avro_custom_types = []
+avro_custom_types = ["dep:arrow-select"]
 
 [dependencies]
 arrow-schema = { workspace = true }
 arrow-buffer = { workspace = true }
 arrow-array = { workspace = true }
+arrow-select = { workspace = true, optional = true }
 serde_json = { version = "1.0", default-features = false, features = ["std"] }
 serde = { version = "1.0.188", features = ["derive"] }
 flate2 = { version = "1.0", default-features = false, features = [
diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs
index f946a5a1b9..a6a495bdf3 100644
--- a/arrow-avro/src/codec.rs
+++ b/arrow-avro/src/codec.rs
@@ -14,6 +14,9 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+
+//! Codec for Mapping Avro and Arrow types.
+
 use crate::schema::{
     AVRO_ENUM_SYMBOLS_METADATA_KEY, AVRO_FIELD_DEFAULT_METADATA_KEY, AVRO_NAME_METADATA_KEY,
     AVRO_NAMESPACE_METADATA_KEY, Array, Attributes, ComplexType, Enum, Fixed, Map, Nullability,
@@ -460,6 +463,8 @@ impl AvroDataType {
                 };
                 default_encoding.parse_default_literal(default_json)?
             }
+            #[cfg(feature = "avro_custom_types")]
+            Codec::RunEndEncoded(values, _) => values.parse_default_literal(default_json)?,
         };
         Ok(lit)
     }
@@ -685,6 +690,8 @@ pub(crate) enum Codec {
     /// Represents Avro custom logical type to map to Arrow Duration(TimeUnit::Second)
     #[cfg(feature = "avro_custom_types")]
     DurationSeconds,
+    #[cfg(feature = "avro_custom_types")]
+    RunEndEncoded(Arc<AvroDataType>, u8),
 }
 
 impl Codec {
@@ -765,6 +772,19 @@ impl Codec {
             Self::DurationMillis => DataType::Duration(TimeUnit::Millisecond),
             #[cfg(feature = "avro_custom_types")]
             Self::DurationSeconds => DataType::Duration(TimeUnit::Second),
+            #[cfg(feature = "avro_custom_types")]
+            Self::RunEndEncoded(values, bits) => {
+                let run_ends_dt = match *bits {
+                    16 => DataType::Int16,
+                    32 => DataType::Int32,
+                    64 => DataType::Int64,
+                    _ => unreachable!(),
+                };
+                DataType::RunEndEncoded(
+                    Arc::new(Field::new("run_ends", run_ends_dt, false)),
+                    Arc::new(Field::new("values", values.codec().data_type(), true)),
+                )
+            }
         }
     }
 
@@ -936,6 +956,8 @@ impl From<&Codec> for UnionFieldKind {
             Codec::Uuid => Self::Uuid,
             Codec::Union(..) => Self::Union,
             #[cfg(feature = "avro_custom_types")]
+            Codec::RunEndEncoded(values, _) => UnionFieldKind::from(values.codec()),
+            #[cfg(feature = "avro_custom_types")]
             Codec::DurationNanos
             | Codec::DurationMicros
             | Codec::DurationMillis
@@ -1141,6 +1163,16 @@ impl<'a> Maker<'a> {
         }
     }
 
+    #[cfg(feature = "avro_custom_types")]
+    #[inline]
+    fn propagate_nullability_into_ree(dt: &mut AvroDataType, nb: Nullability) {
+        if let Codec::RunEndEncoded(values, bits) = dt.codec.clone() {
+            let mut inner = (*values).clone();
+            inner.nullability = Some(nb);
+            dt.codec = Codec::RunEndEncoded(Arc::new(inner), bits);
+        }
+    }
+
     fn make_data_type<'s>(
         &mut self,
         writer_schema: &'s Schema<'a>,
@@ -1185,6 +1217,8 @@ impl<'a> Maker<'a> {
                     (true, Some(0)) => {
                         let mut field = self.parse_type(&f[1], namespace)?;
                         field.nullability = Some(Nullability::NullFirst);
+                        #[cfg(feature = "avro_custom_types")]
+                        Self::propagate_nullability_into_ree(&mut field, Nullability::NullFirst);
                         return Ok(field);
                     }
                     (true, Some(1)) => {
@@ -1196,6 +1230,8 @@ impl<'a> Maker<'a> {
                         }
                         let mut field = self.parse_type(&f[0], namespace)?;
                         field.nullability = Some(Nullability::NullSecond);
+                        #[cfg(feature = "avro_custom_types")]
+                        Self::propagate_nullability_into_ree(&mut field, Nullability::NullSecond);
                         return Ok(field);
                     }
                     _ => {}
@@ -1374,6 +1410,27 @@ impl<'a> Maker<'a> {
                     (Some("arrow.duration-seconds"), c @ Codec::Int64) => {
                         *c = Codec::DurationSeconds
                     }
+                    #[cfg(feature = "avro_custom_types")]
+                    (Some("arrow.run-end-encoded"), _) => {
+                        let bits_u8: u8 = t
+                            .attributes
+                            .additional
+                            .get("arrow.runEndIndexBits")
+                            .and_then(|v| v.as_u64())
+                            .and_then(|n| u8::try_from(n).ok())
+                            .ok_or_else(|| ArrowError::ParseError(
+                                "arrow.run-end-encoded requires 'arrow.runEndIndexBits' (one of 16, 32, or 64)"
+                                    .to_string(),
+                            ))?;
+                        if bits_u8 != 16 && bits_u8 != 32 && bits_u8 != 64 {
+                            return Err(ArrowError::ParseError(format!(
+                                "Invalid 'arrow.runEndIndexBits' value {bits_u8}; must be 16, 32, or 64"
+                            )));
+                        }
+                        // Wrap the parsed underlying site as REE
+                        let values_site = field.clone();
+                        field.codec = Codec::RunEndEncoded(Arc::new(values_site), bits_u8);
+                    }
                     (Some(logical), _) => {
                         // Insert unrecognized logical type into metadata map
                         field.metadata.insert("logicalType".into(), logical.into());
@@ -1412,6 +1469,8 @@ impl<'a> Maker<'a> {
                     (Some((w_nb, w_nonnull)), Some((_r_nb, r_nonnull))) => {
                         let mut dt = self.make_data_type(w_nonnull, Some(r_nonnull), namespace)?;
                         dt.nullability = Some(w_nb);
+                        #[cfg(feature = "avro_custom_types")]
+                        Self::propagate_nullability_into_ree(&mut dt, w_nb);
                         Ok(dt)
                     }
                     _ => self.resolve_unions(writer_variants, reader_variants, namespace),
diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs
index 99e782b0fd..7eac382d9f 100644
--- a/arrow-avro/src/reader/record.rs
+++ b/arrow-avro/src/reader/record.rs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+//! Avro Decoder for Arrow types.
+
 use crate::codec::{
     AvroDataType, AvroField, AvroLiteral, Codec, Promotion, ResolutionInfo, ResolvedRecord,
     ResolvedUnion,
@@ -33,6 +35,8 @@ use arrow_schema::{
 };
 #[cfg(feature = "small_decimals")]
 use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
+#[cfg(feature = "avro_custom_types")]
+use arrow_select::take::{TakeOptions, take};
 use std::cmp::Ordering;
 use std::sync::Arc;
 use strum_macros::AsRefStr;
@@ -234,6 +238,8 @@ enum Decoder {
     Decimal64(usize, Option<usize>, Option<usize>, Decimal64Builder),
     Decimal128(usize, Option<usize>, Option<usize>, Decimal128Builder),
     Decimal256(usize, Option<usize>, Option<usize>, Decimal256Builder),
+    #[cfg(feature = "avro_custom_types")]
+    RunEndEncoded(u8, usize, Box<Decoder>),
     Union(UnionDecoder),
     Nullable(Nullability, NullBufferBuilder, Box<Decoder>, NullablePlan),
 }
@@ -474,6 +480,23 @@ impl Decoder {
                     "Sparse Arrow unions are not yet supported".to_string(),
                 ));
             }
+            #[cfg(feature = "avro_custom_types")]
+            (Codec::RunEndEncoded(values_dt, width_bits_or_bytes), _) => {
+                let inner = Self::try_new(values_dt)?;
+                let byte_width: u8 = match *width_bits_or_bytes {
+                    2 | 4 | 8 => *width_bits_or_bytes,
+                    16 => 2,
+                    32 => 4,
+                    64 => 8,
+                    other => {
+                        return Err(ArrowError::InvalidArgumentError(format!(
+                            "Unsupported run-end width {other} for RunEndEncoded; \
+                             expected 16/32/64 bits or 2/4/8 bytes"
+                        )));
+                    }
+                };
+                Self::RunEndEncoded(byte_width, 0, Box::new(inner))
+            }
         };
         Ok(match data_type.nullability() {
             Some(nullability) => {
@@ -550,6 +573,11 @@ impl Decoder {
             Self::Decimal256(_, _, _, builder) => builder.append_value(i256::ZERO),
             Self::Enum(indices, _, _) => indices.push(0),
             Self::Duration(builder) => builder.append_null(),
+            #[cfg(feature = "avro_custom_types")]
+            Self::RunEndEncoded(_, len, inner) => {
+                *len += 1;
+                inner.append_null()?;
+            }
             Self::Union(u) => u.append_null()?,
             Self::Nullable(_, null_buffer, inner, _) => {
                 null_buffer.append(false);
@@ -778,6 +806,11 @@ impl Decoder {
                     "Default for enum must be a symbol".to_string(),
                 )),
             },
+            #[cfg(feature = "avro_custom_types")]
+            Self::RunEndEncoded(_, len, inner) => {
+                *len += 1;
+                inner.append_default(lit)
+            }
             Self::Union(u) => u.append_default(lit),
             Self::Record(field_meta, decoders, projector) => match lit {
                 AvroLiteral::Map(entries) => {
@@ -918,6 +951,11 @@ impl Decoder {
                 let nanos = (millis as i64) * 1_000_000;
                 builder.append_value(IntervalMonthDayNano::new(months as i32, days as i32, nanos));
             }
+            #[cfg(feature = "avro_custom_types")]
+            Self::RunEndEncoded(_, len, inner) => {
+                *len += 1;
+                inner.decode(buf)?;
+            }
             Self::Union(u) => u.decode(buf)?,
             Self::Nullable(order, nb, encoding, plan) => {
                 match *plan {
@@ -950,6 +988,12 @@ impl Decoder {
         buf: &mut AvroCursor<'_>,
         promotion: Promotion,
     ) -> Result<(), ArrowError> {
+        #[cfg(feature = "avro_custom_types")]
+        if let Self::RunEndEncoded(_, len, inner) = self {
+            *len += 1;
+            return inner.decode_with_promotion(buf, promotion);
+        }
+
         macro_rules! promote_numeric_to {
             ($variant:ident, $getter:ident, $to:ty) => {{
                 match self {
@@ -1158,6 +1202,71 @@ impl Decoder {
                     .map_err(|e| ArrowError::ParseError(e.to_string()))?;
                 Arc::new(vals)
             }
+            #[cfg(feature = "avro_custom_types")]
+            Self::RunEndEncoded(width, len, inner) => {
+                let values = inner.flush(nulls)?;
+                let n = *len;
+                let arr = values.as_ref();
+                let mut run_starts: Vec<usize> = Vec::with_capacity(n);
+                if n > 0 {
+                    run_starts.push(0);
+                    for i in 1..n {
+                        if !values_equal_at(arr, i - 1, i) {
+                            run_starts.push(i);
+                        }
+                    }
+                }
+                if n > (u32::MAX as usize) {
+                    return Err(ArrowError::InvalidArgumentError(format!(
+                        "RunEndEncoded length {n} exceeds maximum supported by UInt32 indices for take",
+                    )));
+                }
+                let run_count = run_starts.len();
+                let take_idx: PrimitiveArray<UInt32Type> =
+                    run_starts.iter().map(|&s| s as u32).collect();
+                let per_run_values = if run_count == 0 {
+                    values.slice(0, 0)
+                } else {
+                    take(arr, &take_idx, Option::from(TakeOptions::default())).map_err(|e| {
+                        ArrowError::ParseError(format!("take() for REE values failed: {e}"))
+                    })?
+                };
+
+                macro_rules! build_run_array {
+                    ($Native:ty, $ArrowTy:ty) => {{
+                        let mut ends: Vec<$Native> = Vec::with_capacity(run_count);
+                        for (idx, &_start) in run_starts.iter().enumerate() {
+                            let end = if idx + 1 < run_count {
+                                run_starts[idx + 1]
+                            } else {
+                                n
+                            };
+                            ends.push(end as $Native);
+                        }
+                        let ends: PrimitiveArray<$ArrowTy> = ends.into_iter().collect();
+                        let run_arr = RunArray::<$ArrowTy>::try_new(&ends, per_run_values.as_ref())
+                            .map_err(|e| ArrowError::ParseError(e.to_string()))?;
+                        Arc::new(run_arr) as ArrayRef
+                    }};
+                }
+                match *width {
+                    2 => {
+                        if n > i16::MAX as usize {
+                            return Err(ArrowError::InvalidArgumentError(format!(
+                                "RunEndEncoded length {n} exceeds i16::MAX for run end width 2"
+                            )));
+                        }
+                        build_run_array!(i16, Int16Type)
+                    }
+                    4 => build_run_array!(i32, Int32Type),
+                    8 => build_run_array!(i64, Int64Type),
+                    other => {
+                        return Err(ArrowError::InvalidArgumentError(format!(
+                            "Unsupported run-end width {other} for RunEndEncoded"
+                        )));
+                    }
+                }
+            }
             Self::Union(u) => u.flush(nulls)?,
         })
     }
@@ -1705,6 +1814,20 @@ fn sign_cast_to<const N: usize>(raw: &[u8]) -> Result<[u8; N], ArrowError> {
     Ok(out)
 }
 
+#[cfg(feature = "avro_custom_types")]
+#[inline]
+fn values_equal_at(arr: &dyn Array, i: usize, j: usize) -> bool {
+    match (arr.is_null(i), arr.is_null(j)) {
+        (true, true) => true,
+        (true, false) | (false, true) => false,
+        (false, false) => {
+            let a = arr.slice(i, 1);
+            let b = arr.slice(j, 1);
+            a == b
+        }
+    }
+}
+
 #[derive(Debug)]
 struct Projector {
     writer_to_reader: Arc<[Option<usize>]>,
@@ -1846,6 +1969,8 @@ enum Skipper {
     Struct(Vec<Skipper>),
     Union(Vec<Skipper>),
     Nullable(Nullability, Box<Skipper>),
+    #[cfg(feature = "avro_custom_types")]
+    RunEndEncoded(Box<Skipper>),
 }
 
 impl Skipper {
@@ -1897,6 +2022,10 @@ impl Skipper {
                         .collect::<Result<_, _>>()?,
                 )
             }
+            #[cfg(feature = "avro_custom_types")]
+            Codec::RunEndEncoded(inner, _w) => {
+                Self::RunEndEncoded(Box::new(Skipper::from_avro(inner)?))
+            }
         };
         if let Some(n) = dt.nullability() {
             base = Self::Nullable(n, Box::new(base));
@@ -2001,6 +2130,8 @@ impl Skipper {
                 }
                 Ok(())
             }
+            #[cfg(feature = "avro_custom_types")]
+            Self::RunEndEncoded(inner) => inner.skip(buf),
         }
     }
 }
@@ -4297,4 +4428,223 @@ mod tests {
         assert_eq!(cursor.position(), 12, "expected to consume 12 fixed bytes");
         Ok(())
     }
+
+    #[cfg(feature = "avro_custom_types")]
+    #[test]
+    fn test_run_end_encoded_width16_int32_basic_grouping() {
+        use arrow_array::RunArray;
+        use std::sync::Arc;
+        let inner = avro_from_codec(Codec::Int32);
+        let ree = AvroDataType::new(
+            Codec::RunEndEncoded(Arc::new(inner), 16),
+            Default::default(),
+            None,
+        );
+        let mut dec = Decoder::try_new(&ree).expect("create REE decoder");
+        for v in [1, 1, 1, 2, 2, 3, 3, 3, 3] {
+            let bytes = encode_avro_int(v);
+            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
+        }
+        let arr = dec.flush(None).expect("flush");
+        let ra = arr
+            .as_any()
+            .downcast_ref::<RunArray<Int16Type>>()
+            .expect("RunArray<Int16Type>");
+        assert_eq!(ra.len(), 9);
+        assert_eq!(ra.run_ends().values(), &[3, 5, 9]);
+        let vals = ra
+            .values()
+            .as_ref()
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .expect("values Int32");
+        assert_eq!(vals.values(), &[1, 2, 3]);
+    }
+
+    #[cfg(feature = "avro_custom_types")]
+    #[test]
+    fn test_run_end_encoded_width32_nullable_values_group_nulls() {
+        use arrow_array::RunArray;
+        use std::sync::Arc;
+        let inner = AvroDataType::new(
+            Codec::Int32,
+            Default::default(),
+            Some(Nullability::NullSecond),
+        );
+        let ree = AvroDataType::new(
+            Codec::RunEndEncoded(Arc::new(inner), 32),
+            Default::default(),
+            None,
+        );
+        let mut dec = Decoder::try_new(&ree).expect("create REE decoder");
+        let seq: [Option<i32>; 8] = [
+            None,
+            None,
+            Some(7),
+            Some(7),
+            Some(7),
+            None,
+            Some(5),
+            Some(5),
+        ];
+        for item in seq {
+            let mut bytes = Vec::new();
+            match item {
+                None => bytes.extend_from_slice(&encode_vlq_u64(1)),
+                Some(v) => {
+                    bytes.extend_from_slice(&encode_vlq_u64(0));
+                    bytes.extend_from_slice(&encode_avro_int(v));
+                }
+            }
+            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
+        }
+        let arr = dec.flush(None).expect("flush");
+        let ra = arr
+            .as_any()
+            .downcast_ref::<RunArray<Int32Type>>()
+            .expect("RunArray<Int32Type>");
+        assert_eq!(ra.len(), 8);
+        assert_eq!(ra.run_ends().values(), &[2, 5, 6, 8]);
+        let vals = ra
+            .values()
+            .as_ref()
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .expect("values Int32 (nullable)");
+        assert_eq!(vals.len(), 4);
+        assert!(vals.is_null(0));
+        assert_eq!(vals.value(1), 7);
+        assert!(vals.is_null(2));
+        assert_eq!(vals.value(3), 5);
+    }
+
+    #[cfg(feature = "avro_custom_types")]
+    #[test]
+    fn test_run_end_encoded_decode_with_promotion_int_to_double_via_nullable_from_single() {
+        use arrow_array::RunArray;
+        let inner_values = Decoder::Float64(Vec::with_capacity(DEFAULT_CAPACITY));
+        let ree = Decoder::RunEndEncoded(
+            8, /* bytes => Int64 run-ends */
+            0,
+            Box::new(inner_values),
+        );
+        let mut dec = Decoder::Nullable(
+            Nullability::NullSecond,
+            NullBufferBuilder::new(DEFAULT_CAPACITY),
+            Box::new(ree),
+            NullablePlan::FromSingle {
+                promotion: Promotion::IntToDouble,
+            },
+        );
+        for v in [1, 1, 2, 2, 2] {
+            let bytes = encode_avro_int(v);
+            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
+        }
+        let arr = dec.flush(None).expect("flush");
+        let ra = arr
+            .as_any()
+            .downcast_ref::<RunArray<Int64Type>>()
+            .expect("RunArray<Int64Type>");
+        assert_eq!(ra.len(), 5);
+        assert_eq!(ra.run_ends().values(), &[2, 5]);
+        let vals = ra
+            .values()
+            .as_ref()
+            .as_any()
+            .downcast_ref::<Float64Array>()
+            .expect("values Float64");
+        assert_eq!(vals.values(), &[1.0, 2.0]);
+    }
+
+    #[cfg(feature = "avro_custom_types")]
+    #[test]
+    fn test_run_end_encoded_unsupported_run_end_width_errors() {
+        use std::sync::Arc;
+        let inner = avro_from_codec(Codec::Int32);
+        let dt = AvroDataType::new(
+            Codec::RunEndEncoded(Arc::new(inner), 3),
+            Default::default(),
+            None,
+        );
+        let err = Decoder::try_new(&dt).expect_err("must reject unsupported width");
+        let msg = err.to_string();
+        assert!(
+            msg.contains("Unsupported run-end width")
+                && msg.contains("16/32/64 bits or 2/4/8 bytes"),
+            "unexpected error message: {msg}"
+        );
+    }
+
+    #[cfg(feature = "avro_custom_types")]
+    #[test]
+    fn test_run_end_encoded_empty_input_is_empty_runarray() {
+        use arrow_array::RunArray;
+        use std::sync::Arc;
+        let inner = avro_from_codec(Codec::Utf8);
+        let dt = AvroDataType::new(
+            Codec::RunEndEncoded(Arc::new(inner), 4),
+            Default::default(),
+            None,
+        );
+        let mut dec = Decoder::try_new(&dt).expect("create REE decoder");
+        let arr = dec.flush(None).expect("flush");
+        let ra = arr
+            .as_any()
+            .downcast_ref::<RunArray<Int32Type>>()
+            .expect("RunArray<Int32Type>");
+        assert_eq!(ra.len(), 0);
+        assert_eq!(ra.run_ends().len(), 0);
+        assert_eq!(ra.values().len(), 0);
+    }
+
+    #[cfg(feature = "avro_custom_types")]
+    #[test]
+    fn test_run_end_encoded_strings_grouping_width32_bits() {
+        use arrow_array::RunArray;
+        use std::sync::Arc;
+        let inner = avro_from_codec(Codec::Utf8);
+        let dt = AvroDataType::new(
+            Codec::RunEndEncoded(Arc::new(inner), 32),
+            Default::default(),
+            None,
+        );
+        let mut dec = Decoder::try_new(&dt).expect("create REE decoder");
+        for s in ["a", "a", "bb", "bb", "bb", "a"] {
+            let bytes = encode_avro_bytes(s.as_bytes());
+            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
+        }
+        let arr = dec.flush(None).expect("flush");
+        let ra = arr
+            .as_any()
+            .downcast_ref::<RunArray<Int32Type>>()
+            .expect("RunArray<Int32Type>");
+        assert_eq!(ra.run_ends().values(), &[2, 5, 6]);
+        let vals = ra
+            .values()
+            .as_ref()
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .expect("values String");
+        assert_eq!(vals.len(), 3);
+        assert_eq!(vals.value(0), "a");
+        assert_eq!(vals.value(1), "bb");
+        assert_eq!(vals.value(2), "a");
+    }
+
+    #[cfg(not(feature = "avro_custom_types"))]
+    #[test]
+    fn test_no_custom_types_feature_smoke_decodes_plain_int32() {
+        let dt = avro_from_codec(Codec::Int32);
+        let mut dec = Decoder::try_new(&dt).expect("create Int32 decoder");
+        for v in [1, 2, 3] {
+            let bytes = encode_avro_int(v);
+            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
+        }
+        let arr = dec.flush(None).expect("flush");
+        let a = arr
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .expect("Int32Array");
+        assert_eq!(a.values(), &[1, 2, 3]);
+    }
 }
diff --git a/arrow-avro/src/schema.rs b/arrow-avro/src/schema.rs
index cce9a7d1f8..cff8ee1937 100644
--- a/arrow-avro/src/schema.rs
+++ b/arrow-avro/src/schema.rs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+//! Avro Schema representations for Arrow.
+
 #[cfg(feature = "canonical_extension_types")]
 use arrow_schema::extension::ExtensionType;
 use arrow_schema::{
@@ -1525,14 +1527,46 @@ fn datatype_to_avro(
                 )?
             }
         }
-        DataType::RunEndEncoded(_, values) => process_datatype(
-            values.data_type(),
-            values.name(),
-            values.metadata(),
-            name_gen,
-            null_order,
-            false,
-        )?,
+        #[cfg(feature = "avro_custom_types")]
+        DataType::RunEndEncoded(run_ends, values) => {
+            let bits = match run_ends.data_type() {
+                DataType::Int16 => 16,
+                DataType::Int32 => 32,
+                DataType::Int64 => 64,
+                other => {
+                    return Err(ArrowError::SchemaError(format!(
+                        "RunEndEncoded requires Int16/Int32/Int64 for run_ends, found: {other:?}"
+                    )));
+                }
+            };
+            // Build the value site schema, preserving its own nullability
+            let (value_schema, value_extras) = datatype_to_avro(
+                values.data_type(),
+                values.name(),
+                values.metadata(),
+                name_gen,
+                null_order,
+            )?;
+            let mut merged = merge_extras(value_schema, value_extras);
+            if values.is_nullable() {
+                merged = wrap_nullable(merged, null_order);
+            }
+            let mut extras = JsonMap::new();
+            extras.insert("logicalType".into(), json!("arrow.run-end-encoded"));
+            extras.insert("arrow.runEndIndexBits".into(), json!(bits));
+            return Ok((merged, extras));
+        }
+        #[cfg(not(feature = "avro_custom_types"))]
+        DataType::RunEndEncoded(_run_ends, values) => {
+            let (value_schema, _extras) = datatype_to_avro(
+                values.data_type(),
+                values.name(),
+                values.metadata(),
+                name_gen,
+                null_order,
+            )?;
+            return Ok((value_schema, JsonMap::new()));
+        }
         DataType::Union(fields, mode) => {
             let mut branches: Vec<Value> = Vec::with_capacity(fields.len());
             let mut type_ids: Vec<i32> = Vec::with_capacity(fields.len());
diff --git a/arrow-avro/src/writer/encoder.rs b/arrow-avro/src/writer/encoder.rs
index 0fa217e894..5ddb798846 100644
--- a/arrow-avro/src/writer/encoder.rs
+++ b/arrow-avro/src/writer/encoder.rs
@@ -20,20 +20,22 @@
 use crate::codec::{AvroDataType, AvroField, Codec};
 use crate::schema::{Fingerprint, Nullability, Prefix};
 use arrow_array::cast::AsArray;
+use arrow_array::types::RunEndIndexType;
 use arrow_array::types::{
     ArrowPrimitiveType, Date32Type, DurationMicrosecondType, DurationMillisecondType,
-    DurationNanosecondType, DurationSecondType, Float32Type, Float64Type, Int32Type, Int64Type,
-    IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType, Time32MillisecondType,
-    Time64MicrosecondType, TimestampMicrosecondType, TimestampMillisecondType,
+    DurationNanosecondType, DurationSecondType, Float32Type, Float64Type, Int16Type, Int32Type,
+    Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType,
+    Time32MillisecondType, Time64MicrosecondType, TimestampMicrosecondType,
+    TimestampMillisecondType,
 };
 use arrow_array::{
     Array, Decimal128Array, Decimal256Array, DictionaryArray, FixedSizeBinaryArray,
     GenericBinaryArray, GenericListArray, GenericStringArray, LargeListArray, ListArray, MapArray,
-    OffsetSizeTrait, PrimitiveArray, RecordBatch, StringArray, StructArray, UnionArray,
+    OffsetSizeTrait, PrimitiveArray, RecordBatch, RunArray, StringArray, StructArray, UnionArray,
 };
 #[cfg(feature = "small_decimals")]
 use arrow_array::{Decimal32Array, Decimal64Array};
-use arrow_buffer::NullBuffer;
+use arrow_buffer::{ArrowNativeType, NullBuffer};
 use arrow_schema::{
     ArrowError, DataType, Field, IntervalUnit, Schema as ArrowSchema, TimeUnit, UnionMode,
 };
@@ -423,7 +425,6 @@ impl<'a> FieldEncoder<'a> {
                         .ok_or_else(|| {
                             ArrowError::SchemaError("Expected DictionaryArray<Int32>".into())
                         })?;
-
                     let values = dict
                         .values()
                         .as_any()
@@ -464,6 +465,67 @@ impl<'a> FieldEncoder<'a> {
 
                 Encoder::Union(Box::new(UnionEncoder::try_new(arr, bindings)?))
             }
+            FieldPlan::RunEndEncoded {
+                values_nullability,
+                value_plan,
+            } => {
+                let dt = array.data_type();
+                let values_field = match dt {
+                    DataType::RunEndEncoded(_re_field, v_field) => v_field.as_ref(),
+                    other => {
+                        return Err(ArrowError::SchemaError(format!(
+                            "Avro RunEndEncoded site requires Arrow DataType::RunEndEncoded, found: {other:?}"
+                        )));
+                    }
+                };
+                // Helper closure to build a typed RunEncodedEncoder<R>
+                let build = |run_arr_any: &'a dyn Array| -> Result<Encoder<'a>, ArrowError> {
+                    if let Some(arr) = run_arr_any.as_any().downcast_ref::<RunArray<Int16Type>>() {
+                        let values_enc = prepare_value_site_encoder(
+                            arr.values().as_ref(),
+                            values_field,
+                            *values_nullability,
+                            value_plan.as_ref(),
+                        )?;
+                        return Ok(Encoder::RunEncoded16(Box::new(RunEncodedEncoder::<
+                            Int16Type,
+                        >::new(
+                            arr, values_enc
+                        ))));
+                    }
+                    if let Some(arr) = run_arr_any.as_any().downcast_ref::<RunArray<Int32Type>>() {
+                        let values_enc = prepare_value_site_encoder(
+                            arr.values().as_ref(),
+                            values_field,
+                            *values_nullability,
+                            value_plan.as_ref(),
+                        )?;
+                        return Ok(Encoder::RunEncoded32(Box::new(RunEncodedEncoder::<
+                            Int32Type,
+                        >::new(
+                            arr, values_enc
+                        ))));
+                    }
+                    if let Some(arr) = run_arr_any.as_any().downcast_ref::<RunArray<Int64Type>>() {
+                        let values_enc = prepare_value_site_encoder(
+                            arr.values().as_ref(),
+                            values_field,
+                            *values_nullability,
+                            value_plan.as_ref(),
+                        )?;
+                        return Ok(Encoder::RunEncoded64(Box::new(RunEncodedEncoder::<
+                            Int64Type,
+                        >::new(
+                            arr, values_enc
+                        ))));
+                    }
+                    Err(ArrowError::SchemaError(
+                        "Unsupported run-ends index type for RunEndEncoded; expected Int16/Int32/Int64"
+                            .into(),
+                    ))
+                };
+                build(array)?
+            }
         };
         // Compute the effective null state from writer-declared nullability and data nulls.
         let null_state = match (nullability, array.null_count() > 0) {
@@ -545,6 +607,12 @@ enum FieldPlan {
     Enum { symbols: Arc<[String]> },
     /// Avro union, maps to Arrow Union.
     Union { bindings: Vec<FieldBinding> },
+    /// Avro RunEndEncoded site. Values are encoded per logical row by mapping the
+    /// row index to its containing run and emitting that run's value with `value_plan`.
+    RunEndEncoded {
+        values_nullability: Option<Nullability>,
+        value_plan: Box<FieldPlan>,
+    },
 }
 
 #[derive(Debug, Clone)]
@@ -638,10 +706,17 @@ impl RecordEncoder {
                 ArrowError::SchemaError(format!("Column index {arrow_index} out of range"))
             })?;
             let field = fields[arrow_index].as_ref();
+            #[cfg(not(feature = "avro_custom_types"))]
+            let site_nullability = match &col_plan.plan {
+                FieldPlan::RunEndEncoded { .. } => None,
+                _ => col_plan.nullability,
+            };
+            #[cfg(feature = "avro_custom_types")]
+            let site_nullability = col_plan.nullability;
             let encoder = prepare_value_site_encoder(
                 array.as_ref(),
                 field,
-                col_plan.nullability,
+                site_nullability,
                 &col_plan.plan,
             )?;
             out.push(encoder);
@@ -694,6 +769,25 @@ fn find_map_value_field_index(fields: &arrow_schema::Fields) -> Option<usize> {
 
 impl FieldPlan {
     fn build(avro_dt: &AvroDataType, arrow_field: &Field) -> Result<Self, ArrowError> {
+        #[cfg(not(feature = "avro_custom_types"))]
+        if let DataType::RunEndEncoded(_re_field, values_field) = arrow_field.data_type() {
+            let values_nullability = avro_dt.nullability();
+            let value_site_dt: &AvroDataType = match avro_dt.codec() {
+                Codec::Union(branches, _, _) => branches
+                    .iter()
+                    .find(|b| !matches!(b.codec(), Codec::Null))
+                    .ok_or_else(|| {
+                        ArrowError::SchemaError(
+                            "Avro union at RunEndEncoded site has no non-null branch".into(),
+                        )
+                    })?,
+                _ => avro_dt,
+            };
+            return Ok(FieldPlan::RunEndEncoded {
+                values_nullability,
+                value_plan: Box::new(FieldPlan::build(value_site_dt, values_field.as_ref())?),
+            });
+        }
         if let DataType::FixedSizeBinary(len) = arrow_field.data_type() {
             // Extension-based detection (only when the feature is enabled)
             let ext_is_uuid = {
@@ -860,7 +954,6 @@ impl FieldPlan {
                         )));
                     }
                 };
-
                 if avro_branches.len() != arrow_union_fields.len() {
                     return Err(ArrowError::SchemaError(format!(
                         "Mismatched number of branches between Avro union ({}) and Arrow union ({}) for field '{}'",
@@ -869,7 +962,6 @@ impl FieldPlan {
                         arrow_field.name()
                     )));
                 }
-
                 let bindings = avro_branches
                     .iter()
                     .zip(arrow_union_fields.iter())
@@ -882,12 +974,26 @@ impl FieldPlan {
                         })
                     })
                     .collect::<Result<Vec<_>, ArrowError>>()?;
-
                 Ok(FieldPlan::Union { bindings })
             }
             Codec::Union(_, _, UnionMode::Sparse) => Err(ArrowError::NotYetImplemented(
                 "Sparse Arrow unions are not yet supported".to_string(),
             )),
+            #[cfg(feature = "avro_custom_types")]
+            Codec::RunEndEncoded(values_dt, _width_code) => {
+                let values_field = match arrow_field.data_type() {
+                    DataType::RunEndEncoded(_run_ends_field, values_field) => values_field.as_ref(),
+                    other => {
+                        return Err(ArrowError::SchemaError(format!(
+                            "Avro RunEndEncoded maps to Arrow DataType::RunEndEncoded, found: {other:?}"
+                        )));
+                    }
+                };
+                Ok(FieldPlan::RunEndEncoded {
+                    values_nullability: values_dt.nullability(),
+                    value_plan: Box::new(FieldPlan::build(values_dt.as_ref(), values_field)?),
+                })
+            }
             _ => Ok(FieldPlan::Scalar),
         }
     }
@@ -935,6 +1041,10 @@ enum Encoder<'a> {
     Enum(EnumEncoder<'a>),
     Map(Box<MapEncoder<'a>>),
     Union(Box<UnionEncoder<'a>>),
+    /// Run-end encoded values with specific run-end index widths
+    RunEncoded16(Box<RunEncodedEncoder16<'a>>),
+    RunEncoded32(Box<RunEncodedEncoder32<'a>>),
+    RunEncoded64(Box<RunEncodedEncoder64<'a>>),
     Null,
 }
 
@@ -977,6 +1087,9 @@ impl<'a> Encoder<'a> {
             Encoder::Map(e) => (e).encode(out, idx),
             Encoder::Enum(e) => (e).encode(out, idx),
             Encoder::Union(e) => (e).encode(out, idx),
+            Encoder::RunEncoded16(e) => (e).encode(out, idx),
+            Encoder::RunEncoded32(e) => (e).encode(out, idx),
+            Encoder::RunEncoded64(e) => (e).encode(out, idx),
             Encoder::Null => Ok(()),
         }
     }
@@ -1460,6 +1573,7 @@ impl IntervalToDurationParts for IntervalDayTimeType {
         })
     }
 }
+
 /// Single generic encoder used for all three interval units.
 /// Writes Avro `fixed(12)` as three little-endian u32 values in one call.
 struct DurationEncoder<'a, P: ArrowPrimitiveType + IntervalToDurationParts>(&'a PrimitiveArray<P>);
@@ -1554,6 +1668,72 @@ type Decimal64Encoder<'a> = DecimalEncoder<'a, 8, Decimal64Array>;
 type Decimal128Encoder<'a> = DecimalEncoder<'a, 16, Decimal128Array>;
 type Decimal256Encoder<'a> = DecimalEncoder<'a, 32, Decimal256Array>;
 
+/// Generic encoder for Arrow `RunArray<R>`-based sites (run-end encoded).
+/// Follows the pattern used by other generic encoders (i.e., `ListEncoder<O>`),
+/// avoiding runtime branching on run-end width.
+struct RunEncodedEncoder<'a, R: RunEndIndexType> {
+    ends_slice: &'a [<R as ArrowPrimitiveType>::Native],
+    base: usize,
+    len: usize,
+    values: FieldEncoder<'a>,
+    // Cached run index used for sequential scans of rows [0..n)
+    cur_run: usize,
+    // Cached end (logical index, 1-based per spec) for the current run.
+    cur_end: usize,
+}
+
+type RunEncodedEncoder16<'a> = RunEncodedEncoder<'a, Int16Type>;
+type RunEncodedEncoder32<'a> = RunEncodedEncoder<'a, Int32Type>;
+type RunEncodedEncoder64<'a> = RunEncodedEncoder<'a, Int64Type>;
+
+impl<'a, R: RunEndIndexType> RunEncodedEncoder<'a, R> {
+    fn new(arr: &'a RunArray<R>, values: FieldEncoder<'a>) -> Self {
+        let ends = arr.run_ends();
+        let base = ends.get_start_physical_index();
+        let slice = ends.values();
+        let len = ends.len();
+        let cur_end = if len == 0 { 0 } else { slice[base].as_usize() };
+        Self {
+            ends_slice: slice,
+            base,
+            len,
+            values,
+            cur_run: 0,
+            cur_end,
+        }
+    }
+
+    /// Advance `cur_run` so that `idx` is within the run ending at `cur_end`.
+    /// Uses the REE invariant: run ends are strictly increasing, positive, and 1-based.
+    #[inline(always)]
+    fn advance_to_row(&mut self, idx: usize) -> Result<(), ArrowError> {
+        if idx < self.cur_end {
+            return Ok(());
+        }
+        // Move forward across run boundaries until idx falls within cur_end
+        while self.cur_run + 1 < self.len && idx >= self.cur_end {
+            self.cur_run += 1;
+            self.cur_end = self.ends_slice[self.base + self.cur_run].as_usize();
+        }
+        if idx < self.cur_end {
+            Ok(())
+        } else {
+            Err(ArrowError::InvalidArgumentError(format!(
+                "row index {idx} out of bounds for run-ends ({} runs)",
+                self.len
+            )))
+        }
+    }
+
+    #[inline(always)]
+    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
+        self.advance_to_row(idx)?;
+        // For REE values, the value for any logical row within a run is at
+        // the physical index of that run.
+        self.values.encode(out, self.cur_run)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/arrow-avro/src/writer/mod.rs b/arrow-avro/src/writer/mod.rs
index 6d5ace2230..c1ec61c7fa 100644
--- a/arrow-avro/src/writer/mod.rs
+++ b/arrow-avro/src/writer/mod.rs
@@ -395,9 +395,13 @@ mod tests {
     use crate::reader::ReaderBuilder;
     use crate::schema::{AvroSchema, SchemaStore};
     use crate::test_util::arrow_test_data;
+    #[cfg(feature = "avro_custom_types")]
+    use arrow_array::types::{Int16Type, Int32Type, Int64Type};
     use arrow_array::{
         Array, ArrayRef, BinaryArray, Int32Array, RecordBatch, StructArray, UnionArray,
     };
+    #[cfg(feature = "avro_custom_types")]
+    use arrow_array::{Int16Array, Int64Array, RunArray, StringArray};
     #[cfg(not(feature = "avro_custom_types"))]
     use arrow_schema::{DataType, Field, Schema};
     #[cfg(feature = "avro_custom_types")]
@@ -1452,4 +1456,407 @@ mod tests {
         assert_eq!(round_trip, input);
         Ok(())
     }
+
+    #[cfg(feature = "avro_custom_types")]
+    #[test]
+    fn test_run_end_encoded_roundtrip_writer() -> Result<(), ArrowError> {
+        let run_ends = Int32Array::from(vec![3, 5, 7, 8]);
+        let run_values = Int32Array::from(vec![Some(1), Some(2), None, Some(3)]);
+        let ree = RunArray::<Int32Type>::try_new(&run_ends, &run_values)?;
+        let field = Field::new("x", ree.data_type().clone(), true);
+        let schema = Schema::new(vec![field]);
+        let batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![Arc::new(ree.clone()) as ArrayRef],
+        )?;
+        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
+        writer.write(&batch)?;
+        writer.finish()?;
+        let bytes = writer.into_inner();
+        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
+        let out_schema = reader.schema();
+        let batches = reader.collect::<Result<Vec<_>, _>>()?;
+        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
+        assert_eq!(out.num_columns(), 1);
+        assert_eq!(out.num_rows(), 8);
+        match out.schema().field(0).data_type() {
+            DataType::RunEndEncoded(run_ends_field, values_field) => {
+                assert_eq!(run_ends_field.name(), "run_ends");
+                assert_eq!(run_ends_field.data_type(), &DataType::Int32);
+                assert_eq!(values_field.name(), "values");
+                assert_eq!(values_field.data_type(), &DataType::Int32);
+                assert!(values_field.is_nullable());
+                let got_ree = out
+                    .column(0)
+                    .as_any()
+                    .downcast_ref::<RunArray<Int32Type>>()
+                    .expect("RunArray<Int32Type>");
+                assert_eq!(got_ree, &ree);
+            }
+            other => panic!(
+                "Unexpected DataType for round-tripped RunEndEncoded column: {:?}",
+                other
+            ),
+        }
+        Ok(())
+    }
+
+    #[cfg(feature = "avro_custom_types")]
+    #[test]
+    fn test_run_end_encoded_string_values_int16_run_ends_roundtrip_writer() -> Result<(), ArrowError>
+    {
+        let run_ends = Int16Array::from(vec![2, 5, 7]); // end indices
+        let run_values = StringArray::from(vec![Some("a"), None, Some("c")]);
+        let ree = RunArray::<Int16Type>::try_new(&run_ends, &run_values)?;
+        let field = Field::new("s", ree.data_type().clone(), true);
+        let schema = Schema::new(vec![field]);
+        let batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![Arc::new(ree.clone()) as ArrayRef],
+        )?;
+        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
+        writer.write(&batch)?;
+        writer.finish()?;
+        let bytes = writer.into_inner();
+        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
+        let out_schema = reader.schema();
+        let batches = reader.collect::<Result<Vec<_>, _>>()?;
+        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
+        assert_eq!(out.num_columns(), 1);
+        assert_eq!(out.num_rows(), 7);
+        match out.schema().field(0).data_type() {
+            DataType::RunEndEncoded(run_ends_field, values_field) => {
+                assert_eq!(run_ends_field.data_type(), &DataType::Int16);
+                assert_eq!(values_field.data_type(), &DataType::Utf8);
+                assert!(
+                    values_field.is_nullable(),
+                    "REE 'values' child should be nullable"
+                );
+                let got = out
+                    .column(0)
+                    .as_any()
+                    .downcast_ref::<RunArray<Int16Type>>()
+                    .expect("RunArray<Int16Type>");
+                assert_eq!(got, &ree);
+            }
+            other => panic!("Unexpected DataType: {:?}", other),
+        }
+        Ok(())
+    }
+
+    #[cfg(feature = "avro_custom_types")]
+    #[test]
+    fn test_run_end_encoded_int64_run_ends_numeric_values_roundtrip_writer()
+    -> Result<(), ArrowError> {
+        let run_ends = Int64Array::from(vec![4_i64, 8_i64]);
+        let run_values = Int32Array::from(vec![Some(999), Some(-5)]);
+        let ree = RunArray::<Int64Type>::try_new(&run_ends, &run_values)?;
+        let field = Field::new("y", ree.data_type().clone(), true);
+        let schema = Schema::new(vec![field]);
+        let batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![Arc::new(ree.clone()) as ArrayRef],
+        )?;
+        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
+        writer.write(&batch)?;
+        writer.finish()?;
+        let bytes = writer.into_inner();
+        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
+        let out_schema = reader.schema();
+        let batches = reader.collect::<Result<Vec<_>, _>>()?;
+        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
+        assert_eq!(out.num_columns(), 1);
+        assert_eq!(out.num_rows(), 8);
+        match out.schema().field(0).data_type() {
+            DataType::RunEndEncoded(run_ends_field, values_field) => {
+                assert_eq!(run_ends_field.data_type(), &DataType::Int64);
+                assert_eq!(values_field.data_type(), &DataType::Int32);
+                assert!(values_field.is_nullable());
+                let got = out
+                    .column(0)
+                    .as_any()
+                    .downcast_ref::<RunArray<Int64Type>>()
+                    .expect("RunArray<Int64Type>");
+                assert_eq!(got, &ree);
+            }
+            other => panic!("Unexpected DataType for REE column: {:?}", other),
+        }
+        Ok(())
+    }
+
+    #[cfg(feature = "avro_custom_types")]
+    #[test]
+    fn test_run_end_encoded_sliced_roundtrip_writer() -> Result<(), ArrowError> {
+        let run_ends = Int32Array::from(vec![3, 5, 7, 8]);
+        let run_values = Int32Array::from(vec![Some(1), Some(2), None, Some(3)]);
+        let base = RunArray::<Int32Type>::try_new(&run_ends, &run_values)?;
+        let offset = 1usize;
+        let length = 6usize;
+        let base_values = base
+            .values()
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .expect("REE values as Int32Array");
+        let mut logical_window: Vec<Option<i32>> = Vec::with_capacity(length);
+        for i in offset..offset + length {
+            let phys = base.get_physical_index(i);
+            let v = if base_values.is_null(phys) {
+                None
+            } else {
+                Some(base_values.value(phys))
+            };
+            logical_window.push(v);
+        }
+
+        fn compress_run_ends_i32(vals: &[Option<i32>]) -> (Int32Array, Int32Array) {
+            if vals.is_empty() {
+                return (Int32Array::new_null(0), Int32Array::new_null(0));
+            }
+            let mut run_ends_out: Vec<i32> = Vec::new();
+            let mut run_vals_out: Vec<Option<i32>> = Vec::new();
+            let mut cur = vals[0];
+            let mut len = 1i32;
+            for v in &vals[1..] {
+                if *v == cur {
+                    len += 1;
+                } else {
+                    let last_end = run_ends_out.last().copied().unwrap_or(0);
+                    run_ends_out.push(last_end + len);
+                    run_vals_out.push(cur);
+                    cur = *v;
+                    len = 1;
+                }
+            }
+            let last_end = run_ends_out.last().copied().unwrap_or(0);
+            run_ends_out.push(last_end + len);
+            run_vals_out.push(cur);
+            (
+                Int32Array::from(run_ends_out),
+                Int32Array::from(run_vals_out),
+            )
+        }
+        let (owned_run_ends, owned_run_values) = compress_run_ends_i32(&logical_window);
+        let owned_slice = RunArray::<Int32Type>::try_new(&owned_run_ends, &owned_run_values)?;
+        let field = Field::new("x", owned_slice.data_type().clone(), true);
+        let schema = Schema::new(vec![field]);
+        let batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![Arc::new(owned_slice.clone()) as ArrayRef],
+        )?;
+        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
+        writer.write(&batch)?;
+        writer.finish()?;
+        let bytes = writer.into_inner();
+        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
+        let out_schema = reader.schema();
+        let batches = reader.collect::<Result<Vec<_>, _>>()?;
+        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
+        assert_eq!(out.num_columns(), 1);
+        assert_eq!(out.num_rows(), length);
+        match out.schema().field(0).data_type() {
+            DataType::RunEndEncoded(run_ends_field, values_field) => {
+                assert_eq!(run_ends_field.data_type(), &DataType::Int32);
+                assert_eq!(values_field.data_type(), &DataType::Int32);
+                assert!(values_field.is_nullable());
+                let got = out
+                    .column(0)
+                    .as_any()
+                    .downcast_ref::<RunArray<Int32Type>>()
+                    .expect("RunArray<Int32Type>");
+                fn expand_ree_to_int32(a: &RunArray<Int32Type>) -> Int32Array {
+                    let vals = a
+                        .values()
+                        .as_any()
+                        .downcast_ref::<Int32Array>()
+                        .expect("REE values as Int32Array");
+                    let mut out: Vec<Option<i32>> = Vec::with_capacity(a.len());
+                    for i in 0..a.len() {
+                        let phys = a.get_physical_index(i);
+                        out.push(if vals.is_null(phys) {
+                            None
+                        } else {
+                            Some(vals.value(phys))
+                        });
+                    }
+                    Int32Array::from(out)
+                }
+                let got_logical = expand_ree_to_int32(got);
+                let expected_logical = Int32Array::from(logical_window);
+                assert_eq!(
+                    got_logical, expected_logical,
+                    "Logical values differ after REE slice round-trip"
+                );
+            }
+            other => panic!("Unexpected DataType for REE column: {:?}", other),
+        }
+        Ok(())
+    }
+
+    #[cfg(not(feature = "avro_custom_types"))]
+    #[test]
+    fn test_run_end_encoded_roundtrip_writer_feature_off() -> Result<(), ArrowError> {
+        use arrow_schema::{DataType, Field, Schema};
+        let run_ends = arrow_array::Int32Array::from(vec![3, 5, 7, 8]);
+        let run_values = arrow_array::Int32Array::from(vec![Some(1), Some(2), None, Some(3)]);
+        let ree = arrow_array::RunArray::<arrow_array::types::Int32Type>::try_new(
+            &run_ends,
+            &run_values,
+        )?;
+        let field = Field::new("x", ree.data_type().clone(), true);
+        let schema = Schema::new(vec![field]);
+        let batch =
+            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?;
+        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
+        writer.write(&batch)?;
+        writer.finish()?;
+        let bytes = writer.into_inner();
+        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
+        let out_schema = reader.schema();
+        let batches = reader.collect::<Result<Vec<_>, _>>()?;
+        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
+        assert_eq!(out.num_columns(), 1);
+        assert_eq!(out.num_rows(), 8);
+        assert_eq!(out.schema().field(0).data_type(), &DataType::Int32);
+        let got = out
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .expect("Int32Array");
+        let expected = Int32Array::from(vec![
+            Some(1),
+            Some(1),
+            Some(1),
+            Some(2),
+            Some(2),
+            None,
+            None,
+            Some(3),
+        ]);
+        assert_eq!(got, &expected);
+        Ok(())
+    }
+
+    #[cfg(not(feature = "avro_custom_types"))]
+    #[test]
+    fn test_run_end_encoded_string_values_int16_run_ends_roundtrip_writer_feature_off()
+    -> Result<(), ArrowError> {
+        use arrow_schema::{DataType, Field, Schema};
+        let run_ends = arrow_array::Int16Array::from(vec![2, 5, 7]);
+        let run_values = arrow_array::StringArray::from(vec![Some("a"), None, Some("c")]);
+        let ree = arrow_array::RunArray::<arrow_array::types::Int16Type>::try_new(
+            &run_ends,
+            &run_values,
+        )?;
+        let field = Field::new("s", ree.data_type().clone(), true);
+        let schema = Schema::new(vec![field]);
+        let batch =
+            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?;
+        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
+        writer.write(&batch)?;
+        writer.finish()?;
+        let bytes = writer.into_inner();
+        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
+        let out_schema = reader.schema();
+        let batches = reader.collect::<Result<Vec<_>, _>>()?;
+        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
+        assert_eq!(out.num_columns(), 1);
+        assert_eq!(out.num_rows(), 7);
+        assert_eq!(out.schema().field(0).data_type(), &DataType::Utf8);
+        let got = out
+            .column(0)
+            .as_any()
+            .downcast_ref::<arrow_array::StringArray>()
+            .expect("StringArray");
+        let expected = arrow_array::StringArray::from(vec![
+            Some("a"),
+            Some("a"),
+            None,
+            None,
+            None,
+            Some("c"),
+            Some("c"),
+        ]);
+        assert_eq!(got, &expected);
+        Ok(())
+    }
+
+    #[cfg(not(feature = "avro_custom_types"))]
+    #[test]
+    fn test_run_end_encoded_int64_run_ends_numeric_values_roundtrip_writer_feature_off()
+    -> Result<(), ArrowError> {
+        use arrow_schema::{DataType, Field, Schema};
+        let run_ends = arrow_array::Int64Array::from(vec![4_i64, 8_i64]);
+        let run_values = Int32Array::from(vec![Some(999), Some(-5)]);
+        let ree = arrow_array::RunArray::<arrow_array::types::Int64Type>::try_new(
+            &run_ends,
+            &run_values,
+        )?;
+        let field = Field::new("y", ree.data_type().clone(), true);
+        let schema = Schema::new(vec![field]);
+        let batch =
+            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?;
+        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
+        writer.write(&batch)?;
+        writer.finish()?;
+        let bytes = writer.into_inner();
+        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
+        let out_schema = reader.schema();
+        let batches = reader.collect::<Result<Vec<_>, _>>()?;
+        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
+        assert_eq!(out.num_columns(), 1);
+        assert_eq!(out.num_rows(), 8);
+        assert_eq!(out.schema().field(0).data_type(), &DataType::Int32);
+        let got = out
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .expect("Int32Array");
+        let expected = Int32Array::from(vec![
+            Some(999),
+            Some(999),
+            Some(999),
+            Some(999),
+            Some(-5),
+            Some(-5),
+            Some(-5),
+            Some(-5),
+        ]);
+        assert_eq!(got, &expected);
+        Ok(())
+    }
+
+    #[cfg(not(feature = "avro_custom_types"))]
+    #[test]
+    fn test_run_end_encoded_sliced_roundtrip_writer_feature_off() -> Result<(), ArrowError> {
+        use arrow_schema::{DataType, Field, Schema};
+        let run_ends = Int32Array::from(vec![2, 4, 6]);
+        let run_values = Int32Array::from(vec![Some(1), Some(2), None]);
+        let ree = arrow_array::RunArray::<arrow_array::types::Int32Type>::try_new(
+            &run_ends,
+            &run_values,
+        )?;
+        let field = Field::new("x", ree.data_type().clone(), true);
+        let schema = Schema::new(vec![field]);
+        let batch =
+            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?;
+        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
+        writer.write(&batch)?;
+        writer.finish()?;
+        let bytes = writer.into_inner();
+        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
+        let out_schema = reader.schema();
+        let batches = reader.collect::<Result<Vec<_>, _>>()?;
+        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
+        assert_eq!(out.num_columns(), 1);
+        assert_eq!(out.num_rows(), 6);
+        assert_eq!(out.schema().field(0).data_type(), &DataType::Int32);
+        let got = out
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .expect("Int32Array");
+        let expected = Int32Array::from(vec![Some(1), Some(1), Some(2), Some(2), None, None]);
+        assert_eq!(got, &expected);
+        Ok(())
+    }
 }
