This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 4f1064e feat(parquet): support for reading structs nested within lists (#1187)
4f1064e is described below
commit 4f1064ef536028e49a07a09204e4a930098e93ba
Author: Helgi Kristvin Sigurbjarnarson <[email protected]>
AuthorDate: Tue Jan 18 04:09:12 2022 -0800
feat(parquet): support for reading structs nested within lists (#1187)
* feat(parquet): support for reading structs nested within lists
* fix: logical conflict
Co-authored-by: Andrew Lamb <[email protected]>
---
parquet/src/arrow/array_reader.rs | 31 +++++++++++-
parquet/src/arrow/arrow_writer.rs | 100 ++++++++++++++++++++++++++++++++++++++
2 files changed, 130 insertions(+), 1 deletion(-)
diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs
index 6ba08f9..537b4cc 100644
--- a/parquet/src/arrow/array_reader.rs
+++ b/parquet/src/arrow/array_reader.rs
@@ -824,6 +824,36 @@ fn remove_indices(
indices,
size
),
+ ArrowType::Struct(fields) => {
+     let struct_array = arr
+         .as_any()
+         .downcast_ref::<StructArray>()
+         .expect("Array should be a struct");
+     // Recursively call remove_indices on each of the struct's child columns.
+     let new_columns = fields
+         .into_iter()
+         .zip(struct_array.columns())
+         .map(|(field, column)| {
+             let dt = field.data_type().clone();
+             Ok((field, remove_indices(column.clone(), dt, indices.clone())?))
+         })
+         .collect::<Result<Vec<_>>>()?;
+
+     if arr.data().null_count() == 0 {
+         // No nulls, nothing to do: the children alone define the struct.
+         Ok(Arc::new(StructArray::from(new_columns)))
+     } else {
+         // Rebuild the validity buffer without the slots in `indices`. A set
+         // keeps each membership test O(1) instead of a linear scan per row.
+         let removed: std::collections::HashSet<usize> =
+             indices.iter().copied().collect();
+         let mut valid = BooleanBufferBuilder::new(arr.len().saturating_sub(removed.len()));
+         for idx in (0..arr.len()).filter(|idx| !removed.contains(idx)) {
+             valid.append(!arr.is_null(idx));
+         }
+         Ok(Arc::new(StructArray::from((new_columns, valid.finish()))))
+     }
+ }
_ => Err(ParquetError::General(format!(
"ListArray of type List({:?}) is not supported by array_reader",
item_type
@@ -1562,7 +1592,6 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext
match item_reader_type {
ArrowType::List(_)
| ArrowType::FixedSizeList(_, _)
- | ArrowType::Struct(_)
| ArrowType::Dictionary(_, _) => Err(ArrowError(format!(
"reading List({:?}) into arrow not supported yet",
item_type
diff --git a/parquet/src/arrow/arrow_writer.rs b/parquet/src/arrow/arrow_writer.rs
index c7a5f06..fc3a567 100644
--- a/parquet/src/arrow/arrow_writer.rs
+++ b/parquet/src/arrow/arrow_writer.rs
@@ -1623,4 +1623,104 @@ mod tests {
let stats = column.statistics().unwrap();
assert_eq!(stats.null_count(), 2);
}
+
+ #[test]
+ fn test_list_of_struct_roundtrip() {
+ // Round-trip a List<Struct<a: Int32, b: Int32>> with nulls at every nesting level.
+ let int_field = Field::new("a", DataType::Int32, true);
+ let int_field2 = Field::new("b", DataType::Int32, true);
+
+ let int_builder = Int32Builder::new(10);
+ let int_builder2 = Int32Builder::new(10);
+
+ let struct_builder = StructBuilder::new(
+ vec![int_field, int_field2],
+ vec![Box::new(int_builder), Box::new(int_builder2)],
+ );
+ let mut list_builder = ListBuilder::new(struct_builder);
+
+ // Construct the following six-entry list array:
+ // [{a: 1, b: 2}], [], null, [null, null], [{a: null, b: 3}], [{a: 2, b: null}]
+
+ // [{a: 1, b: 2}]: append each child, then the struct slot, then the list entry
+ let values = list_builder.values();
+ values
+ .field_builder::<Int32Builder>(0)
+ .unwrap()
+ .append_value(1)
+ .unwrap();
+ values
+ .field_builder::<Int32Builder>(1)
+ .unwrap()
+ .append_value(2)
+ .unwrap();
+ values.append(true).unwrap();
+ list_builder.append(true).unwrap();
+
+ // []: a valid, empty list (no struct values appended)
+ list_builder.append(true).unwrap();
+
+ // null: the list entry itself is null
+ list_builder.append(false).unwrap();
+
+ // [null, null]: two null structs; children still get a null slot each
+ let values = list_builder.values();
+ values
+ .field_builder::<Int32Builder>(0)
+ .unwrap()
+ .append_null()
+ .unwrap();
+ values
+ .field_builder::<Int32Builder>(1)
+ .unwrap()
+ .append_null()
+ .unwrap();
+ values.append(false).unwrap();
+ values
+ .field_builder::<Int32Builder>(0)
+ .unwrap()
+ .append_null()
+ .unwrap();
+ values
+ .field_builder::<Int32Builder>(1)
+ .unwrap()
+ .append_null()
+ .unwrap();
+ values.append(false).unwrap();
+ list_builder.append(true).unwrap();
+
+ // [{a: null, b: 3}]: valid struct with a null `a` child
+ let values = list_builder.values();
+ values
+ .field_builder::<Int32Builder>(0)
+ .unwrap()
+ .append_null()
+ .unwrap();
+ values
+ .field_builder::<Int32Builder>(1)
+ .unwrap()
+ .append_value(3)
+ .unwrap();
+ values.append(true).unwrap();
+ list_builder.append(true).unwrap();
+
+ // [{a: 2, b: null}]: valid struct with a null `b` child
+ let values = list_builder.values();
+ values
+ .field_builder::<Int32Builder>(0)
+ .unwrap()
+ .append_value(2)
+ .unwrap();
+ values
+ .field_builder::<Int32Builder>(1)
+ .unwrap()
+ .append_null()
+ .unwrap();
+ values.append(true).unwrap();
+ list_builder.append(true).unwrap();
+
+ let array = Arc::new(list_builder.finish());
+
+ one_column_roundtrip(array, true, Some(10));
+ }
}