mcassels commented on a change in pull request #6770:
URL: https://github.com/apache/arrow/pull/6770#discussion_r437067752



##########
File path: rust/arrow/src/ipc/convert.rs
##########
@@ -518,11 +518,12 @@ pub(crate) fn get_fb_field_type<'a: 'b, 'b>(
             )
         }
         List(ref list_type) => {
+            let field_name = fbb.create_string("list"); // field schema 
requires name to be not None

Review comment:
       I named it "list" because the Parquet format specification requires it:
"The outer-most level must be a group annotated with LIST that contains a
single field named list". See
https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists

##########
File path: rust/parquet/src/arrow/array_reader.rs
##########
@@ -426,6 +450,391 @@ where
     }
 }
 
+/// Implementation of list array reader.
+pub struct ListArrayReader {
+    item_reader: Box<dyn ArrayReader>,
+    data_type: ArrowType,
+    item_type: ArrowType,
+    list_def_level: i16,
+    list_rep_level: i16,
+    def_level_buffer: Option<Buffer>,
+    rep_level_buffer: Option<Buffer>,
+}
+
+impl ListArrayReader {
+    /// Construct list array reader.
+    pub fn new(
+        item_reader: Box<dyn ArrayReader>,
+        data_type: ArrowType,
+        item_type: ArrowType,
+        def_level: i16,
+        rep_level: i16,
+    ) -> Self {
+        Self {
+            item_reader,
+            data_type,
+            item_type,
+            list_def_level: def_level,
+            list_rep_level: rep_level,
+            def_level_buffer: None,
+            rep_level_buffer: None,
+        }
+    }
+}
+
+macro_rules! build_empty_list_array_with_primitive_items {
+    ($item_type:ident) => {{
+        let values_builder = PrimitiveBuilder::<$item_type>::new(0);
+        let mut builder = ListBuilder::new(values_builder);
+        let empty_list_array = builder.finish();
+        Ok(Arc::new(empty_list_array))
+    }};
+}
+
+macro_rules! build_empty_list_array_with_non_primitive_items {
+    ($builder:ident) => {{
+        let values_builder = $builder::new(0);
+        let mut builder = ListBuilder::new(values_builder);
+        let empty_list_array = builder.finish();
+        Ok(Arc::new(empty_list_array))
+    }};
+}
+
+fn build_empty_list_array(item_type: ArrowType) -> Result<ArrayRef> {
+    match item_type {
+        ArrowType::UInt8 => 
build_empty_list_array_with_primitive_items!(ArrowUInt8Type),
+        ArrowType::UInt16 => {
+            build_empty_list_array_with_primitive_items!(ArrowUInt16Type)
+        }
+        ArrowType::UInt32 => {
+            build_empty_list_array_with_primitive_items!(ArrowUInt32Type)
+        }
+        ArrowType::UInt64 => {
+            build_empty_list_array_with_primitive_items!(ArrowUInt64Type)
+        }
+        ArrowType::Int8 => 
build_empty_list_array_with_primitive_items!(ArrowInt8Type),
+        ArrowType::Int16 => 
build_empty_list_array_with_primitive_items!(ArrowInt16Type),
+        ArrowType::Int32 => 
build_empty_list_array_with_primitive_items!(ArrowInt32Type),
+        ArrowType::Int64 => 
build_empty_list_array_with_primitive_items!(ArrowInt64Type),
+        ArrowType::Float32 => {
+            build_empty_list_array_with_primitive_items!(ArrowFloat32Type)
+        }
+        ArrowType::Float64 => {
+            build_empty_list_array_with_primitive_items!(ArrowFloat64Type)
+        }
+        ArrowType::Boolean => {
+            build_empty_list_array_with_primitive_items!(ArrowBooleanType)
+        }
+        ArrowType::Date32(_) => {
+            build_empty_list_array_with_primitive_items!(ArrowDate32Type)
+        }
+        ArrowType::Date64(_) => {
+            build_empty_list_array_with_primitive_items!(ArrowDate64Type)
+        }
+        ArrowType::Time32(ArrowTimeUnit::Second) => {
+            build_empty_list_array_with_primitive_items!(ArrowTime32SecondType)
+        }
+        ArrowType::Time32(ArrowTimeUnit::Millisecond) => {
+            
build_empty_list_array_with_primitive_items!(ArrowTime32MillisecondType)
+        }
+        ArrowType::Time64(ArrowTimeUnit::Microsecond) => {
+            
build_empty_list_array_with_primitive_items!(ArrowTime64MicrosecondType)
+        }
+        ArrowType::Time64(ArrowTimeUnit::Nanosecond) => {
+            
build_empty_list_array_with_primitive_items!(ArrowTime64NanosecondType)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Second) => {
+            
build_empty_list_array_with_primitive_items!(ArrowDurationSecondType)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Millisecond) => {
+            
build_empty_list_array_with_primitive_items!(ArrowDurationMillisecondType)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Microsecond) => {
+            
build_empty_list_array_with_primitive_items!(ArrowDurationMicrosecondType)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Nanosecond) => {
+            
build_empty_list_array_with_primitive_items!(ArrowDurationNanosecondType)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Second, _) => {
+            
build_empty_list_array_with_primitive_items!(ArrowTimestampSecondType)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Millisecond, _) => {
+            
build_empty_list_array_with_primitive_items!(ArrowTimestampMillisecondType)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Microsecond, _) => {
+            
build_empty_list_array_with_primitive_items!(ArrowTimestampMicrosecondType)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Nanosecond, _) => {
+            
build_empty_list_array_with_primitive_items!(ArrowTimestampNanosecondType)
+        }
+        ArrowType::Utf8 => {
+            build_empty_list_array_with_non_primitive_items!(StringBuilder)
+        }
+        ArrowType::Binary => {
+            build_empty_list_array_with_non_primitive_items!(BinaryBuilder)
+        }
+        _ => Err(ParquetError::General(format!(
+            "ListArray of type List({:?}) is not supported by array_reader",
+            item_type
+        ))),
+    }
+}
+
+macro_rules! remove_primitive_array_indices {
+    ($arr: expr, $item_type:ty, $indices:expr) => {{
+        let array_data = match 
$arr.as_any().downcast_ref::<PrimitiveArray<$item_type>>() {
+            Some(a) => a,
+            _ => return Err(ParquetError::General(format!("Error generating 
next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))),
+        };
+        let mut builder = PrimitiveBuilder::<$item_type>::new($arr.len());
+        for i in 0..array_data.len() {
+            if !$indices.contains(&i) {
+                if array_data.is_null(i) {
+                    builder.append_null()?;
+                } else {
+                    builder.append_value(array_data.value(i))?;
+                }
+            }
+        }
+        Ok(Arc::new(builder.finish()))
+    }};
+}
+
+macro_rules! remove_array_indices_custom_builder {
+    ($arr: expr, $array_type:ty, $item_builder:ident, $indices:expr) => {{
+        let array_data = match $arr.as_any().downcast_ref::<$array_type>() {
+            Some(a) => a,
+            _ => return Err(ParquetError::General(format!("Error generating 
next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))),
+        };
+        let mut builder = $item_builder::new(array_data.len());
+
+        for i in 0..array_data.len() {
+            if !$indices.contains(&i) {
+                if array_data.is_null(i) {
+                    builder.append_null()?;
+                } else {
+                    builder.append_value(array_data.value(i))?;
+                }
+            }
+        }
+        Ok(Arc::new(builder.finish()))
+    }};
+}
+
+macro_rules! remove_fixed_size_binary_array_indices {
+    ($arr: expr, $array_type:ty, $item_builder:ident, $indices:expr, 
$len:expr) => {{
+        let array_data = match $arr.as_any().downcast_ref::<$array_type>() {
+            Some(a) => a,
+            _ => return Err(ParquetError::General(format!("Error generating 
next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))),
+        };
+        let mut builder = FixedSizeBinaryBuilder::new(array_data.len(), $len);
+        for i in 0..array_data.len() {
+            if !$indices.contains(&i) {
+                if array_data.is_null(i) {
+                    builder.append_null()?;
+                } else {
+                    builder.append_value(array_data.value(i))?;
+                }
+            }
+        }
+        Ok(Arc::new(builder.finish()))
+    }};
+}
+
+fn remove_indices(
+    arr: ArrayRef,
+    item_type: ArrowType,
+    indices: Vec<usize>,
+) -> Result<ArrayRef> {
+    match item_type {
+        ArrowType::UInt8 => remove_primitive_array_indices!(arr, 
ArrowUInt8Type, indices),
+        ArrowType::UInt16 => {
+            remove_primitive_array_indices!(arr, ArrowUInt16Type, indices)
+        }
+        ArrowType::UInt32 => {
+            remove_primitive_array_indices!(arr, ArrowUInt32Type, indices)
+        }
+        ArrowType::UInt64 => {
+            remove_primitive_array_indices!(arr, ArrowUInt64Type, indices)
+        }
+        ArrowType::Int8 => remove_primitive_array_indices!(arr, ArrowInt8Type, 
indices),
+        ArrowType::Int16 => remove_primitive_array_indices!(arr, 
ArrowInt16Type, indices),
+        ArrowType::Int32 => remove_primitive_array_indices!(arr, 
ArrowInt32Type, indices),
+        ArrowType::Int64 => remove_primitive_array_indices!(arr, 
ArrowInt64Type, indices),
+        ArrowType::Float32 => {
+            remove_primitive_array_indices!(arr, ArrowFloat32Type, indices)
+        }
+        ArrowType::Float64 => {
+            remove_primitive_array_indices!(arr, ArrowFloat64Type, indices)
+        }
+        ArrowType::Boolean => {
+            remove_primitive_array_indices!(arr, ArrowBooleanType, indices)
+        }
+        ArrowType::Date32(_) => {
+            remove_primitive_array_indices!(arr, ArrowDate32Type, indices)
+        }
+        ArrowType::Date64(_) => {
+            remove_primitive_array_indices!(arr, ArrowDate64Type, indices)
+        }
+        ArrowType::Time32(ArrowTimeUnit::Second) => {
+            remove_primitive_array_indices!(arr, ArrowTime32SecondType, 
indices)
+        }
+        ArrowType::Time32(ArrowTimeUnit::Millisecond) => {
+            remove_primitive_array_indices!(arr, ArrowTime32MillisecondType, 
indices)
+        }
+        ArrowType::Time64(ArrowTimeUnit::Microsecond) => {
+            remove_primitive_array_indices!(arr, ArrowTime64MicrosecondType, 
indices)
+        }
+        ArrowType::Time64(ArrowTimeUnit::Nanosecond) => {
+            remove_primitive_array_indices!(arr, ArrowTime64NanosecondType, 
indices)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Second) => {
+            remove_primitive_array_indices!(arr, ArrowDurationSecondType, 
indices)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Millisecond) => {
+            remove_primitive_array_indices!(arr, ArrowDurationMillisecondType, 
indices)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Microsecond) => {
+            remove_primitive_array_indices!(arr, ArrowDurationMicrosecondType, 
indices)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Nanosecond) => {
+            remove_primitive_array_indices!(arr, ArrowDurationNanosecondType, 
indices)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Second, _) => {
+            remove_primitive_array_indices!(arr, ArrowTimestampSecondType, 
indices)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Millisecond, _) => {
+            remove_primitive_array_indices!(arr, 
ArrowTimestampMillisecondType, indices)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Microsecond, _) => {
+            remove_primitive_array_indices!(arr, 
ArrowTimestampMicrosecondType, indices)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Nanosecond, _) => {
+            remove_primitive_array_indices!(arr, ArrowTimestampNanosecondType, 
indices)
+        }
+        ArrowType::Utf8 => {
+            remove_array_indices_custom_builder!(arr, StringArray, 
StringBuilder, indices)
+        }
+        ArrowType::Binary => {
+            remove_array_indices_custom_builder!(arr, BinaryArray, 
BinaryBuilder, indices)
+        }
+        ArrowType::FixedSizeBinary(size) => 
remove_fixed_size_binary_array_indices!(
+            arr,
+            FixedSizeBinaryArray,
+            FixedSizeBinaryBuilder,
+            indices,
+            size
+        ),
+        _ => Err(ParquetError::General(format!(
+            "ListArray of type List({:?}) is not supported by array_reader",
+            item_type
+        ))),
+    }
+}
+
+impl ArrayReader for ListArrayReader {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Returns data type.
+    /// This must be a List.
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
+        let next_batch_array = 
self.item_reader.next_batch(batch_size).unwrap();
+        let item_type = self.item_reader.get_data_type().clone();
+
+        if next_batch_array.len() == 0 {
+            return build_empty_list_array(item_type);
+        }
+        let def_levels = self.item_reader.get_def_levels().unwrap();
+        let rep_levels = self.item_reader.get_rep_levels().unwrap();
+
+        if !((def_levels.len() == rep_levels.len())
+            && (rep_levels.len() == next_batch_array.len()))
+        {
+            return Err(ArrowError(
+                "Expected item_reader def_level and rep_level arrays to have 
the same length as batch array".to_string(),
+            ));
+        }
+
+        // Need to remove from the values array the nulls that represent null 
lists rather than null items
+        // null lists have def_level = 0
+        let mut null_list_indices: Vec<usize> = Vec::new();
+        for i in 0..def_levels.len() {
+            if def_levels[i] == 0 {
+                null_list_indices.push(i);
+            }
+        }
+        let batch_values = match null_list_indices.len() {
+            0 => next_batch_array.clone(),
+            _ => remove_indices(next_batch_array.clone(), item_type, 
null_list_indices)?,

Review comment:
       As I understand it, the null values that represent a null **list**
(rather than a null list item) must not be present in the child data of the
final ListArray. I don't like that this uses the builder either, but I wasn't
sure how else to create a child data array with specific indices excluded.
I might be misunderstanding something, though. Do you have any suggestions?

##########
File path: rust/parquet/src/arrow/array_reader.rs
##########
@@ -745,16 +1152,66 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a 
ArrayReaderBuilderContext
     }
 
     /// Build array reader for list type.
-    /// Currently this is not supported.
     fn visit_list_with_item(
         &mut self,
-        _list_type: Rc<Type>,
+        list_type: Rc<Type>,
         _item_type: &Type,
-        _context: &'a ArrayReaderBuilderContext,
+        context: &'a ArrayReaderBuilderContext,
     ) -> Result<Option<Box<dyn ArrayReader>>> {
-        Err(ArrowError(
-            "Reading parquet list array into arrow is not supported 
yet!".to_string(),
-        ))
+        let mut new_context = context.clone();
+
+        let list_child = &list_type.get_fields()[0];
+        let item_child = &list_child.get_fields()[0];

Review comment:
       Yes, that makes more sense. I'm not sure why I did it that way before.

##########
File path: rust/parquet/src/arrow/array_reader.rs
##########
@@ -426,6 +450,391 @@ where
     }
 }
 
+/// Implementation of list array reader.
+pub struct ListArrayReader {
+    item_reader: Box<dyn ArrayReader>,
+    data_type: ArrowType,
+    item_type: ArrowType,
+    list_def_level: i16,
+    list_rep_level: i16,
+    def_level_buffer: Option<Buffer>,
+    rep_level_buffer: Option<Buffer>,
+}
+
+impl ListArrayReader {
+    /// Construct list array reader.
+    pub fn new(
+        item_reader: Box<dyn ArrayReader>,
+        data_type: ArrowType,
+        item_type: ArrowType,
+        def_level: i16,
+        rep_level: i16,
+    ) -> Self {
+        Self {
+            item_reader,
+            data_type,
+            item_type,
+            list_def_level: def_level,
+            list_rep_level: rep_level,
+            def_level_buffer: None,
+            rep_level_buffer: None,
+        }
+    }
+}
+
+macro_rules! build_empty_list_array_with_primitive_items {
+    ($item_type:ident) => {{
+        let values_builder = PrimitiveBuilder::<$item_type>::new(0);
+        let mut builder = ListBuilder::new(values_builder);
+        let empty_list_array = builder.finish();
+        Ok(Arc::new(empty_list_array))
+    }};
+}
+
+macro_rules! build_empty_list_array_with_non_primitive_items {
+    ($builder:ident) => {{
+        let values_builder = $builder::new(0);
+        let mut builder = ListBuilder::new(values_builder);
+        let empty_list_array = builder.finish();
+        Ok(Arc::new(empty_list_array))
+    }};
+}
+
+fn build_empty_list_array(item_type: ArrowType) -> Result<ArrayRef> {
+    match item_type {
+        ArrowType::UInt8 => 
build_empty_list_array_with_primitive_items!(ArrowUInt8Type),
+        ArrowType::UInt16 => {
+            build_empty_list_array_with_primitive_items!(ArrowUInt16Type)
+        }
+        ArrowType::UInt32 => {
+            build_empty_list_array_with_primitive_items!(ArrowUInt32Type)
+        }
+        ArrowType::UInt64 => {
+            build_empty_list_array_with_primitive_items!(ArrowUInt64Type)
+        }
+        ArrowType::Int8 => 
build_empty_list_array_with_primitive_items!(ArrowInt8Type),
+        ArrowType::Int16 => 
build_empty_list_array_with_primitive_items!(ArrowInt16Type),
+        ArrowType::Int32 => 
build_empty_list_array_with_primitive_items!(ArrowInt32Type),
+        ArrowType::Int64 => 
build_empty_list_array_with_primitive_items!(ArrowInt64Type),
+        ArrowType::Float32 => {
+            build_empty_list_array_with_primitive_items!(ArrowFloat32Type)
+        }
+        ArrowType::Float64 => {
+            build_empty_list_array_with_primitive_items!(ArrowFloat64Type)
+        }
+        ArrowType::Boolean => {
+            build_empty_list_array_with_primitive_items!(ArrowBooleanType)
+        }
+        ArrowType::Date32(_) => {
+            build_empty_list_array_with_primitive_items!(ArrowDate32Type)
+        }
+        ArrowType::Date64(_) => {
+            build_empty_list_array_with_primitive_items!(ArrowDate64Type)
+        }
+        ArrowType::Time32(ArrowTimeUnit::Second) => {
+            build_empty_list_array_with_primitive_items!(ArrowTime32SecondType)
+        }
+        ArrowType::Time32(ArrowTimeUnit::Millisecond) => {
+            
build_empty_list_array_with_primitive_items!(ArrowTime32MillisecondType)
+        }
+        ArrowType::Time64(ArrowTimeUnit::Microsecond) => {
+            
build_empty_list_array_with_primitive_items!(ArrowTime64MicrosecondType)
+        }
+        ArrowType::Time64(ArrowTimeUnit::Nanosecond) => {
+            
build_empty_list_array_with_primitive_items!(ArrowTime64NanosecondType)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Second) => {
+            
build_empty_list_array_with_primitive_items!(ArrowDurationSecondType)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Millisecond) => {
+            
build_empty_list_array_with_primitive_items!(ArrowDurationMillisecondType)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Microsecond) => {
+            
build_empty_list_array_with_primitive_items!(ArrowDurationMicrosecondType)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Nanosecond) => {
+            
build_empty_list_array_with_primitive_items!(ArrowDurationNanosecondType)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Second, _) => {
+            
build_empty_list_array_with_primitive_items!(ArrowTimestampSecondType)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Millisecond, _) => {
+            
build_empty_list_array_with_primitive_items!(ArrowTimestampMillisecondType)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Microsecond, _) => {
+            
build_empty_list_array_with_primitive_items!(ArrowTimestampMicrosecondType)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Nanosecond, _) => {
+            
build_empty_list_array_with_primitive_items!(ArrowTimestampNanosecondType)
+        }
+        ArrowType::Utf8 => {
+            build_empty_list_array_with_non_primitive_items!(StringBuilder)
+        }
+        ArrowType::Binary => {
+            build_empty_list_array_with_non_primitive_items!(BinaryBuilder)
+        }
+        _ => Err(ParquetError::General(format!(
+            "ListArray of type List({:?}) is not supported by array_reader",
+            item_type
+        ))),
+    }
+}
+
+macro_rules! remove_primitive_array_indices {
+    ($arr: expr, $item_type:ty, $indices:expr) => {{
+        let array_data = match 
$arr.as_any().downcast_ref::<PrimitiveArray<$item_type>>() {
+            Some(a) => a,
+            _ => return Err(ParquetError::General(format!("Error generating 
next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))),
+        };
+        let mut builder = PrimitiveBuilder::<$item_type>::new($arr.len());
+        for i in 0..array_data.len() {
+            if !$indices.contains(&i) {
+                if array_data.is_null(i) {
+                    builder.append_null()?;
+                } else {
+                    builder.append_value(array_data.value(i))?;
+                }
+            }
+        }
+        Ok(Arc::new(builder.finish()))
+    }};
+}
+
+macro_rules! remove_array_indices_custom_builder {
+    ($arr: expr, $array_type:ty, $item_builder:ident, $indices:expr) => {{
+        let array_data = match $arr.as_any().downcast_ref::<$array_type>() {
+            Some(a) => a,
+            _ => return Err(ParquetError::General(format!("Error generating 
next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))),
+        };
+        let mut builder = $item_builder::new(array_data.len());
+
+        for i in 0..array_data.len() {
+            if !$indices.contains(&i) {
+                if array_data.is_null(i) {
+                    builder.append_null()?;
+                } else {
+                    builder.append_value(array_data.value(i))?;
+                }
+            }
+        }
+        Ok(Arc::new(builder.finish()))
+    }};
+}
+
+macro_rules! remove_fixed_size_binary_array_indices {
+    ($arr: expr, $array_type:ty, $item_builder:ident, $indices:expr, 
$len:expr) => {{
+        let array_data = match $arr.as_any().downcast_ref::<$array_type>() {
+            Some(a) => a,
+            _ => return Err(ParquetError::General(format!("Error generating 
next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))),
+        };
+        let mut builder = FixedSizeBinaryBuilder::new(array_data.len(), $len);
+        for i in 0..array_data.len() {
+            if !$indices.contains(&i) {
+                if array_data.is_null(i) {
+                    builder.append_null()?;
+                } else {
+                    builder.append_value(array_data.value(i))?;
+                }
+            }
+        }
+        Ok(Arc::new(builder.finish()))
+    }};
+}
+
+fn remove_indices(
+    arr: ArrayRef,
+    item_type: ArrowType,
+    indices: Vec<usize>,
+) -> Result<ArrayRef> {
+    match item_type {
+        ArrowType::UInt8 => remove_primitive_array_indices!(arr, 
ArrowUInt8Type, indices),
+        ArrowType::UInt16 => {
+            remove_primitive_array_indices!(arr, ArrowUInt16Type, indices)
+        }
+        ArrowType::UInt32 => {
+            remove_primitive_array_indices!(arr, ArrowUInt32Type, indices)
+        }
+        ArrowType::UInt64 => {
+            remove_primitive_array_indices!(arr, ArrowUInt64Type, indices)
+        }
+        ArrowType::Int8 => remove_primitive_array_indices!(arr, ArrowInt8Type, 
indices),
+        ArrowType::Int16 => remove_primitive_array_indices!(arr, 
ArrowInt16Type, indices),
+        ArrowType::Int32 => remove_primitive_array_indices!(arr, 
ArrowInt32Type, indices),
+        ArrowType::Int64 => remove_primitive_array_indices!(arr, 
ArrowInt64Type, indices),
+        ArrowType::Float32 => {
+            remove_primitive_array_indices!(arr, ArrowFloat32Type, indices)
+        }
+        ArrowType::Float64 => {
+            remove_primitive_array_indices!(arr, ArrowFloat64Type, indices)
+        }
+        ArrowType::Boolean => {
+            remove_primitive_array_indices!(arr, ArrowBooleanType, indices)
+        }
+        ArrowType::Date32(_) => {
+            remove_primitive_array_indices!(arr, ArrowDate32Type, indices)
+        }
+        ArrowType::Date64(_) => {
+            remove_primitive_array_indices!(arr, ArrowDate64Type, indices)
+        }
+        ArrowType::Time32(ArrowTimeUnit::Second) => {
+            remove_primitive_array_indices!(arr, ArrowTime32SecondType, 
indices)
+        }
+        ArrowType::Time32(ArrowTimeUnit::Millisecond) => {
+            remove_primitive_array_indices!(arr, ArrowTime32MillisecondType, 
indices)
+        }
+        ArrowType::Time64(ArrowTimeUnit::Microsecond) => {
+            remove_primitive_array_indices!(arr, ArrowTime64MicrosecondType, 
indices)
+        }
+        ArrowType::Time64(ArrowTimeUnit::Nanosecond) => {
+            remove_primitive_array_indices!(arr, ArrowTime64NanosecondType, 
indices)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Second) => {
+            remove_primitive_array_indices!(arr, ArrowDurationSecondType, 
indices)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Millisecond) => {
+            remove_primitive_array_indices!(arr, ArrowDurationMillisecondType, 
indices)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Microsecond) => {
+            remove_primitive_array_indices!(arr, ArrowDurationMicrosecondType, 
indices)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Nanosecond) => {
+            remove_primitive_array_indices!(arr, ArrowDurationNanosecondType, 
indices)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Second, _) => {
+            remove_primitive_array_indices!(arr, ArrowTimestampSecondType, 
indices)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Millisecond, _) => {
+            remove_primitive_array_indices!(arr, 
ArrowTimestampMillisecondType, indices)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Microsecond, _) => {
+            remove_primitive_array_indices!(arr, 
ArrowTimestampMicrosecondType, indices)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Nanosecond, _) => {
+            remove_primitive_array_indices!(arr, ArrowTimestampNanosecondType, 
indices)
+        }
+        ArrowType::Utf8 => {
+            remove_array_indices_custom_builder!(arr, StringArray, 
StringBuilder, indices)
+        }
+        ArrowType::Binary => {
+            remove_array_indices_custom_builder!(arr, BinaryArray, 
BinaryBuilder, indices)
+        }
+        ArrowType::FixedSizeBinary(size) => 
remove_fixed_size_binary_array_indices!(
+            arr,
+            FixedSizeBinaryArray,
+            FixedSizeBinaryBuilder,
+            indices,
+            size
+        ),
+        _ => Err(ParquetError::General(format!(
+            "ListArray of type List({:?}) is not supported by array_reader",
+            item_type
+        ))),
+    }
+}
+
+impl ArrayReader for ListArrayReader {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Returns data type.
+    /// This must be a List.
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {

Review comment:
       I've just added a couple of comments -- on the `ArrayReader`
implementation and on `visit_list_with_item`.

##########
File path: rust/parquet/src/arrow/array_reader.rs
##########
@@ -426,6 +450,391 @@ where
     }
 }
 
+/// Implementation of list array reader.
+pub struct ListArrayReader {
+    item_reader: Box<dyn ArrayReader>,
+    data_type: ArrowType,
+    item_type: ArrowType,
+    list_def_level: i16,
+    list_rep_level: i16,
+    def_level_buffer: Option<Buffer>,
+    rep_level_buffer: Option<Buffer>,
+}
+
+impl ListArrayReader {
+    /// Construct list array reader.
+    pub fn new(
+        item_reader: Box<dyn ArrayReader>,
+        data_type: ArrowType,
+        item_type: ArrowType,
+        def_level: i16,
+        rep_level: i16,
+    ) -> Self {
+        Self {
+            item_reader,
+            data_type,
+            item_type,
+            list_def_level: def_level,
+            list_rep_level: rep_level,
+            def_level_buffer: None,
+            rep_level_buffer: None,
+        }
+    }
+}
+
+macro_rules! build_empty_list_array_with_primitive_items {
+    ($item_type:ident) => {{
+        let values_builder = PrimitiveBuilder::<$item_type>::new(0);
+        let mut builder = ListBuilder::new(values_builder);
+        let empty_list_array = builder.finish();
+        Ok(Arc::new(empty_list_array))
+    }};
+}
+
+macro_rules! build_empty_list_array_with_non_primitive_items {
+    ($builder:ident) => {{
+        let values_builder = $builder::new(0);
+        let mut builder = ListBuilder::new(values_builder);
+        let empty_list_array = builder.finish();
+        Ok(Arc::new(empty_list_array))
+    }};
+}
+
+fn build_empty_list_array(item_type: ArrowType) -> Result<ArrayRef> {
+    match item_type {
+        ArrowType::UInt8 => 
build_empty_list_array_with_primitive_items!(ArrowUInt8Type),
+        ArrowType::UInt16 => {
+            build_empty_list_array_with_primitive_items!(ArrowUInt16Type)
+        }
+        ArrowType::UInt32 => {
+            build_empty_list_array_with_primitive_items!(ArrowUInt32Type)
+        }
+        ArrowType::UInt64 => {
+            build_empty_list_array_with_primitive_items!(ArrowUInt64Type)
+        }
+        ArrowType::Int8 => 
build_empty_list_array_with_primitive_items!(ArrowInt8Type),
+        ArrowType::Int16 => 
build_empty_list_array_with_primitive_items!(ArrowInt16Type),
+        ArrowType::Int32 => 
build_empty_list_array_with_primitive_items!(ArrowInt32Type),
+        ArrowType::Int64 => 
build_empty_list_array_with_primitive_items!(ArrowInt64Type),
+        ArrowType::Float32 => {
+            build_empty_list_array_with_primitive_items!(ArrowFloat32Type)
+        }
+        ArrowType::Float64 => {
+            build_empty_list_array_with_primitive_items!(ArrowFloat64Type)
+        }
+        ArrowType::Boolean => {
+            build_empty_list_array_with_primitive_items!(ArrowBooleanType)
+        }
+        ArrowType::Date32(_) => {
+            build_empty_list_array_with_primitive_items!(ArrowDate32Type)
+        }
+        ArrowType::Date64(_) => {
+            build_empty_list_array_with_primitive_items!(ArrowDate64Type)
+        }
+        ArrowType::Time32(ArrowTimeUnit::Second) => {
+            build_empty_list_array_with_primitive_items!(ArrowTime32SecondType)
+        }
+        ArrowType::Time32(ArrowTimeUnit::Millisecond) => {
+            
build_empty_list_array_with_primitive_items!(ArrowTime32MillisecondType)
+        }
+        ArrowType::Time64(ArrowTimeUnit::Microsecond) => {
+            
build_empty_list_array_with_primitive_items!(ArrowTime64MicrosecondType)
+        }
+        ArrowType::Time64(ArrowTimeUnit::Nanosecond) => {
+            
build_empty_list_array_with_primitive_items!(ArrowTime64NanosecondType)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Second) => {
+            
build_empty_list_array_with_primitive_items!(ArrowDurationSecondType)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Millisecond) => {
+            
build_empty_list_array_with_primitive_items!(ArrowDurationMillisecondType)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Microsecond) => {
+            
build_empty_list_array_with_primitive_items!(ArrowDurationMicrosecondType)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Nanosecond) => {
+            
build_empty_list_array_with_primitive_items!(ArrowDurationNanosecondType)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Second, _) => {
+            
build_empty_list_array_with_primitive_items!(ArrowTimestampSecondType)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Millisecond, _) => {
+            
build_empty_list_array_with_primitive_items!(ArrowTimestampMillisecondType)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Microsecond, _) => {
+            
build_empty_list_array_with_primitive_items!(ArrowTimestampMicrosecondType)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Nanosecond, _) => {
+            
build_empty_list_array_with_primitive_items!(ArrowTimestampNanosecondType)
+        }
+        ArrowType::Utf8 => {
+            build_empty_list_array_with_non_primitive_items!(StringBuilder)
+        }
+        ArrowType::Binary => {
+            build_empty_list_array_with_non_primitive_items!(BinaryBuilder)
+        }
+        _ => Err(ParquetError::General(format!(
+            "ListArray of type List({:?}) is not supported by array_reader",
+            item_type
+        ))),
+    }
+}
+
+macro_rules! remove_primitive_array_indices {
+    ($arr: expr, $item_type:ty, $indices:expr) => {{
+        let array_data = match 
$arr.as_any().downcast_ref::<PrimitiveArray<$item_type>>() {
+            Some(a) => a,
+            _ => return Err(ParquetError::General(format!("Error generating 
next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))),
+        };
+        let mut builder = PrimitiveBuilder::<$item_type>::new($arr.len());
+        for i in 0..array_data.len() {
+            if !$indices.contains(&i) {
+                if array_data.is_null(i) {
+                    builder.append_null()?;
+                } else {
+                    builder.append_value(array_data.value(i))?;
+                }
+            }
+        }
+        Ok(Arc::new(builder.finish()))
+    }};
+}
+
+macro_rules! remove_array_indices_custom_builder {
+    ($arr: expr, $array_type:ty, $item_builder:ident, $indices:expr) => {{
+        let array_data = match $arr.as_any().downcast_ref::<$array_type>() {
+            Some(a) => a,
+            _ => return Err(ParquetError::General(format!("Error generating 
next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))),
+        };
+        let mut builder = $item_builder::new(array_data.len());
+
+        for i in 0..array_data.len() {
+            if !$indices.contains(&i) {
+                if array_data.is_null(i) {
+                    builder.append_null()?;
+                } else {
+                    builder.append_value(array_data.value(i))?;
+                }
+            }
+        }
+        Ok(Arc::new(builder.finish()))
+    }};
+}
+
+macro_rules! remove_fixed_size_binary_array_indices {
+    ($arr: expr, $array_type:ty, $item_builder:ident, $indices:expr, 
$len:expr) => {{
+        let array_data = match $arr.as_any().downcast_ref::<$array_type>() {
+            Some(a) => a,
+            _ => return Err(ParquetError::General(format!("Error generating 
next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))),
+        };
+        let mut builder = FixedSizeBinaryBuilder::new(array_data.len(), $len);
+        for i in 0..array_data.len() {
+            if !$indices.contains(&i) {
+                if array_data.is_null(i) {
+                    builder.append_null()?;
+                } else {
+                    builder.append_value(array_data.value(i))?;
+                }
+            }
+        }
+        Ok(Arc::new(builder.finish()))
+    }};
+}
+
+fn remove_indices(
+    arr: ArrayRef,
+    item_type: ArrowType,
+    indices: Vec<usize>,
+) -> Result<ArrayRef> {
+    match item_type {
+        ArrowType::UInt8 => remove_primitive_array_indices!(arr, 
ArrowUInt8Type, indices),
+        ArrowType::UInt16 => {
+            remove_primitive_array_indices!(arr, ArrowUInt16Type, indices)
+        }
+        ArrowType::UInt32 => {
+            remove_primitive_array_indices!(arr, ArrowUInt32Type, indices)
+        }
+        ArrowType::UInt64 => {
+            remove_primitive_array_indices!(arr, ArrowUInt64Type, indices)
+        }
+        ArrowType::Int8 => remove_primitive_array_indices!(arr, ArrowInt8Type, 
indices),
+        ArrowType::Int16 => remove_primitive_array_indices!(arr, 
ArrowInt16Type, indices),
+        ArrowType::Int32 => remove_primitive_array_indices!(arr, 
ArrowInt32Type, indices),
+        ArrowType::Int64 => remove_primitive_array_indices!(arr, 
ArrowInt64Type, indices),
+        ArrowType::Float32 => {
+            remove_primitive_array_indices!(arr, ArrowFloat32Type, indices)
+        }
+        ArrowType::Float64 => {
+            remove_primitive_array_indices!(arr, ArrowFloat64Type, indices)
+        }
+        ArrowType::Boolean => {
+            remove_primitive_array_indices!(arr, ArrowBooleanType, indices)
+        }
+        ArrowType::Date32(_) => {
+            remove_primitive_array_indices!(arr, ArrowDate32Type, indices)
+        }
+        ArrowType::Date64(_) => {
+            remove_primitive_array_indices!(arr, ArrowDate64Type, indices)
+        }
+        ArrowType::Time32(ArrowTimeUnit::Second) => {
+            remove_primitive_array_indices!(arr, ArrowTime32SecondType, 
indices)
+        }
+        ArrowType::Time32(ArrowTimeUnit::Millisecond) => {
+            remove_primitive_array_indices!(arr, ArrowTime32MillisecondType, 
indices)
+        }
+        ArrowType::Time64(ArrowTimeUnit::Microsecond) => {
+            remove_primitive_array_indices!(arr, ArrowTime64MicrosecondType, 
indices)
+        }
+        ArrowType::Time64(ArrowTimeUnit::Nanosecond) => {
+            remove_primitive_array_indices!(arr, ArrowTime64NanosecondType, 
indices)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Second) => {
+            remove_primitive_array_indices!(arr, ArrowDurationSecondType, 
indices)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Millisecond) => {
+            remove_primitive_array_indices!(arr, ArrowDurationMillisecondType, 
indices)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Microsecond) => {
+            remove_primitive_array_indices!(arr, ArrowDurationMicrosecondType, 
indices)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Nanosecond) => {
+            remove_primitive_array_indices!(arr, ArrowDurationNanosecondType, 
indices)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Second, _) => {
+            remove_primitive_array_indices!(arr, ArrowTimestampSecondType, 
indices)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Millisecond, _) => {
+            remove_primitive_array_indices!(arr, 
ArrowTimestampMillisecondType, indices)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Microsecond, _) => {
+            remove_primitive_array_indices!(arr, 
ArrowTimestampMicrosecondType, indices)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Nanosecond, _) => {
+            remove_primitive_array_indices!(arr, ArrowTimestampNanosecondType, 
indices)
+        }
+        ArrowType::Utf8 => {
+            remove_array_indices_custom_builder!(arr, StringArray, 
StringBuilder, indices)
+        }
+        ArrowType::Binary => {
+            remove_array_indices_custom_builder!(arr, BinaryArray, 
BinaryBuilder, indices)
+        }
+        ArrowType::FixedSizeBinary(size) => 
remove_fixed_size_binary_array_indices!(
+            arr,
+            FixedSizeBinaryArray,
+            FixedSizeBinaryBuilder,
+            indices,
+            size
+        ),
+        _ => Err(ParquetError::General(format!(
+            "ListArray of type List({:?}) is not supported by array_reader",
+            item_type
+        ))),
+    }
+}
+
+impl ArrayReader for ListArrayReader {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Returns data type.
+    /// This must be a List.
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
+        let next_batch_array = 
self.item_reader.next_batch(batch_size).unwrap();
+        let item_type = self.item_reader.get_data_type().clone();
+
+        if next_batch_array.len() == 0 {
+            return build_empty_list_array(item_type);
+        }
+        let def_levels = self.item_reader.get_def_levels().unwrap();
+        let rep_levels = self.item_reader.get_rep_levels().unwrap();
+
+        if !((def_levels.len() == rep_levels.len())
+            && (rep_levels.len() == next_batch_array.len()))
+        {
+            return Err(ArrowError(
+                "Expected item_reader def_level and rep_level arrays to have 
the same length as batch array".to_string(),
+            ));
+        }
+
+        // Need to remove from the values array the nulls that represent null 
lists rather than null items
+        // null lists have def_level = 0
+        let mut null_list_indices: Vec<usize> = Vec::new();
+        for i in 0..def_levels.len() {
+            if def_levels[i] == 0 {
+                null_list_indices.push(i);
+            }
+        }
+        let batch_values = match null_list_indices.len() {
+            0 => next_batch_array.clone(),
+            _ => remove_indices(next_batch_array.clone(), item_type, 
null_list_indices)?,
+        };
+
+        // null list has def_level = 0
+        // empty list has def_level = 1
+        // null item in a list has def_level = 2

Review comment:
       Yeah, it seems that this comment is not correct... I think it would be 
correct only in the case where both the list and the list item are optional. 
   
   However, much of the logic described in this comment is not actually used: 
the rep and def levels are only ever compared with 0. Do you see any flaws in 
the def and rep level logic that comes after this comment? If the actual logic 
is OK, I'll just remove the comment.

##########
File path: rust/parquet/src/arrow/array_reader.rs
##########
@@ -745,16 +1152,66 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a 
ArrayReaderBuilderContext
     }
 
     /// Build array reader for list type.
-    /// Currently this is not supported.
     fn visit_list_with_item(
         &mut self,
-        _list_type: Rc<Type>,
+        list_type: Rc<Type>,
         _item_type: &Type,
-        _context: &'a ArrayReaderBuilderContext,
+        context: &'a ArrayReaderBuilderContext,
     ) -> Result<Option<Box<dyn ArrayReader>>> {
-        Err(ArrowError(
-            "Reading parquet list array into arrow is not supported 
yet!".to_string(),
-        ))
+        let mut new_context = context.clone();
+
+        let list_child = &list_type.get_fields()[0];
+        let item_child = &list_child.get_fields()[0];

Review comment:
       The failing test made me realize why I had it that way. Currently, 
`item_type` is passed as `&Type` to `visit_list_with_item`, while `list_type` 
is passed as `Rc<Type>`. Because `item_type` is not passed as `Rc<Type>`, if 
`item_type` is used, then when the list items are visited by 
PrimitiveArrayReader they will not satisfy the `self.is_included` check, 
because that check is done using the reference. 
   
   I've now changed the signature of `visit_list_with_item` to pass `item_type` 
as `Rc<Type>` to fix this issue.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to