This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new d801ac2  Preserve dictionary encoding when decoding parquet into Arrow 
arrays, 60x perf improvement (#171) (#1180)
d801ac2 is described below

commit d801ac20311f3dfa905702aee752c8ca91be5297
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Mon Jan 24 12:00:44 2022 +0000

    Preserve dictionary encoding when decoding parquet into Arrow arrays, 60x 
perf improvement (#171) (#1180)
    
    * Preserve dictionary encoding from parquet (#171)
    
    * Use OffsetBuffer::into_array for dictionary
    
    * Fix and test handling of empty dictionaries
    
    Don't panic if missing dictionary page
    
    * Use ArrayRef instead of Arc<ArrayData>
    
    * Update doc comments
    
    * Add integration test
    
    Tweak RecordReader buffering logic
    
    * Add benchmark
    
    * Set write batch size in parquet fuzz tests
    
    Fix bug in column writer with small page sizes
    
    * Fix test_dictionary_preservation
    
    * Add batch_size comment
---
 parquet/benches/arrow_array_reader.rs              | 135 ++++-
 parquet/src/arrow/array_reader.rs                  |  68 +--
 parquet/src/arrow/array_reader/byte_array.rs       | 106 ++--
 .../arrow/array_reader/byte_array_dictionary.rs    | 552 +++++++++++++++++++++
 .../src/arrow/array_reader/dictionary_buffer.rs    | 383 ++++++++++++++
 parquet/src/arrow/array_reader/offset_buffer.rs    |  63 ++-
 parquet/src/arrow/array_reader/test_util.rs        |  89 ++++
 parquet/src/arrow/arrow_reader.rs                  | 243 +++++++--
 parquet/src/arrow/bit_util.rs                      |  95 ++++
 parquet/src/arrow/mod.rs                           |   1 +
 parquet/src/arrow/record_reader.rs                 |  27 +-
 parquet/src/arrow/record_reader/buffer.rs          |  24 +-
 .../src/arrow/record_reader/definition_levels.rs   |  79 +--
 parquet/src/column/writer.rs                       |   8 +
 14 files changed, 1615 insertions(+), 258 deletions(-)

diff --git a/parquet/benches/arrow_array_reader.rs 
b/parquet/benches/arrow_array_reader.rs
index 54f99c4..9db5b4c 100644
--- a/parquet/benches/arrow_array_reader.rs
+++ b/parquet/benches/arrow_array_reader.rs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use arrow::array::Array;
+use arrow::datatypes::DataType;
 use criterion::{criterion_group, criterion_main, Criterion};
 use parquet::util::{DataPageBuilder, DataPageBuilderImpl, 
InMemoryPageIterator};
 use parquet::{
@@ -24,6 +26,7 @@ use parquet::{
     data_type::{ByteArrayType, Int32Type},
     schema::types::{ColumnDescPtr, SchemaDescPtr},
 };
+use rand::{rngs::StdRng, Rng, SeedableRng};
 use std::{collections::VecDeque, sync::Arc};
 
 fn build_test_schema() -> SchemaDescPtr {
@@ -47,9 +50,6 @@ const PAGES_PER_GROUP: usize = 2;
 const VALUES_PER_PAGE: usize = 10_000;
 const BATCH_SIZE: usize = 8192;
 
-use arrow::array::Array;
-use rand::{rngs::StdRng, Rng, SeedableRng};
-
 pub fn seedable_rng() -> StdRng {
     StdRng::seed_from_u64(42)
 }
@@ -333,6 +333,46 @@ fn create_string_byte_array_reader(
     make_byte_array_reader(Box::new(page_iterator), column_desc, None, 
true).unwrap()
 }
 
+fn create_string_byte_array_dictionary_reader(
+    page_iterator: impl PageIterator + 'static,
+    column_desc: ColumnDescPtr,
+) -> Box<dyn ArrayReader> {
+    use parquet::arrow::array_reader::make_byte_array_dictionary_reader;
+    let arrow_type =
+        DataType::Dictionary(Box::new(DataType::Int32), 
Box::new(DataType::Utf8));
+
+    make_byte_array_dictionary_reader(
+        Box::new(page_iterator),
+        column_desc,
+        Some(arrow_type),
+        true,
+    )
+    .unwrap()
+}
+
+fn create_complex_object_byte_array_dictionary_reader(
+    page_iterator: impl PageIterator + 'static,
+    column_desc: ColumnDescPtr,
+) -> Box<dyn ArrayReader> {
+    use parquet::arrow::array_reader::{
+        make_byte_array_dictionary_reader, ComplexObjectArrayReader,
+    };
+    use parquet::arrow::converter::{Utf8ArrayConverter, Utf8Converter};
+    let arrow_type =
+        DataType::Dictionary(Box::new(DataType::Int32), 
Box::new(DataType::Utf8));
+
+    let converter = Utf8Converter::new(Utf8ArrayConverter {});
+    Box::new(
+        ComplexObjectArrayReader::<ByteArrayType, Utf8Converter>::new(
+            Box::new(page_iterator),
+            column_desc,
+            converter,
+            Some(arrow_type),
+        )
+        .unwrap(),
+    )
+}
+
 fn add_benches(c: &mut Criterion) {
     const EXPECTED_VALUE_COUNT: usize =
         NUM_ROW_GROUPS * PAGES_PER_GROUP * VALUES_PER_PAGE;
@@ -344,10 +384,7 @@ fn add_benches(c: &mut Criterion) {
     let mandatory_int32_column_desc = schema.column(0);
     let optional_int32_column_desc = schema.column(1);
     let mandatory_string_column_desc = schema.column(2);
-    // println!("mandatory_string_column_desc: {:?}", 
mandatory_string_column_desc);
     let optional_string_column_desc = schema.column(3);
-    // println!("optional_string_column_desc: {:?}", 
optional_string_column_desc);
-
     // primitive / int32 benchmarks
     // =============================
 
@@ -726,7 +763,7 @@ fn add_benches(c: &mut Criterion) {
 
     // string, dictionary encoded, half NULLs
     let dictionary_string_half_null_data = 
build_dictionary_encoded_string_page_iterator(
-        schema,
+        schema.clone(),
         optional_string_column_desc.clone(),
         0.5,
     );
@@ -758,6 +795,90 @@ fn add_benches(c: &mut Criterion) {
         },
     );
 
+    group.bench_function(
+        "read StringDictionary, dictionary encoded, mandatory, no NULLs - old",
+        |b| {
+            b.iter(|| {
+                let array_reader = 
create_complex_object_byte_array_dictionary_reader(
+                    dictionary_string_no_null_data.clone(),
+                    mandatory_string_column_desc.clone(),
+                );
+                count = bench_array_reader(array_reader);
+            });
+            assert_eq!(count, EXPECTED_VALUE_COUNT);
+        },
+    );
+
+    group.bench_function(
+        "read StringDictionary, dictionary encoded, mandatory, no NULLs - new",
+        |b| {
+            b.iter(|| {
+                let array_reader = create_string_byte_array_dictionary_reader(
+                    dictionary_string_no_null_data.clone(),
+                    mandatory_string_column_desc.clone(),
+                );
+                count = bench_array_reader(array_reader);
+            });
+            assert_eq!(count, EXPECTED_VALUE_COUNT);
+        },
+    );
+
+    group.bench_function(
+        "read StringDictionary, dictionary encoded, optional, no NULLs - old",
+        |b| {
+            b.iter(|| {
+                let array_reader = 
create_complex_object_byte_array_dictionary_reader(
+                    dictionary_string_no_null_data.clone(),
+                    optional_string_column_desc.clone(),
+                );
+                count = bench_array_reader(array_reader);
+            });
+            assert_eq!(count, EXPECTED_VALUE_COUNT);
+        },
+    );
+
+    group.bench_function(
+        "read StringDictionary, dictionary encoded, optional, no NULLs - new",
+        |b| {
+            b.iter(|| {
+                let array_reader = create_string_byte_array_dictionary_reader(
+                    dictionary_string_no_null_data.clone(),
+                    optional_string_column_desc.clone(),
+                );
+                count = bench_array_reader(array_reader);
+            });
+            assert_eq!(count, EXPECTED_VALUE_COUNT);
+        },
+    );
+
+    group.bench_function(
+        "read StringDictionary, dictionary encoded, optional, half NULLs - 
old",
+        |b| {
+            b.iter(|| {
+                let array_reader = 
create_complex_object_byte_array_dictionary_reader(
+                    dictionary_string_half_null_data.clone(),
+                    optional_string_column_desc.clone(),
+                );
+                count = bench_array_reader(array_reader);
+            });
+            assert_eq!(count, EXPECTED_VALUE_COUNT);
+        },
+    );
+
+    group.bench_function(
+        "read StringDictionary, dictionary encoded, optional, half NULLs - 
new",
+        |b| {
+            b.iter(|| {
+                let array_reader = create_string_byte_array_dictionary_reader(
+                    dictionary_string_half_null_data.clone(),
+                    optional_string_column_desc.clone(),
+                );
+                count = bench_array_reader(array_reader);
+            });
+            assert_eq!(count, EXPECTED_VALUE_COUNT);
+        },
+    );
+
     group.finish();
 }
 
diff --git a/parquet/src/arrow/array_reader.rs 
b/parquet/src/arrow/array_reader.rs
index c11bfc2..01e54f6 100644
--- a/parquet/src/arrow/array_reader.rs
+++ b/parquet/src/arrow/array_reader.rs
@@ -56,11 +56,10 @@ use arrow::datatypes::{
 use arrow::util::bit_util;
 
 use crate::arrow::converter::{
-    BinaryArrayConverter, BinaryConverter, Converter, DecimalArrayConverter,
-    DecimalConverter, FixedLenBinaryConverter, FixedSizeArrayConverter,
-    Int96ArrayConverter, Int96Converter, IntervalDayTimeArrayConverter,
-    IntervalDayTimeConverter, IntervalYearMonthArrayConverter,
-    IntervalYearMonthConverter, Utf8ArrayConverter, Utf8Converter,
+    Converter, DecimalArrayConverter, DecimalConverter, 
FixedLenBinaryConverter,
+    FixedSizeArrayConverter, Int96ArrayConverter, Int96Converter,
+    IntervalDayTimeArrayConverter, IntervalDayTimeConverter,
+    IntervalYearMonthArrayConverter, IntervalYearMonthConverter,
 };
 use crate::arrow::record_reader::buffer::{ScalarValue, ValuesBuffer};
 use crate::arrow::record_reader::{GenericRecordReader, RecordReader};
@@ -70,8 +69,8 @@ use crate::column::page::PageIterator;
 use crate::column::reader::decoder::ColumnValueDecoder;
 use crate::column::reader::ColumnReaderImpl;
 use crate::data_type::{
-    BoolType, ByteArrayType, DataType, DoubleType, FixedLenByteArrayType, 
FloatType,
-    Int32Type, Int64Type, Int96Type,
+    BoolType, DataType, DoubleType, FixedLenByteArrayType, FloatType, 
Int32Type,
+    Int64Type, Int96Type,
 };
 use crate::errors::{ParquetError, ParquetError::ArrowError, Result};
 use crate::file::reader::{FilePageIterator, FileReader};
@@ -81,9 +80,15 @@ use crate::schema::types::{
 use crate::schema::visitor::TypeVisitor;
 
 mod byte_array;
+mod byte_array_dictionary;
+mod dictionary_buffer;
 mod offset_buffer;
 
+#[cfg(test)]
+mod test_util;
+
 pub use byte_array::make_byte_array_reader;
+pub use byte_array_dictionary::make_byte_array_dictionary_reader;
 
 /// Array reader reads parquet data into arrow array.
 pub trait ArrayReader {
@@ -271,7 +276,8 @@ where
                 .clone(),
         };
 
-        let record_reader = 
RecordReader::<T>::new_with_options(column_desc.clone(), null_mask_only);
+        let record_reader =
+            RecordReader::<T>::new_with_options(column_desc.clone(), 
null_mask_only);
 
         Ok(Self {
             data_type,
@@ -829,17 +835,18 @@ fn remove_indices(
             size
         ),
         ArrowType::Struct(fields) => {
-            let struct_array = arr.as_any()
+            let struct_array = arr
+                .as_any()
                 .downcast_ref::<StructArray>()
                 .expect("Array should be a struct");
 
             // Recursively call remove indices on each of the structs fields
-            let new_columns = fields.into_iter()
+            let new_columns = fields
+                .into_iter()
                 .zip(struct_array.columns())
                 .map(|(field, column)| {
                     let dt = field.data_type().clone();
-                     Ok((field,
-                         remove_indices(column.clone(), dt, indices.clone())?))
+                    Ok((field, remove_indices(column.clone(), dt, 
indices.clone())?))
                 })
                 .collect::<Result<Vec<_>>>()?;
 
@@ -1783,35 +1790,12 @@ impl<'a> ArrayReaderBuilder {
                 )?,
             )),
             PhysicalType::BYTE_ARRAY => match arrow_type {
-                // TODO: Replace with optimised dictionary reader (#171)
-                Some(ArrowType::Dictionary(_, _)) => {
-                    match cur_type.get_basic_info().converted_type() {
-                        ConvertedType::UTF8 => {
-                            let converter = 
Utf8Converter::new(Utf8ArrayConverter {});
-                            Ok(Box::new(ComplexObjectArrayReader::<
-                                ByteArrayType,
-                                Utf8Converter,
-                            >::new(
-                                page_iterator,
-                                column_desc,
-                                converter,
-                                arrow_type,
-                            )?))
-                        }
-                        _ => {
-                            let converter = 
BinaryConverter::new(BinaryArrayConverter {});
-                            Ok(Box::new(ComplexObjectArrayReader::<
-                                ByteArrayType,
-                                BinaryConverter,
-                            >::new(
-                                page_iterator,
-                                column_desc,
-                                converter,
-                                arrow_type,
-                            )?))
-                        }
-                    }
-                }
+                Some(ArrowType::Dictionary(_, _)) => 
make_byte_array_dictionary_reader(
+                    page_iterator,
+                    column_desc,
+                    arrow_type,
+                    null_mask_only,
+                ),
                 _ => make_byte_array_reader(
                     page_iterator,
                     column_desc,
@@ -2025,7 +2009,7 @@ mod tests {
     use crate::arrow::schema::parquet_to_arrow_schema;
     use crate::basic::{Encoding, Type as PhysicalType};
     use crate::column::page::{Page, PageReader};
-    use crate::data_type::{ByteArray, DataType, Int32Type, Int64Type};
+    use crate::data_type::{ByteArray, ByteArrayType, DataType, Int32Type, 
Int64Type};
     use crate::errors::Result;
     use crate::file::reader::{FileReader, SerializedFileReader};
     use crate::schema::parser::parse_message_type;
diff --git a/parquet/src/arrow/array_reader/byte_array.rs 
b/parquet/src/arrow/array_reader/byte_array.rs
index fc214dd..b3606a7 100644
--- a/parquet/src/arrow/array_reader/byte_array.rs
+++ b/parquet/src/arrow/array_reader/byte_array.rs
@@ -203,11 +203,12 @@ impl<I: OffsetSizeTrait + ScalarValue> ColumnValueDecoder
     }
 
     fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) -> 
Result<usize> {
-        self.decoder.as_mut().expect("decoder set").read(
-            out,
-            range.end - range.start,
-            self.dict.as_ref(),
-        )
+        let decoder = self
+            .decoder
+            .as_mut()
+            .ok_or_else(|| general_err!("no decoder set"))?;
+
+        decoder.read(out, range.end - range.start, self.dict.as_ref())
     }
 }
 
@@ -266,7 +267,9 @@ impl ByteArrayDecoder {
         match self {
             ByteArrayDecoder::Plain(d) => d.read(out, len),
             ByteArrayDecoder::Dictionary(d) => {
-                let dict = dict.expect("dictionary set");
+                let dict = dict
+                    .ok_or_else(|| general_err!("missing dictionary page for 
column"))?;
+
                 d.read(out, dict, len)
             }
             ByteArrayDecoder::DeltaLength(d) => d.read(out, len),
@@ -546,6 +549,10 @@ impl ByteArrayDecoderDictionary {
         dict: &OffsetBuffer<I>,
         len: usize,
     ) -> Result<usize> {
+        if dict.is_empty() {
+            return Ok(0); // All data must be NULL
+        }
+
         let mut values_read = 0;
 
         while values_read != len && self.max_remaining_values != 0 {
@@ -579,69 +586,16 @@ impl ByteArrayDecoderDictionary {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::arrow::array_reader::test_util::{byte_array_all_encodings, 
utf8_column};
     use crate::arrow::record_reader::buffer::ValuesBuffer;
-    use crate::basic::Type as PhysicalType;
-    use crate::data_type::{ByteArray, ByteArrayType};
-    use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder};
-    use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};
-    use crate::util::memory::MemTracker;
     use arrow::array::{Array, StringArray};
-    use std::sync::Arc;
-
-    fn column() -> ColumnDescPtr {
-        let t = Type::primitive_type_builder("col", PhysicalType::BYTE_ARRAY)
-            .with_converted_type(ConvertedType::UTF8)
-            .build()
-            .unwrap();
-
-        Arc::new(ColumnDescriptor::new(
-            Arc::new(t),
-            1,
-            0,
-            ColumnPath::new(vec![]),
-        ))
-    }
-
-    fn get_encoded(encoding: Encoding, data: &[ByteArray]) -> ByteBufferPtr {
-        let descriptor = column();
-        let mem_tracker = Arc::new(MemTracker::new());
-        let mut encoder =
-            get_encoder::<ByteArrayType>(descriptor, encoding, 
mem_tracker).unwrap();
-
-        encoder.put(data).unwrap();
-        encoder.flush_buffer().unwrap()
-    }
 
     #[test]
     fn test_byte_array_decoder() {
-        let data: Vec<_> = vec!["hello", "world", "a", "b"]
-            .into_iter()
-            .map(ByteArray::from)
-            .collect();
-
-        let mut dict_encoder =
-            DictEncoder::<ByteArrayType>::new(column(), 
Arc::new(MemTracker::new()));
-
-        dict_encoder.put(&data).unwrap();
-        let encoded_rle = dict_encoder.flush_buffer().unwrap();
-        let encoded_dictionary = dict_encoder.write_dict().unwrap();
-
-        // A column chunk with all the encodings!
-        let pages = vec![
-            (Encoding::PLAIN, get_encoded(Encoding::PLAIN, &data)),
-            (
-                Encoding::DELTA_BYTE_ARRAY,
-                get_encoded(Encoding::DELTA_BYTE_ARRAY, &data),
-            ),
-            (
-                Encoding::DELTA_LENGTH_BYTE_ARRAY,
-                get_encoded(Encoding::DELTA_LENGTH_BYTE_ARRAY, &data),
-            ),
-            (Encoding::PLAIN_DICTIONARY, encoded_rle.clone()),
-            (Encoding::RLE_DICTIONARY, encoded_rle),
-        ];
+        let (pages, encoded_dictionary) =
+            byte_array_all_encodings(vec!["hello", "world", "a", "b"]);
 
-        let column_desc = column();
+        let column_desc = utf8_column();
         let mut decoder = ByteArrayColumnValueDecoder::new(&column_desc);
 
         decoder
@@ -668,15 +622,9 @@ mod tests {
             assert_eq!(decoder.read(&mut output, 4..8).unwrap(), 0);
 
             let valid = vec![false, false, true, true, false, true, true, 
false, false];
-            let rev_position_iter = valid
-                .iter()
-                .enumerate()
-                .rev()
-                .filter_map(|(i, valid)| valid.then(|| i));
-
             let valid_buffer = Buffer::from_iter(valid.iter().cloned());
 
-            output.pad_nulls(0, 4, valid.len(), rev_position_iter);
+            output.pad_nulls(0, 4, valid.len(), valid_buffer.as_slice());
             let array = output.into_array(Some(valid_buffer), ArrowType::Utf8);
             let strings = 
array.as_any().downcast_ref::<StringArray>().unwrap();
 
@@ -696,4 +644,22 @@ mod tests {
             );
         }
     }
+
+    #[test]
+    fn test_byte_array_decoder_nulls() {
+        let (pages, encoded_dictionary) = 
byte_array_all_encodings(Vec::<&str>::new());
+
+        let column_desc = utf8_column();
+        let mut decoder = ByteArrayColumnValueDecoder::new(&column_desc);
+
+        decoder
+            .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false)
+            .unwrap();
+
+        for (encoding, page) in pages {
+            let mut output = OffsetBuffer::<i32>::default();
+            decoder.set_data(encoding, page, 4, None).unwrap();
+            assert_eq!(decoder.read(&mut output, 0..1024).unwrap(), 0);
+        }
+    }
 }
diff --git a/parquet/src/arrow/array_reader/byte_array_dictionary.rs 
b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
new file mode 100644
index 0000000..eddb3a6
--- /dev/null
+++ b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
@@ -0,0 +1,552 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::marker::PhantomData;
+use std::ops::Range;
+use std::sync::Arc;
+
+use arrow::array::{Array, ArrayRef, OffsetSizeTrait};
+use arrow::buffer::Buffer;
+use arrow::datatypes::{ArrowNativeType, DataType as ArrowType};
+
+use crate::arrow::array_reader::dictionary_buffer::DictionaryBuffer;
+use crate::arrow::array_reader::{
+    byte_array::{ByteArrayDecoder, ByteArrayDecoderPlain},
+    offset_buffer::OffsetBuffer,
+};
+use crate::arrow::array_reader::{read_records, ArrayReader};
+use crate::arrow::record_reader::buffer::{BufferQueue, ScalarValue};
+use crate::arrow::record_reader::GenericRecordReader;
+use crate::arrow::schema::parquet_to_arrow_field;
+use crate::basic::{ConvertedType, Encoding};
+use crate::column::page::PageIterator;
+use crate::column::reader::decoder::ColumnValueDecoder;
+use crate::encodings::rle::RleDecoder;
+use crate::errors::{ParquetError, Result};
+use crate::schema::types::ColumnDescPtr;
+use crate::util::bit_util::FromBytes;
+use crate::util::memory::ByteBufferPtr;
+
+/// A macro to reduce verbosity of [`make_byte_array_dictionary_reader`]
+macro_rules! make_reader {
+    (
+        ($pages:expr, $column_desc:expr, $data_type:expr, 
$null_mask_only:expr) => match ($k:expr, $v:expr) {
+            $(($key_arrow:pat, $value_arrow:pat) => ($key_type:ty, 
$value_type:ty),)+
+        }
+    ) => {
+        match (($k, $v)) {
+            $(
+                ($key_arrow, $value_arrow) => {
+                    let reader = GenericRecordReader::new_with_options(
+                        $column_desc,
+                        $null_mask_only,
+                    );
+                    Ok(Box::new(ByteArrayDictionaryReader::<$key_type, 
$value_type>::new(
+                        $pages, $data_type, reader,
+                    )))
+                }
+            )+
+            _ => Err(general_err!(
+                "unsupported data type for byte array dictionary reader - {}",
+                $data_type
+            )),
+        }
+    }
+}
+
+/// Returns an [`ArrayReader`] that decodes the provided byte array column
+///
+/// This will attempt to preserve any dictionary encoding present in the 
parquet data
+///
+/// It will be unable to preserve the dictionary encoding if:
+///
+/// * A single read spans across multiple column chunks
+/// * A column chunk contains non-dictionary encoded pages
+///
+/// It is therefore recommended that if `pages` contains data from multiple 
column chunks,
+/// that the read batch size used is a divisor of the row group size
+///
+pub fn make_byte_array_dictionary_reader(
+    pages: Box<dyn PageIterator>,
+    column_desc: ColumnDescPtr,
+    arrow_type: Option<ArrowType>,
+    null_mask_only: bool,
+) -> Result<Box<dyn ArrayReader>> {
+    // Check if Arrow type is specified, else create it from Parquet type
+    let data_type = match arrow_type {
+        Some(t) => t,
+        None => parquet_to_arrow_field(column_desc.as_ref())?
+            .data_type()
+            .clone(),
+    };
+
+    match &data_type {
+        ArrowType::Dictionary(key_type, value_type) => {
+            make_reader! {
+                (pages, column_desc, data_type, null_mask_only) => match 
(key_type.as_ref(), value_type.as_ref()) {
+                    (ArrowType::UInt8, ArrowType::Binary | ArrowType::Utf8) => 
(u8, i32),
+                    (ArrowType::UInt8, ArrowType::LargeBinary | 
ArrowType::LargeUtf8) => (u8, i64),
+                    (ArrowType::Int8, ArrowType::Binary | ArrowType::Utf8) => 
(i8, i32),
+                    (ArrowType::Int8, ArrowType::LargeBinary | 
ArrowType::LargeUtf8) => (i8, i64),
+                    (ArrowType::UInt16, ArrowType::Binary | ArrowType::Utf8) 
=> (u16, i32),
+                    (ArrowType::UInt16, ArrowType::LargeBinary | 
ArrowType::LargeUtf8) => (u16, i64),
+                    (ArrowType::Int16, ArrowType::Binary | ArrowType::Utf8) => 
(i16, i32),
+                    (ArrowType::Int16, ArrowType::LargeBinary | 
ArrowType::LargeUtf8) => (i16, i64),
+                    (ArrowType::UInt32, ArrowType::Binary | ArrowType::Utf8) 
=> (u32, i32),
+                    (ArrowType::UInt32, ArrowType::LargeBinary | 
ArrowType::LargeUtf8) => (u32, i64),
+                    (ArrowType::Int32, ArrowType::Binary | ArrowType::Utf8) => 
(i32, i32),
+                    (ArrowType::Int32, ArrowType::LargeBinary | 
ArrowType::LargeUtf8) => (i32, i64),
+                    (ArrowType::UInt64, ArrowType::Binary | ArrowType::Utf8) 
=> (u64, i32),
+                    (ArrowType::UInt64, ArrowType::LargeBinary | 
ArrowType::LargeUtf8) => (u64, i64),
+                    (ArrowType::Int64, ArrowType::Binary | ArrowType::Utf8) => 
(i64, i32),
+                    (ArrowType::Int64, ArrowType::LargeBinary | 
ArrowType::LargeUtf8) => (i64, i64),
+                }
+            }
+        }
+        _ => Err(general_err!(
+            "invalid non-dictionary data type for byte array dictionary reader 
- {}",
+            data_type
+        )),
+    }
+}
+
+/// An [`ArrayReader`] for dictionary encoded variable length byte arrays
+///
+/// Will attempt to preserve any dictionary encoding present in the parquet 
data
+struct ByteArrayDictionaryReader<K: ScalarValue, V: ScalarValue> {
+    data_type: ArrowType,
+    pages: Box<dyn PageIterator>,
+    def_levels_buffer: Option<Buffer>,
+    rep_levels_buffer: Option<Buffer>,
+    record_reader: GenericRecordReader<DictionaryBuffer<K, V>, 
DictionaryDecoder<K, V>>,
+}
+
+impl<K, V> ByteArrayDictionaryReader<K, V>
+where
+    K: FromBytes + ScalarValue + Ord + ArrowNativeType,
+    V: ScalarValue + OffsetSizeTrait,
+{
+    fn new(
+        pages: Box<dyn PageIterator>,
+        data_type: ArrowType,
+        record_reader: GenericRecordReader<
+            DictionaryBuffer<K, V>,
+            DictionaryDecoder<K, V>,
+        >,
+    ) -> Self {
+        Self {
+            data_type,
+            pages,
+            def_levels_buffer: None,
+            rep_levels_buffer: None,
+            record_reader,
+        }
+    }
+}
+
+impl<K, V> ArrayReader for ByteArrayDictionaryReader<K, V>
+where
+    K: FromBytes + ScalarValue + Ord + ArrowNativeType,
+    V: ScalarValue + OffsetSizeTrait,
+{
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
+        read_records(&mut self.record_reader, self.pages.as_mut(), 
batch_size)?;
+        let buffer = self.record_reader.consume_record_data()?;
+        let null_buffer = self.record_reader.consume_bitmap_buffer()?;
+        let array = buffer.into_array(null_buffer, &self.data_type)?;
+
+        self.def_levels_buffer = self.record_reader.consume_def_levels()?;
+        self.rep_levels_buffer = self.record_reader.consume_rep_levels()?;
+        self.record_reader.reset();
+
+        Ok(array)
+    }
+
+    fn get_def_levels(&self) -> Option<&[i16]> {
+        self.def_levels_buffer
+            .as_ref()
+            .map(|buf| unsafe { buf.typed_data() })
+    }
+
+    fn get_rep_levels(&self) -> Option<&[i16]> {
+        self.rep_levels_buffer
+            .as_ref()
+            .map(|buf| unsafe { buf.typed_data() })
+    }
+}
+
+/// If the data is dictionary encoded decode the key data directly, so that 
the dictionary
+/// encoding can be preserved. Otherwise fallback to decoding using 
[`ByteArrayDecoder`]
+/// and compute a fresh dictionary in [`ByteArrayDictionaryReader::next_batch`]
+enum MaybeDictionaryDecoder {
+    Dict {
+        decoder: RleDecoder,
+        /// This is a maximum as the null count is not always known, e.g. 
value data from
+        /// a v1 data page
+        max_remaining_values: usize,
+    },
+    Fallback(ByteArrayDecoder),
+}
+
+/// A [`ColumnValueDecoder`] for dictionary encoded variable length byte arrays
+struct DictionaryDecoder<K, V> {
+    /// The current dictionary
+    dict: Option<ArrayRef>,
+
+    /// Dictionary decoder
+    decoder: Option<MaybeDictionaryDecoder>,
+
+    validate_utf8: bool,
+
+    value_type: ArrowType,
+
+    phantom: PhantomData<(K, V)>,
+}
+
+impl<K, V> ColumnValueDecoder for DictionaryDecoder<K, V>
+where
+    K: FromBytes + ScalarValue + Ord + ArrowNativeType,
+    V: ScalarValue + OffsetSizeTrait,
+{
+    type Slice = DictionaryBuffer<K, V>;
+
+    fn new(col: &ColumnDescPtr) -> Self {
+        let validate_utf8 = col.converted_type() == ConvertedType::UTF8;
+
+        let value_type =
+            match (V::is_large(), col.converted_type() == ConvertedType::UTF8) 
{
+                (true, true) => ArrowType::LargeUtf8,
+                (true, false) => ArrowType::LargeBinary,
+                (false, true) => ArrowType::Utf8,
+                (false, false) => ArrowType::Binary,
+            };
+
+        Self {
+            dict: None,
+            decoder: None,
+            validate_utf8,
+            value_type,
+            phantom: Default::default(),
+        }
+    }
+
+    fn set_dict(
+        &mut self,
+        buf: ByteBufferPtr,
+        num_values: u32,
+        encoding: Encoding,
+        _is_sorted: bool,
+    ) -> Result<()> {
+        if !matches!(
+            encoding,
+            Encoding::PLAIN | Encoding::RLE_DICTIONARY | 
Encoding::PLAIN_DICTIONARY
+        ) {
+            return Err(nyi_err!(
+                "Invalid/Unsupported encoding type for dictionary: {}",
+                encoding
+            ));
+        }
+
+        if K::from_usize(num_values as usize).is_none() {
+            return Err(general_err!("dictionary too large for index type"));
+        }
+
+        let len = num_values as usize;
+        let mut buffer = OffsetBuffer::<V>::default();
+        let mut decoder =
+            ByteArrayDecoderPlain::new(buf, len, Some(len), 
self.validate_utf8);
+        decoder.read(&mut buffer, usize::MAX)?;
+
+        let array = buffer.into_array(None, self.value_type.clone());
+        self.dict = Some(Arc::new(array));
+        Ok(())
+    }
+
+    fn set_data(
+        &mut self,
+        encoding: Encoding,
+        data: ByteBufferPtr,
+        num_levels: usize,
+        num_values: Option<usize>,
+    ) -> Result<()> {
+        let decoder = match encoding {
+            Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
+                let bit_width = data[0];
+                let mut decoder = RleDecoder::new(bit_width);
+                decoder.set_data(data.start_from(1));
+                MaybeDictionaryDecoder::Dict {
+                    decoder,
+                    max_remaining_values: num_values.unwrap_or(num_levels),
+                }
+            }
+            _ => MaybeDictionaryDecoder::Fallback(ByteArrayDecoder::new(
+                encoding,
+                data,
+                num_levels,
+                num_values,
+                self.validate_utf8,
+            )?),
+        };
+
+        self.decoder = Some(decoder);
+        Ok(())
+    }
+
+    fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) -> 
Result<usize> {
+        match self.decoder.as_mut().expect("decoder set") {
+            MaybeDictionaryDecoder::Fallback(decoder) => {
+                decoder.read(out.spill_values()?, range.end - range.start, 
None)
+            }
+            MaybeDictionaryDecoder::Dict {
+                decoder,
+                max_remaining_values,
+            } => {
+                let len = (range.end - range.start).min(*max_remaining_values);
+
+                let dict = self
+                    .dict
+                    .as_ref()
+                    .ok_or_else(|| general_err!("missing dictionary page for 
column"))?;
+
+                assert_eq!(dict.data_type(), &self.value_type);
+
+                if dict.is_empty() {
+                    return Ok(0); // All data must be NULL
+                }
+
+                match out.as_keys(dict) {
+                    Some(keys) => {
+                        // Happy path - can just copy keys
+                        // Keys will be validated on conversion to arrow
+                        let keys_slice = keys.spare_capacity_mut(range.start + 
len);
+                        let len = decoder.get_batch(&mut 
keys_slice[range.start..])?;
+                        Ok(len)
+                    }
+                    None => {
+                        // Sad path - need to recompute dictionary
+                        //
+                        // This either means we crossed into a new column 
chunk whilst
+                        // reading this batch, or encountered non-dictionary 
encoded data
+                        let values = out.spill_values()?;
+                        let mut keys = vec![K::default(); len];
+                        let len = decoder.get_batch(&mut keys)?;
+
+                        assert_eq!(dict.data_type(), &self.value_type);
+
+                        let dict_buffers = dict.data().buffers();
+                        let dict_offsets = unsafe { 
dict_buffers[0].typed_data::<V>() };
+                        let dict_values = dict_buffers[1].as_slice();
+
+                        values.extend_from_dictionary(
+                            &keys[..len],
+                            dict_offsets,
+                            dict_values,
+                        )?;
+
+                        Ok(len)
+                    }
+                }
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use arrow::array::{Array, StringArray};
+    use arrow::compute::cast;
+
+    use crate::arrow::array_reader::test_util::{
+        byte_array_all_encodings, encode_dictionary, utf8_column,
+    };
+    use crate::arrow::record_reader::buffer::ValuesBuffer;
+    use crate::data_type::ByteArray;
+
+    use super::*;
+
+    fn utf8_dictionary() -> ArrowType {
+        ArrowType::Dictionary(Box::new(ArrowType::Int32), 
Box::new(ArrowType::Utf8))
+    }
+
+    #[test]
+    fn test_dictionary_preservation() {
+        let data_type = utf8_dictionary();
+
+        let data: Vec<_> = vec!["0", "1", "0", "1", "2", "1", "2"]
+            .into_iter()
+            .map(ByteArray::from)
+            .collect();
+        let (dict, encoded) = encode_dictionary(&data);
+
+        let column_desc = utf8_column();
+        let mut decoder = DictionaryDecoder::<i32, i32>::new(&column_desc);
+
+        decoder
+            .set_dict(dict, 3, Encoding::RLE_DICTIONARY, false)
+            .unwrap();
+
+        decoder
+            .set_data(Encoding::RLE_DICTIONARY, encoded, 14, Some(data.len()))
+            .unwrap();
+
+        let mut output = DictionaryBuffer::<i32, i32>::default();
+        assert_eq!(decoder.read(&mut output, 0..3).unwrap(), 3);
+
+        let mut valid = vec![false, false, true, true, false, true];
+        let valid_buffer = Buffer::from_iter(valid.iter().cloned());
+        output.pad_nulls(0, 3, valid.len(), valid_buffer.as_slice());
+
+        assert!(matches!(output, DictionaryBuffer::Dict { .. }));
+
+        assert_eq!(decoder.read(&mut output, 0..4).unwrap(), 4);
+
+        valid.extend_from_slice(&[false, false, true, true, false, true, true, 
false]);
+        let valid_buffer = Buffer::from_iter(valid.iter().cloned());
+        output.pad_nulls(6, 4, 8, valid_buffer.as_slice());
+
+        assert!(matches!(output, DictionaryBuffer::Dict { .. }));
+
+        let array = output.into_array(Some(valid_buffer), &data_type).unwrap();
+        assert_eq!(array.data_type(), &data_type);
+
+        let array = cast(&array, &ArrowType::Utf8).unwrap();
+        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(strings.len(), 14);
+
+        assert_eq!(
+            strings.iter().collect::<Vec<_>>(),
+            vec![
+                None,
+                None,
+                Some("0"),
+                Some("1"),
+                None,
+                Some("0"),
+                None,
+                None,
+                Some("1"),
+                Some("2"),
+                None,
+                Some("1"),
+                Some("2"),
+                None
+            ]
+        )
+    }
+
+    #[test]
+    fn test_dictionary_fallback() {
+        let data_type = utf8_dictionary();
+        let data = vec!["hello", "world", "a", "b"];
+
+        let (pages, encoded_dictionary) = 
byte_array_all_encodings(data.clone());
+        let num_encodings = pages.len();
+
+        let column_desc = utf8_column();
+        let mut decoder = DictionaryDecoder::<i32, i32>::new(&column_desc);
+
+        decoder
+            .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false)
+            .unwrap();
+
+        // Read all pages into single buffer
+        let mut output = DictionaryBuffer::<i32, i32>::default();
+
+        for (encoding, page) in pages {
+            decoder.set_data(encoding, page, 4, Some(4)).unwrap();
+            assert_eq!(decoder.read(&mut output, 0..1024).unwrap(), 4);
+        }
+        let array = output.into_array(None, &data_type).unwrap();
+        assert_eq!(array.data_type(), &data_type);
+
+        let array = cast(&array, &ArrowType::Utf8).unwrap();
+        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(strings.len(), data.len() * num_encodings);
+
+        // Should have a copy of `data` for each encoding
+        for i in 0..num_encodings {
+            assert_eq!(
+                strings
+                    .iter()
+                    .skip(i * data.len())
+                    .take(data.len())
+                    .map(|x| x.unwrap())
+                    .collect::<Vec<_>>(),
+                data
+            )
+        }
+    }
+
+    #[test]
+    fn test_too_large_dictionary() {
+        let data: Vec<_> = (0..128)
+            .map(|x| ByteArray::from(x.to_string().as_str()))
+            .collect();
+        let (dictionary, _) = encode_dictionary(&data);
+
+        let column_desc = utf8_column();
+
+        let mut decoder = DictionaryDecoder::<i8, i32>::new(&column_desc);
+        let err = decoder
+            .set_dict(dictionary.clone(), 128, Encoding::RLE_DICTIONARY, false)
+            .unwrap_err()
+            .to_string();
+
+        assert!(err.contains("dictionary too large for index type"));
+
+        let mut decoder = DictionaryDecoder::<i16, i32>::new(&column_desc);
+        decoder
+            .set_dict(dictionary, 128, Encoding::RLE_DICTIONARY, false)
+            .unwrap();
+    }
+
+    #[test]
+    fn test_nulls() {
+        let data_type = utf8_dictionary();
+        let (pages, encoded_dictionary) = 
byte_array_all_encodings(Vec::<&str>::new());
+
+        let column_desc = utf8_column();
+        let mut decoder = DictionaryDecoder::new(&column_desc);
+
+        decoder
+            .set_dict(encoded_dictionary, 4, Encoding::PLAIN_DICTIONARY, false)
+            .unwrap();
+
+        for (encoding, page) in pages {
+            let mut output = DictionaryBuffer::<i32, i32>::default();
+            decoder.set_data(encoding, page, 8, None).unwrap();
+            assert_eq!(decoder.read(&mut output, 0..1024).unwrap(), 0);
+
+            output.pad_nulls(0, 0, 8, &[0]);
+            let array = output
+                .into_array(Some(Buffer::from(&[0])), &data_type)
+                .unwrap();
+
+            assert_eq!(array.len(), 8);
+            assert_eq!(array.null_count(), 8);
+        }
+    }
+}
diff --git a/parquet/src/arrow/array_reader/dictionary_buffer.rs 
b/parquet/src/arrow/array_reader/dictionary_buffer.rs
new file mode 100644
index 0000000..6bb9603
--- /dev/null
+++ b/parquet/src/arrow/array_reader/dictionary_buffer.rs
@@ -0,0 +1,383 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::array_reader::offset_buffer::OffsetBuffer;
+use crate::arrow::record_reader::buffer::{
+    BufferQueue, ScalarBuffer, ScalarValue, ValuesBuffer,
+};
+use crate::column::reader::decoder::ValuesBufferSlice;
+use crate::errors::{ParquetError, Result};
+use arrow::array::{make_array, ArrayDataBuilder, ArrayRef, OffsetSizeTrait};
+use arrow::buffer::Buffer;
+use arrow::datatypes::{ArrowNativeType, DataType as ArrowType};
+use std::sync::Arc;
+
+/// An array of variable length byte arrays that are potentially dictionary 
encoded
+/// and can be converted into a corresponding [`ArrayRef`]
+pub enum DictionaryBuffer<K: ScalarValue, V: ScalarValue> {
+    Dict {
+        keys: ScalarBuffer<K>,
+        values: ArrayRef,
+    },
+    Values {
+        values: OffsetBuffer<V>,
+    },
+}
+
+impl<K: ScalarValue, V: ScalarValue> Default for DictionaryBuffer<K, V> {
+    fn default() -> Self {
+        Self::Values {
+            values: Default::default(),
+        }
+    }
+}
+
+impl<K: ScalarValue + ArrowNativeType + Ord, V: ScalarValue + OffsetSizeTrait>
+    DictionaryBuffer<K, V>
+{
+    pub fn len(&self) -> usize {
+        match self {
+            Self::Dict { keys, .. } => keys.len(),
+            Self::Values { values } => values.len(),
+        }
+    }
+
+    /// Returns a mutable reference to a keys array
+    ///
+    /// Returns None if the dictionary needs to be recomputed
+    ///
+    /// # Panic
+    ///
+    /// Panics if the dictionary is too large for `K`
+    pub fn as_keys(&mut self, dictionary: &ArrayRef) -> Option<&mut 
ScalarBuffer<K>> {
+        assert!(K::from_usize(dictionary.len()).is_some());
+
+        match self {
+            Self::Dict { keys, values } => {
+                // Need to discard fat pointer for equality check
+                // - https://stackoverflow.com/a/67114787
+                // - https://github.com/rust-lang/rust/issues/46139
+                let values_ptr = values.as_ref() as *const _ as *const ();
+                let dict_ptr = dictionary.as_ref() as *const _ as *const ();
+                if values_ptr == dict_ptr {
+                    Some(keys)
+                } else if keys.is_empty() {
+                    *values = Arc::clone(dictionary);
+                    Some(keys)
+                } else {
+                    None
+                }
+            }
+            Self::Values { values } if values.is_empty() => {
+                *self = Self::Dict {
+                    keys: Default::default(),
+                    values: Arc::clone(dictionary),
+                };
+                match self {
+                    Self::Dict { keys, .. } => Some(keys),
+                    _ => unreachable!(),
+                }
+            }
+            _ => None,
+        }
+    }
+
+    /// Returns a mutable reference to a values array
+    ///
+    /// If this is currently dictionary encoded, this will convert from the
+    /// dictionary encoded representation
+    pub fn spill_values(&mut self) -> Result<&mut OffsetBuffer<V>> {
+        match self {
+            Self::Values { values } => Ok(values),
+            Self::Dict { keys, values } => {
+                let mut spilled = OffsetBuffer::default();
+                let dict_buffers = values.data().buffers();
+                let dict_offsets = unsafe { dict_buffers[0].typed_data::<V>() 
};
+                let dict_values = dict_buffers[1].as_slice();
+
+                if values.is_empty() {
+                    // If dictionary is empty, zero pad offsets
+                    spilled.offsets.resize(keys.len() + 1);
+                } else {
+                    // Note: at this point null positions will have arbitrary 
dictionary keys
+                    // and this will hydrate them to the corresponding byte 
array. This is
+                    // likely sub-optimal, as we would prefer zero length null 
"slots", but
+                    // spilling is already a degenerate case and so it is 
unclear if this is
+                    // worth optimising for, e.g. by keeping a null mask around
+                    spilled.extend_from_dictionary(
+                        keys.as_slice(),
+                        dict_offsets,
+                        dict_values,
+                    )?;
+                }
+
+                *self = Self::Values { values: spilled };
+                match self {
+                    Self::Values { values } => Ok(values),
+                    _ => unreachable!(),
+                }
+            }
+        }
+    }
+
+    /// Converts this into an [`ArrayRef`] with the provided `data_type` and 
`null_buffer`
+    pub fn into_array(
+        self,
+        null_buffer: Option<Buffer>,
+        data_type: &ArrowType,
+    ) -> Result<ArrayRef> {
+        assert!(matches!(data_type, ArrowType::Dictionary(_, _)));
+
+        match self {
+            Self::Dict { keys, values } => {
+                // Validate keys unless dictionary is empty
+                if !values.is_empty() {
+                    let min = K::from_usize(0).unwrap();
+                    let max = K::from_usize(values.len()).unwrap();
+
+                    // It may be possible to use SIMD here
+                    if keys.as_slice().iter().any(|x| *x < min || *x >= max) {
+                        return Err(general_err!(
+                            "dictionary key beyond bounds of dictionary: 
0..{}",
+                            values.len()
+                        ));
+                    }
+                }
+
+                let mut builder = ArrayDataBuilder::new(data_type.clone())
+                    .len(keys.len())
+                    .add_buffer(keys.into())
+                    .add_child_data(values.data().clone());
+
+                if let Some(buffer) = null_buffer {
+                    builder = builder.null_bit_buffer(buffer);
+                }
+
+                let data = match cfg!(debug_assertions) {
+                    true => builder.build().unwrap(),
+                    false => unsafe { builder.build_unchecked() },
+                };
+
+                Ok(make_array(data))
+            }
+            Self::Values { values } => {
+                let value_type = match data_type {
+                    ArrowType::Dictionary(_, v) => v.as_ref().clone(),
+                    _ => unreachable!(),
+                };
+
+                // This will compute a new dictionary
+                let array = arrow::compute::cast(
+                    &values.into_array(null_buffer, value_type),
+                    data_type,
+                )
+                .expect("cast should be infallible");
+
+                Ok(array)
+            }
+        }
+    }
+}
+
+impl<K: ScalarValue, V: ScalarValue> ValuesBufferSlice for DictionaryBuffer<K, 
V> {
+    fn capacity(&self) -> usize {
+        usize::MAX
+    }
+}
+
+impl<K: ScalarValue, V: ScalarValue + OffsetSizeTrait> ValuesBuffer
+    for DictionaryBuffer<K, V>
+{
+    fn pad_nulls(
+        &mut self,
+        read_offset: usize,
+        values_read: usize,
+        levels_read: usize,
+        valid_mask: &[u8],
+    ) {
+        match self {
+            Self::Dict { keys, .. } => {
+                keys.resize(read_offset + levels_read);
+                keys.pad_nulls(read_offset, values_read, levels_read, 
valid_mask)
+            }
+            Self::Values { values, .. } => {
+                values.pad_nulls(read_offset, values_read, levels_read, 
valid_mask)
+            }
+        }
+    }
+}
+
+impl<K: ScalarValue, V: ScalarValue + OffsetSizeTrait> BufferQueue
+    for DictionaryBuffer<K, V>
+{
+    type Output = Self;
+    type Slice = Self;
+
+    fn split_off(&mut self, len: usize) -> Self::Output {
+        match self {
+            Self::Dict { keys, values } => Self::Dict {
+                keys: keys.take(len),
+                values: values.clone(),
+            },
+            Self::Values { values } => Self::Values {
+                values: values.split_off(len),
+            },
+        }
+    }
+
+    fn spare_capacity_mut(&mut self, _batch_size: usize) -> &mut Self::Slice {
+        self
+    }
+
+    fn set_len(&mut self, len: usize) {
+        match self {
+            Self::Dict { keys, .. } => keys.set_len(len),
+            Self::Values { values } => values.set_len(len),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::array::{Array, StringArray};
+    use arrow::compute::cast;
+
+    #[test]
+    fn test_dictionary_buffer() {
+        let dict_type =
+            ArrowType::Dictionary(Box::new(ArrowType::Int32), 
Box::new(ArrowType::Utf8));
+
+        let d1: ArrayRef =
+            Arc::new(StringArray::from(vec!["hello", "world", "", "a", "b"]));
+
+        let mut buffer = DictionaryBuffer::<i32, i32>::default();
+
+        // Read some data preserving the dictionary
+        let values = &[1, 0, 3, 2, 4];
+        buffer.as_keys(&d1).unwrap().extend_from_slice(values);
+
+        let mut valid = vec![false, false, true, true, false, true, true, 
true];
+        let valid_buffer = Buffer::from_iter(valid.iter().cloned());
+        buffer.pad_nulls(0, values.len(), valid.len(), 
valid_buffer.as_slice());
+
+        // Split off some data
+
+        let split = buffer.split_off(4);
+        let null_buffer = Buffer::from_iter(valid.drain(0..4));
+        let array = split.into_array(Some(null_buffer), &dict_type).unwrap();
+        assert_eq!(array.data_type(), &dict_type);
+
+        let strings = cast(&array, &ArrowType::Utf8).unwrap();
+        let strings = strings.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(
+            strings.iter().collect::<Vec<_>>(),
+            vec![None, None, Some("world"), Some("hello")]
+        );
+
+        // Read some data not preserving the dictionary
+
+        let values = buffer.spill_values().unwrap();
+        let read_offset = values.len();
+        values.try_push("bingo".as_bytes(), false).unwrap();
+        values.try_push("bongo".as_bytes(), false).unwrap();
+
+        valid.extend_from_slice(&[false, false, true, false, true]);
+        let null_buffer = Buffer::from_iter(valid.iter().cloned());
+        buffer.pad_nulls(read_offset, 2, 5, null_buffer.as_slice());
+
+        assert_eq!(buffer.len(), 9);
+        let split = buffer.split_off(9);
+
+        let array = split.into_array(Some(null_buffer), &dict_type).unwrap();
+        assert_eq!(array.data_type(), &dict_type);
+
+        let strings = cast(&array, &ArrowType::Utf8).unwrap();
+        let strings = strings.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(
+            strings.iter().collect::<Vec<_>>(),
+            vec![
+                None,
+                Some("a"),
+                Some(""),
+                Some("b"),
+                None,
+                None,
+                Some("bingo"),
+                None,
+                Some("bongo")
+            ]
+        );
+
+        // Can recreate with new dictionary as values is empty
+        assert!(matches!(&buffer, DictionaryBuffer::Values { .. }));
+        assert_eq!(buffer.len(), 0);
+        let d2 = Arc::new(StringArray::from(vec!["bingo", ""])) as ArrayRef;
+        buffer
+            .as_keys(&d2)
+            .unwrap()
+            .extend_from_slice(&[0, 1, 0, 1]);
+
+        let array = buffer.split_off(4).into_array(None, &dict_type).unwrap();
+        assert_eq!(array.data_type(), &dict_type);
+
+        let strings = cast(&array, &ArrowType::Utf8).unwrap();
+        let strings = strings.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(
+            strings.iter().collect::<Vec<_>>(),
+            vec![Some("bingo"), Some(""), Some("bingo"), Some("")]
+        );
+
+        // Can recreate with new dictionary as keys empty
+        assert!(matches!(&buffer, DictionaryBuffer::Dict { .. }));
+        assert_eq!(buffer.len(), 0);
+        let d3 = Arc::new(StringArray::from(vec!["bongo"])) as ArrayRef;
+        buffer.as_keys(&d3).unwrap().extend_from_slice(&[0, 0]);
+
+        // Cannot change dictionary as keys not empty
+        let d4 = Arc::new(StringArray::from(vec!["bananas"])) as ArrayRef;
+        assert!(buffer.as_keys(&d4).is_none());
+    }
+
+    #[test]
+    fn test_validates_keys() {
+        let dict_type =
+            ArrowType::Dictionary(Box::new(ArrowType::Int32), 
Box::new(ArrowType::Utf8));
+
+        let mut buffer = DictionaryBuffer::<i32, i32>::default();
+        let d = Arc::new(StringArray::from(vec!["", "f"])) as ArrayRef;
+        buffer.as_keys(&d).unwrap().extend_from_slice(&[0, 2, 0]);
+
+        let err = buffer.into_array(None, &dict_type).unwrap_err().to_string();
+        assert!(
+            err.contains("dictionary key beyond bounds of dictionary: 0..2"),
+            "{}",
+            err
+        );
+
+        let mut buffer = DictionaryBuffer::<i32, i32>::default();
+        let d = Arc::new(StringArray::from(vec![""])) as ArrayRef;
+        buffer.as_keys(&d).unwrap().extend_from_slice(&[0, 1, 0]);
+
+        let err = buffer.spill_values().unwrap_err().to_string();
+        assert!(
+            err.contains("dictionary key beyond bounds of dictionary: 0..1"),
+            "{}",
+            err
+        );
+    }
+}
diff --git a/parquet/src/arrow/array_reader/offset_buffer.rs 
b/parquet/src/arrow/array_reader/offset_buffer.rs
index ddf2ad3..dc35f95 100644
--- a/parquet/src/arrow/array_reader/offset_buffer.rs
+++ b/parquet/src/arrow/array_reader/offset_buffer.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::arrow::bit_util::iter_set_bits_rev;
 use crate::arrow::record_reader::buffer::{
     BufferQueue, ScalarBuffer, ScalarValue, ValuesBuffer,
 };
@@ -26,6 +27,7 @@ use arrow::datatypes::{ArrowNativeType, DataType as 
ArrowType};
 
 /// A buffer of variable-sized byte arrays that can be converted into
 /// a corresponding [`ArrayRef`]
+#[derive(Debug)]
 pub struct OffsetBuffer<I: ScalarValue> {
     pub offsets: ScalarBuffer<I>,
     pub values: ScalarBuffer<u8>,
@@ -48,6 +50,10 @@ impl<I: OffsetSizeTrait + ScalarValue> OffsetBuffer<I> {
         self.offsets.len() - 1
     }
 
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+
     /// If `validate_utf8` this verifies that the first character of `data` is
     /// the start of a UTF-8 codepoint
     ///
@@ -80,6 +86,8 @@ impl<I: OffsetSizeTrait + ScalarValue> OffsetBuffer<I> {
     ///
     /// For each value `key` in `keys` this will insert
     /// `&dict_values[dict_offsets[key]..dict_offsets[key+1]]`
+    ///
+    /// Note: This will validate offsets are valid
     pub fn extend_from_dictionary<K: ArrowNativeType, V: ArrowNativeType>(
         &mut self,
         keys: &[K],
@@ -89,7 +97,10 @@ impl<I: OffsetSizeTrait + ScalarValue> OffsetBuffer<I> {
         for key in keys {
             let index = key.to_usize().unwrap();
             if index + 1 >= dict_offsets.len() {
-                return Err(general_err!("invalid offset in byte array: {}", 
index));
+                return Err(general_err!(
+                    "dictionary key beyond bounds of dictionary: 0..{}",
+                    dict_offsets.len().saturating_sub(1)
+                ));
             }
             let start_offset = dict_offsets[index].to_usize().unwrap();
             let end_offset = dict_offsets[index + 1].to_usize().unwrap();
@@ -178,7 +189,7 @@ impl<I: OffsetSizeTrait + ScalarValue> ValuesBuffer for 
OffsetBuffer<I> {
         read_offset: usize,
         values_read: usize,
         levels_read: usize,
-        rev_position_iter: impl Iterator<Item = usize>,
+        valid_mask: &[u8],
     ) {
         assert_eq!(self.offsets.len(), read_offset + values_read + 1);
         self.offsets.resize(read_offset + levels_read + 1);
@@ -189,7 +200,11 @@ impl<I: OffsetSizeTrait + ScalarValue> ValuesBuffer for 
OffsetBuffer<I> {
         let mut last_start_offset = I::from_usize(self.values.len()).unwrap();
 
         let values_range = read_offset..read_offset + values_read;
-        for (value_pos, level_pos) in 
values_range.clone().rev().zip(rev_position_iter) {
+        for (value_pos, level_pos) in values_range
+            .clone()
+            .rev()
+            .zip(iter_set_bits_rev(valid_mask))
+        {
             assert!(level_pos >= value_pos);
             assert!(level_pos < last_pos);
 
@@ -280,19 +295,36 @@ mod tests {
     #[test]
     fn test_offset_buffer_pad_nulls() {
         let mut buffer = OffsetBuffer::<i32>::default();
-        for v in ["a", "b", "c", "def", "gh"] {
+        let values = ["a", "b", "c", "def", "gh"];
+        for v in &values {
             buffer.try_push(v.as_bytes(), false).unwrap()
         }
 
+        let valid = vec![
+            true, false, false, true, false, true, false, true, true, false, 
false,
+        ];
+        let valid_mask = Buffer::from_iter(valid.iter().cloned());
+
         // Both trailing and leading nulls
-        buffer.pad_nulls(1, 4, 10, [8, 7, 5, 3].into_iter());
+        buffer.pad_nulls(1, values.len() - 1, valid.len() - 1, 
valid_mask.as_slice());
 
-        // No null buffer - nulls -> ""
-        let array = buffer.into_array(None, ArrowType::Utf8);
+        let array = buffer.into_array(Some(valid_mask), ArrowType::Utf8);
         let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
         assert_eq!(
-            strings.iter().map(|x| x.unwrap()).collect::<Vec<_>>(),
-            vec!["a", "", "", "b", "", "c", "", "def", "gh", "", ""]
+            strings.iter().collect::<Vec<_>>(),
+            vec![
+                Some("a"),
+                None,
+                None,
+                Some("b"),
+                None,
+                Some("c"),
+                None,
+                Some("def"),
+                Some("gh"),
+                None,
+                None
+            ]
         );
     }
 
@@ -335,4 +367,17 @@ mod tests {
         // Fails if run from middle of codepoint
         buffer.check_valid_utf8(12).unwrap_err();
     }
+
+    #[test]
+    fn test_pad_nulls_empty() {
+        let mut buffer = OffsetBuffer::<i32>::default();
+        let valid_mask = Buffer::from_iter(std::iter::repeat(false).take(9));
+        buffer.pad_nulls(0, 0, 9, valid_mask.as_slice());
+
+        let array = buffer.into_array(Some(valid_mask), ArrowType::Utf8);
+        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
+
+        assert_eq!(strings.len(), 9);
+        assert!(strings.iter().all(|x| x.is_none()))
+    }
 }
diff --git a/parquet/src/arrow/array_reader/test_util.rs 
b/parquet/src/arrow/array_reader/test_util.rs
new file mode 100644
index 0000000..b04a597
--- /dev/null
+++ b/parquet/src/arrow/array_reader/test_util.rs
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use crate::basic::{ConvertedType, Encoding, Type as PhysicalType};
+use crate::data_type::{ByteArray, ByteArrayType};
+use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder};
+use crate::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type};
+use crate::util::memory::{ByteBufferPtr, MemTracker};
+
+/// Returns a descriptor for a UTF-8 column
+pub fn utf8_column() -> ColumnDescPtr {
+    let t = Type::primitive_type_builder("col", PhysicalType::BYTE_ARRAY)
+        .with_converted_type(ConvertedType::UTF8)
+        .build()
+        .unwrap();
+
+    Arc::new(ColumnDescriptor::new(
+        Arc::new(t),
+        1,
+        0,
+        ColumnPath::new(vec![]),
+    ))
+}
+
+/// Encode `data` with the provided `encoding`
+pub fn encode_byte_array(encoding: Encoding, data: &[ByteArray]) -> 
ByteBufferPtr {
+    let descriptor = utf8_column();
+    let mem_tracker = Arc::new(MemTracker::new());
+    let mut encoder =
+        get_encoder::<ByteArrayType>(descriptor, encoding, 
mem_tracker).unwrap();
+
+    encoder.put(data).unwrap();
+    encoder.flush_buffer().unwrap()
+}
+
+/// Returns the encoded dictionary and value data
+pub fn encode_dictionary(data: &[ByteArray]) -> (ByteBufferPtr, ByteBufferPtr) 
{
+    let mut dict_encoder =
+        DictEncoder::<ByteArrayType>::new(utf8_column(), 
Arc::new(MemTracker::new()));
+
+    dict_encoder.put(data).unwrap();
+    let encoded_rle = dict_encoder.flush_buffer().unwrap();
+    let encoded_dictionary = dict_encoder.write_dict().unwrap();
+
+    (encoded_dictionary, encoded_rle)
+}
+
+/// Encodes `data` in all the possible encodings
+///
+/// Returns an array of data with its associated encoding, along with an 
encoded dictionary
+pub fn byte_array_all_encodings(
+    data: Vec<impl Into<ByteArray>>,
+) -> (Vec<(Encoding, ByteBufferPtr)>, ByteBufferPtr) {
+    let data: Vec<_> = data.into_iter().map(Into::into).collect();
+    let (encoded_dictionary, encoded_rle) = encode_dictionary(&data);
+
+    // A column chunk with all the encodings!
+    let pages = vec![
+        (Encoding::PLAIN, encode_byte_array(Encoding::PLAIN, &data)),
+        (
+            Encoding::DELTA_BYTE_ARRAY,
+            encode_byte_array(Encoding::DELTA_BYTE_ARRAY, &data),
+        ),
+        (
+            Encoding::DELTA_LENGTH_BYTE_ARRAY,
+            encode_byte_array(Encoding::DELTA_LENGTH_BYTE_ARRAY, &data),
+        ),
+        (Encoding::PLAIN_DICTIONARY, encoded_rle.clone()),
+        (Encoding::RLE_DICTIONARY, encoded_rle),
+    ];
+
+    (pages, encoded_dictionary)
+}
diff --git a/parquet/src/arrow/arrow_reader.rs 
b/parquet/src/arrow/arrow_reader.rs
index 476cf08..259a3c0 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader.rs
@@ -17,6 +17,13 @@
 
 //! Contains reader which reads parquet data into arrow array.
 
+use std::sync::Arc;
+
+use arrow::datatypes::{DataType as ArrowType, Schema, SchemaRef};
+use arrow::error::Result as ArrowResult;
+use arrow::record_batch::{RecordBatch, RecordBatchReader};
+use arrow::{array::StructArray, error::ArrowError};
+
 use crate::arrow::array_reader::{build_array_reader, ArrayReader, 
StructArrayReader};
 use crate::arrow::schema::parquet_to_arrow_schema;
 use crate::arrow::schema::{
@@ -25,11 +32,6 @@ use crate::arrow::schema::{
 use crate::errors::{ParquetError, Result};
 use crate::file::metadata::ParquetMetaData;
 use crate::file::reader::FileReader;
-use arrow::datatypes::{DataType as ArrowType, Schema, SchemaRef};
-use arrow::error::Result as ArrowResult;
-use arrow::record_batch::{RecordBatch, RecordBatchReader};
-use arrow::{array::StructArray, error::ArrowError};
-use std::sync::Arc;
 
 /// Arrow reader api.
 /// With this api, user can get arrow schema from parquet file, and read 
parquet data
@@ -233,13 +235,29 @@ impl ParquetRecordBatchReader {
 
 #[cfg(test)]
 mod tests {
+    use std::cmp::min;
+    use std::convert::TryFrom;
+    use std::fs::File;
+    use std::io::Seek;
+    use std::path::PathBuf;
+    use std::sync::Arc;
+
+    use rand::{thread_rng, RngCore};
+    use serde_json::json;
+    use serde_json::Value::{Array as JArray, Null as JNull, Object as JObject};
+
+    use arrow::array::*;
+    use arrow::datatypes::{DataType as ArrowDataType, Field};
+    use arrow::error::Result as ArrowResult;
+    use arrow::record_batch::{RecordBatch, RecordBatchReader};
+
     use crate::arrow::arrow_reader::{ArrowReader, ParquetFileArrowReader};
     use crate::arrow::converter::{
         BinaryArrayConverter, Converter, FixedSizeArrayConverter, 
FromConverter,
         IntervalDayTimeArrayConverter, LargeUtf8ArrayConverter, 
Utf8ArrayConverter,
     };
     use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
-    use crate::basic::{ConvertedType, Encoding, Repetition};
+    use crate::basic::{ConvertedType, Encoding, Repetition, Type as 
PhysicalType};
     use crate::column::writer::get_typed_column_writer_mut;
     use crate::data_type::{
         BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray,
@@ -253,18 +271,6 @@ mod tests {
     use crate::schema::types::{Type, TypePtr};
     use crate::util::cursor::SliceableCursor;
     use crate::util::test_common::RandGen;
-    use arrow::array::*;
-    use arrow::datatypes::{DataType as ArrowDataType, Field};
-    use arrow::record_batch::RecordBatchReader;
-    use rand::{thread_rng, RngCore};
-    use serde_json::json;
-    use serde_json::Value::{Array as JArray, Null as JNull, Object as JObject};
-    use std::cmp::min;
-    use std::convert::TryFrom;
-    use std::fs::File;
-    use std::io::Seek;
-    use std::path::PathBuf;
-    use std::sync::Arc;
 
     #[test]
     fn test_arrow_reader_all_columns() {
@@ -423,13 +429,13 @@ mod tests {
             RandUtf8Gen,
         >(2, ConvertedType::NONE, None, &converter, encodings);
 
-        let converter = Utf8ArrayConverter {};
+        let utf8_converter = Utf8ArrayConverter {};
         run_single_column_reader_tests::<
             ByteArrayType,
             StringArray,
             Utf8ArrayConverter,
             RandUtf8Gen,
-        >(2, ConvertedType::UTF8, None, &converter, encodings);
+        >(2, ConvertedType::UTF8, None, &utf8_converter, encodings);
 
         run_single_column_reader_tests::<
             ByteArrayType,
@@ -440,27 +446,11 @@ mod tests {
             2,
             ConvertedType::UTF8,
             Some(ArrowDataType::Utf8),
-            &converter,
-            encodings,
-        );
-
-        run_single_column_reader_tests::<
-            ByteArrayType,
-            StringArray,
-            Utf8ArrayConverter,
-            RandUtf8Gen,
-        >(
-            2,
-            ConvertedType::UTF8,
-            Some(ArrowDataType::Dictionary(
-                Box::new(ArrowDataType::Int32),
-                Box::new(ArrowDataType::Utf8),
-            )),
-            &converter,
+            &utf8_converter,
             encodings,
         );
 
-        let converter = LargeUtf8ArrayConverter {};
+        let large_utf8_converter = LargeUtf8ArrayConverter {};
         run_single_column_reader_tests::<
             ByteArrayType,
             LargeStringArray,
@@ -470,9 +460,78 @@ mod tests {
             2,
             ConvertedType::UTF8,
             Some(ArrowDataType::LargeUtf8),
-            &converter,
+            &large_utf8_converter,
             encodings,
         );
+
+        let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
+        for key in &small_key_types {
+            for encoding in encodings {
+                let mut opts = TestOptions::new(2, 20, 
15).with_null_percent(50);
+                opts.encoding = *encoding;
+
+                // Cannot run full test suite as keys overflow, run small test 
instead
+                single_column_reader_test::<
+                    ByteArrayType,
+                    StringArray,
+                    Utf8ArrayConverter,
+                    RandUtf8Gen,
+                >(
+                    opts,
+                    2,
+                    ConvertedType::UTF8,
+                    Some(ArrowDataType::Dictionary(
+                        Box::new(key.clone()),
+                        Box::new(ArrowDataType::Utf8),
+                    )),
+                    &utf8_converter,
+                );
+            }
+        }
+
+        let key_types = [
+            ArrowDataType::Int16,
+            ArrowDataType::UInt16,
+            ArrowDataType::Int32,
+            ArrowDataType::UInt32,
+            ArrowDataType::Int64,
+            ArrowDataType::UInt64,
+        ];
+
+        for key in &key_types {
+            run_single_column_reader_tests::<
+                ByteArrayType,
+                StringArray,
+                Utf8ArrayConverter,
+                RandUtf8Gen,
+            >(
+                2,
+                ConvertedType::UTF8,
+                Some(ArrowDataType::Dictionary(
+                    Box::new(key.clone()),
+                    Box::new(ArrowDataType::Utf8),
+                )),
+                &utf8_converter,
+                encodings,
+            );
+
+            // https://github.com/apache/arrow-rs/issues/1179
+            // run_single_column_reader_tests::<
+            //     ByteArrayType,
+            //     LargeStringArray,
+            //     LargeUtf8ArrayConverter,
+            //     RandUtf8Gen,
+            // >(
+            //     2,
+            //     ConvertedType::UTF8,
+            //     Some(ArrowDataType::Dictionary(
+            //         Box::new(key.clone()),
+            //         Box::new(ArrowDataType::LargeUtf8),
+            //     )),
+            //     &large_utf8_converter,
+            //     encodings
+            // );
+        }
     }
 
     #[test]
@@ -519,6 +578,11 @@ mod tests {
         record_batch_size: usize,
         /// Percentage of nulls in column or None if required
         null_percent: Option<usize>,
+        /// Set write batch size
+        ///
+        /// This is the number of rows that are written at once to a page and
+        /// therefore acts as a bound on the page granularity of a row group
+        write_batch_size: usize,
         /// Maximum size of page in bytes
         max_data_page_size: usize,
         /// Maximum size of dictionary page in bytes
@@ -536,6 +600,7 @@ mod tests {
                 num_rows: 100,
                 record_batch_size: 15,
                 null_percent: None,
+                write_batch_size: 64,
                 max_data_page_size: 1024 * 1024,
                 max_dict_page_size: 1024 * 1024,
                 writer_version: WriterVersion::PARQUET_1_0,
@@ -578,6 +643,7 @@ mod tests {
         fn writer_props(&self) -> WriterProperties {
             let builder = WriterProperties::builder()
                 .set_data_pagesize_limit(self.max_data_page_size)
+                .set_write_batch_size(self.write_batch_size)
                 .set_writer_version(self.writer_version);
 
             let builder = match self.encoding {
@@ -792,7 +858,7 @@ mod tests {
                     }
                 }
                 assert_eq!(a.data_type(), b.data_type());
-                assert_eq!(a.data(), b.data());
+                assert_eq!(a.data(), b.data(), "{:#?} vs {:#?}", a.data(), 
b.data());
 
                 total_read = end;
             } else {
@@ -1005,4 +1071,101 @@ mod tests {
             error
         );
     }
+
+    #[test]
+    fn test_dictionary_preservation() {
+        let mut fields = vec![Arc::new(
+            Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
+                .with_repetition(Repetition::OPTIONAL)
+                .with_converted_type(ConvertedType::UTF8)
+                .build()
+                .unwrap(),
+        )];
+
+        let schema = Arc::new(
+            Type::group_type_builder("test_schema")
+                .with_fields(&mut fields)
+                .build()
+                .unwrap(),
+        );
+
+        let dict_type = ArrowDataType::Dictionary(
+            Box::new(ArrowDataType::Int32),
+            Box::new(ArrowDataType::Utf8),
+        );
+
+        let arrow_field = Field::new("leaf", dict_type, true);
+
+        let mut file = tempfile::tempfile().unwrap();
+
+        let values = vec![
+            vec![
+                ByteArray::from("hello"),
+                ByteArray::from("a"),
+                ByteArray::from("b"),
+                ByteArray::from("d"),
+            ],
+            vec![
+                ByteArray::from("c"),
+                ByteArray::from("a"),
+                ByteArray::from("b"),
+            ],
+        ];
+
+        let def_levels = vec![
+            vec![1, 0, 0, 1, 0, 0, 1, 1],
+            vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
+        ];
+
+        let opts = TestOptions {
+            encoding: Encoding::RLE_DICTIONARY,
+            ..Default::default()
+        };
+
+        generate_single_column_file_with_data::<ByteArrayType>(
+            &values,
+            Some(&def_levels),
+            file.try_clone().unwrap(), // Cannot use &mut File (#1163)
+            schema,
+            Some(arrow_field),
+            &opts,
+        )
+        .unwrap();
+
+        file.rewind().unwrap();
+
+        let parquet_reader = SerializedFileReader::try_from(file).unwrap();
+        let mut arrow_reader = 
ParquetFileArrowReader::new(Arc::new(parquet_reader));
+
+        let record_reader = arrow_reader.get_record_reader(3).unwrap();
+
+        let batches = record_reader
+            .collect::<ArrowResult<Vec<RecordBatch>>>()
+            .unwrap();
+
+        assert_eq!(batches.len(), 6);
+        assert!(batches.iter().all(|x| x.num_columns() == 1));
+
+        let row_counts = batches
+            .iter()
+            .map(|x| (x.num_rows(), x.column(0).null_count()))
+            .collect::<Vec<_>>();
+
+        assert_eq!(
+            row_counts,
+            vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
+        );
+
+        let get_dict =
+            |batch: &RecordBatch| 
batch.column(0).data().child_data()[0].clone();
+
+        // First and second batch in same row group -> same dictionary
+        assert_eq!(get_dict(&batches[0]), get_dict(&batches[1]));
+        // Third batch spans row group -> computed dictionary
+        assert_ne!(get_dict(&batches[1]), get_dict(&batches[2]));
+        assert_ne!(get_dict(&batches[2]), get_dict(&batches[3]));
+        // Fourth, fifth and sixth from same row group -> same dictionary
+        assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
+        assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
+    }
 }
diff --git a/parquet/src/arrow/bit_util.rs b/parquet/src/arrow/bit_util.rs
new file mode 100644
index 0000000..881c67d
--- /dev/null
+++ b/parquet/src/arrow/bit_util.rs
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::util::bit_chunk_iterator::BitChunks;
+use std::ops::Range;
+
+/// Counts the number of set bits in the provided range
+pub fn count_set_bits(bytes: &[u8], range: Range<usize>) -> usize {
+    let mut count = 0_usize;
+    let chunks = BitChunks::new(bytes, range.start, range.end - range.start);
+    chunks.iter().for_each(|chunk| {
+        count += chunk.count_ones() as usize;
+    });
+    count += chunks.remainder_bits().count_ones() as usize;
+    count
+}
+
+/// Iterates through the set bit positions in `bytes` in reverse order
+pub fn iter_set_bits_rev(bytes: &[u8]) -> impl Iterator<Item = usize> + '_ {
+    let (mut byte_idx, mut in_progress) = match bytes.len() {
+        0 => (0, 0),
+        len => (len - 1, bytes[len - 1]),
+    };
+
+    std::iter::from_fn(move || loop {
+        if in_progress != 0 {
+            let bit_pos = 7 - in_progress.leading_zeros();
+            in_progress ^= 1 << bit_pos;
+            return Some((byte_idx << 3) + (bit_pos as usize));
+        }
+
+        if byte_idx == 0 {
+            return None;
+        }
+
+        byte_idx -= 1;
+        in_progress = bytes[byte_idx];
+    })
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::array::BooleanBufferBuilder;
+    use rand::prelude::*;
+
+    #[test]
+    fn test_bit_fns() {
+        let mut rng = thread_rng();
+        let mask_length = rng.gen_range(1..20);
+        let bools: Vec<_> = std::iter::from_fn(|| Some(rng.next_u32() & 1 == 
0))
+            .take(mask_length)
+            .collect();
+
+        let mut nulls = BooleanBufferBuilder::new(mask_length);
+        bools.iter().for_each(|b| nulls.append(*b));
+
+        let actual: Vec<_> = iter_set_bits_rev(nulls.as_slice()).collect();
+        let expected: Vec<_> = bools
+            .iter()
+            .enumerate()
+            .rev()
+            .filter_map(|(x, y)| y.then(|| x))
+            .collect();
+        assert_eq!(actual, expected);
+
+        assert_eq!(iter_set_bits_rev(&[]).count(), 0);
+        assert_eq!(count_set_bits(&[], 0..0), 0);
+        assert_eq!(count_set_bits(&[0xFF], 1..1), 0);
+
+        for _ in 0..20 {
+            let start = rng.gen_range(0..bools.len());
+            let end = rng.gen_range(start..bools.len());
+
+            let actual = count_set_bits(nulls.as_slice(), start..end);
+            let expected = bools[start..end].iter().filter(|x| **x).count();
+
+            assert_eq!(actual, expected);
+        }
+    }
+}
diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs
index 57ad5b1..fbbf655 100644
--- a/parquet/src/arrow/mod.rs
+++ b/parquet/src/arrow/mod.rs
@@ -122,6 +122,7 @@ experimental_mod!(array_reader);
 experimental_mod!(arrow_array_reader);
 pub mod arrow_reader;
 pub mod arrow_writer;
+mod bit_util;
 experimental_mod!(converter);
 pub(in crate::arrow) mod levels;
 pub(in crate::arrow) mod record_reader;
diff --git a/parquet/src/arrow/record_reader.rs 
b/parquet/src/arrow/record_reader.rs
index ce77db9..31138c1 100644
--- a/parquet/src/arrow/record_reader.rs
+++ b/parquet/src/arrow/record_reader.rs
@@ -167,7 +167,30 @@ where
                 break;
             }
 
-            let batch_size = max(num_records - records_read, MIN_BATCH_SIZE);
+            // If repetition levels present, we don't know how much more to 
read
+            // in order to read the requested number of records, therefore 
read at least
+            // MIN_BATCH_SIZE, otherwise read **exactly** what was requested. 
This helps
+            // to avoid a degenerate case where the buffers are never fully 
drained.
+            //
+            // Consider the scenario where the user is requesting batches of 
MIN_BATCH_SIZE.
+            //
+            // When transitioning across a row group boundary, this will read 
some remainder
+            // from the row group `r`, before reading MIN_BATCH_SIZE from the 
next row group,
+            // leaving `MIN_BATCH_SIZE + r` in the buffer.
+            //
+            // The client will then only split off the `MIN_BATCH_SIZE` they 
actually wanted,
+            // leaving behind `r`. This will continue indefinitely.
+            //
+            // Aside from wasting cycles splitting and shuffling buffers 
unnecessarily, this
+            // prevents dictionary preservation from functioning correctly as 
the buffer
+            // will never be emptied, allowing a new dictionary to be 
registered.
+            //
+            // This degenerate case can still occur for repeated fields, but
+            // it is avoided for the more common case of a non-repeated field
+            let batch_size = match &self.rep_levels {
+                Some(_) => max(num_records - records_read, MIN_BATCH_SIZE),
+                None => num_records - records_read,
+            };
 
             // Try to more value from parquet pages
             let values_read = self.read_one_batch(batch_size)?;
@@ -268,7 +291,7 @@ where
                 self.values_written,
                 values_read,
                 levels_read,
-                def_levels.rev_valid_positions_iter(),
+                def_levels.nulls().as_slice(),
             );
         }
 
diff --git a/parquet/src/arrow/record_reader/buffer.rs 
b/parquet/src/arrow/record_reader/buffer.rs
index 5c69dfa..3460d11 100644
--- a/parquet/src/arrow/record_reader/buffer.rs
+++ b/parquet/src/arrow/record_reader/buffer.rs
@@ -17,6 +17,7 @@
 
 use std::marker::PhantomData;
 
+use crate::arrow::bit_util::iter_set_bits_rev;
 use arrow::buffer::{Buffer, MutableBuffer};
 use arrow::datatypes::ToByteSlice;
 
@@ -75,13 +76,18 @@ pub trait BufferQueue: Sized {
 pub trait ScalarValue {}
 impl ScalarValue for bool {}
 impl ScalarValue for u8 {}
+impl ScalarValue for i8 {}
+impl ScalarValue for u16 {}
 impl ScalarValue for i16 {}
+impl ScalarValue for u32 {}
 impl ScalarValue for i32 {}
+impl ScalarValue for u64 {}
 impl ScalarValue for i64 {}
 impl ScalarValue for f32 {}
 impl ScalarValue for f64 {}
 
 /// A typed buffer similar to [`Vec<T>`] but using [`MutableBuffer`] for 
storage
+#[derive(Debug)]
 pub struct ScalarBuffer<T: ScalarValue> {
     buffer: MutableBuffer,
 
@@ -224,22 +230,14 @@ pub trait ValuesBuffer: BufferQueue {
     /// - `read_offset` - the offset in [`ValuesBuffer`] to start null padding 
from
     /// - `values_read` - the number of values read
     /// - `levels_read` - the number of levels read
-    /// - `rev_valid_position_iter` - a reverse iterator of the valid level 
positions
-    ///
-    /// It is required that:
-    ///
-    /// - `rev_valid_position_iter` has at least `values_len` elements
-    /// - `rev_valid_position_iter` returns strictly monotonically decreasing 
values
-    /// - `rev_valid_position_iter` returns values in the range 
`read_offset..read_offset+levels_len`
-    ///
-    /// Implementations may panic or otherwise misbehave if this is not the 
case
+    /// - `valid_mask` - a packed mask of valid levels
     ///
     fn pad_nulls(
         &mut self,
         read_offset: usize,
         values_read: usize,
         levels_read: usize,
-        rev_valid_position_iter: impl Iterator<Item = usize>,
+        valid_mask: &[u8],
     );
 }
 
@@ -249,13 +247,15 @@ impl<T: ScalarValue> ValuesBuffer for ScalarBuffer<T> {
         read_offset: usize,
         values_read: usize,
         levels_read: usize,
-        rev_valid_position_iter: impl Iterator<Item = usize>,
+        valid_mask: &[u8],
     ) {
         let slice = self.as_slice_mut();
         assert!(slice.len() >= read_offset + levels_read);
 
         let values_range = read_offset..read_offset + values_read;
-        for (value_pos, level_pos) in 
values_range.rev().zip(rev_valid_position_iter) {
+        for (value_pos, level_pos) in
+            values_range.rev().zip(iter_set_bits_rev(valid_mask))
+        {
             debug_assert!(level_pos >= value_pos);
             if level_pos <= value_pos {
                 break;
diff --git a/parquet/src/arrow/record_reader/definition_levels.rs 
b/parquet/src/arrow/record_reader/definition_levels.rs
index d53310e..bc0de14 100644
--- a/parquet/src/arrow/record_reader/definition_levels.rs
+++ b/parquet/src/arrow/record_reader/definition_levels.rs
@@ -20,8 +20,8 @@ use std::ops::Range;
 use arrow::array::BooleanBufferBuilder;
 use arrow::bitmap::Bitmap;
 use arrow::buffer::Buffer;
-use arrow::util::bit_chunk_iterator::BitChunks;
 
+use crate::arrow::bit_util::count_set_bits;
 use crate::arrow::record_reader::buffer::BufferQueue;
 use crate::basic::Encoding;
 use crate::column::reader::decoder::{
@@ -126,12 +126,7 @@ impl DefinitionLevelBuffer {
         Bitmap::from(std::mem::replace(old_builder, new_builder).finish())
     }
 
-    /// Returns an iterator of the valid positions in descending order
-    pub fn rev_valid_positions_iter(&self) -> impl Iterator<Item = usize> + '_ 
{
-        iter_set_bits_rev(self.nulls().as_slice())
-    }
-
-    fn nulls(&self) -> &BooleanBufferBuilder {
+    pub fn nulls(&self) -> &BooleanBufferBuilder {
         match &self.inner {
             BufferInner::Full { nulls, .. } => nulls,
             BufferInner::Mask { nulls } => nulls,
@@ -351,39 +346,6 @@ impl PackedDecoder {
     }
 }
 
-/// Counts the number of set bits in the provided range
-pub fn count_set_bits(bytes: &[u8], range: Range<usize>) -> usize {
-    let mut count = 0_usize;
-    let chunks = BitChunks::new(bytes, range.start, range.end - range.start);
-    chunks.iter().for_each(|chunk| {
-        count += chunk.count_ones() as usize;
-    });
-    count += chunks.remainder_bits().count_ones() as usize;
-    count
-}
-
-fn iter_set_bits_rev(bytes: &[u8]) -> impl Iterator<Item = usize> + '_ {
-    let (mut byte_idx, mut in_progress) = match bytes.len() {
-        0 => (0, 0),
-        len => (len - 1, bytes[len - 1]),
-    };
-
-    std::iter::from_fn(move || loop {
-        if in_progress != 0 {
-            let bit_pos = 7 - in_progress.leading_zeros();
-            in_progress ^= 1 << bit_pos;
-            return Some((byte_idx << 3) + (bit_pos as usize));
-        }
-
-        if byte_idx == 0 {
-            return None;
-        }
-
-        byte_idx -= 1;
-        in_progress = bytes[byte_idx];
-    })
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -392,7 +354,7 @@ mod tests {
     use crate::basic::Type as PhysicalType;
     use crate::encodings::rle::RleEncoder;
     use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};
-    use rand::{thread_rng, Rng, RngCore};
+    use rand::{thread_rng, Rng};
 
     #[test]
     fn test_packed_decoder() {
@@ -428,41 +390,6 @@ mod tests {
     }
 
     #[test]
-    fn test_bit_fns() {
-        let mut rng = thread_rng();
-        let mask_length = rng.gen_range(1..20);
-        let bools: Vec<_> = std::iter::from_fn(|| Some(rng.next_u32() & 1 == 
0))
-            .take(mask_length)
-            .collect();
-
-        let mut nulls = BooleanBufferBuilder::new(mask_length);
-        bools.iter().for_each(|b| nulls.append(*b));
-
-        let actual: Vec<_> = iter_set_bits_rev(nulls.as_slice()).collect();
-        let expected: Vec<_> = bools
-            .iter()
-            .enumerate()
-            .rev()
-            .filter_map(|(x, y)| y.then(|| x))
-            .collect();
-        assert_eq!(actual, expected);
-
-        assert_eq!(iter_set_bits_rev(&[]).count(), 0);
-        assert_eq!(count_set_bits(&[], 0..0), 0);
-        assert_eq!(count_set_bits(&[0xFF], 1..1), 0);
-
-        for _ in 0..20 {
-            let start = rng.gen_range(0..bools.len());
-            let end = rng.gen_range(start..bools.len());
-
-            let actual = count_set_bits(nulls.as_slice(), start..end);
-            let expected = bools[start..end].iter().filter(|x| **x).count();
-
-            assert_eq!(actual, expected);
-        }
-    }
-
-    #[test]
     fn test_split_off() {
         let t = Type::primitive_type_builder("col", PhysicalType::INT32)
             .build()
diff --git a/parquet/src/column/writer.rs b/parquet/src/column/writer.rs
index 87b25b4..1db0ea0 100644
--- a/parquet/src/column/writer.rs
+++ b/parquet/src/column/writer.rs
@@ -567,6 +567,14 @@ impl<T: DataType> ColumnWriterImpl<T> {
     /// Returns true if there is enough data for a data page, false otherwise.
     #[inline]
     fn should_add_data_page(&self) -> bool {
+        // This is necessary in the event of a much larger dictionary size 
than page size
+        //
+        // In such a scenario the dictionary decoder may return an estimated 
encoded
+        // size in excess of the page size limit, even when there are no 
buffered values
+        if self.num_buffered_values == 0 {
+            return false;
+        }
+
         match self.dict_encoder {
             Some(ref encoder) => {
                 encoder.estimated_data_encoded_size() >= 
self.props.data_pagesize_limit()

Reply via email to