This is an automated email from the ASF dual-hosted git repository.

nevime pushed a commit to branch rust-parquet-arrow-writer
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit 2b04fbdef9a16fa918174bba48c41544a3442ebe
Author: Neville Dipale <[email protected]>
AuthorDate: Sat Oct 17 21:14:13 2020 +0200

    ARROW-7842: [Rust] [Parquet] Arrow list reader
    
    This is a port of #6770 to the parquet-writer branch.
    
    We'll have more of a chance to test this reader, and ensure that we can 
roundtrip on list types.
    
    Closes #8449 from nevi-me/ARROW-7842-cherry
    
    Authored-by: Neville Dipale <[email protected]>
    Signed-off-by: Neville Dipale <[email protected]>
---
 rust/arrow/src/util/display.rs         |  17 +
 rust/datafusion/tests/sql.rs           |  97 ++++-
 rust/parquet/src/arrow/array_reader.rs | 630 ++++++++++++++++++++++++++++++++-
 rust/parquet/src/arrow/arrow_writer.rs |  29 +-
 rust/parquet/src/schema/visitor.rs     |  12 +-
 5 files changed, 755 insertions(+), 30 deletions(-)

diff --git a/rust/arrow/src/util/display.rs b/rust/arrow/src/util/display.rs
index 102ec5d..1a873f1 100644
--- a/rust/arrow/src/util/display.rs
+++ b/rust/arrow/src/util/display.rs
@@ -44,6 +44,22 @@ macro_rules! make_string {
     }};
 }
 
+macro_rules! make_string_from_list {
+    ($column: ident, $row: ident) => {{
+        let list = $column
+            .as_any()
+            .downcast_ref::<array::ListArray>()
+            .ok_or(ArrowError::InvalidArgumentError(format!(
+                "Repl error: could not convert list column to list array."
+            )))?
+            .value($row);
+        let string_values = (0..list.len())
+            .map(|i| array_value_to_string(&list.clone(), i))
+            .collect::<Result<Vec<String>>>()?;
+        Ok(format!("[{}]", string_values.join(", ")))
+    }};
+}
+
 /// Get the value at the given row in an array as a String.
 ///
 /// Note this function is quite inefficient and is unlikely to be
@@ -89,6 +105,7 @@ pub fn array_value_to_string(column: &array::ArrayRef, row: 
usize) -> Result<Str
         DataType::Time64(unit) if *unit == TimeUnit::Nanosecond => {
             make_string!(array::Time64NanosecondArray, column, row)
         }
+        DataType::List(_) => make_string_from_list!(column, row),
         DataType::Dictionary(index_type, _value_type) => match **index_type {
             DataType::Int8 => dict_array_value_to_string::<Int8Type>(column, 
row),
             DataType::Int16 => dict_array_value_to_string::<Int16Type>(column, 
row),
diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs
index 9eceb76..a35476d 100644
--- a/rust/datafusion/tests/sql.rs
+++ b/rust/datafusion/tests/sql.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::convert::TryFrom;
 use std::env;
 use std::sync::Arc;
 
@@ -22,7 +23,7 @@ extern crate arrow;
 extern crate datafusion;
 
 use arrow::{array::*, datatypes::TimeUnit};
-use arrow::{datatypes::Int32Type, record_batch::RecordBatch};
+use arrow::{datatypes::Int32Type, datatypes::Int64Type, 
record_batch::RecordBatch};
 use arrow::{
     datatypes::{DataType, Field, Schema, SchemaRef},
     util::display::array_value_to_string,
@@ -129,6 +130,100 @@ async fn parquet_single_nan_schema() {
 }
 
 #[tokio::test]
+async fn parquet_list_columns() {
+    let mut ctx = ExecutionContext::new();
+    let testdata = env::var("PARQUET_TEST_DATA").expect("PARQUET_TEST_DATA not 
defined");
+    ctx.register_parquet(
+        "list_columns",
+        &format!("{}/list_columns.parquet", testdata),
+    )
+    .unwrap();
+
+    let schema = Arc::new(Schema::new(vec![
+        Field::new(
+            "int64_list",
+            DataType::List(Box::new(DataType::Int64)),
+            true,
+        ),
+        Field::new("utf8_list", DataType::List(Box::new(DataType::Utf8)), 
true),
+    ]));
+
+    let sql = "SELECT int64_list, utf8_list FROM list_columns";
+    let plan = ctx.create_logical_plan(&sql).unwrap();
+    let plan = ctx.optimize(&plan).unwrap();
+    let plan = ctx.create_physical_plan(&plan).unwrap();
+    let results = ctx.collect(plan).await.unwrap();
+
+    //   int64_list              utf8_list
+    // 0  [1, 2, 3]        [abc, efg, hij]
+    // 1  [None, 1]                   None
+    // 2        [4]  [efg, None, hij, xyz]
+
+    assert_eq!(1, results.len());
+    let batch = &results[0];
+    assert_eq!(3, batch.num_rows());
+    assert_eq!(2, batch.num_columns());
+    assert_eq!(schema, batch.schema());
+
+    let int_list_array = batch
+        .column(0)
+        .as_any()
+        .downcast_ref::<ListArray>()
+        .unwrap();
+    let utf8_list_array = batch
+        .column(1)
+        .as_any()
+        .downcast_ref::<ListArray>()
+        .unwrap();
+
+    assert_eq!(
+        int_list_array
+            .value(0)
+            .as_any()
+            .downcast_ref::<PrimitiveArray<Int64Type>>()
+            .unwrap(),
+        &PrimitiveArray::<Int64Type>::from(vec![Some(1), Some(2), Some(3),])
+    );
+
+    assert_eq!(
+        utf8_list_array
+            .value(0)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap(),
+        &StringArray::try_from(vec![Some("abc"), Some("efg"), 
Some("hij"),]).unwrap()
+    );
+
+    assert_eq!(
+        int_list_array
+            .value(1)
+            .as_any()
+            .downcast_ref::<PrimitiveArray<Int64Type>>()
+            .unwrap(),
+        &PrimitiveArray::<Int64Type>::from(vec![None, Some(1),])
+    );
+
+    assert!(utf8_list_array.is_null(1));
+
+    assert_eq!(
+        int_list_array
+            .value(2)
+            .as_any()
+            .downcast_ref::<PrimitiveArray<Int64Type>>()
+            .unwrap(),
+        &PrimitiveArray::<Int64Type>::from(vec![Some(4),])
+    );
+
+    let result = utf8_list_array.value(2);
+    let result = result.as_any().downcast_ref::<StringArray>().unwrap();
+
+    assert_eq!(result.value(0), "efg");
+    assert!(result.is_null(1));
+    assert_eq!(result.value(2), "hij");
+    assert_eq!(result.value(3), "xyz");
+}
+
+#[tokio::test]
 async fn csv_count_star() -> Result<()> {
     let mut ctx = ExecutionContext::new();
     register_aggregate_csv(&mut ctx)?;
diff --git a/rust/parquet/src/arrow/array_reader.rs 
b/rust/parquet/src/arrow/array_reader.rs
index bee608c..ad2b84a 100644
--- a/rust/parquet/src/arrow/array_reader.rs
+++ b/rust/parquet/src/arrow/array_reader.rs
@@ -25,13 +25,35 @@ use std::sync::Arc;
 use std::vec::Vec;
 
 use arrow::array::{
-    ArrayDataBuilder, ArrayDataRef, ArrayRef, BooleanBufferBuilder, 
BufferBuilderTrait,
-    Int16BufferBuilder, StructArray,
+    Array, ArrayData, ArrayDataBuilder, ArrayDataRef, ArrayRef, BinaryArray,
+    BinaryBuilder, BooleanBufferBuilder, BufferBuilderTrait, 
FixedSizeBinaryArray,
+    FixedSizeBinaryBuilder, GenericListArray, Int16BufferBuilder, ListBuilder,
+    OffsetSizeTrait, PrimitiveArray, PrimitiveBuilder, StringArray, 
StringBuilder,
+    StructArray,
 };
 use arrow::buffer::{Buffer, MutableBuffer};
 use arrow::datatypes::{
-    DataType as ArrowType, DateUnit, Field, IntervalUnit, Schema, TimeUnit,
+    BooleanType as ArrowBooleanType, DataType as ArrowType,
+    Date32Type as ArrowDate32Type, Date64Type as ArrowDate64Type, DateUnit,
+    DurationMicrosecondType as ArrowDurationMicrosecondType,
+    DurationMillisecondType as ArrowDurationMillisecondType,
+    DurationNanosecondType as ArrowDurationNanosecondType,
+    DurationSecondType as ArrowDurationSecondType, Field,
+    Float32Type as ArrowFloat32Type, Float64Type as ArrowFloat64Type,
+    Int16Type as ArrowInt16Type, Int32Type as ArrowInt32Type,
+    Int64Type as ArrowInt64Type, Int8Type as ArrowInt8Type, IntervalUnit, 
Schema,
+    Time32MillisecondType as ArrowTime32MillisecondType,
+    Time32SecondType as ArrowTime32SecondType,
+    Time64MicrosecondType as ArrowTime64MicrosecondType,
+    Time64NanosecondType as ArrowTime64NanosecondType, TimeUnit,
+    TimeUnit as ArrowTimeUnit, TimestampMicrosecondType as 
ArrowTimestampMicrosecondType,
+    TimestampMillisecondType as ArrowTimestampMillisecondType,
+    TimestampNanosecondType as ArrowTimestampNanosecondType,
+    TimestampSecondType as ArrowTimestampSecondType, ToByteSlice,
+    UInt16Type as ArrowUInt16Type, UInt32Type as ArrowUInt32Type,
+    UInt64Type as ArrowUInt64Type, UInt8Type as ArrowUInt8Type,
 };
+use arrow::util::bit_util;
 
 use crate::arrow::converter::{
     BinaryArrayConverter, BinaryConverter, BoolConverter, 
BooleanArrayConverter,
@@ -532,6 +554,400 @@ where
     }
 }
 
+/// Implementation of list array reader.
+pub struct ListArrayReader<OffsetSize: OffsetSizeTrait> {
+    item_reader: Box<dyn ArrayReader>,
+    data_type: ArrowType,
+    item_type: ArrowType,
+    list_def_level: i16,
+    list_rep_level: i16,
+    def_level_buffer: Option<Buffer>,
+    rep_level_buffer: Option<Buffer>,
+    _marker: PhantomData<OffsetSize>,
+}
+
+impl<OffsetSize: OffsetSizeTrait> ListArrayReader<OffsetSize> {
+    /// Construct list array reader.
+    pub fn new(
+        item_reader: Box<dyn ArrayReader>,
+        data_type: ArrowType,
+        item_type: ArrowType,
+        def_level: i16,
+        rep_level: i16,
+    ) -> Self {
+        Self {
+            item_reader,
+            data_type,
+            item_type,
+            list_def_level: def_level,
+            list_rep_level: rep_level,
+            def_level_buffer: None,
+            rep_level_buffer: None,
+            _marker: PhantomData,
+        }
+    }
+}
+
+macro_rules! build_empty_list_array_with_primitive_items {
+    ($item_type:ident) => {{
+        let values_builder = PrimitiveBuilder::<$item_type>::new(0);
+        let mut builder = ListBuilder::new(values_builder);
+        let empty_list_array = builder.finish();
+        Ok(Arc::new(empty_list_array))
+    }};
+}
+
+macro_rules! build_empty_list_array_with_non_primitive_items {
+    ($builder:ident) => {{
+        let values_builder = $builder::new(0);
+        let mut builder = ListBuilder::new(values_builder);
+        let empty_list_array = builder.finish();
+        Ok(Arc::new(empty_list_array))
+    }};
+}
+
+fn build_empty_list_array(item_type: ArrowType) -> Result<ArrayRef> {
+    match item_type {
+        ArrowType::UInt8 => 
build_empty_list_array_with_primitive_items!(ArrowUInt8Type),
+        ArrowType::UInt16 => {
+            build_empty_list_array_with_primitive_items!(ArrowUInt16Type)
+        }
+        ArrowType::UInt32 => {
+            build_empty_list_array_with_primitive_items!(ArrowUInt32Type)
+        }
+        ArrowType::UInt64 => {
+            build_empty_list_array_with_primitive_items!(ArrowUInt64Type)
+        }
+        ArrowType::Int8 => 
build_empty_list_array_with_primitive_items!(ArrowInt8Type),
+        ArrowType::Int16 => 
build_empty_list_array_with_primitive_items!(ArrowInt16Type),
+        ArrowType::Int32 => 
build_empty_list_array_with_primitive_items!(ArrowInt32Type),
+        ArrowType::Int64 => 
build_empty_list_array_with_primitive_items!(ArrowInt64Type),
+        ArrowType::Float32 => {
+            build_empty_list_array_with_primitive_items!(ArrowFloat32Type)
+        }
+        ArrowType::Float64 => {
+            build_empty_list_array_with_primitive_items!(ArrowFloat64Type)
+        }
+        ArrowType::Boolean => {
+            build_empty_list_array_with_primitive_items!(ArrowBooleanType)
+        }
+        ArrowType::Date32(_) => {
+            build_empty_list_array_with_primitive_items!(ArrowDate32Type)
+        }
+        ArrowType::Date64(_) => {
+            build_empty_list_array_with_primitive_items!(ArrowDate64Type)
+        }
+        ArrowType::Time32(ArrowTimeUnit::Second) => {
+            build_empty_list_array_with_primitive_items!(ArrowTime32SecondType)
+        }
+        ArrowType::Time32(ArrowTimeUnit::Millisecond) => {
+            
build_empty_list_array_with_primitive_items!(ArrowTime32MillisecondType)
+        }
+        ArrowType::Time64(ArrowTimeUnit::Microsecond) => {
+            
build_empty_list_array_with_primitive_items!(ArrowTime64MicrosecondType)
+        }
+        ArrowType::Time64(ArrowTimeUnit::Nanosecond) => {
+            
build_empty_list_array_with_primitive_items!(ArrowTime64NanosecondType)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Second) => {
+            
build_empty_list_array_with_primitive_items!(ArrowDurationSecondType)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Millisecond) => {
+            
build_empty_list_array_with_primitive_items!(ArrowDurationMillisecondType)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Microsecond) => {
+            
build_empty_list_array_with_primitive_items!(ArrowDurationMicrosecondType)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Nanosecond) => {
+            
build_empty_list_array_with_primitive_items!(ArrowDurationNanosecondType)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Second, _) => {
+            
build_empty_list_array_with_primitive_items!(ArrowTimestampSecondType)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Millisecond, _) => {
+            
build_empty_list_array_with_primitive_items!(ArrowTimestampMillisecondType)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Microsecond, _) => {
+            
build_empty_list_array_with_primitive_items!(ArrowTimestampMicrosecondType)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Nanosecond, _) => {
+            
build_empty_list_array_with_primitive_items!(ArrowTimestampNanosecondType)
+        }
+        ArrowType::Utf8 => {
+            build_empty_list_array_with_non_primitive_items!(StringBuilder)
+        }
+        ArrowType::Binary => {
+            build_empty_list_array_with_non_primitive_items!(BinaryBuilder)
+        }
+        _ => Err(ParquetError::General(format!(
+            "ListArray of type List({:?}) is not supported by array_reader",
+            item_type
+        ))),
+    }
+}
+
+macro_rules! remove_primitive_array_indices {
+    ($arr: expr, $item_type:ty, $indices:expr) => {{
+        let array_data = match 
$arr.as_any().downcast_ref::<PrimitiveArray<$item_type>>() {
+            Some(a) => a,
+            _ => return Err(ParquetError::General(format!("Error generating 
next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))),
+        };
+        let mut builder = PrimitiveBuilder::<$item_type>::new($arr.len());
+        for i in 0..array_data.len() {
+            if !$indices.contains(&i) {
+                if array_data.is_null(i) {
+                    builder.append_null()?;
+                } else {
+                    builder.append_value(array_data.value(i))?;
+                }
+            }
+        }
+        Ok(Arc::new(builder.finish()))
+    }};
+}
+
+macro_rules! remove_array_indices_custom_builder {
+    ($arr: expr, $array_type:ty, $item_builder:ident, $indices:expr) => {{
+        let array_data = match $arr.as_any().downcast_ref::<$array_type>() {
+            Some(a) => a,
+            _ => return Err(ParquetError::General(format!("Error generating 
next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))),
+        };
+        let mut builder = $item_builder::new(array_data.len());
+
+        for i in 0..array_data.len() {
+            if !$indices.contains(&i) {
+                if array_data.is_null(i) {
+                    builder.append_null()?;
+                } else {
+                    builder.append_value(array_data.value(i))?;
+                }
+            }
+        }
+        Ok(Arc::new(builder.finish()))
+    }};
+}
+
+macro_rules! remove_fixed_size_binary_array_indices {
+    ($arr: expr, $array_type:ty, $item_builder:ident, $indices:expr, 
$len:expr) => {{
+        let array_data = match $arr.as_any().downcast_ref::<$array_type>() {
+            Some(a) => a,
+            _ => return Err(ParquetError::General(format!("Error generating 
next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))),
+        };
+        let mut builder = FixedSizeBinaryBuilder::new(array_data.len(), $len);
+        for i in 0..array_data.len() {
+            if !$indices.contains(&i) {
+                if array_data.is_null(i) {
+                    builder.append_null()?;
+                } else {
+                    builder.append_value(array_data.value(i))?;
+                }
+            }
+        }
+        Ok(Arc::new(builder.finish()))
+    }};
+}
+
+fn remove_indices(
+    arr: ArrayRef,
+    item_type: ArrowType,
+    indices: Vec<usize>,
+) -> Result<ArrayRef> {
+    match item_type {
+        ArrowType::UInt8 => remove_primitive_array_indices!(arr, 
ArrowUInt8Type, indices),
+        ArrowType::UInt16 => {
+            remove_primitive_array_indices!(arr, ArrowUInt16Type, indices)
+        }
+        ArrowType::UInt32 => {
+            remove_primitive_array_indices!(arr, ArrowUInt32Type, indices)
+        }
+        ArrowType::UInt64 => {
+            remove_primitive_array_indices!(arr, ArrowUInt64Type, indices)
+        }
+        ArrowType::Int8 => remove_primitive_array_indices!(arr, ArrowInt8Type, 
indices),
+        ArrowType::Int16 => remove_primitive_array_indices!(arr, 
ArrowInt16Type, indices),
+        ArrowType::Int32 => remove_primitive_array_indices!(arr, 
ArrowInt32Type, indices),
+        ArrowType::Int64 => remove_primitive_array_indices!(arr, 
ArrowInt64Type, indices),
+        ArrowType::Float32 => {
+            remove_primitive_array_indices!(arr, ArrowFloat32Type, indices)
+        }
+        ArrowType::Float64 => {
+            remove_primitive_array_indices!(arr, ArrowFloat64Type, indices)
+        }
+        ArrowType::Boolean => {
+            remove_primitive_array_indices!(arr, ArrowBooleanType, indices)
+        }
+        ArrowType::Date32(_) => {
+            remove_primitive_array_indices!(arr, ArrowDate32Type, indices)
+        }
+        ArrowType::Date64(_) => {
+            remove_primitive_array_indices!(arr, ArrowDate64Type, indices)
+        }
+        ArrowType::Time32(ArrowTimeUnit::Second) => {
+            remove_primitive_array_indices!(arr, ArrowTime32SecondType, 
indices)
+        }
+        ArrowType::Time32(ArrowTimeUnit::Millisecond) => {
+            remove_primitive_array_indices!(arr, ArrowTime32MillisecondType, 
indices)
+        }
+        ArrowType::Time64(ArrowTimeUnit::Microsecond) => {
+            remove_primitive_array_indices!(arr, ArrowTime64MicrosecondType, 
indices)
+        }
+        ArrowType::Time64(ArrowTimeUnit::Nanosecond) => {
+            remove_primitive_array_indices!(arr, ArrowTime64NanosecondType, 
indices)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Second) => {
+            remove_primitive_array_indices!(arr, ArrowDurationSecondType, 
indices)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Millisecond) => {
+            remove_primitive_array_indices!(arr, ArrowDurationMillisecondType, 
indices)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Microsecond) => {
+            remove_primitive_array_indices!(arr, ArrowDurationMicrosecondType, 
indices)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Nanosecond) => {
+            remove_primitive_array_indices!(arr, ArrowDurationNanosecondType, 
indices)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Second, _) => {
+            remove_primitive_array_indices!(arr, ArrowTimestampSecondType, 
indices)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Millisecond, _) => {
+            remove_primitive_array_indices!(arr, 
ArrowTimestampMillisecondType, indices)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Microsecond, _) => {
+            remove_primitive_array_indices!(arr, 
ArrowTimestampMicrosecondType, indices)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Nanosecond, _) => {
+            remove_primitive_array_indices!(arr, ArrowTimestampNanosecondType, 
indices)
+        }
+        ArrowType::Utf8 => {
+            remove_array_indices_custom_builder!(arr, StringArray, 
StringBuilder, indices)
+        }
+        ArrowType::Binary => {
+            remove_array_indices_custom_builder!(arr, BinaryArray, 
BinaryBuilder, indices)
+        }
+        ArrowType::FixedSizeBinary(size) => 
remove_fixed_size_binary_array_indices!(
+            arr,
+            FixedSizeBinaryArray,
+            FixedSizeBinaryBuilder,
+            indices,
+            size
+        ),
+        _ => Err(ParquetError::General(format!(
+            "ListArray of type List({:?}) is not supported by array_reader",
+            item_type
+        ))),
+    }
+}
+
+/// Implementation of ListArrayReader. Nested lists and lists of structs are 
not yet supported.
+impl<OffsetSize: OffsetSizeTrait> ArrayReader for ListArrayReader<OffsetSize> {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Returns data type.
+    /// This must be a List.
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
+        let next_batch_array = self.item_reader.next_batch(batch_size)?;
+        let item_type = self.item_reader.get_data_type().clone();
+
+        if next_batch_array.len() == 0 {
+            return build_empty_list_array(item_type);
+        }
+        let def_levels = self
+            .item_reader
+            .get_def_levels()
+            .ok_or_else(|| ArrowError("item_reader def levels are 
None.".to_string()))?;
+        let rep_levels = self
+            .item_reader
+            .get_rep_levels()
+            .ok_or_else(|| ArrowError("item_reader rep levels are 
None.".to_string()))?;
+
+        if !((def_levels.len() == rep_levels.len())
+            && (rep_levels.len() == next_batch_array.len()))
+        {
+            return Err(ArrowError(
+                "Expected item_reader def_levels and rep_levels to be same 
length as batch".to_string(),
+            ));
+        }
+
+        // Need to remove from the values array the nulls that represent null 
lists rather than null items
+        // null lists have def_level = 0
+        let mut null_list_indices: Vec<usize> = Vec::new();
+        for i in 0..def_levels.len() {
+            if def_levels[i] == 0 {
+                null_list_indices.push(i);
+            }
+        }
+        let batch_values = match null_list_indices.len() {
+            0 => next_batch_array.clone(),
+            _ => remove_indices(next_batch_array.clone(), item_type, 
null_list_indices)?,
+        };
+
+        // null list has def_level = 0
+        // empty list has def_level = 1
+        // null item in a list has def_level = 2
+        // non-null item has def_level = 3
+        // first item in each list has rep_level = 0, subsequent items have 
rep_level = 1
+
+        let mut offsets: Vec<OffsetSize> = Vec::new();
+        let mut cur_offset = OffsetSize::zero();
+        for i in 0..rep_levels.len() {
+            if rep_levels[i] == 0 {
+                offsets.push(cur_offset)
+            }
+            if def_levels[i] > 0 {
+                cur_offset = cur_offset + OffsetSize::one();
+            }
+        }
+        offsets.push(cur_offset);
+
+        let num_bytes = bit_util::ceil(offsets.len(), 8);
+        let mut null_buf = 
MutableBuffer::new(num_bytes).with_bitset(num_bytes, false);
+        let null_slice = null_buf.data_mut();
+        let mut list_index = 0;
+        for i in 0..rep_levels.len() {
+            if rep_levels[i] == 0 && def_levels[i] != 0 {
+                bit_util::set_bit(null_slice, list_index);
+            }
+            if rep_levels[i] == 0 {
+                list_index += 1;
+            }
+        }
+        let value_offsets = Buffer::from(&offsets.to_byte_slice());
+
+        // null list has def_level = 0
+        let null_count = def_levels.iter().filter(|x| x == &&0).count();
+
+        let list_data = ArrayData::builder(self.get_data_type().clone())
+            .len(offsets.len() - 1)
+            .add_buffer(value_offsets)
+            .add_child_data(batch_values.data())
+            .null_bit_buffer(null_buf.freeze())
+            .null_count(null_count)
+            .offset(next_batch_array.offset())
+            .build();
+
+        let result_array = GenericListArray::<OffsetSize>::from(list_data);
+        Ok(Arc::new(result_array))
+    }
+
+    fn get_def_levels(&self) -> Option<&[i16]> {
+        self.def_level_buffer
+            .as_ref()
+            .map(|buf| unsafe { buf.typed_data() })
+    }
+
+    fn get_rep_levels(&self) -> Option<&[i16]> {
+        self.rep_level_buffer
+            .as_ref()
+            .map(|buf| unsafe { buf.typed_data() })
+    }
+}
+
 /// Implementation of struct array reader.
 pub struct StructArrayReader {
     children: Vec<Box<dyn ArrayReader>>,
@@ -875,16 +1291,94 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a 
ArrayReaderBuilderContext
     }
 
     /// Build array reader for list type.
-    /// Currently this is not supported.
     fn visit_list_with_item(
         &mut self,
-        _list_type: Rc<Type>,
-        _item_type: &Type,
-        _context: &'a ArrayReaderBuilderContext,
+        list_type: Rc<Type>,
+        item_type: Rc<Type>,
+        context: &'a ArrayReaderBuilderContext,
     ) -> Result<Option<Box<dyn ArrayReader>>> {
-        Err(ArrowError(
-            "Reading parquet list array into arrow is not supported 
yet!".to_string(),
-        ))
+        let list_child = &list_type
+            .get_fields()
+            .first()
+            .ok_or_else(|| ArrowError("List field must have a 
child.".to_string()))?;
+        let mut new_context = context.clone();
+
+        new_context.path.append(vec![list_type.name().to_string()]);
+
+        match list_type.get_basic_info().repetition() {
+            Repetition::REPEATED => {
+                new_context.def_level += 1;
+                new_context.rep_level += 1;
+            }
+            Repetition::OPTIONAL => {
+                new_context.def_level += 1;
+            }
+            _ => (),
+        }
+
+        match list_child.get_basic_info().repetition() {
+            Repetition::REPEATED => {
+                new_context.def_level += 1;
+                new_context.rep_level += 1;
+            }
+            Repetition::OPTIONAL => {
+                new_context.def_level += 1;
+            }
+            _ => (),
+        }
+
+        let item_reader = self
+            .dispatch(item_type.clone(), &new_context)
+            .unwrap()
+            .unwrap();
+
+        let item_reader_type = item_reader.get_data_type().clone();
+
+        match item_reader_type {
+            ArrowType::List(_)
+            | ArrowType::FixedSizeList(_, _)
+            | ArrowType::Struct(_)
+            | ArrowType::Dictionary(_, _) => Err(ArrowError(format!(
+                "reading List({:?}) into arrow not supported yet",
+                item_type
+            ))),
+            _ => {
+                let arrow_type = self
+                    .arrow_schema
+                    .field_with_name(list_type.name())
+                    .ok()
+                    .map(|f| f.data_type().to_owned())
+                    .unwrap_or_else(|| {
+                        ArrowType::List(Box::new(item_reader_type.clone()))
+                    });
+
+                let list_array_reader: Box<dyn ArrayReader> = match arrow_type 
{
+                    ArrowType::List(_) => Box::new(ListArrayReader::<i32>::new(
+                        item_reader,
+                        arrow_type,
+                        item_reader_type,
+                        new_context.def_level,
+                        new_context.rep_level,
+                    )),
+                    ArrowType::LargeList(_) => 
Box::new(ListArrayReader::<i64>::new(
+                        item_reader,
+                        arrow_type,
+                        item_reader_type,
+                        new_context.def_level,
+                        new_context.rep_level,
+                    )),
+
+                    _ => {
+                        return Err(ArrowError(format!(
+                        "creating ListArrayReader with type {:?} should be 
unreachable",
+                        arrow_type
+                    )))
+                    }
+                };
+
+                Ok(Some(list_array_reader))
+            }
+        }
     }
 }
 
@@ -1100,7 +1594,10 @@ mod tests {
         DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator,
     };
     use crate::util::test_common::{get_test_file, make_pages};
-    use arrow::array::{Array, ArrayRef, PrimitiveArray, StringArray, 
StructArray};
+    use arrow::array::{
+        Array, ArrayRef, LargeListArray, ListArray, PrimitiveArray, 
StringArray,
+        StructArray,
+    };
     use arrow::datatypes::{
         ArrowPrimitiveType, DataType as ArrowType, Date32Type as ArrowDate32, 
Field,
         Int32Type as ArrowInt32, Int64Type as ArrowInt64,
@@ -1311,7 +1808,7 @@ mod tests {
                     .next_batch(50)
                     .expect("Unable to get batch from reader");
 
-                let result_data_type = <$result_arrow_type>::get_data_type();
+                let result_data_type = <$result_arrow_type>::DATA_TYPE;
                 let array = array
                     .as_any()
                     .downcast_ref::<PrimitiveArray<$result_arrow_type>>()
@@ -1759,4 +2256,113 @@ mod tests {
 
         assert_eq!(array_reader.get_data_type(), &arrow_type);
     }
+
+    #[test]
+    fn test_list_array_reader() {
+        // [[1, null, 2], null, [3, 4]]
+        let array = Arc::new(PrimitiveArray::<ArrowInt32>::from(vec![
+            Some(1),
+            None,
+            Some(2),
+            None,
+            Some(3),
+            Some(4),
+        ]));
+        let item_array_reader = InMemoryArrayReader::new(
+            ArrowType::Int32,
+            array,
+            Some(vec![3, 2, 3, 0, 3, 3]),
+            Some(vec![0, 1, 1, 0, 0, 1]),
+        );
+
+        let mut list_array_reader = ListArrayReader::<i32>::new(
+            Box::new(item_array_reader),
+            ArrowType::List(Box::new(ArrowType::Int32)),
+            ArrowType::Int32,
+            1,
+            1,
+        );
+
+        let next_batch = list_array_reader.next_batch(1024).unwrap();
+        let list_array = 
next_batch.as_any().downcast_ref::<ListArray>().unwrap();
+
+        assert_eq!(3, list_array.len());
+        // This passes as I expect
+        assert_eq!(1, list_array.null_count());
+
+        assert_eq!(
+            list_array
+                .value(0)
+                .as_any()
+                .downcast_ref::<PrimitiveArray<ArrowInt32>>()
+                .unwrap(),
+            &PrimitiveArray::<ArrowInt32>::from(vec![Some(1), None, Some(2)])
+        );
+
+        assert!(list_array.is_null(1));
+
+        assert_eq!(
+            list_array
+                .value(2)
+                .as_any()
+                .downcast_ref::<PrimitiveArray<ArrowInt32>>()
+                .unwrap(),
+            &PrimitiveArray::<ArrowInt32>::from(vec![Some(3), Some(4)])
+        );
+    }
+
+    #[test]
+    fn test_large_list_array_reader() {
+        // [[1, null, 2], null, [3, 4]]
+        let array = Arc::new(PrimitiveArray::<ArrowInt32>::from(vec![
+            Some(1),
+            None,
+            Some(2),
+            None,
+            Some(3),
+            Some(4),
+        ]));
+        let item_array_reader = InMemoryArrayReader::new(
+            ArrowType::Int32,
+            array,
+            Some(vec![3, 2, 3, 0, 3, 3]),
+            Some(vec![0, 1, 1, 0, 0, 1]),
+        );
+
+        let mut list_array_reader = ListArrayReader::<i64>::new(
+            Box::new(item_array_reader),
+            ArrowType::LargeList(Box::new(ArrowType::Int32)),
+            ArrowType::Int32,
+            1,
+            1,
+        );
+
+        let next_batch = list_array_reader.next_batch(1024).unwrap();
+        let list_array = next_batch
+            .as_any()
+            .downcast_ref::<LargeListArray>()
+            .unwrap();
+
+        assert_eq!(3, list_array.len());
+
+        assert_eq!(
+            list_array
+                .value(0)
+                .as_any()
+                .downcast_ref::<PrimitiveArray<ArrowInt32>>()
+                .unwrap(),
+            &PrimitiveArray::<ArrowInt32>::from(vec![Some(1), None, Some(2)])
+        );
+
+        assert!(list_array.is_null(1));
+
+        assert_eq!(
+            list_array
+                .value(2)
+                .as_any()
+                .downcast_ref::<PrimitiveArray<ArrowInt32>>()
+                .unwrap(),
+            &PrimitiveArray::<ArrowInt32>::from(vec![Some(3), Some(4)])
+        );
+    }
 }
diff --git a/rust/parquet/src/arrow/arrow_writer.rs 
b/rust/parquet/src/arrow/arrow_writer.rs
index ff535dc..d5e2db4 100644
--- a/rust/parquet/src/arrow/arrow_writer.rs
+++ b/rust/parquet/src/arrow/arrow_writer.rs
@@ -22,7 +22,7 @@ use std::rc::Rc;
 use arrow::array as arrow_array;
 use arrow::datatypes::{DataType as ArrowDataType, SchemaRef};
 use arrow::record_batch::RecordBatch;
-use arrow_array::{Array, PrimitiveArrayOps};
+use arrow_array::Array;
 
 use super::schema::add_encoded_arrow_schema_to_metadata;
 use crate::column::writer::ColumnWriter;
@@ -534,6 +534,7 @@ mod tests {
     }
 
     #[test]
+    #[ignore = "repetitions might be incorrect, will be addressed as part of ARROW-9728"]
     fn arrow_writer_list() {
         // define schema
         let schema = Schema::new(vec![Field::new(
@@ -546,7 +547,7 @@ mod tests {
         let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
 
         // Construct a buffer for value offsets, for the nested array:
-        //  [[false], [true, false], null, [true, false, true], [false, true, false, true]]
+        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
         let a_value_offsets =
             arrow::buffer::Buffer::from(&[0, 1, 3, 3, 6, 10].to_byte_slice());
 
@@ -562,6 +563,9 @@ mod tests {
         let batch =
             RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)]).unwrap();
 
+        // I think this setup is incorrect because this should pass
+        assert_eq!(batch.column(0).data().null_count(), 1);
+
         let file = get_temp_file("test_arrow_writer_list.parquet", &[]);
         let mut writer = ArrowWriter::try_new(file, Arc::new(schema), None).unwrap();
         writer.write(&batch).unwrap();
@@ -1063,9 +1067,7 @@ mod tests {
     }
 
     #[test]
-    #[should_panic(
-        expected = "Reading parquet list array into arrow is not supported yet!"
-    )]
+    #[ignore = "repetitions might be incorrect, will be addressed as part of ARROW-9728"]
     fn list_single_column() {
         let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
         let a_value_offsets =
@@ -1075,16 +1077,18 @@ mod tests {
             .add_buffer(a_value_offsets)
             .add_child_data(a_values.data())
             .build();
-        let a = ListArray::from(a_list_data);
 
+        // I think this setup is incorrect because this should pass
+        assert_eq!(a_list_data.null_count(), 1);
+
+        let a = ListArray::from(a_list_data);
         let values = Arc::new(a);
+
         one_column_roundtrip("list_single_column", values, false);
     }
 
     #[test]
-    #[should_panic(
-        expected = "Reading parquet list array into arrow is not supported yet!"
-    )]
+    #[ignore = "repetitions might be incorrect, will be addressed as part of ARROW-9728"]
     fn large_list_single_column() {
         let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
         let a_value_offsets =
@@ -1095,14 +1099,17 @@ mod tests {
                 .add_buffer(a_value_offsets)
                 .add_child_data(a_values.data())
                 .build();
-        let a = LargeListArray::from(a_list_data);
 
+        // I think this setup is incorrect because this should pass
+        assert_eq!(a_list_data.null_count(), 1);
+
+        let a = LargeListArray::from(a_list_data);
         let values = Arc::new(a);
+
         one_column_roundtrip("large_list_single_column", values, false);
     }
 
     #[test]
-    #[ignore] // Struct support isn't correct yet - null_bitmap doesn't match
     fn struct_single_column() {
         let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
         let struct_field_a = Field::new("f", DataType::Int32, false);
diff --git a/rust/parquet/src/schema/visitor.rs b/rust/parquet/src/schema/visitor.rs
index 6d712ce..a1866fb 100644
--- a/rust/parquet/src/schema/visitor.rs
+++ b/rust/parquet/src/schema/visitor.rs
@@ -50,7 +50,7 @@ pub trait TypeVisitor<R, C> {
                         {
                             self.visit_list_with_item(
                                 list_type.clone(),
-                                list_item,
+                                list_item.clone(),
                                 context,
                             )
                         } else {
@@ -70,13 +70,13 @@ pub trait TypeVisitor<R, C> {
                         {
                             self.visit_list_with_item(
                                 list_type.clone(),
-                                fields.first().unwrap(),
+                                fields.first().unwrap().clone(),
                                 context,
                             )
                         } else {
                             self.visit_list_with_item(
                                 list_type.clone(),
-                                list_item,
+                                list_item.clone(),
                                 context,
                             )
                         }
@@ -114,7 +114,7 @@ pub trait TypeVisitor<R, C> {
     fn visit_list_with_item(
         &mut self,
         list_type: TypePtr,
-        item_type: &Type,
+        item_type: TypePtr,
         context: C,
     ) -> Result<R>;
 }
@@ -125,7 +125,7 @@ mod tests {
     use crate::basic::Type as PhysicalType;
     use crate::errors::Result;
     use crate::schema::parser::parse_message_type;
-    use crate::schema::types::{Type, TypePtr};
+    use crate::schema::types::TypePtr;
     use std::rc::Rc;
 
     struct TestVisitorContext {}
@@ -174,7 +174,7 @@ mod tests {
         fn visit_list_with_item(
             &mut self,
             list_type: TypePtr,
-            item_type: &Type,
+            item_type: TypePtr,
             _context: TestVisitorContext,
         ) -> Result<bool> {
             assert_eq!(

Reply via email to