This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ed1992  Remove arrow array reader (#1197) (#1234)
6ed1992 is described below

commit 6ed1992101ad3c99094818ab1a12b4a28bb043eb
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Mon Jan 24 15:07:26 2022 +0000

    Remove arrow array reader (#1197) (#1234)
---
 parquet/Cargo.toml                                 |    2 +-
 .../{arrow_array_reader.rs => arrow_reader.rs}     |  277 +--
 parquet/src/arrow/arrow_array_reader.rs            | 1922 --------------------
 parquet/src/arrow/mod.rs                           |    1 -
 4 files changed, 40 insertions(+), 2162 deletions(-)

diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index f0ba360..0765da1 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -81,6 +81,6 @@ name = "arrow_writer"
 harness = false
 
 [[bench]]
-name = "arrow_array_reader"
+name = "arrow_reader"
 required-features = ["test_common", "experimental"]
 harness = false
diff --git a/parquet/benches/arrow_array_reader.rs 
b/parquet/benches/arrow_reader.rs
similarity index 74%
rename from parquet/benches/arrow_array_reader.rs
rename to parquet/benches/arrow_reader.rs
index 9db5b4c..1bd6fc6 100644
--- a/parquet/benches/arrow_array_reader.rs
+++ b/parquet/benches/arrow_reader.rs
@@ -288,17 +288,6 @@ fn bench_array_reader(mut array_reader: Box<dyn 
ArrayReader>) -> usize {
     total_count
 }
 
-fn create_int32_arrow_array_reader(
-    page_iterator: impl PageIterator + 'static,
-    column_desc: ColumnDescPtr,
-) -> Box<dyn ArrayReader> {
-    use parquet::arrow::arrow_array_reader::{ArrowArrayReader, 
PrimitiveArrayConverter};
-    let converter = 
PrimitiveArrayConverter::<arrow::datatypes::Int32Type>::new();
-    let reader =
-        ArrowArrayReader::try_new(page_iterator, column_desc, converter, 
None).unwrap();
-    Box::new(reader)
-}
-
 fn create_int32_primitive_array_reader(
     page_iterator: impl PageIterator + 'static,
     column_desc: ColumnDescPtr,
@@ -314,17 +303,6 @@ fn create_int32_primitive_array_reader(
     Box::new(reader)
 }
 
-fn create_string_arrow_array_reader(
-    page_iterator: impl PageIterator + 'static,
-    column_desc: ColumnDescPtr,
-) -> Box<dyn ArrayReader> {
-    use parquet::arrow::arrow_array_reader::{ArrowArrayReader, 
StringArrayConverter};
-    let converter = StringArrayConverter::new();
-    let reader =
-        ArrowArrayReader::try_new(page_iterator, column_desc, converter, 
None).unwrap();
-    Box::new(reader)
-}
-
 fn create_string_byte_array_reader(
     page_iterator: impl PageIterator + 'static,
     column_desc: ColumnDescPtr,
@@ -394,66 +372,32 @@ fn add_benches(c: &mut Criterion) {
         mandatory_int32_column_desc.clone(),
         0.0,
     );
-    group.bench_function(
-        "read Int32Array, plain encoded, mandatory, no NULLs - old",
-        |b| {
-            b.iter(|| {
-                let array_reader = create_int32_primitive_array_reader(
-                    plain_int32_no_null_data.clone(),
-                    mandatory_int32_column_desc.clone(),
-                );
-                count = bench_array_reader(array_reader);
-            });
-            assert_eq!(count, EXPECTED_VALUE_COUNT);
-        },
-    );
-
-    group.bench_function(
-        "read Int32Array, plain encoded, mandatory, no NULLs - new",
-        |b| {
-            b.iter(|| {
-                let array_reader = create_int32_arrow_array_reader(
-                    plain_int32_no_null_data.clone(),
-                    mandatory_int32_column_desc.clone(),
-                );
-                count = bench_array_reader(array_reader);
-            });
-            assert_eq!(count, EXPECTED_VALUE_COUNT);
-        },
-    );
+    group.bench_function("read Int32Array, plain encoded, mandatory, no 
NULLs", |b| {
+        b.iter(|| {
+            let array_reader = create_int32_primitive_array_reader(
+                plain_int32_no_null_data.clone(),
+                mandatory_int32_column_desc.clone(),
+            );
+            count = bench_array_reader(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
 
     let plain_int32_no_null_data = build_plain_encoded_int32_page_iterator(
         schema.clone(),
         optional_int32_column_desc.clone(),
         0.0,
     );
-    group.bench_function(
-        "read Int32Array, plain encoded, optional, no NULLs - old",
-        |b| {
-            b.iter(|| {
-                let array_reader = create_int32_primitive_array_reader(
-                    plain_int32_no_null_data.clone(),
-                    optional_int32_column_desc.clone(),
-                );
-                count = bench_array_reader(array_reader);
-            });
-            assert_eq!(count, EXPECTED_VALUE_COUNT);
-        },
-    );
-
-    group.bench_function(
-        "read Int32Array, plain encoded, optional, no NULLs - new",
-        |b| {
-            b.iter(|| {
-                let array_reader = create_int32_arrow_array_reader(
-                    plain_int32_no_null_data.clone(),
-                    optional_int32_column_desc.clone(),
-                );
-                count = bench_array_reader(array_reader);
-            });
-            assert_eq!(count, EXPECTED_VALUE_COUNT);
-        },
-    );
+    group.bench_function("read Int32Array, plain encoded, optional, no NULLs", 
|b| {
+        b.iter(|| {
+            let array_reader = create_int32_primitive_array_reader(
+                plain_int32_no_null_data.clone(),
+                optional_int32_column_desc.clone(),
+            );
+            count = bench_array_reader(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
 
     // int32, plain encoded, half NULLs
     let plain_int32_half_null_data = build_plain_encoded_int32_page_iterator(
@@ -462,7 +406,7 @@ fn add_benches(c: &mut Criterion) {
         0.5,
     );
     group.bench_function(
-        "read Int32Array, plain encoded, optional, half NULLs - old",
+        "read Int32Array, plain encoded, optional, half NULLs",
         |b| {
             b.iter(|| {
                 let array_reader = create_int32_primitive_array_reader(
@@ -475,20 +419,6 @@ fn add_benches(c: &mut Criterion) {
         },
     );
 
-    group.bench_function(
-        "read Int32Array, plain encoded, optional, half NULLs - new",
-        |b| {
-            b.iter(|| {
-                let array_reader = create_int32_arrow_array_reader(
-                    plain_int32_half_null_data.clone(),
-                    optional_int32_column_desc.clone(),
-                );
-                count = bench_array_reader(array_reader);
-            });
-            assert_eq!(count, EXPECTED_VALUE_COUNT);
-        },
-    );
-
     // int32, dictionary encoded, no NULLs
     let dictionary_int32_no_null_data = 
build_dictionary_encoded_int32_page_iterator(
         schema.clone(),
@@ -496,7 +426,7 @@ fn add_benches(c: &mut Criterion) {
         0.0,
     );
     group.bench_function(
-        "read Int32Array, dictionary encoded, mandatory, no NULLs - old",
+        "read Int32Array, dictionary encoded, mandatory, no NULLs",
         |b| {
             b.iter(|| {
                 let array_reader = create_int32_primitive_array_reader(
@@ -509,27 +439,13 @@ fn add_benches(c: &mut Criterion) {
         },
     );
 
-    group.bench_function(
-        "read Int32Array, dictionary encoded, mandatory, no NULLs - new",
-        |b| {
-            b.iter(|| {
-                let array_reader = create_int32_arrow_array_reader(
-                    dictionary_int32_no_null_data.clone(),
-                    mandatory_int32_column_desc.clone(),
-                );
-                count = bench_array_reader(array_reader);
-            });
-            assert_eq!(count, EXPECTED_VALUE_COUNT);
-        },
-    );
-
     let dictionary_int32_no_null_data = 
build_dictionary_encoded_int32_page_iterator(
         schema.clone(),
         optional_int32_column_desc.clone(),
         0.0,
     );
     group.bench_function(
-        "read Int32Array, dictionary encoded, optional, no NULLs - old",
+        "read Int32Array, dictionary encoded, optional, no NULLs",
         |b| {
             b.iter(|| {
                 let array_reader = create_int32_primitive_array_reader(
@@ -542,20 +458,6 @@ fn add_benches(c: &mut Criterion) {
         },
     );
 
-    group.bench_function(
-        "read Int32Array, dictionary encoded, optional, no NULLs - new",
-        |b| {
-            b.iter(|| {
-                let array_reader = create_int32_arrow_array_reader(
-                    dictionary_int32_no_null_data.clone(),
-                    optional_int32_column_desc.clone(),
-                );
-                count = bench_array_reader(array_reader);
-            });
-            assert_eq!(count, EXPECTED_VALUE_COUNT);
-        },
-    );
-
     // int32, dictionary encoded, half NULLs
     let dictionary_int32_half_null_data = 
build_dictionary_encoded_int32_page_iterator(
         schema.clone(),
@@ -563,7 +465,7 @@ fn add_benches(c: &mut Criterion) {
         0.5,
     );
     group.bench_function(
-        "read Int32Array, dictionary encoded, optional, half NULLs - old",
+        "read Int32Array, dictionary encoded, optional, half NULLs",
         |b| {
             b.iter(|| {
                 let array_reader = create_int32_primitive_array_reader(
@@ -576,20 +478,6 @@ fn add_benches(c: &mut Criterion) {
         },
     );
 
-    group.bench_function(
-        "read Int32Array, dictionary encoded, optional, half NULLs - new",
-        |b| {
-            b.iter(|| {
-                let array_reader = create_int32_arrow_array_reader(
-                    dictionary_int32_half_null_data.clone(),
-                    optional_int32_column_desc.clone(),
-                );
-                count = bench_array_reader(array_reader);
-            });
-            assert_eq!(count, EXPECTED_VALUE_COUNT);
-        },
-    );
-
     // string benchmarks
     //==============================
 
@@ -600,7 +488,7 @@ fn add_benches(c: &mut Criterion) {
         0.0,
     );
     group.bench_function(
-        "read StringArray, plain encoded, mandatory, no NULLs - old",
+        "read StringArray, plain encoded, mandatory, no NULLs",
         |b| {
             b.iter(|| {
                 let array_reader = create_string_byte_array_reader(
@@ -613,52 +501,21 @@ fn add_benches(c: &mut Criterion) {
         },
     );
 
-    group.bench_function(
-        "read StringArray, plain encoded, mandatory, no NULLs - new",
-        |b| {
-            b.iter(|| {
-                let array_reader = create_string_arrow_array_reader(
-                    plain_string_no_null_data.clone(),
-                    mandatory_string_column_desc.clone(),
-                );
-                count = bench_array_reader(array_reader);
-            });
-            assert_eq!(count, EXPECTED_VALUE_COUNT);
-        },
-    );
-
     let plain_string_no_null_data = build_plain_encoded_string_page_iterator(
         schema.clone(),
         optional_string_column_desc.clone(),
         0.0,
     );
-    group.bench_function(
-        "read StringArray, plain encoded, optional, no NULLs - old",
-        |b| {
-            b.iter(|| {
-                let array_reader = create_string_byte_array_reader(
-                    plain_string_no_null_data.clone(),
-                    optional_string_column_desc.clone(),
-                );
-                count = bench_array_reader(array_reader);
-            });
-            assert_eq!(count, EXPECTED_VALUE_COUNT);
-        },
-    );
-
-    group.bench_function(
-        "read StringArray, plain encoded, optional, no NULLs - new",
-        |b| {
-            b.iter(|| {
-                let array_reader = create_string_arrow_array_reader(
-                    plain_string_no_null_data.clone(),
-                    optional_string_column_desc.clone(),
-                );
-                count = bench_array_reader(array_reader);
-            });
-            assert_eq!(count, EXPECTED_VALUE_COUNT);
-        },
-    );
+    group.bench_function("read StringArray, plain encoded, optional, no 
NULLs", |b| {
+        b.iter(|| {
+            let array_reader = create_string_byte_array_reader(
+                plain_string_no_null_data.clone(),
+                optional_string_column_desc.clone(),
+            );
+            count = bench_array_reader(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
 
     // string, plain encoded, half NULLs
     let plain_string_half_null_data = build_plain_encoded_string_page_iterator(
@@ -667,7 +524,7 @@ fn add_benches(c: &mut Criterion) {
         0.5,
     );
     group.bench_function(
-        "read StringArray, plain encoded, optional, half NULLs - old",
+        "read StringArray, plain encoded, optional, half NULLs",
         |b| {
             b.iter(|| {
                 let array_reader = create_string_byte_array_reader(
@@ -680,20 +537,6 @@ fn add_benches(c: &mut Criterion) {
         },
     );
 
-    group.bench_function(
-        "read StringArray, plain encoded, optional, half NULLs - new",
-        |b| {
-            b.iter(|| {
-                let array_reader = create_string_arrow_array_reader(
-                    plain_string_half_null_data.clone(),
-                    optional_string_column_desc.clone(),
-                );
-                count = bench_array_reader(array_reader);
-            });
-            assert_eq!(count, EXPECTED_VALUE_COUNT);
-        },
-    );
-
     // string, dictionary encoded, no NULLs
     let dictionary_string_no_null_data = 
build_dictionary_encoded_string_page_iterator(
         schema.clone(),
@@ -701,7 +544,7 @@ fn add_benches(c: &mut Criterion) {
         0.0,
     );
     group.bench_function(
-        "read StringArray, dictionary encoded, mandatory, no NULLs - old",
+        "read StringArray, dictionary encoded, mandatory, no NULLs",
         |b| {
             b.iter(|| {
                 let array_reader = create_string_byte_array_reader(
@@ -714,27 +557,13 @@ fn add_benches(c: &mut Criterion) {
         },
     );
 
-    group.bench_function(
-        "read StringArray, dictionary encoded, mandatory, no NULLs - new",
-        |b| {
-            b.iter(|| {
-                let array_reader = create_string_arrow_array_reader(
-                    dictionary_string_no_null_data.clone(),
-                    mandatory_string_column_desc.clone(),
-                );
-                count = bench_array_reader(array_reader);
-            });
-            assert_eq!(count, EXPECTED_VALUE_COUNT);
-        },
-    );
-
     let dictionary_string_no_null_data = 
build_dictionary_encoded_string_page_iterator(
         schema.clone(),
         optional_string_column_desc.clone(),
         0.0,
     );
     group.bench_function(
-        "read StringArray, dictionary encoded, optional, no NULLs - old",
+        "read StringArray, dictionary encoded, optional, no NULLs",
         |b| {
             b.iter(|| {
                 let array_reader = create_string_byte_array_reader(
@@ -747,20 +576,6 @@ fn add_benches(c: &mut Criterion) {
         },
     );
 
-    group.bench_function(
-        "read StringArray, dictionary encoded, optional, no NULLs - new",
-        |b| {
-            b.iter(|| {
-                let array_reader = create_string_arrow_array_reader(
-                    dictionary_string_no_null_data.clone(),
-                    optional_string_column_desc.clone(),
-                );
-                count = bench_array_reader(array_reader);
-            });
-            assert_eq!(count, EXPECTED_VALUE_COUNT);
-        },
-    );
-
     // string, dictionary encoded, half NULLs
     let dictionary_string_half_null_data = 
build_dictionary_encoded_string_page_iterator(
         schema.clone(),
@@ -768,7 +583,7 @@ fn add_benches(c: &mut Criterion) {
         0.5,
     );
     group.bench_function(
-        "read StringArray, dictionary encoded, optional, half NULLs - old",
+        "read StringArray, dictionary encoded, optional, half NULLs",
         |b| {
             b.iter(|| {
                 let array_reader = create_string_byte_array_reader(
@@ -782,20 +597,6 @@ fn add_benches(c: &mut Criterion) {
     );
 
     group.bench_function(
-        "read StringArray, dictionary encoded, optional, half NULLs - new",
-        |b| {
-            b.iter(|| {
-                let array_reader = create_string_arrow_array_reader(
-                    dictionary_string_half_null_data.clone(),
-                    optional_string_column_desc.clone(),
-                );
-                count = bench_array_reader(array_reader);
-            });
-            assert_eq!(count, EXPECTED_VALUE_COUNT);
-        },
-    );
-
-    group.bench_function(
         "read StringDictionary, dictionary encoded, mandatory, no NULLs - old",
         |b| {
             b.iter(|| {
diff --git a/parquet/src/arrow/arrow_array_reader.rs 
b/parquet/src/arrow/arrow_array_reader.rs
deleted file mode 100644
index d814f7f..0000000
--- a/parquet/src/arrow/arrow_array_reader.rs
+++ /dev/null
@@ -1,1922 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use super::array_reader::ArrayReader;
-use crate::arrow::schema::parquet_to_arrow_field;
-use crate::basic::Encoding;
-use crate::data_type::{ByteArray, ByteArrayType};
-use crate::encodings::decoding::{Decoder, DeltaByteArrayDecoder};
-use crate::errors::{ParquetError, Result};
-use crate::{
-    column::page::{Page, PageIterator},
-    schema::types::{ColumnDescPtr, ColumnDescriptor},
-    util::memory::ByteBufferPtr,
-};
-use arrow::{
-    array::{ArrayRef, Int16Array},
-    buffer::MutableBuffer,
-    datatypes::{DataType as ArrowType, ToByteSlice},
-};
-use std::{any::Any, collections::VecDeque, marker::PhantomData};
-use std::{cell::RefCell, rc::Rc};
-
-struct UnzipIter<Source, Target, State> {
-    shared_state: Rc<RefCell<State>>,
-    select_item_buffer: fn(&mut State) -> &mut VecDeque<Target>,
-    consume_source_item: fn(source_item: Source, state: &mut State) -> Target,
-}
-
-impl<Source, Target, State> UnzipIter<Source, Target, State> {
-    fn new(
-        shared_state: Rc<RefCell<State>>,
-        item_buffer_selector: fn(&mut State) -> &mut VecDeque<Target>,
-        source_item_consumer: fn(source_item: Source, state: &mut State) -> 
Target,
-    ) -> Self {
-        Self {
-            shared_state,
-            select_item_buffer: item_buffer_selector,
-            consume_source_item: source_item_consumer,
-        }
-    }
-}
-
-trait UnzipIterState<T> {
-    type SourceIter: Iterator<Item = T>;
-    fn source_iter(&mut self) -> &mut Self::SourceIter;
-}
-
-impl<Source, Target, State: UnzipIterState<Source>> Iterator
-    for UnzipIter<Source, Target, State>
-{
-    type Item = Target;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        let mut inner = self.shared_state.borrow_mut();
-        // try to get one from the stored data
-        (self.select_item_buffer)(&mut *inner)
-            .pop_front()
-            .or_else(||
-            // nothing stored, we need a new element.
-            inner.source_iter().next().map(|s| {
-                (self.consume_source_item)(s, &mut inner)
-            }))
-    }
-}
-
-struct PageBufferUnzipIterState<V, L, It> {
-    iter: It,
-    value_iter_buffer: VecDeque<V>,
-    def_level_iter_buffer: VecDeque<L>,
-    rep_level_iter_buffer: VecDeque<L>,
-}
-
-impl<V, L, It: Iterator<Item = (V, L, L)>> UnzipIterState<(V, L, L)>
-    for PageBufferUnzipIterState<V, L, It>
-{
-    type SourceIter = It;
-
-    #[inline]
-    fn source_iter(&mut self) -> &mut Self::SourceIter {
-        &mut self.iter
-    }
-}
-
-type ValueUnzipIter<V, L, It> =
-    UnzipIter<(V, L, L), V, PageBufferUnzipIterState<V, L, It>>;
-type LevelUnzipIter<V, L, It> =
-    UnzipIter<(V, L, L), L, PageBufferUnzipIterState<V, L, It>>;
-type PageUnzipResult<V, L, It> = (
-    ValueUnzipIter<V, L, It>,
-    LevelUnzipIter<V, L, It>,
-    LevelUnzipIter<V, L, It>,
-);
-
-fn unzip_iter<V, L, It: Iterator<Item = (V, L, L)>>(it: It) -> 
PageUnzipResult<V, L, It> {
-    let shared_data = Rc::new(RefCell::new(PageBufferUnzipIterState {
-        iter: it,
-        value_iter_buffer: VecDeque::new(),
-        def_level_iter_buffer: VecDeque::new(),
-        rep_level_iter_buffer: VecDeque::new(),
-    }));
-
-    let value_iter = UnzipIter::new(
-        shared_data.clone(),
-        |state| &mut state.value_iter_buffer,
-        |(v, d, r), state| {
-            state.def_level_iter_buffer.push_back(d);
-            state.rep_level_iter_buffer.push_back(r);
-            v
-        },
-    );
-
-    let def_level_iter = UnzipIter::new(
-        shared_data.clone(),
-        |state| &mut state.def_level_iter_buffer,
-        |(v, d, r), state| {
-            state.value_iter_buffer.push_back(v);
-            state.rep_level_iter_buffer.push_back(r);
-            d
-        },
-    );
-
-    let rep_level_iter = UnzipIter::new(
-        shared_data,
-        |state| &mut state.rep_level_iter_buffer,
-        |(v, d, r), state| {
-            state.value_iter_buffer.push_back(v);
-            state.def_level_iter_buffer.push_back(d);
-            r
-        },
-    );
-
-    (value_iter, def_level_iter, rep_level_iter)
-}
-
-pub trait ArrayConverter {
-    fn convert_value_bytes(
-        &self,
-        value_decoder: &mut impl ValueDecoder,
-        num_values: usize,
-    ) -> Result<arrow::array::ArrayData>;
-}
-
-pub struct ArrowArrayReader<'a, C: ArrayConverter + 'a> {
-    column_desc: ColumnDescPtr,
-    data_type: ArrowType,
-    def_level_decoder: Box<dyn ValueDecoder + 'a>,
-    rep_level_decoder: Box<dyn ValueDecoder + 'a>,
-    value_decoder: Box<dyn ValueDecoder + 'a>,
-    last_def_levels: Option<Int16Array>,
-    last_rep_levels: Option<Int16Array>,
-    array_converter: C,
-}
-
-pub(crate) struct ColumnChunkContext {
-    dictionary_values: Option<Vec<ByteBufferPtr>>,
-}
-
-impl ColumnChunkContext {
-    fn new() -> Self {
-        Self {
-            dictionary_values: None,
-        }
-    }
-
-    fn set_dictionary(&mut self, dictionary_values: Vec<ByteBufferPtr>) {
-        self.dictionary_values = Some(dictionary_values);
-    }
-}
-
-type PageDecoderTuple = (
-    Box<dyn ValueDecoder>,
-    Box<dyn ValueDecoder>,
-    Box<dyn ValueDecoder>,
-);
-
-impl<'a, C: ArrayConverter + 'a> ArrowArrayReader<'a, C> {
-    pub fn try_new<P: PageIterator + 'a>(
-        column_chunk_iterator: P,
-        column_desc: ColumnDescPtr,
-        array_converter: C,
-        arrow_type: Option<ArrowType>,
-    ) -> Result<Self> {
-        let data_type = match arrow_type {
-            Some(t) => t,
-            None => parquet_to_arrow_field(column_desc.as_ref())?
-                .data_type()
-                .clone(),
-        };
-        type PageIteratorItem = Result<(Page, 
Rc<RefCell<ColumnChunkContext>>)>;
-        let page_iter = column_chunk_iterator
-            // build iterator of pages across column chunks
-            .flat_map(|x| -> Box<dyn Iterator<Item = PageIteratorItem>> {
-                // attach column chunk context
-                let context = Rc::new(RefCell::new(ColumnChunkContext::new()));
-                match x {
-                    Ok(page_reader) => Box::new(
-                        page_reader.map(move |pr| pr.map(|p| (p, 
context.clone()))),
-                    ),
-                    // errors from reading column chunks / row groups are 
propagated to page level
-                    Err(e) => Box::new(std::iter::once(Err(e))),
-                }
-            });
-        // capture a clone of column_desc in closure so that it can outlive 
current function
-        let map_page_fn_factory = |column_desc: ColumnDescPtr| {
-            move |x: Result<(Page, Rc<RefCell<ColumnChunkContext>>)>| {
-                x.and_then(|(page, context)| {
-                    Self::map_page(page, context, column_desc.as_ref())
-                })
-            }
-        };
-        let map_page_fn = map_page_fn_factory(column_desc.clone());
-        // map page iterator into tuple of buffer iterators for (values, def 
levels, rep levels)
-        // errors from lower levels are surfaced through the value decoder 
iterator
-        let decoder_iter = page_iter.map(map_page_fn).map(|x| match x {
-            Ok(iter_tuple) => iter_tuple,
-            // errors from reading pages are propagated to decoder iterator 
level
-            Err(e) => Self::map_page_error(e),
-        });
-        // split tuple iterator into separate iterators for (values, def 
levels, rep levels)
-        let (value_iter, def_level_iter, rep_level_iter) = 
unzip_iter(decoder_iter);
-
-        Ok(Self {
-            column_desc,
-            data_type,
-            def_level_decoder: 
Box::new(CompositeValueDecoder::new(def_level_iter)),
-            rep_level_decoder: 
Box::new(CompositeValueDecoder::new(rep_level_iter)),
-            value_decoder: Box::new(CompositeValueDecoder::new(value_iter)),
-            last_def_levels: None,
-            last_rep_levels: None,
-            array_converter,
-        })
-    }
-
-    #[inline]
-    fn def_levels_available(column_desc: &ColumnDescriptor) -> bool {
-        column_desc.max_def_level() > 0
-    }
-
-    #[inline]
-    fn rep_levels_available(column_desc: &ColumnDescriptor) -> bool {
-        column_desc.max_rep_level() > 0
-    }
-
-    fn map_page_error(err: ParquetError) -> PageDecoderTuple {
-        (
-            Box::new(<dyn ValueDecoder>::once(Err(err.clone()))),
-            Box::new(<dyn ValueDecoder>::once(Err(err.clone()))),
-            Box::new(<dyn ValueDecoder>::once(Err(err))),
-        )
-    }
-
-    // Split Result<Page> into Result<(Iterator<Values>, Iterator<DefLevels>, 
Iterator<RepLevels>)>
-    // this method could fail, e.g. if the page encoding is not supported
-    fn map_page(
-        page: Page,
-        column_chunk_context: Rc<RefCell<ColumnChunkContext>>,
-        column_desc: &ColumnDescriptor,
-    ) -> Result<PageDecoderTuple> {
-        use crate::encodings::levels::LevelDecoder;
-        match page {
-            Page::DictionaryPage {
-                buf,
-                num_values,
-                encoding,
-                ..
-            } => {
-                let mut column_chunk_context = 
column_chunk_context.borrow_mut();
-                if column_chunk_context.dictionary_values.is_some() {
-                    return Err(general_err!(
-                        "Column chunk cannot have more than one dictionary"
-                    ));
-                }
-                // create plain decoder for dictionary values
-                let mut dict_decoder = Self::get_dictionary_page_decoder(
-                    buf,
-                    num_values as usize,
-                    encoding,
-                    column_desc,
-                )?;
-                // decode and cache dictionary values
-                let dictionary_values = dict_decoder.read_dictionary_values()?;
-                column_chunk_context.set_dictionary(dictionary_values);
-
-                // a dictionary page doesn't return any values
-                Ok((
-                    Box::new(<dyn ValueDecoder>::empty()),
-                    Box::new(<dyn ValueDecoder>::empty()),
-                    Box::new(<dyn ValueDecoder>::empty()),
-                ))
-            }
-            Page::DataPage {
-                buf,
-                num_values,
-                encoding,
-                def_level_encoding,
-                rep_level_encoding,
-                statistics: _,
-            } => {
-                let mut buffer_ptr = buf;
-                // create rep level decoder iterator
-                let rep_level_iter: Box<dyn ValueDecoder> =
-                    if Self::rep_levels_available(column_desc) {
-                        let mut rep_decoder = LevelDecoder::v1(
-                            rep_level_encoding,
-                            column_desc.max_rep_level(),
-                        );
-                        let rep_level_byte_len =
-                            rep_decoder.set_data(num_values as usize, 
buffer_ptr.all());
-                        // advance buffer pointer
-                        buffer_ptr = buffer_ptr.start_from(rep_level_byte_len);
-                        Box::new(LevelValueDecoder::new(rep_decoder))
-                    } else {
-                        Box::new(<dyn 
ValueDecoder>::once(Err(ParquetError::General(
-                            "rep levels are not available".to_string(),
-                        ))))
-                    };
-                // create def level decoder iterator
-                let (def_level_iter, value_count): (Box<dyn ValueDecoder>, 
usize) =
-                    if Self::def_levels_available(column_desc) {
-                        // calculate actual value count
-                        let mut def_decoder = LevelDecoder::v1(
-                            def_level_encoding,
-                            column_desc.max_def_level(),
-                        );
-                        def_decoder.set_data(num_values as usize, 
buffer_ptr.all());
-                        let value_count = Self::count_def_level_values(
-                            column_desc,
-                            def_decoder,
-                            num_values as usize,
-                        )?;
-                        // create def level decoder
-                        def_decoder = LevelDecoder::v1(
-                            def_level_encoding,
-                            column_desc.max_def_level(),
-                        );
-                        let def_levels_byte_len =
-                            def_decoder.set_data(num_values as usize, 
buffer_ptr.all());
-                        // advance buffer pointer
-                        buffer_ptr = 
buffer_ptr.start_from(def_levels_byte_len);
-                        (Box::new(LevelValueDecoder::new(def_decoder)), 
value_count)
-                    } else {
-                        (
-                            Box::new(<dyn ValueDecoder>::once(Err(
-                                ParquetError::General(
-                                    "def levels are not available".to_string(),
-                                ),
-                            ))),
-                            num_values as usize,
-                        )
-                    };
-                // create value decoder iterator
-                let value_iter = Self::get_value_decoder(
-                    buffer_ptr,
-                    value_count,
-                    encoding,
-                    column_desc,
-                    column_chunk_context,
-                )?;
-                Ok((value_iter, def_level_iter, rep_level_iter))
-            }
-            Page::DataPageV2 {
-                buf,
-                num_values,
-                encoding,
-                num_nulls: _,
-                num_rows: _,
-                def_levels_byte_len,
-                rep_levels_byte_len,
-                is_compressed: _,
-                statistics: _,
-            } => {
-                let mut offset = 0;
-                // create rep level decoder iterator
-                let rep_level_iter: Box<dyn ValueDecoder> =
-                    if Self::rep_levels_available(column_desc) {
-                        let rep_levels_byte_len = rep_levels_byte_len as usize;
-                        let mut rep_decoder =
-                            LevelDecoder::v2(column_desc.max_rep_level());
-                        rep_decoder.set_data_range(
-                            num_values as usize,
-                            &buf,
-                            offset,
-                            rep_levels_byte_len,
-                        );
-                        offset += rep_levels_byte_len;
-                        Box::new(LevelValueDecoder::new(rep_decoder))
-                    } else {
-                        Box::new(<dyn 
ValueDecoder>::once(Err(ParquetError::General(
-                            "rep levels are not available".to_string(),
-                        ))))
-                    };
-                // create def level decoder iterator
-                let (def_level_iter, value_count): (Box<dyn ValueDecoder>, 
usize) =
-                    if Self::def_levels_available(column_desc) {
-                        let def_levels_byte_len = def_levels_byte_len as usize;
-                        // calculate actual value count
-                        let mut def_decoder =
-                            LevelDecoder::v2(column_desc.max_def_level());
-                        def_decoder.set_data_range(
-                            num_values as usize,
-                            &buf,
-                            offset,
-                            def_levels_byte_len,
-                        );
-                        let value_count = Self::count_def_level_values(
-                            column_desc,
-                            def_decoder,
-                            num_values as usize,
-                        )?;
-                        // create def level decoder
-                        def_decoder = 
LevelDecoder::v2(column_desc.max_def_level());
-                        def_decoder.set_data_range(
-                            num_values as usize,
-                            &buf,
-                            offset,
-                            def_levels_byte_len,
-                        );
-                        offset += def_levels_byte_len;
-                        (Box::new(LevelValueDecoder::new(def_decoder)), 
value_count)
-                    } else {
-                        (
-                            Box::new(<dyn ValueDecoder>::once(Err(
-                                ParquetError::General(
-                                    "def levels are not available".to_string(),
-                                ),
-                            ))),
-                            num_values as usize,
-                        )
-                    };
-
-                // create value decoder iterator
-                let values_buffer = buf.start_from(offset);
-                let value_iter = Self::get_value_decoder(
-                    values_buffer,
-                    value_count,
-                    encoding,
-                    column_desc,
-                    column_chunk_context,
-                )?;
-                Ok((value_iter, def_level_iter, rep_level_iter))
-            }
-        }
-    }
-
-    fn count_def_level_values(
-        column_desc: &ColumnDescriptor,
-        level_decoder: crate::encodings::levels::LevelDecoder,
-        num_values: usize,
-    ) -> Result<usize> {
-        let mut def_level_decoder = LevelValueDecoder::new(level_decoder);
-        let def_level_array =
-            Self::build_level_array(&mut def_level_decoder, num_values)?;
-        let def_level_count = def_level_array.len();
-        // use eq_scalar to efficiently build null bitmap array from def levels
-        let null_bitmap_array =
-            arrow::compute::eq_scalar(&def_level_array, 
column_desc.max_def_level())?;
-        // efficiently calculate values to read
-        Ok(null_bitmap_array
-            .values()
-            .count_set_bits_offset(0, def_level_count))
-    }
-
-    fn get_dictionary_page_decoder(
-        values_buffer: ByteBufferPtr,
-        num_values: usize,
-        mut encoding: Encoding,
-        column_desc: &ColumnDescriptor,
-    ) -> Result<Box<dyn DictionaryValueDecoder>> {
-        if encoding == Encoding::PLAIN || encoding == 
Encoding::PLAIN_DICTIONARY {
-            encoding = Encoding::RLE_DICTIONARY
-        }
-
-        if encoding == Encoding::RLE_DICTIONARY {
-            Ok(
-                Self::get_plain_value_decoder(values_buffer, num_values, 
column_desc)
-                    .into_dictionary_decoder(),
-            )
-        } else {
-            Err(nyi_err!(
-                "Invalid/Unsupported encoding type for dictionary: {}",
-                encoding
-            ))
-        }
-    }
-
-    fn get_value_decoder(
-        values_buffer: ByteBufferPtr,
-        num_values: usize,
-        mut encoding: Encoding,
-        column_desc: &ColumnDescriptor,
-        column_chunk_context: Rc<RefCell<ColumnChunkContext>>,
-    ) -> Result<Box<dyn ValueDecoder>> {
-        if encoding == Encoding::PLAIN_DICTIONARY {
-            encoding = Encoding::RLE_DICTIONARY;
-        }
-
-        match encoding {
-            Encoding::PLAIN => {
-                Ok(
-                    Self::get_plain_value_decoder(values_buffer, num_values, 
column_desc)
-                        .into_value_decoder(),
-                )
-            }
-            Encoding::RLE_DICTIONARY => {
-                if column_chunk_context.borrow().dictionary_values.is_some() {
-                    let value_bit_len = 
Self::get_column_physical_bit_len(column_desc);
-                    let dictionary_decoder: Box<dyn ValueDecoder> = if 
value_bit_len == 0
-                    {
-                        Box::new(VariableLenDictionaryDecoder::new(
-                            column_chunk_context,
-                            values_buffer,
-                            num_values,
-                        ))
-                    } else {
-                        Box::new(FixedLenDictionaryDecoder::new(
-                            column_chunk_context,
-                            values_buffer,
-                            num_values,
-                            value_bit_len,
-                        ))
-                    };
-                    Ok(dictionary_decoder)
-                } else {
-                    Err(general_err!("Dictionary values have not been 
initialized."))
-                }
-            }
-            // Encoding::RLE => Box::new(RleValueDecoder::new()),
-            // Encoding::DELTA_BINARY_PACKED => 
Box::new(DeltaBitPackDecoder::new()),
-            // Encoding::DELTA_LENGTH_BYTE_ARRAY => 
Box::new(DeltaLengthByteArrayDecoder::new()),
-            Encoding::DELTA_BYTE_ARRAY => 
Ok(Box::new(DeltaByteArrayValueDecoder::new(
-                values_buffer,
-                num_values,
-            )?)),
-            e => return Err(nyi_err!("Encoding {} is not supported", e)),
-        }
-    }
-
-    fn get_column_physical_bit_len(column_desc: &ColumnDescriptor) -> usize {
-        use crate::basic::Type as PhysicalType;
-        // parquet only supports a limited number of physical types
-        // later converters cast to a more specific arrow / logical type if 
necessary
-        match column_desc.physical_type() {
-            PhysicalType::BOOLEAN => 1,
-            PhysicalType::INT32 | PhysicalType::FLOAT => 32,
-            PhysicalType::INT64 | PhysicalType::DOUBLE => 64,
-            PhysicalType::INT96 => 96,
-            PhysicalType::BYTE_ARRAY => 0,
-            PhysicalType::FIXED_LEN_BYTE_ARRAY => column_desc.type_length() as 
usize * 8,
-        }
-    }
-
-    fn get_plain_value_decoder(
-        values_buffer: ByteBufferPtr,
-        num_values: usize,
-        column_desc: &ColumnDescriptor,
-    ) -> Box<dyn PlainValueDecoder> {
-        let value_bit_len = Self::get_column_physical_bit_len(column_desc);
-        if value_bit_len == 0 {
-            Box::new(VariableLenPlainDecoder::new(values_buffer, num_values))
-        } else {
-            Box::new(FixedLenPlainDecoder::new(
-                values_buffer,
-                num_values,
-                value_bit_len,
-            ))
-        }
-    }
-
-    fn build_level_array(
-        level_decoder: &mut impl ValueDecoder,
-        batch_size: usize,
-    ) -> Result<Int16Array> {
-        use arrow::datatypes::Int16Type;
-        let level_converter = PrimitiveArrayConverter::<Int16Type>::new();
-        let array_data =
-            level_converter.convert_value_bytes(level_decoder, batch_size)?;
-        Ok(Int16Array::from(array_data))
-    }
-}
-
-impl<C: ArrayConverter> ArrayReader for ArrowArrayReader<'static, C> {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn get_data_type(&self) -> &ArrowType {
-        &self.data_type
-    }
-
-    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
-        if Self::rep_levels_available(&self.column_desc) {
-            // read rep levels if available
-            let rep_level_array =
-                Self::build_level_array(&mut self.rep_level_decoder, 
batch_size)?;
-            self.last_rep_levels = Some(rep_level_array);
-        }
-
-        // check if def levels are available
-        let (values_to_read, null_bitmap_array) =
-            if !Self::def_levels_available(&self.column_desc) {
-                // if no def levels - just read (up to) batch_size values
-                (batch_size, None)
-            } else {
-                // if def levels are available - they determine how many 
values will be read
-                // decode def levels, return first error if any
-                let def_level_array =
-                    Self::build_level_array(&mut self.def_level_decoder, 
batch_size)?;
-                let def_level_count = def_level_array.len();
-                // use eq_scalar to efficiently build null bitmap array from 
def levels
-                let null_bitmap_array = arrow::compute::eq_scalar(
-                    &def_level_array,
-                    self.column_desc.max_def_level(),
-                )?;
-                self.last_def_levels = Some(def_level_array);
-                // efficiently calculate values to read
-                let values_to_read = null_bitmap_array
-                    .values()
-                    .count_set_bits_offset(0, def_level_count);
-                let maybe_null_bitmap = if values_to_read != 
null_bitmap_array.len() {
-                    Some(null_bitmap_array)
-                } else {
-                    // shortcut if no NULLs
-                    None
-                };
-                (values_to_read, maybe_null_bitmap)
-            };
-
-        // read a batch of values
-        // converter only creates a no-null / all value array data
-        let mut value_array_data = self
-            .array_converter
-            .convert_value_bytes(&mut self.value_decoder, values_to_read)?;
-
-        if let Some(null_bitmap_array) = null_bitmap_array {
-            // Only if def levels are available - insert null values 
efficiently using MutableArrayData.
-            // This will require value bytes to be copied again, but converter 
requirements are reduced.
-            // With a small number of NULLs, this will only be a few copies of 
large byte sequences.
-            let actual_batch_size = null_bitmap_array.len();
-            // use_nulls is false, because null_bitmap_array is already 
calculated and re-used
-            let mut mutable = arrow::array::MutableArrayData::new(
-                vec![&value_array_data],
-                false,
-                actual_batch_size,
-            );
-            // SlicesIterator slices only the true values, NULLs are inserted 
to fill any gaps
-            arrow::compute::SlicesIterator::new(&null_bitmap_array).for_each(
-                |(start, end)| {
-                    // the gap needs to be filled with NULLs
-                    if start > mutable.len() {
-                        let nulls_to_add = start - mutable.len();
-                        mutable.extend_nulls(nulls_to_add);
-                    }
-                    // fill values, adjust start and end with NULL count so far
-                    let nulls_added = mutable.null_count();
-                    mutable.extend(0, start - nulls_added, end - nulls_added);
-                },
-            );
-            // any remaining part is NULLs
-            if mutable.len() < actual_batch_size {
-                let nulls_to_add = actual_batch_size - mutable.len();
-                mutable.extend_nulls(nulls_to_add);
-            }
-
-            value_array_data = unsafe {
-                mutable
-                    .into_builder()
-                    .null_bit_buffer(null_bitmap_array.values().clone())
-                    .build_unchecked()
-            };
-        }
-        let mut array = arrow::array::make_array(value_array_data);
-        if array.data_type() != &self.data_type {
-            // cast array to self.data_type if necessary
-            array = arrow::compute::cast(&array, &self.data_type)?
-        }
-        Ok(array)
-    }
-
-    fn get_def_levels(&self) -> Option<&[i16]> {
-        self.last_def_levels.as_ref().map(|x| x.values())
-    }
-
-    fn get_rep_levels(&self) -> Option<&[i16]> {
-        self.last_rep_levels.as_ref().map(|x| x.values())
-    }
-}
-
-use crate::encodings::rle::RleDecoder;
-
-pub trait ValueDecoder {
-    fn read_value_bytes(
-        &mut self,
-        num_values: usize,
-        read_bytes: &mut dyn FnMut(&[u8], usize),
-    ) -> Result<usize>;
-}
-
-trait DictionaryValueDecoder {
-    fn read_dictionary_values(&mut self) -> Result<Vec<ByteBufferPtr>>;
-}
-
-trait PlainValueDecoder: ValueDecoder + DictionaryValueDecoder {
-    fn into_value_decoder(self: Box<Self>) -> Box<dyn ValueDecoder>;
-    fn into_dictionary_decoder(self: Box<Self>) -> Box<dyn 
DictionaryValueDecoder>;
-}
-
-impl<T> PlainValueDecoder for T
-where
-    T: ValueDecoder + DictionaryValueDecoder + 'static,
-{
-    fn into_value_decoder(self: Box<T>) -> Box<dyn ValueDecoder> {
-        self
-    }
-
-    fn into_dictionary_decoder(self: Box<T>) -> Box<dyn 
DictionaryValueDecoder> {
-        self
-    }
-}
-
-impl dyn ValueDecoder {
-    fn empty() -> impl ValueDecoder {
-        SingleValueDecoder::new(Ok(0))
-    }
-
-    fn once(value: Result<usize>) -> impl ValueDecoder {
-        SingleValueDecoder::new(value)
-    }
-}
-
-impl ValueDecoder for Box<dyn ValueDecoder> {
-    #[inline]
-    fn read_value_bytes(
-        &mut self,
-        num_values: usize,
-        read_bytes: &mut dyn FnMut(&[u8], usize),
-    ) -> Result<usize> {
-        self.as_mut().read_value_bytes(num_values, read_bytes)
-    }
-}
-
-struct SingleValueDecoder {
-    value: Result<usize>,
-}
-
-impl SingleValueDecoder {
-    fn new(value: Result<usize>) -> Self {
-        Self { value }
-    }
-}
-
-impl ValueDecoder for SingleValueDecoder {
-    fn read_value_bytes(
-        &mut self,
-        _num_values: usize,
-        _read_bytes: &mut dyn FnMut(&[u8], usize),
-    ) -> Result<usize> {
-        self.value.clone()
-    }
-}
-
-struct CompositeValueDecoder<I: Iterator<Item = Box<dyn ValueDecoder>>> {
-    current_decoder: Option<Box<dyn ValueDecoder>>,
-    decoder_iter: I,
-}
-
-impl<I: Iterator<Item = Box<dyn ValueDecoder>>> CompositeValueDecoder<I> {
-    fn new(mut decoder_iter: I) -> Self {
-        let current_decoder = decoder_iter.next();
-        Self {
-            current_decoder,
-            decoder_iter,
-        }
-    }
-}
-
-impl<I: Iterator<Item = Box<dyn ValueDecoder>>> ValueDecoder
-    for CompositeValueDecoder<I>
-{
-    fn read_value_bytes(
-        &mut self,
-        num_values: usize,
-        read_bytes: &mut dyn FnMut(&[u8], usize),
-    ) -> Result<usize> {
-        let mut values_to_read = num_values;
-        while values_to_read > 0 {
-            let value_decoder = match self.current_decoder.as_mut() {
-                Some(d) => d,
-                // no more decoders
-                None => break,
-            };
-            while values_to_read > 0 {
-                let values_read =
-                    value_decoder.read_value_bytes(values_to_read, 
read_bytes)?;
-                if values_read > 0 {
-                    values_to_read -= values_read;
-                } else {
-                    // no more values in current decoder
-                    self.current_decoder = self.decoder_iter.next();
-                    break;
-                }
-            }
-        }
-
-        Ok(num_values - values_to_read)
-    }
-}
-
-struct LevelValueDecoder {
-    level_decoder: crate::encodings::levels::LevelDecoder,
-    level_value_buffer: Vec<i16>,
-}
-
-impl LevelValueDecoder {
-    fn new(level_decoder: crate::encodings::levels::LevelDecoder) -> Self {
-        Self {
-            level_decoder,
-            level_value_buffer: vec![0i16; 2048],
-        }
-    }
-}
-
-impl ValueDecoder for LevelValueDecoder {
-    fn read_value_bytes(
-        &mut self,
-        num_values: usize,
-        read_bytes: &mut dyn FnMut(&[u8], usize),
-    ) -> Result<usize> {
-        let value_size = std::mem::size_of::<i16>();
-        let mut total_values_read = 0;
-        while total_values_read < num_values {
-            let values_to_read = std::cmp::min(
-                num_values - total_values_read,
-                self.level_value_buffer.len(),
-            );
-            let values_read = match self
-                .level_decoder
-                .get(&mut self.level_value_buffer[..values_to_read])
-            {
-                Ok(values_read) => values_read,
-                Err(e) => return Err(e),
-            };
-            if values_read > 0 {
-                let level_value_bytes =
-                    &self.level_value_buffer.to_byte_slice()[..values_read * 
value_size];
-                read_bytes(level_value_bytes, values_read);
-                total_values_read += values_read;
-            } else {
-                break;
-            }
-        }
-        Ok(total_values_read)
-    }
-}
-
-pub(crate) struct FixedLenPlainDecoder {
-    data: ByteBufferPtr,
-    num_values: usize,
-    value_bit_len: usize,
-}
-
-impl FixedLenPlainDecoder {
-    pub(crate) fn new(
-        data: ByteBufferPtr,
-        num_values: usize,
-        value_bit_len: usize,
-    ) -> Self {
-        Self {
-            data,
-            num_values,
-            value_bit_len,
-        }
-    }
-}
-
-impl DictionaryValueDecoder for FixedLenPlainDecoder {
-    fn read_dictionary_values(&mut self) -> Result<Vec<ByteBufferPtr>> {
-        let value_byte_len = self.value_bit_len / 8;
-        let available_values = self.data.len() / value_byte_len;
-        let values_to_read = std::cmp::min(available_values, self.num_values);
-        let byte_len = values_to_read * value_byte_len;
-        let values = vec![self.data.range(0, byte_len)];
-        self.num_values = 0;
-        self.data.set_range(self.data.start(), 0);
-        Ok(values)
-    }
-}
-
-impl ValueDecoder for FixedLenPlainDecoder {
-    fn read_value_bytes(
-        &mut self,
-        num_values: usize,
-        read_bytes: &mut dyn FnMut(&[u8], usize),
-    ) -> Result<usize> {
-        let available_values = self.data.len() * 8 / self.value_bit_len;
-        if available_values > 0 {
-            let values_to_read = std::cmp::min(available_values, num_values);
-            let byte_len = values_to_read * self.value_bit_len / 8;
-            read_bytes(&self.data.data()[..byte_len], values_to_read);
-            self.data
-                .set_range(self.data.start() + byte_len, self.data.len() - 
byte_len);
-            Ok(values_to_read)
-        } else {
-            Ok(0)
-        }
-    }
-}
-
-pub(crate) struct VariableLenPlainDecoder {
-    data: ByteBufferPtr,
-    num_values: usize,
-    position: usize,
-}
-
-impl VariableLenPlainDecoder {
-    pub(crate) fn new(data: ByteBufferPtr, num_values: usize) -> Self {
-        Self {
-            data,
-            num_values,
-            position: 0,
-        }
-    }
-}
-
-impl DictionaryValueDecoder for VariableLenPlainDecoder {
-    fn read_dictionary_values(&mut self) -> Result<Vec<ByteBufferPtr>> {
-        const LEN_SIZE: usize = std::mem::size_of::<u32>();
-        let data = self.data.data();
-        let data_len = data.len();
-        let values_to_read = self.num_values;
-        let mut values = Vec::with_capacity(values_to_read);
-        let mut values_read = 0;
-        while self.position < data_len && values_read < values_to_read {
-            let len: usize =
-                read_num_bytes!(u32, LEN_SIZE, data[self.position..]) as usize;
-            self.position += LEN_SIZE;
-            if data_len < self.position + len {
-                return Err(eof_err!("Not enough bytes to decode"));
-            }
-            values.push(self.data.range(self.position, len));
-            self.position += len;
-            values_read += 1;
-        }
-        self.num_values -= values_read;
-        Ok(values)
-    }
-}
-
-impl ValueDecoder for VariableLenPlainDecoder {
-    fn read_value_bytes(
-        &mut self,
-        num_values: usize,
-        read_bytes: &mut dyn FnMut(&[u8], usize),
-    ) -> Result<usize> {
-        const LEN_SIZE: usize = std::mem::size_of::<u32>();
-        let data = self.data.data();
-        let data_len = data.len();
-        let values_to_read = std::cmp::min(self.num_values, num_values);
-        let mut values_read = 0;
-        while self.position < data_len && values_read < values_to_read {
-            let len: usize =
-                read_num_bytes!(u32, LEN_SIZE, data[self.position..]) as usize;
-            self.position += LEN_SIZE;
-            if data_len < self.position + len {
-                return Err(eof_err!("Not enough bytes to decode"));
-            }
-            read_bytes(&data[self.position..][..len], 1);
-            self.position += len;
-            values_read += 1;
-        }
-        self.num_values -= values_read;
-        Ok(values_read)
-    }
-}
-
-pub(crate) struct FixedLenDictionaryDecoder {
-    context_ref: Rc<RefCell<ColumnChunkContext>>,
-    key_data_bufer: ByteBufferPtr,
-    num_values: usize,
-    rle_decoder: RleDecoder,
-    value_byte_len: usize,
-    keys_buffer: Vec<i32>,
-}
-
-impl FixedLenDictionaryDecoder {
-    pub(crate) fn new(
-        column_chunk_context: Rc<RefCell<ColumnChunkContext>>,
-        key_data_bufer: ByteBufferPtr,
-        num_values: usize,
-        value_bit_len: usize,
-    ) -> Self {
-        assert!(
-            value_bit_len % 8 == 0,
-            "value_bit_size must be a multiple of 8"
-        );
-        // First byte in `data` is bit width
-        let bit_width = key_data_bufer.data()[0];
-        let mut rle_decoder = RleDecoder::new(bit_width);
-        rle_decoder.set_data(key_data_bufer.start_from(1));
-
-        Self {
-            context_ref: column_chunk_context,
-            key_data_bufer,
-            num_values,
-            rle_decoder,
-            value_byte_len: value_bit_len / 8,
-            keys_buffer: vec![0; 2048],
-        }
-    }
-}
-
-impl ValueDecoder for FixedLenDictionaryDecoder {
-    fn read_value_bytes(
-        &mut self,
-        num_values: usize,
-        read_bytes: &mut dyn FnMut(&[u8], usize),
-    ) -> Result<usize> {
-        if self.num_values == 0 {
-            return Ok(0);
-        }
-        let context = self.context_ref.borrow();
-        let values = context.dictionary_values.as_ref().unwrap();
-        let input_value_bytes = values[0].data();
-        // read no more than available values or requested values
-        let values_to_read = std::cmp::min(self.num_values, num_values);
-        let mut values_read = 0;
-        while values_read < values_to_read {
-            // read values in batches of up to self.keys_buffer.len()
-            let keys_to_read =
-                std::cmp::min(values_to_read - values_read, 
self.keys_buffer.len());
-            let keys_read = match self
-                .rle_decoder
-                .get_batch(&mut self.keys_buffer[..keys_to_read])
-            {
-                Ok(keys_read) => keys_read,
-                Err(e) => return Err(e),
-            };
-            if keys_read == 0 {
-                self.num_values = 0;
-                return Ok(values_read);
-            }
-            for i in 0..keys_read {
-                let key = self.keys_buffer[i] as usize;
-                read_bytes(
-                    &input_value_bytes[key * self.value_byte_len..]
-                        [..self.value_byte_len],
-                    1,
-                );
-            }
-            values_read += keys_read;
-        }
-        self.num_values -= values_read;
-        Ok(values_read)
-    }
-}
-
-pub(crate) struct VariableLenDictionaryDecoder {
-    context_ref: Rc<RefCell<ColumnChunkContext>>,
-    key_data_bufer: ByteBufferPtr,
-    num_values: usize,
-    rle_decoder: RleDecoder,
-    keys_buffer: Vec<i32>,
-}
-
-impl VariableLenDictionaryDecoder {
-    pub(crate) fn new(
-        column_chunk_context: Rc<RefCell<ColumnChunkContext>>,
-        key_data_bufer: ByteBufferPtr,
-        num_values: usize,
-    ) -> Self {
-        // First byte in `data` is bit width
-        let bit_width = key_data_bufer.data()[0];
-        let mut rle_decoder = RleDecoder::new(bit_width);
-        rle_decoder.set_data(key_data_bufer.start_from(1));
-
-        Self {
-            context_ref: column_chunk_context,
-            key_data_bufer,
-            num_values,
-            rle_decoder,
-            keys_buffer: vec![0; 2048],
-        }
-    }
-}
-
-impl ValueDecoder for VariableLenDictionaryDecoder {
-    fn read_value_bytes(
-        &mut self,
-        num_values: usize,
-        read_bytes: &mut dyn FnMut(&[u8], usize),
-    ) -> Result<usize> {
-        if self.num_values == 0 {
-            return Ok(0);
-        }
-        let context = self.context_ref.borrow();
-        let values = context.dictionary_values.as_ref().unwrap();
-        let values_to_read = std::cmp::min(self.num_values, num_values);
-        let mut values_read = 0;
-        while values_read < values_to_read {
-            // read values in batches of up to self.keys_buffer.len()
-            let keys_to_read =
-                std::cmp::min(values_to_read - values_read, 
self.keys_buffer.len());
-            let keys_read = match self
-                .rle_decoder
-                .get_batch(&mut self.keys_buffer[..keys_to_read])
-            {
-                Ok(keys_read) => keys_read,
-                Err(e) => return Err(e),
-            };
-            if keys_read == 0 {
-                self.num_values = 0;
-                return Ok(values_read);
-            }
-            for i in 0..keys_read {
-                let key = self.keys_buffer[i] as usize;
-                read_bytes(values[key].data(), 1);
-            }
-            values_read += keys_read;
-        }
-        self.num_values -= values_read;
-        Ok(values_read)
-    }
-}
-
-pub(crate) struct DeltaByteArrayValueDecoder {
-    decoder: DeltaByteArrayDecoder<ByteArrayType>,
-}
-
-impl DeltaByteArrayValueDecoder {
-    pub fn new(data: ByteBufferPtr, num_values: usize) -> Result<Self> {
-        let mut decoder = DeltaByteArrayDecoder::new();
-        decoder.set_data(data, num_values)?;
-        Ok(Self { decoder })
-    }
-}
-
-impl ValueDecoder for DeltaByteArrayValueDecoder {
-    fn read_value_bytes(
-        &mut self,
-        mut num_values: usize,
-        read_bytes: &mut dyn FnMut(&[u8], usize),
-    ) -> Result<usize> {
-        num_values = std::cmp::min(num_values, self.decoder.values_left());
-        let mut values_read = 0;
-        let mut buf = [ByteArray::new()];
-        while values_read < num_values {
-            let num_read = self.decoder.get(&mut buf)?;
-            debug_assert_eq!(num_read, 1);
-
-            read_bytes(buf[0].data(), 1);
-
-            values_read += 1;
-        }
-        Ok(values_read)
-    }
-}
-
-use arrow::datatypes::ArrowPrimitiveType;
-
-pub struct PrimitiveArrayConverter<T: ArrowPrimitiveType> {
-    _phantom_data: PhantomData<T>,
-}
-
-impl<T: ArrowPrimitiveType> PrimitiveArrayConverter<T> {
-    pub fn new() -> Self {
-        Self {
-            _phantom_data: PhantomData,
-        }
-    }
-}
-
-impl<T: ArrowPrimitiveType> ArrayConverter for PrimitiveArrayConverter<T> {
-    fn convert_value_bytes(
-        &self,
-        value_decoder: &mut impl ValueDecoder,
-        num_values: usize,
-    ) -> Result<arrow::array::ArrayData> {
-        let value_size = T::get_byte_width();
-        let values_byte_capacity = num_values * value_size;
-        let mut values_buffer = MutableBuffer::new(values_byte_capacity);
-
-        value_decoder.read_value_bytes(num_values, &mut |value_bytes, _| {
-            values_buffer.extend_from_slice(value_bytes);
-        })?;
-
-        // calculate actual data_len, which may be different from the 
iterator's upper bound
-        let value_count = values_buffer.len() / value_size;
-        let array_data = arrow::array::ArrayData::builder(T::DATA_TYPE)
-            .len(value_count)
-            .add_buffer(values_buffer.into());
-        let array_data = unsafe { array_data.build_unchecked() };
-        Ok(array_data)
-    }
-}
-
-pub struct StringArrayConverter {}
-
-impl StringArrayConverter {
-    pub fn new() -> Self {
-        Self {}
-    }
-}
-
-impl ArrayConverter for StringArrayConverter {
-    fn convert_value_bytes(
-        &self,
-        value_decoder: &mut impl ValueDecoder,
-        num_values: usize,
-    ) -> Result<arrow::array::ArrayData> {
-        use arrow::datatypes::ArrowNativeType;
-        let offset_size = std::mem::size_of::<i32>();
-        let mut offsets_buffer = MutableBuffer::new((num_values + 1) * 
offset_size);
-        // allocate initial capacity of 1 byte for each item
-        let values_byte_capacity = num_values;
-        let mut values_buffer = MutableBuffer::new(values_byte_capacity);
-
-        let mut length_so_far = i32::default();
-        offsets_buffer.push(length_so_far);
-
-        value_decoder.read_value_bytes(num_values, &mut |value_bytes, 
values_read| {
-            debug_assert_eq!(
-                values_read, 1,
-                "offset length value buffers can only contain bytes for a 
single value"
-            );
-            length_so_far +=
-                <i32 as 
ArrowNativeType>::from_usize(value_bytes.len()).unwrap();
-            // this should be safe because a ValueDecoder should not read more 
than num_values
-            unsafe {
-                offsets_buffer.push_unchecked(length_so_far);
-            }
-            values_buffer.extend_from_slice(value_bytes);
-        })?;
-        // calculate actual data_len, which may be different from the 
iterator's upper bound
-        let data_len = (offsets_buffer.len() / offset_size) - 1;
-        let array_data = arrow::array::ArrayData::builder(ArrowType::Utf8)
-            .len(data_len)
-            .add_buffer(offsets_buffer.into())
-            .add_buffer(values_buffer.into());
-        let array_data = unsafe { array_data.build_unchecked() };
-        Ok(array_data)
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::arrow::{ArrowReader, ParquetFileArrowReader};
-    use crate::basic::ConvertedType;
-    use crate::column::page::Page;
-    use crate::column::writer::ColumnWriter;
-    use crate::data_type::ByteArray;
-    use crate::data_type::ByteArrayType;
-    use crate::encodings::encoding::{DictEncoder, Encoder};
-    use crate::file::properties::WriterProperties;
-    use crate::file::reader::SerializedFileReader;
-    use crate::file::serialized_reader::SliceableCursor;
-    use crate::file::writer::{FileWriter, SerializedFileWriter, TryClone};
-    use crate::util::memory::MemTracker;
-    use crate::schema::parser::parse_message_type;
-    use crate::schema::types::SchemaDescriptor;
-    use crate::util::test_common::page_util::{
-        DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator,
-    };
-    use crate::{
-        basic::Encoding, column::page::PageReader, 
schema::types::SchemaDescPtr,
-    };
-    use arrow::array::{PrimitiveArray, StringArray};
-    use arrow::datatypes::Int32Type as ArrowInt32;
-    use rand::{distributions::uniform::SampleUniform, thread_rng, Rng};
-    use std::io::{Cursor, Seek, SeekFrom, Write};
-    use std::sync::{Arc, Mutex};
-
-    /// Iterator for testing reading empty columns
-    struct EmptyPageIterator {
-        schema: SchemaDescPtr,
-    }
-
-    impl EmptyPageIterator {
-        fn new(schema: SchemaDescPtr) -> Self {
-            EmptyPageIterator { schema }
-        }
-    }
-
-    impl Iterator for EmptyPageIterator {
-        type Item = Result<Box<dyn PageReader>>;
-
-        fn next(&mut self) -> Option<Self::Item> {
-            None
-        }
-    }
-
-    impl PageIterator for EmptyPageIterator {
-        fn schema(&mut self) -> Result<SchemaDescPtr> {
-            Ok(self.schema.clone())
-        }
-
-        fn column_schema(&mut self) -> Result<ColumnDescPtr> {
-            Ok(self.schema.column(0))
-        }
-    }
-
-    #[test]
-    fn test_array_reader_empty_pages() {
-        // Construct column schema
-        let message_type = "
-        message test_schema {
-          REQUIRED INT32 leaf;
-        }
-        ";
-
-        let schema = parse_message_type(message_type)
-            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
-            .unwrap();
-
-        let column_desc = schema.column(0);
-        let page_iterator = EmptyPageIterator::new(schema);
-
-        let converter = 
PrimitiveArrayConverter::<arrow::datatypes::Int32Type>::new();
-        let mut array_reader =
-            ArrowArrayReader::try_new(page_iterator, column_desc, converter, 
None)
-                .unwrap();
-
-        // expect no values to be read
-        let array = array_reader.next_batch(50).unwrap();
-        assert!(array.is_empty());
-    }
-
-    fn make_column_chunks<T: crate::data_type::DataType>(
-        column_desc: ColumnDescPtr,
-        encoding: Encoding,
-        num_levels: usize,
-        min_value: T::T,
-        max_value: T::T,
-        def_levels: &mut Vec<i16>,
-        rep_levels: &mut Vec<i16>,
-        values: &mut Vec<T::T>,
-        page_lists: &mut Vec<Vec<Page>>,
-        use_v2: bool,
-        num_chunks: usize,
-    ) where
-        T::T: PartialOrd + SampleUniform + Copy,
-    {
-        for _i in 0..num_chunks {
-            let mut pages = VecDeque::new();
-            let mut data = Vec::new();
-            let mut page_def_levels = Vec::new();
-            let mut page_rep_levels = Vec::new();
-
-            crate::util::test_common::make_pages::<T>(
-                column_desc.clone(),
-                encoding,
-                1,
-                num_levels,
-                min_value,
-                max_value,
-                &mut page_def_levels,
-                &mut page_rep_levels,
-                &mut data,
-                &mut pages,
-                use_v2,
-            );
-
-            def_levels.append(&mut page_def_levels);
-            rep_levels.append(&mut page_rep_levels);
-            values.append(&mut data);
-            page_lists.push(Vec::from(pages));
-        }
-    }
-
-    #[test]
-    fn test_primitive_array_reader_data() {
-        // Construct column schema
-        let message_type = "
-        message test_schema {
-          REQUIRED INT32 leaf;
-        }
-        ";
-
-        let schema = parse_message_type(message_type)
-            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
-            .unwrap();
-
-        let column_desc = schema.column(0);
-
-        // Construct page iterator
-        {
-            let mut data = Vec::new();
-            let mut page_lists = Vec::new();
-            make_column_chunks::<crate::data_type::Int32Type>(
-                column_desc.clone(),
-                Encoding::PLAIN,
-                100,
-                1,
-                200,
-                &mut Vec::new(),
-                &mut Vec::new(),
-                &mut data,
-                &mut page_lists,
-                true,
-                2,
-            );
-            let page_iterator =
-                InMemoryPageIterator::new(schema, column_desc.clone(), 
page_lists);
-
-            let converter = 
PrimitiveArrayConverter::<arrow::datatypes::Int32Type>::new();
-            let mut array_reader =
-                ArrowArrayReader::try_new(page_iterator, column_desc, 
converter, None)
-                    .unwrap();
-
-            // Read first 50 values, which are all from the first column chunk
-            let array = array_reader.next_batch(50).unwrap();
-            let array = array
-                .as_any()
-                .downcast_ref::<PrimitiveArray<ArrowInt32>>()
-                .unwrap();
-
-            assert_eq!(
-                &PrimitiveArray::<ArrowInt32>::from(data[0..50].to_vec()),
-                array
-            );
-
-            // Read next 100 values, the first 50 ones are from the first 
column chunk,
-            // and the last 50 ones are from the second column chunk
-            let array = array_reader.next_batch(100).unwrap();
-            let array = array
-                .as_any()
-                .downcast_ref::<PrimitiveArray<ArrowInt32>>()
-                .unwrap();
-
-            assert_eq!(
-                &PrimitiveArray::<ArrowInt32>::from(data[50..150].to_vec()),
-                array
-            );
-
-            // Try to read 100 values, however there are only 50 values
-            let array = array_reader.next_batch(100).unwrap();
-            let array = array
-                .as_any()
-                .downcast_ref::<PrimitiveArray<ArrowInt32>>()
-                .unwrap();
-
-            assert_eq!(
-                &PrimitiveArray::<ArrowInt32>::from(data[150..200].to_vec()),
-                array
-            );
-        }
-    }
-
-    #[test]
-    fn test_primitive_array_reader_def_and_rep_levels() {
-        // Construct column schema
-        let message_type = "
-        message test_schema {
-            REPEATED Group test_mid {
-                OPTIONAL INT32 leaf;
-            }
-        }
-        ";
-
-        let schema = parse_message_type(message_type)
-            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
-            .unwrap();
-
-        let column_desc = schema.column(0);
-
-        // Construct page iterator
-        {
-            let mut def_levels = Vec::new();
-            let mut rep_levels = Vec::new();
-            let mut page_lists = Vec::new();
-            make_column_chunks::<crate::data_type::Int32Type>(
-                column_desc.clone(),
-                Encoding::PLAIN,
-                100,
-                1,
-                200,
-                &mut def_levels,
-                &mut rep_levels,
-                &mut Vec::new(),
-                &mut page_lists,
-                true,
-                2,
-            );
-
-            let page_iterator =
-                InMemoryPageIterator::new(schema, column_desc.clone(), 
page_lists);
-
-            let converter = 
PrimitiveArrayConverter::<arrow::datatypes::Int32Type>::new();
-            let mut array_reader =
-                ArrowArrayReader::try_new(page_iterator, column_desc, 
converter, None)
-                    .unwrap();
-
-            let mut accu_len: usize = 0;
-
-            // Read first 50 values, which are all from the first column chunk
-            let array = array_reader.next_batch(50).unwrap();
-            assert_eq!(
-                Some(&def_levels[accu_len..(accu_len + array.len())]),
-                array_reader.get_def_levels()
-            );
-            assert_eq!(
-                Some(&rep_levels[accu_len..(accu_len + array.len())]),
-                array_reader.get_rep_levels()
-            );
-            accu_len += array.len();
-
-            // Read next 100 values, the first 50 ones are from the first 
column chunk,
-            // and the last 50 ones are from the second column chunk
-            let array = array_reader.next_batch(100).unwrap();
-            assert_eq!(
-                Some(&def_levels[accu_len..(accu_len + array.len())]),
-                array_reader.get_def_levels()
-            );
-            assert_eq!(
-                Some(&rep_levels[accu_len..(accu_len + array.len())]),
-                array_reader.get_rep_levels()
-            );
-            accu_len += array.len();
-
-            // Try to read 100 values, however there are only 50 values
-            let array = array_reader.next_batch(100).unwrap();
-            assert_eq!(
-                Some(&def_levels[accu_len..(accu_len + array.len())]),
-                array_reader.get_def_levels()
-            );
-            assert_eq!(
-                Some(&rep_levels[accu_len..(accu_len + array.len())]),
-                array_reader.get_rep_levels()
-            );
-
-            assert_eq!(accu_len + array.len(), 200);
-        }
-    }
-
-    #[test]
-    fn test_arrow_array_reader_string() {
-        // Construct column schema
-        let message_type = "
-        message test_schema {
-            REPEATED Group test_mid {
-                OPTIONAL BYTE_ARRAY leaf (UTF8);
-            }
-        }
-        ";
-        let num_pages = 2;
-        let values_per_page = 100;
-        let str_base = "Hello World";
-
-        let schema = parse_message_type(message_type)
-            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
-            .unwrap();
-        let column_desc = schema.column(0);
-        let max_def_level = column_desc.max_def_level();
-        let max_rep_level = column_desc.max_rep_level();
-
-        assert_eq!(max_def_level, 2);
-        assert_eq!(max_rep_level, 1);
-
-        let mut rng = thread_rng();
-        let mut pages: Vec<Vec<Page>> = Vec::new();
-
-        let mut rep_levels = Vec::with_capacity(num_pages * values_per_page);
-        let mut def_levels = Vec::with_capacity(num_pages * values_per_page);
-        let mut all_values = Vec::with_capacity(num_pages * values_per_page);
-
-        for i in 0..num_pages {
-            let mut values = Vec::with_capacity(values_per_page);
-
-            for _ in 0..values_per_page {
-                let def_level = rng.gen_range(0..max_def_level + 1);
-                let rep_level = rng.gen_range(0..max_rep_level + 1);
-                if def_level == max_def_level {
-                    let len = rng.gen_range(1..str_base.len());
-                    let slice = &str_base[..len];
-                    values.push(ByteArray::from(slice));
-                    all_values.push(Some(slice.to_string()));
-                } else {
-                    all_values.push(None)
-                }
-                rep_levels.push(rep_level);
-                def_levels.push(def_level)
-            }
-
-            let range = i * values_per_page..(i + 1) * values_per_page;
-            let mut pb =
-                DataPageBuilderImpl::new(column_desc.clone(), values.len() as 
u32, true);
-
-            pb.add_rep_levels(max_rep_level, 
&rep_levels.as_slice()[range.clone()]);
-            pb.add_def_levels(max_def_level, &def_levels.as_slice()[range]);
-            pb.add_values::<ByteArrayType>(Encoding::PLAIN, values.as_slice());
-
-            let data_page = pb.consume();
-            pages.push(vec![data_page]);
-        }
-
-        let page_iterator = InMemoryPageIterator::new(schema, 
column_desc.clone(), pages);
-        let converter = StringArrayConverter::new();
-        let mut array_reader =
-            ArrowArrayReader::try_new(page_iterator, column_desc, converter, 
None)
-                .unwrap();
-
-        let mut accu_len: usize = 0;
-
-        let array = array_reader.next_batch(values_per_page / 2).unwrap();
-        assert_eq!(array.len(), values_per_page / 2);
-        assert_eq!(
-            Some(&def_levels[accu_len..(accu_len + array.len())]),
-            array_reader.get_def_levels()
-        );
-        assert_eq!(
-            Some(&rep_levels[accu_len..(accu_len + array.len())]),
-            array_reader.get_rep_levels()
-        );
-        accu_len += array.len();
-
-        // Read next values_per_page values, the first values_per_page/2 ones 
are from the first column chunk,
-        // and the last values_per_page/2 ones are from the second column chunk
-        let array = array_reader.next_batch(values_per_page).unwrap();
-        assert_eq!(array.len(), values_per_page);
-        assert_eq!(
-            Some(&def_levels[accu_len..(accu_len + array.len())]),
-            array_reader.get_def_levels()
-        );
-        assert_eq!(
-            Some(&rep_levels[accu_len..(accu_len + array.len())]),
-            array_reader.get_rep_levels()
-        );
-        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
-        for i in 0..array.len() {
-            if array.is_valid(i) {
-                assert_eq!(
-                    all_values[i + accu_len].as_ref().unwrap().as_str(),
-                    strings.value(i)
-                )
-            } else {
-                assert_eq!(all_values[i + accu_len], None)
-            }
-        }
-        accu_len += array.len();
-
-        // Try to read values_per_page values, however there are only 
values_per_page/2 values
-        let array = array_reader.next_batch(values_per_page).unwrap();
-        assert_eq!(array.len(), values_per_page / 2);
-        assert_eq!(
-            Some(&def_levels[accu_len..(accu_len + array.len())]),
-            array_reader.get_def_levels()
-        );
-        assert_eq!(
-            Some(&rep_levels[accu_len..(accu_len + array.len())]),
-            array_reader.get_rep_levels()
-        );
-    }
-
-    #[test]
-    fn test_arrow_array_reader_dict_enc_string() {
-        // Construct column schema
-        let message_type = "
-        message test_schema {
-            REPEATED Group test_mid {
-                OPTIONAL BYTE_ARRAY leaf (UTF8);
-            }
-        }
-        ";
-        let num_pages = 2;
-        let values_per_page = 100;
-        let str_base = "Hello World";
-
-        let schema = parse_message_type(message_type)
-            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
-            .unwrap();
-        let column_desc = schema.column(0);
-        let max_def_level = column_desc.max_def_level();
-        let max_rep_level = column_desc.max_rep_level();
-
-        assert_eq!(max_def_level, 2);
-        assert_eq!(max_rep_level, 1);
-
-        let mut rng = thread_rng();
-        let mut pages: Vec<Vec<Page>> = Vec::new();
-
-        let mut rep_levels = Vec::with_capacity(num_pages * values_per_page);
-        let mut def_levels = Vec::with_capacity(num_pages * values_per_page);
-        let mut all_values = Vec::with_capacity(num_pages * values_per_page);
-
-        for i in 0..num_pages {
-            let mem_tracker = Arc::new(MemTracker::new());
-            let mut dict_encoder =
-                DictEncoder::<ByteArrayType>::new(column_desc.clone(), 
mem_tracker);
-            // add data page
-            let mut values = Vec::with_capacity(values_per_page);
-
-            for _ in 0..values_per_page {
-                let def_level = rng.gen_range(0..max_def_level + 1);
-                let rep_level = rng.gen_range(0..max_rep_level + 1);
-                if def_level == max_def_level {
-                    let len = rng.gen_range(1..str_base.len());
-                    let slice = &str_base[..len];
-                    values.push(ByteArray::from(slice));
-                    all_values.push(Some(slice.to_string()));
-                } else {
-                    all_values.push(None)
-                }
-                rep_levels.push(rep_level);
-                def_levels.push(def_level)
-            }
-
-            let range = i * values_per_page..(i + 1) * values_per_page;
-            let mut pb =
-                DataPageBuilderImpl::new(column_desc.clone(), values.len() as 
u32, true);
-            pb.add_rep_levels(max_rep_level, 
&rep_levels.as_slice()[range.clone()]);
-            pb.add_def_levels(max_def_level, &def_levels.as_slice()[range]);
-            let _ = dict_encoder.put(&values);
-            let indices = dict_encoder
-                .write_indices()
-                .expect("write_indices() should be OK");
-            pb.add_indices(indices);
-            let data_page = pb.consume();
-            // for each page log num_values vs actual values in page
-            // println!("page num_values: {}, values.len(): {}", 
data_page.num_values(), values.len());
-            // add dictionary page
-            let dict = dict_encoder
-                .write_dict()
-                .expect("write_dict() should be OK");
-            let dict_page = Page::DictionaryPage {
-                buf: dict,
-                num_values: dict_encoder.num_entries() as u32,
-                encoding: Encoding::RLE_DICTIONARY,
-                is_sorted: false,
-            };
-            pages.push(vec![dict_page, data_page]);
-        }
-
-        let page_iterator = InMemoryPageIterator::new(schema, 
column_desc.clone(), pages);
-        let converter = StringArrayConverter::new();
-        let mut array_reader =
-            ArrowArrayReader::try_new(page_iterator, column_desc, converter, 
None)
-                .unwrap();
-
-        let mut accu_len: usize = 0;
-
-        // println!("---------- reading a batch of {} values ----------", 
values_per_page / 2);
-        let array = array_reader.next_batch(values_per_page / 2).unwrap();
-        assert_eq!(array.len(), values_per_page / 2);
-        assert_eq!(
-            Some(&def_levels[accu_len..(accu_len + array.len())]),
-            array_reader.get_def_levels()
-        );
-        assert_eq!(
-            Some(&rep_levels[accu_len..(accu_len + array.len())]),
-            array_reader.get_rep_levels()
-        );
-        accu_len += array.len();
-
-        // Read next values_per_page values, the first values_per_page/2 ones 
are from the first column chunk,
-        // and the last values_per_page/2 ones are from the second column chunk
-        // println!("---------- reading a batch of {} values ----------", 
values_per_page);
-        let array = array_reader.next_batch(values_per_page).unwrap();
-        assert_eq!(array.len(), values_per_page);
-        assert_eq!(
-            Some(&def_levels[accu_len..(accu_len + array.len())]),
-            array_reader.get_def_levels()
-        );
-        assert_eq!(
-            Some(&rep_levels[accu_len..(accu_len + array.len())]),
-            array_reader.get_rep_levels()
-        );
-        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
-        for i in 0..array.len() {
-            if array.is_valid(i) {
-                assert_eq!(
-                    all_values[i + accu_len].as_ref().unwrap().as_str(),
-                    strings.value(i)
-                )
-            } else {
-                assert_eq!(all_values[i + accu_len], None)
-            }
-        }
-        accu_len += array.len();
-
-        // Try to read values_per_page values, however there are only 
values_per_page/2 values
-        // println!("---------- reading a batch of {} values ----------", 
values_per_page);
-        let array = array_reader.next_batch(values_per_page).unwrap();
-        assert_eq!(array.len(), values_per_page / 2);
-        assert_eq!(
-            Some(&def_levels[accu_len..(accu_len + array.len())]),
-            array_reader.get_def_levels()
-        );
-        assert_eq!(
-            Some(&rep_levels[accu_len..(accu_len + array.len())]),
-            array_reader.get_rep_levels()
-        );
-    }
-
-    /// Allows to write parquet into memory. Intended only for use in tests.
-    #[derive(Clone)]
-    struct VecWriter {
-        data: Arc<Mutex<Cursor<Vec<u8>>>>,
-    }
-
-    impl VecWriter {
-        pub fn new() -> VecWriter {
-            VecWriter {
-                data: Arc::new(Mutex::new(Cursor::new(Vec::new()))),
-            }
-        }
-
-        pub fn consume(self) -> Vec<u8> {
-            Arc::try_unwrap(self.data)
-                .unwrap()
-                .into_inner()
-                .unwrap()
-                .into_inner()
-        }
-    }
-
-    impl TryClone for VecWriter {
-        fn try_clone(&self) -> std::io::Result<Self> {
-            Ok(self.clone())
-        }
-    }
-
-    impl Seek for VecWriter {
-        fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
-            self.data.lock().unwrap().seek(pos)
-        }
-
-        fn stream_position(&mut self) -> std::io::Result<u64> {
-            self.data.lock().unwrap().stream_position()
-        }
-    }
-
-    impl Write for VecWriter {
-        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
-            self.data.lock().unwrap().write(buf)
-        }
-
-        fn flush(&mut self) -> std::io::Result<()> {
-            self.data.lock().unwrap().flush()
-        }
-    }
-
-    #[test]
-    fn test_string_delta_byte_array() {
-        use crate::basic;
-        use crate::schema::types::Type;
-
-        let data = VecWriter::new();
-        let schema = Arc::new(
-            Type::group_type_builder("string_test")
-                .with_fields(&mut vec![Arc::new(
-                    Type::primitive_type_builder("c", basic::Type::BYTE_ARRAY)
-                        .with_converted_type(ConvertedType::UTF8)
-                        .build()
-                        .unwrap(),
-                )])
-                .build()
-                .unwrap(),
-        );
-        // Disable dictionary and use the fallback encoding.
-        let p = Arc::new(
-            WriterProperties::builder()
-                .set_dictionary_enabled(false)
-                .set_encoding(Encoding::DELTA_BYTE_ARRAY)
-                .build(),
-        );
-        // Write a few strings.
-        let mut w = SerializedFileWriter::new(data.clone(), schema, 
p).unwrap();
-        let mut rg = w.next_row_group().unwrap();
-        let mut c = rg.next_column().unwrap().unwrap();
-        match &mut c {
-            ColumnWriter::ByteArrayColumnWriter(c) => {
-                c.write_batch(
-                    &[ByteArray::from("foo"), ByteArray::from("bar")],
-                    Some(&[0, 1, 0, 0, 1, 0]),
-                    Some(&[0, 0, 0, 0, 0, 0]),
-                )
-                .unwrap();
-            }
-            _ => panic!("unexpected column"),
-        };
-        rg.close_column(c).unwrap();
-        w.close_row_group(rg).unwrap();
-        w.close().unwrap();
-        std::mem::drop(w);
-
-        // Check we can read them back.
-        let r = 
SerializedFileReader::new(SliceableCursor::new(Arc::new(data.consume())))
-            .unwrap();
-        let mut r = ParquetFileArrowReader::new(Arc::new(r));
-
-        let batch = r
-            .get_record_reader_by_columns([0], 1024)
-            .unwrap()
-            .next()
-            .unwrap()
-            .unwrap();
-        assert_eq!(batch.columns().len(), 1);
-
-        let strings = batch
-            .column(0)
-            .as_any()
-            .downcast_ref::<StringArray>()
-            .unwrap();
-        assert_eq!(
-            strings.into_iter().collect::<Vec<_>>(),
-            vec![None, Some("foo"), None, None, Some("bar"), None]
-        );
-    }
-}
diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs
index fbbf655..a2f885a 100644
--- a/parquet/src/arrow/mod.rs
+++ b/parquet/src/arrow/mod.rs
@@ -119,7 +119,6 @@
 //! ```
 
 experimental_mod!(array_reader);
-experimental_mod!(arrow_array_reader);
 pub mod arrow_reader;
 pub mod arrow_writer;
 mod bit_util;

Reply via email to