tustvold commented on code in PR #1746:
URL: https://github.com/apache/arrow-rs/pull/1746#discussion_r882974733


##########
parquet/src/arrow/levels.rs:
##########
@@ -171,1149 +89,412 @@ impl LevelInfo {
             | DataType::Binary
             | DataType::LargeBinary
             | DataType::Decimal(_, _)
-            | DataType::FixedSizeBinary(_) => {
-                // we return a vector of 1 value to represent the primitive
-                vec![self.calculate_child_levels(
-                    array_offsets,
-                    array_mask,
-                    LevelType::Primitive(field.is_nullable()),
-                )]
+            | DataType::FixedSizeBinary(_)
+    )
+}
+
+/// The definition and repetition level of an array within a potentially nested hierarchy
+///
+/// Accumulated by [`LevelInfoBuilder::try_new`] while descending through nested fields:
+/// a nullable struct adds one definition level, while a list/map adds one repetition
+/// level plus one (non-nullable) or two (nullable) definition levels
+#[derive(Debug, Default, Clone, Copy)]
+struct LevelContext {
+    /// The current repetition level
+    rep_level: i16,
+    /// The current definition level
+    def_level: i16,
+}
+
+/// A helper to construct [`LevelInfo`] from a potentially nested [`Field`]
+///
+/// The builder tree mirrors the nesting of the [`Field`] it was created from
+enum LevelInfoBuilder {
+    /// A leaf column (a leaf type, or a dictionary whose values are a leaf type)
+    Primitive(LevelInfo),
+    /// A List, LargeList or Map: the builder for its single child, and the list's context
+    List(Box<LevelInfoBuilder>, LevelContext),
+    /// A Struct: one builder per child field, and the struct's context
+    Struct(Vec<LevelInfoBuilder>, LevelContext),
+}
+
+impl LevelInfoBuilder {
+    /// Create a new [`LevelInfoBuilder`] for the given [`Field`] and parent 
[`LevelContext`]
+    fn try_new(field: &Field, parent_ctx: LevelContext) -> Result<Self> {
+        match field.data_type() {
+            d if is_leaf(d) => Ok(Self::Primitive(LevelInfo::new(
+                parent_ctx,
+                field.is_nullable(),
+            ))),
+            DataType::Dictionary(_, v) if is_leaf(v.as_ref()) => 
Ok(Self::Primitive(
+                LevelInfo::new(parent_ctx, field.is_nullable()),
+            )),
+            DataType::Struct(children) => {
+                let def_level = match field.is_nullable() {
+                    true => parent_ctx.def_level + 1,
+                    false => parent_ctx.def_level,
+                };
+
+                let ctx = LevelContext {
+                    rep_level: parent_ctx.rep_level,
+                    def_level,
+                };
+
+                let children = children
+                    .iter()
+                    .map(|f| Self::try_new(f, ctx))
+                    .collect::<Result<_>>()?;
+
+                Ok(Self::Struct(children, ctx))
             }
-            DataType::List(list_field) | DataType::LargeList(list_field) => {
-                let child_offset = array_offsets[0] as usize;
-                let child_len = *array_offsets.last().unwrap() as usize;
-                // Calculate the list level
-                let list_level = self.calculate_child_levels(
-                    array_offsets,
-                    array_mask,
-                    LevelType::List(field.is_nullable()),
-                );
-
-                // Construct the child array of the list, and get its offset + 
mask
-                let array_data = array.data();
-                let child_data = array_data.child_data().get(0).unwrap();
-                let child_array = make_array(child_data.clone());
-                let (child_offsets, child_mask) = 
Self::get_array_offsets_and_masks(
-                    &child_array,
-                    child_offset,
-                    child_len - child_offset,
-                );
-
-                match child_array.data_type() {
-                    DataType::Null
-                    | DataType::Boolean
-                    | DataType::Int8
-                    | DataType::Int16
-                    | DataType::Int32
-                    | DataType::Int64
-                    | DataType::UInt8
-                    | DataType::UInt16
-                    | DataType::UInt32
-                    | DataType::UInt64
-                    | DataType::Float16
-                    | DataType::Float32
-                    | DataType::Float64
-                    | DataType::Timestamp(_, _)
-                    | DataType::Date32
-                    | DataType::Date64
-                    | DataType::Time32(_)
-                    | DataType::Time64(_)
-                    | DataType::Duration(_)
-                    | DataType::Interval(_)
-                    | DataType::Binary
-                    | DataType::LargeBinary
-                    | DataType::Utf8
-                    | DataType::LargeUtf8
-                    | DataType::Dictionary(_, _)
-                    | DataType::Decimal(_, _)
-                    | DataType::FixedSizeBinary(_) => {
-                        vec![list_level.calculate_child_levels(
-                            child_offsets,
-                            child_mask,
-                            LevelType::Primitive(list_field.is_nullable()),
-                        )]
-                    }
-                    DataType::List(_)
-                    | DataType::LargeList(_)
-                    | DataType::Struct(_)
-                    | DataType::Map(_, _) => {
-                        list_level.calculate_array_levels(&child_array, 
list_field)
-                    }
-                    DataType::FixedSizeList(_, _) => unimplemented!(),
-                    DataType::Union(_, _, _) => unimplemented!(),
-                }
+            DataType::List(child)
+            | DataType::LargeList(child)
+            | DataType::Map(child, _) => {
+                let def_level = match field.is_nullable() {
+                    true => parent_ctx.def_level + 2,
+                    false => parent_ctx.def_level + 1,
+                };
+
+                let ctx = LevelContext {
+                    rep_level: parent_ctx.rep_level + 1,
+                    def_level,
+                };
+
+                let child = Self::try_new(child.as_ref(), ctx)?;
+                Ok(Self::List(Box::new(child), ctx))
             }
-            DataType::Map(map_field, _) => {
-                // Calculate the map level
-                let map_level = self.calculate_child_levels(
-                    array_offsets,
-                    array_mask,
-                    // A map is treated like a list as it has repetition
-                    LevelType::List(field.is_nullable()),
-                );
-
-                let map_array = 
array.as_any().downcast_ref::<MapArray>().unwrap();
-
-                let key_array = map_array.keys();
-                let value_array = map_array.values();
-
-                if let DataType::Struct(fields) = map_field.data_type() {
-                    let key_field = &fields[0];
-                    let value_field = &fields[1];
-
-                    let mut map_levels = vec![];
-
-                    // Get key levels
-                    let mut key_levels =
-                        map_level.calculate_array_levels(&key_array, 
key_field);
-                    map_levels.append(&mut key_levels);
-
-                    let mut value_levels =
-                        map_level.calculate_array_levels(&value_array, 
value_field);
-                    map_levels.append(&mut value_levels);
-
-                    map_levels
-                } else {
-                    panic!(
-                        "Map field should be a struct, found {:?}",
-                        map_field.data_type()
-                    );
-                }
+            d => Err(nyi_err!("Datatype {} is not yet supported", d)),
+        }
+    }
+
+    /// Finish this [`LevelInfoBuilder`] returning the [`LevelInfo`] for the leaf columns
+    /// as enumerated by a depth-first search
+    fn finish(self) -> Vec<LevelInfo> {
+        match self {
+            LevelInfoBuilder::Primitive(v) => vec![v],
+            // A list has no leaf of its own; it only yields its child's leaves
+            LevelInfoBuilder::List(v, _) => v.finish(),
+            LevelInfoBuilder::Struct(v, _) => {
+                v.into_iter().flat_map(|l| l.finish()).collect()
+            }
+        }
+    }
+
+    /// Given an `array`, write the level data for the elements in `range`
+    fn write(&mut self, array: &ArrayRef, range: Range<usize>) {
+        match array.data_type() {
+            d if is_leaf(d) => self.write_leaf(array, range),
+            DataType::Dictionary(_, v) if is_leaf(v.as_ref()) => {
+                self.write_leaf(array, range)
             }
-            DataType::FixedSizeList(_, _) => unimplemented!(),
-            DataType::Struct(struct_fields) => {
-                let struct_array: &StructArray = array
+            DataType::Struct(_) => {
+                let array = 
array.as_any().downcast_ref::<StructArray>().unwrap();
+                self.write_struct(array, range)
+            }
+            DataType::List(_) => {
+                let array = array
                     .as_any()
-                    .downcast_ref::<StructArray>()
-                    .expect("Unable to get struct array");
-                let mut struct_level = self.calculate_child_levels(
-                    array_offsets,
-                    array_mask,
-                    LevelType::Struct(field.is_nullable()),
-                );
-
-                // If the parent field is a list, calculate the children of 
the struct as if it
-                // were a list as well.
-                if matches!(self.level_type, LevelType::List(_)) {
-                    struct_level.level_type = LevelType::List(false);
-                }
+                    .downcast_ref::<GenericListArray<i32>>()
+                    .unwrap();
+                self.write_list(array.value_offsets(), array.data(), range)
+            }
+            DataType::LargeList(_) => {
+                let array = array
+                    .as_any()
+                    .downcast_ref::<GenericListArray<i64>>()
+                    .unwrap();
 
-                let mut struct_levels = vec![];
-                struct_array
-                    .columns()
-                    .into_iter()
-                    .zip(struct_fields)
-                    .for_each(|(child_array, child_field)| {
-                        let mut levels =
-                            struct_level.calculate_array_levels(child_array, 
child_field);
-                        struct_levels.append(&mut levels);
-                    });
-                struct_levels
+                self.write_list(array.value_offsets(), array.data(), range)
             }
-            DataType::Union(_, _, _) => unimplemented!(),
-            DataType::Dictionary(_, _) => {
-                // Need to check for these cases not implemented in C++:
-                // - "Writing DictionaryArray with nested dictionary type not 
yet supported"
-                // - "Writing DictionaryArray with null encoded in dictionary 
type not yet supported"
-                // vec![self.get_primitive_def_levels(array, field, 
array_mask)]
-                vec![self.calculate_child_levels(
-                    array_offsets,
-                    array_mask,
-                    LevelType::Primitive(field.is_nullable()),
-                )]
+            DataType::Map(_, _) => {
+                let array = array.as_any().downcast_ref::<MapArray>().unwrap();
+                // A Map is just as ListArray<i32> with a StructArray child, 
we therefore
+                // treat it as such to avoid code duplication
+                self.write_list(array.value_offsets(), array.data(), range)
             }
+            _ => unreachable!(),
         }
     }
 
-    /// Calculate child/leaf array levels.
-    ///
-    /// The algorithm works by incrementing definitions of array values based 
on whether:
-    /// - a value is optional or required (is_nullable)
-    /// - a list value is repeated + optional or required (is_list)
-    ///
-    /// A record batch always starts at a populated definition = level 0.
-    /// When a batch only has a primitive, i.e. `<batch<primitive[a]>>, column 
`a`
-    /// can only have a maximum level of 1 if it is not null.
-    /// If it is not null, we increment by 1, such that the null slots will = 
level 1.
-    /// The above applies to types that have no repetition (anything not a 
list or map).
-    ///
-    /// If a batch has lists, then we increment by up to 2 levels:
-    /// - 1 level for the list (repeated)
-    /// - 1 level if the list itself is nullable (optional)
-    ///
-    /// A list's child then gets incremented using the above rules.
+    /// Write `range` elements from ListArray `array`
     ///
-    /// *Exceptions*
-    ///
-    /// There are 2 exceptions from the above rules:
-    ///
-    /// 1. When at the root of the schema: We always increment the
-    /// level regardless of whether the child is nullable or not. If we do not 
do
-    /// this, we could have a non-nullable array having a definition of 0.
-    ///
-    /// 2. List parent, non-list child: We always increment the level in this 
case,
-    /// regardless of whether the child is nullable or not.
-    ///
-    /// *Examples*
-    ///
-    /// A batch with only a primitive that's non-nullable. 
`<primitive[required]>`:
-    /// * We don't increment the definition level as the array is not optional.
-    /// * This would leave us with a definition of 0, so the first exception 
applies.
-    /// * The definition level becomes 1.
-    ///
-    /// A batch with only a primitive that's nullable. `<primitive[optional]>`:
-    /// * The definition level becomes 1, as we increment it once.
-    ///
-    /// A batch with a single non-nullable list (both list and child not null):
-    /// * We calculate the level twice, for the list, and for the child.
-    /// * At the list, the level becomes 1, where 0 indicates that the list is
-    ///  empty, and 1 says it's not (determined through offsets).
-    /// * At the primitive level, the second exception applies. The level 
becomes 2.
-    fn calculate_child_levels(
-        &self,
-        // we use 64-bit offsets to also accommodate large arrays
-        array_offsets: Vec<i64>,
-        array_mask: Vec<bool>,
-        level_type: LevelType,
-    ) -> Self {
-        let min_len = *(array_offsets.last().unwrap()) as usize;
-        let mut definition = Vec::with_capacity(min_len);
-        let mut repetition = Vec::with_capacity(min_len);
-        let mut merged_array_mask = Vec::with_capacity(min_len);
-
-        let max_definition = match (self.level_type, level_type) {
-            // Handle the illegal cases
-            (_, LevelType::Root) => {
-                unreachable!("Cannot have a root as a child")
-            }
-            (LevelType::Primitive(_), _) => {
-                unreachable!("Cannot have a primitive parent for any type")
-            }
-            // The general case
-            (_, _) => self.max_definition + level_type.level_increment(),
-        };
+    /// Note: MapArrays are ListArray<i32> under the hood and so are 
dispatched to this method
+    fn write_list<O: OffsetSizeTrait>(
+        &mut self,
+        offsets: &[O],
+        list_data: &ArrayData,
+        range: Range<usize>,
+    ) {
+        let (child, ctx) = match self {
+            Self::List(child, ctx) => (child, ctx),
+            _ => unreachable!(),
+        };
+
+        let offsets = &offsets[range.start..range.end + 1];
+        let child_array = make_array(list_data.child_data()[0].clone());
+
+        let write_non_null_slice =
+            |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
+                child.write(&child_array, start_idx..end_idx);
+                child.visit_leaves(|leaf| {
+                    let rep_levels = leaf.rep_levels.as_mut().unwrap();
+                    let mut rev = rep_levels.iter_mut().rev();
+                    let mut remaining = end_idx - start_idx;
+
+                    loop {
+                        let next = rev.next().unwrap();
+                        if *next > ctx.rep_level {
+                            // Nested element - ignore
+                            continue;
+                        }
 
-        match (self.level_type, level_type) {
-            (LevelType::List(_), LevelType::List(is_nullable)) => {
-                // Parent is a list or descendant of a list, and child is a 
list
-                let reps = self.repetition.clone().unwrap();
-
-                // List is null, and not empty
-                let l1 = max_definition - is_nullable as i16;
-                // List is not null, but is empty
-                let l2 = max_definition - 1;
-                // List is not null, and not empty
-                let l3 = max_definition;
-
-                let mut nulls_seen = 0;
-
-                self.array_offsets.windows(2).for_each(|w| {
-                    let start = w[0] as usize;
-                    let end = w[1] as usize;
-                    let parent_len = end - start;
-
-                    if parent_len == 0 {
-                        // If the parent length is 0, there won't be a slot 
for the child
-                        let index = start + nulls_seen - self.offset;
-                        definition.push(self.definition[index]);
-                        repetition.push(0);
-                        merged_array_mask.push(self.array_mask[index]);
-                        nulls_seen += 1;
+                        remaining -= 1;
+                        if remaining == 0 {
+                            *next = ctx.rep_level - 1;
+                            break;
+                        }
+                    }
+                })
+            };
+
+        let write_empty_slice = |child: &mut LevelInfoBuilder| {
+            child.visit_leaves(|leaf| {
+                let rep_levels = leaf.rep_levels.as_mut().unwrap();
+                rep_levels.push(ctx.rep_level - 1);
+                let def_levels = leaf.def_levels.as_mut().unwrap();
+                def_levels.push(ctx.def_level - 1);
+            })
+        };
+
+        let write_null_slice = |child: &mut LevelInfoBuilder| {
+            child.visit_leaves(|leaf| {
+                let rep_levels = leaf.rep_levels.as_mut().unwrap();
+                rep_levels.push(ctx.rep_level - 1);
+                let def_levels = leaf.def_levels.as_mut().unwrap();
+                def_levels.push(ctx.def_level - 2);
+            })
+        };
+
+        match list_data.null_bitmap() {
+            Some(nulls) => {
+                let null_offset = list_data.offset() + range.start;
+                for (idx, w) in offsets.windows(2).enumerate() {
+                    let is_valid = nulls.is_set(idx + null_offset);
+                    let start_idx = w[0].to_usize().unwrap();
+                    let end_idx = w[1].to_usize().unwrap();
+                    if !is_valid {
+                        write_null_slice(child)
+                    } else if start_idx == end_idx {
+                        write_empty_slice(child)
                     } else {
-                        (start..end).for_each(|parent_index| {
-                            let index = parent_index + nulls_seen - 
self.offset;
-                            let parent_index = parent_index - self.offset;
-
-                            // parent is either defined at this level, or 
earlier
-                            let parent_def = self.definition[index];
-                            let parent_rep = reps[index];
-                            let parent_mask = self.array_mask[index];
-
-                            // valid parent, index into children
-                            let child_start = array_offsets[parent_index] as 
usize;
-                            let child_end = array_offsets[parent_index + 1] as 
usize;
-                            let child_len = child_end - child_start;
-                            let child_mask = array_mask[parent_index];
-                            let merged_mask = parent_mask && child_mask;
-
-                            if child_len == 0 {
-                                // Empty slot, i.e. {"parent": {"child": [] } }
-                                // Nullness takes priority over emptiness
-                                definition.push(if child_mask { l2 } else { l1 
});
-                                repetition.push(parent_rep);
-                                merged_array_mask.push(merged_mask);
-                            } else {
-                                
(child_start..child_end).for_each(|child_index| {
-                                    let rep = match (
-                                        parent_index == start,
-                                        child_index == child_start,
-                                    ) {
-                                        (true, true) => parent_rep,
-                                        (true, false) => parent_rep + 2,
-                                        (false, true) => parent_rep,
-                                        (false, false) => parent_rep + 1,
-                                    };
-
-                                    definition.push(if !parent_mask {
-                                        parent_def
-                                    } else if child_mask {
-                                        l3
-                                    } else {
-                                        l1
-                                    });
-                                    repetition.push(rep);
-                                    merged_array_mask.push(merged_mask);
-                                });
-                            }
-                        });
+                        write_non_null_slice(child, start_idx, end_idx)
                     }
-                });
-
-                debug_assert_eq!(definition.len(), merged_array_mask.len());
-
-                let offset = *array_offsets.first().unwrap() as usize;
-                let length = *array_offsets.last().unwrap() as usize - offset;
-
-                Self {
-                    definition,
-                    repetition: Some(repetition),
-                    array_offsets,
-                    array_mask: merged_array_mask,
-                    max_definition,
-                    level_type,
-                    offset: offset + self.offset,
-                    length,
                 }
             }
-            (LevelType::List(_), _) => {
-                // List and primitive (or struct).
-                // The list can have more values than the primitive, 
indicating that there
-                // are slots where the list is empty. We use a counter to 
track this behaviour.
-                let mut nulls_seen = 0;
-
-                // let child_max_definition = list_max_definition + 
is_nullable as i16;
-                // child values are a function of parent list offsets
-                let reps = self.repetition.as_deref().unwrap();
-                self.array_offsets.windows(2).for_each(|w| {
-                    let start = w[0] as usize;
-                    let end = w[1] as usize;
-                    let parent_len = end - start;
-
-                    if parent_len == 0 {
-                        let index = start + nulls_seen - self.offset;
-                        definition.push(self.definition[index]);
-                        repetition.push(reps[index]);
-                        merged_array_mask.push(self.array_mask[index]);
-                        nulls_seen += 1;
+            None => {
+                for w in offsets.windows(2) {
+                    let start_idx = w[0].to_usize().unwrap();
+                    let end_idx = w[1].to_usize().unwrap();
+                    if start_idx == end_idx {
+                        write_empty_slice(child)
                     } else {
-                        // iterate through the array, adjusting child 
definitions for nulls
-                        (start..end).for_each(|child_index| {
-                            let index = child_index + nulls_seen - self.offset;
-                            let child_mask = array_mask[child_index - 
self.offset];
-                            let parent_mask = self.array_mask[index];
-                            let parent_def = self.definition[index];
-
-                            if !parent_mask || parent_def < 
self.max_definition {
-                                definition.push(parent_def);
-                                repetition.push(reps[index]);
-                                merged_array_mask.push(parent_mask);
-                            } else {
-                                definition.push(max_definition - !child_mask 
as i16);
-                                repetition.push(reps[index]);
-                                merged_array_mask.push(child_mask);
-                            }
-                        });
+                        write_non_null_slice(child, start_idx, end_idx)
                     }
-                });
-
-                debug_assert_eq!(definition.len(), merged_array_mask.len());
-
-                let offset = *array_offsets.first().unwrap() as usize;
-                let length = *array_offsets.last().unwrap() as usize - offset;
-
-                Self {
-                    definition,
-                    repetition: Some(repetition),
-                    array_offsets: self.array_offsets.clone(),
-                    array_mask: merged_array_mask,
-                    max_definition,
-                    level_type,
-                    offset: offset + self.offset,
-                    length,
                 }
             }
-            (_, LevelType::List(is_nullable)) => {
-                // Encountering a list for the first time.
-                // Calculate the 2 list hierarchy definitions in advance
-
-                // List is null, and not empty
-                let l1 = max_definition - 1 - is_nullable as i16;
-                // List is not null, but is empty
-                let l2 = max_definition - 1;
-                // List is not null, and not empty
-                let l3 = max_definition;
-
-                self.definition
-                    .iter()
-                    .enumerate()
-                    .for_each(|(parent_index, def)| {
-                        let child_from = array_offsets[parent_index];
-                        let child_to = array_offsets[parent_index + 1];
-                        let child_len = child_to - child_from;
-                        let child_mask = array_mask[parent_index];
-                        let parent_mask = self.array_mask[parent_index];
-
-                        match (parent_mask, child_len) {
-                            (true, 0) => {
-                                // Empty slot, i.e. {"parent": {"child": [] } }
-                                // Nullness takes priority over emptiness
-                                definition.push(if child_mask { l2 } else { l1 
});
-                                repetition.push(0);
-                                merged_array_mask.push(child_mask);
-                            }
-                            (false, 0) => {
-                                // Inherit the parent definition as parent was 
null
-                                definition.push(*def);
-                                repetition.push(0);
-                                merged_array_mask.push(child_mask);
-                            }
-                            (true, _) => {
-                                (child_from..child_to).for_each(|child_index| {
-                                    // l1 and l3 make sense as list is not 
empty,
-                                    // but we reflect that it's either null or 
not
-                                    definition.push(if child_mask { l3 } else 
{ l1 });
-                                    // Mark the first child slot as 0, and the 
next as 1
-                                    repetition.push(if child_index == 
child_from {
-                                        0
-                                    } else {
-                                        1
-                                    });
-                                    merged_array_mask.push(child_mask);
-                                });
-                            }
-                            (false, _) => {
-                                (child_from..child_to).for_each(|child_index| {
-                                    // Inherit the parent definition as parent 
was null
-                                    definition.push(*def);
-                                    // mark the first child slot as 0, and the 
next as 1
-                                    repetition.push(if child_index == 
child_from {
-                                        0
-                                    } else {
-                                        1
-                                    });
-                                    merged_array_mask.push(false);
-                                });
-                            }
+        }
+    }
+
+    /// Write `range` elements from StructArray `array`
+    fn write_struct(&mut self, array: &StructArray, range: Range<usize>) {
+        let (children, ctx) = match self {
+            Self::Struct(children, ctx) => (children, ctx),
+            _ => unreachable!(),
+        };
+
+        let write_null = |children: &mut [LevelInfoBuilder], range: 
Range<usize>| {
+            for child in children {
+                child.visit_leaves(|info| {
+                    let len = range.end - range.start;
+
+                    let def_levels = info.def_levels.as_mut().unwrap();
+                    def_levels.reserve(len);
+                    for _ in 0..len {
+                        def_levels.push(ctx.def_level - 1);
+                    }
+
+                    if let Some(rep_levels) = info.rep_levels.as_mut() {
+                        rep_levels.reserve(len);
+                        for _ in 0..len {
+                            rep_levels.push(ctx.rep_level)
                         }
-                    });
-
-                debug_assert_eq!(definition.len(), merged_array_mask.len());
-
-                let offset = *array_offsets.first().unwrap() as usize;
-                let length = *array_offsets.last().unwrap() as usize - offset;
-
-                Self {
-                    definition,
-                    repetition: Some(repetition),
-                    array_offsets,
-                    array_mask: merged_array_mask,
-                    max_definition,
-                    level_type,
-                    offset,
-                    length,
-                }
+                    }
+                })
             }
-            (_, _) => {
-                self.definition
-                    .iter()
-                    .zip(array_mask.into_iter().zip(&self.array_mask))
-                    .for_each(|(current_def, (child_mask, parent_mask))| {
-                        merged_array_mask.push(*parent_mask && child_mask);
-                        match (parent_mask, child_mask) {
-                            (true, true) => {
-                                definition.push(max_definition);
-                            }
-                            (true, false) => {
-                                // The child is only legally null if its array 
is nullable.
-                                // Thus parent's max_definition is lower
-                                definition.push(if *current_def <= 
self.max_definition {
-                                    *current_def
-                                } else {
-                                    self.max_definition
-                                });
+        };
+
+        let write_non_null = |children: &mut [LevelInfoBuilder], range: 
Range<usize>| {
+            for (child_array, child) in 
array.columns().into_iter().zip(children) {
+                child.write(child_array, range.clone())
+            }
+        };
+
+        match array.data().null_bitmap() {
+            Some(validity) => {
+                let null_offset = array.data().offset();
+                let mut last_non_null_idx = None;
+                let mut last_null_idx = None;
+
+                // TODO: BitChunkIterator
+                for i in range.clone() {
+                    match validity.is_set(i + null_offset) {
+                        true => {
+                            if let Some(last_idx) = last_null_idx.take() {
+                                write_null(children, last_idx..i)
                             }
-                            // if the parent was false, retain its definitions
-                            (false, _) => {
-                                definition.push(*current_def);
+                            last_non_null_idx.get_or_insert(i);
+                        }
+                        false => {
+                            if let Some(last_idx) = last_non_null_idx.take() {
+                                write_non_null(children, last_idx..i)
                             }
+                            last_null_idx.get_or_insert(i);
                         }
-                    });
-
-                debug_assert_eq!(definition.len(), merged_array_mask.len());
-
-                Self {
-                    definition,
-                    repetition: self.repetition.clone(), // it's None
-                    array_offsets,
-                    array_mask: merged_array_mask,
-                    max_definition,
-                    level_type,
-                    // Inherit parent offset and length
-                    offset: self.offset,
-                    length: self.length,
+                    }
                 }
-            }
-        }
-    }
 
-    /// Get the offsets of an array as 64-bit values, and validity masks as 
booleans
-    /// - Primitive, binary and struct arrays' offsets will be a sequence, 
masks obtained
-    ///   from validity bitmap
-    /// - List array offsets will be the value offsets, masks are computed from offsets
-    fn get_array_offsets_and_masks(
-        array: &ArrayRef,
-        offset: usize,
-        len: usize,
-    ) -> (Vec<i64>, Vec<bool>) {
-        match array.data_type() {
-            // A NullArray is entirely nulls, despite not containing a null buffer
-            DataType::Null => ((0..=(len as i64)).collect(), vec![false; len]),
-            DataType::Boolean
-            | DataType::Int8
-            | DataType::Int16
-            | DataType::Int32
-            | DataType::Int64
-            | DataType::UInt8
-            | DataType::UInt16
-            | DataType::UInt32
-            | DataType::UInt64
-            | DataType::Float16
-            | DataType::Float32
-            | DataType::Float64
-            | DataType::Timestamp(_, _)
-            | DataType::Date32
-            | DataType::Date64
-            | DataType::Time32(_)
-            | DataType::Time64(_)
-            | DataType::Duration(_)
-            | DataType::Interval(_)
-            | DataType::Binary
-            | DataType::LargeBinary
-            | DataType::Utf8
-            | DataType::LargeUtf8
-            | DataType::Struct(_)
-            | DataType::Dictionary(_, _)
-            | DataType::Decimal(_, _) => {
-                let array_mask = match array.data().null_buffer() {
-                    Some(buf) => get_bool_array_slice(buf, array.offset() + offset, len),
-                    None => vec![true; len],
-                };
-                ((0..=(len as i64)).collect(), array_mask)
-            }
-            DataType::List(_) | DataType::Map(_, _) => {
-                let offsets = unsafe { array.data().buffers()[0].typed_data::<i32>() };
-                let offsets = offsets
-                    .iter()
-                    .copied()
-                    .skip(array.offset() + offset)
-                    .take(len + 1)
-                    .map(|v| v as i64)
-                    .collect::<Vec<i64>>();
-                let array_mask = match array.data().null_buffer() {
-                    Some(buf) => get_bool_array_slice(buf, array.offset() + offset, len),
-                    None => vec![true; len],
-                };
-                (offsets, array_mask)
-            }
-            DataType::LargeList(_) => {
-                let offsets = unsafe { array.data().buffers()[0].typed_data::<i64>() }
-                    .iter()
-                    .skip(array.offset() + offset)
-                    .take(len + 1)
-                    .copied()
-                    .collect();
-                let array_mask = match array.data().null_buffer() {
-                    Some(buf) => get_bool_array_slice(buf, array.offset() + offset, len),
-                    None => vec![true; len],
-                };
-                (offsets, array_mask)
-            }
-            DataType::FixedSizeBinary(value_len) => {
-                let array_mask = match array.data().null_buffer() {
-                    Some(buf) => get_bool_array_slice(buf, array.offset() + offset, len),
-                    None => vec![true; len],
-                };
-                let value_len = *value_len as i64;
-                (
-                    (0..=(len as i64)).map(|v| v * value_len).collect(),
-                    array_mask,
-                )
-            }
-            DataType::FixedSizeList(_, _) | DataType::Union(_, _, _) => {
-                unimplemented!("Getting offsets not yet implemented")
+                if let Some(last_idx) = last_null_idx.take() {
+                    write_null(children, last_idx..range.end)
+                }
+
+                if let Some(last_idx) = last_non_null_idx.take() {
+                    write_non_null(children, last_idx..range.end)
+                }
             }
+            None => write_non_null(children, range),
         }
     }
 
-    /// Given a level's information, calculate the offsets required to index an array correctly.
-    pub(crate) fn filter_array_indices(&self) -> Vec<usize> {
-        if !matches!(self.level_type, LevelType::Primitive(_)) {
-            panic!(
-                "Cannot filter indices on a non-primitive array, found {:?}",
-                self.level_type
-            );
-        }
-
-        // happy path if not dealing with lists
-        if self.repetition.is_none() {
-            return self
-                .definition
-                .iter()
-                .enumerate()
-                .filter_map(|(i, def)| {
-                    if *def == self.max_definition {
-                        Some(i)
-                    } else {
-                        None
+    /// Write a primitive array, as defined by [`is_leaf`]
+    fn write_leaf(&mut self, array: &ArrayRef, range: Range<usize>) {
+        let info = match self {
+            Self::Primitive(info) => info,
+            _ => unreachable!(),
+        };
+
+        let len = range.end - range.start;
+
+        match &mut info.def_levels {
+            Some(def_levels) => {
+                def_levels.reserve(len);
+                info.non_null_indices.reserve(len);
+
+                match array.data().null_bitmap() {
+                    Some(nulls) => {
+                        let nulls_offset = array.data().offset();
+                        for i in range {
+                            match nulls.is_set(i + nulls_offset) {
+                                true => {
+                                    def_levels.push(info.max_def_level);
+                                    info.non_null_indices.push(i)
+                                }
+                                false => def_levels.push(info.max_def_level - 1),
+                            }
+                        }
                     }
-                })
-                .collect();
-        }
-
-        let mut filtered = vec![];
-        let mut definition_levels = self.definition.iter();
-        let mut index = 0;
-
-        for len in self.array_offsets.windows(2).map(|s| s[1] - s[0]) {
-            if len == 0 {
-                // Skip this definition level--the iterator should not be empty, and the definition
-                // level be less than max_definition, i.e., a null value)
-                assert!(*definition_levels.next().unwrap() < self.max_definition);
-            } else {
-                for (_, def) in (0..len).zip(&mut definition_levels) {
-                    if *def == self.max_definition {
-                        filtered.push(index);
+                    None => {
+                        let iter = std::iter::repeat(info.max_def_level).take(len);
+                        def_levels.extend(iter);
+                        info.non_null_indices.extend(range);
                     }
-                    index += 1;
                 }
             }
+            None => info.non_null_indices.extend(range),
         }
 
-        filtered
+        if let Some(rep_levels) = &mut info.rep_levels {
+            rep_levels.extend(std::iter::repeat(info.max_rep_level).take(len))
+        }
     }
-}
 
-/// Convert an Arrow buffer to a boolean array slice
-/// TODO: this was created for buffers, so might not work for bool array, might be slow too
-#[inline]
-fn get_bool_array_slice(
-    buffer: &arrow::buffer::Buffer,
-    offset: usize,
-    len: usize,
-) -> Vec<bool> {
-    let data = buffer.as_slice();
-    (offset..(len + offset))
-        .map(|i| arrow::util::bit_util::get_bit(data, i))
-        .collect()
+    /// Visits all children of this node in depth first order
+    fn visit_leaves(&mut self, visit: impl Fn(&mut LevelInfo) + Copy) {
+        match self {
+            LevelInfoBuilder::Primitive(info) => visit(info),
+            LevelInfoBuilder::List(c, _) => c.visit_leaves(visit),
+            LevelInfoBuilder::Struct(children, _) => {
+                for c in children {
+                    c.visit_leaves(visit)
+                }
+            }
+        }
+    }
 }
+/// The data necessary to write a primitive Arrow array to parquet, taking into account
+/// any non-primitive parents it may have in the arrow representation
+#[derive(Debug, Eq, PartialEq, Clone)]
+pub(crate) struct LevelInfo {
+    /// Array's definition levels
+    ///
+    /// Present if `max_def_level != 0`
+    def_levels: Option<Vec<i16>>,
 
-#[cfg(test)]
-mod tests {
-    use super::*;
+    /// Array's optional repetition levels
+    ///
+    /// Present if `max_rep_level != 0`
+    rep_levels: Option<Vec<i16>>,
 
-    use std::sync::Arc;
+    /// The corresponding array identifying non-null slices of data
+    /// from the primitive array
+    non_null_indices: Vec<usize>,
 
-    use arrow::array::*;
-    use arrow::buffer::Buffer;
-    use arrow::datatypes::{Schema, ToByteSlice};
-    use arrow::record_batch::RecordBatch;
+    /// The maximum definition level for this leaf column
+    max_def_level: i16,
 
-    #[test]
-    fn test_calculate_array_levels_twitter_example() {

Review Comment:
   I'm currently working through porting these tests



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to