This is an automated email from the ASF dual-hosted git repository.
mbrobbel pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 74c0386494 Add support for run-end encoded (REE) arrays in arrow-avro
(#8584)
74c0386494 is described below
commit 74c03864948c5699b2a0e47010d6282bc636f373
Author: Connor Sanders <[email protected]>
AuthorDate: Mon Oct 13 04:50:06 2025 -0500
Add support for run-end encoded (REE) arrays in arrow-avro (#8584)
# Which issue does this PR close?
- Part of #4886
# Rationale for this change
Arrow has a first‑class **Run‑End Encoded (REE)** data type that
efficiently represents consecutive repeated values by storing *run ends*
(indices) alongside the *values* array. Adding REE support to
`arrow-avro` lets users read/write Arrow REE arrays to Avro without
inflating them, preserving size and performance characteristics across
serialization boundaries.
# What changes are included in this PR?
- **New Avro codec for REE**: Introduces
`Codec::RunEndEncoded(Arc<AvroDataType>, u8)` and maps it to Arrow’s
`DataType::RunEndEncoded`. The run‑end index bit width must be one of
16/32/64, and the generated Arrow fields use the standard child names
`run_ends` (non‑nullable) and `values`.
- **Schema parsing & validation**: Recognizes the Avro logical type
`arrow.run-end-encoded` and *requires* the attribute
`arrow.runEndIndexBits` (one of 16, 32, 64). Missing or invalid values
yield clear parse errors.
- **Nullability propagation**: When REE appears inside a nullable union,
the union's nullability is propagated down into the inner `values` type so
Avro JSON generation models nullability correctly.
- **Union integration**: `From<&Codec> for UnionFieldKind` is updated so
REE defers to the inner value codec, ensuring unions of REE types
resolve as expected.
- **Feature wiring / dependency**: Enables REE handling behind the
existing `avro_custom_types` feature and adds an optional dependency on
`arrow-select`, used by the reader for `take` (the feature now includes `"dep:arrow-select"`).
- **Reader/Writer updates**: Updates the reader (decoder) and writer (encoder)
paths so REE arrays round‑trip end‑to‑end.
# Are these changes tested?
Yes. This commit adds **end‑to‑end tests** that round‑trip REE arrays
with run‑end index types **Int16**, **Int32**, and **Int64** through the
Avro reader/writer to validate schema, encoding, and decoding.
# Are there any user-facing changes?
N/A, since `arrow-avro` is not yet public.
---------
Co-authored-by: Matthijs Brobbel <[email protected]>
---
arrow-avro/Cargo.toml | 3 +-
arrow-avro/src/codec.rs | 59 ++++++
arrow-avro/src/reader/record.rs | 350 +++++++++++++++++++++++++++++++++
arrow-avro/src/schema.rs | 50 ++++-
arrow-avro/src/writer/encoder.rs | 200 ++++++++++++++++++-
arrow-avro/src/writer/mod.rs | 407 +++++++++++++++++++++++++++++++++++++++
6 files changed, 1050 insertions(+), 19 deletions(-)
diff --git a/arrow-avro/Cargo.toml b/arrow-avro/Cargo.toml
index 374cc896d5..dc59d337a0 100644
--- a/arrow-avro/Cargo.toml
+++ b/arrow-avro/Cargo.toml
@@ -43,12 +43,13 @@ canonical_extension_types =
["arrow-schema/canonical_extension_types"]
md5 = ["dep:md5"]
sha256 = ["dep:sha2"]
small_decimals = []
-avro_custom_types = []
+avro_custom_types = ["dep:arrow-select"]
[dependencies]
arrow-schema = { workspace = true }
arrow-buffer = { workspace = true }
arrow-array = { workspace = true }
+arrow-select = { workspace = true, optional = true }
serde_json = { version = "1.0", default-features = false, features = ["std"] }
serde = { version = "1.0.188", features = ["derive"] }
flate2 = { version = "1.0", default-features = false, features = [
diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs
index f946a5a1b9..a6a495bdf3 100644
--- a/arrow-avro/src/codec.rs
+++ b/arrow-avro/src/codec.rs
@@ -14,6 +14,9 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
+
+//! Codec for Mapping Avro and Arrow types.
+
use crate::schema::{
AVRO_ENUM_SYMBOLS_METADATA_KEY, AVRO_FIELD_DEFAULT_METADATA_KEY,
AVRO_NAME_METADATA_KEY,
AVRO_NAMESPACE_METADATA_KEY, Array, Attributes, ComplexType, Enum, Fixed,
Map, Nullability,
@@ -460,6 +463,8 @@ impl AvroDataType {
};
default_encoding.parse_default_literal(default_json)?
}
+ #[cfg(feature = "avro_custom_types")]
+ Codec::RunEndEncoded(values, _) =>
values.parse_default_literal(default_json)?,
};
Ok(lit)
}
@@ -685,6 +690,8 @@ pub(crate) enum Codec {
/// Represents Avro custom logical type to map to Arrow
Duration(TimeUnit::Second)
#[cfg(feature = "avro_custom_types")]
DurationSeconds,
+ #[cfg(feature = "avro_custom_types")]
+ RunEndEncoded(Arc<AvroDataType>, u8),
}
impl Codec {
@@ -765,6 +772,19 @@ impl Codec {
Self::DurationMillis => DataType::Duration(TimeUnit::Millisecond),
#[cfg(feature = "avro_custom_types")]
Self::DurationSeconds => DataType::Duration(TimeUnit::Second),
+ #[cfg(feature = "avro_custom_types")]
+ Self::RunEndEncoded(values, bits) => {
+ let run_ends_dt = match *bits {
+ 16 => DataType::Int16,
+ 32 => DataType::Int32,
+ 64 => DataType::Int64,
+ _ => unreachable!(),
+ };
+ DataType::RunEndEncoded(
+ Arc::new(Field::new("run_ends", run_ends_dt, false)),
+ Arc::new(Field::new("values", values.codec().data_type(),
true)),
+ )
+ }
}
}
@@ -936,6 +956,8 @@ impl From<&Codec> for UnionFieldKind {
Codec::Uuid => Self::Uuid,
Codec::Union(..) => Self::Union,
#[cfg(feature = "avro_custom_types")]
+ Codec::RunEndEncoded(values, _) =>
UnionFieldKind::from(values.codec()),
+ #[cfg(feature = "avro_custom_types")]
Codec::DurationNanos
| Codec::DurationMicros
| Codec::DurationMillis
@@ -1141,6 +1163,16 @@ impl<'a> Maker<'a> {
}
}
+ #[cfg(feature = "avro_custom_types")]
+ #[inline]
+ fn propagate_nullability_into_ree(dt: &mut AvroDataType, nb: Nullability) {
+ if let Codec::RunEndEncoded(values, bits) = dt.codec.clone() {
+ let mut inner = (*values).clone();
+ inner.nullability = Some(nb);
+ dt.codec = Codec::RunEndEncoded(Arc::new(inner), bits);
+ }
+ }
+
fn make_data_type<'s>(
&mut self,
writer_schema: &'s Schema<'a>,
@@ -1185,6 +1217,8 @@ impl<'a> Maker<'a> {
(true, Some(0)) => {
let mut field = self.parse_type(&f[1], namespace)?;
field.nullability = Some(Nullability::NullFirst);
+ #[cfg(feature = "avro_custom_types")]
+ Self::propagate_nullability_into_ree(&mut field,
Nullability::NullFirst);
return Ok(field);
}
(true, Some(1)) => {
@@ -1196,6 +1230,8 @@ impl<'a> Maker<'a> {
}
let mut field = self.parse_type(&f[0], namespace)?;
field.nullability = Some(Nullability::NullSecond);
+ #[cfg(feature = "avro_custom_types")]
+ Self::propagate_nullability_into_ree(&mut field,
Nullability::NullSecond);
return Ok(field);
}
_ => {}
@@ -1374,6 +1410,27 @@ impl<'a> Maker<'a> {
(Some("arrow.duration-seconds"), c @ Codec::Int64) => {
*c = Codec::DurationSeconds
}
+ #[cfg(feature = "avro_custom_types")]
+ (Some("arrow.run-end-encoded"), _) => {
+ let bits_u8: u8 = t
+ .attributes
+ .additional
+ .get("arrow.runEndIndexBits")
+ .and_then(|v| v.as_u64())
+ .and_then(|n| u8::try_from(n).ok())
+ .ok_or_else(|| ArrowError::ParseError(
+ "arrow.run-end-encoded requires
'arrow.runEndIndexBits' (one of 16, 32, or 64)"
+ .to_string(),
+ ))?;
+ if bits_u8 != 16 && bits_u8 != 32 && bits_u8 != 64 {
+ return Err(ArrowError::ParseError(format!(
+ "Invalid 'arrow.runEndIndexBits' value
{bits_u8}; must be 16, 32, or 64"
+ )));
+ }
+ // Wrap the parsed underlying site as REE
+ let values_site = field.clone();
+ field.codec =
Codec::RunEndEncoded(Arc::new(values_site), bits_u8);
+ }
(Some(logical), _) => {
// Insert unrecognized logical type into metadata map
field.metadata.insert("logicalType".into(),
logical.into());
@@ -1412,6 +1469,8 @@ impl<'a> Maker<'a> {
(Some((w_nb, w_nonnull)), Some((_r_nb, r_nonnull))) => {
let mut dt = self.make_data_type(w_nonnull,
Some(r_nonnull), namespace)?;
dt.nullability = Some(w_nb);
+ #[cfg(feature = "avro_custom_types")]
+ Self::propagate_nullability_into_ree(&mut dt, w_nb);
Ok(dt)
}
_ => self.resolve_unions(writer_variants, reader_variants,
namespace),
diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs
index 99e782b0fd..7eac382d9f 100644
--- a/arrow-avro/src/reader/record.rs
+++ b/arrow-avro/src/reader/record.rs
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+//! Avro Decoder for Arrow types.
+
use crate::codec::{
AvroDataType, AvroField, AvroLiteral, Codec, Promotion, ResolutionInfo,
ResolvedRecord,
ResolvedUnion,
@@ -33,6 +35,8 @@ use arrow_schema::{
};
#[cfg(feature = "small_decimals")]
use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
+#[cfg(feature = "avro_custom_types")]
+use arrow_select::take::{TakeOptions, take};
use std::cmp::Ordering;
use std::sync::Arc;
use strum_macros::AsRefStr;
@@ -234,6 +238,8 @@ enum Decoder {
Decimal64(usize, Option<usize>, Option<usize>, Decimal64Builder),
Decimal128(usize, Option<usize>, Option<usize>, Decimal128Builder),
Decimal256(usize, Option<usize>, Option<usize>, Decimal256Builder),
+ #[cfg(feature = "avro_custom_types")]
+ RunEndEncoded(u8, usize, Box<Decoder>),
Union(UnionDecoder),
Nullable(Nullability, NullBufferBuilder, Box<Decoder>, NullablePlan),
}
@@ -474,6 +480,23 @@ impl Decoder {
"Sparse Arrow unions are not yet supported".to_string(),
));
}
+ #[cfg(feature = "avro_custom_types")]
+ (Codec::RunEndEncoded(values_dt, width_bits_or_bytes), _) => {
+ let inner = Self::try_new(values_dt)?;
+ let byte_width: u8 = match *width_bits_or_bytes {
+ 2 | 4 | 8 => *width_bits_or_bytes,
+ 16 => 2,
+ 32 => 4,
+ 64 => 8,
+ other => {
+ return Err(ArrowError::InvalidArgumentError(format!(
+ "Unsupported run-end width {other} for
RunEndEncoded; \
+ expected 16/32/64 bits or 2/4/8 bytes"
+ )));
+ }
+ };
+ Self::RunEndEncoded(byte_width, 0, Box::new(inner))
+ }
};
Ok(match data_type.nullability() {
Some(nullability) => {
@@ -550,6 +573,11 @@ impl Decoder {
Self::Decimal256(_, _, _, builder) =>
builder.append_value(i256::ZERO),
Self::Enum(indices, _, _) => indices.push(0),
Self::Duration(builder) => builder.append_null(),
+ #[cfg(feature = "avro_custom_types")]
+ Self::RunEndEncoded(_, len, inner) => {
+ *len += 1;
+ inner.append_null()?;
+ }
Self::Union(u) => u.append_null()?,
Self::Nullable(_, null_buffer, inner, _) => {
null_buffer.append(false);
@@ -778,6 +806,11 @@ impl Decoder {
"Default for enum must be a symbol".to_string(),
)),
},
+ #[cfg(feature = "avro_custom_types")]
+ Self::RunEndEncoded(_, len, inner) => {
+ *len += 1;
+ inner.append_default(lit)
+ }
Self::Union(u) => u.append_default(lit),
Self::Record(field_meta, decoders, projector) => match lit {
AvroLiteral::Map(entries) => {
@@ -918,6 +951,11 @@ impl Decoder {
let nanos = (millis as i64) * 1_000_000;
builder.append_value(IntervalMonthDayNano::new(months as i32,
days as i32, nanos));
}
+ #[cfg(feature = "avro_custom_types")]
+ Self::RunEndEncoded(_, len, inner) => {
+ *len += 1;
+ inner.decode(buf)?;
+ }
Self::Union(u) => u.decode(buf)?,
Self::Nullable(order, nb, encoding, plan) => {
match *plan {
@@ -950,6 +988,12 @@ impl Decoder {
buf: &mut AvroCursor<'_>,
promotion: Promotion,
) -> Result<(), ArrowError> {
+ #[cfg(feature = "avro_custom_types")]
+ if let Self::RunEndEncoded(_, len, inner) = self {
+ *len += 1;
+ return inner.decode_with_promotion(buf, promotion);
+ }
+
macro_rules! promote_numeric_to {
($variant:ident, $getter:ident, $to:ty) => {{
match self {
@@ -1158,6 +1202,71 @@ impl Decoder {
.map_err(|e| ArrowError::ParseError(e.to_string()))?;
Arc::new(vals)
}
+ #[cfg(feature = "avro_custom_types")]
+ Self::RunEndEncoded(width, len, inner) => {
+ let values = inner.flush(nulls)?;
+ let n = *len;
+ let arr = values.as_ref();
+ let mut run_starts: Vec<usize> = Vec::with_capacity(n);
+ if n > 0 {
+ run_starts.push(0);
+ for i in 1..n {
+ if !values_equal_at(arr, i - 1, i) {
+ run_starts.push(i);
+ }
+ }
+ }
+ if n > (u32::MAX as usize) {
+ return Err(ArrowError::InvalidArgumentError(format!(
+ "RunEndEncoded length {n} exceeds maximum supported by
UInt32 indices for take",
+ )));
+ }
+ let run_count = run_starts.len();
+ let take_idx: PrimitiveArray<UInt32Type> =
+ run_starts.iter().map(|&s| s as u32).collect();
+ let per_run_values = if run_count == 0 {
+ values.slice(0, 0)
+ } else {
+ take(arr, &take_idx,
Option::from(TakeOptions::default())).map_err(|e| {
+ ArrowError::ParseError(format!("take() for REE values
failed: {e}"))
+ })?
+ };
+
+ macro_rules! build_run_array {
+ ($Native:ty, $ArrowTy:ty) => {{
+ let mut ends: Vec<$Native> =
Vec::with_capacity(run_count);
+ for (idx, &_start) in run_starts.iter().enumerate() {
+ let end = if idx + 1 < run_count {
+ run_starts[idx + 1]
+ } else {
+ n
+ };
+ ends.push(end as $Native);
+ }
+ let ends: PrimitiveArray<$ArrowTy> =
ends.into_iter().collect();
+ let run_arr = RunArray::<$ArrowTy>::try_new(&ends,
per_run_values.as_ref())
+ .map_err(|e|
ArrowError::ParseError(e.to_string()))?;
+ Arc::new(run_arr) as ArrayRef
+ }};
+ }
+ match *width {
+ 2 => {
+ if n > i16::MAX as usize {
+ return
Err(ArrowError::InvalidArgumentError(format!(
+ "RunEndEncoded length {n} exceeds i16::MAX for
run end width 2"
+ )));
+ }
+ build_run_array!(i16, Int16Type)
+ }
+ 4 => build_run_array!(i32, Int32Type),
+ 8 => build_run_array!(i64, Int64Type),
+ other => {
+ return Err(ArrowError::InvalidArgumentError(format!(
+ "Unsupported run-end width {other} for
RunEndEncoded"
+ )));
+ }
+ }
+ }
Self::Union(u) => u.flush(nulls)?,
})
}
@@ -1705,6 +1814,20 @@ fn sign_cast_to<const N: usize>(raw: &[u8]) ->
Result<[u8; N], ArrowError> {
Ok(out)
}
+#[cfg(feature = "avro_custom_types")]
+#[inline]
+fn values_equal_at(arr: &dyn Array, i: usize, j: usize) -> bool {
+ match (arr.is_null(i), arr.is_null(j)) {
+ (true, true) => true,
+ (true, false) | (false, true) => false,
+ (false, false) => {
+ let a = arr.slice(i, 1);
+ let b = arr.slice(j, 1);
+ a == b
+ }
+ }
+}
+
#[derive(Debug)]
struct Projector {
writer_to_reader: Arc<[Option<usize>]>,
@@ -1846,6 +1969,8 @@ enum Skipper {
Struct(Vec<Skipper>),
Union(Vec<Skipper>),
Nullable(Nullability, Box<Skipper>),
+ #[cfg(feature = "avro_custom_types")]
+ RunEndEncoded(Box<Skipper>),
}
impl Skipper {
@@ -1897,6 +2022,10 @@ impl Skipper {
.collect::<Result<_, _>>()?,
)
}
+ #[cfg(feature = "avro_custom_types")]
+ Codec::RunEndEncoded(inner, _w) => {
+ Self::RunEndEncoded(Box::new(Skipper::from_avro(inner)?))
+ }
};
if let Some(n) = dt.nullability() {
base = Self::Nullable(n, Box::new(base));
@@ -2001,6 +2130,8 @@ impl Skipper {
}
Ok(())
}
+ #[cfg(feature = "avro_custom_types")]
+ Self::RunEndEncoded(inner) => inner.skip(buf),
}
}
}
@@ -4297,4 +4428,223 @@ mod tests {
assert_eq!(cursor.position(), 12, "expected to consume 12 fixed
bytes");
Ok(())
}
+
+ #[cfg(feature = "avro_custom_types")]
+ #[test]
+ fn test_run_end_encoded_width16_int32_basic_grouping() {
+ use arrow_array::RunArray;
+ use std::sync::Arc;
+ let inner = avro_from_codec(Codec::Int32);
+ let ree = AvroDataType::new(
+ Codec::RunEndEncoded(Arc::new(inner), 16),
+ Default::default(),
+ None,
+ );
+ let mut dec = Decoder::try_new(&ree).expect("create REE decoder");
+ for v in [1, 1, 1, 2, 2, 3, 3, 3, 3] {
+ let bytes = encode_avro_int(v);
+ dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
+ }
+ let arr = dec.flush(None).expect("flush");
+ let ra = arr
+ .as_any()
+ .downcast_ref::<RunArray<Int16Type>>()
+ .expect("RunArray<Int16Type>");
+ assert_eq!(ra.len(), 9);
+ assert_eq!(ra.run_ends().values(), &[3, 5, 9]);
+ let vals = ra
+ .values()
+ .as_ref()
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .expect("values Int32");
+ assert_eq!(vals.values(), &[1, 2, 3]);
+ }
+
+ #[cfg(feature = "avro_custom_types")]
+ #[test]
+ fn test_run_end_encoded_width32_nullable_values_group_nulls() {
+ use arrow_array::RunArray;
+ use std::sync::Arc;
+ let inner = AvroDataType::new(
+ Codec::Int32,
+ Default::default(),
+ Some(Nullability::NullSecond),
+ );
+ let ree = AvroDataType::new(
+ Codec::RunEndEncoded(Arc::new(inner), 32),
+ Default::default(),
+ None,
+ );
+ let mut dec = Decoder::try_new(&ree).expect("create REE decoder");
+ let seq: [Option<i32>; 8] = [
+ None,
+ None,
+ Some(7),
+ Some(7),
+ Some(7),
+ None,
+ Some(5),
+ Some(5),
+ ];
+ for item in seq {
+ let mut bytes = Vec::new();
+ match item {
+ None => bytes.extend_from_slice(&encode_vlq_u64(1)),
+ Some(v) => {
+ bytes.extend_from_slice(&encode_vlq_u64(0));
+ bytes.extend_from_slice(&encode_avro_int(v));
+ }
+ }
+ dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
+ }
+ let arr = dec.flush(None).expect("flush");
+ let ra = arr
+ .as_any()
+ .downcast_ref::<RunArray<Int32Type>>()
+ .expect("RunArray<Int32Type>");
+ assert_eq!(ra.len(), 8);
+ assert_eq!(ra.run_ends().values(), &[2, 5, 6, 8]);
+ let vals = ra
+ .values()
+ .as_ref()
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .expect("values Int32 (nullable)");
+ assert_eq!(vals.len(), 4);
+ assert!(vals.is_null(0));
+ assert_eq!(vals.value(1), 7);
+ assert!(vals.is_null(2));
+ assert_eq!(vals.value(3), 5);
+ }
+
+ #[cfg(feature = "avro_custom_types")]
+ #[test]
+ fn
test_run_end_encoded_decode_with_promotion_int_to_double_via_nullable_from_single()
{
+ use arrow_array::RunArray;
+ let inner_values =
Decoder::Float64(Vec::with_capacity(DEFAULT_CAPACITY));
+ let ree = Decoder::RunEndEncoded(
+ 8, /* bytes => Int64 run-ends */
+ 0,
+ Box::new(inner_values),
+ );
+ let mut dec = Decoder::Nullable(
+ Nullability::NullSecond,
+ NullBufferBuilder::new(DEFAULT_CAPACITY),
+ Box::new(ree),
+ NullablePlan::FromSingle {
+ promotion: Promotion::IntToDouble,
+ },
+ );
+ for v in [1, 1, 2, 2, 2] {
+ let bytes = encode_avro_int(v);
+ dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
+ }
+ let arr = dec.flush(None).expect("flush");
+ let ra = arr
+ .as_any()
+ .downcast_ref::<RunArray<Int64Type>>()
+ .expect("RunArray<Int64Type>");
+ assert_eq!(ra.len(), 5);
+ assert_eq!(ra.run_ends().values(), &[2, 5]);
+ let vals = ra
+ .values()
+ .as_ref()
+ .as_any()
+ .downcast_ref::<Float64Array>()
+ .expect("values Float64");
+ assert_eq!(vals.values(), &[1.0, 2.0]);
+ }
+
+ #[cfg(feature = "avro_custom_types")]
+ #[test]
+ fn test_run_end_encoded_unsupported_run_end_width_errors() {
+ use std::sync::Arc;
+ let inner = avro_from_codec(Codec::Int32);
+ let dt = AvroDataType::new(
+ Codec::RunEndEncoded(Arc::new(inner), 3),
+ Default::default(),
+ None,
+ );
+ let err = Decoder::try_new(&dt).expect_err("must reject unsupported
width");
+ let msg = err.to_string();
+ assert!(
+ msg.contains("Unsupported run-end width")
+ && msg.contains("16/32/64 bits or 2/4/8 bytes"),
+ "unexpected error message: {msg}"
+ );
+ }
+
+ #[cfg(feature = "avro_custom_types")]
+ #[test]
+ fn test_run_end_encoded_empty_input_is_empty_runarray() {
+ use arrow_array::RunArray;
+ use std::sync::Arc;
+ let inner = avro_from_codec(Codec::Utf8);
+ let dt = AvroDataType::new(
+ Codec::RunEndEncoded(Arc::new(inner), 4),
+ Default::default(),
+ None,
+ );
+ let mut dec = Decoder::try_new(&dt).expect("create REE decoder");
+ let arr = dec.flush(None).expect("flush");
+ let ra = arr
+ .as_any()
+ .downcast_ref::<RunArray<Int32Type>>()
+ .expect("RunArray<Int32Type>");
+ assert_eq!(ra.len(), 0);
+ assert_eq!(ra.run_ends().len(), 0);
+ assert_eq!(ra.values().len(), 0);
+ }
+
+ #[cfg(feature = "avro_custom_types")]
+ #[test]
+ fn test_run_end_encoded_strings_grouping_width32_bits() {
+ use arrow_array::RunArray;
+ use std::sync::Arc;
+ let inner = avro_from_codec(Codec::Utf8);
+ let dt = AvroDataType::new(
+ Codec::RunEndEncoded(Arc::new(inner), 32),
+ Default::default(),
+ None,
+ );
+ let mut dec = Decoder::try_new(&dt).expect("create REE decoder");
+ for s in ["a", "a", "bb", "bb", "bb", "a"] {
+ let bytes = encode_avro_bytes(s.as_bytes());
+ dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
+ }
+ let arr = dec.flush(None).expect("flush");
+ let ra = arr
+ .as_any()
+ .downcast_ref::<RunArray<Int32Type>>()
+ .expect("RunArray<Int32Type>");
+ assert_eq!(ra.run_ends().values(), &[2, 5, 6]);
+ let vals = ra
+ .values()
+ .as_ref()
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .expect("values String");
+ assert_eq!(vals.len(), 3);
+ assert_eq!(vals.value(0), "a");
+ assert_eq!(vals.value(1), "bb");
+ assert_eq!(vals.value(2), "a");
+ }
+
+ #[cfg(not(feature = "avro_custom_types"))]
+ #[test]
+ fn test_no_custom_types_feature_smoke_decodes_plain_int32() {
+ let dt = avro_from_codec(Codec::Int32);
+ let mut dec = Decoder::try_new(&dt).expect("create Int32 decoder");
+ for v in [1, 2, 3] {
+ let bytes = encode_avro_int(v);
+ dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
+ }
+ let arr = dec.flush(None).expect("flush");
+ let a = arr
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .expect("Int32Array");
+ assert_eq!(a.values(), &[1, 2, 3]);
+ }
}
diff --git a/arrow-avro/src/schema.rs b/arrow-avro/src/schema.rs
index cce9a7d1f8..cff8ee1937 100644
--- a/arrow-avro/src/schema.rs
+++ b/arrow-avro/src/schema.rs
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+//! Avro Schema representations for Arrow.
+
#[cfg(feature = "canonical_extension_types")]
use arrow_schema::extension::ExtensionType;
use arrow_schema::{
@@ -1525,14 +1527,46 @@ fn datatype_to_avro(
)?
}
}
- DataType::RunEndEncoded(_, values) => process_datatype(
- values.data_type(),
- values.name(),
- values.metadata(),
- name_gen,
- null_order,
- false,
- )?,
+ #[cfg(feature = "avro_custom_types")]
+ DataType::RunEndEncoded(run_ends, values) => {
+ let bits = match run_ends.data_type() {
+ DataType::Int16 => 16,
+ DataType::Int32 => 32,
+ DataType::Int64 => 64,
+ other => {
+ return Err(ArrowError::SchemaError(format!(
+ "RunEndEncoded requires Int16/Int32/Int64 for
run_ends, found: {other:?}"
+ )));
+ }
+ };
+ // Build the value site schema, preserving its own nullability
+ let (value_schema, value_extras) = datatype_to_avro(
+ values.data_type(),
+ values.name(),
+ values.metadata(),
+ name_gen,
+ null_order,
+ )?;
+ let mut merged = merge_extras(value_schema, value_extras);
+ if values.is_nullable() {
+ merged = wrap_nullable(merged, null_order);
+ }
+ let mut extras = JsonMap::new();
+ extras.insert("logicalType".into(),
json!("arrow.run-end-encoded"));
+ extras.insert("arrow.runEndIndexBits".into(), json!(bits));
+ return Ok((merged, extras));
+ }
+ #[cfg(not(feature = "avro_custom_types"))]
+ DataType::RunEndEncoded(_run_ends, values) => {
+ let (value_schema, _extras) = datatype_to_avro(
+ values.data_type(),
+ values.name(),
+ values.metadata(),
+ name_gen,
+ null_order,
+ )?;
+ return Ok((value_schema, JsonMap::new()));
+ }
DataType::Union(fields, mode) => {
let mut branches: Vec<Value> = Vec::with_capacity(fields.len());
let mut type_ids: Vec<i32> = Vec::with_capacity(fields.len());
diff --git a/arrow-avro/src/writer/encoder.rs b/arrow-avro/src/writer/encoder.rs
index 0fa217e894..5ddb798846 100644
--- a/arrow-avro/src/writer/encoder.rs
+++ b/arrow-avro/src/writer/encoder.rs
@@ -20,20 +20,22 @@
use crate::codec::{AvroDataType, AvroField, Codec};
use crate::schema::{Fingerprint, Nullability, Prefix};
use arrow_array::cast::AsArray;
+use arrow_array::types::RunEndIndexType;
use arrow_array::types::{
ArrowPrimitiveType, Date32Type, DurationMicrosecondType,
DurationMillisecondType,
- DurationNanosecondType, DurationSecondType, Float32Type, Float64Type,
Int32Type, Int64Type,
- IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType,
Time32MillisecondType,
- Time64MicrosecondType, TimestampMicrosecondType, TimestampMillisecondType,
+ DurationNanosecondType, DurationSecondType, Float32Type, Float64Type,
Int16Type, Int32Type,
+ Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType,
IntervalYearMonthType,
+ Time32MillisecondType, Time64MicrosecondType, TimestampMicrosecondType,
+ TimestampMillisecondType,
};
use arrow_array::{
Array, Decimal128Array, Decimal256Array, DictionaryArray,
FixedSizeBinaryArray,
GenericBinaryArray, GenericListArray, GenericStringArray, LargeListArray,
ListArray, MapArray,
- OffsetSizeTrait, PrimitiveArray, RecordBatch, StringArray, StructArray,
UnionArray,
+ OffsetSizeTrait, PrimitiveArray, RecordBatch, RunArray, StringArray,
StructArray, UnionArray,
};
#[cfg(feature = "small_decimals")]
use arrow_array::{Decimal32Array, Decimal64Array};
-use arrow_buffer::NullBuffer;
+use arrow_buffer::{ArrowNativeType, NullBuffer};
use arrow_schema::{
ArrowError, DataType, Field, IntervalUnit, Schema as ArrowSchema,
TimeUnit, UnionMode,
};
@@ -423,7 +425,6 @@ impl<'a> FieldEncoder<'a> {
.ok_or_else(|| {
ArrowError::SchemaError("Expected
DictionaryArray<Int32>".into())
})?;
-
let values = dict
.values()
.as_any()
@@ -464,6 +465,67 @@ impl<'a> FieldEncoder<'a> {
Encoder::Union(Box::new(UnionEncoder::try_new(arr, bindings)?))
}
+ FieldPlan::RunEndEncoded {
+ values_nullability,
+ value_plan,
+ } => {
+ let dt = array.data_type();
+ let values_field = match dt {
+ DataType::RunEndEncoded(_re_field, v_field) =>
v_field.as_ref(),
+ other => {
+ return Err(ArrowError::SchemaError(format!(
+ "Avro RunEndEncoded site requires Arrow
DataType::RunEndEncoded, found: {other:?}"
+ )));
+ }
+ };
+ // Helper closure to build a typed RunEncodedEncoder<R>
+ let build = |run_arr_any: &'a dyn Array| ->
Result<Encoder<'a>, ArrowError> {
+ if let Some(arr) =
run_arr_any.as_any().downcast_ref::<RunArray<Int16Type>>() {
+ let values_enc = prepare_value_site_encoder(
+ arr.values().as_ref(),
+ values_field,
+ *values_nullability,
+ value_plan.as_ref(),
+ )?;
+ return
Ok(Encoder::RunEncoded16(Box::new(RunEncodedEncoder::<
+ Int16Type,
+ >::new(
+ arr, values_enc
+ ))));
+ }
+ if let Some(arr) =
run_arr_any.as_any().downcast_ref::<RunArray<Int32Type>>() {
+ let values_enc = prepare_value_site_encoder(
+ arr.values().as_ref(),
+ values_field,
+ *values_nullability,
+ value_plan.as_ref(),
+ )?;
+ return
Ok(Encoder::RunEncoded32(Box::new(RunEncodedEncoder::<
+ Int32Type,
+ >::new(
+ arr, values_enc
+ ))));
+ }
+ if let Some(arr) =
run_arr_any.as_any().downcast_ref::<RunArray<Int64Type>>() {
+ let values_enc = prepare_value_site_encoder(
+ arr.values().as_ref(),
+ values_field,
+ *values_nullability,
+ value_plan.as_ref(),
+ )?;
+ return
Ok(Encoder::RunEncoded64(Box::new(RunEncodedEncoder::<
+ Int64Type,
+ >::new(
+ arr, values_enc
+ ))));
+ }
+ Err(ArrowError::SchemaError(
+ "Unsupported run-ends index type for RunEndEncoded;
expected Int16/Int32/Int64"
+ .into(),
+ ))
+ };
+ build(array)?
+ }
};
// Compute the effective null state from writer-declared nullability
and data nulls.
let null_state = match (nullability, array.null_count() > 0) {
@@ -545,6 +607,12 @@ enum FieldPlan {
Enum { symbols: Arc<[String]> },
/// Avro union, maps to Arrow Union.
Union { bindings: Vec<FieldBinding> },
+ /// Avro RunEndEncoded site. Values are encoded per logical row by mapping
the
+ /// row index to its containing run and emitting that run's value with
`value_plan`.
+ RunEndEncoded {
+ values_nullability: Option<Nullability>,
+ value_plan: Box<FieldPlan>,
+ },
}
#[derive(Debug, Clone)]
@@ -638,10 +706,17 @@ impl RecordEncoder {
ArrowError::SchemaError(format!("Column index {arrow_index}
out of range"))
})?;
let field = fields[arrow_index].as_ref();
+ #[cfg(not(feature = "avro_custom_types"))]
+ let site_nullability = match &col_plan.plan {
+ FieldPlan::RunEndEncoded { .. } => None,
+ _ => col_plan.nullability,
+ };
+ #[cfg(feature = "avro_custom_types")]
+ let site_nullability = col_plan.nullability;
let encoder = prepare_value_site_encoder(
array.as_ref(),
field,
- col_plan.nullability,
+ site_nullability,
&col_plan.plan,
)?;
out.push(encoder);
@@ -694,6 +769,25 @@ fn find_map_value_field_index(fields:
&arrow_schema::Fields) -> Option<usize> {
impl FieldPlan {
fn build(avro_dt: &AvroDataType, arrow_field: &Field) -> Result<Self,
ArrowError> {
+ #[cfg(not(feature = "avro_custom_types"))]
+ if let DataType::RunEndEncoded(_re_field, values_field) =
arrow_field.data_type() {
+ let values_nullability = avro_dt.nullability();
+ let value_site_dt: &AvroDataType = match avro_dt.codec() {
+ Codec::Union(branches, _, _) => branches
+ .iter()
+ .find(|b| !matches!(b.codec(), Codec::Null))
+ .ok_or_else(|| {
+ ArrowError::SchemaError(
+ "Avro union at RunEndEncoded site has no non-null
branch".into(),
+ )
+ })?,
+ _ => avro_dt,
+ };
+ return Ok(FieldPlan::RunEndEncoded {
+ values_nullability,
+ value_plan: Box::new(FieldPlan::build(value_site_dt,
values_field.as_ref())?),
+ });
+ }
if let DataType::FixedSizeBinary(len) = arrow_field.data_type() {
// Extension-based detection (only when the feature is enabled)
let ext_is_uuid = {
@@ -860,7 +954,6 @@ impl FieldPlan {
)));
}
};
-
if avro_branches.len() != arrow_union_fields.len() {
return Err(ArrowError::SchemaError(format!(
"Mismatched number of branches between Avro union ({})
and Arrow union ({}) for field '{}'",
@@ -869,7 +962,6 @@ impl FieldPlan {
arrow_field.name()
)));
}
-
let bindings = avro_branches
.iter()
.zip(arrow_union_fields.iter())
@@ -882,12 +974,26 @@ impl FieldPlan {
})
})
.collect::<Result<Vec<_>, ArrowError>>()?;
-
Ok(FieldPlan::Union { bindings })
}
Codec::Union(_, _, UnionMode::Sparse) =>
Err(ArrowError::NotYetImplemented(
"Sparse Arrow unions are not yet supported".to_string(),
)),
+ #[cfg(feature = "avro_custom_types")]
+ Codec::RunEndEncoded(values_dt, _width_code) => {
+ let values_field = match arrow_field.data_type() {
+ DataType::RunEndEncoded(_run_ends_field, values_field) =>
values_field.as_ref(),
+ other => {
+ return Err(ArrowError::SchemaError(format!(
+ "Avro RunEndEncoded maps to Arrow
DataType::RunEndEncoded, found: {other:?}"
+ )));
+ }
+ };
+ Ok(FieldPlan::RunEndEncoded {
+ values_nullability: values_dt.nullability(),
+ value_plan: Box::new(FieldPlan::build(values_dt.as_ref(),
values_field)?),
+ })
+ }
_ => Ok(FieldPlan::Scalar),
}
}
@@ -935,6 +1041,10 @@ enum Encoder<'a> {
Enum(EnumEncoder<'a>),
Map(Box<MapEncoder<'a>>),
Union(Box<UnionEncoder<'a>>),
+ /// Run-end encoded values with specific run-end index widths
+ RunEncoded16(Box<RunEncodedEncoder16<'a>>),
+ RunEncoded32(Box<RunEncodedEncoder32<'a>>),
+ RunEncoded64(Box<RunEncodedEncoder64<'a>>),
Null,
}
@@ -977,6 +1087,9 @@ impl<'a> Encoder<'a> {
Encoder::Map(e) => (e).encode(out, idx),
Encoder::Enum(e) => (e).encode(out, idx),
Encoder::Union(e) => (e).encode(out, idx),
+ Encoder::RunEncoded16(e) => (e).encode(out, idx),
+ Encoder::RunEncoded32(e) => (e).encode(out, idx),
+ Encoder::RunEncoded64(e) => (e).encode(out, idx),
Encoder::Null => Ok(()),
}
}
@@ -1460,6 +1573,7 @@ impl IntervalToDurationParts for IntervalDayTimeType {
})
}
}
+
/// Single generic encoder used for all three interval units.
/// Writes Avro `fixed(12)` as three little-endian u32 values in one call.
struct DurationEncoder<'a, P: ArrowPrimitiveType +
IntervalToDurationParts>(&'a PrimitiveArray<P>);
@@ -1554,6 +1668,72 @@ type Decimal64Encoder<'a> = DecimalEncoder<'a, 8,
Decimal64Array>;
type Decimal128Encoder<'a> = DecimalEncoder<'a, 16, Decimal128Array>;
type Decimal256Encoder<'a> = DecimalEncoder<'a, 32, Decimal256Array>;
+/// Generic encoder for Arrow `RunArray<R>`-based sites (run-end encoded).
+/// Follows the pattern used by other generic encoders (e.g. `ListEncoder<O>`),
+/// avoiding runtime branching on run-end width.
+///
+/// Slicing: Arrow stores run ends as *absolute* logical positions of the
+/// unsliced array, and `RunArray::values()` is never sliced; slicing only
+/// changes the run-end buffer's `offset()`/`len()`. Row `idx` of this array is
+/// therefore absolute position `offset + idx`, and its value sits at physical
+/// index `base + cur_run` of the values child.
+///
+/// NOTE(review): `values` is assumed to be built over the full, unsliced
+/// `RunArray::values()` child; confirm at the construction site.
+struct RunEncodedEncoder<'a, R: RunEndIndexType> {
+    /// Run ends of the physical runs overlapping rows `[offset, offset + len)`.
+    ends_slice: &'a [<R as ArrowPrimitiveType>::Native],
+    /// Physical index (into the values child) of `ends_slice[0]`.
+    base: usize,
+    /// Logical offset of this (possibly sliced) array into its run ends.
+    offset: usize,
+    /// Logical length (row count) of this array, not the number of runs.
+    len: usize,
+    values: FieldEncoder<'a>,
+    /// Cached index into `ends_slice` of the run holding the last encoded row.
+    cur_run: usize,
+    /// Cached absolute, exclusive end of run `cur_run` (0 for an empty array).
+    cur_end: usize,
+}
+
+type RunEncodedEncoder16<'a> = RunEncodedEncoder<'a, Int16Type>;
+type RunEncodedEncoder32<'a> = RunEncodedEncoder<'a, Int32Type>;
+type RunEncodedEncoder64<'a> = RunEncodedEncoder<'a, Int64Type>;
+
+impl<'a, R: RunEndIndexType> RunEncodedEncoder<'a, R> {
+    fn new(arr: &'a RunArray<R>, values: FieldEncoder<'a>) -> Self {
+        let ends = arr.run_ends();
+        let offset = ends.offset();
+        let len = ends.len(); // logical row count, not the number of runs
+        let all_ends = ends.values();
+        // Keep only the runs that overlap this array's logical window.
+        let (base, ends_slice) = if len == 0 {
+            (0, &all_ends[..0])
+        } else {
+            let first = ends.get_start_physical_index();
+            let last = ends.get_end_physical_index();
+            (first, &all_ends[first..=last])
+        };
+        let cur_end = ends_slice.first().map_or(0, |end| end.as_usize());
+        Self {
+            ends_slice,
+            base,
+            offset,
+            len,
+            values,
+            cur_run: 0,
+            cur_end,
+        }
+    }
+
+    /// Point `cur_run`/`cur_end` at the run containing row `idx` of this array.
+    ///
+    /// Relies on the REE invariant that run ends are strictly increasing.
+    /// Rows in the cached run or in the very next run (the sequential case)
+    /// resolve in O(1). Any other jump, including a backwards one, uses a binary
+    /// search, so non-sequential access still selects the correct run.
+    ///
+    /// # Errors
+    /// `InvalidArgumentError` if `idx >= len` or the run ends do not cover `idx`.
+    #[inline]
+    fn advance_to_row(&mut self, idx: usize) -> Result<(), ArrowError> {
+        if idx >= self.len {
+            return Err(self.out_of_bounds(idx));
+        }
+        // Run ends are absolute positions in the unsliced array.
+        let target = self.offset + idx;
+        let run_start = match self.cur_run.checked_sub(1) {
+            Some(prev) => self.ends_slice[prev].as_usize(),
+            None => 0,
+        };
+        if (run_start..self.cur_end).contains(&target) {
+            return Ok(());
+        }
+        let next = self.cur_run + 1;
+        let run = match self.ends_slice.get(next) {
+            // Sequential fast path: the row lies in the run right after the cached one.
+            Some(end) if target >= self.cur_end && target < end.as_usize() => next,
+            // Arbitrary jump: first run whose exclusive end lies past `target`.
+            _ => self
+                .ends_slice
+                .partition_point(|end| end.as_usize() <= target),
+        };
+        let Some(end) = self.ends_slice.get(run).map(|end| end.as_usize()) else {
+            return Err(self.out_of_bounds(idx));
+        };
+        self.cur_run = run;
+        self.cur_end = end;
+        Ok(())
+    }
+
+    /// Error for a row outside this array (or not covered by its run ends).
+    #[cold]
+    fn out_of_bounds(&self, idx: usize) -> ArrowError {
+        ArrowError::InvalidArgumentError(format!(
+            "row index {idx} out of bounds for run-ends ({} runs, {} rows)",
+            self.ends_slice.len(),
+            self.len
+        ))
+    }
+
+    #[inline(always)]
+    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
+        self.advance_to_row(idx)?;
+        // Every row of a run shares that run's single value. `cur_run` is relative
+        // to `ends_slice`, so add `base` to index the (unsliced) values child.
+        self.values.encode(out, self.base + self.cur_run)
+    }
+}
+
#[cfg(test)]
mod tests {
use super::*;
diff --git a/arrow-avro/src/writer/mod.rs b/arrow-avro/src/writer/mod.rs
index 6d5ace2230..c1ec61c7fa 100644
--- a/arrow-avro/src/writer/mod.rs
+++ b/arrow-avro/src/writer/mod.rs
@@ -395,9 +395,13 @@ mod tests {
use crate::reader::ReaderBuilder;
use crate::schema::{AvroSchema, SchemaStore};
use crate::test_util::arrow_test_data;
+ #[cfg(feature = "avro_custom_types")]
+ use arrow_array::types::{Int16Type, Int32Type, Int64Type};
use arrow_array::{
Array, ArrayRef, BinaryArray, Int32Array, RecordBatch, StructArray,
UnionArray,
};
+ #[cfg(feature = "avro_custom_types")]
+ use arrow_array::{Int16Array, Int64Array, RunArray, StringArray};
#[cfg(not(feature = "avro_custom_types"))]
use arrow_schema::{DataType, Field, Schema};
#[cfg(feature = "avro_custom_types")]
@@ -1452,4 +1456,407 @@ mod tests {
assert_eq!(round_trip, input);
Ok(())
}
+
+    #[cfg(feature = "avro_custom_types")]
+    #[test]
+    fn test_run_end_encoded_roundtrip_writer() -> Result<(), ArrowError> {
+        // Four Int32-indexed runs over eight rows: [1, 1, 1, 2, 2, null, null, 3].
+        let ree = RunArray::<Int32Type>::try_new(
+            &Int32Array::from(vec![3, 5, 7, 8]),
+            &Int32Array::from(vec![Some(1), Some(2), None, Some(3)]),
+        )?;
+        let schema = Schema::new(vec![Field::new("x", ree.data_type().clone(), true)]);
+        let batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![Arc::new(ree.clone()) as ArrayRef],
+        )?;
+        let avro_bytes = {
+            let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
+            writer.write(&batch)?;
+            writer.finish()?;
+            writer.into_inner()
+        };
+        let reader = ReaderBuilder::new().build(Cursor::new(avro_bytes))?;
+        let read_schema = reader.schema();
+        let decoded = reader.collect::<Result<Vec<_>, _>>()?;
+        let out = arrow::compute::concat_batches(&read_schema, &decoded).expect("concat output");
+        assert_eq!(out.num_columns(), 1);
+        assert_eq!(out.num_rows(), 8);
+        let out_schema = out.schema();
+        let column_type = out_schema.field(0).data_type();
+        let DataType::RunEndEncoded(run_ends_field, values_field) = column_type else {
+            panic!(
+                "Unexpected DataType for round-tripped RunEndEncoded column: {:?}",
+                column_type
+            );
+        };
+        assert_eq!(run_ends_field.name(), "run_ends");
+        assert_eq!(run_ends_field.data_type(), &DataType::Int32);
+        assert_eq!(values_field.name(), "values");
+        assert_eq!(values_field.data_type(), &DataType::Int32);
+        assert!(values_field.is_nullable());
+        let got_ree = out
+            .column(0)
+            .as_any()
+            .downcast_ref::<RunArray<Int32Type>>()
+            .expect("RunArray<Int32Type>");
+        assert_eq!(got_ree, &ree);
+        Ok(())
+    }
+
+    #[cfg(feature = "avro_custom_types")]
+    #[test]
+    fn test_run_end_encoded_string_values_int16_run_ends_roundtrip_writer() -> Result<(), ArrowError>
+    {
+        // Three Int16-indexed runs over seven rows: ["a", "a", null, null, null, "c", "c"].
+        let ree = RunArray::<Int16Type>::try_new(
+            &Int16Array::from(vec![2, 5, 7]),
+            &StringArray::from(vec![Some("a"), None, Some("c")]),
+        )?;
+        let schema = Schema::new(vec![Field::new("s", ree.data_type().clone(), true)]);
+        let batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![Arc::new(ree.clone()) as ArrayRef],
+        )?;
+        let avro_bytes = {
+            let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
+            writer.write(&batch)?;
+            writer.finish()?;
+            writer.into_inner()
+        };
+        let reader = ReaderBuilder::new().build(Cursor::new(avro_bytes))?;
+        let read_schema = reader.schema();
+        let decoded = reader.collect::<Result<Vec<_>, _>>()?;
+        let out = arrow::compute::concat_batches(&read_schema, &decoded).expect("concat output");
+        assert_eq!(out.num_columns(), 1);
+        assert_eq!(out.num_rows(), 7);
+        let out_schema = out.schema();
+        let column_type = out_schema.field(0).data_type();
+        let DataType::RunEndEncoded(run_ends_field, values_field) = column_type else {
+            panic!("Unexpected DataType: {:?}", column_type);
+        };
+        assert_eq!(run_ends_field.data_type(), &DataType::Int16);
+        assert_eq!(values_field.data_type(), &DataType::Utf8);
+        assert!(
+            values_field.is_nullable(),
+            "REE 'values' child should be nullable"
+        );
+        let got = out
+            .column(0)
+            .as_any()
+            .downcast_ref::<RunArray<Int16Type>>()
+            .expect("RunArray<Int16Type>");
+        assert_eq!(got, &ree);
+        Ok(())
+    }
+
+    #[cfg(feature = "avro_custom_types")]
+    #[test]
+    fn test_run_end_encoded_int64_run_ends_numeric_values_roundtrip_writer()
+    -> Result<(), ArrowError> {
+        // Two Int64-indexed runs of four rows each: [999 x4, -5 x4].
+        let ree = RunArray::<Int64Type>::try_new(
+            &Int64Array::from(vec![4_i64, 8_i64]),
+            &Int32Array::from(vec![Some(999), Some(-5)]),
+        )?;
+        let schema = Schema::new(vec![Field::new("y", ree.data_type().clone(), true)]);
+        let batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![Arc::new(ree.clone()) as ArrayRef],
+        )?;
+        let avro_bytes = {
+            let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
+            writer.write(&batch)?;
+            writer.finish()?;
+            writer.into_inner()
+        };
+        let reader = ReaderBuilder::new().build(Cursor::new(avro_bytes))?;
+        let read_schema = reader.schema();
+        let decoded = reader.collect::<Result<Vec<_>, _>>()?;
+        let out = arrow::compute::concat_batches(&read_schema, &decoded).expect("concat output");
+        assert_eq!(out.num_columns(), 1);
+        assert_eq!(out.num_rows(), 8);
+        let out_schema = out.schema();
+        let column_type = out_schema.field(0).data_type();
+        let DataType::RunEndEncoded(run_ends_field, values_field) = column_type else {
+            panic!("Unexpected DataType for REE column: {:?}", column_type);
+        };
+        assert_eq!(run_ends_field.data_type(), &DataType::Int64);
+        assert_eq!(values_field.data_type(), &DataType::Int32);
+        assert!(values_field.is_nullable());
+        let got = out
+            .column(0)
+            .as_any()
+            .downcast_ref::<RunArray<Int64Type>>()
+            .expect("RunArray<Int64Type>");
+        assert_eq!(got, &ree);
+        Ok(())
+    }
+
+    #[cfg(feature = "avro_custom_types")]
+    #[test]
+    fn test_run_end_encoded_sliced_roundtrip_writer() -> Result<(), ArrowError> {
+        // NOTE(review): despite its name, this test never writes `base.slice(offset, length)`.
+        // It materializes the logical window and re-compresses it into a fresh, unsliced
+        // RunArray, so the writer's handling of a non-zero run-end offset is not exercised.
+        let base = RunArray::<Int32Type>::try_new(
+            &Int32Array::from(vec![3, 5, 7, 8]),
+            &Int32Array::from(vec![Some(1), Some(2), None, Some(3)]),
+        )?;
+        let (offset, length) = (1usize, 6usize);
+        let base_values = base
+            .values()
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .expect("REE values as Int32Array");
+        // Expanded (one value per row) contents of rows [offset, offset + length) of `base`.
+        let logical_window: Vec<Option<i32>> = (offset..offset + length)
+            .map(|row| {
+                let phys = base.get_physical_index(row);
+                (!base_values.is_null(phys)).then(|| base_values.value(phys))
+            })
+            .collect();
+        /// Re-encode a row sequence as (run_ends, run_values) using maximal runs.
+        fn compress_run_ends_i32(vals: &[Option<i32>]) -> (Int32Array, Int32Array) {
+            if vals.is_empty() {
+                return (Int32Array::new_null(0), Int32Array::new_null(0));
+            }
+            let mut run_ends_out: Vec<i32> = Vec::new();
+            let mut run_vals_out: Vec<Option<i32>> = Vec::new();
+            let mut row_end = 0i32;
+            for v in vals {
+                row_end += 1;
+                if run_vals_out.last() == Some(v) {
+                    // Same value as the open run: move its end forward.
+                    if let Some(end) = run_ends_out.last_mut() {
+                        *end = row_end;
+                    }
+                } else {
+                    run_ends_out.push(row_end);
+                    run_vals_out.push(*v);
+                }
+            }
+            (
+                Int32Array::from(run_ends_out),
+                Int32Array::from(run_vals_out),
+            )
+        }
+        let (owned_run_ends, owned_run_values) = compress_run_ends_i32(&logical_window);
+        let owned_slice = RunArray::<Int32Type>::try_new(&owned_run_ends, &owned_run_values)?;
+        let schema = Schema::new(vec![Field::new("x", owned_slice.data_type().clone(), true)]);
+        let batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![Arc::new(owned_slice) as ArrayRef],
+        )?;
+        let avro_bytes = {
+            let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
+            writer.write(&batch)?;
+            writer.finish()?;
+            writer.into_inner()
+        };
+        let reader = ReaderBuilder::new().build(Cursor::new(avro_bytes))?;
+        let read_schema = reader.schema();
+        let decoded = reader.collect::<Result<Vec<_>, _>>()?;
+        let out = arrow::compute::concat_batches(&read_schema, &decoded).expect("concat output");
+        assert_eq!(out.num_columns(), 1);
+        assert_eq!(out.num_rows(), length);
+        let out_schema = out.schema();
+        let column_type = out_schema.field(0).data_type();
+        let DataType::RunEndEncoded(run_ends_field, values_field) = column_type else {
+            panic!("Unexpected DataType for REE column: {:?}", column_type);
+        };
+        assert_eq!(run_ends_field.data_type(), &DataType::Int32);
+        assert_eq!(values_field.data_type(), &DataType::Int32);
+        assert!(values_field.is_nullable());
+        let got = out
+            .column(0)
+            .as_any()
+            .downcast_ref::<RunArray<Int32Type>>()
+            .expect("RunArray<Int32Type>");
+        /// Expand a run-end encoded Int32 array back into one value per logical row.
+        fn expand_ree_to_int32(a: &RunArray<Int32Type>) -> Int32Array {
+            let vals = a
+                .values()
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .expect("REE values as Int32Array");
+            let rows: Vec<Option<i32>> = (0..a.len())
+                .map(|i| {
+                    let phys = a.get_physical_index(i);
+                    (!vals.is_null(phys)).then(|| vals.value(phys))
+                })
+                .collect();
+            Int32Array::from(rows)
+        }
+        assert_eq!(
+            expand_ree_to_int32(got),
+            Int32Array::from(logical_window),
+            "Logical values differ after REE slice round-trip"
+        );
+        Ok(())
+    }
+
+    #[cfg(not(feature = "avro_custom_types"))]
+    #[test]
+    fn test_run_end_encoded_roundtrip_writer_feature_off() -> Result<(), ArrowError> {
+        // Without `avro_custom_types`, an REE column round-trips as its expanded
+        // per-row values with the plain value type (asserted below).
+        // `DataType`/`Field`/`Schema` come from the module-level import under this same cfg.
+        let run_ends = Int32Array::from(vec![3, 5, 7, 8]);
+        let run_values = Int32Array::from(vec![Some(1), Some(2), None, Some(3)]);
+        let ree = arrow_array::RunArray::<arrow_array::types::Int32Type>::try_new(
+            &run_ends,
+            &run_values,
+        )?;
+        let field = Field::new("x", ree.data_type().clone(), true);
+        let schema = Schema::new(vec![field]);
+        let batch =
+            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?;
+        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
+        writer.write(&batch)?;
+        writer.finish()?;
+        let bytes = writer.into_inner();
+        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
+        let out_schema = reader.schema();
+        let batches = reader.collect::<Result<Vec<_>, _>>()?;
+        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
+        assert_eq!(out.num_columns(), 1);
+        assert_eq!(out.num_rows(), 8);
+        assert_eq!(out.schema().field(0).data_type(), &DataType::Int32);
+        let got = out
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .expect("Int32Array");
+        let expected = Int32Array::from(vec![
+            Some(1),
+            Some(1),
+            Some(1),
+            Some(2),
+            Some(2),
+            None,
+            None,
+            Some(3),
+        ]);
+        assert_eq!(got, &expected);
+        Ok(())
+    }
+
+    #[cfg(not(feature = "avro_custom_types"))]
+    #[test]
+    fn test_run_end_encoded_string_values_int16_run_ends_roundtrip_writer_feature_off()
+    -> Result<(), ArrowError> {
+        // Without `avro_custom_types`, the Int16-indexed REE string column round-trips
+        // as a plain Utf8 column of expanded values (asserted below).
+        // `DataType`/`Field`/`Schema` come from the module-level import under this same cfg.
+        let run_ends = arrow_array::Int16Array::from(vec![2, 5, 7]);
+        let run_values = arrow_array::StringArray::from(vec![Some("a"), None, Some("c")]);
+        let ree = arrow_array::RunArray::<arrow_array::types::Int16Type>::try_new(
+            &run_ends,
+            &run_values,
+        )?;
+        let field = Field::new("s", ree.data_type().clone(), true);
+        let schema = Schema::new(vec![field]);
+        let batch =
+            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?;
+        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
+        writer.write(&batch)?;
+        writer.finish()?;
+        let bytes = writer.into_inner();
+        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
+        let out_schema = reader.schema();
+        let batches = reader.collect::<Result<Vec<_>, _>>()?;
+        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
+        assert_eq!(out.num_columns(), 1);
+        assert_eq!(out.num_rows(), 7);
+        assert_eq!(out.schema().field(0).data_type(), &DataType::Utf8);
+        let got = out
+            .column(0)
+            .as_any()
+            .downcast_ref::<arrow_array::StringArray>()
+            .expect("StringArray");
+        let expected = arrow_array::StringArray::from(vec![
+            Some("a"),
+            Some("a"),
+            None,
+            None,
+            None,
+            Some("c"),
+            Some("c"),
+        ]);
+        assert_eq!(got, &expected);
+        Ok(())
+    }
+
+    #[cfg(not(feature = "avro_custom_types"))]
+    #[test]
+    fn test_run_end_encoded_int64_run_ends_numeric_values_roundtrip_writer_feature_off()
+    -> Result<(), ArrowError> {
+        // Without `avro_custom_types`, the Int64-indexed REE column round-trips as a
+        // plain Int32 column of expanded values (asserted below).
+        // `DataType`/`Field`/`Schema` come from the module-level import under this same cfg.
+        let run_ends = arrow_array::Int64Array::from(vec![4_i64, 8_i64]);
+        let run_values = Int32Array::from(vec![Some(999), Some(-5)]);
+        let ree = arrow_array::RunArray::<arrow_array::types::Int64Type>::try_new(
+            &run_ends,
+            &run_values,
+        )?;
+        let field = Field::new("y", ree.data_type().clone(), true);
+        let schema = Schema::new(vec![field]);
+        let batch =
+            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?;
+        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
+        writer.write(&batch)?;
+        writer.finish()?;
+        let bytes = writer.into_inner();
+        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
+        let out_schema = reader.schema();
+        let batches = reader.collect::<Result<Vec<_>, _>>()?;
+        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
+        assert_eq!(out.num_columns(), 1);
+        assert_eq!(out.num_rows(), 8);
+        assert_eq!(out.schema().field(0).data_type(), &DataType::Int32);
+        let got = out
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .expect("Int32Array");
+        let expected = Int32Array::from(vec![
+            Some(999),
+            Some(999),
+            Some(999),
+            Some(999),
+            Some(-5),
+            Some(-5),
+            Some(-5),
+            Some(-5),
+        ]);
+        assert_eq!(got, &expected);
+        Ok(())
+    }
+
+    #[cfg(not(feature = "avro_custom_types"))]
+    #[test]
+    fn test_run_end_encoded_sliced_roundtrip_writer_feature_off() -> Result<(), ArrowError> {
+        // NOTE(review): despite its name, no slice is taken here; the full six-row
+        // RunArray is written. Consider writing `ree.slice(..)` to cover sliced input.
+        // `DataType`/`Field`/`Schema` come from the module-level import under this same cfg.
+        let run_ends = Int32Array::from(vec![2, 4, 6]);
+        let run_values = Int32Array::from(vec![Some(1), Some(2), None]);
+        let ree = arrow_array::RunArray::<arrow_array::types::Int32Type>::try_new(
+            &run_ends,
+            &run_values,
+        )?;
+        let field = Field::new("x", ree.data_type().clone(), true);
+        let schema = Schema::new(vec![field]);
+        let batch =
+            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?;
+        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
+        writer.write(&batch)?;
+        writer.finish()?;
+        let bytes = writer.into_inner();
+        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
+        let out_schema = reader.schema();
+        let batches = reader.collect::<Result<Vec<_>, _>>()?;
+        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
+        assert_eq!(out.num_columns(), 1);
+        assert_eq!(out.num_rows(), 6);
+        assert_eq!(out.schema().field(0).data_type(), &DataType::Int32);
+        let got = out
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .expect("Int32Array");
+        let expected = Int32Array::from(vec![Some(1), Some(1), Some(2), Some(2), None, None]);
+        assert_eq!(got, &expected);
+        Ok(())
+    }
}