sunchao commented on a change in pull request #6770:
URL: https://github.com/apache/arrow/pull/6770#discussion_r438476116



##########
File path: rust/parquet/src/arrow/array_reader.rs
##########
@@ -426,6 +450,391 @@ where
     }
 }
 
+/// Implementation of list array reader.
+pub struct ListArrayReader {
+    item_reader: Box<dyn ArrayReader>,
+    data_type: ArrowType,
+    item_type: ArrowType,
+    list_def_level: i16,
+    list_rep_level: i16,
+    def_level_buffer: Option<Buffer>,
+    rep_level_buffer: Option<Buffer>,
+}
+
+impl ListArrayReader {
+    /// Construct list array reader.
+    pub fn new(
+        item_reader: Box<dyn ArrayReader>,
+        data_type: ArrowType,
+        item_type: ArrowType,
+        def_level: i16,
+        rep_level: i16,
+    ) -> Self {
+        Self {
+            item_reader,
+            data_type,
+            item_type,
+            list_def_level: def_level,
+            list_rep_level: rep_level,
+            def_level_buffer: None,
+            rep_level_buffer: None,
+        }
+    }
+}
+
+macro_rules! build_empty_list_array_with_primitive_items {
+    ($item_type:ident) => {{
+        let values_builder = PrimitiveBuilder::<$item_type>::new(0);
+        let mut builder = ListBuilder::new(values_builder);
+        let empty_list_array = builder.finish();
+        Ok(Arc::new(empty_list_array))
+    }};
+}
+
+macro_rules! build_empty_list_array_with_non_primitive_items {
+    ($builder:ident) => {{
+        let values_builder = $builder::new(0);
+        let mut builder = ListBuilder::new(values_builder);
+        let empty_list_array = builder.finish();
+        Ok(Arc::new(empty_list_array))
+    }};
+}
+
+fn build_empty_list_array(item_type: ArrowType) -> Result<ArrayRef> {
+    match item_type {
+        ArrowType::UInt8 => 
build_empty_list_array_with_primitive_items!(ArrowUInt8Type),
+        ArrowType::UInt16 => {
+            build_empty_list_array_with_primitive_items!(ArrowUInt16Type)
+        }
+        ArrowType::UInt32 => {
+            build_empty_list_array_with_primitive_items!(ArrowUInt32Type)
+        }
+        ArrowType::UInt64 => {
+            build_empty_list_array_with_primitive_items!(ArrowUInt64Type)
+        }
+        ArrowType::Int8 => 
build_empty_list_array_with_primitive_items!(ArrowInt8Type),
+        ArrowType::Int16 => 
build_empty_list_array_with_primitive_items!(ArrowInt16Type),
+        ArrowType::Int32 => 
build_empty_list_array_with_primitive_items!(ArrowInt32Type),
+        ArrowType::Int64 => 
build_empty_list_array_with_primitive_items!(ArrowInt64Type),
+        ArrowType::Float32 => {
+            build_empty_list_array_with_primitive_items!(ArrowFloat32Type)
+        }
+        ArrowType::Float64 => {
+            build_empty_list_array_with_primitive_items!(ArrowFloat64Type)
+        }
+        ArrowType::Boolean => {
+            build_empty_list_array_with_primitive_items!(ArrowBooleanType)
+        }
+        ArrowType::Date32(_) => {
+            build_empty_list_array_with_primitive_items!(ArrowDate32Type)
+        }
+        ArrowType::Date64(_) => {
+            build_empty_list_array_with_primitive_items!(ArrowDate64Type)
+        }
+        ArrowType::Time32(ArrowTimeUnit::Second) => {
+            build_empty_list_array_with_primitive_items!(ArrowTime32SecondType)
+        }
+        ArrowType::Time32(ArrowTimeUnit::Millisecond) => {
+            
build_empty_list_array_with_primitive_items!(ArrowTime32MillisecondType)
+        }
+        ArrowType::Time64(ArrowTimeUnit::Microsecond) => {
+            
build_empty_list_array_with_primitive_items!(ArrowTime64MicrosecondType)
+        }
+        ArrowType::Time64(ArrowTimeUnit::Nanosecond) => {
+            
build_empty_list_array_with_primitive_items!(ArrowTime64NanosecondType)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Second) => {
+            
build_empty_list_array_with_primitive_items!(ArrowDurationSecondType)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Millisecond) => {
+            
build_empty_list_array_with_primitive_items!(ArrowDurationMillisecondType)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Microsecond) => {
+            
build_empty_list_array_with_primitive_items!(ArrowDurationMicrosecondType)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Nanosecond) => {
+            
build_empty_list_array_with_primitive_items!(ArrowDurationNanosecondType)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Second, _) => {
+            
build_empty_list_array_with_primitive_items!(ArrowTimestampSecondType)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Millisecond, _) => {
+            
build_empty_list_array_with_primitive_items!(ArrowTimestampMillisecondType)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Microsecond, _) => {
+            
build_empty_list_array_with_primitive_items!(ArrowTimestampMicrosecondType)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Nanosecond, _) => {
+            
build_empty_list_array_with_primitive_items!(ArrowTimestampNanosecondType)
+        }
+        ArrowType::Utf8 => {
+            build_empty_list_array_with_non_primitive_items!(StringBuilder)
+        }
+        ArrowType::Binary => {
+            build_empty_list_array_with_non_primitive_items!(BinaryBuilder)
+        }
+        _ => Err(ParquetError::General(format!(
+            "ListArray of type List({:?}) is not supported by array_reader",
+            item_type
+        ))),
+    }
+}
+
+macro_rules! remove_primitive_array_indices {
+    ($arr: expr, $item_type:ty, $indices:expr) => {{
+        let array_data = match 
$arr.as_any().downcast_ref::<PrimitiveArray<$item_type>>() {
+            Some(a) => a,
+            _ => return Err(ParquetError::General(format!("Error generating 
next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))),
+        };
+        let mut builder = PrimitiveBuilder::<$item_type>::new($arr.len());
+        for i in 0..array_data.len() {
+            if !$indices.contains(&i) {
+                if array_data.is_null(i) {
+                    builder.append_null()?;
+                } else {
+                    builder.append_value(array_data.value(i))?;
+                }
+            }
+        }
+        Ok(Arc::new(builder.finish()))
+    }};
+}
+
+macro_rules! remove_array_indices_custom_builder {
+    ($arr: expr, $array_type:ty, $item_builder:ident, $indices:expr) => {{
+        let array_data = match $arr.as_any().downcast_ref::<$array_type>() {
+            Some(a) => a,
+            _ => return Err(ParquetError::General(format!("Error generating 
next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))),
+        };
+        let mut builder = $item_builder::new(array_data.len());
+
+        for i in 0..array_data.len() {
+            if !$indices.contains(&i) {
+                if array_data.is_null(i) {
+                    builder.append_null()?;
+                } else {
+                    builder.append_value(array_data.value(i))?;
+                }
+            }
+        }
+        Ok(Arc::new(builder.finish()))
+    }};
+}
+
+macro_rules! remove_fixed_size_binary_array_indices {
+    ($arr: expr, $array_type:ty, $item_builder:ident, $indices:expr, 
$len:expr) => {{
+        let array_data = match $arr.as_any().downcast_ref::<$array_type>() {
+            Some(a) => a,
+            _ => return Err(ParquetError::General(format!("Error generating 
next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))),
+        };
+        let mut builder = FixedSizeBinaryBuilder::new(array_data.len(), $len);
+        for i in 0..array_data.len() {
+            if !$indices.contains(&i) {
+                if array_data.is_null(i) {
+                    builder.append_null()?;
+                } else {
+                    builder.append_value(array_data.value(i))?;
+                }
+            }
+        }
+        Ok(Arc::new(builder.finish()))
+    }};
+}
+
+fn remove_indices(
+    arr: ArrayRef,
+    item_type: ArrowType,
+    indices: Vec<usize>,
+) -> Result<ArrayRef> {
+    match item_type {
+        ArrowType::UInt8 => remove_primitive_array_indices!(arr, 
ArrowUInt8Type, indices),
+        ArrowType::UInt16 => {
+            remove_primitive_array_indices!(arr, ArrowUInt16Type, indices)
+        }
+        ArrowType::UInt32 => {
+            remove_primitive_array_indices!(arr, ArrowUInt32Type, indices)
+        }
+        ArrowType::UInt64 => {
+            remove_primitive_array_indices!(arr, ArrowUInt64Type, indices)
+        }
+        ArrowType::Int8 => remove_primitive_array_indices!(arr, ArrowInt8Type, 
indices),
+        ArrowType::Int16 => remove_primitive_array_indices!(arr, 
ArrowInt16Type, indices),
+        ArrowType::Int32 => remove_primitive_array_indices!(arr, 
ArrowInt32Type, indices),
+        ArrowType::Int64 => remove_primitive_array_indices!(arr, 
ArrowInt64Type, indices),
+        ArrowType::Float32 => {
+            remove_primitive_array_indices!(arr, ArrowFloat32Type, indices)
+        }
+        ArrowType::Float64 => {
+            remove_primitive_array_indices!(arr, ArrowFloat64Type, indices)
+        }
+        ArrowType::Boolean => {
+            remove_primitive_array_indices!(arr, ArrowBooleanType, indices)
+        }
+        ArrowType::Date32(_) => {
+            remove_primitive_array_indices!(arr, ArrowDate32Type, indices)
+        }
+        ArrowType::Date64(_) => {
+            remove_primitive_array_indices!(arr, ArrowDate64Type, indices)
+        }
+        ArrowType::Time32(ArrowTimeUnit::Second) => {
+            remove_primitive_array_indices!(arr, ArrowTime32SecondType, 
indices)
+        }
+        ArrowType::Time32(ArrowTimeUnit::Millisecond) => {
+            remove_primitive_array_indices!(arr, ArrowTime32MillisecondType, 
indices)
+        }
+        ArrowType::Time64(ArrowTimeUnit::Microsecond) => {
+            remove_primitive_array_indices!(arr, ArrowTime64MicrosecondType, 
indices)
+        }
+        ArrowType::Time64(ArrowTimeUnit::Nanosecond) => {
+            remove_primitive_array_indices!(arr, ArrowTime64NanosecondType, 
indices)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Second) => {
+            remove_primitive_array_indices!(arr, ArrowDurationSecondType, 
indices)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Millisecond) => {
+            remove_primitive_array_indices!(arr, ArrowDurationMillisecondType, 
indices)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Microsecond) => {
+            remove_primitive_array_indices!(arr, ArrowDurationMicrosecondType, 
indices)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Nanosecond) => {
+            remove_primitive_array_indices!(arr, ArrowDurationNanosecondType, 
indices)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Second, _) => {
+            remove_primitive_array_indices!(arr, ArrowTimestampSecondType, 
indices)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Millisecond, _) => {
+            remove_primitive_array_indices!(arr, 
ArrowTimestampMillisecondType, indices)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Microsecond, _) => {
+            remove_primitive_array_indices!(arr, 
ArrowTimestampMicrosecondType, indices)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Nanosecond, _) => {
+            remove_primitive_array_indices!(arr, ArrowTimestampNanosecondType, 
indices)
+        }
+        ArrowType::Utf8 => {
+            remove_array_indices_custom_builder!(arr, StringArray, 
StringBuilder, indices)
+        }
+        ArrowType::Binary => {
+            remove_array_indices_custom_builder!(arr, BinaryArray, 
BinaryBuilder, indices)
+        }
+        ArrowType::FixedSizeBinary(size) => 
remove_fixed_size_binary_array_indices!(
+            arr,
+            FixedSizeBinaryArray,
+            FixedSizeBinaryBuilder,
+            indices,
+            size
+        ),
+        _ => Err(ParquetError::General(format!(
+            "ListArray of type List({:?}) is not supported by array_reader",
+            item_type
+        ))),
+    }
+}
+
+impl ArrayReader for ListArrayReader {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Returns data type.
+    /// This must be a List.
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
+        let next_batch_array = 
self.item_reader.next_batch(batch_size).unwrap();
+        let item_type = self.item_reader.get_data_type().clone();
+
+        if next_batch_array.len() == 0 {
+            return build_empty_list_array(item_type);
+        }
+        let def_levels = self.item_reader.get_def_levels().unwrap();
+        let rep_levels = self.item_reader.get_rep_levels().unwrap();
+
+        if !((def_levels.len() == rep_levels.len())
+            && (rep_levels.len() == next_batch_array.len()))
+        {
+            return Err(ArrowError(
+                "Expected item_reader def_level and rep_level arrays to have 
the same length as batch array".to_string(),
+            ));
+        }
+
+        // Need to remove from the values array the nulls that represent null 
lists rather than null items
+        // null lists have def_level = 0
+        let mut null_list_indices: Vec<usize> = Vec::new();
+        for i in 0..def_levels.len() {
+            if def_levels[i] == 0 {
+                null_list_indices.push(i);
+            }
+        }
+        let batch_values = match null_list_indices.len() {
+            0 => next_batch_array.clone(),
+            _ => remove_indices(next_batch_array.clone(), item_type, 
null_list_indices)?,

Review comment:
       I'm not sure I understand you correctly. I think a null list is
represented by the combination of the validity bitmap and the offsets array,
and should have no presence in the values array (see the example
[here](https://arrow.apache.org/docs/format/Columnar.html#variable-size-list-layout)).
Therefore, the input array can be passed intact to the ListArray as its values
array.
   
   It would also be useful to refer to the C++
[implementation](https://github.com/apache/arrow/blob/master/cpp/src/parquet/arrow/reader_internal.cc#L1348).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to