This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new a901927e4 Split up parquet::arrow::array_reader (#1483) (#1933)
a901927e4 is described below

commit a901927e46083b3915b57475df1d802d9806f68c
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Thu Jun 23 21:51:30 2022 +0100

    Split up parquet::arrow::array_reader (#1483) (#1933)
    
    * Split up parquet::arrow::array_reader
    
    * RAT
---
 .../src/arrow/array_reader/complex_object_array.rs |  532 +++++++
 parquet/src/arrow/array_reader/mod.rs              | 1495 +-------------------
 parquet/src/arrow/array_reader/null_array.rs       |  106 ++
 parquet/src/arrow/array_reader/primitive_array.rs  |  613 ++++++++
 parquet/src/arrow/array_reader/struct_array.rs     |  294 ++++
 5 files changed, 1559 insertions(+), 1481 deletions(-)

diff --git a/parquet/src/arrow/array_reader/complex_object_array.rs b/parquet/src/arrow/array_reader/complex_object_array.rs
new file mode 100644
index 000000000..b91fde5c4
--- /dev/null
+++ b/parquet/src/arrow/array_reader/complex_object_array.rs
@@ -0,0 +1,532 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::array_reader::ArrayReader;
+use crate::arrow::buffer::converter::Converter;
+use crate::arrow::schema::parquet_to_arrow_field;
+use crate::column::page::PageIterator;
+use crate::column::reader::ColumnReaderImpl;
+use crate::data_type::DataType;
+use crate::errors::Result;
+use crate::schema::types::ColumnDescPtr;
+use arrow::array::ArrayRef;
+use arrow::datatypes::DataType as ArrowType;
+use std::any::Any;
+use std::marker::PhantomData;
+
+/// An [`ArrayReader`] for leaf columns whose decoded values must pass through a
+/// [`Converter`] to become an arrow array.
+///
+/// Each batch is decoded into a `Vec<Option<T::T>>`, with `None` for every slot whose
+/// definition level is below the column's maximum, and then handed to the converter `C`.
+pub struct ComplexObjectArrayReader<T, C>
+where
+    T: DataType,
+    C: Converter<Vec<Option<T::T>>, ArrayRef> + 'static,
+{
+    /// Arrow type reported by `get_data_type`.
+    data_type: ArrowType,
+    /// Supplies the page reader behind each successive column reader.
+    pages: Box<dyn PageIterator>,
+    /// Definition levels of the most recent batch; `None` when the column's maximum
+    /// definition level is zero.
+    def_levels_buffer: Option<Vec<i16>>,
+    /// Repetition levels of the most recent batch; `None` when the column's maximum
+    /// repetition level is zero.
+    rep_levels_buffer: Option<Vec<i16>>,
+    /// Descriptor of the column being read.
+    column_desc: ColumnDescPtr,
+    /// Reader over the current entry of `pages`; `None` until one has been obtained.
+    column_reader: Option<ColumnReaderImpl<T>>,
+    /// Turns the decoded values into an arrow array.
+    converter: C,
+    _parquet_type_marker: PhantomData<T>,
+    _converter_marker: PhantomData<C>,
+}
+
+impl<T, C> ArrayReader for ComplexObjectArrayReader<T, C>
+where
+    T: DataType,
+    C: Converter<Vec<Option<T::T>>, ArrayRef> + Send + 'static,
+{
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Returns the arrow type this reader was constructed with.
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    /// Reads up to `batch_size` values (nulls included) and converts them into an
+    /// arrow array.
+    ///
+    /// Fewer values are returned only when the page iterator runs out. The levels read
+    /// alongside the values are kept and exposed through `get_def_levels` and
+    /// `get_rep_levels` until the next call.
+    ///
+    /// # Errors
+    ///
+    /// Propagates errors from the page iterator, the column reader, the converter and
+    /// the final dictionary cast.
+    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
+        // Try to initialize column reader
+        if self.column_reader.is_none() {
+            self.next_column_reader()?;
+        }
+
+        let max_def_level = self.column_desc.max_def_level();
+
+        let mut data_buffer: Vec<T::T> = Vec::with_capacity(batch_size);
+        data_buffer.resize_with(batch_size, T::T::default);
+
+        // Level buffers are only allocated when the column has levels of that kind.
+        let mut def_levels_buffer: Option<Vec<i16>> = if max_def_level > 0 {
+            Some(vec![0; batch_size])
+        } else {
+            None
+        };
+
+        let mut rep_levels_buffer: Option<Vec<i16>> =
+            if self.column_desc.max_rep_level() > 0 {
+                Some(vec![0; batch_size])
+            } else {
+                None
+            };
+
+        let mut num_read = 0;
+
+        while num_read < batch_size {
+            let column_reader = match self.column_reader.as_mut() {
+                Some(reader) => reader,
+                None => break,
+            };
+
+            let num_to_read = batch_size - num_read;
+            let cur_data_buf = &mut data_buffer[num_read..];
+            let cur_def_levels_buf =
+                def_levels_buffer.as_mut().map(|b| &mut b[num_read..]);
+            let cur_rep_levels_buf =
+                rep_levels_buffer.as_mut().map(|b| &mut b[num_read..]);
+            let (data_read, levels_read) = column_reader.read_batch(
+                num_to_read,
+                cur_def_levels_buf,
+                cur_rep_levels_buf,
+                cur_data_buf,
+            )?;
+
+            // Fill space: more levels than values means some slots are null. The
+            // `data_read` values sit at the front of `cur_data_buf`, so walk backwards
+            // and move each one to the slot whose definition level marks it as present.
+            if levels_read > data_read {
+                if let Some(def_levels) = def_levels_buffer.as_deref() {
+                    let (mut level_pos, mut data_pos) = (levels_read, data_read);
+                    while level_pos > 0 && data_pos > 0 {
+                        if def_levels[num_read + level_pos - 1] == max_def_level {
+                            cur_data_buf.swap(level_pos - 1, data_pos - 1);
+                            data_pos -= 1;
+                        }
+                        level_pos -= 1;
+                    }
+                }
+            }
+
+            let values_read = levels_read.max(data_read);
+            num_read += values_read;
+            // current page exhausted && page iterator exhausted
+            if values_read < num_to_read && !self.next_column_reader()? {
+                break;
+            }
+        }
+
+        // Drop the unused tail of every buffer.
+        data_buffer.truncate(num_read);
+        if let Some(buf) = def_levels_buffer.as_mut() {
+            buf.truncate(num_read);
+        }
+        if let Some(buf) = rep_levels_buffer.as_mut() {
+            buf.truncate(num_read);
+        }
+
+        self.def_levels_buffer = def_levels_buffer;
+        self.rep_levels_buffer = rep_levels_buffer;
+
+        // A slot holds a value only when its definition level is the column maximum.
+        let data: Vec<Option<T::T>> = match self.def_levels_buffer.as_deref() {
+            Some(def_levels) => data_buffer
+                .into_iter()
+                .zip(def_levels)
+                .map(|(value, &def_level)| {
+                    if def_level == max_def_level {
+                        Some(value)
+                    } else {
+                        None
+                    }
+                })
+                .collect(),
+            None => data_buffer.into_iter().map(Some).collect(),
+        };
+
+        let array = self.converter.convert(data)?;
+
+        // Cast the converter output when the requested arrow type is a dictionary.
+        if matches!(self.data_type, ArrowType::Dictionary(_, _)) {
+            Ok(arrow::compute::cast(&array, &self.data_type)?)
+        } else {
+            Ok(array)
+        }
+    }
+
+    /// Definition levels of the most recent batch, if the column has any.
+    fn get_def_levels(&self) -> Option<&[i16]> {
+        self.def_levels_buffer.as_deref()
+    }
+
+    /// Repetition levels of the most recent batch, if the column has any.
+    fn get_rep_levels(&self) -> Option<&[i16]> {
+        self.rep_levels_buffer.as_deref()
+    }
+}
+
+impl<T, C> ComplexObjectArrayReader<T, C>
+where
+    T: DataType,
+    C: Converter<Vec<Option<T::T>>, ArrayRef> + 'static,
+{
+    /// Creates a reader over `pages` for the column described by `column_desc`.
+    ///
+    /// When `arrow_type` is `None`, the arrow type is derived from the parquet column
+    /// with [`parquet_to_arrow_field`].
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if that derivation fails.
+    pub fn new(
+        pages: Box<dyn PageIterator>,
+        column_desc: ColumnDescPtr,
+        converter: C,
+        arrow_type: Option<ArrowType>,
+    ) -> Result<Self> {
+        let data_type = match arrow_type {
+            Some(t) => t,
+            None => parquet_to_arrow_field(column_desc.as_ref())?
+                .data_type()
+                .clone(),
+        };
+
+        Ok(Self {
+            data_type,
+            pages,
+            def_levels_buffer: None,
+            rep_levels_buffer: None,
+            column_desc,
+            column_reader: None,
+            converter,
+            _parquet_type_marker: PhantomData,
+            _converter_marker: PhantomData,
+        })
+    }
+
+    /// Replaces the current column reader with one over the next entry of `pages`.
+    ///
+    /// Returns `Ok(false)`, leaving the current reader untouched, once `pages` is
+    /// exhausted; an error yielded by `pages` is propagated.
+    fn next_column_reader(&mut self) -> Result<bool> {
+        match self.pages.next() {
+            Some(page) => {
+                let reader = ColumnReaderImpl::<T>::new(self.column_desc.clone(), page?);
+                self.column_reader = Some(reader);
+                Ok(true)
+            }
+            None => Ok(false),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::arrow::buffer::converter::{Utf8ArrayConverter, Utf8Converter};
+    use crate::basic::Encoding;
+    use crate::column::page::Page;
+    use crate::data_type::{ByteArray, ByteArrayType};
+    use crate::schema::parser::parse_message_type;
+    use crate::schema::types::SchemaDescriptor;
+    use crate::util::{DataPageBuilder, DataPageBuilderImpl, 
InMemoryPageIterator};
+    use arrow::array::StringArray;
+    use rand::{thread_rng, Rng};
+    use std::sync::Arc;
+
+    #[test]
+    fn test_complex_array_reader_no_pages() {
+        let message_type = "
+        message test_schema {
+            REPEATED Group test_mid {
+                OPTIONAL BYTE_ARRAY leaf (UTF8);
+            }
+        }
+        ";
+        let schema = parse_message_type(message_type)
+            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
+            .unwrap();
+        let column_desc = schema.column(0);
+        let pages: Vec<Vec<Page>> = Vec::new();
+        let page_iterator = InMemoryPageIterator::new(schema, 
column_desc.clone(), pages);
+
+        let converter = Utf8Converter::new(Utf8ArrayConverter {});
+        let mut array_reader =
+            ComplexObjectArrayReader::<ByteArrayType, Utf8Converter>::new(
+                Box::new(page_iterator),
+                column_desc,
+                converter,
+                None,
+            )
+            .unwrap();
+
+        let values_per_page = 100; // this value is arbitrary in this test - 
the result should always be an array of 0 length
+        let array = array_reader.next_batch(values_per_page).unwrap();
+        assert_eq!(array.len(), 0);
+    }
+
+    #[test]
+    fn test_complex_array_reader_def_and_rep_levels() {
+        // Construct column schema
+        let message_type = "
+        message test_schema {
+            REPEATED Group test_mid {
+                OPTIONAL BYTE_ARRAY leaf (UTF8);
+            }
+        }
+        ";
+        let num_pages = 2;
+        let values_per_page = 100;
+        let str_base = "Hello World";
+
+        let schema = parse_message_type(message_type)
+            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
+            .unwrap();
+
+        let max_def_level = schema.column(0).max_def_level();
+        let max_rep_level = schema.column(0).max_rep_level();
+
+        assert_eq!(max_def_level, 2);
+        assert_eq!(max_rep_level, 1);
+
+        let mut rng = thread_rng();
+        let column_desc = schema.column(0);
+        let mut pages: Vec<Vec<Page>> = Vec::new();
+
+        let mut rep_levels = Vec::with_capacity(num_pages * values_per_page);
+        let mut def_levels = Vec::with_capacity(num_pages * values_per_page);
+        let mut all_values = Vec::with_capacity(num_pages * values_per_page);
+
+        for i in 0..num_pages {
+            let mut values = Vec::with_capacity(values_per_page);
+
+            for _ in 0..values_per_page {
+                let def_level = rng.gen_range(0..max_def_level + 1);
+                let rep_level = rng.gen_range(0..max_rep_level + 1);
+                if def_level == max_def_level {
+                    let len = rng.gen_range(1..str_base.len());
+                    let slice = &str_base[..len];
+                    values.push(ByteArray::from(slice));
+                    all_values.push(Some(slice.to_string()));
+                } else {
+                    all_values.push(None)
+                }
+                rep_levels.push(rep_level);
+                def_levels.push(def_level)
+            }
+
+            let range = i * values_per_page..(i + 1) * values_per_page;
+            let mut pb =
+                DataPageBuilderImpl::new(column_desc.clone(), values.len() as 
u32, true);
+
+            pb.add_rep_levels(max_rep_level, 
&rep_levels.as_slice()[range.clone()]);
+            pb.add_def_levels(max_def_level, &def_levels.as_slice()[range]);
+            pb.add_values::<ByteArrayType>(Encoding::PLAIN, values.as_slice());
+
+            let data_page = pb.consume();
+            pages.push(vec![data_page]);
+        }
+
+        let page_iterator = InMemoryPageIterator::new(schema, 
column_desc.clone(), pages);
+
+        let converter = Utf8Converter::new(Utf8ArrayConverter {});
+        let mut array_reader =
+            ComplexObjectArrayReader::<ByteArrayType, Utf8Converter>::new(
+                Box::new(page_iterator),
+                column_desc,
+                converter,
+                None,
+            )
+            .unwrap();
+
+        let mut accu_len: usize = 0;
+
+        let array = array_reader.next_batch(values_per_page / 2).unwrap();
+        assert_eq!(array.len(), values_per_page / 2);
+        assert_eq!(
+            Some(&def_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_def_levels()
+        );
+        assert_eq!(
+            Some(&rep_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_rep_levels()
+        );
+        accu_len += array.len();
+
+        // Read next values_per_page values, the first values_per_page/2 ones 
are from the first column chunk,
+        // and the last values_per_page/2 ones are from the second column chunk
+        let array = array_reader.next_batch(values_per_page).unwrap();
+        assert_eq!(array.len(), values_per_page);
+        assert_eq!(
+            Some(&def_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_def_levels()
+        );
+        assert_eq!(
+            Some(&rep_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_rep_levels()
+        );
+        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
+        for i in 0..array.len() {
+            if array.is_valid(i) {
+                assert_eq!(
+                    all_values[i + accu_len].as_ref().unwrap().as_str(),
+                    strings.value(i)
+                )
+            } else {
+                assert_eq!(all_values[i + accu_len], None)
+            }
+        }
+        accu_len += array.len();
+
+        // Try to read values_per_page values, however there are only 
values_per_page/2 values
+        let array = array_reader.next_batch(values_per_page).unwrap();
+        assert_eq!(array.len(), values_per_page / 2);
+        assert_eq!(
+            Some(&def_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_def_levels()
+        );
+        assert_eq!(
+            Some(&rep_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_rep_levels()
+        );
+    }
+
+    #[test]
+    fn test_complex_array_reader_dict_enc_string() {
+        use crate::encodings::encoding::{DictEncoder, Encoder};
+        // Construct column schema
+        let message_type = "
+        message test_schema {
+            REPEATED Group test_mid {
+                OPTIONAL BYTE_ARRAY leaf (UTF8);
+            }
+        }
+        ";
+        let num_pages = 2;
+        let values_per_page = 100;
+        let str_base = "Hello World";
+
+        let schema = parse_message_type(message_type)
+            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
+            .unwrap();
+        let column_desc = schema.column(0);
+        let max_def_level = column_desc.max_def_level();
+        let max_rep_level = column_desc.max_rep_level();
+
+        assert_eq!(max_def_level, 2);
+        assert_eq!(max_rep_level, 1);
+
+        let mut rng = thread_rng();
+        let mut pages: Vec<Vec<Page>> = Vec::new();
+
+        let mut rep_levels = Vec::with_capacity(num_pages * values_per_page);
+        let mut def_levels = Vec::with_capacity(num_pages * values_per_page);
+        let mut all_values = Vec::with_capacity(num_pages * values_per_page);
+
+        for i in 0..num_pages {
+            let mut dict_encoder = 
DictEncoder::<ByteArrayType>::new(column_desc.clone());
+            // add data page
+            let mut values = Vec::with_capacity(values_per_page);
+
+            for _ in 0..values_per_page {
+                let def_level = rng.gen_range(0..max_def_level + 1);
+                let rep_level = rng.gen_range(0..max_rep_level + 1);
+                if def_level == max_def_level {
+                    let len = rng.gen_range(1..str_base.len());
+                    let slice = &str_base[..len];
+                    values.push(ByteArray::from(slice));
+                    all_values.push(Some(slice.to_string()));
+                } else {
+                    all_values.push(None)
+                }
+                rep_levels.push(rep_level);
+                def_levels.push(def_level)
+            }
+
+            let range = i * values_per_page..(i + 1) * values_per_page;
+            let mut pb =
+                DataPageBuilderImpl::new(column_desc.clone(), values.len() as 
u32, true);
+            pb.add_rep_levels(max_rep_level, 
&rep_levels.as_slice()[range.clone()]);
+            pb.add_def_levels(max_def_level, &def_levels.as_slice()[range]);
+            let _ = dict_encoder.put(&values);
+            let indices = dict_encoder
+                .write_indices()
+                .expect("write_indices() should be OK");
+            pb.add_indices(indices);
+            let data_page = pb.consume();
+            // for each page log num_values vs actual values in page
+            // println!("page num_values: {}, values.len(): {}", 
data_page.num_values(), values.len());
+            // add dictionary page
+            let dict = dict_encoder
+                .write_dict()
+                .expect("write_dict() should be OK");
+            let dict_page = Page::DictionaryPage {
+                buf: dict,
+                num_values: dict_encoder.num_entries() as u32,
+                encoding: Encoding::RLE_DICTIONARY,
+                is_sorted: false,
+            };
+            pages.push(vec![dict_page, data_page]);
+        }
+
+        let page_iterator = InMemoryPageIterator::new(schema, 
column_desc.clone(), pages);
+        let converter = Utf8Converter::new(Utf8ArrayConverter {});
+        let mut array_reader =
+            ComplexObjectArrayReader::<ByteArrayType, Utf8Converter>::new(
+                Box::new(page_iterator),
+                column_desc,
+                converter,
+                None,
+            )
+            .unwrap();
+
+        let mut accu_len: usize = 0;
+
+        // println!("---------- reading a batch of {} values ----------", 
values_per_page / 2);
+        let array = array_reader.next_batch(values_per_page / 2).unwrap();
+        assert_eq!(array.len(), values_per_page / 2);
+        assert_eq!(
+            Some(&def_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_def_levels()
+        );
+        assert_eq!(
+            Some(&rep_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_rep_levels()
+        );
+        accu_len += array.len();
+
+        // Read next values_per_page values, the first values_per_page/2 ones 
are from the first column chunk,
+        // and the last values_per_page/2 ones are from the second column chunk
+        // println!("---------- reading a batch of {} values ----------", 
values_per_page);
+        let array = array_reader.next_batch(values_per_page).unwrap();
+        assert_eq!(array.len(), values_per_page);
+        assert_eq!(
+            Some(&def_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_def_levels()
+        );
+        assert_eq!(
+            Some(&rep_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_rep_levels()
+        );
+        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
+        for i in 0..array.len() {
+            if array.is_valid(i) {
+                assert_eq!(
+                    all_values[i + accu_len].as_ref().unwrap().as_str(),
+                    strings.value(i)
+                )
+            } else {
+                assert_eq!(all_values[i + accu_len], None)
+            }
+        }
+        accu_len += array.len();
+
+        // Try to read values_per_page values, however there are only 
values_per_page/2 values
+        // println!("---------- reading a batch of {} values ----------", 
values_per_page);
+        let array = array_reader.next_batch(values_per_page).unwrap();
+        assert_eq!(array.len(), values_per_page / 2);
+        assert_eq!(
+            Some(&def_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_def_levels()
+        );
+        assert_eq!(
+            Some(&rep_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_rep_levels()
+        );
+    }
+}
diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs
index 6207b377d..dd65a3626 100644
--- a/parquet/src/arrow/array_reader/mod.rs
+++ b/parquet/src/arrow/array_reader/mod.rs
@@ -17,44 +17,29 @@
 
 //! Logic for reading into arrow arrays
 
+use crate::errors::Result;
+use arrow::array::ArrayRef;
+use arrow::datatypes::DataType as ArrowType;
 use std::any::Any;
-use std::cmp::max;
-use std::marker::PhantomData;
-use std::result::Result::Ok;
 use std::sync::Arc;
-use std::vec::Vec;
 
-use arrow::array::{
-    Array, ArrayData, ArrayDataBuilder, ArrayRef, BooleanArray, BooleanBufferBuilder,
-    DecimalArray, Int32Array, Int64Array, PrimitiveArray, StructArray,
-};
-use arrow::buffer::Buffer;
-use arrow::datatypes::{
-    ArrowPrimitiveType, BooleanType as ArrowBooleanType, DataType as ArrowType,
-    Float32Type as ArrowFloat32Type, Float64Type as ArrowFloat64Type,
-    Int32Type as ArrowInt32Type, Int64Type as ArrowInt64Type,
-    UInt32Type as ArrowUInt32Type, UInt64Type as ArrowUInt64Type,
-};
-
-use crate::arrow::buffer::converter::Converter;
-use crate::arrow::record_reader::buffer::{ScalarValue, ValuesBuffer};
-use crate::arrow::record_reader::{GenericRecordReader, RecordReader};
-use crate::arrow::schema::parquet_to_arrow_field;
-use crate::basic::Type as PhysicalType;
+use crate::arrow::record_reader::buffer::ValuesBuffer;
+use crate::arrow::record_reader::GenericRecordReader;
 use crate::column::page::PageIterator;
 use crate::column::reader::decoder::ColumnValueDecoder;
-use crate::column::reader::ColumnReaderImpl;
-use crate::data_type::DataType;
-use crate::errors::{ParquetError, ParquetError::ArrowError, Result};
 use crate::file::reader::{FilePageIterator, FileReader};
-use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
+use crate::schema::types::SchemaDescPtr;
 
 mod builder;
 mod byte_array;
 mod byte_array_dictionary;
+mod complex_object_array;
 mod empty_array;
 mod list_array;
 mod map_array;
+mod null_array;
+mod primitive_array;
+mod struct_array;
 
 #[cfg(test)]
 mod test_util;
@@ -62,8 +47,12 @@ mod test_util;
 pub use builder::build_array_reader;
 pub use byte_array::make_byte_array_reader;
 pub use byte_array_dictionary::make_byte_array_dictionary_reader;
+pub use complex_object_array::ComplexObjectArrayReader;
 pub use list_array::ListArrayReader;
 pub use map_array::MapArrayReader;
+pub use null_array::NullArrayReader;
+pub use primitive_array::PrimitiveArrayReader;
+pub use struct_array::StructArrayReader;
 
 /// Array reader reads parquet data into arrow array.
 pub trait ArrayReader: Send {
@@ -152,1459 +141,3 @@ where
     }
     Ok(records_read)
 }
-
-/// A NullArrayReader reads Parquet columns stored as null int32s with an Arrow
-/// NullArray type.
-pub struct NullArrayReader<T>
-where
-    T: DataType,
-    T::T: ScalarValue,
-{
-    data_type: ArrowType,
-    pages: Box<dyn PageIterator>,
-    def_levels_buffer: Option<Buffer>,
-    rep_levels_buffer: Option<Buffer>,
-    column_desc: ColumnDescPtr,
-    record_reader: RecordReader<T>,
-    _type_marker: PhantomData<T>,
-}
-
-impl<T> NullArrayReader<T>
-where
-    T: DataType,
-    T::T: ScalarValue,
-{
-    /// Construct null array reader.
-    pub fn new(pages: Box<dyn PageIterator>, column_desc: ColumnDescPtr) -> Result<Self> {
-        let record_reader = RecordReader::<T>::new(column_desc.clone());
-
-        Ok(Self {
-            data_type: ArrowType::Null,
-            pages,
-            def_levels_buffer: None,
-            rep_levels_buffer: None,
-            column_desc,
-            record_reader,
-            _type_marker: PhantomData,
-        })
-    }
-}
-
-/// Implementation of primitive array reader.
-impl<T> ArrayReader for NullArrayReader<T>
-where
-    T: DataType,
-    T::T: ScalarValue,
-{
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    /// Returns data type of primitive array.
-    fn get_data_type(&self) -> &ArrowType {
-        &self.data_type
-    }
-
-    /// Reads at most `batch_size` records into array.
-    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
-        read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)?;
-
-        // convert to arrays
-        let array = arrow::array::NullArray::new(self.record_reader.num_values());
-
-        // save definition and repetition buffers
-        self.def_levels_buffer = self.record_reader.consume_def_levels()?;
-        self.rep_levels_buffer = self.record_reader.consume_rep_levels()?;
-
-        // Must consume bitmap buffer
-        self.record_reader.consume_bitmap_buffer()?;
-
-        self.record_reader.reset();
-        Ok(Arc::new(array))
-    }
-
-    fn get_def_levels(&self) -> Option<&[i16]> {
-        self.def_levels_buffer
-            .as_ref()
-            .map(|buf| buf.typed_data())
-    }
-
-    fn get_rep_levels(&self) -> Option<&[i16]> {
-        self.rep_levels_buffer
-            .as_ref()
-            .map(|buf| buf.typed_data())
-    }
-}
-
-/// Primitive array readers are leaves of array reader tree. They accept page iterator
-/// and read them into primitive arrays.
-pub struct PrimitiveArrayReader<T>
-where
-    T: DataType,
-    T::T: ScalarValue,
-{
-    data_type: ArrowType,
-    pages: Box<dyn PageIterator>,
-    def_levels_buffer: Option<Buffer>,
-    rep_levels_buffer: Option<Buffer>,
-    column_desc: ColumnDescPtr,
-    record_reader: RecordReader<T>,
-}
-
-impl<T> PrimitiveArrayReader<T>
-where
-    T: DataType,
-    T::T: ScalarValue,
-{
-    /// Construct primitive array reader.
-    pub fn new(
-        pages: Box<dyn PageIterator>,
-        column_desc: ColumnDescPtr,
-        arrow_type: Option<ArrowType>,
-    ) -> Result<Self> {
-        Self::new_with_options(pages, column_desc, arrow_type, false)
-    }
-
-    /// Construct primitive array reader with ability to only compute null mask and not
-    /// buffer level data
-    pub fn new_with_options(
-        pages: Box<dyn PageIterator>,
-        column_desc: ColumnDescPtr,
-        arrow_type: Option<ArrowType>,
-        null_mask_only: bool,
-    ) -> Result<Self> {
-        // Check if Arrow type is specified, else create it from Parquet type
-        let data_type = match arrow_type {
-            Some(t) => t,
-            None => parquet_to_arrow_field(column_desc.as_ref())?
-                .data_type()
-                .clone(),
-        };
-
-        let record_reader =
-            RecordReader::<T>::new_with_options(column_desc.clone(), null_mask_only);
-
-        Ok(Self {
-            data_type,
-            pages,
-            def_levels_buffer: None,
-            rep_levels_buffer: None,
-            column_desc,
-            record_reader,
-        })
-    }
-}
-
-/// Implementation of primitive array reader.
-impl<T> ArrayReader for PrimitiveArrayReader<T>
-where
-    T: DataType,
-    T::T: ScalarValue,
-{
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    /// Returns data type of primitive array.
-    fn get_data_type(&self) -> &ArrowType {
-        &self.data_type
-    }
-
-    /// Reads at most `batch_size` records into array.
-    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
-        read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)?;
-
-        let target_type = self.get_data_type().clone();
-        let arrow_data_type = match T::get_physical_type() {
-            PhysicalType::BOOLEAN => ArrowBooleanType::DATA_TYPE,
-            PhysicalType::INT32 => {
-                match target_type {
-                    ArrowType::UInt32 => {
-                        // follow C++ implementation and use overflow/reinterpret cast from  i32 to u32 which will map
-                        // `i32::MIN..0` to `(i32::MAX as u32)..u32::MAX`
-                        ArrowUInt32Type::DATA_TYPE
-                    }
-                    _ => ArrowInt32Type::DATA_TYPE,
-                }
-            }
-            PhysicalType::INT64 => {
-                match target_type {
-                    ArrowType::UInt64 => {
-                        // follow C++ implementation and use overflow/reinterpret cast from  i64 to u64 which will map
-                        // `i64::MIN..0` to `(i64::MAX as u64)..u64::MAX`
-                        ArrowUInt64Type::DATA_TYPE
-                    }
-                    _ => ArrowInt64Type::DATA_TYPE,
-                }
-            }
-            PhysicalType::FLOAT => ArrowFloat32Type::DATA_TYPE,
-            PhysicalType::DOUBLE => ArrowFloat64Type::DATA_TYPE,
-            PhysicalType::INT96
-            | PhysicalType::BYTE_ARRAY
-            | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
-                unreachable!(
-                    "PrimitiveArrayReaders don't support complex physical types"
-                );
-            }
-        };
-
-        // Convert to arrays by using the Parquet physical type.
-        // The physical types are then cast to Arrow types if necessary
-
-        let mut record_data = self.record_reader.consume_record_data()?;
-
-        if T::get_physical_type() == PhysicalType::BOOLEAN {
-            let mut boolean_buffer = BooleanBufferBuilder::new(record_data.len());
-
-            for e in record_data.as_slice() {
-                boolean_buffer.append(*e > 0);
-            }
-            record_data = boolean_buffer.finish();
-        }
-
-        let array_data = ArrayDataBuilder::new(arrow_data_type)
-            .len(self.record_reader.num_values())
-            .add_buffer(record_data)
-            .null_bit_buffer(self.record_reader.consume_bitmap_buffer()?);
-
-        let array_data = unsafe { array_data.build_unchecked() };
-        let array = match T::get_physical_type() {
-            PhysicalType::BOOLEAN => Arc::new(BooleanArray::from(array_data)) as ArrayRef,
-            PhysicalType::INT32 => {
-                Arc::new(PrimitiveArray::<ArrowInt32Type>::from(array_data)) as ArrayRef
-            }
-            PhysicalType::INT64 => {
-                Arc::new(PrimitiveArray::<ArrowInt64Type>::from(array_data)) as ArrayRef
-            }
-            PhysicalType::FLOAT => {
-                Arc::new(PrimitiveArray::<ArrowFloat32Type>::from(array_data)) as ArrayRef
-            }
-            PhysicalType::DOUBLE => {
-                Arc::new(PrimitiveArray::<ArrowFloat64Type>::from(array_data)) as ArrayRef
-            }
-            PhysicalType::INT96
-            | PhysicalType::BYTE_ARRAY
-            | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
-                unreachable!(
-                    "PrimitiveArrayReaders don't support complex physical 
types"
-                );
-            }
-        };
-
-        // cast to Arrow type
-        // We make a strong assumption here that the casts should be 
infallible.
-        // If the cast fails because of incompatible datatypes, then there 
might
-        // be a bigger problem with how Arrow schemas are converted to Parquet.
-        //
-        // As there is not always a 1:1 mapping between Arrow and Parquet, 
there
-        // are datatypes which we must convert explicitly.
-        // These are:
-        // - date64: we should cast int32 to date32, then date32 to date64.
-        let array = match target_type {
-            ArrowType::Date64 => {
-                // this is cheap as it internally reinterprets the data
-                let a = arrow::compute::cast(&array, &ArrowType::Date32)?;
-                arrow::compute::cast(&a, &target_type)?
-            }
-            ArrowType::Decimal(p, s) => {
-                let array = match array.data_type() {
-                    ArrowType::Int32 => array
-                        .as_any()
-                        .downcast_ref::<Int32Array>()
-                        .unwrap()
-                        .iter()
-                        .map(|v| v.map(|v| v.into()))
-                        .collect::<DecimalArray>(),
-
-                    ArrowType::Int64 => array
-                        .as_any()
-                        .downcast_ref::<Int64Array>()
-                        .unwrap()
-                        .iter()
-                        .map(|v| v.map(|v| v.into()))
-                        .collect::<DecimalArray>(),
-                    _ => {
-                        return Err(ArrowError(format!(
-                            "Cannot convert {:?} to decimal",
-                            array.data_type()
-                        )))
-                    }
-                }
-                .with_precision_and_scale(p, s)?;
-
-                Arc::new(array) as ArrayRef
-            }
-            _ => arrow::compute::cast(&array, &target_type)?,
-        };
-
-        // save definition and repetition buffers
-        self.def_levels_buffer = self.record_reader.consume_def_levels()?;
-        self.rep_levels_buffer = self.record_reader.consume_rep_levels()?;
-        self.record_reader.reset();
-        Ok(array)
-    }
-
-    fn get_def_levels(&self) -> Option<&[i16]> {
-        self.def_levels_buffer
-            .as_ref()
-            .map(|buf| buf.typed_data())
-    }
-
-    fn get_rep_levels(&self) -> Option<&[i16]> {
-        self.rep_levels_buffer
-            .as_ref()
-            .map(|buf| buf.typed_data())
-    }
-}
-
-/// Primitive array readers are leaves of array reader tree. They accept page 
iterator
-/// and read them into primitive arrays.
-pub struct ComplexObjectArrayReader<T, C>
-where
-    T: DataType,
-    C: Converter<Vec<Option<T::T>>, ArrayRef> + 'static,
-{
-    data_type: ArrowType,
-    pages: Box<dyn PageIterator>,
-    def_levels_buffer: Option<Vec<i16>>,
-    rep_levels_buffer: Option<Vec<i16>>,
-    column_desc: ColumnDescPtr,
-    column_reader: Option<ColumnReaderImpl<T>>,
-    converter: C,
-    _parquet_type_marker: PhantomData<T>,
-    _converter_marker: PhantomData<C>,
-}
-
-impl<T, C> ArrayReader for ComplexObjectArrayReader<T, C>
-where
-    T: DataType,
-    C: Converter<Vec<Option<T::T>>, ArrayRef> + Send + 'static,
-{
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn get_data_type(&self) -> &ArrowType {
-        &self.data_type
-    }
-
-    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
-        // Try to initialize column reader
-        if self.column_reader.is_none() {
-            self.next_column_reader()?;
-        }
-
-        let mut data_buffer: Vec<T::T> = Vec::with_capacity(batch_size);
-        data_buffer.resize_with(batch_size, T::T::default);
-
-        let mut def_levels_buffer = if self.column_desc.max_def_level() > 0 {
-            let mut buf: Vec<i16> = Vec::with_capacity(batch_size);
-            buf.resize_with(batch_size, || 0);
-            Some(buf)
-        } else {
-            None
-        };
-
-        let mut rep_levels_buffer = if self.column_desc.max_rep_level() > 0 {
-            let mut buf: Vec<i16> = Vec::with_capacity(batch_size);
-            buf.resize_with(batch_size, || 0);
-            Some(buf)
-        } else {
-            None
-        };
-
-        let mut num_read = 0;
-
-        while self.column_reader.is_some() && num_read < batch_size {
-            let num_to_read = batch_size - num_read;
-            let cur_data_buf = &mut data_buffer[num_read..];
-            let cur_def_levels_buf =
-                def_levels_buffer.as_mut().map(|b| &mut b[num_read..]);
-            let cur_rep_levels_buf =
-                rep_levels_buffer.as_mut().map(|b| &mut b[num_read..]);
-            let (data_read, levels_read) =
-                self.column_reader.as_mut().unwrap().read_batch(
-                    num_to_read,
-                    cur_def_levels_buf,
-                    cur_rep_levels_buf,
-                    cur_data_buf,
-                )?;
-
-            // Fill space
-            if levels_read > data_read {
-                def_levels_buffer.iter().for_each(|def_levels_buffer| {
-                    let (mut level_pos, mut data_pos) = (levels_read, 
data_read);
-                    while level_pos > 0 && data_pos > 0 {
-                        if def_levels_buffer[num_read + level_pos - 1]
-                            == self.column_desc.max_def_level()
-                        {
-                            cur_data_buf.swap(level_pos - 1, data_pos - 1);
-                            level_pos -= 1;
-                            data_pos -= 1;
-                        } else {
-                            level_pos -= 1;
-                        }
-                    }
-                });
-            }
-
-            let values_read = max(levels_read, data_read);
-            num_read += values_read;
-            // current page exhausted && page iterator exhausted
-            if values_read < num_to_read && !self.next_column_reader()? {
-                break;
-            }
-        }
-
-        data_buffer.truncate(num_read);
-        def_levels_buffer
-            .iter_mut()
-            .for_each(|buf| buf.truncate(num_read));
-        rep_levels_buffer
-            .iter_mut()
-            .for_each(|buf| buf.truncate(num_read));
-
-        self.def_levels_buffer = def_levels_buffer;
-        self.rep_levels_buffer = rep_levels_buffer;
-
-        let data: Vec<Option<T::T>> = if self.def_levels_buffer.is_some() {
-            data_buffer
-                .into_iter()
-                .zip(self.def_levels_buffer.as_ref().unwrap().iter())
-                .map(|(t, def_level)| {
-                    if *def_level == self.column_desc.max_def_level() {
-                        Some(t)
-                    } else {
-                        None
-                    }
-                })
-                .collect()
-        } else {
-            data_buffer.into_iter().map(Some).collect()
-        };
-
-        let mut array = self.converter.convert(data)?;
-
-        if let ArrowType::Dictionary(_, _) = self.data_type {
-            array = arrow::compute::cast(&array, &self.data_type)?;
-        }
-
-        Ok(array)
-    }
-
-    fn get_def_levels(&self) -> Option<&[i16]> {
-        self.def_levels_buffer.as_deref()
-    }
-
-    fn get_rep_levels(&self) -> Option<&[i16]> {
-        self.rep_levels_buffer.as_deref()
-    }
-}
-
-impl<T, C> ComplexObjectArrayReader<T, C>
-where
-    T: DataType,
-    C: Converter<Vec<Option<T::T>>, ArrayRef> + 'static,
-{
-    pub fn new(
-        pages: Box<dyn PageIterator>,
-        column_desc: ColumnDescPtr,
-        converter: C,
-        arrow_type: Option<ArrowType>,
-    ) -> Result<Self> {
-        let data_type = match arrow_type {
-            Some(t) => t,
-            None => parquet_to_arrow_field(column_desc.as_ref())?
-                .data_type()
-                .clone(),
-        };
-
-        Ok(Self {
-            data_type,
-            pages,
-            def_levels_buffer: None,
-            rep_levels_buffer: None,
-            column_desc,
-            column_reader: None,
-            converter,
-            _parquet_type_marker: PhantomData,
-            _converter_marker: PhantomData,
-        })
-    }
-
-    fn next_column_reader(&mut self) -> Result<bool> {
-        Ok(match self.pages.next() {
-            Some(page) => {
-                self.column_reader =
-                    Some(ColumnReaderImpl::<T>::new(self.column_desc.clone(), 
page?));
-                true
-            }
-            None => false,
-        })
-    }
-}
-
-/// Implementation of struct array reader.
-pub struct StructArrayReader {
-    children: Vec<Box<dyn ArrayReader>>,
-    data_type: ArrowType,
-    struct_def_level: i16,
-    struct_rep_level: i16,
-    nullable: bool,
-}
-
-impl StructArrayReader {
-    /// Construct struct array reader.
-    pub fn new(
-        data_type: ArrowType,
-        children: Vec<Box<dyn ArrayReader>>,
-        def_level: i16,
-        rep_level: i16,
-        nullable: bool,
-    ) -> Self {
-        Self {
-            data_type,
-            children,
-            struct_def_level: def_level,
-            struct_rep_level: rep_level,
-            nullable,
-        }
-    }
-}
-
-impl ArrayReader for StructArrayReader {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    /// Returns data type.
-    /// This must be a struct.
-    fn get_data_type(&self) -> &ArrowType {
-        &self.data_type
-    }
-
-    /// Read `batch_size` struct records.
-    ///
-    /// Definition levels of struct array is calculated as following:
-    /// ```ignore
-    /// def_levels[i] = min(child1_def_levels[i], child2_def_levels[i], ...,
-    /// childn_def_levels[i]);
-    /// ```
-    ///
-    /// Repetition levels of struct array is calculated as following:
-    /// ```ignore
-    /// rep_levels[i] = child1_rep_levels[i];
-    /// ```
-    ///
-    /// The null bitmap of struct array is calculated from def_levels:
-    /// ```ignore
-    /// null_bitmap[i] = (def_levels[i] >= self.def_level);
-    /// ```
-    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
-        if self.children.is_empty() {
-            return Ok(Arc::new(StructArray::from(Vec::new())));
-        }
-
-        let children_array = self
-            .children
-            .iter_mut()
-            .map(|reader| reader.next_batch(batch_size))
-            .collect::<Result<Vec<_>>>()?;
-
-        // check that array child data has same size
-        let children_array_len =
-            children_array.first().map(|arr| arr.len()).ok_or_else(|| {
-                general_err!("Struct array reader should have at least one 
child!")
-            })?;
-
-        let all_children_len_eq = children_array
-            .iter()
-            .all(|arr| arr.len() == children_array_len);
-        if !all_children_len_eq {
-            return Err(general_err!("Not all children array length are the 
same!"));
-        }
-
-        // Now we can build array data
-        let mut array_data_builder = 
ArrayDataBuilder::new(self.data_type.clone())
-            .len(children_array_len)
-            .child_data(
-                children_array
-                    .iter()
-                    .map(|x| x.data().clone())
-                    .collect::<Vec<ArrayData>>(),
-            );
-
-        if self.nullable {
-            // calculate struct def level data
-
-            // children should have consistent view of parent, only need to 
inspect first child
-            let def_levels = self.children[0]
-                .get_def_levels()
-                .expect("child with nullable parents must have definition 
level");
-
-            // calculate bitmap for current array
-            let mut bitmap_builder = 
BooleanBufferBuilder::new(children_array_len);
-
-            match self.children[0].get_rep_levels() {
-                Some(rep_levels) => {
-                    // Sanity check
-                    assert_eq!(rep_levels.len(), def_levels.len());
-
-                    for (rep_level, def_level) in 
rep_levels.iter().zip(def_levels) {
-                        if rep_level > &self.struct_rep_level {
-                            // Already handled by inner list - SKIP
-                            continue;
-                        }
-                        bitmap_builder.append(*def_level >= 
self.struct_def_level)
-                    }
-                }
-                None => {
-                    for def_level in def_levels {
-                        bitmap_builder.append(*def_level >= 
self.struct_def_level)
-                    }
-                }
-            }
-
-            if bitmap_builder.len() != children_array_len {
-                return Err(general_err!("Failed to decode level data for 
struct array"));
-            }
-
-            array_data_builder =
-                
array_data_builder.null_bit_buffer(Some(bitmap_builder.finish()));
-        }
-
-        let array_data = unsafe { array_data_builder.build_unchecked() };
-        Ok(Arc::new(StructArray::from(array_data)))
-    }
-
-    fn get_def_levels(&self) -> Option<&[i16]> {
-        // Children definition levels should describe the same
-        // parent structure, so return first child's
-        self.children.first().and_then(|l| l.get_def_levels())
-    }
-
-    fn get_rep_levels(&self) -> Option<&[i16]> {
-        // Children definition levels should describe the same
-        // parent structure, so return first child's
-        self.children.first().and_then(|l| l.get_rep_levels())
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use std::collections::VecDeque;
-    use std::sync::Arc;
-
-    use rand::distributions::uniform::SampleUniform;
-    use rand::{thread_rng, Rng};
-
-    use crate::arrow::array_reader::test_util::InMemoryArrayReader;
-    use arrow::array::{
-        Array, ArrayRef, ListArray, PrimitiveArray, StringArray, StructArray,
-    };
-    use arrow::datatypes::{
-        ArrowPrimitiveType, DataType as ArrowType, Date32Type as ArrowDate32, 
Field,
-        Int32Type as ArrowInt32, Int64Type as ArrowInt64,
-        Time32MillisecondType as ArrowTime32MillisecondArray,
-        Time64MicrosecondType as ArrowTime64MicrosecondArray,
-        TimestampMicrosecondType as ArrowTimestampMicrosecondType,
-        TimestampMillisecondType as ArrowTimestampMillisecondType,
-    };
-
-    use crate::arrow::buffer::converter::{Utf8ArrayConverter, Utf8Converter};
-    use crate::basic::{Encoding, Type as PhysicalType};
-    use crate::column::page::Page;
-    use crate::data_type::{ByteArray, ByteArrayType, DataType, Int32Type, 
Int64Type};
-    use crate::schema::parser::parse_message_type;
-    use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
-    use crate::util::test_common::make_pages;
-    use crate::util::test_common::page_util::{
-        DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator,
-    };
-
-    use super::*;
-
-    fn make_column_chunks<T: DataType>(
-        column_desc: ColumnDescPtr,
-        encoding: Encoding,
-        num_levels: usize,
-        min_value: T::T,
-        max_value: T::T,
-        def_levels: &mut Vec<i16>,
-        rep_levels: &mut Vec<i16>,
-        values: &mut Vec<T::T>,
-        page_lists: &mut Vec<Vec<Page>>,
-        use_v2: bool,
-        num_chunks: usize,
-    ) where
-        T::T: PartialOrd + SampleUniform + Copy,
-    {
-        for _i in 0..num_chunks {
-            let mut pages = VecDeque::new();
-            let mut data = Vec::new();
-            let mut page_def_levels = Vec::new();
-            let mut page_rep_levels = Vec::new();
-
-            make_pages::<T>(
-                column_desc.clone(),
-                encoding,
-                1,
-                num_levels,
-                min_value,
-                max_value,
-                &mut page_def_levels,
-                &mut page_rep_levels,
-                &mut data,
-                &mut pages,
-                use_v2,
-            );
-
-            def_levels.append(&mut page_def_levels);
-            rep_levels.append(&mut page_rep_levels);
-            values.append(&mut data);
-            page_lists.push(Vec::from(pages));
-        }
-    }
-
-    #[test]
-    fn test_primitive_array_reader_empty_pages() {
-        // Construct column schema
-        let message_type = "
-        message test_schema {
-          REQUIRED INT32 leaf;
-        }
-        ";
-
-        let schema = parse_message_type(message_type)
-            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
-            .unwrap();
-
-        let column_desc = schema.column(0);
-        let page_iterator = test_util::EmptyPageIterator::new(schema);
-
-        let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
-            Box::new(page_iterator),
-            column_desc,
-            None,
-        )
-        .unwrap();
-
-        // expect no values to be read
-        let array = array_reader.next_batch(50).unwrap();
-        assert!(array.is_empty());
-    }
-
-    #[test]
-    fn test_primitive_array_reader_data() {
-        // Construct column schema
-        let message_type = "
-        message test_schema {
-          REQUIRED INT32 leaf;
-        }
-        ";
-
-        let schema = parse_message_type(message_type)
-            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
-            .unwrap();
-
-        let column_desc = schema.column(0);
-
-        // Construct page iterator
-        {
-            let mut data = Vec::new();
-            let mut page_lists = Vec::new();
-            make_column_chunks::<Int32Type>(
-                column_desc.clone(),
-                Encoding::PLAIN,
-                100,
-                1,
-                200,
-                &mut Vec::new(),
-                &mut Vec::new(),
-                &mut data,
-                &mut page_lists,
-                true,
-                2,
-            );
-            let page_iterator =
-                InMemoryPageIterator::new(schema, column_desc.clone(), 
page_lists);
-
-            let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
-                Box::new(page_iterator),
-                column_desc,
-                None,
-            )
-            .unwrap();
-
-            // Read first 50 values, which are all from the first column chunk
-            let array = array_reader.next_batch(50).unwrap();
-            let array = array
-                .as_any()
-                .downcast_ref::<PrimitiveArray<ArrowInt32>>()
-                .unwrap();
-
-            assert_eq!(
-                &PrimitiveArray::<ArrowInt32>::from(data[0..50].to_vec()),
-                array
-            );
-
-            // Read next 100 values, the first 50 ones are from the first 
column chunk,
-            // and the last 50 ones are from the second column chunk
-            let array = array_reader.next_batch(100).unwrap();
-            let array = array
-                .as_any()
-                .downcast_ref::<PrimitiveArray<ArrowInt32>>()
-                .unwrap();
-
-            assert_eq!(
-                &PrimitiveArray::<ArrowInt32>::from(data[50..150].to_vec()),
-                array
-            );
-
-            // Try to read 100 values, however there are only 50 values
-            let array = array_reader.next_batch(100).unwrap();
-            let array = array
-                .as_any()
-                .downcast_ref::<PrimitiveArray<ArrowInt32>>()
-                .unwrap();
-
-            assert_eq!(
-                &PrimitiveArray::<ArrowInt32>::from(data[150..200].to_vec()),
-                array
-            );
-        }
-    }
-
-    macro_rules! test_primitive_array_reader_one_type {
-        ($arrow_parquet_type:ty, $physical_type:expr, 
$converted_type_str:expr, $result_arrow_type:ty, $result_arrow_cast_type:ty, 
$result_primitive_type:ty) => {{
-            let message_type = format!(
-                "
-            message test_schema {{
-              REQUIRED {:?} leaf ({});
-          }}
-            ",
-                $physical_type, $converted_type_str
-            );
-            let schema = parse_message_type(&message_type)
-                .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
-                .unwrap();
-
-            let column_desc = schema.column(0);
-
-            // Construct page iterator
-            {
-                let mut data = Vec::new();
-                let mut page_lists = Vec::new();
-                make_column_chunks::<$arrow_parquet_type>(
-                    column_desc.clone(),
-                    Encoding::PLAIN,
-                    100,
-                    1,
-                    200,
-                    &mut Vec::new(),
-                    &mut Vec::new(),
-                    &mut data,
-                    &mut page_lists,
-                    true,
-                    2,
-                );
-                let page_iterator = InMemoryPageIterator::new(
-                    schema.clone(),
-                    column_desc.clone(),
-                    page_lists,
-                );
-                let mut array_reader = 
PrimitiveArrayReader::<$arrow_parquet_type>::new(
-                    Box::new(page_iterator),
-                    column_desc.clone(),
-                    None,
-                )
-                .expect("Unable to get array reader");
-
-                let array = array_reader
-                    .next_batch(50)
-                    .expect("Unable to get batch from reader");
-
-                let result_data_type = <$result_arrow_type>::DATA_TYPE;
-                let array = array
-                    .as_any()
-                    .downcast_ref::<PrimitiveArray<$result_arrow_type>>()
-                    .expect(
-                        format!(
-                            "Unable to downcast {:?} to {:?}",
-                            array.data_type(),
-                            result_data_type
-                        )
-                        .as_str(),
-                    );
-
-                // create expected array as primitive, and cast to result type
-                let expected = PrimitiveArray::<$result_arrow_cast_type>::from(
-                    data[0..50]
-                        .iter()
-                        .map(|x| *x as $result_primitive_type)
-                        .collect::<Vec<$result_primitive_type>>(),
-                );
-                let expected = Arc::new(expected) as ArrayRef;
-                let expected = arrow::compute::cast(&expected, 
&result_data_type)
-                    .expect("Unable to cast expected array");
-                assert_eq!(expected.data_type(), &result_data_type);
-                let expected = expected
-                    .as_any()
-                    .downcast_ref::<PrimitiveArray<$result_arrow_type>>()
-                    .expect(
-                        format!(
-                            "Unable to downcast expected {:?} to {:?}",
-                            expected.data_type(),
-                            result_data_type
-                        )
-                        .as_str(),
-                    );
-                assert_eq!(expected, array);
-            }
-        }};
-    }
-
-    #[test]
-    fn test_primitive_array_reader_temporal_types() {
-        test_primitive_array_reader_one_type!(
-            Int32Type,
-            PhysicalType::INT32,
-            "DATE",
-            ArrowDate32,
-            ArrowInt32,
-            i32
-        );
-        test_primitive_array_reader_one_type!(
-            Int32Type,
-            PhysicalType::INT32,
-            "TIME_MILLIS",
-            ArrowTime32MillisecondArray,
-            ArrowInt32,
-            i32
-        );
-        test_primitive_array_reader_one_type!(
-            Int64Type,
-            PhysicalType::INT64,
-            "TIME_MICROS",
-            ArrowTime64MicrosecondArray,
-            ArrowInt64,
-            i64
-        );
-        test_primitive_array_reader_one_type!(
-            Int64Type,
-            PhysicalType::INT64,
-            "TIMESTAMP_MILLIS",
-            ArrowTimestampMillisecondType,
-            ArrowInt64,
-            i64
-        );
-        test_primitive_array_reader_one_type!(
-            Int64Type,
-            PhysicalType::INT64,
-            "TIMESTAMP_MICROS",
-            ArrowTimestampMicrosecondType,
-            ArrowInt64,
-            i64
-        );
-    }
-
-    #[test]
-    fn test_primitive_array_reader_def_and_rep_levels() {
-        // Construct column schema
-        let message_type = "
-        message test_schema {
-            REPEATED Group test_mid {
-                OPTIONAL INT32 leaf;
-            }
-        }
-        ";
-
-        let schema = parse_message_type(message_type)
-            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
-            .unwrap();
-
-        let column_desc = schema.column(0);
-
-        // Construct page iterator
-        {
-            let mut def_levels = Vec::new();
-            let mut rep_levels = Vec::new();
-            let mut page_lists = Vec::new();
-            make_column_chunks::<Int32Type>(
-                column_desc.clone(),
-                Encoding::PLAIN,
-                100,
-                1,
-                200,
-                &mut def_levels,
-                &mut rep_levels,
-                &mut Vec::new(),
-                &mut page_lists,
-                true,
-                2,
-            );
-
-            let page_iterator =
-                InMemoryPageIterator::new(schema, column_desc.clone(), 
page_lists);
-
-            let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
-                Box::new(page_iterator),
-                column_desc,
-                None,
-            )
-            .unwrap();
-
-            let mut accu_len: usize = 0;
-
-            // Read first 50 values, which are all from the first column chunk
-            let array = array_reader.next_batch(50).unwrap();
-            assert_eq!(
-                Some(&def_levels[accu_len..(accu_len + array.len())]),
-                array_reader.get_def_levels()
-            );
-            assert_eq!(
-                Some(&rep_levels[accu_len..(accu_len + array.len())]),
-                array_reader.get_rep_levels()
-            );
-            accu_len += array.len();
-
-            // Read next 100 values, the first 50 ones are from the first 
column chunk,
-            // and the last 50 ones are from the second column chunk
-            let array = array_reader.next_batch(100).unwrap();
-            assert_eq!(
-                Some(&def_levels[accu_len..(accu_len + array.len())]),
-                array_reader.get_def_levels()
-            );
-            assert_eq!(
-                Some(&rep_levels[accu_len..(accu_len + array.len())]),
-                array_reader.get_rep_levels()
-            );
-            accu_len += array.len();
-
-            // Try to read 100 values, however there are only 50 values
-            let array = array_reader.next_batch(100).unwrap();
-            assert_eq!(
-                Some(&def_levels[accu_len..(accu_len + array.len())]),
-                array_reader.get_def_levels()
-            );
-            assert_eq!(
-                Some(&rep_levels[accu_len..(accu_len + array.len())]),
-                array_reader.get_rep_levels()
-            );
-        }
-    }
-
-    #[test]
-    fn test_complex_array_reader_no_pages() {
-        let message_type = "
-        message test_schema {
-            REPEATED Group test_mid {
-                OPTIONAL BYTE_ARRAY leaf (UTF8);
-            }
-        }
-        ";
-        let schema = parse_message_type(message_type)
-            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
-            .unwrap();
-        let column_desc = schema.column(0);
-        let pages: Vec<Vec<Page>> = Vec::new();
-        let page_iterator = InMemoryPageIterator::new(schema, 
column_desc.clone(), pages);
-
-        let converter = Utf8Converter::new(Utf8ArrayConverter {});
-        let mut array_reader =
-            ComplexObjectArrayReader::<ByteArrayType, Utf8Converter>::new(
-                Box::new(page_iterator),
-                column_desc,
-                converter,
-                None,
-            )
-            .unwrap();
-
-        let values_per_page = 100; // this value is arbitrary in this test - 
the result should always be an array of 0 length
-        let array = array_reader.next_batch(values_per_page).unwrap();
-        assert_eq!(array.len(), 0);
-    }
-
-    #[test]
-    fn test_complex_array_reader_def_and_rep_levels() {
-        // Construct column schema
-        let message_type = "
-        message test_schema {
-            REPEATED Group test_mid {
-                OPTIONAL BYTE_ARRAY leaf (UTF8);
-            }
-        }
-        ";
-        let num_pages = 2;
-        let values_per_page = 100;
-        let str_base = "Hello World";
-
-        let schema = parse_message_type(message_type)
-            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
-            .unwrap();
-
-        let max_def_level = schema.column(0).max_def_level();
-        let max_rep_level = schema.column(0).max_rep_level();
-
-        assert_eq!(max_def_level, 2);
-        assert_eq!(max_rep_level, 1);
-
-        let mut rng = thread_rng();
-        let column_desc = schema.column(0);
-        let mut pages: Vec<Vec<Page>> = Vec::new();
-
-        let mut rep_levels = Vec::with_capacity(num_pages * values_per_page);
-        let mut def_levels = Vec::with_capacity(num_pages * values_per_page);
-        let mut all_values = Vec::with_capacity(num_pages * values_per_page);
-
-        for i in 0..num_pages {
-            let mut values = Vec::with_capacity(values_per_page);
-
-            for _ in 0..values_per_page {
-                let def_level = rng.gen_range(0..max_def_level + 1);
-                let rep_level = rng.gen_range(0..max_rep_level + 1);
-                if def_level == max_def_level {
-                    let len = rng.gen_range(1..str_base.len());
-                    let slice = &str_base[..len];
-                    values.push(ByteArray::from(slice));
-                    all_values.push(Some(slice.to_string()));
-                } else {
-                    all_values.push(None)
-                }
-                rep_levels.push(rep_level);
-                def_levels.push(def_level)
-            }
-
-            let range = i * values_per_page..(i + 1) * values_per_page;
-            let mut pb =
-                DataPageBuilderImpl::new(column_desc.clone(), values.len() as 
u32, true);
-
-            pb.add_rep_levels(max_rep_level, 
&rep_levels.as_slice()[range.clone()]);
-            pb.add_def_levels(max_def_level, &def_levels.as_slice()[range]);
-            pb.add_values::<ByteArrayType>(Encoding::PLAIN, values.as_slice());
-
-            let data_page = pb.consume();
-            pages.push(vec![data_page]);
-        }
-
-        let page_iterator = InMemoryPageIterator::new(schema, 
column_desc.clone(), pages);
-
-        let converter = Utf8Converter::new(Utf8ArrayConverter {});
-        let mut array_reader =
-            ComplexObjectArrayReader::<ByteArrayType, Utf8Converter>::new(
-                Box::new(page_iterator),
-                column_desc,
-                converter,
-                None,
-            )
-            .unwrap();
-
-        let mut accu_len: usize = 0;
-
-        let array = array_reader.next_batch(values_per_page / 2).unwrap();
-        assert_eq!(array.len(), values_per_page / 2);
-        assert_eq!(
-            Some(&def_levels[accu_len..(accu_len + array.len())]),
-            array_reader.get_def_levels()
-        );
-        assert_eq!(
-            Some(&rep_levels[accu_len..(accu_len + array.len())]),
-            array_reader.get_rep_levels()
-        );
-        accu_len += array.len();
-
-        // Read next values_per_page values, the first values_per_page/2 ones 
are from the first column chunk,
-        // and the last values_per_page/2 ones are from the second column chunk
-        let array = array_reader.next_batch(values_per_page).unwrap();
-        assert_eq!(array.len(), values_per_page);
-        assert_eq!(
-            Some(&def_levels[accu_len..(accu_len + array.len())]),
-            array_reader.get_def_levels()
-        );
-        assert_eq!(
-            Some(&rep_levels[accu_len..(accu_len + array.len())]),
-            array_reader.get_rep_levels()
-        );
-        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
-        for i in 0..array.len() {
-            if array.is_valid(i) {
-                assert_eq!(
-                    all_values[i + accu_len].as_ref().unwrap().as_str(),
-                    strings.value(i)
-                )
-            } else {
-                assert_eq!(all_values[i + accu_len], None)
-            }
-        }
-        accu_len += array.len();
-
-        // Try to read values_per_page values, however there are only 
values_per_page/2 values
-        let array = array_reader.next_batch(values_per_page).unwrap();
-        assert_eq!(array.len(), values_per_page / 2);
-        assert_eq!(
-            Some(&def_levels[accu_len..(accu_len + array.len())]),
-            array_reader.get_def_levels()
-        );
-        assert_eq!(
-            Some(&rep_levels[accu_len..(accu_len + array.len())]),
-            array_reader.get_rep_levels()
-        );
-    }
-
-    #[test]
-    fn test_complex_array_reader_dict_enc_string() {
-        use crate::encodings::encoding::{DictEncoder, Encoder};
-        // Construct column schema
-        let message_type = "
-        message test_schema {
-            REPEATED Group test_mid {
-                OPTIONAL BYTE_ARRAY leaf (UTF8);
-            }
-        }
-        ";
-        let num_pages = 2;
-        let values_per_page = 100;
-        let str_base = "Hello World";
-
-        let schema = parse_message_type(message_type)
-            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
-            .unwrap();
-        let column_desc = schema.column(0);
-        let max_def_level = column_desc.max_def_level();
-        let max_rep_level = column_desc.max_rep_level();
-
-        assert_eq!(max_def_level, 2);
-        assert_eq!(max_rep_level, 1);
-
-        let mut rng = thread_rng();
-        let mut pages: Vec<Vec<Page>> = Vec::new();
-
-        let mut rep_levels = Vec::with_capacity(num_pages * values_per_page);
-        let mut def_levels = Vec::with_capacity(num_pages * values_per_page);
-        let mut all_values = Vec::with_capacity(num_pages * values_per_page);
-
-        for i in 0..num_pages {
-            let mut dict_encoder = 
DictEncoder::<ByteArrayType>::new(column_desc.clone());
-            // add data page
-            let mut values = Vec::with_capacity(values_per_page);
-
-            for _ in 0..values_per_page {
-                let def_level = rng.gen_range(0..max_def_level + 1);
-                let rep_level = rng.gen_range(0..max_rep_level + 1);
-                if def_level == max_def_level {
-                    let len = rng.gen_range(1..str_base.len());
-                    let slice = &str_base[..len];
-                    values.push(ByteArray::from(slice));
-                    all_values.push(Some(slice.to_string()));
-                } else {
-                    all_values.push(None)
-                }
-                rep_levels.push(rep_level);
-                def_levels.push(def_level)
-            }
-
-            let range = i * values_per_page..(i + 1) * values_per_page;
-            let mut pb =
-                DataPageBuilderImpl::new(column_desc.clone(), values.len() as 
u32, true);
-            pb.add_rep_levels(max_rep_level, 
&rep_levels.as_slice()[range.clone()]);
-            pb.add_def_levels(max_def_level, &def_levels.as_slice()[range]);
-            let _ = dict_encoder.put(&values);
-            let indices = dict_encoder
-                .write_indices()
-                .expect("write_indices() should be OK");
-            pb.add_indices(indices);
-            let data_page = pb.consume();
-            // for each page log num_values vs actual values in page
-            // println!("page num_values: {}, values.len(): {}", 
data_page.num_values(), values.len());
-            // add dictionary page
-            let dict = dict_encoder
-                .write_dict()
-                .expect("write_dict() should be OK");
-            let dict_page = Page::DictionaryPage {
-                buf: dict,
-                num_values: dict_encoder.num_entries() as u32,
-                encoding: Encoding::RLE_DICTIONARY,
-                is_sorted: false,
-            };
-            pages.push(vec![dict_page, data_page]);
-        }
-
-        let page_iterator = InMemoryPageIterator::new(schema, 
column_desc.clone(), pages);
-        let converter = Utf8Converter::new(Utf8ArrayConverter {});
-        let mut array_reader =
-            ComplexObjectArrayReader::<ByteArrayType, Utf8Converter>::new(
-                Box::new(page_iterator),
-                column_desc,
-                converter,
-                None,
-            )
-            .unwrap();
-
-        let mut accu_len: usize = 0;
-
-        // println!("---------- reading a batch of {} values ----------", 
values_per_page / 2);
-        let array = array_reader.next_batch(values_per_page / 2).unwrap();
-        assert_eq!(array.len(), values_per_page / 2);
-        assert_eq!(
-            Some(&def_levels[accu_len..(accu_len + array.len())]),
-            array_reader.get_def_levels()
-        );
-        assert_eq!(
-            Some(&rep_levels[accu_len..(accu_len + array.len())]),
-            array_reader.get_rep_levels()
-        );
-        accu_len += array.len();
-
-        // Read next values_per_page values, the first values_per_page/2 ones 
are from the first column chunk,
-        // and the last values_per_page/2 ones are from the second column chunk
-        // println!("---------- reading a batch of {} values ----------", 
values_per_page);
-        let array = array_reader.next_batch(values_per_page).unwrap();
-        assert_eq!(array.len(), values_per_page);
-        assert_eq!(
-            Some(&def_levels[accu_len..(accu_len + array.len())]),
-            array_reader.get_def_levels()
-        );
-        assert_eq!(
-            Some(&rep_levels[accu_len..(accu_len + array.len())]),
-            array_reader.get_rep_levels()
-        );
-        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
-        for i in 0..array.len() {
-            if array.is_valid(i) {
-                assert_eq!(
-                    all_values[i + accu_len].as_ref().unwrap().as_str(),
-                    strings.value(i)
-                )
-            } else {
-                assert_eq!(all_values[i + accu_len], None)
-            }
-        }
-        accu_len += array.len();
-
-        // Try to read values_per_page values, however there are only 
values_per_page/2 values
-        // println!("---------- reading a batch of {} values ----------", 
values_per_page);
-        let array = array_reader.next_batch(values_per_page).unwrap();
-        assert_eq!(array.len(), values_per_page / 2);
-        assert_eq!(
-            Some(&def_levels[accu_len..(accu_len + array.len())]),
-            array_reader.get_def_levels()
-        );
-        assert_eq!(
-            Some(&rep_levels[accu_len..(accu_len + array.len())]),
-            array_reader.get_rep_levels()
-        );
-    }
-
-    #[test]
-    fn test_struct_array_reader() {
-        let array_1 = Arc::new(PrimitiveArray::<ArrowInt32>::from(vec![1, 2, 
3, 4, 5]));
-        let array_reader_1 = InMemoryArrayReader::new(
-            ArrowType::Int32,
-            array_1.clone(),
-            Some(vec![0, 1, 2, 3, 1]),
-            Some(vec![0, 1, 1, 1, 1]),
-        );
-
-        let array_2 = Arc::new(PrimitiveArray::<ArrowInt32>::from(vec![5, 4, 
3, 2, 1]));
-        let array_reader_2 = InMemoryArrayReader::new(
-            ArrowType::Int32,
-            array_2.clone(),
-            Some(vec![0, 1, 3, 1, 2]),
-            Some(vec![0, 1, 1, 1, 1]),
-        );
-
-        let struct_type = ArrowType::Struct(vec![
-            Field::new("f1", array_1.data_type().clone(), true),
-            Field::new("f2", array_2.data_type().clone(), true),
-        ]);
-
-        let mut struct_array_reader = StructArrayReader::new(
-            struct_type,
-            vec![Box::new(array_reader_1), Box::new(array_reader_2)],
-            1,
-            1,
-            true,
-        );
-
-        let struct_array = struct_array_reader.next_batch(5).unwrap();
-        let struct_array = 
struct_array.as_any().downcast_ref::<StructArray>().unwrap();
-
-        assert_eq!(5, struct_array.len());
-        assert_eq!(
-            vec![true, false, false, false, false],
-            (0..5)
-                .map(|idx| struct_array.data_ref().is_null(idx))
-                .collect::<Vec<bool>>()
-        );
-        assert_eq!(
-            Some(vec![0, 1, 2, 3, 1].as_slice()),
-            struct_array_reader.get_def_levels()
-        );
-        assert_eq!(
-            Some(vec![0, 1, 1, 1, 1].as_slice()),
-            struct_array_reader.get_rep_levels()
-        );
-    }
-
-    #[test]
-    fn test_struct_array_reader_list() {
-        use arrow::datatypes::Int32Type;
-        // [
-        //    {foo: [1, 2, null],
-        //    {foo: []},
-        //    {foo: null},
-        //    null,
-        // ]
-
-        let expected_l =
-            Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
-                Some(vec![Some(1), Some(2), None]),
-                Some(vec![]),
-                None,
-                None,
-            ]));
-
-        let validity = Buffer::from([0b00000111]);
-        let struct_fields = vec![(
-            Field::new("foo", expected_l.data_type().clone(), true),
-            expected_l.clone() as ArrayRef,
-        )];
-        let expected = StructArray::from((struct_fields, validity));
-
-        let array = Arc::new(Int32Array::from_iter(vec![
-            Some(1),
-            Some(2),
-            None,
-            None,
-            None,
-            None,
-        ]));
-        let reader = InMemoryArrayReader::new(
-            ArrowType::Int32,
-            array,
-            Some(vec![4, 4, 3, 2, 1, 0]),
-            Some(vec![0, 1, 1, 0, 0, 0]),
-        );
-
-        let list_reader = ListArrayReader::<i32>::new(
-            Box::new(reader),
-            expected_l.data_type().clone(),
-            ArrowType::Int32,
-            3,
-            1,
-            true,
-        );
-
-        let mut struct_reader = StructArrayReader::new(
-            expected.data_type().clone(),
-            vec![Box::new(list_reader)],
-            1,
-            0,
-            true,
-        );
-
-        let actual = struct_reader.next_batch(1024).unwrap();
-        let actual = actual.as_any().downcast_ref::<StructArray>().unwrap();
-        assert_eq!(actual, &expected)
-    }
-}
diff --git a/parquet/src/arrow/array_reader/null_array.rs 
b/parquet/src/arrow/array_reader/null_array.rs
new file mode 100644
index 000000000..53ac0852f
--- /dev/null
+++ b/parquet/src/arrow/array_reader/null_array.rs
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::array_reader::{read_records, ArrayReader};
+use crate::arrow::record_reader::buffer::ScalarValue;
+use crate::arrow::record_reader::RecordReader;
+use crate::column::page::PageIterator;
+use crate::data_type::DataType;
+use crate::errors::Result;
+use crate::schema::types::ColumnDescPtr;
+use arrow::array::ArrayRef;
+use arrow::buffer::Buffer;
+use arrow::datatypes::DataType as ArrowType;
+use std::any::Any;
+use std::sync::Arc;
+
+/// A NullArrayReader reads Parquet columns stored as null int32s with an Arrow
+/// NullArray type.
+pub struct NullArrayReader<T>
+where
+    T: DataType,
+    T::T: ScalarValue,
+{
+    data_type: ArrowType,
+    pages: Box<dyn PageIterator>,
+    def_levels_buffer: Option<Buffer>,
+    rep_levels_buffer: Option<Buffer>,
+    column_desc: ColumnDescPtr,
+    record_reader: RecordReader<T>,
+}
+
+impl<T> NullArrayReader<T>
+where
+    T: DataType,
+    T::T: ScalarValue,
+{
+    /// Construct null array reader.
+    pub fn new(pages: Box<dyn PageIterator>, column_desc: ColumnDescPtr) -> 
Result<Self> {
+        let record_reader = RecordReader::<T>::new(column_desc.clone());
+
+        Ok(Self {
+            data_type: ArrowType::Null,
+            pages,
+            def_levels_buffer: None,
+            rep_levels_buffer: None,
+            column_desc,
+            record_reader,
+        })
+    }
+}
+
+/// Implementation of [`ArrayReader`] for [`NullArrayReader`].
+impl<T> ArrayReader for NullArrayReader<T>
+where
+    T: DataType,
+    T::T: ScalarValue,
+{
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Returns the Arrow data type produced by this reader (`ArrowType::Null`).
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    /// Reads at most `batch_size` records into array.
+    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
+        read_records(&mut self.record_reader, self.pages.as_mut(), 
batch_size)?;
+
+        // build a NullArray whose length is the number of values read
+        let array = 
arrow::array::NullArray::new(self.record_reader.num_values());
+
+        // save definition and repetition buffers
+        self.def_levels_buffer = self.record_reader.consume_def_levels()?;
+        self.rep_levels_buffer = self.record_reader.consume_rep_levels()?;
+
+        // Must consume bitmap buffer
+        self.record_reader.consume_bitmap_buffer()?;
+
+        self.record_reader.reset();
+        Ok(Arc::new(array))
+    }
+
+    fn get_def_levels(&self) -> Option<&[i16]> {
+        self.def_levels_buffer.as_ref().map(|buf| buf.typed_data())
+    }
+
+    fn get_rep_levels(&self) -> Option<&[i16]> {
+        self.rep_levels_buffer.as_ref().map(|buf| buf.typed_data())
+    }
+}
diff --git a/parquet/src/arrow/array_reader/primitive_array.rs 
b/parquet/src/arrow/array_reader/primitive_array.rs
new file mode 100644
index 000000000..222b595c2
--- /dev/null
+++ b/parquet/src/arrow/array_reader/primitive_array.rs
@@ -0,0 +1,613 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::array_reader::{read_records, ArrayReader};
+use crate::arrow::record_reader::buffer::ScalarValue;
+use crate::arrow::record_reader::RecordReader;
+use crate::arrow::schema::parquet_to_arrow_field;
+use crate::basic::Type as PhysicalType;
+use crate::column::page::PageIterator;
+use crate::data_type::DataType;
+use crate::errors::{ParquetError, Result};
+use crate::schema::types::ColumnDescPtr;
+use arrow::array::{
+    ArrayDataBuilder, ArrayRef, BooleanArray, BooleanBufferBuilder, 
DecimalArray,
+    Float32Array, Float64Array, Int32Array, Int64Array,
+};
+use arrow::buffer::Buffer;
+use arrow::datatypes::DataType as ArrowType;
+use std::any::Any;
+use std::sync::Arc;
+
+/// Primitive array readers are leaves of the array reader tree. They accept a page 
iterator
+/// and read its pages into primitive arrays.
+pub struct PrimitiveArrayReader<T>
+where
+    T: DataType,
+    T::T: ScalarValue,
+{
+    data_type: ArrowType,
+    pages: Box<dyn PageIterator>,
+    def_levels_buffer: Option<Buffer>,
+    rep_levels_buffer: Option<Buffer>,
+    column_desc: ColumnDescPtr,
+    record_reader: RecordReader<T>,
+}
+
+impl<T> PrimitiveArrayReader<T>
+where
+    T: DataType,
+    T::T: ScalarValue,
+{
+    /// Construct primitive array reader.
+    pub fn new(
+        pages: Box<dyn PageIterator>,
+        column_desc: ColumnDescPtr,
+        arrow_type: Option<ArrowType>,
+    ) -> Result<Self> {
+        Self::new_with_options(pages, column_desc, arrow_type, false)
+    }
+
+    /// Construct primitive array reader with ability to only compute null 
mask and not
+    /// buffer level data
+    pub fn new_with_options(
+        pages: Box<dyn PageIterator>,
+        column_desc: ColumnDescPtr,
+        arrow_type: Option<ArrowType>,
+        null_mask_only: bool,
+    ) -> Result<Self> {
+        // Check if Arrow type is specified, else create it from Parquet type
+        let data_type = match arrow_type {
+            Some(t) => t,
+            None => parquet_to_arrow_field(column_desc.as_ref())?
+                .data_type()
+                .clone(),
+        };
+
+        let record_reader =
+            RecordReader::<T>::new_with_options(column_desc.clone(), 
null_mask_only);
+
+        Ok(Self {
+            data_type,
+            pages,
+            def_levels_buffer: None,
+            rep_levels_buffer: None,
+            column_desc,
+            record_reader,
+        })
+    }
+}
+
+/// Implementation of primitive array reader.
+impl<T> ArrayReader for PrimitiveArrayReader<T>
+where
+    T: DataType,
+    T::T: ScalarValue,
+{
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Returns data type of primitive array.
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    /// Reads at most `batch_size` records into array.
+    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
+        read_records(&mut self.record_reader, self.pages.as_mut(), 
batch_size)?;
+
+        let target_type = self.get_data_type().clone();
+        let arrow_data_type = match T::get_physical_type() {
+            PhysicalType::BOOLEAN => ArrowType::Boolean,
+            PhysicalType::INT32 => {
+                match target_type {
+                    ArrowType::UInt32 => {
+                        // follow C++ implementation and use 
overflow/reinterpret cast from  i32 to u32 which will map
+                        // `i32::MIN..0` to `(i32::MAX as u32 + 1)..=u32::MAX`
+                        ArrowType::UInt32
+                    }
+                    _ => ArrowType::Int32,
+                }
+            }
+            PhysicalType::INT64 => {
+                match target_type {
+                    ArrowType::UInt64 => {
+                        // follow C++ implementation and use 
overflow/reinterpret cast from  i64 to u64 which will map
+                        // `i64::MIN..0` to `(i64::MAX as u64 + 1)..=u64::MAX`
+                        ArrowType::UInt64
+                    }
+                    _ => ArrowType::Int64,
+                }
+            }
+            PhysicalType::FLOAT => ArrowType::Float32,
+            PhysicalType::DOUBLE => ArrowType::Float64,
+            PhysicalType::INT96
+            | PhysicalType::BYTE_ARRAY
+            | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
+                unreachable!(
+                    "PrimitiveArrayReaders don't support complex physical 
types"
+                );
+            }
+        };
+
+        // Convert to arrays by using the Parquet physical type.
+        // The physical types are then cast to Arrow types if necessary
+
+        let mut record_data = self.record_reader.consume_record_data()?;
+
+        if T::get_physical_type() == PhysicalType::BOOLEAN {
+            let mut boolean_buffer = 
BooleanBufferBuilder::new(record_data.len());
+
+            for e in record_data.as_slice() {
+                boolean_buffer.append(*e > 0);
+            }
+            record_data = boolean_buffer.finish();
+        }
+
+        let array_data = ArrayDataBuilder::new(arrow_data_type)
+            .len(self.record_reader.num_values())
+            .add_buffer(record_data)
+            .null_bit_buffer(self.record_reader.consume_bitmap_buffer()?);
+
+        let array_data = unsafe { array_data.build_unchecked() };
+        let array = match T::get_physical_type() {
+            PhysicalType::BOOLEAN => Arc::new(BooleanArray::from(array_data)) 
as ArrayRef,
+            PhysicalType::INT32 => Arc::new(Int32Array::from(array_data)) as 
ArrayRef,
+            PhysicalType::INT64 => Arc::new(Int64Array::from(array_data)) as 
ArrayRef,
+            PhysicalType::FLOAT => Arc::new(Float32Array::from(array_data)) as 
ArrayRef,
+            PhysicalType::DOUBLE => Arc::new(Float64Array::from(array_data)) 
as ArrayRef,
+            PhysicalType::INT96
+            | PhysicalType::BYTE_ARRAY
+            | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
+                unreachable!(
+                    "PrimitiveArrayReaders don't support complex physical 
types"
+                );
+            }
+        };
+
+        // cast to Arrow type
+        // We make a strong assumption here that the casts should be 
infallible.
+        // If the cast fails because of incompatible datatypes, then there 
might
+        // be a bigger problem with how Arrow schemas are converted to Parquet.
+        //
+        // As there is not always a 1:1 mapping between Arrow and Parquet, 
there
+        // are datatypes which we must convert explicitly.
+        // These are:
+        // - date64: we should cast int32 to date32, then date32 to date64.
+        let array = match target_type {
+            ArrowType::Date64 => {
+                // this is cheap as it internally reinterprets the data
+                let a = arrow::compute::cast(&array, &ArrowType::Date32)?;
+                arrow::compute::cast(&a, &target_type)?
+            }
+            ArrowType::Decimal(p, s) => {
+                let array = match array.data_type() {
+                    ArrowType::Int32 => array
+                        .as_any()
+                        .downcast_ref::<Int32Array>()
+                        .unwrap()
+                        .iter()
+                        .map(|v| v.map(|v| v.into()))
+                        .collect::<DecimalArray>(),
+
+                    ArrowType::Int64 => array
+                        .as_any()
+                        .downcast_ref::<Int64Array>()
+                        .unwrap()
+                        .iter()
+                        .map(|v| v.map(|v| v.into()))
+                        .collect::<DecimalArray>(),
+                    _ => {
+                        return Err(arrow_err!(
+                            "Cannot convert {:?} to decimal",
+                            array.data_type()
+                        ))
+                    }
+                }
+                .with_precision_and_scale(p, s)?;
+
+                Arc::new(array) as ArrayRef
+            }
+            _ => arrow::compute::cast(&array, &target_type)?,
+        };
+
+        // save definition and repetition buffers
+        self.def_levels_buffer = self.record_reader.consume_def_levels()?;
+        self.rep_levels_buffer = self.record_reader.consume_rep_levels()?;
+        self.record_reader.reset();
+        Ok(array)
+    }
+
+    fn get_def_levels(&self) -> Option<&[i16]> {
+        self.def_levels_buffer.as_ref().map(|buf| buf.typed_data())
+    }
+
+    fn get_rep_levels(&self) -> Option<&[i16]> {
+        self.rep_levels_buffer.as_ref().map(|buf| buf.typed_data())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::arrow::array_reader::test_util::EmptyPageIterator;
+    use crate::basic::Encoding;
+    use crate::column::page::Page;
+    use crate::data_type::Int32Type;
+    use crate::schema::parser::parse_message_type;
+    use crate::schema::types::SchemaDescriptor;
+    use crate::util::test_common::make_pages;
+    use crate::util::InMemoryPageIterator;
+    use arrow::array::PrimitiveArray;
+    use arrow::datatypes::ArrowPrimitiveType;
+
+    use rand::distributions::uniform::SampleUniform;
+    use std::collections::VecDeque;
+
+    fn make_column_chunks<T: DataType>(
+        column_desc: ColumnDescPtr,
+        encoding: Encoding,
+        num_levels: usize,
+        min_value: T::T,
+        max_value: T::T,
+        def_levels: &mut Vec<i16>,
+        rep_levels: &mut Vec<i16>,
+        values: &mut Vec<T::T>,
+        page_lists: &mut Vec<Vec<Page>>,
+        use_v2: bool,
+        num_chunks: usize,
+    ) where
+        T::T: PartialOrd + SampleUniform + Copy,
+    {
+        for _i in 0..num_chunks {
+            let mut pages = VecDeque::new();
+            let mut data = Vec::new();
+            let mut page_def_levels = Vec::new();
+            let mut page_rep_levels = Vec::new();
+
+            make_pages::<T>(
+                column_desc.clone(),
+                encoding,
+                1,
+                num_levels,
+                min_value,
+                max_value,
+                &mut page_def_levels,
+                &mut page_rep_levels,
+                &mut data,
+                &mut pages,
+                use_v2,
+            );
+
+            def_levels.append(&mut page_def_levels);
+            rep_levels.append(&mut page_rep_levels);
+            values.append(&mut data);
+            page_lists.push(Vec::from(pages));
+        }
+    }
+
+    #[test]
+    fn test_primitive_array_reader_empty_pages() {
+        // Construct column schema
+        let message_type = "
+        message test_schema {
+          REQUIRED INT32 leaf;
+        }
+        ";
+
+        let schema = parse_message_type(message_type)
+            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
+            .unwrap();
+
+        let column_desc = schema.column(0);
+        let page_iterator = EmptyPageIterator::new(schema);
+
+        let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
+            Box::new(page_iterator),
+            column_desc,
+            None,
+        )
+        .unwrap();
+
+        // expect no values to be read
+        let array = array_reader.next_batch(50).unwrap();
+        assert!(array.is_empty());
+    }
+
+    #[test]
+    fn test_primitive_array_reader_data() {
+        // Construct column schema
+        let message_type = "
+        message test_schema {
+          REQUIRED INT32 leaf;
+        }
+        ";
+
+        let schema = parse_message_type(message_type)
+            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
+            .unwrap();
+
+        let column_desc = schema.column(0);
+
+        // Construct page iterator
+        {
+            let mut data = Vec::new();
+            let mut page_lists = Vec::new();
+            make_column_chunks::<Int32Type>(
+                column_desc.clone(),
+                Encoding::PLAIN,
+                100,
+                1,
+                200,
+                &mut Vec::new(),
+                &mut Vec::new(),
+                &mut data,
+                &mut page_lists,
+                true,
+                2,
+            );
+            let page_iterator =
+                InMemoryPageIterator::new(schema, column_desc.clone(), 
page_lists);
+
+            let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
+                Box::new(page_iterator),
+                column_desc,
+                None,
+            )
+            .unwrap();
+
+            // Read first 50 values, which are all from the first column chunk
+            let array = array_reader.next_batch(50).unwrap();
+            let array = array.as_any().downcast_ref::<Int32Array>().unwrap();
+
+            assert_eq!(&Int32Array::from(data[0..50].to_vec()), array);
+
+            // Read next 100 values, the first 50 ones are from the first 
column chunk,
+            // and the last 50 ones are from the second column chunk
+            let array = array_reader.next_batch(100).unwrap();
+            let array = array.as_any().downcast_ref::<Int32Array>().unwrap();
+
+            assert_eq!(&Int32Array::from(data[50..150].to_vec()), array);
+
+            // Try to read 100 values, however there are only 50 values
+            let array = array_reader.next_batch(100).unwrap();
+            let array = array.as_any().downcast_ref::<Int32Array>().unwrap();
+
+            assert_eq!(&Int32Array::from(data[150..200].to_vec()), array);
+        }
+    }
+
+    /// Reads one batch of a `REQUIRED` leaf column through `PrimitiveArrayReader`
+    /// and compares it with the generated values cast to the expected Arrow type.
+    ///
+    /// Arguments, in order:
+    /// * `$arrow_parquet_type` - parquet data type used to generate and read the column
+    /// * `$physical_type` - physical type written into the schema (formatted with `{:?}`)
+    /// * `$converted_type_str` - converted-type annotation written into the schema
+    /// * `$result_arrow_type` - Arrow primitive type the reader is expected to produce
+    /// * `$result_arrow_cast_type` - Arrow primitive type used to build the expected array
+    /// * `$result_primitive_type` - native type the generated values are cast to
+    macro_rules! test_primitive_array_reader_one_type {
+        (
+            $arrow_parquet_type:ty,
+            $physical_type:expr,
+            $converted_type_str:expr,
+            $result_arrow_type:ty,
+            $result_arrow_cast_type:ty,
+            $result_primitive_type:ty
+        ) => {{
+            let message_type = format!(
+                "
+            message test_schema {{
+              REQUIRED {:?} leaf ({});
+          }}
+            ",
+                $physical_type, $converted_type_str
+            );
+            let schema = parse_message_type(&message_type)
+                .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
+                .unwrap();
+
+            let column_desc = schema.column(0);
+
+            // Construct page iterator
+            {
+                // `data` receives the generated values; levels are not inspected here
+                let mut data = Vec::new();
+                let mut page_lists = Vec::new();
+                make_column_chunks::<$arrow_parquet_type>(
+                    column_desc.clone(),
+                    Encoding::PLAIN,
+                    100,
+                    1,
+                    200,
+                    &mut Vec::new(),
+                    &mut Vec::new(),
+                    &mut data,
+                    &mut page_lists,
+                    true,
+                    2,
+                );
+                let page_iterator = InMemoryPageIterator::new(
+                    schema.clone(),
+                    column_desc.clone(),
+                    page_lists,
+                );
+                let mut array_reader = PrimitiveArrayReader::<$arrow_parquet_type>::new(
+                    Box::new(page_iterator),
+                    column_desc.clone(),
+                    None,
+                )
+                .expect("Unable to get array reader");
+
+                let array = array_reader
+                    .next_batch(50)
+                    .expect("Unable to get batch from reader");
+
+                let result_data_type = <$result_arrow_type>::DATA_TYPE;
+                let array = array
+                    .as_any()
+                    .downcast_ref::<PrimitiveArray<$result_arrow_type>>()
+                    .unwrap_or_else(|| {
+                        // Message is only formatted when the downcast actually fails
+                        panic!(
+                            "Unable to downcast {:?} to {:?}",
+                            array.data_type(),
+                            result_data_type
+                        )
+                    });
+
+                // create expected array as primitive, and cast to result type
+                let expected = PrimitiveArray::<$result_arrow_cast_type>::from(
+                    data[0..50]
+                        .iter()
+                        .map(|x| *x as $result_primitive_type)
+                        .collect::<Vec<$result_primitive_type>>(),
+                );
+                let expected = Arc::new(expected) as ArrayRef;
+                let expected = arrow::compute::cast(&expected, &result_data_type)
+                    .expect("Unable to cast expected array");
+                assert_eq!(expected.data_type(), &result_data_type);
+                let expected = expected
+                    .as_any()
+                    .downcast_ref::<PrimitiveArray<$result_arrow_type>>()
+                    .unwrap_or_else(|| {
+                        // Message is only formatted when the downcast actually fails
+                        panic!(
+                            "Unable to downcast expected {:?} to {:?}",
+                            expected.data_type(),
+                            result_data_type
+                        )
+                    });
+                assert_eq!(expected, array);
+            }
+        }};
+    }
+
+    /// Runs `test_primitive_array_reader_one_type!` for each date, time and timestamp
+    /// annotation on the INT32 / INT64 physical types.
+    #[test]
+    fn test_primitive_array_reader_temporal_types() {
+        // Aliased so the parquet and Arrow integer types cannot be confused below
+        use crate::data_type::{
+            Int32Type as ParquetInt32Type, Int64Type as ParquetInt64Type,
+        };
+        use arrow::datatypes::{
+            Date32Type, Int32Type as ArrowInt32Type, Int64Type as ArrowInt64Type,
+            Time32MillisecondType, Time64MicrosecondType, TimestampMicrosecondType,
+            TimestampMillisecondType,
+        };
+
+        // INT32-backed annotations
+        test_primitive_array_reader_one_type!(
+            ParquetInt32Type,
+            PhysicalType::INT32,
+            "DATE",
+            Date32Type,
+            ArrowInt32Type,
+            i32
+        );
+        test_primitive_array_reader_one_type!(
+            ParquetInt32Type,
+            PhysicalType::INT32,
+            "TIME_MILLIS",
+            Time32MillisecondType,
+            ArrowInt32Type,
+            i32
+        );
+
+        // INT64-backed annotations
+        test_primitive_array_reader_one_type!(
+            ParquetInt64Type,
+            PhysicalType::INT64,
+            "TIME_MICROS",
+            Time64MicrosecondType,
+            ArrowInt64Type,
+            i64
+        );
+        test_primitive_array_reader_one_type!(
+            ParquetInt64Type,
+            PhysicalType::INT64,
+            "TIMESTAMP_MILLIS",
+            TimestampMillisecondType,
+            ArrowInt64Type,
+            i64
+        );
+        test_primitive_array_reader_one_type!(
+            ParquetInt64Type,
+            PhysicalType::INT64,
+            "TIMESTAMP_MICROS",
+            TimestampMicrosecondType,
+            ArrowInt64Type,
+            i64
+        );
+    }
+
+    /// Checks that the definition and repetition levels exposed after each batch
+    /// equal the matching window of the levels generated for the column.
+    #[test]
+    fn test_primitive_array_reader_def_and_rep_levels() {
+        // Optional leaf inside a repeated group
+        let message_type = "
+        message test_schema {
+            REPEATED Group test_mid {
+                OPTIONAL INT32 leaf;
+            }
+        }
+        ";
+
+        let schema = parse_message_type(message_type)
+            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
+            .unwrap();
+        let column_desc = schema.column(0);
+
+        // Generate the column; only the levels are kept for comparison
+        let mut def_levels = Vec::new();
+        let mut rep_levels = Vec::new();
+        let mut page_lists = Vec::new();
+        make_column_chunks::<Int32Type>(
+            column_desc.clone(),
+            Encoding::PLAIN,
+            100,
+            1,
+            200,
+            &mut def_levels,
+            &mut rep_levels,
+            &mut Vec::new(),
+            &mut page_lists,
+            true,
+            2,
+        );
+
+        let page_iterator =
+            InMemoryPageIterator::new(schema, column_desc.clone(), page_lists);
+        let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
+            Box::new(page_iterator),
+            column_desc,
+            None,
+        )
+        .unwrap();
+
+        // Batch requests, in order:
+        //   50  - all from the first column chunk
+        //   100 - first 50 from the first chunk, last 50 from the second chunk
+        //   100 - only 50 values remain
+        let mut consumed: usize = 0;
+        for &batch_size in &[50_usize, 100, 100] {
+            let array = array_reader.next_batch(batch_size).unwrap();
+            let window = consumed..(consumed + array.len());
+
+            assert_eq!(
+                Some(&def_levels[window.clone()]),
+                array_reader.get_def_levels()
+            );
+            assert_eq!(Some(&rep_levels[window]), array_reader.get_rep_levels());
+
+            consumed += array.len();
+        }
+    }
+}
diff --git a/parquet/src/arrow/array_reader/struct_array.rs 
b/parquet/src/arrow/array_reader/struct_array.rs
new file mode 100644
index 000000000..30824d742
--- /dev/null
+++ b/parquet/src/arrow/array_reader/struct_array.rs
@@ -0,0 +1,294 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::array_reader::ArrayReader;
+use crate::errors::{ParquetError, Result};
+use arrow::array::{
+    ArrayData, ArrayDataBuilder, ArrayRef, BooleanBufferBuilder, StructArray,
+};
+use arrow::datatypes::DataType as ArrowType;
+use std::any::Any;
+use std::sync::Arc;
+
+/// Array reader that assembles the arrays produced by its child readers into a struct array.
+pub struct StructArrayReader {
+    children: Vec<Box<dyn ArrayReader>>, // readers supplying the struct's child arrays
+    data_type: ArrowType, // Arrow type given to the assembled array
+    struct_def_level: i16, // a slot is non-null when its def level is >= this value
+    struct_rep_level: i16, // entries with a higher rep level are skipped for validity
+    nullable: bool, // a validity bitmap is only built when this is true
+}
+
+impl StructArrayReader {
+    /// Creates a reader that assembles `children` into a struct array of `data_type`.
+    ///
+    /// * `def_level` - definition level from which a struct slot counts as non-null
+    /// * `rep_level` - repetition level of the struct; level entries above it are
+    ///   skipped when the validity bitmap is built
+    /// * `nullable` - whether a validity bitmap is built at all
+    pub fn new(
+        data_type: ArrowType,
+        children: Vec<Box<dyn ArrayReader>>,
+        def_level: i16,
+        rep_level: i16,
+        nullable: bool,
+    ) -> Self {
+        let struct_def_level = def_level;
+        let struct_rep_level = rep_level;
+
+        StructArrayReader {
+            children,
+            data_type,
+            struct_def_level,
+            struct_rep_level,
+            nullable,
+        }
+    }
+}
+
+impl ArrayReader for StructArrayReader {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Returns the Arrow data type stored in this reader.
+    /// This must be a struct; the type is not checked here.
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    /// Requests `batch_size` records from every child and assembles a struct array.
+    ///
+    /// Definition levels of the struct array are those of its first child:
+    /// ```ignore
+    /// // children are assumed to describe the same parent structure
+    /// def_levels[i] = child1_def_levels[i];
+    /// ```
+    ///
+    /// Repetition levels of the struct array are calculated as follows:
+    /// ```ignore
+    /// rep_levels[i] = child1_rep_levels[i];
+    /// ```
+    ///
+    /// The validity bitmap (set bit = non-null) is calculated from def_levels:
+    /// ```ignore
+    /// valid[i] = (def_levels[i] >= self.struct_def_level); // entries with rep level > struct_rep_level are skipped
+    /// ```
+    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
+        if self.children.is_empty() {
+            return Ok(Arc::new(StructArray::from(Vec::new())));
+        }
+
+        let children_array = self
+            .children
+            .iter_mut()
+            .map(|reader| reader.next_batch(batch_size))
+            .collect::<Result<Vec<_>>>()?;
+
+        // check that array child data has same size
+        let children_array_len =
+            children_array.first().map(|arr| arr.len()).ok_or_else(|| {
+                general_err!("Struct array reader should have at least one 
child!")
+            })?;
+
+        let all_children_len_eq = children_array
+            .iter()
+            .all(|arr| arr.len() == children_array_len);
+        if !all_children_len_eq {
+            return Err(general_err!("Not all children array length are the 
same!"));
+        }
+
+        // Now we can build array data
+        let mut array_data_builder = 
ArrayDataBuilder::new(self.data_type.clone())
+            .len(children_array_len)
+            .child_data(
+                children_array
+                    .iter()
+                    .map(|x| x.data().clone())
+                    .collect::<Vec<ArrayData>>(),
+            );
+
+        if self.nullable {
+            // calculate struct def level data
+
+            // children should have consistent view of parent, only need to 
inspect first child
+            let def_levels = self.children[0]
+                .get_def_levels()
+                .expect("child with nullable parents must have definition 
level");
+
+            // calculate bitmap for current array
+            let mut bitmap_builder = 
BooleanBufferBuilder::new(children_array_len);
+
+            match self.children[0].get_rep_levels() {
+                Some(rep_levels) => {
+                    // Sanity check: both level slices come from the same child
+                    assert_eq!(rep_levels.len(), def_levels.len());
+
+                    for (rep_level, def_level) in 
rep_levels.iter().zip(def_levels) {
+                        if rep_level > &self.struct_rep_level {
+                            // Already handled by inner list - SKIP
+                            continue;
+                        }
+                        bitmap_builder.append(*def_level >= 
self.struct_def_level)
+                    }
+                }
+                None => {
+                    for def_level in def_levels {
+                        bitmap_builder.append(*def_level >= 
self.struct_def_level)
+                    }
+                }
+            }
+
+            if bitmap_builder.len() != children_array_len {
+                return Err(general_err!("Failed to decode level data for 
struct array"));
+            }
+
+            array_data_builder =
+                
array_data_builder.null_bit_buffer(Some(bitmap_builder.finish()));
+        }
+
+        let array_data = unsafe { array_data_builder.build_unchecked() }; // NOTE(review): lengths are checked above; child types vs `data_type` are not
+        Ok(Arc::new(StructArray::from(array_data)))
+    }
+
+    fn get_def_levels(&self) -> Option<&[i16]> {
+        // Children definition levels should describe the same
+        // parent structure, so return first child's
+        self.children.first().and_then(|l| l.get_def_levels())
+    }
+
+    fn get_rep_levels(&self) -> Option<&[i16]> {
+        // Children repetition levels should describe the same
+        // parent structure, so return first child's
+        self.children.first().and_then(|l| l.get_rep_levels())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::arrow::array_reader::test_util::InMemoryArrayReader;
+    use crate::arrow::array_reader::ListArrayReader;
+    use arrow::array::{Array, Int32Array, ListArray};
+    use arrow::buffer::Buffer;
+    use arrow::datatypes::Field;
+
+    /// A nullable struct over two Int32 children takes its validity and its
+    /// definition / repetition levels from the first child.
+    #[test]
+    fn test_struct_array_reader() {
+        // First child: its levels are the ones the struct reader reports
+        let first_values = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
+        let first_reader = InMemoryArrayReader::new(
+            ArrowType::Int32,
+            first_values.clone(),
+            Some(vec![0, 1, 2, 3, 1]),
+            Some(vec![0, 1, 1, 1, 1]),
+        );
+
+        // Second child: same repetition levels, different definition levels
+        let second_values = Arc::new(Int32Array::from(vec![5, 4, 3, 2, 1]));
+        let second_reader = InMemoryArrayReader::new(
+            ArrowType::Int32,
+            second_values.clone(),
+            Some(vec![0, 1, 3, 1, 2]),
+            Some(vec![0, 1, 1, 1, 1]),
+        );
+
+        let fields = vec![
+            Field::new("f1", first_values.data_type().clone(), true),
+            Field::new("f2", second_values.data_type().clone(), true),
+        ];
+        let mut struct_array_reader = StructArrayReader::new(
+            ArrowType::Struct(fields),
+            vec![Box::new(first_reader), Box::new(second_reader)],
+            1,
+            1,
+            true,
+        );
+
+        let batch = struct_array_reader.next_batch(5).unwrap();
+        let struct_array = batch.as_any().downcast_ref::<StructArray>().unwrap();
+        assert_eq!(5, struct_array.len());
+
+        // Only the first slot has a definition level below the struct's level
+        let null_flags = (0..5)
+            .map(|idx| struct_array.data_ref().is_null(idx))
+            .collect::<Vec<bool>>();
+        assert_eq!(vec![true, false, false, false, false], null_flags);
+
+        assert_eq!(
+            Some(&[0, 1, 2, 3, 1][..]),
+            struct_array_reader.get_def_levels()
+        );
+        assert_eq!(
+            Some(&[0, 1, 1, 1, 1][..]),
+            struct_array_reader.get_rep_levels()
+        );
+    }
+
+    /// A nullable struct wrapping a nullable list child: only the last row, where
+    /// the struct itself is absent, is expected to be null at the struct level.
+    #[test]
+    fn test_struct_array_reader_list() {
+        use arrow::datatypes::Int32Type;
+        // Rows being modelled:
+        // [
+        //    {foo: [1, 2, null]},
+        //    {foo: []},
+        //    {foo: null},
+        //    null,
+        // ]
+
+        let expected_l =
+            Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
+                Some(vec![Some(1), Some(2), None]),
+                Some(vec![]),
+                None,
+                None,
+            ]));
+
+        // Low three bits set: rows 0..=2 are valid structs, row 3 is null
+        let validity = Buffer::from([0b00000111]);
+        let foo_field = Field::new("foo", expected_l.data_type().clone(), true);
+        let expected =
+            StructArray::from((vec![(foo_field, expected_l.clone() as ArrayRef)], validity));
+
+        // Flattened leaf values together with their definition / repetition levels
+        let leaf_values = Arc::new(Int32Array::from_iter(vec![
+            Some(1),
+            Some(2),
+            None,
+            None,
+            None,
+            None,
+        ]));
+        let leaf_reader = InMemoryArrayReader::new(
+            ArrowType::Int32,
+            leaf_values,
+            Some(vec![4, 4, 3, 2, 1, 0]),
+            Some(vec![0, 1, 1, 0, 0, 0]),
+        );
+
+        let list_reader = ListArrayReader::<i32>::new(
+            Box::new(leaf_reader),
+            expected_l.data_type().clone(),
+            ArrowType::Int32,
+            3,
+            1,
+            true,
+        );
+
+        let mut struct_reader = StructArrayReader::new(
+            expected.data_type().clone(),
+            vec![Box::new(list_reader)],
+            1,
+            0,
+            true,
+        );
+
+        let batch = struct_reader.next_batch(1024).unwrap();
+        let actual = batch.as_any().downcast_ref::<StructArray>().unwrap();
+        assert_eq!(actual, &expected);
+    }
+}

Reply via email to