This is an automated email from the ASF dual-hosted git repository.

Jefffrey pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new be664614ab Support writing REE arrays directly to Parquet (#10064)
be664614ab is described below

commit be664614ab684ec34809361f7e868cc8c3919552
Author: RIchard Baah <[email protected]>
AuthorDate: Fri Jun 12 22:23:09 2026 -0400

    Support writing REE arrays directly to Parquet (#10064)
    
    # Which issue does this PR close?
    This PR works towards an initial solution closing #8016
    <!--
    We generally require a GitHub issue to be filed for all bug fixes and
    enhancements and this helps us generate change logs for our releases.
    You can link an issue to this PR using the GitHub syntax.
    -->
    
    - Closes #8016.
    
    # Rationale for this change
    Currently `arrow_writer` does not support writing Run End Encoded
    columns to Parquet. This PR works towards solving this by first
    expanding the REE array to its value type and then writing that out to
    Parquet. Once it's possible to write REE to Parquet, we can work on
    optimizing it by preserving the compact run-end structure.
    <!--
    Why are you proposing this change? If this is already explained clearly
    in the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand
    your changes and offer better suggestions for fixes.
    -->
    
    # What changes are included in this PR?
    `arrow_writer()` now supports writing Run End Encoded (REE) arrays to
    Parquet by hydrating them to their underlying value type before
    encoding. This is an initial, correctness-first implementation. A
    follow-up can/should optimize to preserve the compacted structure.
    
    **parquet/src/arrow/arrow_writer/mod.rs**: generate a value-type
    arrow-column writer & test
    **parquet/src/arrow/arrow_writer/levels.rs**: core writer logic updated
    to detect REE columns and expand them to their flat value type before
    the existing write path.
    **parquet/src/arrow/schema/mod.rs**: schema conversion updated to map
    RunEndEncodedType to an appropriate Parquet physical type.
    **parquet/benches/arrow_writer.rs**: REE write benchmarks added with low
    and high null density scenarios, now unblocked by the implementation.
    
    <!--
    There is no need to duplicate the description in the issue here but it
    is sometimes worth providing a summary of the individual changes in this
    PR.
    -->
    
    # Are these changes tested?
    Yes
    <!--
    We typically require tests for all PRs in order to:
    1. Prevent the code from being accidentally broken by subsequent changes
    2. Serve as another way to document the expected behavior of the code
    
    If tests are not included in your PR, please explain why (for example,
    are they covered by existing tests)?
    
    If this PR claims a performance improvement, please include evidence
    such as benchmark results.
    -->
    
    # Are there any user-facing changes?
    Users will be able to write their REE columns to Parquet using
    `arrow_writer`.
    <!--
    If there are user-facing changes then we may require documentation to be
    updated before approving the PR.
    
    If there are any breaking changes to public APIs, please call them out.
    -->
---
 parquet/benches/arrow_writer.rs          |  27 +++-
 parquet/src/arrow/arrow_writer/levels.rs |  34 ++++-
 parquet/src/arrow/arrow_writer/mod.rs    | 237 +++++++++++++++++++++++++++++++
 parquet/src/arrow/schema/mod.rs          |  34 ++++-
 4 files changed, 321 insertions(+), 11 deletions(-)

diff --git a/parquet/benches/arrow_writer.rs b/parquet/benches/arrow_writer.rs
index 6b09fd4a2f..073716e2f3 100644
--- a/parquet/benches/arrow_writer.rs
+++ b/parquet/benches/arrow_writer.rs
@@ -159,14 +159,14 @@ fn create_string_dictionary_bench_batch(
         true_density,
     )?)
 }
-// commenting out until implementation of RunEndEncoded is complete. See 
https://github.com/apache/arrow-rs/pull/9936#discussion_r3242936421
-#[allow(dead_code)]
 fn create_ree_bench_batch(
     value_dt: DataType,
     size: usize,
-    null_density: f32,
+    null_pct: Option<u8>,
     true_density: f32,
 ) -> Result<RecordBatch> {
+    const DEFAULT_NULL_PCT: u8 = 10;
+    let null_density = null_pct.unwrap_or(DEFAULT_NULL_PCT) as f32 / 100.0;
     let fields = vec![Field::new(
         "_1",
         DataType::RunEndEncoded(
@@ -506,11 +506,24 @@ fn create_batches() -> Vec<(&'static str, RecordBatch)> {
     let batch = create_string_bench_batch_non_null(BATCH_SIZE, 0.25, 
0.75).unwrap();
     batches.push(("string_non_null", batch));
 
-    //let batch = create_ree_bench_batch(DataType::Utf8, BATCH_SIZE, 0.25, 
0.75).unwrap();
-    //batches.push(("string_ree", batch));
+    let batch = create_ree_bench_batch(DataType::Utf8, BATCH_SIZE, None, 
0.75).unwrap();
+    batches.push(("string_ree", batch));
+
+    let batch = create_ree_bench_batch(DataType::Int32, BATCH_SIZE, None, 
0.75).unwrap();
+    batches.push(("int32_ree", batch));
+
+    let batch = create_ree_bench_batch(DataType::Boolean, BATCH_SIZE, None, 
0.75).unwrap();
+    batches.push(("bool_ree", batch));
+
+    let batch =
+        create_ree_bench_batch(DataType::FixedSizeBinary(16), BATCH_SIZE, 
None, 0.75).unwrap();
+    batches.push(("fixed_size_binary_ree", batch));
+
+    let batch = create_ree_bench_batch(DataType::Utf8, BATCH_SIZE, Some(95), 
0.75).unwrap();
+    batches.push(("string_ree_95pct_null", batch));
 
-    //let batch = create_ree_bench_batch(DataType::Int32, BATCH_SIZE, 0.25, 
0.75).unwrap();
-    //batches.push(("int32_ree", batch));
+    let batch = create_ree_bench_batch(DataType::Int32, BATCH_SIZE, Some(95), 
0.75).unwrap();
+    batches.push(("int32_ree_95pct_null", batch));
 
     let batch = create_float_bench_batch_with_nans(BATCH_SIZE, 0.5).unwrap();
     batches.push(("float_with_nans", batch));
diff --git a/parquet/src/arrow/arrow_writer/levels.rs 
b/parquet/src/arrow/arrow_writer/levels.rs
index e577cf73a8..8abdbcc77d 100644
--- a/parquet/src/arrow/arrow_writer/levels.rs
+++ b/parquet/src/arrow/arrow_writer/levels.rs
@@ -44,13 +44,36 @@ use crate::column::chunker::CdcChunk;
 use crate::column::writer::LevelDataRef;
 use crate::errors::{ParquetError, Result};
 use arrow_array::cast::AsArray;
-use arrow_array::{Array, ArrayRef, OffsetSizeTrait};
+use arrow_array::types::RunEndIndexType;
+use arrow_array::{Array, ArrayRef, Int32Array, OffsetSizeTrait, RunArray, 
downcast_run_array};
 use arrow_buffer::bit_iterator::BitIndexIterator;
 use arrow_buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
 use arrow_schema::{DataType, Field};
 use std::ops::Range;
 use std::sync::Arc;
 
+/// Materializes a [`DataType::RunEndEncoded`] array as a flat array of its
+/// values type, with one element per logical row.
+///
+/// The run-end index width is resolved with `downcast_run_array!` and the
+/// actual expansion is delegated to `expand_typed_ree`. The compact run
+/// structure is not preserved; a follow-up could write runs directly.
+///
+/// # Panics
+///
+/// Panics if `array` is not run-end encoded. The level builder only calls
+/// this from its `DataType::RunEndEncoded` arm.
+fn expand_ree_array(array: &ArrayRef) -> Result<ArrayRef> {
+    // The macro rebinds `array` to the concrete `RunArray<_>` inside the arm.
+    downcast_run_array! {
+        array => expand_typed_ree(array),
+        _ => unreachable!("expand_ree_array called on non-REE array")
+    }
+}
+
+/// Expands one concrete `RunArray<R>` into a flat array of its values type.
+///
+/// Builds one `Int32` take-index per logical row by walking only the runs
+/// that the (possibly sliced) array covers. It then gathers the values with
+/// `arrow_select::take::take`. Walking runs costs O(logical length + runs).
+/// The alternative, a separate `get_physical_index` lookup per logical row,
+/// is more expensive.
+///
+/// # Errors
+///
+/// Returns an error if a physical index does not fit in the `Int32` index
+/// type, or if `take` fails.
+fn expand_typed_ree<R: RunEndIndexType>(run_array: &RunArray<R>) -> Result<ArrayRef> {
+    let run_ends = run_array.run_ends();
+    let values = run_array.values();
+    let len = run_array.len();
+
+    let mut indices: Vec<i32> = Vec::with_capacity(len);
+    if len > 0 {
+        // Run ends are absolute positions in the unsliced array. Track the
+        // absolute logical position and clamp the last run to the slice end.
+        let slice_end = run_ends.offset() + len;
+        let mut emitted_to = run_ends.offset();
+        let first = run_ends.get_start_physical_index();
+        let last = run_ends.get_end_physical_index();
+        for physical in first..=last {
+            let run_end =
+                arrow_buffer::ArrowNativeType::as_usize(run_ends.values()[physical]).min(slice_end);
+            // Checked conversion: a bare `as i32` would silently wrap for
+            // very large values arrays and gather the wrong rows.
+            let index = i32::try_from(physical)
+                .map_err(|_| arrow_err!("REE physical index {} overflows i32", physical))?;
+            indices.extend(std::iter::repeat(index).take(run_end - emitted_to));
+            emitted_to = run_end;
+        }
+    }
+
+    arrow_select::take::take(values.as_ref(), &Int32Array::from(indices), None)
+        .map_err(|e| arrow_err!("Failed to expand REE array: {}", e))
+}
+
 /// Performs a depth-first scan of the children of `array`, constructing 
[`ArrayLevels`]
 /// for each leaf column encountered
 pub(crate) fn calculate_array_levels(array: &ArrayRef, field: &Field) -> 
Result<Vec<ArrayLevels>> {
@@ -185,6 +208,15 @@ impl LevelInfoBuilder {
                 let levels = ArrayLevels::new(parent_ctx, is_nullable, 
array.clone());
                 Ok(Self::Primitive(levels))
             }
+            DataType::RunEndEncoded(_, value_field) => {
+                let flat = expand_ree_array(array)?;
+                let flat_field = Field::new(
+                    field.name(),
+                    value_field.data_type().clone(),
+                    field.is_nullable(),
+                );
+                Self::try_new(&flat_field, parent_ctx, &flat)
+            }
             DataType::Struct(children) => {
                 let array = array.as_struct();
                 let def_level = match is_nullable {
diff --git a/parquet/src/arrow/arrow_writer/mod.rs 
b/parquet/src/arrow/arrow_writer/mod.rs
index 9e61c57514..19c3a8f76a 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -1503,6 +1503,9 @@ impl ArrowColumnWriterFactory {
                 ArrowDataType::FixedSizeBinary(_) => 
out.push(bytes(leaves.next().unwrap())?),
                 _ => out.push(col(leaves.next().unwrap())?),
             },
+            ArrowDataType::RunEndEncoded(_, value_field) => {
+                self.get_arrow_column_writer(value_field.data_type(), props, 
leaves, out)?
+            }
             _ => {
                 return Err(ParquetError::NYI(format!(
                     "Attempting to write an Arrow type {data_type} to parquet 
that is not yet implemented"
@@ -5664,4 +5667,238 @@ mod tests {
         let cc = file_meta.row_group(0).column(0);
         assert!(cc.column_index_range().is_none());
     }
+
+    /// Encodes `array` as the only column (named `col`, nullable) of a
+    /// `RecordBatch` and returns the complete Parquet file bytes.
+    fn write_column_to_bytes(array: ArrayRef) -> Bytes {
+        let field = Field::new("col", array.data_type().clone(), true);
+        let schema = Arc::new(Schema::new(vec![field]));
+        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![array]).unwrap();
+        Bytes::from(get_bytes_after_close(schema, &batch))
+    }
+
+    /// Decodes the first column of the first record batch in `bytes`.
+    ///
+    /// `schema` is supplied to the reader in place of the embedded Arrow
+    /// schema. Passing the flat value-type schema lets callers decode data
+    /// written from an REE array without the run-end encoding wrapper.
+    fn read_column_with_schema(bytes: Bytes, schema: SchemaRef) -> ArrayRef {
+        let options =
+            crate::arrow::arrow_reader::ArrowReaderOptions::new().with_schema(schema);
+        let mut reader = ParquetRecordBatchReaderBuilder::try_new_with_options(bytes, options)
+            .unwrap()
+            .build()
+            .unwrap();
+        let first_batch = reader.next().unwrap().unwrap();
+        Arc::clone(first_batch.column(0))
+    }
+
+    /// Asserts two things:
+    /// - writing `ree` yields exactly the same Parquet bytes as writing its
+    ///   expanded form `flat`;
+    /// - both files decode back to `flat` under the flat schema.
+    fn ree_write_read_roundtrip(ree: ArrayRef, flat: ArrayRef) {
+        let ree_bytes = write_column_to_bytes(ree);
+        let flat_bytes = write_column_to_bytes(Arc::clone(&flat));
+        assert_eq!(
+            ree_bytes, flat_bytes,
+            "REE and flat bytes should be identical"
+        );
+
+        let read_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
+            "col",
+            flat.data_type().clone(),
+            true,
+        )]));
+        let decoded_ree = read_column_with_schema(ree_bytes, Arc::clone(&read_schema));
+        let decoded_flat = read_column_with_schema(flat_bytes, read_schema);
+
+        assert_eq!(decoded_ree.as_ref(), flat.as_ref());
+        assert_eq!(decoded_ree.as_ref(), decoded_flat.as_ref());
+    }
+
+    #[test]
+    fn ree_string() {
+        // The same logical rows feed both the REE array and the flat array.
+        let rows = [Some("a"), Some("a"), None, Some("b"), Some("b")];
+        let ree: ArrayRef = Arc::new(rows.into_iter().collect::<Int32RunArray>());
+        let flat: ArrayRef = Arc::new(StringArray::from(rows.to_vec()));
+        ree_write_read_roundtrip(ree, flat);
+    }
+
+    #[test]
+    fn ree_int32() {
+        let rows = vec![Some(1), Some(1), None, Some(2), Some(2)];
+
+        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
+        rows.iter().for_each(|row| builder.append_option(*row));
+        let ree: ArrayRef = Arc::new(builder.finish());
+
+        let flat: ArrayRef = Arc::new(Int32Array::from(rows));
+        ree_write_read_roundtrip(ree, flat);
+    }
+
+    #[test]
+    fn ree_bool() {
+        // Runs ending at 3, 5 and 7: true x3, null x2, false x2.
+        let run_ends = Int32Array::from(vec![3, 5, 7]);
+        let run_values = BooleanArray::from(vec![Some(true), None, Some(false)]);
+        let ree: ArrayRef = Arc::new(RunArray::try_new(&run_ends, &run_values).unwrap());
+
+        let rows: Vec<Option<bool>> = [(Some(true), 3), (None, 2), (Some(false), 2)]
+            .into_iter()
+            .flat_map(|(value, count)| std::iter::repeat(value).take(count))
+            .collect();
+        let flat: ArrayRef = Arc::new(BooleanArray::from(rows));
+        ree_write_read_roundtrip(ree, flat);
+    }
+
+    #[test]
+    fn ree_fixed_size_binary() {
+        /// Builds a width-2 `FixedSizeBinaryArray` from optional byte strings.
+        fn fixed2(items: &[Option<&[u8]>]) -> FixedSizeBinaryArray {
+            let mut builder = FixedSizeBinaryBuilder::new(2);
+            for item in items {
+                if let Some(bytes) = item {
+                    builder.append_value(bytes).unwrap();
+                } else {
+                    builder.append_null();
+                }
+            }
+            builder.finish()
+        }
+
+        // Runs ending at 2, 4 and 6: "aa" x2, null x2, "bb" x2.
+        let run_ends = Int32Array::from(vec![2, 4, 6]);
+        let run_values = fixed2(&[Some(b"aa"), None, Some(b"bb")]);
+        let ree: ArrayRef = Arc::new(RunArray::try_new(&run_ends, &run_values).unwrap());
+
+        let flat: ArrayRef = Arc::new(fixed2(&[
+            Some(b"aa"),
+            Some(b"aa"),
+            None,
+            None,
+            Some(b"bb"),
+            Some(b"bb"),
+        ]));
+        ree_write_read_roundtrip(ree, flat);
+    }
+
+    #[test]
+    fn ree_single_run() {
+        // Every row is identical, so the REE array holds exactly one run.
+        let rows = ["x"; 3];
+        let ree: ArrayRef = Arc::new(rows.into_iter().collect::<Int32RunArray>());
+        let flat: ArrayRef = Arc::new(StringArray::from(rows.to_vec()));
+        ree_write_read_roundtrip(ree, flat);
+    }
+
+    #[test]
+    fn ree_float32() {
+        // Run ends 2, 4, 5 over values [1.0, null, 2.5] give the logical rows
+        // [1.0, 1.0, null, null, 2.5].
+        let run_ends = Int32Array::from(vec![2, 4, 5]);
+        let run_values = Float32Array::from(vec![Some(1.0_f32), None, Some(2.5_f32)]);
+        let ree: ArrayRef = Arc::new(RunArray::try_new(&run_ends, &run_values).unwrap());
+
+        let rows = [Some(1.0_f32), Some(1.0_f32), None, None, Some(2.5_f32)];
+        let flat: ArrayRef = Arc::new(Float32Array::from(rows.to_vec()));
+        ree_write_read_roundtrip(ree, flat);
+    }
+
+    #[test]
+    fn ree_sliced() {
+        // A sliced REE array carries a non-zero logical offset, and expansion
+        // must honour it.
+        // Unsliced logical view: [a, a, a, b, b, c, c] (run ends 3, 5, 7).
+        // slice(2, 5) keeps logical rows 2..7: [a, b, b, c, c].
+        let run_ends = Int32Array::from(vec![3, 5, 7]);
+        let run_values = StringArray::from(vec!["a", "b", "c"]);
+        let unsliced: ArrayRef = Arc::new(RunArray::try_new(&run_ends, &run_values).unwrap());
+        let sliced = unsliced.slice(2, 5);
+
+        let flat: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "b", "c", "c"]));
+        ree_write_read_roundtrip(sliced, flat);
+    }
+
+    #[test]
+    fn ree_struct_with_ree_child() {
+        // Struct with a REE string field and a REE int field. This confirms
+        // three things:
+        // - recursion visits every child;
+        // - each child collapses to the leaf type of its REE values;
+        // - the expanded rows actually read back.
+        let run_ends = Int32Array::from(vec![2i32, 3, 5]);
+
+        let col_a: ArrayRef = Arc::new(
+            RunArray::try_new(
+                &run_ends,
+                &StringArray::from(vec![Some("foo"), None, Some("bar")]),
+            )
+            .unwrap(),
+        );
+        let col_b: ArrayRef = Arc::new(
+            RunArray::try_new(&run_ends, &Int32Array::from(vec![Some(1), None, Some(2)]))
+                .unwrap(),
+        );
+
+        let struct_array: ArrayRef = Arc::new(StructArray::new(
+            Fields::from(vec![
+                Field::new("a", col_a.data_type().clone(), true),
+                Field::new("b", col_b.data_type().clone(), true),
+            ]),
+            vec![col_a, col_b],
+            None,
+        ));
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "row",
+            struct_array.data_type().clone(),
+            true,
+        )]));
+        let batch = RecordBatch::try_new(schema.clone(), vec![struct_array]).unwrap();
+
+        let mut buf = Vec::new();
+        let mut writer = ArrowWriter::try_new(&mut buf, schema, None).unwrap();
+        writer.write(&batch).unwrap();
+        let metadata = writer.close().unwrap();
+
+        let parquet_schema = metadata.file_metadata().schema_descr();
+        assert_eq!(parquet_schema.num_columns(), 2);
+        assert_eq!(
+            parquet_schema.column(0).physical_type(),
+            crate::basic::Type::BYTE_ARRAY
+        );
+        assert_eq!(parquet_schema.column(0).path().string(), "row.a");
+        assert_eq!(
+            parquet_schema.column(1).physical_type(),
+            crate::basic::Type::INT32
+        );
+        assert_eq!(parquet_schema.column(1).path().string(), "row.b");
+
+        // Run ends [2, 3, 5] expand each child to five logical rows.
+        let expected_a: ArrayRef = Arc::new(StringArray::from(vec![
+            Some("foo"),
+            Some("foo"),
+            None,
+            Some("bar"),
+            Some("bar"),
+        ]));
+        let expected_b: ArrayRef = Arc::new(Int32Array::from(vec![
+            Some(1),
+            Some(1),
+            None,
+            Some(2),
+            Some(2),
+        ]));
+        let expected: ArrayRef = Arc::new(StructArray::new(
+            Fields::from(vec![
+                Field::new("a", expected_a.data_type().clone(), true),
+                Field::new("b", expected_b.data_type().clone(), true),
+            ]),
+            vec![expected_a, expected_b],
+            None,
+        ));
+
+        // Read back through the flat value-type schema. This keeps the check
+        // independent of how the embedded Arrow schema hint describes the
+        // nested REE children.
+        let read_schema = Arc::new(Schema::new(vec![Field::new(
+            "row",
+            expected.data_type().clone(),
+            true,
+        )]));
+        let decoded = read_column_with_schema(Bytes::from(buf), read_schema);
+        assert_eq!(decoded.as_ref(), expected.as_ref());
+    }
 }
diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs
index 7fe6fbc9d9..7878e7c49b 100644
--- a/parquet/src/arrow/schema/mod.rs
+++ b/parquet/src/arrow/schema/mod.rs
@@ -311,6 +311,15 @@ pub fn encode_arrow_schema(schema: &Schema) -> String {
     BASE64_STANDARD.encode(&len_prefix_schema)
 }
 
+/// Returns a copy of `field` in which every [`DataType::RunEndEncoded`] type is
+/// replaced by the data type of its values, at any nesting depth.
+///
+/// The writer stores REE columns as their expanded values type, so the Arrow
+/// schema hint embedded in the file should name that flat type. Recursing into
+/// struct, list and map children keeps nested REE fields consistent with
+/// top-level ones. Names, nullability and field metadata are kept; only data
+/// types are rewritten.
+///
+/// NOTE(review): `add_encoded_arrow_schema_to_metadata` only takes the
+/// flattening path when a *top-level* field is REE. REE nested under a
+/// non-REE top-level field is therefore only rewritten if that probe is also
+/// made recursive.
+fn flatten_ree_field(field: &Field) -> Field {
+    let flat_type = match field.data_type() {
+        // Swap the REE wrapper for its values type, then keep going in case the
+        // values type itself contains REE fields.
+        DataType::RunEndEncoded(_, value_field) => {
+            let unwrapped = field
+                .clone()
+                .with_data_type(value_field.data_type().clone());
+            return flatten_ree_field(&unwrapped);
+        }
+        DataType::Struct(children) => {
+            DataType::Struct(children.iter().map(|c| flatten_ree_field(c)).collect())
+        }
+        DataType::List(child) => DataType::List(flatten_ree_field(child).into()),
+        DataType::LargeList(child) => DataType::LargeList(flatten_ree_field(child).into()),
+        DataType::FixedSizeList(child, size) => {
+            DataType::FixedSizeList(flatten_ree_field(child).into(), *size)
+        }
+        DataType::Map(entries, sorted) => {
+            DataType::Map(flatten_ree_field(entries).into(), *sorted)
+        }
+        // Leaf and other types cannot contain REE children here.
+        _ => return field.clone(),
+    };
+    field.clone().with_data_type(flat_type)
+}
+
 /// Mutates writer metadata by storing the encoded Arrow schema hint in
 /// [`ARROW_SCHEMA_META_KEY`].
 ///
@@ -318,6 +327,22 @@ pub fn encode_arrow_schema(schema: &Schema) -> String {
 ///
 /// [`ARROW_SCHEMA_META_KEY`]: crate::arrow::ARROW_SCHEMA_META_KEY
 pub fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut 
WriterProperties) {
+    let has_ree = schema
+        .fields()
+        .iter()
+        .any(|f| matches!(f.data_type(), DataType::RunEndEncoded(_, _)));
+    let flat_schema;
+    let schema = if has_ree {
+        let flat_fields: Vec<Field> = schema
+            .fields()
+            .iter()
+            .map(|f| flatten_ree_field(f))
+            .collect();
+        flat_schema = Schema::new_with_metadata(flat_fields, 
schema.metadata().clone());
+        &flat_schema
+    } else {
+        schema
+    };
     let encoded = encode_arrow_schema(schema);
 
     let schema_kv = KeyValue {
@@ -833,9 +858,12 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: 
bool) -> Result<Type> {
             let dict_field = 
field.clone().with_data_type(value.as_ref().clone());
             arrow_to_parquet_type(&dict_field, coerce_types)
         }
-        DataType::RunEndEncoded(_, _) => Err(arrow_err!(
-            "Converting RunEndEncodedType to parquet not supported",
-        )),
+        DataType::RunEndEncoded(_, value_field) => {
+            let ree_value_field = field
+                .clone()
+                .with_data_type(value_field.data_type().clone());
+            arrow_to_parquet_type(&ree_value_field, coerce_types)
+        }
     }
 }
 

Reply via email to