jhorstmann commented on code in PR #9848:
URL: https://github.com/apache/arrow-rs/pull/9848#discussion_r3292959568
##########
parquet/src/arrow/array_reader/list_array.rs:
##########
@@ -97,121 +108,72 @@ impl<OffsetSize: OffsetSizeTrait> ArrayReader for
ListArrayReader<OffsetSize> {
.get_rep_levels()
.ok_or_else(|| general_err!("item_reader rep levels are None."))?;
- if OffsetSize::from_usize(next_batch_array.len()).is_none() {
- return Err(general_err!(
- "offset of {} would overflow list array",
- next_batch_array.len()
- ));
+ if def_levels.is_empty() {
+ return Ok(new_empty_array(&self.data_type));
}
- if !rep_levels.is_empty() && rep_levels[0] != 0 {
- // This implies either the source data was invalid, or the leaf
column
- // reader did not correctly delimit semantic records
+ if rep_levels[0] != 0 {
return Err(general_err!("first repetition level of batch must be
0"));
}
- // A non-nullable list has a single definition level indicating if the
list is empty
- //
- // A nullable list has two definition levels associated with it:
- //
- // The first identifies if the list is null
- // The second identifies if the list is empty
- //
- // The child data returned above is padded with a value for each
not-fully defined level.
- // Therefore null and empty lists will correspond to a value in the
child array.
- //
- // Whilst nulls may have a non-zero slice in the offsets array, empty
lists must
- // be of zero length. As a result we MUST filter out values
corresponding to empty
- // lists, and for consistency we do the same for nulls.
-
- // The output offsets for the computed ListArray
- let mut list_offsets: Vec<OffsetSize> =
Vec::with_capacity(next_batch_array.len() + 1);
-
- // The validity mask of the computed ListArray if nullable
- let mut validity = self
- .nullable
- .then(|| BooleanBufferBuilder::new(next_batch_array.len()));
-
- // The offset into the filtered child data of the current level being
considered
- let mut cur_offset = 0;
-
- // Identifies the start of a run of values to copy from the source
child data
- let mut filter_start = None;
-
- // The number of child values skipped due to empty lists or nulls
- let mut skipped = 0;
-
- // Builder used to construct the filtered child data, skipping empty
lists and nulls
- let data = next_batch_array.to_data();
- let mut child_data_builder =
- MutableArrayData::new(vec![&data], false, next_batch_array.len());
-
- def_levels.iter().zip(rep_levels).try_for_each(|(d, r)| {
- match r.cmp(&self.rep_level) {
- Ordering::Greater => {
- // Repetition level greater than current => already
handled by inner array
- if *d < self.def_level {
- return Err(general_err!(
- "Encountered repetition level too large for
definition level"
- ));
- }
- }
- Ordering::Equal => {
- // New value in the current list
- cur_offset += 1;
- }
- Ordering::Less => {
- // Create new array slice
- // Already checked that this cannot overflow
-
list_offsets.push(OffsetSize::from_usize(cur_offset).unwrap());
-
- if *d >= self.def_level {
- // Fully defined value
+ if OffsetSize::from_usize(child_array.len()).is_none() {
+ return Err(general_err!(
+ "offset of {} would overflow list array",
+ child_array.len()
+ ));
+ }
- // Record current offset if it is None
- filter_start.get_or_insert(cur_offset + skipped);
+ let levels_len = def_levels.len();
+ let mut list_offsets: Vec<OffsetSize> = Vec::with_capacity(levels_len
+ 1);
+ let mut validity = self.nullable.then(||
BooleanBufferBuilder::new(levels_len));
- cur_offset += 1;
+ let mut cur_offset: usize = 0;
- if let Some(validity) = validity.as_mut() {
- validity.append(true)
- }
- } else {
- // Flush the current slice of child values if any
- if let Some(start) = filter_start.take() {
- child_data_builder.extend(0, start, cur_offset +
skipped);
- }
+ for (&d, &r) in def_levels.iter().zip(rep_levels) {
+ // When this list is a child of another list, skip entries
+ // belonging to null/empty parent lists.
+ if let Some(threshold) = self.parent_threshold {
+ if d < threshold {
+ continue;
+ }
+ }
- if let Some(validity) = validity.as_mut() {
- // Valid if empty list
- validity.append(*d + 1 == self.def_level)
- }
+ if r < self.rep_level {
+ // List boundary
+ list_offsets.push(OffsetSize::from_usize(cur_offset).unwrap());
- skipped += 1;
+ if let Some(v) = validity.as_mut() {
+ if d >= self.def_level {
+ v.append(true);
+ } else {
+ v.append(d + 1 == self.def_level);
}
}
}
- Ok(())
- })?;
- list_offsets.push(OffsetSize::from_usize(cur_offset).unwrap());
-
- let child_data = if skipped == 0 {
- // No filtered values - can reuse original array
- next_batch_array.to_data()
- } else {
- // One or more filtered values - must build new array
- if let Some(start) = filter_start.take() {
- child_data_builder.extend(0, start, cur_offset + skipped)
+ // Count child array entries. Using r <= self.rep_level ensures
+ // we only count direct children: for leaf children every entry
+ // qualifies, but for nested list children this correctly
+ // excludes continuation entries within the child list (r >
rep_level).
+ if d >= self.def_level && r <= self.rep_level {
Review Comment:
Intuitively I would have thought this needs to be `r == self.rep_level`, at
`rep_level-1` it would be an empty list, which should not increase the offset.
I must be missing something, since there are working tests for nested and empty
lists.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]