mcassels commented on a change in pull request #6770:
URL: https://github.com/apache/arrow/pull/6770#discussion_r437067752
##########
File path: rust/arrow/src/ipc/convert.rs
##########
@@ -518,11 +518,12 @@ pub(crate) fn get_fb_field_type<'a: 'b, 'b>(
)
}
List(ref list_type) => {
+ let field_name = fbb.create_string("list"); // field schema
requires name to be not None
Review comment:
I named it "list" because of this rule in the Parquet format spec: "The outer-most level must be a group
annotated with LIST that contains a single field named list"
https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
##########
File path: rust/parquet/src/arrow/array_reader.rs
##########
@@ -426,6 +450,391 @@ where
}
}
+/// Implementation of list array reader.
+pub struct ListArrayReader {
+ item_reader: Box<dyn ArrayReader>,
+ data_type: ArrowType,
+ item_type: ArrowType,
+ list_def_level: i16,
+ list_rep_level: i16,
+ def_level_buffer: Option<Buffer>,
+ rep_level_buffer: Option<Buffer>,
+}
+
+impl ListArrayReader {
+ /// Construct list array reader.
+ pub fn new(
+ item_reader: Box<dyn ArrayReader>,
+ data_type: ArrowType,
+ item_type: ArrowType,
+ def_level: i16,
+ rep_level: i16,
+ ) -> Self {
+ Self {
+ item_reader,
+ data_type,
+ item_type,
+ list_def_level: def_level,
+ list_rep_level: rep_level,
+ def_level_buffer: None,
+ rep_level_buffer: None,
+ }
+ }
+}
+
+macro_rules! build_empty_list_array_with_primitive_items {
+ ($item_type:ident) => {{
+ let values_builder = PrimitiveBuilder::<$item_type>::new(0);
+ let mut builder = ListBuilder::new(values_builder);
+ let empty_list_array = builder.finish();
+ Ok(Arc::new(empty_list_array))
+ }};
+}
+
+macro_rules! build_empty_list_array_with_non_primitive_items {
+ ($builder:ident) => {{
+ let values_builder = $builder::new(0);
+ let mut builder = ListBuilder::new(values_builder);
+ let empty_list_array = builder.finish();
+ Ok(Arc::new(empty_list_array))
+ }};
+}
+
+fn build_empty_list_array(item_type: ArrowType) -> Result<ArrayRef> {
+ match item_type {
+ ArrowType::UInt8 =>
build_empty_list_array_with_primitive_items!(ArrowUInt8Type),
+ ArrowType::UInt16 => {
+ build_empty_list_array_with_primitive_items!(ArrowUInt16Type)
+ }
+ ArrowType::UInt32 => {
+ build_empty_list_array_with_primitive_items!(ArrowUInt32Type)
+ }
+ ArrowType::UInt64 => {
+ build_empty_list_array_with_primitive_items!(ArrowUInt64Type)
+ }
+ ArrowType::Int8 =>
build_empty_list_array_with_primitive_items!(ArrowInt8Type),
+ ArrowType::Int16 =>
build_empty_list_array_with_primitive_items!(ArrowInt16Type),
+ ArrowType::Int32 =>
build_empty_list_array_with_primitive_items!(ArrowInt32Type),
+ ArrowType::Int64 =>
build_empty_list_array_with_primitive_items!(ArrowInt64Type),
+ ArrowType::Float32 => {
+ build_empty_list_array_with_primitive_items!(ArrowFloat32Type)
+ }
+ ArrowType::Float64 => {
+ build_empty_list_array_with_primitive_items!(ArrowFloat64Type)
+ }
+ ArrowType::Boolean => {
+ build_empty_list_array_with_primitive_items!(ArrowBooleanType)
+ }
+ ArrowType::Date32(_) => {
+ build_empty_list_array_with_primitive_items!(ArrowDate32Type)
+ }
+ ArrowType::Date64(_) => {
+ build_empty_list_array_with_primitive_items!(ArrowDate64Type)
+ }
+ ArrowType::Time32(ArrowTimeUnit::Second) => {
+ build_empty_list_array_with_primitive_items!(ArrowTime32SecondType)
+ }
+ ArrowType::Time32(ArrowTimeUnit::Millisecond) => {
+
build_empty_list_array_with_primitive_items!(ArrowTime32MillisecondType)
+ }
+ ArrowType::Time64(ArrowTimeUnit::Microsecond) => {
+
build_empty_list_array_with_primitive_items!(ArrowTime64MicrosecondType)
+ }
+ ArrowType::Time64(ArrowTimeUnit::Nanosecond) => {
+
build_empty_list_array_with_primitive_items!(ArrowTime64NanosecondType)
+ }
+ ArrowType::Duration(ArrowTimeUnit::Second) => {
+
build_empty_list_array_with_primitive_items!(ArrowDurationSecondType)
+ }
+ ArrowType::Duration(ArrowTimeUnit::Millisecond) => {
+
build_empty_list_array_with_primitive_items!(ArrowDurationMillisecondType)
+ }
+ ArrowType::Duration(ArrowTimeUnit::Microsecond) => {
+
build_empty_list_array_with_primitive_items!(ArrowDurationMicrosecondType)
+ }
+ ArrowType::Duration(ArrowTimeUnit::Nanosecond) => {
+
build_empty_list_array_with_primitive_items!(ArrowDurationNanosecondType)
+ }
+ ArrowType::Timestamp(ArrowTimeUnit::Second, _) => {
+
build_empty_list_array_with_primitive_items!(ArrowTimestampSecondType)
+ }
+ ArrowType::Timestamp(ArrowTimeUnit::Millisecond, _) => {
+
build_empty_list_array_with_primitive_items!(ArrowTimestampMillisecondType)
+ }
+ ArrowType::Timestamp(ArrowTimeUnit::Microsecond, _) => {
+
build_empty_list_array_with_primitive_items!(ArrowTimestampMicrosecondType)
+ }
+ ArrowType::Timestamp(ArrowTimeUnit::Nanosecond, _) => {
+
build_empty_list_array_with_primitive_items!(ArrowTimestampNanosecondType)
+ }
+ ArrowType::Utf8 => {
+ build_empty_list_array_with_non_primitive_items!(StringBuilder)
+ }
+ ArrowType::Binary => {
+ build_empty_list_array_with_non_primitive_items!(BinaryBuilder)
+ }
+ _ => Err(ParquetError::General(format!(
+ "ListArray of type List({:?}) is not supported by array_reader",
+ item_type
+ ))),
+ }
+}
+
+macro_rules! remove_primitive_array_indices {
+ ($arr: expr, $item_type:ty, $indices:expr) => {{
+ let array_data = match
$arr.as_any().downcast_ref::<PrimitiveArray<$item_type>>() {
+ Some(a) => a,
+ _ => return Err(ParquetError::General(format!("Error generating
next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))),
+ };
+ let mut builder = PrimitiveBuilder::<$item_type>::new($arr.len());
+ for i in 0..array_data.len() {
+ if !$indices.contains(&i) {
+ if array_data.is_null(i) {
+ builder.append_null()?;
+ } else {
+ builder.append_value(array_data.value(i))?;
+ }
+ }
+ }
+ Ok(Arc::new(builder.finish()))
+ }};
+}
+
+macro_rules! remove_array_indices_custom_builder {
+ ($arr: expr, $array_type:ty, $item_builder:ident, $indices:expr) => {{
+ let array_data = match $arr.as_any().downcast_ref::<$array_type>() {
+ Some(a) => a,
+ _ => return Err(ParquetError::General(format!("Error generating
next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))),
+ };
+ let mut builder = $item_builder::new(array_data.len());
+
+ for i in 0..array_data.len() {
+ if !$indices.contains(&i) {
+ if array_data.is_null(i) {
+ builder.append_null()?;
+ } else {
+ builder.append_value(array_data.value(i))?;
+ }
+ }
+ }
+ Ok(Arc::new(builder.finish()))
+ }};
+}
+
+macro_rules! remove_fixed_size_binary_array_indices {
+ ($arr: expr, $array_type:ty, $item_builder:ident, $indices:expr,
$len:expr) => {{
+ let array_data = match $arr.as_any().downcast_ref::<$array_type>() {
+ Some(a) => a,
+ _ => return Err(ParquetError::General(format!("Error generating
next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))),
+ };
+ let mut builder = FixedSizeBinaryBuilder::new(array_data.len(), $len);
+ for i in 0..array_data.len() {
+ if !$indices.contains(&i) {
+ if array_data.is_null(i) {
+ builder.append_null()?;
+ } else {
+ builder.append_value(array_data.value(i))?;
+ }
+ }
+ }
+ Ok(Arc::new(builder.finish()))
+ }};
+}
+
+fn remove_indices(
+ arr: ArrayRef,
+ item_type: ArrowType,
+ indices: Vec<usize>,
+) -> Result<ArrayRef> {
+ match item_type {
+ ArrowType::UInt8 => remove_primitive_array_indices!(arr,
ArrowUInt8Type, indices),
+ ArrowType::UInt16 => {
+ remove_primitive_array_indices!(arr, ArrowUInt16Type, indices)
+ }
+ ArrowType::UInt32 => {
+ remove_primitive_array_indices!(arr, ArrowUInt32Type, indices)
+ }
+ ArrowType::UInt64 => {
+ remove_primitive_array_indices!(arr, ArrowUInt64Type, indices)
+ }
+ ArrowType::Int8 => remove_primitive_array_indices!(arr, ArrowInt8Type,
indices),
+ ArrowType::Int16 => remove_primitive_array_indices!(arr,
ArrowInt16Type, indices),
+ ArrowType::Int32 => remove_primitive_array_indices!(arr,
ArrowInt32Type, indices),
+ ArrowType::Int64 => remove_primitive_array_indices!(arr,
ArrowInt64Type, indices),
+ ArrowType::Float32 => {
+ remove_primitive_array_indices!(arr, ArrowFloat32Type, indices)
+ }
+ ArrowType::Float64 => {
+ remove_primitive_array_indices!(arr, ArrowFloat64Type, indices)
+ }
+ ArrowType::Boolean => {
+ remove_primitive_array_indices!(arr, ArrowBooleanType, indices)
+ }
+ ArrowType::Date32(_) => {
+ remove_primitive_array_indices!(arr, ArrowDate32Type, indices)
+ }
+ ArrowType::Date64(_) => {
+ remove_primitive_array_indices!(arr, ArrowDate64Type, indices)
+ }
+ ArrowType::Time32(ArrowTimeUnit::Second) => {
+ remove_primitive_array_indices!(arr, ArrowTime32SecondType,
indices)
+ }
+ ArrowType::Time32(ArrowTimeUnit::Millisecond) => {
+ remove_primitive_array_indices!(arr, ArrowTime32MillisecondType,
indices)
+ }
+ ArrowType::Time64(ArrowTimeUnit::Microsecond) => {
+ remove_primitive_array_indices!(arr, ArrowTime64MicrosecondType,
indices)
+ }
+ ArrowType::Time64(ArrowTimeUnit::Nanosecond) => {
+ remove_primitive_array_indices!(arr, ArrowTime64NanosecondType,
indices)
+ }
+ ArrowType::Duration(ArrowTimeUnit::Second) => {
+ remove_primitive_array_indices!(arr, ArrowDurationSecondType,
indices)
+ }
+ ArrowType::Duration(ArrowTimeUnit::Millisecond) => {
+ remove_primitive_array_indices!(arr, ArrowDurationMillisecondType,
indices)
+ }
+ ArrowType::Duration(ArrowTimeUnit::Microsecond) => {
+ remove_primitive_array_indices!(arr, ArrowDurationMicrosecondType,
indices)
+ }
+ ArrowType::Duration(ArrowTimeUnit::Nanosecond) => {
+ remove_primitive_array_indices!(arr, ArrowDurationNanosecondType,
indices)
+ }
+ ArrowType::Timestamp(ArrowTimeUnit::Second, _) => {
+ remove_primitive_array_indices!(arr, ArrowTimestampSecondType,
indices)
+ }
+ ArrowType::Timestamp(ArrowTimeUnit::Millisecond, _) => {
+ remove_primitive_array_indices!(arr,
ArrowTimestampMillisecondType, indices)
+ }
+ ArrowType::Timestamp(ArrowTimeUnit::Microsecond, _) => {
+ remove_primitive_array_indices!(arr,
ArrowTimestampMicrosecondType, indices)
+ }
+ ArrowType::Timestamp(ArrowTimeUnit::Nanosecond, _) => {
+ remove_primitive_array_indices!(arr, ArrowTimestampNanosecondType,
indices)
+ }
+ ArrowType::Utf8 => {
+ remove_array_indices_custom_builder!(arr, StringArray,
StringBuilder, indices)
+ }
+ ArrowType::Binary => {
+ remove_array_indices_custom_builder!(arr, BinaryArray,
BinaryBuilder, indices)
+ }
+ ArrowType::FixedSizeBinary(size) =>
remove_fixed_size_binary_array_indices!(
+ arr,
+ FixedSizeBinaryArray,
+ FixedSizeBinaryBuilder,
+ indices,
+ size
+ ),
+ _ => Err(ParquetError::General(format!(
+ "ListArray of type List({:?}) is not supported by array_reader",
+ item_type
+ ))),
+ }
+}
+
+impl ArrayReader for ListArrayReader {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ /// Returns data type.
+ /// This must be a List.
+ fn get_data_type(&self) -> &ArrowType {
+ &self.data_type
+ }
+
+ fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
+ let next_batch_array =
self.item_reader.next_batch(batch_size).unwrap();
+ let item_type = self.item_reader.get_data_type().clone();
+
+ if next_batch_array.len() == 0 {
+ return build_empty_list_array(item_type);
+ }
+ let def_levels = self.item_reader.get_def_levels().unwrap();
+ let rep_levels = self.item_reader.get_rep_levels().unwrap();
+
+ if !((def_levels.len() == rep_levels.len())
+ && (rep_levels.len() == next_batch_array.len()))
+ {
+ return Err(ArrowError(
+ "Expected item_reader def_level and rep_level arrays to have
the same length as batch array".to_string(),
+ ));
+ }
+
+ // Need to remove from the values array the nulls that represent null
lists rather than null items
+ // null lists have def_level = 0
+ let mut null_list_indices: Vec<usize> = Vec::new();
+ for i in 0..def_levels.len() {
+ if def_levels[i] == 0 {
+ null_list_indices.push(i);
+ }
+ }
+ let batch_values = match null_list_indices.len() {
+ 0 => next_batch_array.clone(),
+ _ => remove_indices(next_batch_array.clone(), item_type,
null_list_indices)?,
Review comment:
As I understand it, the null values that represent a null **list**
(rather than a null list item) must not be present in the child data of the
final ListArray. I don't like that this uses the builder either, but I wasn't
sure how else to create a child data array with specific indices
excluded. I might be misunderstanding something, though. Do you have any
suggestions?
##########
File path: rust/parquet/src/arrow/array_reader.rs
##########
@@ -745,16 +1152,66 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a
ArrayReaderBuilderContext
}
/// Build array reader for list type.
- /// Currently this is not supported.
fn visit_list_with_item(
&mut self,
- _list_type: Rc<Type>,
+ list_type: Rc<Type>,
_item_type: &Type,
- _context: &'a ArrayReaderBuilderContext,
+ context: &'a ArrayReaderBuilderContext,
) -> Result<Option<Box<dyn ArrayReader>>> {
- Err(ArrowError(
- "Reading parquet list array into arrow is not supported
yet!".to_string(),
- ))
+ let mut new_context = context.clone();
+
+ let list_child = &list_type.get_fields()[0];
+ let item_child = &list_child.get_fields()[0];
Review comment:
Yes, that makes more sense. I'm not sure why I did it that way before.
##########
File path: rust/parquet/src/arrow/array_reader.rs
##########
@@ -426,6 +450,391 @@ where
}
}
+/// Implementation of list array reader.
+pub struct ListArrayReader {
+ item_reader: Box<dyn ArrayReader>,
+ data_type: ArrowType,
+ item_type: ArrowType,
+ list_def_level: i16,
+ list_rep_level: i16,
+ def_level_buffer: Option<Buffer>,
+ rep_level_buffer: Option<Buffer>,
+}
+
+impl ListArrayReader {
+ /// Construct list array reader.
+ pub fn new(
+ item_reader: Box<dyn ArrayReader>,
+ data_type: ArrowType,
+ item_type: ArrowType,
+ def_level: i16,
+ rep_level: i16,
+ ) -> Self {
+ Self {
+ item_reader,
+ data_type,
+ item_type,
+ list_def_level: def_level,
+ list_rep_level: rep_level,
+ def_level_buffer: None,
+ rep_level_buffer: None,
+ }
+ }
+}
+
+macro_rules! build_empty_list_array_with_primitive_items {
+ ($item_type:ident) => {{
+ let values_builder = PrimitiveBuilder::<$item_type>::new(0);
+ let mut builder = ListBuilder::new(values_builder);
+ let empty_list_array = builder.finish();
+ Ok(Arc::new(empty_list_array))
+ }};
+}
+
+macro_rules! build_empty_list_array_with_non_primitive_items {
+ ($builder:ident) => {{
+ let values_builder = $builder::new(0);
+ let mut builder = ListBuilder::new(values_builder);
+ let empty_list_array = builder.finish();
+ Ok(Arc::new(empty_list_array))
+ }};
+}
+
+fn build_empty_list_array(item_type: ArrowType) -> Result<ArrayRef> {
+ match item_type {
+ ArrowType::UInt8 =>
build_empty_list_array_with_primitive_items!(ArrowUInt8Type),
+ ArrowType::UInt16 => {
+ build_empty_list_array_with_primitive_items!(ArrowUInt16Type)
+ }
+ ArrowType::UInt32 => {
+ build_empty_list_array_with_primitive_items!(ArrowUInt32Type)
+ }
+ ArrowType::UInt64 => {
+ build_empty_list_array_with_primitive_items!(ArrowUInt64Type)
+ }
+ ArrowType::Int8 =>
build_empty_list_array_with_primitive_items!(ArrowInt8Type),
+ ArrowType::Int16 =>
build_empty_list_array_with_primitive_items!(ArrowInt16Type),
+ ArrowType::Int32 =>
build_empty_list_array_with_primitive_items!(ArrowInt32Type),
+ ArrowType::Int64 =>
build_empty_list_array_with_primitive_items!(ArrowInt64Type),
+ ArrowType::Float32 => {
+ build_empty_list_array_with_primitive_items!(ArrowFloat32Type)
+ }
+ ArrowType::Float64 => {
+ build_empty_list_array_with_primitive_items!(ArrowFloat64Type)
+ }
+ ArrowType::Boolean => {
+ build_empty_list_array_with_primitive_items!(ArrowBooleanType)
+ }
+ ArrowType::Date32(_) => {
+ build_empty_list_array_with_primitive_items!(ArrowDate32Type)
+ }
+ ArrowType::Date64(_) => {
+ build_empty_list_array_with_primitive_items!(ArrowDate64Type)
+ }
+ ArrowType::Time32(ArrowTimeUnit::Second) => {
+ build_empty_list_array_with_primitive_items!(ArrowTime32SecondType)
+ }
+ ArrowType::Time32(ArrowTimeUnit::Millisecond) => {
+
build_empty_list_array_with_primitive_items!(ArrowTime32MillisecondType)
+ }
+ ArrowType::Time64(ArrowTimeUnit::Microsecond) => {
+
build_empty_list_array_with_primitive_items!(ArrowTime64MicrosecondType)
+ }
+ ArrowType::Time64(ArrowTimeUnit::Nanosecond) => {
+
build_empty_list_array_with_primitive_items!(ArrowTime64NanosecondType)
+ }
+ ArrowType::Duration(ArrowTimeUnit::Second) => {
+
build_empty_list_array_with_primitive_items!(ArrowDurationSecondType)
+ }
+ ArrowType::Duration(ArrowTimeUnit::Millisecond) => {
+
build_empty_list_array_with_primitive_items!(ArrowDurationMillisecondType)
+ }
+ ArrowType::Duration(ArrowTimeUnit::Microsecond) => {
+
build_empty_list_array_with_primitive_items!(ArrowDurationMicrosecondType)
+ }
+ ArrowType::Duration(ArrowTimeUnit::Nanosecond) => {
+
build_empty_list_array_with_primitive_items!(ArrowDurationNanosecondType)
+ }
+ ArrowType::Timestamp(ArrowTimeUnit::Second, _) => {
+
build_empty_list_array_with_primitive_items!(ArrowTimestampSecondType)
+ }
+ ArrowType::Timestamp(ArrowTimeUnit::Millisecond, _) => {
+
build_empty_list_array_with_primitive_items!(ArrowTimestampMillisecondType)
+ }
+ ArrowType::Timestamp(ArrowTimeUnit::Microsecond, _) => {
+
build_empty_list_array_with_primitive_items!(ArrowTimestampMicrosecondType)
+ }
+ ArrowType::Timestamp(ArrowTimeUnit::Nanosecond, _) => {
+
build_empty_list_array_with_primitive_items!(ArrowTimestampNanosecondType)
+ }
+ ArrowType::Utf8 => {
+ build_empty_list_array_with_non_primitive_items!(StringBuilder)
+ }
+ ArrowType::Binary => {
+ build_empty_list_array_with_non_primitive_items!(BinaryBuilder)
+ }
+ _ => Err(ParquetError::General(format!(
+ "ListArray of type List({:?}) is not supported by array_reader",
+ item_type
+ ))),
+ }
+}
+
+macro_rules! remove_primitive_array_indices {
+ ($arr: expr, $item_type:ty, $indices:expr) => {{
+ let array_data = match
$arr.as_any().downcast_ref::<PrimitiveArray<$item_type>>() {
+ Some(a) => a,
+ _ => return Err(ParquetError::General(format!("Error generating
next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))),
+ };
+ let mut builder = PrimitiveBuilder::<$item_type>::new($arr.len());
+ for i in 0..array_data.len() {
+ if !$indices.contains(&i) {
+ if array_data.is_null(i) {
+ builder.append_null()?;
+ } else {
+ builder.append_value(array_data.value(i))?;
+ }
+ }
+ }
+ Ok(Arc::new(builder.finish()))
+ }};
+}
+
+macro_rules! remove_array_indices_custom_builder {
+ ($arr: expr, $array_type:ty, $item_builder:ident, $indices:expr) => {{
+ let array_data = match $arr.as_any().downcast_ref::<$array_type>() {
+ Some(a) => a,
+ _ => return Err(ParquetError::General(format!("Error generating
next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))),
+ };
+ let mut builder = $item_builder::new(array_data.len());
+
+ for i in 0..array_data.len() {
+ if !$indices.contains(&i) {
+ if array_data.is_null(i) {
+ builder.append_null()?;
+ } else {
+ builder.append_value(array_data.value(i))?;
+ }
+ }
+ }
+ Ok(Arc::new(builder.finish()))
+ }};
+}
+
+macro_rules! remove_fixed_size_binary_array_indices {
+ ($arr: expr, $array_type:ty, $item_builder:ident, $indices:expr,
$len:expr) => {{
+ let array_data = match $arr.as_any().downcast_ref::<$array_type>() {
+ Some(a) => a,
+ _ => return Err(ParquetError::General(format!("Error generating
next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))),
+ };
+ let mut builder = FixedSizeBinaryBuilder::new(array_data.len(), $len);
+ for i in 0..array_data.len() {
+ if !$indices.contains(&i) {
+ if array_data.is_null(i) {
+ builder.append_null()?;
+ } else {
+ builder.append_value(array_data.value(i))?;
+ }
+ }
+ }
+ Ok(Arc::new(builder.finish()))
+ }};
+}
+
+fn remove_indices(
+ arr: ArrayRef,
+ item_type: ArrowType,
+ indices: Vec<usize>,
+) -> Result<ArrayRef> {
+ match item_type {
+ ArrowType::UInt8 => remove_primitive_array_indices!(arr,
ArrowUInt8Type, indices),
+ ArrowType::UInt16 => {
+ remove_primitive_array_indices!(arr, ArrowUInt16Type, indices)
+ }
+ ArrowType::UInt32 => {
+ remove_primitive_array_indices!(arr, ArrowUInt32Type, indices)
+ }
+ ArrowType::UInt64 => {
+ remove_primitive_array_indices!(arr, ArrowUInt64Type, indices)
+ }
+ ArrowType::Int8 => remove_primitive_array_indices!(arr, ArrowInt8Type,
indices),
+ ArrowType::Int16 => remove_primitive_array_indices!(arr,
ArrowInt16Type, indices),
+ ArrowType::Int32 => remove_primitive_array_indices!(arr,
ArrowInt32Type, indices),
+ ArrowType::Int64 => remove_primitive_array_indices!(arr,
ArrowInt64Type, indices),
+ ArrowType::Float32 => {
+ remove_primitive_array_indices!(arr, ArrowFloat32Type, indices)
+ }
+ ArrowType::Float64 => {
+ remove_primitive_array_indices!(arr, ArrowFloat64Type, indices)
+ }
+ ArrowType::Boolean => {
+ remove_primitive_array_indices!(arr, ArrowBooleanType, indices)
+ }
+ ArrowType::Date32(_) => {
+ remove_primitive_array_indices!(arr, ArrowDate32Type, indices)
+ }
+ ArrowType::Date64(_) => {
+ remove_primitive_array_indices!(arr, ArrowDate64Type, indices)
+ }
+ ArrowType::Time32(ArrowTimeUnit::Second) => {
+ remove_primitive_array_indices!(arr, ArrowTime32SecondType,
indices)
+ }
+ ArrowType::Time32(ArrowTimeUnit::Millisecond) => {
+ remove_primitive_array_indices!(arr, ArrowTime32MillisecondType,
indices)
+ }
+ ArrowType::Time64(ArrowTimeUnit::Microsecond) => {
+ remove_primitive_array_indices!(arr, ArrowTime64MicrosecondType,
indices)
+ }
+ ArrowType::Time64(ArrowTimeUnit::Nanosecond) => {
+ remove_primitive_array_indices!(arr, ArrowTime64NanosecondType,
indices)
+ }
+ ArrowType::Duration(ArrowTimeUnit::Second) => {
+ remove_primitive_array_indices!(arr, ArrowDurationSecondType,
indices)
+ }
+ ArrowType::Duration(ArrowTimeUnit::Millisecond) => {
+ remove_primitive_array_indices!(arr, ArrowDurationMillisecondType,
indices)
+ }
+ ArrowType::Duration(ArrowTimeUnit::Microsecond) => {
+ remove_primitive_array_indices!(arr, ArrowDurationMicrosecondType,
indices)
+ }
+ ArrowType::Duration(ArrowTimeUnit::Nanosecond) => {
+ remove_primitive_array_indices!(arr, ArrowDurationNanosecondType,
indices)
+ }
+ ArrowType::Timestamp(ArrowTimeUnit::Second, _) => {
+ remove_primitive_array_indices!(arr, ArrowTimestampSecondType,
indices)
+ }
+ ArrowType::Timestamp(ArrowTimeUnit::Millisecond, _) => {
+ remove_primitive_array_indices!(arr,
ArrowTimestampMillisecondType, indices)
+ }
+ ArrowType::Timestamp(ArrowTimeUnit::Microsecond, _) => {
+ remove_primitive_array_indices!(arr,
ArrowTimestampMicrosecondType, indices)
+ }
+ ArrowType::Timestamp(ArrowTimeUnit::Nanosecond, _) => {
+ remove_primitive_array_indices!(arr, ArrowTimestampNanosecondType,
indices)
+ }
+ ArrowType::Utf8 => {
+ remove_array_indices_custom_builder!(arr, StringArray,
StringBuilder, indices)
+ }
+ ArrowType::Binary => {
+ remove_array_indices_custom_builder!(arr, BinaryArray,
BinaryBuilder, indices)
+ }
+ ArrowType::FixedSizeBinary(size) =>
remove_fixed_size_binary_array_indices!(
+ arr,
+ FixedSizeBinaryArray,
+ FixedSizeBinaryBuilder,
+ indices,
+ size
+ ),
+ _ => Err(ParquetError::General(format!(
+ "ListArray of type List({:?}) is not supported by array_reader",
+ item_type
+ ))),
+ }
+}
+
+impl ArrayReader for ListArrayReader {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ /// Returns data type.
+ /// This must be a List.
+ fn get_data_type(&self) -> &ArrowType {
+ &self.data_type
+ }
+
+ fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
Review comment:
I've just added a couple of comments: one on the ArrayReader implementation
and one on `visit_list_with_item`.
##########
File path: rust/parquet/src/arrow/array_reader.rs
##########
@@ -426,6 +450,391 @@ where
}
}
+/// Implementation of list array reader.
+pub struct ListArrayReader {
+ item_reader: Box<dyn ArrayReader>,
+ data_type: ArrowType,
+ item_type: ArrowType,
+ list_def_level: i16,
+ list_rep_level: i16,
+ def_level_buffer: Option<Buffer>,
+ rep_level_buffer: Option<Buffer>,
+}
+
+impl ListArrayReader {
+ /// Construct list array reader.
+ pub fn new(
+ item_reader: Box<dyn ArrayReader>,
+ data_type: ArrowType,
+ item_type: ArrowType,
+ def_level: i16,
+ rep_level: i16,
+ ) -> Self {
+ Self {
+ item_reader,
+ data_type,
+ item_type,
+ list_def_level: def_level,
+ list_rep_level: rep_level,
+ def_level_buffer: None,
+ rep_level_buffer: None,
+ }
+ }
+}
+
+macro_rules! build_empty_list_array_with_primitive_items {
+ ($item_type:ident) => {{
+ let values_builder = PrimitiveBuilder::<$item_type>::new(0);
+ let mut builder = ListBuilder::new(values_builder);
+ let empty_list_array = builder.finish();
+ Ok(Arc::new(empty_list_array))
+ }};
+}
+
+macro_rules! build_empty_list_array_with_non_primitive_items {
+ ($builder:ident) => {{
+ let values_builder = $builder::new(0);
+ let mut builder = ListBuilder::new(values_builder);
+ let empty_list_array = builder.finish();
+ Ok(Arc::new(empty_list_array))
+ }};
+}
+
+fn build_empty_list_array(item_type: ArrowType) -> Result<ArrayRef> {
+ match item_type {
+ ArrowType::UInt8 =>
build_empty_list_array_with_primitive_items!(ArrowUInt8Type),
+ ArrowType::UInt16 => {
+ build_empty_list_array_with_primitive_items!(ArrowUInt16Type)
+ }
+ ArrowType::UInt32 => {
+ build_empty_list_array_with_primitive_items!(ArrowUInt32Type)
+ }
+ ArrowType::UInt64 => {
+ build_empty_list_array_with_primitive_items!(ArrowUInt64Type)
+ }
+ ArrowType::Int8 =>
build_empty_list_array_with_primitive_items!(ArrowInt8Type),
+ ArrowType::Int16 =>
build_empty_list_array_with_primitive_items!(ArrowInt16Type),
+ ArrowType::Int32 =>
build_empty_list_array_with_primitive_items!(ArrowInt32Type),
+ ArrowType::Int64 =>
build_empty_list_array_with_primitive_items!(ArrowInt64Type),
+ ArrowType::Float32 => {
+ build_empty_list_array_with_primitive_items!(ArrowFloat32Type)
+ }
+ ArrowType::Float64 => {
+ build_empty_list_array_with_primitive_items!(ArrowFloat64Type)
+ }
+ ArrowType::Boolean => {
+ build_empty_list_array_with_primitive_items!(ArrowBooleanType)
+ }
+ ArrowType::Date32(_) => {
+ build_empty_list_array_with_primitive_items!(ArrowDate32Type)
+ }
+ ArrowType::Date64(_) => {
+ build_empty_list_array_with_primitive_items!(ArrowDate64Type)
+ }
+ ArrowType::Time32(ArrowTimeUnit::Second) => {
+ build_empty_list_array_with_primitive_items!(ArrowTime32SecondType)
+ }
+ ArrowType::Time32(ArrowTimeUnit::Millisecond) => {
+
build_empty_list_array_with_primitive_items!(ArrowTime32MillisecondType)
+ }
+ ArrowType::Time64(ArrowTimeUnit::Microsecond) => {
+
build_empty_list_array_with_primitive_items!(ArrowTime64MicrosecondType)
+ }
+ ArrowType::Time64(ArrowTimeUnit::Nanosecond) => {
+
build_empty_list_array_with_primitive_items!(ArrowTime64NanosecondType)
+ }
+ ArrowType::Duration(ArrowTimeUnit::Second) => {
+
build_empty_list_array_with_primitive_items!(ArrowDurationSecondType)
+ }
+ ArrowType::Duration(ArrowTimeUnit::Millisecond) => {
+
build_empty_list_array_with_primitive_items!(ArrowDurationMillisecondType)
+ }
+ ArrowType::Duration(ArrowTimeUnit::Microsecond) => {
+
build_empty_list_array_with_primitive_items!(ArrowDurationMicrosecondType)
+ }
+ ArrowType::Duration(ArrowTimeUnit::Nanosecond) => {
+
build_empty_list_array_with_primitive_items!(ArrowDurationNanosecondType)
+ }
+ ArrowType::Timestamp(ArrowTimeUnit::Second, _) => {
+
build_empty_list_array_with_primitive_items!(ArrowTimestampSecondType)
+ }
+ ArrowType::Timestamp(ArrowTimeUnit::Millisecond, _) => {
+
build_empty_list_array_with_primitive_items!(ArrowTimestampMillisecondType)
+ }
+ ArrowType::Timestamp(ArrowTimeUnit::Microsecond, _) => {
+
build_empty_list_array_with_primitive_items!(ArrowTimestampMicrosecondType)
+ }
+ ArrowType::Timestamp(ArrowTimeUnit::Nanosecond, _) => {
+
build_empty_list_array_with_primitive_items!(ArrowTimestampNanosecondType)
+ }
+ ArrowType::Utf8 => {
+ build_empty_list_array_with_non_primitive_items!(StringBuilder)
+ }
+ ArrowType::Binary => {
+ build_empty_list_array_with_non_primitive_items!(BinaryBuilder)
+ }
+ _ => Err(ParquetError::General(format!(
+ "ListArray of type List({:?}) is not supported by array_reader",
+ item_type
+ ))),
+ }
+}
+
+macro_rules! remove_primitive_array_indices {
+ ($arr: expr, $item_type:ty, $indices:expr) => {{
+ let array_data = match
$arr.as_any().downcast_ref::<PrimitiveArray<$item_type>>() {
+ Some(a) => a,
+ _ => return Err(ParquetError::General(format!("Error generating
next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))),
+ };
+ let mut builder = PrimitiveBuilder::<$item_type>::new($arr.len());
+ for i in 0..array_data.len() {
+ if !$indices.contains(&i) {
+ if array_data.is_null(i) {
+ builder.append_null()?;
+ } else {
+ builder.append_value(array_data.value(i))?;
+ }
+ }
+ }
+ Ok(Arc::new(builder.finish()))
+ }};
+}
+
+macro_rules! remove_array_indices_custom_builder {
+ ($arr: expr, $array_type:ty, $item_builder:ident, $indices:expr) => {{
+ let array_data = match $arr.as_any().downcast_ref::<$array_type>() {
+ Some(a) => a,
+ _ => return Err(ParquetError::General(format!("Error generating
next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))),
+ };
+ let mut builder = $item_builder::new(array_data.len());
+
+ for i in 0..array_data.len() {
+ if !$indices.contains(&i) {
+ if array_data.is_null(i) {
+ builder.append_null()?;
+ } else {
+ builder.append_value(array_data.value(i))?;
+ }
+ }
+ }
+ Ok(Arc::new(builder.finish()))
+ }};
+}
+
+macro_rules! remove_fixed_size_binary_array_indices {
+ ($arr: expr, $array_type:ty, $item_builder:ident, $indices:expr,
$len:expr) => {{
+ let array_data = match $arr.as_any().downcast_ref::<$array_type>() {
+ Some(a) => a,
+ _ => return Err(ParquetError::General(format!("Error generating
next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))),
+ };
+ let mut builder = FixedSizeBinaryBuilder::new(array_data.len(), $len);
+ for i in 0..array_data.len() {
+ if !$indices.contains(&i) {
+ if array_data.is_null(i) {
+ builder.append_null()?;
+ } else {
+ builder.append_value(array_data.value(i))?;
+ }
+ }
+ }
+ Ok(Arc::new(builder.finish()))
+ }};
+}
+
+fn remove_indices(
+ arr: ArrayRef,
+ item_type: ArrowType,
+ indices: Vec<usize>,
+) -> Result<ArrayRef> {
+ match item_type {
+ ArrowType::UInt8 => remove_primitive_array_indices!(arr,
ArrowUInt8Type, indices),
+ ArrowType::UInt16 => {
+ remove_primitive_array_indices!(arr, ArrowUInt16Type, indices)
+ }
+ ArrowType::UInt32 => {
+ remove_primitive_array_indices!(arr, ArrowUInt32Type, indices)
+ }
+ ArrowType::UInt64 => {
+ remove_primitive_array_indices!(arr, ArrowUInt64Type, indices)
+ }
+ ArrowType::Int8 => remove_primitive_array_indices!(arr, ArrowInt8Type,
indices),
+ ArrowType::Int16 => remove_primitive_array_indices!(arr,
ArrowInt16Type, indices),
+ ArrowType::Int32 => remove_primitive_array_indices!(arr,
ArrowInt32Type, indices),
+ ArrowType::Int64 => remove_primitive_array_indices!(arr,
ArrowInt64Type, indices),
+ ArrowType::Float32 => {
+ remove_primitive_array_indices!(arr, ArrowFloat32Type, indices)
+ }
+ ArrowType::Float64 => {
+ remove_primitive_array_indices!(arr, ArrowFloat64Type, indices)
+ }
+ ArrowType::Boolean => {
+ remove_primitive_array_indices!(arr, ArrowBooleanType, indices)
+ }
+ ArrowType::Date32(_) => {
+ remove_primitive_array_indices!(arr, ArrowDate32Type, indices)
+ }
+ ArrowType::Date64(_) => {
+ remove_primitive_array_indices!(arr, ArrowDate64Type, indices)
+ }
+ ArrowType::Time32(ArrowTimeUnit::Second) => {
+ remove_primitive_array_indices!(arr, ArrowTime32SecondType,
indices)
+ }
+ ArrowType::Time32(ArrowTimeUnit::Millisecond) => {
+ remove_primitive_array_indices!(arr, ArrowTime32MillisecondType,
indices)
+ }
+ ArrowType::Time64(ArrowTimeUnit::Microsecond) => {
+ remove_primitive_array_indices!(arr, ArrowTime64MicrosecondType,
indices)
+ }
+ ArrowType::Time64(ArrowTimeUnit::Nanosecond) => {
+ remove_primitive_array_indices!(arr, ArrowTime64NanosecondType,
indices)
+ }
+ ArrowType::Duration(ArrowTimeUnit::Second) => {
+ remove_primitive_array_indices!(arr, ArrowDurationSecondType,
indices)
+ }
+ ArrowType::Duration(ArrowTimeUnit::Millisecond) => {
+ remove_primitive_array_indices!(arr, ArrowDurationMillisecondType,
indices)
+ }
+ ArrowType::Duration(ArrowTimeUnit::Microsecond) => {
+ remove_primitive_array_indices!(arr, ArrowDurationMicrosecondType,
indices)
+ }
+ ArrowType::Duration(ArrowTimeUnit::Nanosecond) => {
+ remove_primitive_array_indices!(arr, ArrowDurationNanosecondType,
indices)
+ }
+ ArrowType::Timestamp(ArrowTimeUnit::Second, _) => {
+ remove_primitive_array_indices!(arr, ArrowTimestampSecondType,
indices)
+ }
+ ArrowType::Timestamp(ArrowTimeUnit::Millisecond, _) => {
+ remove_primitive_array_indices!(arr,
ArrowTimestampMillisecondType, indices)
+ }
+ ArrowType::Timestamp(ArrowTimeUnit::Microsecond, _) => {
+ remove_primitive_array_indices!(arr,
ArrowTimestampMicrosecondType, indices)
+ }
+ ArrowType::Timestamp(ArrowTimeUnit::Nanosecond, _) => {
+ remove_primitive_array_indices!(arr, ArrowTimestampNanosecondType,
indices)
+ }
+ ArrowType::Utf8 => {
+ remove_array_indices_custom_builder!(arr, StringArray,
StringBuilder, indices)
+ }
+ ArrowType::Binary => {
+ remove_array_indices_custom_builder!(arr, BinaryArray,
BinaryBuilder, indices)
+ }
+ ArrowType::FixedSizeBinary(size) =>
remove_fixed_size_binary_array_indices!(
+ arr,
+ FixedSizeBinaryArray,
+ FixedSizeBinaryBuilder,
+ indices,
+ size
+ ),
+ _ => Err(ParquetError::General(format!(
+ "ListArray of type List({:?}) is not supported by array_reader",
+ item_type
+ ))),
+ }
+}
+
+impl ArrayReader for ListArrayReader {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ /// Returns data type.
+ /// This must be a List.
+ fn get_data_type(&self) -> &ArrowType {
+ &self.data_type
+ }
+
+ fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
+ let next_batch_array =
self.item_reader.next_batch(batch_size).unwrap();
+ let item_type = self.item_reader.get_data_type().clone();
+
+ if next_batch_array.len() == 0 {
+ return build_empty_list_array(item_type);
+ }
+ let def_levels = self.item_reader.get_def_levels().unwrap();
+ let rep_levels = self.item_reader.get_rep_levels().unwrap();
+
+ if !((def_levels.len() == rep_levels.len())
+ && (rep_levels.len() == next_batch_array.len()))
+ {
+ return Err(ArrowError(
+ "Expected item_reader def_level and rep_level arrays to have
the same length as batch array".to_string(),
+ ));
+ }
+
+ // Need to remove from the values array the nulls that represent null
lists rather than null items
+ // null lists have def_level = 0
+ let mut null_list_indices: Vec<usize> = Vec::new();
+ for i in 0..def_levels.len() {
+ if def_levels[i] == 0 {
+ null_list_indices.push(i);
+ }
+ }
+ let batch_values = match null_list_indices.len() {
+ 0 => next_batch_array.clone(),
+ _ => remove_indices(next_batch_array.clone(), item_type,
null_list_indices)?,
+ };
+
+ // null list has def_level = 0
+ // empty list has def_level = 1
+ // null item in a list has def_level = 2
Review comment:
Yes, it seems this comment is not correct. I think it would be
correct only in the case where both the list and the list item are optional.
However, most of the logic described in this comment is not actually used: rep
and def levels are only ever compared with 0. Do you see any flaws in the def and
rep level logic that comes after this comment? If that logic is OK, I'll just
remove the comment.
##########
File path: rust/parquet/src/arrow/array_reader.rs
##########
@@ -745,16 +1152,66 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a
ArrayReaderBuilderContext
}
/// Build array reader for list type.
- /// Currently this is not supported.
fn visit_list_with_item(
&mut self,
- _list_type: Rc<Type>,
+ list_type: Rc<Type>,
_item_type: &Type,
- _context: &'a ArrayReaderBuilderContext,
+ context: &'a ArrayReaderBuilderContext,
) -> Result<Option<Box<dyn ArrayReader>>> {
- Err(ArrowError(
- "Reading parquet list array into arrow is not supported
yet!".to_string(),
- ))
+ let mut new_context = context.clone();
+
+ let list_child = &list_type.get_fields()[0];
+ let item_child = &list_child.get_fields()[0];
Review comment:
The failing test made me realize why I had it that way. Currently
`item_type` is passed to `visit_list_with_item` as `&Type`, while `list_type` is
passed as `Rc<Type>`. Because `item_type` is not an `Rc<Type>`, using it means
that when the list items are visited by PrimitiveArrayReader, they will not
satisfy the `self.is_included` check, because that check is done by reference.
I've now changed the signature of `visit_list_with_item` to pass `item_type`
as `Rc<Type>`, which fixes this issue.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]