This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ac0053772 Support Encoding Parquet Columns in Parallel (#4871)
3ac0053772 is described below

commit 3ac0053772660f09483d14649996f73be6d45269
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Sun Oct 1 10:53:56 2023 +0100

    Support Encoding Parquet Columns in Parallel (#4871)
    
    * Facilitate parallel parquet writing
    
    * Revert OnCloseRowGroup Send
    
    * Add example
    
    * Review feedback
    
    * Fix doc
    
    * Further review feedback
    
    * More docs
---
 parquet/src/arrow/arrow_writer/levels.rs | 434 ++++++++++++++++++-------------
 parquet/src/arrow/arrow_writer/mod.rs    | 306 +++++++++++++++-------
 parquet/src/file/writer.rs               |   3 +-
 3 files changed, 461 insertions(+), 282 deletions(-)

diff --git a/parquet/src/arrow/arrow_writer/levels.rs b/parquet/src/arrow/arrow_writer/levels.rs
index 48615dc3d5..4a0bd551e1 100644
--- a/parquet/src/arrow/arrow_writer/levels.rs
+++ b/parquet/src/arrow/arrow_writer/levels.rs
@@ -42,19 +42,20 @@
 
 use crate::errors::{ParquetError, Result};
 use arrow_array::cast::AsArray;
-use arrow_array::{Array, ArrayRef, FixedSizeListArray, OffsetSizeTrait, 
StructArray};
-use arrow_buffer::NullBuffer;
+use arrow_array::{Array, ArrayRef, OffsetSizeTrait};
+use arrow_buffer::{NullBuffer, OffsetBuffer};
 use arrow_schema::{DataType, Field};
 use std::ops::Range;
+use std::sync::Arc;
 
-/// Performs a depth-first scan of the children of `array`, constructing 
[`LevelInfo`]
+/// Performs a depth-first scan of the children of `array`, constructing 
[`ArrayLevels`]
 /// for each leaf column encountered
 pub(crate) fn calculate_array_levels(
     array: &ArrayRef,
     field: &Field,
-) -> Result<Vec<LevelInfo>> {
-    let mut builder = LevelInfoBuilder::try_new(field, Default::default())?;
-    builder.write(array, 0..array.len());
+) -> Result<Vec<ArrayLevels>> {
+    let mut builder = LevelInfoBuilder::try_new(field, Default::default(), 
array)?;
+    builder.write(0..array.len());
     Ok(builder.finish())
 }
 
@@ -102,31 +103,57 @@ struct LevelContext {
     def_level: i16,
 }
 
-/// A helper to construct [`LevelInfo`] from a potentially nested [`Field`]
+/// A helper to construct [`ArrayLevels`] from a potentially nested [`Field`]
 enum LevelInfoBuilder {
     /// A primitive, leaf array
-    Primitive(LevelInfo),
-    /// A list array, contains the [`LevelInfoBuilder`] of the child and
-    /// the [`LevelContext`] of this list
-    List(Box<LevelInfoBuilder>, LevelContext),
-    /// A list array, contains the [`LevelInfoBuilder`] of its children and
-    /// the [`LevelContext`] of this struct array
-    Struct(Vec<LevelInfoBuilder>, LevelContext),
+    Primitive(ArrayLevels),
+    /// A list array
+    List(
+        Box<LevelInfoBuilder>, // Child Values
+        LevelContext,          // Context
+        OffsetBuffer<i32>,     // Offsets
+        Option<NullBuffer>,    // Nulls
+    ),
+    /// A large list array
+    LargeList(
+        Box<LevelInfoBuilder>, // Child Values
+        LevelContext,          // Context
+        OffsetBuffer<i64>,     // Offsets
+        Option<NullBuffer>,    // Nulls
+    ),
+    /// A fixed size list array
+    FixedSizeList(
+        Box<LevelInfoBuilder>, // Values
+        LevelContext,          // Context
+        usize,                 // List Size
+        Option<NullBuffer>,    // Nulls
+    ),
+    /// A struct array
+    Struct(Vec<LevelInfoBuilder>, LevelContext, Option<NullBuffer>),
 }
 
 impl LevelInfoBuilder {
     /// Create a new [`LevelInfoBuilder`] for the given [`Field`] and parent 
[`LevelContext`]
-    fn try_new(field: &Field, parent_ctx: LevelContext) -> Result<Self> {
-        match field.data_type() {
-            d if is_leaf(d) => Ok(Self::Primitive(LevelInfo::new(
-                parent_ctx,
-                field.is_nullable(),
-            ))),
-            DataType::Dictionary(_, v) if is_leaf(v.as_ref()) => 
Ok(Self::Primitive(
-                LevelInfo::new(parent_ctx, field.is_nullable()),
-            )),
+    fn try_new(
+        field: &Field,
+        parent_ctx: LevelContext,
+        array: &ArrayRef,
+    ) -> Result<Self> {
+        assert_eq!(field.data_type(), array.data_type());
+        let is_nullable = field.is_nullable();
+
+        match array.data_type() {
+            d if is_leaf(d) => {
+                let levels = ArrayLevels::new(parent_ctx, is_nullable, 
array.clone());
+                Ok(Self::Primitive(levels))
+            }
+            DataType::Dictionary(_, v) if is_leaf(v.as_ref()) => {
+                let levels = ArrayLevels::new(parent_ctx, is_nullable, 
array.clone());
+                Ok(Self::Primitive(levels))
+            }
             DataType::Struct(children) => {
-                let def_level = match field.is_nullable() {
+                let array = array.as_struct();
+                let def_level = match is_nullable {
                     true => parent_ctx.def_level + 1,
                     false => parent_ctx.def_level,
                 };
@@ -138,16 +165,17 @@ impl LevelInfoBuilder {
 
                 let children = children
                     .iter()
-                    .map(|f| Self::try_new(f, ctx))
+                    .zip(array.columns())
+                    .map(|(f, a)| Self::try_new(f, ctx, a))
                     .collect::<Result<_>>()?;
 
-                Ok(Self::Struct(children, ctx))
+                Ok(Self::Struct(children, ctx, array.nulls().cloned()))
             }
             DataType::List(child)
             | DataType::LargeList(child)
             | DataType::Map(child, _)
             | DataType::FixedSizeList(child, _) => {
-                let def_level = match field.is_nullable() {
+                let def_level = match is_nullable {
                     true => parent_ctx.def_level + 2,
                     false => parent_ctx.def_level + 1,
                 };
@@ -157,79 +185,70 @@ impl LevelInfoBuilder {
                     def_level,
                 };
 
-                let child = Self::try_new(child.as_ref(), ctx)?;
-                Ok(Self::List(Box::new(child), ctx))
+                Ok(match field.data_type() {
+                    DataType::List(_) => {
+                        let list = array.as_list();
+                        let child = Self::try_new(child.as_ref(), ctx, 
list.values())?;
+                        let offsets = list.offsets().clone();
+                        Self::List(Box::new(child), ctx, offsets, 
list.nulls().cloned())
+                    }
+                    DataType::LargeList(_) => {
+                        let list = array.as_list();
+                        let child = Self::try_new(child.as_ref(), ctx, 
list.values())?;
+                        let offsets = list.offsets().clone();
+                        let nulls = list.nulls().cloned();
+                        Self::LargeList(Box::new(child), ctx, offsets, nulls)
+                    }
+                    DataType::Map(_, _) => {
+                        let map = array.as_map();
+                        let entries = Arc::new(map.entries().clone()) as 
ArrayRef;
+                        let child = Self::try_new(child.as_ref(), ctx, 
&entries)?;
+                        let offsets = map.offsets().clone();
+                        Self::List(Box::new(child), ctx, offsets, 
map.nulls().cloned())
+                    }
+                    DataType::FixedSizeList(_, size) => {
+                        let list = array.as_fixed_size_list();
+                        let child = Self::try_new(child.as_ref(), ctx, 
list.values())?;
+                        let nulls = list.nulls().cloned();
+                        Self::FixedSizeList(Box::new(child), ctx, *size as _, 
nulls)
+                    }
+                    _ => unreachable!(),
+                })
             }
             d => Err(nyi_err!("Datatype {} is not yet supported", d)),
         }
     }
 
-    /// Finish this [`LevelInfoBuilder`] returning the [`LevelInfo`] for the 
leaf columns
+    /// Finish this [`LevelInfoBuilder`] returning the [`ArrayLevels`] for the 
leaf columns
     /// as enumerated by a depth-first search
-    fn finish(self) -> Vec<LevelInfo> {
+    fn finish(self) -> Vec<ArrayLevels> {
         match self {
             LevelInfoBuilder::Primitive(v) => vec![v],
-            LevelInfoBuilder::List(v, _) => v.finish(),
-            LevelInfoBuilder::Struct(v, _) => {
+            LevelInfoBuilder::List(v, _, _, _)
+            | LevelInfoBuilder::LargeList(v, _, _, _)
+            | LevelInfoBuilder::FixedSizeList(v, _, _, _) => v.finish(),
+            LevelInfoBuilder::Struct(v, _, _) => {
                 v.into_iter().flat_map(|l| l.finish()).collect()
             }
         }
     }
 
     /// Given an `array`, write the level data for the elements in `range`
-    fn write(&mut self, array: &dyn Array, range: Range<usize>) {
-        match array.data_type() {
-            d if is_leaf(d) => self.write_leaf(array, range),
-            DataType::Dictionary(_, v) if is_leaf(v.as_ref()) => {
-                self.write_leaf(array, range)
-            }
-            DataType::Struct(_) => {
-                let array = array.as_struct();
-                self.write_struct(array, range)
-            }
-            DataType::List(_) => {
-                let array = array.as_list::<i32>();
-                self.write_list(
-                    array.value_offsets(),
-                    array.nulls(),
-                    array.values(),
-                    range,
-                )
+    fn write(&mut self, range: Range<usize>) {
+        match self {
+            LevelInfoBuilder::Primitive(info) => Self::write_leaf(info, range),
+            LevelInfoBuilder::List(child, ctx, offsets, nulls) => {
+                Self::write_list(child, ctx, offsets, nulls.as_ref(), range)
             }
-            DataType::LargeList(_) => {
-                let array = array.as_list::<i64>();
-                self.write_list(
-                    array.value_offsets(),
-                    array.nulls(),
-                    array.values(),
-                    range,
-                )
+            LevelInfoBuilder::LargeList(child, ctx, offsets, nulls) => {
+                Self::write_list(child, ctx, offsets, nulls.as_ref(), range)
             }
-            DataType::Map(_, _) => {
-                let array = array.as_map();
-                // A Map is just as ListArray<i32> with a StructArray child, 
we therefore
-                // treat it as such to avoid code duplication
-                self.write_list(
-                    array.value_offsets(),
-                    array.nulls(),
-                    array.entries(),
-                    range,
-                )
+            LevelInfoBuilder::FixedSizeList(child, ctx, size, nulls) => {
+                Self::write_fixed_size_list(child, ctx, *size, nulls.as_ref(), 
range)
             }
-            &DataType::FixedSizeList(_, size) => {
-                let array = array
-                    .as_any()
-                    .downcast_ref::<FixedSizeListArray>()
-                    .expect("unable to get fixed-size list array");
-
-                self.write_fixed_size_list(
-                    size as usize,
-                    array.nulls(),
-                    array.values(),
-                    range,
-                )
+            LevelInfoBuilder::Struct(children, ctx, nulls) => {
+                Self::write_struct(children, ctx, nulls.as_ref(), range)
             }
-            _ => unreachable!(),
         }
     }
 
@@ -237,22 +256,17 @@ impl LevelInfoBuilder {
     ///
     /// Note: MapArrays are `ListArray<i32>` under the hood and so are 
dispatched to this method
     fn write_list<O: OffsetSizeTrait>(
-        &mut self,
+        child: &mut LevelInfoBuilder,
+        ctx: &LevelContext,
         offsets: &[O],
         nulls: Option<&NullBuffer>,
-        values: &dyn Array,
         range: Range<usize>,
     ) {
-        let (child, ctx) = match self {
-            Self::List(child, ctx) => (child, ctx),
-            _ => unreachable!(),
-        };
-
         let offsets = &offsets[range.start..range.end + 1];
 
         let write_non_null_slice =
             |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
-                child.write(values, start_idx..end_idx);
+                child.write(start_idx..end_idx);
                 child.visit_leaves(|leaf| {
                     let rep_levels = leaf.rep_levels.as_mut().unwrap();
                     let mut rev = rep_levels.iter_mut().rev();
@@ -324,12 +338,12 @@ impl LevelInfoBuilder {
     }
 
     /// Write `range` elements from StructArray `array`
-    fn write_struct(&mut self, array: &StructArray, range: Range<usize>) {
-        let (children, ctx) = match self {
-            Self::Struct(children, ctx) => (children, ctx),
-            _ => unreachable!(),
-        };
-
+    fn write_struct(
+        children: &mut [LevelInfoBuilder],
+        ctx: &LevelContext,
+        nulls: Option<&NullBuffer>,
+        range: Range<usize>,
+    ) {
         let write_null = |children: &mut [LevelInfoBuilder], range: 
Range<usize>| {
             for child in children {
                 child.visit_leaves(|info| {
@@ -346,12 +360,12 @@ impl LevelInfoBuilder {
         };
 
         let write_non_null = |children: &mut [LevelInfoBuilder], range: 
Range<usize>| {
-            for (child_array, child) in array.columns().iter().zip(children) {
-                child.write(child_array, range.clone())
+            for child in children {
+                child.write(range.clone())
             }
         };
 
-        match array.nulls() {
+        match nulls {
             Some(validity) => {
                 let mut last_non_null_idx = None;
                 let mut last_null_idx = None;
@@ -388,22 +402,17 @@ impl LevelInfoBuilder {
 
     /// Write `range` elements from FixedSizeListArray with child data 
`values` and null bitmap `nulls`.
     fn write_fixed_size_list(
-        &mut self,
+        child: &mut LevelInfoBuilder,
+        ctx: &LevelContext,
         fixed_size: usize,
         nulls: Option<&NullBuffer>,
-        values: &dyn Array,
         range: Range<usize>,
     ) {
-        let (child, ctx) = match self {
-            Self::List(child, ctx) => (child, ctx),
-            _ => unreachable!(),
-        };
-
         let write_non_null =
             |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
                 let values_start = start_idx * fixed_size;
                 let values_end = end_idx * fixed_size;
-                child.write(values, values_start..values_end);
+                child.write(values_start..values_end);
 
                 child.visit_leaves(|leaf| {
                     let rep_levels = leaf.rep_levels.as_mut().unwrap();
@@ -481,12 +490,7 @@ impl LevelInfoBuilder {
     }
 
     /// Write a primitive array, as defined by [`is_leaf`]
-    fn write_leaf(&mut self, array: &dyn Array, range: Range<usize>) {
-        let info = match self {
-            Self::Primitive(info) => info,
-            _ => unreachable!(),
-        };
-
+    fn write_leaf(info: &mut ArrayLevels, range: Range<usize>) {
         let len = range.end - range.start;
 
         match &mut info.def_levels {
@@ -494,7 +498,7 @@ impl LevelInfoBuilder {
                 def_levels.reserve(len);
                 info.non_null_indices.reserve(len);
 
-                match array.logical_nulls() {
+                match info.array.logical_nulls() {
                     Some(nulls) => {
                         // TODO: Faster bitmask iteration (#1757)
                         for i in range {
@@ -523,11 +527,13 @@ impl LevelInfoBuilder {
     }
 
     /// Visits all children of this node in depth first order
-    fn visit_leaves(&mut self, visit: impl Fn(&mut LevelInfo) + Copy) {
+    fn visit_leaves(&mut self, visit: impl Fn(&mut ArrayLevels) + Copy) {
         match self {
             LevelInfoBuilder::Primitive(info) => visit(info),
-            LevelInfoBuilder::List(c, _) => c.visit_leaves(visit),
-            LevelInfoBuilder::Struct(children, _) => {
+            LevelInfoBuilder::List(c, _, _, _)
+            | LevelInfoBuilder::LargeList(c, _, _, _)
+            | LevelInfoBuilder::FixedSizeList(c, _, _, _) => 
c.visit_leaves(visit),
+            LevelInfoBuilder::Struct(children, _, _) => {
                 for c in children {
                     c.visit_leaves(visit)
                 }
@@ -537,8 +543,8 @@ impl LevelInfoBuilder {
 }
 /// The data necessary to write a primitive Arrow array to parquet, taking 
into account
 /// any non-primitive parents it may have in the arrow representation
-#[derive(Debug, Eq, PartialEq, Clone)]
-pub(crate) struct LevelInfo {
+#[derive(Debug, Clone)]
+pub(crate) struct ArrayLevels {
     /// Array's definition levels
     ///
     /// Present if `max_def_level != 0`
@@ -558,10 +564,25 @@ pub(crate) struct LevelInfo {
 
     /// The maximum repetition for this leaf column
     max_rep_level: i16,
+
+    /// The arrow array
+    array: ArrayRef,
 }
 
-impl LevelInfo {
-    fn new(ctx: LevelContext, is_nullable: bool) -> Self {
+impl PartialEq for ArrayLevels {
+    fn eq(&self, other: &Self) -> bool {
+        self.def_levels == other.def_levels
+            && self.rep_levels == other.rep_levels
+            && self.non_null_indices == other.non_null_indices
+            && self.max_def_level == other.max_def_level
+            && self.max_rep_level == other.max_rep_level
+            && self.array.as_ref() == other.array.as_ref()
+    }
+}
+impl Eq for ArrayLevels {}
+
+impl ArrayLevels {
+    fn new(ctx: LevelContext, is_nullable: bool, array: ArrayRef) -> Self {
         let max_rep_level = ctx.rep_level;
         let max_def_level = match is_nullable {
             true => ctx.def_level + 1,
@@ -574,9 +595,14 @@ impl LevelInfo {
             non_null_indices: vec![],
             max_def_level,
             max_rep_level,
+            array,
         }
     }
 
+    pub fn array(&self) -> &ArrayRef {
+        &self.array
+    }
+
     pub fn def_levels(&self) -> Option<&[i16]> {
         self.def_levels.as_deref()
     }
@@ -597,6 +623,7 @@ mod tests {
     use std::sync::Arc;
 
     use arrow_array::builder::*;
+    use arrow_array::cast::AsArray;
     use arrow_array::types::Int32Type;
     use arrow_array::*;
     use arrow_buffer::{Buffer, ToByteSlice};
@@ -622,7 +649,7 @@ mod tests {
         let inner_list = ArrayDataBuilder::new(inner_type)
             .len(4)
             .add_buffer(offsets)
-            .add_child_data(primitives.into_data())
+            .add_child_data(primitives.to_data())
             .build()
             .unwrap();
 
@@ -638,12 +665,13 @@ mod tests {
         let levels = calculate_array_levels(&outer_list, 
&outer_field).unwrap();
         assert_eq!(levels.len(), 1);
 
-        let expected = LevelInfo {
+        let expected = ArrayLevels {
             def_levels: Some(vec![2; 10]),
             rep_levels: Some(vec![0, 2, 2, 1, 2, 2, 2, 0, 1, 2]),
             non_null_indices: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
             max_def_level: 2,
             max_rep_level: 2,
+            array: Arc::new(primitives),
         };
         assert_eq!(&levels[0], &expected);
     }
@@ -657,12 +685,13 @@ mod tests {
         let levels = calculate_array_levels(&array, &field).unwrap();
         assert_eq!(levels.len(), 1);
 
-        let expected_levels = LevelInfo {
+        let expected_levels = ArrayLevels {
             def_levels: None,
             rep_levels: None,
             non_null_indices: (0..10).collect(),
             max_def_level: 0,
             max_rep_level: 0,
+            array,
         };
         assert_eq!(&levels[0], &expected_levels);
     }
@@ -682,12 +711,13 @@ mod tests {
         let levels = calculate_array_levels(&array, &field).unwrap();
         assert_eq!(levels.len(), 1);
 
-        let expected_levels = LevelInfo {
+        let expected_levels = ArrayLevels {
             def_levels: Some(vec![1, 0, 1, 1, 0]),
             rep_levels: None,
             non_null_indices: vec![0, 2, 3],
             max_def_level: 1,
             max_rep_level: 0,
+            array,
         };
         assert_eq!(&levels[0], &expected_levels);
     }
@@ -706,7 +736,7 @@ mod tests {
         let list = ArrayDataBuilder::new(list_type.clone())
             .len(5)
             .add_buffer(offsets)
-            .add_child_data(leaf_array.into_data())
+            .add_child_data(leaf_array.to_data())
             .build()
             .unwrap();
         let list = make_array(list);
@@ -715,12 +745,13 @@ mod tests {
         let levels = calculate_array_levels(&list, &list_field).unwrap();
         assert_eq!(levels.len(), 1);
 
-        let expected_levels = LevelInfo {
+        let expected_levels = ArrayLevels {
             def_levels: Some(vec![1; 5]),
             rep_levels: Some(vec![0; 5]),
             non_null_indices: (0..5).collect(),
             max_def_level: 1,
             max_rep_level: 1,
+            array: Arc::new(leaf_array),
         };
         assert_eq!(&levels[0], &expected_levels);
 
@@ -737,7 +768,7 @@ mod tests {
         let list = ArrayDataBuilder::new(list_type.clone())
             .len(5)
             .add_buffer(offsets)
-            .add_child_data(leaf_array.into_data())
+            .add_child_data(leaf_array.to_data())
             .null_bit_buffer(Some(Buffer::from([0b00011101])))
             .build()
             .unwrap();
@@ -747,12 +778,13 @@ mod tests {
         let levels = calculate_array_levels(&list, &list_field).unwrap();
         assert_eq!(levels.len(), 1);
 
-        let expected_levels = LevelInfo {
+        let expected_levels = ArrayLevels {
             def_levels: Some(vec![2, 2, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2]),
             rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]),
             non_null_indices: (0..11).collect(),
             max_def_level: 2,
             max_rep_level: 1,
+            array: Arc::new(leaf_array),
         };
         assert_eq!(&levels[0], &expected_levels);
     }
@@ -778,7 +810,7 @@ mod tests {
         let list_type = DataType::List(Arc::new(leaf_field));
         let list = ArrayData::builder(list_type.clone())
             .len(5)
-            .add_child_data(leaf.into_data())
+            .add_child_data(leaf.to_data())
             .add_buffer(Buffer::from_iter([0_i32, 2, 2, 4, 8, 11]))
             .build()
             .unwrap();
@@ -795,12 +827,13 @@ mod tests {
         let levels = calculate_array_levels(&array, &struct_field).unwrap();
         assert_eq!(levels.len(), 1);
 
-        let expected_levels = LevelInfo {
+        let expected_levels = ArrayLevels {
             def_levels: Some(vec![0, 2, 0, 3, 3, 3, 3, 3, 3, 3]),
             rep_levels: Some(vec![0, 0, 0, 0, 1, 1, 1, 0, 1, 1]),
             non_null_indices: (4..11).collect(),
             max_def_level: 3,
             max_rep_level: 1,
+            array: Arc::new(leaf),
         };
 
         assert_eq!(&levels[0], &expected_levels);
@@ -820,7 +853,7 @@ mod tests {
         let offsets = Buffer::from_iter([0_i32, 2, 4, 6, 8, 10, 12, 14, 16, 
18, 20, 22]);
         let l1 = ArrayData::builder(l1_type.clone())
             .len(11)
-            .add_child_data(leaf.into_data())
+            .add_child_data(leaf.to_data())
             .add_buffer(offsets)
             .build()
             .unwrap();
@@ -840,7 +873,7 @@ mod tests {
         let levels = calculate_array_levels(&l2, &l2_field).unwrap();
         assert_eq!(levels.len(), 1);
 
-        let expected_levels = LevelInfo {
+        let expected_levels = ArrayLevels {
             def_levels: Some(vec![
                 5, 5, 5, 5, 1, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 
5, 5,
             ]),
@@ -850,6 +883,7 @@ mod tests {
             non_null_indices: (0..22).collect(),
             max_def_level: 5,
             max_rep_level: 2,
+            array: Arc::new(leaf),
         };
 
         assert_eq!(&levels[0], &expected_levels);
@@ -871,7 +905,7 @@ mod tests {
         let list = ArrayData::builder(list_type.clone())
             .len(4)
             .add_buffer(Buffer::from_iter(0_i32..5))
-            .add_child_data(leaf.into_data())
+            .add_child_data(leaf.to_data())
             .build()
             .unwrap();
         let list = make_array(list);
@@ -880,12 +914,13 @@ mod tests {
         let levels = calculate_array_levels(&list, &list_field).unwrap();
         assert_eq!(levels.len(), 1);
 
-        let expected_levels = LevelInfo {
+        let expected_levels = ArrayLevels {
             def_levels: Some(vec![1; 4]),
             rep_levels: Some(vec![0; 4]),
             non_null_indices: (0..4).collect(),
             max_def_level: 1,
             max_rep_level: 1,
+            array: Arc::new(leaf),
         };
         assert_eq!(&levels[0], &expected_levels);
 
@@ -898,7 +933,7 @@ mod tests {
             .len(4)
             .add_buffer(Buffer::from_iter([0_i32, 0, 3, 5, 7]))
             .null_bit_buffer(Some(Buffer::from([0b00001110])))
-            .add_child_data(leaf.into_data())
+            .add_child_data(leaf.to_data())
             .build()
             .unwrap();
         let list = make_array(list);
@@ -911,12 +946,13 @@ mod tests {
         let levels = calculate_array_levels(&array, &struct_field).unwrap();
         assert_eq!(levels.len(), 1);
 
-        let expected_levels = LevelInfo {
+        let expected_levels = ArrayLevels {
             def_levels: Some(vec![1, 3, 3, 3, 3, 3, 3, 3]),
             rep_levels: Some(vec![0, 0, 1, 1, 0, 1, 0, 1]),
             non_null_indices: (0..7).collect(),
             max_def_level: 3,
             max_rep_level: 1,
+            array: Arc::new(leaf),
         };
         assert_eq!(&levels[0], &expected_levels);
 
@@ -933,7 +969,7 @@ mod tests {
         let list_1 = ArrayData::builder(list_1_type.clone())
             .len(7)
             .add_buffer(Buffer::from_iter([0_i32, 1, 3, 3, 6, 10, 10, 15]))
-            .add_child_data(leaf.into_data())
+            .add_child_data(leaf.to_data())
             .build()
             .unwrap();
 
@@ -958,12 +994,13 @@ mod tests {
         let levels = calculate_array_levels(&array, &struct_field).unwrap();
         assert_eq!(levels.len(), 1);
 
-        let expected_levels = LevelInfo {
+        let expected_levels = ArrayLevels {
             def_levels: Some(vec![1, 5, 5, 5, 4, 5, 5, 5, 5, 5, 5, 5, 4, 5, 5, 
5, 5, 5]),
             rep_levels: Some(vec![0, 0, 1, 2, 1, 0, 2, 2, 1, 2, 2, 2, 0, 1, 2, 
2, 2, 2]),
             non_null_indices: (0..15).collect(),
             max_def_level: 5,
             max_rep_level: 2,
+            array: Arc::new(leaf),
         };
         assert_eq!(&levels[0], &expected_levels);
     }
@@ -980,9 +1017,10 @@ mod tests {
         //  - {a: {b: {c: 6}}}
 
         let c = Int32Array::from_iter([Some(1), None, Some(3), None, Some(5), 
Some(6)]);
+        let leaf = Arc::new(c) as ArrayRef;
         let c_field = Arc::new(Field::new("c", DataType::Int32, true));
         let b = StructArray::from((
-            (vec![(c_field, Arc::new(c) as ArrayRef)]),
+            (vec![(c_field, leaf.clone())]),
             Buffer::from([0b00110111]),
         ));
 
@@ -998,12 +1036,13 @@ mod tests {
         let levels = calculate_array_levels(&a_array, &a_field).unwrap();
         assert_eq!(levels.len(), 1);
 
-        let expected_levels = LevelInfo {
+        let expected_levels = ArrayLevels {
             def_levels: Some(vec![3, 2, 3, 1, 0, 3]),
             rep_levels: None,
             non_null_indices: vec![0, 2, 5],
             max_def_level: 3,
             max_rep_level: 0,
+            array: leaf,
         };
         assert_eq!(&levels[0], &expected_levels);
     }
@@ -1020,7 +1059,7 @@ mod tests {
             .len(5)
             .add_buffer(a_value_offsets)
             .null_bit_buffer(Some(Buffer::from(vec![0b00011011])))
-            .add_child_data(a_values.into_data())
+            .add_child_data(a_values.to_data())
             .build()
             .unwrap();
 
@@ -1029,21 +1068,21 @@ mod tests {
         let a = ListArray::from(a_list_data);
 
         let item_field = Field::new("item", a_list_type, true);
-        let mut builder =
-            LevelInfoBuilder::try_new(&item_field, 
Default::default()).unwrap();
-        builder.write(&a, 2..4);
+        let mut builder = levels(&item_field, a);
+        builder.write(2..4);
         let levels = builder.finish();
 
         assert_eq!(levels.len(), 1);
 
         let list_level = levels.get(0).unwrap();
 
-        let expected_level = LevelInfo {
+        let expected_level = ArrayLevels {
             def_levels: Some(vec![0, 3, 3, 3]),
             rep_levels: Some(vec![0, 0, 1, 1]),
             non_null_indices: vec![3, 4, 5],
             max_def_level: 3,
             max_rep_level: 1,
+            array: Arc::new(a_values),
         };
         assert_eq!(list_level, &expected_level);
     }
@@ -1100,19 +1139,19 @@ mod tests {
         let g = ListArray::from(g_list_data);
 
         let e = StructArray::from(vec![
-            (struct_field_f, Arc::new(f) as ArrayRef),
+            (struct_field_f, Arc::new(f.clone()) as ArrayRef),
             (struct_field_g, Arc::new(g) as ArrayRef),
         ]);
 
         let c = StructArray::from(vec![
-            (struct_field_d, Arc::new(d) as ArrayRef),
+            (struct_field_d, Arc::new(d.clone()) as ArrayRef),
             (struct_field_e, Arc::new(e) as ArrayRef),
         ]);
 
         // build a record batch
         let batch = RecordBatch::try_new(
             Arc::new(schema),
-            vec![Arc::new(a), Arc::new(b), Arc::new(c)],
+            vec![Arc::new(a.clone()), Arc::new(b.clone()), Arc::new(c)],
         )
         .unwrap();
 
@@ -1132,48 +1171,52 @@ mod tests {
         // test "a" levels
         let list_level = levels.get(0).unwrap();
 
-        let expected_level = LevelInfo {
+        let expected_level = ArrayLevels {
             def_levels: None,
             rep_levels: None,
             non_null_indices: vec![0, 1, 2, 3, 4],
             max_def_level: 0,
             max_rep_level: 0,
+            array: Arc::new(a),
         };
         assert_eq!(list_level, &expected_level);
 
         // test "b" levels
         let list_level = levels.get(1).unwrap();
 
-        let expected_level = LevelInfo {
+        let expected_level = ArrayLevels {
             def_levels: Some(vec![1, 0, 0, 1, 1]),
             rep_levels: None,
             non_null_indices: vec![0, 3, 4],
             max_def_level: 1,
             max_rep_level: 0,
+            array: Arc::new(b),
         };
         assert_eq!(list_level, &expected_level);
 
         // test "d" levels
         let list_level = levels.get(2).unwrap();
 
-        let expected_level = LevelInfo {
+        let expected_level = ArrayLevels {
             def_levels: Some(vec![1, 1, 1, 2, 1]),
             rep_levels: None,
             non_null_indices: vec![3],
             max_def_level: 2,
             max_rep_level: 0,
+            array: Arc::new(d),
         };
         assert_eq!(list_level, &expected_level);
 
         // test "f" levels
         let list_level = levels.get(3).unwrap();
 
-        let expected_level = LevelInfo {
+        let expected_level = ArrayLevels {
             def_levels: Some(vec![3, 2, 3, 2, 3]),
             rep_levels: None,
             non_null_indices: vec![0, 2, 4],
             max_def_level: 3,
             max_rep_level: 0,
+            array: Arc::new(f),
         };
         assert_eq!(list_level, &expected_level);
     }
@@ -1270,27 +1313,31 @@ mod tests {
             });
         assert_eq!(levels.len(), 2);
 
+        let map = batch.column(0).as_map();
+
         // test key levels
         let list_level = levels.get(0).unwrap();
 
-        let expected_level = LevelInfo {
+        let expected_level = ArrayLevels {
             def_levels: Some(vec![1; 7]),
             rep_levels: Some(vec![0, 1, 0, 1, 0, 1, 1]),
             non_null_indices: vec![0, 1, 2, 3, 4, 5, 6],
             max_def_level: 1,
             max_rep_level: 1,
+            array: map.keys().clone(),
         };
         assert_eq!(list_level, &expected_level);
 
         // test values levels
         let list_level = levels.get(1).unwrap();
 
-        let expected_level = LevelInfo {
+        let expected_level = ArrayLevels {
             def_levels: Some(vec![2, 2, 2, 1, 2, 1, 2]),
             rep_levels: Some(vec![0, 1, 0, 1, 0, 1, 1]),
             non_null_indices: vec![0, 1, 2, 4, 6],
             max_def_level: 2,
             max_rep_level: 1,
+            array: map.values().clone(),
         };
         assert_eq!(list_level, &expected_level);
     }
@@ -1358,7 +1405,8 @@ mod tests {
 
         let array = Arc::new(list_builder.finish());
 
-        let values_len = array.values().len();
+        let values = array.values().as_struct().column(0).clone();
+        let values_len = values.len();
         assert_eq!(values_len, 5);
 
         let schema = Arc::new(Schema::new(vec![list_field]));
@@ -1368,12 +1416,13 @@ mod tests {
         let levels = calculate_array_levels(rb.column(0), 
rb.schema().field(0)).unwrap();
         let list_level = &levels[0];
 
-        let expected_level = LevelInfo {
+        let expected_level = ArrayLevels {
             def_levels: Some(vec![4, 1, 0, 2, 2, 3, 4]),
             rep_levels: Some(vec![0, 0, 0, 0, 1, 0, 0]),
             non_null_indices: vec![0, 4],
             max_def_level: 4,
             max_rep_level: 1,
+            array: values,
         };
 
         assert_eq!(list_level, &expected_level);
@@ -1391,6 +1440,7 @@ mod tests {
             None, // Masked by struct array
             None,
         ]);
+        let values = inner.values().clone();
 
         // This test assumes that nulls don't take up space
         assert_eq!(inner.values().len(), 7);
@@ -1406,12 +1456,13 @@ mod tests {
 
         assert_eq!(levels.len(), 1);
 
-        let expected_level = LevelInfo {
+        let expected_level = ArrayLevels {
             def_levels: Some(vec![4, 4, 3, 2, 0, 4, 4, 0, 1]),
             rep_levels: Some(vec![0, 1, 0, 0, 0, 0, 1, 0, 0]),
             non_null_indices: vec![0, 1, 5, 6],
             max_def_level: 4,
             max_rep_level: 1,
+            array: values,
         };
 
         assert_eq!(&levels[0], &expected_level);
@@ -1422,14 +1473,16 @@ mod tests {
         // Test the null mask of a struct array and the null mask of a list 
array
         // masking out non-null elements of their children
 
-        let a1 = Arc::new(ListArray::from_iter_primitive::<Int32Type, _, 
_>(vec![
+        let a1 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
             Some(vec![None]), // Masked by list array
             Some(vec![]),     // Masked by list array
             Some(vec![Some(3), None]),
             Some(vec![Some(4), Some(5), None, Some(6)]), // Masked by struct 
array
             None,
             None,
-        ])) as ArrayRef;
+        ]);
+        let a1_values = a1.values().clone();
+        let a1 = Arc::new(a1) as ArrayRef;
 
         let a2 = Arc::new(Int32Array::from_iter(vec![
             Some(1), // Masked by list array
@@ -1439,6 +1492,7 @@ mod tests {
             Some(5),
             None,
         ])) as ArrayRef;
+        let a2_values = a2.clone();
 
         let field_a1 = Arc::new(Field::new("list", a1.data_type().clone(), 
true));
         let field_a2 = Arc::new(Field::new("integers", a2.data_type().clone(), 
true));
@@ -1486,22 +1540,24 @@ mod tests {
 
         assert_eq!(levels.len(), 2);
 
-        let expected_level = LevelInfo {
+        let expected_level = ArrayLevels {
             def_levels: Some(vec![0, 0, 1, 6, 5, 2, 3, 1]),
             rep_levels: Some(vec![0, 0, 0, 0, 2, 0, 1, 0]),
             non_null_indices: vec![1],
             max_def_level: 6,
             max_rep_level: 2,
+            array: a1_values,
         };
 
         assert_eq!(&levels[0], &expected_level);
 
-        let expected_level = LevelInfo {
+        let expected_level = ArrayLevels {
             def_levels: Some(vec![0, 0, 1, 3, 2, 4, 1]),
             rep_levels: Some(vec![0, 0, 0, 0, 0, 1, 0]),
             non_null_indices: vec![4],
             max_def_level: 4,
             max_rep_level: 1,
+            array: a2_values,
         };
 
         assert_eq!(&levels[1], &expected_level);
@@ -1522,23 +1578,24 @@ mod tests {
         builder.values().append_slice(&[9, 10]);
         builder.append(false);
         let a = builder.finish();
+        let values = a.values().clone();
 
         let item_field = Field::new("item", a.data_type().clone(), true);
-        let mut builder =
-            LevelInfoBuilder::try_new(&item_field, 
Default::default()).unwrap();
-        builder.write(&a, 1..4);
+        let mut builder = levels(&item_field, a);
+        builder.write(1..4);
         let levels = builder.finish();
 
         assert_eq!(levels.len(), 1);
 
         let list_level = levels.get(0).unwrap();
 
-        let expected_level = LevelInfo {
+        let expected_level = ArrayLevels {
             def_levels: Some(vec![0, 0, 3, 3]),
             rep_levels: Some(vec![0, 0, 0, 1]),
             non_null_indices: vec![6, 7],
             max_def_level: 3,
             max_rep_level: 1,
+            array: values,
         };
         assert_eq!(list_level, &expected_level);
     }
@@ -1670,6 +1727,10 @@ mod tests {
         assert_eq!(array.values().len(), 8);
         assert_eq!(array.len(), 4);
 
+        let struct_values = array.values().as_struct();
+        let values_a = struct_values.column(0).clone();
+        let values_b = struct_values.column(1).clone();
+
         let schema = Arc::new(Schema::new(vec![list_field]));
         let rb = RecordBatch::try_new(schema, vec![array]).unwrap();
 
@@ -1678,20 +1739,22 @@ mod tests {
         let b_levels = &levels[1];
 
         // [[{a: 1}, null], null, [null, null], [{a: null}, {a: 2}]]
-        let expected_a = LevelInfo {
+        let expected_a = ArrayLevels {
             def_levels: Some(vec![4, 2, 0, 2, 2, 3, 4]),
             rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1]),
             non_null_indices: vec![0, 7],
             max_def_level: 4,
             max_rep_level: 1,
+            array: values_a,
         };
         // [[{b: 2}, null], null, [null, null], [{b: 3}, {b: 4}]]
-        let expected_b = LevelInfo {
+        let expected_b = ArrayLevels {
             def_levels: Some(vec![3, 2, 0, 2, 2, 3, 3]),
             rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1]),
             non_null_indices: vec![0, 6, 7],
             max_def_level: 3,
             max_rep_level: 1,
+            array: values_b,
         };
 
         assert_eq!(a_levels, &expected_a);
@@ -1704,24 +1767,25 @@ mod tests {
         builder.append(true);
         builder.append(false);
         builder.append(true);
-        let a = builder.finish();
+        let array = builder.finish();
+        let values = array.values().clone();
 
-        let item_field = Field::new("item", a.data_type().clone(), true);
-        let mut builder =
-            LevelInfoBuilder::try_new(&item_field, 
Default::default()).unwrap();
-        builder.write(&a, 0..3);
+        let item_field = Field::new("item", array.data_type().clone(), true);
+        let mut builder = levels(&item_field, array);
+        builder.write(0..3);
         let levels = builder.finish();
 
         assert_eq!(levels.len(), 1);
 
         let list_level = levels.get(0).unwrap();
 
-        let expected_level = LevelInfo {
+        let expected_level = ArrayLevels {
             def_levels: Some(vec![1, 0, 1]),
             rep_levels: Some(vec![0, 0, 0]),
             non_null_indices: vec![],
             max_def_level: 3,
             max_rep_level: 1,
+            array: values,
         };
         assert_eq!(list_level, &expected_level);
     }
@@ -1744,19 +1808,20 @@ mod tests {
         builder.values().append_null();
         builder.append(false);
         let a = builder.finish();
+        let values = a.values().as_list::<i32>().values().clone();
 
         let item_field = Field::new("item", a.data_type().clone(), true);
-        let mut builder =
-            LevelInfoBuilder::try_new(&item_field, 
Default::default()).unwrap();
-        builder.write(&a, 0..4);
+        let mut builder = levels(&item_field, a);
+        builder.write(0..4);
         let levels = builder.finish();
 
-        let expected_level = LevelInfo {
+        let expected_level = ArrayLevels {
             def_levels: Some(vec![5, 4, 5, 2, 5, 3, 5, 5, 4, 4, 0]),
             rep_levels: Some(vec![0, 2, 2, 1, 0, 1, 0, 2, 1, 2, 0]),
             non_null_indices: vec![0, 2, 3, 4, 5],
             max_def_level: 5,
             max_rep_level: 2,
+            array: values,
         };
 
         assert_eq!(levels[0], expected_level);
@@ -1777,17 +1842,22 @@ mod tests {
 
         let item_field = Field::new("item", dict.data_type().clone(), true);
 
-        let mut builder =
-            LevelInfoBuilder::try_new(&item_field, 
Default::default()).unwrap();
-        builder.write(&dict, 0..4);
+        let mut builder = levels(&item_field, dict.clone());
+        builder.write(0..4);
         let levels = builder.finish();
-        let expected_level = LevelInfo {
+        let expected_level = ArrayLevels {
             def_levels: Some(vec![0, 0, 1, 1]),
             rep_levels: None,
             non_null_indices: vec![2, 3],
             max_def_level: 1,
             max_rep_level: 0,
+            array: Arc::new(dict),
         };
         assert_eq!(levels[0], expected_level);
     }
+
+    fn levels<T: Array + 'static>(field: &Field, array: T) -> LevelInfoBuilder 
{
+        let v = Arc::new(array) as ArrayRef;
+        LevelInfoBuilder::try_new(field, Default::default(), &v).unwrap()
+    }
 }
diff --git a/parquet/src/arrow/arrow_writer/mod.rs 
b/parquet/src/arrow/arrow_writer/mod.rs
index 2e170738f1..5dae81d471 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -18,7 +18,6 @@
 //! Contains writer which writes arrow data into parquet data.
 
 use bytes::Bytes;
-use std::fmt::Debug;
 use std::io::{Read, Write};
 use std::iter::Peekable;
 use std::slice::Iter;
@@ -28,8 +27,10 @@ use thrift::protocol::{TCompactOutputProtocol, 
TSerializable};
 
 use arrow_array::cast::AsArray;
 use arrow_array::types::*;
-use arrow_array::{Array, FixedSizeListArray, RecordBatch, RecordBatchWriter};
-use arrow_schema::{ArrowError, DataType as ArrowDataType, IntervalUnit, 
SchemaRef};
+use arrow_array::{ArrayRef, RecordBatch, RecordBatchWriter};
+use arrow_schema::{
+    ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef,
+};
 
 use super::schema::{
     add_encoded_arrow_schema_to_metadata, arrow_to_parquet_schema,
@@ -47,14 +48,14 @@ use crate::errors::{ParquetError, Result};
 use crate::file::metadata::{ColumnChunkMetaData, KeyValue, 
RowGroupMetaDataPtr};
 use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
 use crate::file::reader::{ChunkReader, Length};
-use crate::file::writer::SerializedFileWriter;
+use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
 use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
-use levels::{calculate_array_levels, LevelInfo};
+use levels::{calculate_array_levels, ArrayLevels};
 
 mod byte_array;
 mod levels;
 
-/// Arrow writer
+/// Encodes [`RecordBatch`] to parquet
 ///
 /// Writes Arrow `RecordBatch`es to a Parquet writer. Multiple [`RecordBatch`] 
will be encoded
 /// to the same row group, up to `max_row_group_size` rows. Any remaining rows 
will be
@@ -97,7 +98,7 @@ pub struct ArrowWriter<W: Write> {
     max_row_group_size: usize,
 }
 
-impl<W: Write + Send> Debug for ArrowWriter<W> {
+impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         let buffered_memory = self.in_progress_size();
         f.debug_struct("ArrowWriter")
@@ -150,7 +151,7 @@ impl<W: Write + Send> ArrowWriter<W> {
             Some(in_progress) => in_progress
                 .writers
                 .iter()
-                .map(|(_, x)| x.get_estimated_total_bytes() as usize)
+                .map(|x| x.get_estimated_total_bytes())
                 .sum(),
             None => 0,
         }
@@ -208,8 +209,8 @@ impl<W: Write + Send> ArrowWriter<W> {
         };
 
         let mut row_group_writer = self.writer.next_row_group()?;
-        for (chunk, close) in in_progress.close()? {
-            row_group_writer.append_column(&chunk, close)?;
+        for chunk in in_progress.close()? {
+            chunk.append_to_row_group(&mut row_group_writer)?;
         }
         row_group_writer.close()?;
         Ok(())
@@ -246,20 +247,20 @@ impl<W: Write + Send> RecordBatchWriter for 
ArrowWriter<W> {
     }
 }
 
-/// A list of [`Bytes`] comprising a single column chunk
+/// A single column chunk produced by [`ArrowColumnWriter`]
 #[derive(Default)]
-pub struct ArrowColumnChunk {
+struct ArrowColumnChunkData {
     length: usize,
     data: Vec<Bytes>,
 }
 
-impl Length for ArrowColumnChunk {
+impl Length for ArrowColumnChunkData {
     fn len(&self) -> u64 {
         self.length as _
     }
 }
 
-impl ChunkReader for ArrowColumnChunk {
+impl ChunkReader for ArrowColumnChunkData {
     type T = ArrowColumnChunkReader;
 
     fn get_read(&self, start: u64) -> Result<Self::T> {
@@ -274,8 +275,8 @@ impl ChunkReader for ArrowColumnChunk {
     }
 }
 
-/// A [`Read`] for an iterator of [`Bytes`]
-pub struct ArrowColumnChunkReader(Peekable<IntoIter<Bytes>>);
+/// A [`Read`] for [`ArrowColumnChunkData`]
+struct ArrowColumnChunkReader(Peekable<IntoIter<Bytes>>);
 
 impl Read for ArrowColumnChunkReader {
     fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
@@ -297,11 +298,11 @@ impl Read for ArrowColumnChunkReader {
     }
 }
 
-/// A shared [`ArrowColumnChunk`]
+/// A shared [`ArrowColumnChunkData`]
 ///
 /// This allows it to be owned by [`ArrowPageWriter`] whilst allowing access 
via
 /// [`ArrowRowGroupWriter`] on flush, without requiring self-referential 
borrows
-type SharedColumnChunk = Arc<Mutex<ArrowColumnChunk>>;
+type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;
 
 #[derive(Default)]
 struct ArrowPageWriter {
@@ -347,40 +348,180 @@ impl PageWriter for ArrowPageWriter {
     }
 }
 
-/// Encodes a leaf column to [`ArrowPageWriter`]
-enum ArrowColumnWriter {
+/// A leaf column that can be encoded by [`ArrowColumnWriter`]
+#[derive(Debug)]
+pub struct ArrowLeafColumn(ArrayLevels);
+
+/// Computes the [`ArrowLeafColumn`] for a potentially nested [`ArrayRef`]
+pub fn compute_leaves(field: &Field, array: &ArrayRef) -> 
Result<Vec<ArrowLeafColumn>> {
+    let levels = calculate_array_levels(array, field)?;
+    Ok(levels.into_iter().map(ArrowLeafColumn).collect())
+}
+
+/// The data for a single column chunk, see [`ArrowColumnWriter`]
+pub struct ArrowColumnChunk {
+    data: ArrowColumnChunkData,
+    close: ColumnCloseResult,
+}
+
+impl std::fmt::Debug for ArrowColumnChunk {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ArrowColumnChunk")
+            .field("length", &self.data.length)
+            .finish_non_exhaustive()
+    }
+}
+
+impl ArrowColumnChunk {
+    /// Calls [`SerializedRowGroupWriter::append_column`] with this column's 
data
+    pub fn append_to_row_group<W: Write + Send>(
+        self,
+        writer: &mut SerializedRowGroupWriter<'_, W>,
+    ) -> Result<()> {
+        writer.append_column(&self.data, self.close)
+    }
+}
+
+/// Encodes [`ArrowLeafColumn`] to [`ArrowColumnChunk`]
+///
+/// Note: This is a low-level interface for applications that require 
fine-grained control
+/// of encoding, see [`ArrowWriter`] for a higher-level interface
+///
+/// ```
+/// // The arrow schema
+/// # use std::sync::Arc;
+/// # use arrow_array::*;
+/// # use arrow_schema::*;
+/// # use parquet::arrow::arrow_to_parquet_schema;
+/// # use parquet::arrow::arrow_writer::{ArrowLeafColumn, compute_leaves, 
get_column_writers};
+/// # use parquet::file::properties::WriterProperties;
+/// # use parquet::file::writer::SerializedFileWriter;
+/// #
+/// let schema = Arc::new(Schema::new(vec![
+///     Field::new("i32", DataType::Int32, false),
+///     Field::new("f32", DataType::Float32, false),
+/// ]));
+///
+/// // Compute the parquet schema
+/// let parquet_schema = arrow_to_parquet_schema(schema.as_ref()).unwrap();
+/// let props = Arc::new(WriterProperties::default());
+///
+/// // Create writers for each of the leaf columns
+/// let col_writers = get_column_writers(&parquet_schema, &props, 
&schema).unwrap();
+///
+/// // Spawn a worker thread for each column
+/// // This is for demonstration purposes, a thread-pool e.g. rayon or tokio, 
would be better
+/// let mut workers: Vec<_> = col_writers
+///     .into_iter()
+///     .map(|mut col_writer| {
+///         let (send, recv) = std::sync::mpsc::channel::<ArrowLeafColumn>();
+///         let handle = std::thread::spawn(move || {
+///             for col in recv {
+///                 col_writer.write(&col)?;
+///             }
+///             col_writer.close()
+///         });
+///         (handle, send)
+///     })
+///     .collect();
+///
+/// // Create parquet writer
+/// let root_schema = parquet_schema.root_schema_ptr();
+/// let mut out = Vec::with_capacity(1024); // This could be a File
+/// let mut writer = SerializedFileWriter::new(&mut out, root_schema, 
props.clone()).unwrap();
+///
+/// // Start row group
+/// let mut row_group = writer.next_row_group().unwrap();
+///
+/// // Columns to encode
+/// let to_write = vec![
+///     Arc::new(Int32Array::from_iter_values([1, 2, 3])) as _,
+///     Arc::new(Float32Array::from_iter_values([1., 45., -1.])) as _,
+/// ];
+///
+/// // Spawn work to encode columns
+/// let mut worker_iter = workers.iter_mut();
+/// for (arr, field) in to_write.iter().zip(&schema.fields) {
+///     for leaves in compute_leaves(field, arr).unwrap() {
+///         worker_iter.next().unwrap().1.send(leaves).unwrap();
+///     }
+/// }
+///
+/// // Finish up parallel column encoding
+/// for (handle, send) in workers {
+///     drop(send); // Drop send side to signal termination
+///     let chunk = handle.join().unwrap().unwrap();
+///     chunk.append_to_row_group(&mut row_group).unwrap();
+/// }
+/// row_group.close().unwrap();
+///
+/// let metadata = writer.close().unwrap();
+/// assert_eq!(metadata.num_rows, 3);
+/// ```
+pub struct ArrowColumnWriter {
+    writer: ArrowColumnWriterImpl,
+    chunk: SharedColumnChunk,
+}
+
+impl std::fmt::Debug for ArrowColumnWriter {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ArrowColumnWriter").finish_non_exhaustive()
+    }
+}
+
+enum ArrowColumnWriterImpl {
     ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
     Column(ColumnWriter<'static>),
 }
 
 impl ArrowColumnWriter {
+    /// Write an [`ArrowLeafColumn`]
+    pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
+        match &mut self.writer {
+            ArrowColumnWriterImpl::Column(c) => {
+                write_leaf(c, &col.0)?;
+            }
+            ArrowColumnWriterImpl::ByteArray(c) => {
+                write_primitive(c, col.0.array().as_ref(), &col.0)?;
+            }
+        }
+        Ok(())
+    }
+
+    /// Close this column returning the written [`ArrowColumnChunk`]
+    pub fn close(self) -> Result<ArrowColumnChunk> {
+        let close = match self.writer {
+            ArrowColumnWriterImpl::ByteArray(c) => c.close()?,
+            ArrowColumnWriterImpl::Column(c) => c.close()?,
+        };
+        let chunk = Arc::try_unwrap(self.chunk).ok().unwrap();
+        let data = chunk.into_inner().unwrap();
+        Ok(ArrowColumnChunk { data, close })
+    }
+
     /// Returns the estimated total bytes for this column writer
-    fn get_estimated_total_bytes(&self) -> u64 {
-        match self {
-            ArrowColumnWriter::ByteArray(c) => c.get_estimated_total_bytes(),
-            ArrowColumnWriter::Column(c) => c.get_estimated_total_bytes(),
+    pub fn get_estimated_total_bytes(&self) -> usize {
+        match &self.writer {
+            ArrowColumnWriterImpl::ByteArray(c) => 
c.get_estimated_total_bytes() as _,
+            ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes() 
as _,
         }
     }
 }
 
 /// Encodes [`RecordBatch`] to a parquet row group
-pub struct ArrowRowGroupWriter {
-    writers: Vec<(SharedColumnChunk, ArrowColumnWriter)>,
+struct ArrowRowGroupWriter {
+    writers: Vec<ArrowColumnWriter>,
     schema: SchemaRef,
     buffered_rows: usize,
 }
 
 impl ArrowRowGroupWriter {
-    pub fn new(
+    fn new(
         parquet: &SchemaDescriptor,
         props: &WriterPropertiesPtr,
         arrow: &SchemaRef,
     ) -> Result<Self> {
-        let mut writers = Vec::with_capacity(arrow.fields.len());
-        let mut leaves = parquet.columns().iter();
-        for field in &arrow.fields {
-            get_arrow_column_writer(field.data_type(), props, &mut leaves, 
&mut writers)?;
-        }
+        let writers = get_column_writers(parquet, props, arrow)?;
         Ok(Self {
             writers,
             schema: arrow.clone(),
@@ -388,51 +529,64 @@ impl ArrowRowGroupWriter {
         })
     }
 
-    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
+    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
         self.buffered_rows += batch.num_rows();
-        let mut writers = self.writers.iter_mut().map(|(_, x)| x);
-        for (array, field) in batch.columns().iter().zip(&self.schema.fields) {
-            let mut levels = calculate_array_levels(array, field)?.into_iter();
-            write_leaves(&mut writers, &mut levels, array.as_ref())?;
+        let mut writers = self.writers.iter_mut();
+        for (field, column) in 
self.schema.fields().iter().zip(batch.columns()) {
+            for leaf in compute_leaves(field.as_ref(), column)? {
+                writers.next().unwrap().write(&leaf)?
+            }
         }
         Ok(())
     }
 
-    pub fn close(self) -> Result<Vec<(ArrowColumnChunk, ColumnCloseResult)>> {
+    fn close(self) -> Result<Vec<ArrowColumnChunk>> {
         self.writers
             .into_iter()
-            .map(|(chunk, writer)| {
-                let close_result = match writer {
-                    ArrowColumnWriter::ByteArray(c) => c.close()?,
-                    ArrowColumnWriter::Column(c) => c.close()?,
-                };
-
-                let chunk = 
Arc::try_unwrap(chunk).ok().unwrap().into_inner().unwrap();
-                Ok((chunk, close_result))
-            })
+            .map(|writer| writer.close())
             .collect()
     }
 }
 
-/// Get an [`ArrowColumnWriter`] along with a reference to its 
[`SharedColumnChunk`]
+/// Returns the [`ArrowColumnWriter`] for a given schema
+pub fn get_column_writers(
+    parquet: &SchemaDescriptor,
+    props: &WriterPropertiesPtr,
+    arrow: &SchemaRef,
+) -> Result<Vec<ArrowColumnWriter>> {
+    let mut writers = Vec::with_capacity(arrow.fields.len());
+    let mut leaves = parquet.columns().iter();
+    for field in &arrow.fields {
+        get_arrow_column_writer(field.data_type(), props, &mut leaves, &mut 
writers)?;
+    }
+    Ok(writers)
+}
+
+/// Gets the [`ArrowColumnWriter`] for the given `data_type`
 fn get_arrow_column_writer(
     data_type: &ArrowDataType,
     props: &WriterPropertiesPtr,
     leaves: &mut Iter<'_, ColumnDescPtr>,
-    out: &mut Vec<(SharedColumnChunk, ArrowColumnWriter)>,
+    out: &mut Vec<ArrowColumnWriter>,
 ) -> Result<()> {
     let col = |desc: &ColumnDescPtr| {
         let page_writer = Box::<ArrowPageWriter>::default();
         let chunk = page_writer.buffer.clone();
         let writer = get_column_writer(desc.clone(), props.clone(), 
page_writer);
-        (chunk, ArrowColumnWriter::Column(writer))
+        ArrowColumnWriter {
+            chunk,
+            writer: ArrowColumnWriterImpl::Column(writer),
+        }
     };
 
     let bytes = |desc: &ColumnDescPtr| {
         let page_writer = Box::<ArrowPageWriter>::default();
         let chunk = page_writer.buffer.clone();
         let writer = GenericColumnWriter::new(desc.clone(), props.clone(), 
page_writer);
-        (chunk, ArrowColumnWriter::ByteArray(writer))
+        ArrowColumnWriter {
+            chunk,
+            writer: ArrowColumnWriterImpl::ByteArray(writer),
+        }
     };
 
     match data_type {
@@ -478,52 +632,8 @@ fn get_arrow_column_writer(
     Ok(())
 }
 
-/// Write the leaves of `array` in depth-first order to `writers` with `levels`
-fn write_leaves<'a, W>(
-    writers: &mut W,
-    levels: &mut IntoIter<LevelInfo>,
-    array: &(dyn Array + 'static),
-) -> Result<()>
-where
-    W: Iterator<Item = &'a mut ArrowColumnWriter>,
-{
-    match array.data_type() {
-        ArrowDataType::List(_) => {
-            write_leaves(writers, levels, 
array.as_list::<i32>().values().as_ref())?
-        }
-        ArrowDataType::LargeList(_) => {
-            write_leaves(writers, levels, 
array.as_list::<i64>().values().as_ref())?
-        }
-        ArrowDataType::FixedSizeList(_, _) => {
-            let array = 
array.as_any().downcast_ref::<FixedSizeListArray>().unwrap();
-            write_leaves(writers, levels, array.values().as_ref())?
-        }
-        ArrowDataType::Struct(_) => {
-            for column in array.as_struct().columns() {
-                write_leaves(writers, levels, column.as_ref())?
-            }
-        }
-        ArrowDataType::Map(_, _) => {
-            let map = array.as_map();
-            write_leaves(writers, levels, map.keys().as_ref())?;
-            write_leaves(writers, levels, map.values().as_ref())?
-        }
-        _ => {
-            let levels = levels.next().unwrap();
-            match writers.next().unwrap() {
-                ArrowColumnWriter::Column(c) => write_leaf(c, array, levels)?,
-                ArrowColumnWriter::ByteArray(c) => write_primitive(c, array, 
levels)?,
-            };
-        }
-    }
-    Ok(())
-}
-
-fn write_leaf(
-    writer: &mut ColumnWriter<'_>,
-    column: &dyn Array,
-    levels: LevelInfo,
-) -> Result<usize> {
+fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> 
Result<usize> {
+    let column = levels.array().as_ref();
     let indices = levels.non_null_indices();
     match writer {
         ColumnWriter::Int32ColumnWriter(ref mut typed) => {
@@ -678,7 +788,7 @@ fn write_leaf(
 fn write_primitive<E: ColumnValueEncoder>(
     writer: &mut GenericColumnWriter<E>,
     values: &E::Values,
-    levels: LevelInfo,
+    levels: &ArrayLevels,
 ) -> Result<usize> {
     writer.write_batch_internal(
         values,
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 859a0aa1f9..cafb176135 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -115,8 +115,7 @@ pub type OnCloseRowGroup<'a> = Box<
             Vec<Option<ColumnIndex>>,
             Vec<Option<OffsetIndex>>,
         ) -> Result<()>
-        + 'a
-        + Send,
+        + 'a,
 >;
 
 // ----------------------------------------------------------------------

Reply via email to