This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 0cc0c05   parquet: Optimized ByteArrayReader, Add UTF-8 Validation 
(#1040)  (#1082)
0cc0c05 is described below

commit 0cc0c055e0fb09553fa80186768f9b539a552419
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Tue Jan 18 12:13:21 2022 +0000

     parquet: Optimized ByteArrayReader, Add UTF-8 Validation (#1040)  (#1082)
    
    * Optimized ByteArrayReader (#1040)
    
    UTF-8 Validation (#786)
    
    * Fix arrow_array_reader benchmark
    
    * Allow running subset of arrow_array_reader benchmarks
    
    * Faster UTF-8 validation
    
    * Tweak null handling
    
    * Add license
    
    * Refine `ValuesBuffer::pad_nulls`
    
    * Tweak error handling
    
    * Use page null count if available
    
    * Doc comments
    
    * Test DELTA_BYTE_ARRAY encoding
    
    * Support legacy Encoding::PLAIN_DICTIONARY
    
    * Add OffsetBuffer unit tests
    
    Review feedback
    
    * More tests
    
    * Fix lint
    
    * Review feedback
---
 parquet/benches/arrow_array_reader.rs              | 146 +++--
 parquet/src/arrow/array_reader.rs                  |  92 ++-
 parquet/src/arrow/array_reader/byte_array.rs       | 699 +++++++++++++++++++++
 parquet/src/arrow/array_reader/offset_buffer.rs    | 338 ++++++++++
 parquet/src/arrow/arrow_reader.rs                  |  39 +-
 parquet/src/arrow/record_reader.rs                 |  10 +-
 parquet/src/arrow/record_reader/buffer.rs          |  81 ++-
 .../src/arrow/record_reader/definition_levels.rs   |   8 +-
 parquet/src/column/reader.rs                       |  30 +-
 parquet/src/column/reader/decoder.rs               |  18 +-
 10 files changed, 1299 insertions(+), 162 deletions(-)

diff --git a/parquet/benches/arrow_array_reader.rs 
b/parquet/benches/arrow_array_reader.rs
index 5587b52..54f99c4 100644
--- a/parquet/benches/arrow_array_reader.rs
+++ b/parquet/benches/arrow_array_reader.rs
@@ -47,6 +47,7 @@ const PAGES_PER_GROUP: usize = 2;
 const VALUES_PER_PAGE: usize = 10_000;
 const BATCH_SIZE: usize = 8192;
 
+use arrow::array::Array;
 use rand::{rngs::StdRng, Rng, SeedableRng};
 
 pub fn seedable_rng() -> StdRng {
@@ -273,7 +274,7 @@ fn build_dictionary_encoded_string_page_iterator(
     InMemoryPageIterator::new(schema, column_desc, pages)
 }
 
-fn bench_array_reader(mut array_reader: impl ArrayReader) -> usize {
+fn bench_array_reader(mut array_reader: Box<dyn ArrayReader>) -> usize {
     // test procedure: read data in batches of 8192 until no more data
     let mut total_count = 0;
     loop {
@@ -290,49 +291,46 @@ fn bench_array_reader(mut array_reader: impl ArrayReader) 
-> usize {
 fn create_int32_arrow_array_reader(
     page_iterator: impl PageIterator + 'static,
     column_desc: ColumnDescPtr,
-) -> impl ArrayReader {
+) -> Box<dyn ArrayReader> {
     use parquet::arrow::arrow_array_reader::{ArrowArrayReader, 
PrimitiveArrayConverter};
     let converter = 
PrimitiveArrayConverter::<arrow::datatypes::Int32Type>::new();
-    ArrowArrayReader::try_new(page_iterator, column_desc, converter, 
None).unwrap()
+    let reader =
+        ArrowArrayReader::try_new(page_iterator, column_desc, converter, 
None).unwrap();
+    Box::new(reader)
 }
 
 fn create_int32_primitive_array_reader(
     page_iterator: impl PageIterator + 'static,
     column_desc: ColumnDescPtr,
-) -> impl ArrayReader {
+) -> Box<dyn ArrayReader> {
     use parquet::arrow::array_reader::PrimitiveArrayReader;
-    PrimitiveArrayReader::<Int32Type>::new_with_options(
+    let reader = PrimitiveArrayReader::<Int32Type>::new_with_options(
         Box::new(page_iterator),
         column_desc,
         None,
         true,
     )
-    .unwrap()
+    .unwrap();
+    Box::new(reader)
 }
 
 fn create_string_arrow_array_reader(
     page_iterator: impl PageIterator + 'static,
     column_desc: ColumnDescPtr,
-) -> impl ArrayReader {
+) -> Box<dyn ArrayReader> {
     use parquet::arrow::arrow_array_reader::{ArrowArrayReader, 
StringArrayConverter};
     let converter = StringArrayConverter::new();
-    ArrowArrayReader::try_new(page_iterator, column_desc, converter, 
None).unwrap()
+    let reader =
+        ArrowArrayReader::try_new(page_iterator, column_desc, converter, 
None).unwrap();
+    Box::new(reader)
 }
 
-fn create_string_complex_array_reader(
+fn create_string_byte_array_reader(
     page_iterator: impl PageIterator + 'static,
     column_desc: ColumnDescPtr,
-) -> impl ArrayReader {
-    use parquet::arrow::array_reader::ComplexObjectArrayReader;
-    use parquet::arrow::converter::{Utf8ArrayConverter, Utf8Converter};
-    let converter = Utf8Converter::new(Utf8ArrayConverter {});
-    ComplexObjectArrayReader::<parquet::data_type::ByteArrayType, 
Utf8Converter>::new(
-        Box::new(page_iterator),
-        column_desc,
-        converter,
-        None,
-    )
-    .unwrap()
+) -> Box<dyn ArrayReader> {
+    use parquet::arrow::array_reader::make_byte_array_reader;
+    make_byte_array_reader(Box::new(page_iterator), column_desc, None, 
true).unwrap()
 }
 
 fn add_benches(c: &mut Criterion) {
@@ -368,10 +366,10 @@ fn add_benches(c: &mut Criterion) {
                     mandatory_int32_column_desc.clone(),
                 );
                 count = bench_array_reader(array_reader);
-            })
+            });
+            assert_eq!(count, EXPECTED_VALUE_COUNT);
         },
     );
-    assert_eq!(count, EXPECTED_VALUE_COUNT);
 
     group.bench_function(
         "read Int32Array, plain encoded, mandatory, no NULLs - new",
@@ -382,10 +380,10 @@ fn add_benches(c: &mut Criterion) {
                     mandatory_int32_column_desc.clone(),
                 );
                 count = bench_array_reader(array_reader);
-            })
+            });
+            assert_eq!(count, EXPECTED_VALUE_COUNT);
         },
     );
-    assert_eq!(count, EXPECTED_VALUE_COUNT);
 
     let plain_int32_no_null_data = build_plain_encoded_int32_page_iterator(
         schema.clone(),
@@ -401,10 +399,10 @@ fn add_benches(c: &mut Criterion) {
                     optional_int32_column_desc.clone(),
                 );
                 count = bench_array_reader(array_reader);
-            })
+            });
+            assert_eq!(count, EXPECTED_VALUE_COUNT);
         },
     );
-    assert_eq!(count, EXPECTED_VALUE_COUNT);
 
     group.bench_function(
         "read Int32Array, plain encoded, optional, no NULLs - new",
@@ -415,10 +413,10 @@ fn add_benches(c: &mut Criterion) {
                     optional_int32_column_desc.clone(),
                 );
                 count = bench_array_reader(array_reader);
-            })
+            });
+            assert_eq!(count, EXPECTED_VALUE_COUNT);
         },
     );
-    assert_eq!(count, EXPECTED_VALUE_COUNT);
 
     // int32, plain encoded, half NULLs
     let plain_int32_half_null_data = build_plain_encoded_int32_page_iterator(
@@ -435,10 +433,10 @@ fn add_benches(c: &mut Criterion) {
                     optional_int32_column_desc.clone(),
                 );
                 count = bench_array_reader(array_reader);
-            })
+            });
+            assert_eq!(count, EXPECTED_VALUE_COUNT);
         },
     );
-    assert_eq!(count, EXPECTED_VALUE_COUNT);
 
     group.bench_function(
         "read Int32Array, plain encoded, optional, half NULLs - new",
@@ -449,10 +447,10 @@ fn add_benches(c: &mut Criterion) {
                     optional_int32_column_desc.clone(),
                 );
                 count = bench_array_reader(array_reader);
-            })
+            });
+            assert_eq!(count, EXPECTED_VALUE_COUNT);
         },
     );
-    assert_eq!(count, EXPECTED_VALUE_COUNT);
 
     // int32, dictionary encoded, no NULLs
     let dictionary_int32_no_null_data = 
build_dictionary_encoded_int32_page_iterator(
@@ -469,10 +467,10 @@ fn add_benches(c: &mut Criterion) {
                     mandatory_int32_column_desc.clone(),
                 );
                 count = bench_array_reader(array_reader);
-            })
+            });
+            assert_eq!(count, EXPECTED_VALUE_COUNT);
         },
     );
-    assert_eq!(count, EXPECTED_VALUE_COUNT);
 
     group.bench_function(
         "read Int32Array, dictionary encoded, mandatory, no NULLs - new",
@@ -483,10 +481,10 @@ fn add_benches(c: &mut Criterion) {
                     mandatory_int32_column_desc.clone(),
                 );
                 count = bench_array_reader(array_reader);
-            })
+            });
+            assert_eq!(count, EXPECTED_VALUE_COUNT);
         },
     );
-    assert_eq!(count, EXPECTED_VALUE_COUNT);
 
     let dictionary_int32_no_null_data = 
build_dictionary_encoded_int32_page_iterator(
         schema.clone(),
@@ -502,10 +500,10 @@ fn add_benches(c: &mut Criterion) {
                     optional_int32_column_desc.clone(),
                 );
                 count = bench_array_reader(array_reader);
-            })
+            });
+            assert_eq!(count, EXPECTED_VALUE_COUNT);
         },
     );
-    assert_eq!(count, EXPECTED_VALUE_COUNT);
 
     group.bench_function(
         "read Int32Array, dictionary encoded, optional, no NULLs - new",
@@ -516,10 +514,10 @@ fn add_benches(c: &mut Criterion) {
                     optional_int32_column_desc.clone(),
                 );
                 count = bench_array_reader(array_reader);
-            })
+            });
+            assert_eq!(count, EXPECTED_VALUE_COUNT);
         },
     );
-    assert_eq!(count, EXPECTED_VALUE_COUNT);
 
     // int32, dictionary encoded, half NULLs
     let dictionary_int32_half_null_data = 
build_dictionary_encoded_int32_page_iterator(
@@ -536,10 +534,10 @@ fn add_benches(c: &mut Criterion) {
                     optional_int32_column_desc.clone(),
                 );
                 count = bench_array_reader(array_reader);
-            })
+            });
+            assert_eq!(count, EXPECTED_VALUE_COUNT);
         },
     );
-    assert_eq!(count, EXPECTED_VALUE_COUNT);
 
     group.bench_function(
         "read Int32Array, dictionary encoded, optional, half NULLs - new",
@@ -550,10 +548,10 @@ fn add_benches(c: &mut Criterion) {
                     optional_int32_column_desc.clone(),
                 );
                 count = bench_array_reader(array_reader);
-            })
+            });
+            assert_eq!(count, EXPECTED_VALUE_COUNT);
         },
     );
-    assert_eq!(count, EXPECTED_VALUE_COUNT);
 
     // string benchmarks
     //==============================
@@ -568,15 +566,15 @@ fn add_benches(c: &mut Criterion) {
         "read StringArray, plain encoded, mandatory, no NULLs - old",
         |b| {
             b.iter(|| {
-                let array_reader = create_string_complex_array_reader(
+                let array_reader = create_string_byte_array_reader(
                     plain_string_no_null_data.clone(),
                     mandatory_string_column_desc.clone(),
                 );
                 count = bench_array_reader(array_reader);
-            })
+            });
+            assert_eq!(count, EXPECTED_VALUE_COUNT);
         },
     );
-    assert_eq!(count, EXPECTED_VALUE_COUNT);
 
     group.bench_function(
         "read StringArray, plain encoded, mandatory, no NULLs - new",
@@ -587,10 +585,10 @@ fn add_benches(c: &mut Criterion) {
                     mandatory_string_column_desc.clone(),
                 );
                 count = bench_array_reader(array_reader);
-            })
+            });
+            assert_eq!(count, EXPECTED_VALUE_COUNT);
         },
     );
-    assert_eq!(count, EXPECTED_VALUE_COUNT);
 
     let plain_string_no_null_data = build_plain_encoded_string_page_iterator(
         schema.clone(),
@@ -601,15 +599,15 @@ fn add_benches(c: &mut Criterion) {
         "read StringArray, plain encoded, optional, no NULLs - old",
         |b| {
             b.iter(|| {
-                let array_reader = create_string_complex_array_reader(
+                let array_reader = create_string_byte_array_reader(
                     plain_string_no_null_data.clone(),
                     optional_string_column_desc.clone(),
                 );
                 count = bench_array_reader(array_reader);
-            })
+            });
+            assert_eq!(count, EXPECTED_VALUE_COUNT);
         },
     );
-    assert_eq!(count, EXPECTED_VALUE_COUNT);
 
     group.bench_function(
         "read StringArray, plain encoded, optional, no NULLs - new",
@@ -620,10 +618,10 @@ fn add_benches(c: &mut Criterion) {
                     optional_string_column_desc.clone(),
                 );
                 count = bench_array_reader(array_reader);
-            })
+            });
+            assert_eq!(count, EXPECTED_VALUE_COUNT);
         },
     );
-    assert_eq!(count, EXPECTED_VALUE_COUNT);
 
     // string, plain encoded, half NULLs
     let plain_string_half_null_data = build_plain_encoded_string_page_iterator(
@@ -635,15 +633,15 @@ fn add_benches(c: &mut Criterion) {
         "read StringArray, plain encoded, optional, half NULLs - old",
         |b| {
             b.iter(|| {
-                let array_reader = create_string_complex_array_reader(
+                let array_reader = create_string_byte_array_reader(
                     plain_string_half_null_data.clone(),
                     optional_string_column_desc.clone(),
                 );
                 count = bench_array_reader(array_reader);
-            })
+            });
+            assert_eq!(count, EXPECTED_VALUE_COUNT);
         },
     );
-    assert_eq!(count, EXPECTED_VALUE_COUNT);
 
     group.bench_function(
         "read StringArray, plain encoded, optional, half NULLs - new",
@@ -654,10 +652,10 @@ fn add_benches(c: &mut Criterion) {
                     optional_string_column_desc.clone(),
                 );
                 count = bench_array_reader(array_reader);
-            })
+            });
+            assert_eq!(count, EXPECTED_VALUE_COUNT);
         },
     );
-    assert_eq!(count, EXPECTED_VALUE_COUNT);
 
     // string, dictionary encoded, no NULLs
     let dictionary_string_no_null_data = 
build_dictionary_encoded_string_page_iterator(
@@ -669,15 +667,15 @@ fn add_benches(c: &mut Criterion) {
         "read StringArray, dictionary encoded, mandatory, no NULLs - old",
         |b| {
             b.iter(|| {
-                let array_reader = create_string_complex_array_reader(
+                let array_reader = create_string_byte_array_reader(
                     dictionary_string_no_null_data.clone(),
                     mandatory_string_column_desc.clone(),
                 );
                 count = bench_array_reader(array_reader);
-            })
+            });
+            assert_eq!(count, EXPECTED_VALUE_COUNT);
         },
     );
-    assert_eq!(count, EXPECTED_VALUE_COUNT);
 
     group.bench_function(
         "read StringArray, dictionary encoded, mandatory, no NULLs - new",
@@ -688,10 +686,10 @@ fn add_benches(c: &mut Criterion) {
                     mandatory_string_column_desc.clone(),
                 );
                 count = bench_array_reader(array_reader);
-            })
+            });
+            assert_eq!(count, EXPECTED_VALUE_COUNT);
         },
     );
-    assert_eq!(count, EXPECTED_VALUE_COUNT);
 
     let dictionary_string_no_null_data = 
build_dictionary_encoded_string_page_iterator(
         schema.clone(),
@@ -702,15 +700,15 @@ fn add_benches(c: &mut Criterion) {
         "read StringArray, dictionary encoded, optional, no NULLs - old",
         |b| {
             b.iter(|| {
-                let array_reader = create_string_complex_array_reader(
+                let array_reader = create_string_byte_array_reader(
                     dictionary_string_no_null_data.clone(),
                     optional_string_column_desc.clone(),
                 );
                 count = bench_array_reader(array_reader);
-            })
+            });
+            assert_eq!(count, EXPECTED_VALUE_COUNT);
         },
     );
-    assert_eq!(count, EXPECTED_VALUE_COUNT);
 
     group.bench_function(
         "read StringArray, dictionary encoded, optional, no NULLs - new",
@@ -721,10 +719,10 @@ fn add_benches(c: &mut Criterion) {
                     optional_string_column_desc.clone(),
                 );
                 count = bench_array_reader(array_reader);
-            })
+            });
+            assert_eq!(count, EXPECTED_VALUE_COUNT);
         },
     );
-    assert_eq!(count, EXPECTED_VALUE_COUNT);
 
     // string, dictionary encoded, half NULLs
     let dictionary_string_half_null_data = 
build_dictionary_encoded_string_page_iterator(
@@ -736,15 +734,15 @@ fn add_benches(c: &mut Criterion) {
         "read StringArray, dictionary encoded, optional, half NULLs - old",
         |b| {
             b.iter(|| {
-                let array_reader = create_string_complex_array_reader(
+                let array_reader = create_string_byte_array_reader(
                     dictionary_string_half_null_data.clone(),
                     optional_string_column_desc.clone(),
                 );
                 count = bench_array_reader(array_reader);
-            })
+            });
+            assert_eq!(count, EXPECTED_VALUE_COUNT);
         },
     );
-    assert_eq!(count, EXPECTED_VALUE_COUNT);
 
     group.bench_function(
         "read StringArray, dictionary encoded, optional, half NULLs - new",
@@ -755,10 +753,10 @@ fn add_benches(c: &mut Criterion) {
                     optional_string_column_desc.clone(),
                 );
                 count = bench_array_reader(array_reader);
-            })
+            });
+            assert_eq!(count, EXPECTED_VALUE_COUNT);
         },
     );
-    assert_eq!(count, EXPECTED_VALUE_COUNT);
 
     group.finish();
 }
diff --git a/parquet/src/arrow/array_reader.rs 
b/parquet/src/arrow/array_reader.rs
index 537b4cc..c11bfc2 100644
--- a/parquet/src/arrow/array_reader.rs
+++ b/parquet/src/arrow/array_reader.rs
@@ -60,8 +60,7 @@ use crate::arrow::converter::{
     DecimalConverter, FixedLenBinaryConverter, FixedSizeArrayConverter,
     Int96ArrayConverter, Int96Converter, IntervalDayTimeArrayConverter,
     IntervalDayTimeConverter, IntervalYearMonthArrayConverter,
-    IntervalYearMonthConverter, LargeBinaryArrayConverter, 
LargeBinaryConverter,
-    LargeUtf8ArrayConverter, LargeUtf8Converter,
+    IntervalYearMonthConverter, Utf8ArrayConverter, Utf8Converter,
 };
 use crate::arrow::record_reader::buffer::{ScalarValue, ValuesBuffer};
 use crate::arrow::record_reader::{GenericRecordReader, RecordReader};
@@ -81,6 +80,11 @@ use crate::schema::types::{
 };
 use crate::schema::visitor::TypeVisitor;
 
+mod byte_array;
+mod offset_buffer;
+
+pub use byte_array::make_byte_array_reader;
+
 /// Array reader reads parquet data into arrow array.
 pub trait ArrayReader {
     fn as_any(&self) -> &dyn Any;
@@ -1778,57 +1782,43 @@ impl<'a> ArrayReaderBuilder {
                     null_mask_only,
                 )?,
             )),
-            PhysicalType::BYTE_ARRAY => {
-                if cur_type.get_basic_info().converted_type() == 
ConvertedType::UTF8 {
-                    if let Some(ArrowType::LargeUtf8) = arrow_type {
-                        let converter =
-                            LargeUtf8Converter::new(LargeUtf8ArrayConverter 
{});
-                        Ok(Box::new(ComplexObjectArrayReader::<
-                            ByteArrayType,
-                            LargeUtf8Converter,
-                        >::new(
-                            page_iterator,
-                            column_desc,
-                            converter,
-                            arrow_type,
-                        )?))
-                    } else {
-                        use crate::arrow::arrow_array_reader::{
-                            ArrowArrayReader, StringArrayConverter,
-                        };
-                        let converter = StringArrayConverter::new();
-                        Ok(Box::new(ArrowArrayReader::try_new(
-                            *page_iterator,
-                            column_desc,
-                            converter,
-                            arrow_type,
-                        )?))
+            PhysicalType::BYTE_ARRAY => match arrow_type {
+                // TODO: Replace with optimised dictionary reader (#171)
+                Some(ArrowType::Dictionary(_, _)) => {
+                    match cur_type.get_basic_info().converted_type() {
+                        ConvertedType::UTF8 => {
+                            let converter = 
Utf8Converter::new(Utf8ArrayConverter {});
+                            Ok(Box::new(ComplexObjectArrayReader::<
+                                ByteArrayType,
+                                Utf8Converter,
+                            >::new(
+                                page_iterator,
+                                column_desc,
+                                converter,
+                                arrow_type,
+                            )?))
+                        }
+                        _ => {
+                            let converter = 
BinaryConverter::new(BinaryArrayConverter {});
+                            Ok(Box::new(ComplexObjectArrayReader::<
+                                ByteArrayType,
+                                BinaryConverter,
+                            >::new(
+                                page_iterator,
+                                column_desc,
+                                converter,
+                                arrow_type,
+                            )?))
+                        }
                     }
-                } else if let Some(ArrowType::LargeBinary) = arrow_type {
-                    let converter =
-                        LargeBinaryConverter::new(LargeBinaryArrayConverter 
{});
-                    Ok(Box::new(ComplexObjectArrayReader::<
-                        ByteArrayType,
-                        LargeBinaryConverter,
-                    >::new(
-                        page_iterator,
-                        column_desc,
-                        converter,
-                        arrow_type,
-                    )?))
-                } else {
-                    let converter = BinaryConverter::new(BinaryArrayConverter 
{});
-                    Ok(Box::new(ComplexObjectArrayReader::<
-                        ByteArrayType,
-                        BinaryConverter,
-                    >::new(
-                        page_iterator,
-                        column_desc,
-                        converter,
-                        arrow_type,
-                    )?))
                 }
-            }
+                _ => make_byte_array_reader(
+                    page_iterator,
+                    column_desc,
+                    arrow_type,
+                    null_mask_only,
+                ),
+            },
             PhysicalType::FIXED_LEN_BYTE_ARRAY
                 if cur_type.get_basic_info().converted_type()
                     == ConvertedType::DECIMAL =>
diff --git a/parquet/src/arrow/array_reader/byte_array.rs 
b/parquet/src/arrow/array_reader/byte_array.rs
new file mode 100644
index 0000000..fc214dd
--- /dev/null
+++ b/parquet/src/arrow/array_reader/byte_array.rs
@@ -0,0 +1,699 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::array_reader::offset_buffer::OffsetBuffer;
+use crate::arrow::array_reader::{read_records, ArrayReader};
+use crate::arrow::record_reader::buffer::ScalarValue;
+use crate::arrow::record_reader::GenericRecordReader;
+use crate::arrow::schema::parquet_to_arrow_field;
+use crate::basic::{ConvertedType, Encoding};
+use crate::column::page::PageIterator;
+use crate::column::reader::decoder::ColumnValueDecoder;
+use crate::data_type::Int32Type;
+use crate::encodings::{
+    decoding::{Decoder, DeltaBitPackDecoder},
+    rle::RleDecoder,
+};
+use crate::errors::{ParquetError, Result};
+use crate::schema::types::ColumnDescPtr;
+use crate::util::memory::ByteBufferPtr;
+use arrow::array::{ArrayRef, OffsetSizeTrait};
+use arrow::buffer::Buffer;
+use arrow::datatypes::DataType as ArrowType;
+use std::any::Any;
+use std::ops::Range;
+
+/// Returns an [`ArrayReader`] that decodes the provided byte array column
+pub fn make_byte_array_reader(
+    pages: Box<dyn PageIterator>,
+    column_desc: ColumnDescPtr,
+    arrow_type: Option<ArrowType>,
+    null_mask_only: bool,
+) -> Result<Box<dyn ArrayReader>> {
+    // Check if Arrow type is specified, else create it from Parquet type
+    let data_type = match arrow_type {
+        Some(t) => t,
+        None => parquet_to_arrow_field(column_desc.as_ref())?
+            .data_type()
+            .clone(),
+    };
+
+    match data_type {
+        ArrowType::Binary | ArrowType::Utf8 => {
+            let reader =
+                GenericRecordReader::new_with_options(column_desc, 
null_mask_only);
+            Ok(Box::new(ByteArrayReader::<i32>::new(
+                pages, data_type, reader,
+            )))
+        }
+        ArrowType::LargeUtf8 | ArrowType::LargeBinary => {
+            let reader =
+                GenericRecordReader::new_with_options(column_desc, 
null_mask_only);
+            Ok(Box::new(ByteArrayReader::<i64>::new(
+                pages, data_type, reader,
+            )))
+        }
+        _ => Err(general_err!(
+            "invalid data type for byte array reader - {}",
+            data_type
+        )),
+    }
+}
+
+/// An [`ArrayReader`] for variable length byte arrays
+struct ByteArrayReader<I: ScalarValue> {
+    data_type: ArrowType,
+    pages: Box<dyn PageIterator>,
+    def_levels_buffer: Option<Buffer>,
+    rep_levels_buffer: Option<Buffer>,
+    record_reader: GenericRecordReader<OffsetBuffer<I>, 
ByteArrayColumnValueDecoder<I>>,
+}
+
+impl<I: ScalarValue> ByteArrayReader<I> {
+    fn new(
+        pages: Box<dyn PageIterator>,
+        data_type: ArrowType,
+        record_reader: GenericRecordReader<
+            OffsetBuffer<I>,
+            ByteArrayColumnValueDecoder<I>,
+        >,
+    ) -> Self {
+        Self {
+            data_type,
+            pages,
+            def_levels_buffer: None,
+            rep_levels_buffer: None,
+            record_reader,
+        }
+    }
+}
+
+impl<I: OffsetSizeTrait + ScalarValue> ArrayReader for ByteArrayReader<I> {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
+        read_records(&mut self.record_reader, self.pages.as_mut(), 
batch_size)?;
+        let buffer = self.record_reader.consume_record_data()?;
+        let null_buffer = self.record_reader.consume_bitmap_buffer()?;
+        self.def_levels_buffer = self.record_reader.consume_def_levels()?;
+        self.rep_levels_buffer = self.record_reader.consume_rep_levels()?;
+        self.record_reader.reset();
+
+        Ok(buffer.into_array(null_buffer, self.data_type.clone()))
+    }
+
+    fn get_def_levels(&self) -> Option<&[i16]> {
+        self.def_levels_buffer
+            .as_ref()
+            .map(|buf| unsafe { buf.typed_data() })
+    }
+
+    fn get_rep_levels(&self) -> Option<&[i16]> {
+        self.rep_levels_buffer
+            .as_ref()
+            .map(|buf| unsafe { buf.typed_data() })
+    }
+}
+
+/// A [`ColumnValueDecoder`] for variable length byte arrays
+struct ByteArrayColumnValueDecoder<I: ScalarValue> {
+    dict: Option<OffsetBuffer<I>>,
+    decoder: Option<ByteArrayDecoder>,
+    validate_utf8: bool,
+}
+
+impl<I: OffsetSizeTrait + ScalarValue> ColumnValueDecoder
+    for ByteArrayColumnValueDecoder<I>
+{
+    type Slice = OffsetBuffer<I>;
+
+    fn new(desc: &ColumnDescPtr) -> Self {
+        let validate_utf8 = desc.converted_type() == ConvertedType::UTF8;
+        Self {
+            dict: None,
+            decoder: None,
+            validate_utf8,
+        }
+    }
+
+    fn set_dict(
+        &mut self,
+        buf: ByteBufferPtr,
+        num_values: u32,
+        encoding: Encoding,
+        _is_sorted: bool,
+    ) -> Result<()> {
+        if !matches!(
+            encoding,
+            Encoding::PLAIN | Encoding::RLE_DICTIONARY | 
Encoding::PLAIN_DICTIONARY
+        ) {
+            return Err(nyi_err!(
+                "Invalid/Unsupported encoding type for dictionary: {}",
+                encoding
+            ));
+        }
+
+        let mut buffer = OffsetBuffer::default();
+        let mut decoder = ByteArrayDecoderPlain::new(
+            buf,
+            num_values as usize,
+            Some(num_values as usize),
+            self.validate_utf8,
+        );
+        decoder.read(&mut buffer, usize::MAX)?;
+        self.dict = Some(buffer);
+        Ok(())
+    }
+
+    fn set_data(
+        &mut self,
+        encoding: Encoding,
+        data: ByteBufferPtr,
+        num_levels: usize,
+        num_values: Option<usize>,
+    ) -> Result<()> {
+        self.decoder = Some(ByteArrayDecoder::new(
+            encoding,
+            data,
+            num_levels,
+            num_values,
+            self.validate_utf8,
+        )?);
+        Ok(())
+    }
+
+    fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) -> 
Result<usize> {
+        self.decoder.as_mut().expect("decoder set").read(
+            out,
+            range.end - range.start,
+            self.dict.as_ref(),
+        )
+    }
+}
+
+/// A generic decoder from uncompressed parquet value data to [`OffsetBuffer`]
+pub enum ByteArrayDecoder {
+    Plain(ByteArrayDecoderPlain),
+    Dictionary(ByteArrayDecoderDictionary),
+    DeltaLength(ByteArrayDecoderDeltaLength),
+    DeltaByteArray(ByteArrayDecoderDelta),
+}
+
+impl ByteArrayDecoder {
+    pub fn new(
+        encoding: Encoding,
+        data: ByteBufferPtr,
+        num_levels: usize,
+        num_values: Option<usize>,
+        validate_utf8: bool,
+    ) -> Result<Self> {
+        let decoder = match encoding {
+            Encoding::PLAIN => 
ByteArrayDecoder::Plain(ByteArrayDecoderPlain::new(
+                data,
+                num_levels,
+                num_values,
+                validate_utf8,
+            )),
+            Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
+                ByteArrayDecoder::Dictionary(ByteArrayDecoderDictionary::new(
+                    data, num_levels, num_values,
+                ))
+            }
+            Encoding::DELTA_LENGTH_BYTE_ARRAY => ByteArrayDecoder::DeltaLength(
+                ByteArrayDecoderDeltaLength::new(data, validate_utf8)?,
+            ),
+            Encoding::DELTA_BYTE_ARRAY => ByteArrayDecoder::DeltaByteArray(
+                ByteArrayDecoderDelta::new(data, validate_utf8)?,
+            ),
+            _ => {
+                return Err(general_err!(
+                    "unsupported encoding for byte array: {}",
+                    encoding
+                ))
+            }
+        };
+
+        Ok(decoder)
+    }
+
+    /// Read up to `len` values to `out` with the optional dictionary
+    pub fn read<I: OffsetSizeTrait + ScalarValue>(
+        &mut self,
+        out: &mut OffsetBuffer<I>,
+        len: usize,
+        dict: Option<&OffsetBuffer<I>>,
+    ) -> Result<usize> {
+        match self {
+            ByteArrayDecoder::Plain(d) => d.read(out, len),
+            ByteArrayDecoder::Dictionary(d) => {
+                let dict = dict.expect("dictionary set");
+                d.read(out, dict, len)
+            }
+            ByteArrayDecoder::DeltaLength(d) => d.read(out, len),
+            ByteArrayDecoder::DeltaByteArray(d) => d.read(out, len),
+        }
+    }
+}
+
+/// Decoder from [`Encoding::PLAIN`] data to [`OffsetBuffer`]
+pub struct ByteArrayDecoderPlain {
+    buf: ByteBufferPtr,
+    offset: usize,
+    validate_utf8: bool,
+
+    /// This is a maximum as the null count is not always known, e.g. value 
data from
+    /// a v1 data page
+    max_remaining_values: usize,
+}
+
+impl ByteArrayDecoderPlain {
+    pub fn new(
+        buf: ByteBufferPtr,
+        num_levels: usize,
+        num_values: Option<usize>,
+        validate_utf8: bool,
+    ) -> Self {
+        Self {
+            buf,
+            validate_utf8,
+            offset: 0,
+            max_remaining_values: num_values.unwrap_or(num_levels),
+        }
+    }
+
+    pub fn read<I: OffsetSizeTrait + ScalarValue>(
+        &mut self,
+        output: &mut OffsetBuffer<I>,
+        len: usize,
+    ) -> Result<usize> {
+        let initial_values_length = output.values.len();
+
+        let to_read = len.min(self.max_remaining_values);
+        output.offsets.reserve(to_read);
+
+        let remaining_bytes = self.buf.len() - self.offset;
+        if remaining_bytes == 0 {
+            return Ok(0);
+        }
+
+        let estimated_bytes = remaining_bytes
+            .checked_mul(to_read)
+            .map(|x| x / self.max_remaining_values)
+            .unwrap_or_default();
+
+        output.values.reserve(estimated_bytes);
+
+        let mut read = 0;
+
+        let buf = self.buf.as_ref();
+        while self.offset < self.buf.len() && read != to_read {
+            if self.offset + 4 > buf.len() {
+                return Err(ParquetError::EOF("eof decoding byte 
array".into()));
+            }
+            let len_bytes: [u8; 4] =
+                buf[self.offset..self.offset + 4].try_into().unwrap();
+            let len = u32::from_le_bytes(len_bytes);
+
+            let start_offset = self.offset + 4;
+            let end_offset = start_offset + len as usize;
+            if end_offset > buf.len() {
+                return Err(ParquetError::EOF("eof decoding byte 
array".into()));
+            }
+
+            output.try_push(&buf[start_offset..end_offset], 
self.validate_utf8)?;
+
+            self.offset = end_offset;
+            read += 1;
+        }
+        self.max_remaining_values -= to_read;
+
+        if self.validate_utf8 {
+            output.check_valid_utf8(initial_values_length)?;
+        }
+        Ok(to_read)
+    }
+}
+
+/// Decoder from [`Encoding::DELTA_LENGTH_BYTE_ARRAY`] data to [`OffsetBuffer`]
+pub struct ByteArrayDecoderDeltaLength {
+    lengths: Vec<i32>,
+    data: ByteBufferPtr,
+    length_offset: usize,
+    data_offset: usize,
+    validate_utf8: bool,
+}
+
+impl ByteArrayDecoderDeltaLength {
+    fn new(data: ByteBufferPtr, validate_utf8: bool) -> Result<Self> {
+        let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
+        len_decoder.set_data(data.all(), 0)?;
+        let values = len_decoder.values_left();
+
+        let mut lengths = vec![0; values];
+        len_decoder.get(&mut lengths)?;
+
+        Ok(Self {
+            lengths,
+            data,
+            validate_utf8,
+            length_offset: 0,
+            data_offset: len_decoder.get_offset(),
+        })
+    }
+
+    fn read<I: OffsetSizeTrait + ScalarValue>(
+        &mut self,
+        output: &mut OffsetBuffer<I>,
+        len: usize,
+    ) -> Result<usize> {
+        let initial_values_length = output.values.len();
+
+        let to_read = len.min(self.lengths.len() - self.length_offset);
+        output.offsets.reserve(to_read);
+
+        let src_lengths = &self.lengths[self.length_offset..self.length_offset 
+ to_read];
+
+        let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();
+        output.values.reserve(total_bytes);
+
+        if self.data_offset + total_bytes > self.data.len() {
+            return Err(ParquetError::EOF(
+                "Insufficient delta length byte array bytes".to_string(),
+            ));
+        }
+
+        let mut start_offset = self.data_offset;
+        for length in src_lengths {
+            let end_offset = start_offset + *length as usize;
+            output.try_push(
+                &self.data.as_ref()[start_offset..end_offset],
+                self.validate_utf8,
+            )?;
+            start_offset = end_offset;
+        }
+
+        self.data_offset = start_offset;
+        self.length_offset += to_read;
+
+        if self.validate_utf8 {
+            output.check_valid_utf8(initial_values_length)?;
+        }
+        Ok(to_read)
+    }
+}
+
+/// Decoder from [`Encoding::DELTA_BYTE_ARRAY`] to [`OffsetBuffer`]
+pub struct ByteArrayDecoderDelta {
+    prefix_lengths: Vec<i32>,
+    suffix_lengths: Vec<i32>,
+    data: ByteBufferPtr,
+    length_offset: usize,
+    data_offset: usize,
+    last_value: Vec<u8>,
+    validate_utf8: bool,
+}
+
+impl ByteArrayDecoderDelta {
+    fn new(data: ByteBufferPtr, validate_utf8: bool) -> Result<Self> {
+        let mut prefix = DeltaBitPackDecoder::<Int32Type>::new();
+        prefix.set_data(data.all(), 0)?;
+
+        let num_prefix = prefix.values_left();
+        let mut prefix_lengths = vec![0; num_prefix];
+        assert_eq!(prefix.get(&mut prefix_lengths)?, num_prefix);
+
+        let mut suffix = DeltaBitPackDecoder::<Int32Type>::new();
+        suffix.set_data(data.start_from(prefix.get_offset()), 0)?;
+
+        let num_suffix = suffix.values_left();
+        let mut suffix_lengths = vec![0; num_suffix];
+        assert_eq!(suffix.get(&mut suffix_lengths)?, num_suffix);
+
+        if num_prefix != num_suffix {
+            return Err(general_err!(format!(
+                "inconsistent DELTA_BYTE_ARRAY lengths, prefixes: {}, 
suffixes: {}",
+                num_prefix, num_suffix
+            )));
+        }
+
+        Ok(Self {
+            prefix_lengths,
+            suffix_lengths,
+            data,
+            length_offset: 0,
+            data_offset: prefix.get_offset() + suffix.get_offset(),
+            last_value: vec![],
+            validate_utf8,
+        })
+    }
+
+    fn read<I: OffsetSizeTrait + ScalarValue>(
+        &mut self,
+        output: &mut OffsetBuffer<I>,
+        len: usize,
+    ) -> Result<usize> {
+        let initial_values_length = output.values.len();
+        assert_eq!(self.prefix_lengths.len(), self.suffix_lengths.len());
+
+        let to_read = len.min(self.prefix_lengths.len() - self.length_offset);
+
+        output.offsets.reserve(to_read);
+
+        let length_range = self.length_offset..self.length_offset + to_read;
+        let iter = self.prefix_lengths[length_range.clone()]
+            .iter()
+            .zip(&self.suffix_lengths[length_range]);
+
+        let data = self.data.as_ref();
+
+        for (prefix_length, suffix_length) in iter {
+            let prefix_length = *prefix_length as usize;
+            let suffix_length = *suffix_length as usize;
+
+            if self.data_offset + suffix_length > self.data.len() {
+                return Err(ParquetError::EOF("eof decoding byte 
array".into()));
+            }
+
+            self.last_value.truncate(prefix_length);
+            self.last_value.extend_from_slice(
+                &data[self.data_offset..self.data_offset + suffix_length],
+            );
+            output.try_push(&self.last_value, self.validate_utf8)?;
+
+            self.data_offset += suffix_length;
+        }
+
+        self.length_offset += to_read;
+
+        if self.validate_utf8 {
+            output.check_valid_utf8(initial_values_length)?;
+        }
+        Ok(to_read)
+    }
+}
+
+/// Decoder from [`Encoding::RLE_DICTIONARY`] to [`OffsetBuffer`]
+pub struct ByteArrayDecoderDictionary {
+    decoder: RleDecoder,
+
+    index_buf: Box<[i32; 1024]>,
+    index_buf_len: usize,
+    index_offset: usize,
+
+    /// This is a maximum as the null count is not always known, e.g. value 
data from
+    /// a v1 data page
+    max_remaining_values: usize,
+}
+
+impl ByteArrayDecoderDictionary {
+    fn new(data: ByteBufferPtr, num_levels: usize, num_values: Option<usize>) 
-> Self {
+        let bit_width = data[0];
+        let mut decoder = RleDecoder::new(bit_width);
+        decoder.set_data(data.start_from(1));
+
+        Self {
+            decoder,
+            index_buf: Box::new([0; 1024]),
+            index_buf_len: 0,
+            index_offset: 0,
+            max_remaining_values: num_values.unwrap_or(num_levels),
+        }
+    }
+
+    fn read<I: OffsetSizeTrait + ScalarValue>(
+        &mut self,
+        output: &mut OffsetBuffer<I>,
+        dict: &OffsetBuffer<I>,
+        len: usize,
+    ) -> Result<usize> {
+        let mut values_read = 0;
+
+        while values_read != len && self.max_remaining_values != 0 {
+            if self.index_offset == self.index_buf_len {
+                let read = self.decoder.get_batch(self.index_buf.as_mut())?;
+                if read == 0 {
+                    break;
+                }
+                self.index_buf_len = read;
+                self.index_offset = 0;
+            }
+
+            let to_read = (len - values_read)
+                .min(self.index_buf_len - self.index_offset)
+                .min(self.max_remaining_values);
+
+            output.extend_from_dictionary(
+                &self.index_buf[self.index_offset..self.index_offset + 
to_read],
+                dict.offsets.as_slice(),
+                dict.values.as_slice(),
+            )?;
+
+            self.index_offset += to_read;
+            values_read += to_read;
+            self.max_remaining_values -= to_read;
+        }
+        Ok(values_read)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::arrow::record_reader::buffer::ValuesBuffer;
+    use crate::basic::Type as PhysicalType;
+    use crate::data_type::{ByteArray, ByteArrayType};
+    use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder};
+    use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};
+    use crate::util::memory::MemTracker;
+    use arrow::array::{Array, StringArray};
+    use std::sync::Arc;
+
+    fn column() -> ColumnDescPtr {
+        let t = Type::primitive_type_builder("col", PhysicalType::BYTE_ARRAY)
+            .with_converted_type(ConvertedType::UTF8)
+            .build()
+            .unwrap();
+
+        Arc::new(ColumnDescriptor::new(
+            Arc::new(t),
+            1,
+            0,
+            ColumnPath::new(vec![]),
+        ))
+    }
+
+    fn get_encoded(encoding: Encoding, data: &[ByteArray]) -> ByteBufferPtr {
+        let descriptor = column();
+        let mem_tracker = Arc::new(MemTracker::new());
+        let mut encoder =
+            get_encoder::<ByteArrayType>(descriptor, encoding, 
mem_tracker).unwrap();
+
+        encoder.put(data).unwrap();
+        encoder.flush_buffer().unwrap()
+    }
+
+    #[test]
+    fn test_byte_array_decoder() {
+        let data: Vec<_> = vec!["hello", "world", "a", "b"]
+            .into_iter()
+            .map(ByteArray::from)
+            .collect();
+
+        let mut dict_encoder =
+            DictEncoder::<ByteArrayType>::new(column(), 
Arc::new(MemTracker::new()));
+
+        dict_encoder.put(&data).unwrap();
+        let encoded_rle = dict_encoder.flush_buffer().unwrap();
+        let encoded_dictionary = dict_encoder.write_dict().unwrap();
+
+        // A column chunk with all the encodings!
+        let pages = vec![
+            (Encoding::PLAIN, get_encoded(Encoding::PLAIN, &data)),
+            (
+                Encoding::DELTA_BYTE_ARRAY,
+                get_encoded(Encoding::DELTA_BYTE_ARRAY, &data),
+            ),
+            (
+                Encoding::DELTA_LENGTH_BYTE_ARRAY,
+                get_encoded(Encoding::DELTA_LENGTH_BYTE_ARRAY, &data),
+            ),
+            (Encoding::PLAIN_DICTIONARY, encoded_rle.clone()),
+            (Encoding::RLE_DICTIONARY, encoded_rle),
+        ];
+
+        let column_desc = column();
+        let mut decoder = ByteArrayColumnValueDecoder::new(&column_desc);
+
+        decoder
+            .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false)
+            .unwrap();
+
+        for (encoding, page) in pages {
+            let mut output = OffsetBuffer::<i32>::default();
+            decoder.set_data(encoding, page, 4, Some(4)).unwrap();
+
+            assert_eq!(decoder.read(&mut output, 0..1).unwrap(), 1);
+
+            assert_eq!(output.values.as_slice(), "hello".as_bytes());
+            assert_eq!(output.offsets.as_slice(), &[0, 5]);
+
+            assert_eq!(decoder.read(&mut output, 1..2).unwrap(), 1);
+            assert_eq!(output.values.as_slice(), "helloworld".as_bytes());
+            assert_eq!(output.offsets.as_slice(), &[0, 5, 10]);
+
+            assert_eq!(decoder.read(&mut output, 2..4).unwrap(), 2);
+            assert_eq!(output.values.as_slice(), "helloworldab".as_bytes());
+            assert_eq!(output.offsets.as_slice(), &[0, 5, 10, 11, 12]);
+
+            assert_eq!(decoder.read(&mut output, 4..8).unwrap(), 0);
+
+            let valid = vec![false, false, true, true, false, true, true, 
false, false];
+            let rev_position_iter = valid
+                .iter()
+                .enumerate()
+                .rev()
+                .filter_map(|(i, valid)| valid.then(|| i));
+
+            let valid_buffer = Buffer::from_iter(valid.iter().cloned());
+
+            output.pad_nulls(0, 4, valid.len(), rev_position_iter);
+            let array = output.into_array(Some(valid_buffer), ArrowType::Utf8);
+            let strings = 
array.as_any().downcast_ref::<StringArray>().unwrap();
+
+            assert_eq!(
+                strings.iter().collect::<Vec<_>>(),
+                vec![
+                    None,
+                    None,
+                    Some("hello"),
+                    Some("world"),
+                    None,
+                    Some("a"),
+                    Some("b"),
+                    None,
+                    None,
+                ]
+            );
+        }
+    }
+}
diff --git a/parquet/src/arrow/array_reader/offset_buffer.rs 
b/parquet/src/arrow/array_reader/offset_buffer.rs
new file mode 100644
index 0000000..ddf2ad3
--- /dev/null
+++ b/parquet/src/arrow/array_reader/offset_buffer.rs
@@ -0,0 +1,338 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::record_reader::buffer::{
+    BufferQueue, ScalarBuffer, ScalarValue, ValuesBuffer,
+};
+use crate::column::reader::decoder::ValuesBufferSlice;
+use crate::errors::{ParquetError, Result};
+use arrow::array::{make_array, ArrayDataBuilder, ArrayRef, OffsetSizeTrait};
+use arrow::buffer::Buffer;
+use arrow::datatypes::{ArrowNativeType, DataType as ArrowType};
+
+/// A buffer of variable-sized byte arrays that can be converted into
+/// a corresponding [`ArrayRef`]
+pub struct OffsetBuffer<I: ScalarValue> {
+    pub offsets: ScalarBuffer<I>,
+    pub values: ScalarBuffer<u8>,
+}
+
+impl<I: ScalarValue> Default for OffsetBuffer<I> {
+    fn default() -> Self {
+        let mut offsets = ScalarBuffer::new();
+        offsets.resize(1);
+        Self {
+            offsets,
+            values: ScalarBuffer::new(),
+        }
+    }
+}
+
+impl<I: OffsetSizeTrait + ScalarValue> OffsetBuffer<I> {
+    /// Returns the number of byte arrays in this buffer
+    pub fn len(&self) -> usize {
+        self.offsets.len() - 1
+    }
+
+    /// If `validate_utf8` this verifies that the first character of `data` is
+    /// the start of a UTF-8 codepoint
+    ///
+    /// Note: This does not verify that the entirety of `data` is valid
+    /// UTF-8. This should be done by calling [`Self::values_as_str`] after
+    /// all data has been written
+    pub fn try_push(&mut self, data: &[u8], validate_utf8: bool) -> Result<()> 
{
+        if validate_utf8 {
+            if let Some(&b) = data.first() {
+                // A valid code-point iff it does not start with 0b10xxxxxx
+                // Bit-magic taken from `std::str::is_char_boundary`
+                if (b as i8) < -0x40 {
+                    return Err(ParquetError::General(
+                        "encountered non UTF-8 data".to_string(),
+                    ));
+                }
+            }
+        }
+
+        self.values.extend_from_slice(data);
+
+        let index_offset = I::from_usize(self.values.len())
+            .ok_or_else(|| general_err!("index overflow decoding byte 
array"))?;
+
+        self.offsets.push(index_offset);
+        Ok(())
+    }
+
+    /// Extends this buffer with a list of keys
+    ///
+    /// For each value `key` in `keys` this will insert
+    /// `&dict_values[dict_offsets[key]..dict_offsets[key+1]]`
+    pub fn extend_from_dictionary<K: ArrowNativeType, V: ArrowNativeType>(
+        &mut self,
+        keys: &[K],
+        dict_offsets: &[V],
+        dict_values: &[u8],
+    ) -> Result<()> {
+        for key in keys {
+            let index = key.to_usize().unwrap();
+            if index + 1 >= dict_offsets.len() {
+                return Err(general_err!("invalid offset in byte array: {}", 
index));
+            }
+            let start_offset = dict_offsets[index].to_usize().unwrap();
+            let end_offset = dict_offsets[index + 1].to_usize().unwrap();
+
+            // Dictionary values are verified when decoding dictionary page
+            self.try_push(&dict_values[start_offset..end_offset], false)?;
+        }
+        Ok(())
+    }
+
+    /// Validates that `&self.values[start_offset..]` is a valid UTF-8 sequence
+    ///
+    /// This MUST be combined with validating that the offsets start on a 
character
+    /// boundary, otherwise it would be possible for the values array to be a 
valid UTF-8
+    /// sequence, but not the individual string slices it contains
+    ///
+    /// [`Self::try_push`] can perform this validation check on insertion
+    pub fn check_valid_utf8(&self, start_offset: usize) -> Result<()> {
+        match std::str::from_utf8(&self.values.as_slice()[start_offset..]) {
+            Ok(_) => Ok(()),
+            Err(e) => Err(general_err!("encountered non UTF-8 data: {}", e)),
+        }
+    }
+
+    /// Converts this into an [`ArrayRef`] with the provided `data_type` and 
`null_buffer`
+    pub fn into_array(
+        self,
+        null_buffer: Option<Buffer>,
+        data_type: ArrowType,
+    ) -> ArrayRef {
+        let mut array_data_builder = ArrayDataBuilder::new(data_type)
+            .len(self.len())
+            .add_buffer(self.offsets.into())
+            .add_buffer(self.values.into());
+
+        if let Some(buffer) = null_buffer {
+            array_data_builder = array_data_builder.null_bit_buffer(buffer);
+        }
+
+        let data = match cfg!(debug_assertions) {
+            true => array_data_builder.build().unwrap(),
+            false => unsafe { array_data_builder.build_unchecked() },
+        };
+
+        make_array(data)
+    }
+}
+
+impl<I: OffsetSizeTrait + ScalarValue> BufferQueue for OffsetBuffer<I> {
+    type Output = Self;
+    type Slice = Self;
+
+    fn split_off(&mut self, len: usize) -> Self::Output {
+        assert!(self.offsets.len() > len, "{} > {}", self.offsets.len(), len);
+        let remaining_offsets = self.offsets.len() - len - 1;
+        let offsets = self.offsets.as_slice();
+
+        let end_offset = offsets[len];
+
+        let mut new_offsets = ScalarBuffer::new();
+        new_offsets.reserve(remaining_offsets + 1);
+        for v in &offsets[len..] {
+            new_offsets.push(*v - end_offset)
+        }
+
+        self.offsets.resize(len + 1);
+
+        Self {
+            offsets: std::mem::replace(&mut self.offsets, new_offsets),
+            values: self.values.take(end_offset.to_usize().unwrap()),
+        }
+    }
+
+    fn spare_capacity_mut(&mut self, _batch_size: usize) -> &mut Self::Slice {
+        self
+    }
+
+    fn set_len(&mut self, len: usize) {
+        assert_eq!(self.offsets.len(), len + 1);
+    }
+}
+
+impl<I: OffsetSizeTrait + ScalarValue> ValuesBuffer for OffsetBuffer<I> {
+    fn pad_nulls(
+        &mut self,
+        read_offset: usize,
+        values_read: usize,
+        levels_read: usize,
+        rev_position_iter: impl Iterator<Item = usize>,
+    ) {
+        assert_eq!(self.offsets.len(), read_offset + values_read + 1);
+        self.offsets.resize(read_offset + levels_read + 1);
+
+        let offsets = self.offsets.as_slice_mut();
+
+        let mut last_pos = read_offset + levels_read + 1;
+        let mut last_start_offset = I::from_usize(self.values.len()).unwrap();
+
+        let values_range = read_offset..read_offset + values_read;
+        for (value_pos, level_pos) in 
values_range.clone().rev().zip(rev_position_iter) {
+            assert!(level_pos >= value_pos);
+            assert!(level_pos < last_pos);
+
+            let end_offset = offsets[value_pos + 1];
+            let start_offset = offsets[value_pos];
+
+            // Fill in any nulls
+            for x in &mut offsets[level_pos + 1..last_pos] {
+                *x = end_offset;
+            }
+
+            if level_pos == value_pos {
+                return;
+            }
+
+            offsets[level_pos] = start_offset;
+            last_pos = level_pos;
+            last_start_offset = start_offset;
+        }
+
+        // Pad leading nulls up to `last_offset`
+        for x in &mut offsets[values_range.start + 1..last_pos] {
+            *x = last_start_offset
+        }
+    }
+}
+
+impl<I: ScalarValue> ValuesBufferSlice for OffsetBuffer<I> {
+    fn capacity(&self) -> usize {
+        usize::MAX
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::array::{Array, LargeStringArray, StringArray};
+
+    #[test]
+    fn test_offset_buffer_empty() {
+        let buffer = OffsetBuffer::<i32>::default();
+        let array = buffer.into_array(None, ArrowType::Utf8);
+        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(strings.len(), 0);
+    }
+
+    #[test]
+    fn test_offset_buffer_append() {
+        let mut buffer = OffsetBuffer::<i64>::default();
+        buffer.try_push("hello".as_bytes(), true).unwrap();
+        buffer.try_push("bar".as_bytes(), true).unwrap();
+        buffer
+            .extend_from_dictionary(&[1, 3, 0, 2], &[0, 2, 4, 5, 6], 
"abcdef".as_bytes())
+            .unwrap();
+
+        let array = buffer.into_array(None, ArrowType::LargeUtf8);
+        let strings = 
array.as_any().downcast_ref::<LargeStringArray>().unwrap();
+        assert_eq!(
+            strings.iter().map(|x| x.unwrap()).collect::<Vec<_>>(),
+            vec!["hello", "bar", "cd", "f", "ab", "e"]
+        )
+    }
+
+    #[test]
+    fn test_offset_buffer_split() {
+        let mut buffer = OffsetBuffer::<i32>::default();
+        for v in ["hello", "world", "cupcakes", "a", "b", "c"] {
+            buffer.try_push(v.as_bytes(), false).unwrap()
+        }
+        let split = buffer.split_off(3);
+
+        let array = split.into_array(None, ArrowType::Utf8);
+        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(
+            strings.iter().map(|x| x.unwrap()).collect::<Vec<_>>(),
+            vec!["hello", "world", "cupcakes"]
+        );
+
+        buffer.try_push("test".as_bytes(), false).unwrap();
+        let array = buffer.into_array(None, ArrowType::Utf8);
+        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(
+            strings.iter().map(|x| x.unwrap()).collect::<Vec<_>>(),
+            vec!["a", "b", "c", "test"]
+        );
+    }
+
+    #[test]
+    fn test_offset_buffer_pad_nulls() {
+        let mut buffer = OffsetBuffer::<i32>::default();
+        for v in ["a", "b", "c", "def", "gh"] {
+            buffer.try_push(v.as_bytes(), false).unwrap()
+        }
+
+        // Both trailing and leading nulls
+        buffer.pad_nulls(1, 4, 10, [8, 7, 5, 3].into_iter());
+
+        // No null buffer - nulls -> ""
+        let array = buffer.into_array(None, ArrowType::Utf8);
+        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(
+            strings.iter().map(|x| x.unwrap()).collect::<Vec<_>>(),
+            vec!["a", "", "", "b", "", "c", "", "def", "gh", "", ""]
+        );
+    }
+
+    #[test]
+    fn test_utf8_validation() {
+        let valid_2_byte_utf8 = &[0b11001000, 0b10001000];
+        std::str::from_utf8(valid_2_byte_utf8).unwrap();
+        let valid_3_byte_utf8 = &[0b11101000, 0b10001000, 0b10001000];
+        std::str::from_utf8(valid_3_byte_utf8).unwrap();
+        let valid_4_byte_utf8 = &[0b11110010, 0b10101000, 0b10101001, 
0b10100101];
+        std::str::from_utf8(valid_4_byte_utf8).unwrap();
+
+        let mut buffer = OffsetBuffer::<i32>::default();
+        buffer.try_push(valid_2_byte_utf8, true).unwrap();
+        buffer.try_push(valid_3_byte_utf8, true).unwrap();
+        buffer.try_push(valid_4_byte_utf8, true).unwrap();
+
+        // Cannot append string starting with incomplete codepoint
+        buffer.try_push(&valid_2_byte_utf8[1..], true).unwrap_err();
+        buffer.try_push(&valid_3_byte_utf8[1..], true).unwrap_err();
+        buffer.try_push(&valid_3_byte_utf8[2..], true).unwrap_err();
+        buffer.try_push(&valid_4_byte_utf8[1..], true).unwrap_err();
+        buffer.try_push(&valid_4_byte_utf8[2..], true).unwrap_err();
+        buffer.try_push(&valid_4_byte_utf8[3..], true).unwrap_err();
+
+        // Can append data containing an incomplete codepoint
+        buffer.try_push(&[0b01111111, 0b10111111], true).unwrap();
+
+        assert_eq!(buffer.len(), 4);
+        assert_eq!(buffer.values.len(), 11);
+
+        buffer.try_push(valid_3_byte_utf8, true).unwrap();
+
+        // Should fail due to incomplete codepoint
+        buffer.check_valid_utf8(0).unwrap_err();
+
+        // After broken codepoint -> success
+        buffer.check_valid_utf8(11).unwrap();
+
+        // Fails if run from middle of codepoint
+        buffer.check_valid_utf8(12).unwrap_err();
+    }
+}
diff --git a/parquet/src/arrow/arrow_reader.rs 
b/parquet/src/arrow/arrow_reader.rs
index 94b2e7b..476cf08 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader.rs
@@ -251,6 +251,7 @@ mod tests {
     use crate::file::writer::{FileWriter, SerializedFileWriter};
     use crate::schema::parser::parse_message_type;
     use crate::schema::types::{Type, TypePtr};
+    use crate::util::cursor::SliceableCursor;
     use crate::util::test_common::RandGen;
     use arrow::array::*;
     use arrow::datatypes::{DataType as ArrowDataType, Field};
@@ -410,7 +411,7 @@ mod tests {
         let encodings = &[
             Encoding::PLAIN,
             Encoding::RLE_DICTIONARY,
-            //Encoding::DELTA_LENGTH_BYTE_ARRAY,
+            Encoding::DELTA_LENGTH_BYTE_ARRAY,
             Encoding::DELTA_BYTE_ARRAY,
         ];
 
@@ -968,4 +969,40 @@ mod tests {
         assert_eq!(batch.num_rows(), 4);
         assert_eq!(batch.column(0).data().null_count(), 2);
     }
+
+    #[test]
+    fn test_invalid_utf8() {
+        // a parquet file with 1 column with invalid utf8
+        let data = vec![
+            80, 65, 82, 49, 21, 6, 21, 22, 21, 22, 92, 21, 2, 21, 0, 21, 2, 
21, 0, 21, 4,
+            21, 0, 18, 28, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 
101, 255,
+            108, 111, 0, 0, 0, 3, 1, 5, 0, 0, 0, 104, 101, 255, 108, 111, 38, 
110, 28,
+            21, 12, 25, 37, 6, 0, 25, 24, 2, 99, 49, 21, 0, 22, 2, 22, 102, 
22, 102, 38,
+            8, 60, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 
255, 108, 111,
+            0, 0, 0, 21, 4, 25, 44, 72, 4, 114, 111, 111, 116, 21, 2, 0, 21, 
12, 37, 2,
+            24, 2, 99, 49, 37, 0, 76, 28, 0, 0, 0, 22, 2, 25, 28, 25, 28, 38, 
110, 28,
+            21, 12, 25, 37, 6, 0, 25, 24, 2, 99, 49, 21, 0, 22, 2, 22, 102, 
22, 102, 38,
+            8, 60, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 
255, 108, 111,
+            0, 0, 0, 22, 102, 22, 2, 0, 40, 44, 65, 114, 114, 111, 119, 50, 
32, 45, 32,
+            78, 97, 116, 105, 118, 101, 32, 82, 117, 115, 116, 32, 105, 109, 
112, 108,
+            101, 109, 101, 110, 116, 97, 116, 105, 111, 110, 32, 111, 102, 32, 
65, 114,
+            114, 111, 119, 0, 130, 0, 0, 0, 80, 65, 82, 49,
+        ];
+
+        let file = SliceableCursor::new(data);
+        let file_reader = SerializedFileReader::new(file).unwrap();
+        let mut arrow_reader = 
ParquetFileArrowReader::new(Arc::new(file_reader));
+
+        let mut record_batch_reader = arrow_reader
+            .get_record_reader_by_columns(vec![0], 10)
+            .unwrap();
+
+        let error = record_batch_reader.next().unwrap().unwrap_err();
+
+        assert!(
+            error.to_string().contains("invalid utf-8 sequence"),
+            "{}",
+            error
+        );
+    }
 }
diff --git a/parquet/src/arrow/record_reader.rs 
b/parquet/src/arrow/record_reader.rs
index df93ebf..ce77db9 100644
--- a/parquet/src/arrow/record_reader.rs
+++ b/parquet/src/arrow/record_reader.rs
@@ -264,12 +264,12 @@ where
                 )
             })?;
 
-            let iter = def_levels.rev_valid_positions_iter(
-                self.values_written..self.values_written + levels_read,
+            self.records.pad_nulls(
+                self.values_written,
+                values_read,
+                levels_read,
+                def_levels.rev_valid_positions_iter(),
             );
-
-            self.records
-                .pad_nulls(self.values_written..self.values_written + 
values_read, iter);
         }
 
         let values_read = max(levels_read, values_read);
diff --git a/parquet/src/arrow/record_reader/buffer.rs 
b/parquet/src/arrow/record_reader/buffer.rs
index 29e6110..5c69dfa 100644
--- a/parquet/src/arrow/record_reader/buffer.rs
+++ b/parquet/src/arrow/record_reader/buffer.rs
@@ -16,9 +16,9 @@
 // under the License.
 
 use std::marker::PhantomData;
-use std::ops::Range;
 
 use arrow::buffer::{Buffer, MutableBuffer};
+use arrow::datatypes::ToByteSlice;
 
 /// A buffer that supports writing new data to the end, and removing data from 
the front
 ///
@@ -74,6 +74,7 @@ pub trait BufferQueue: Sized {
 ///
 pub trait ScalarValue {}
 impl ScalarValue for bool {}
+impl ScalarValue for u8 {}
 impl ScalarValue for i16 {}
 impl ScalarValue for i32 {}
 impl ScalarValue for i64 {}
@@ -114,6 +115,10 @@ impl<T: ScalarValue> ScalarBuffer<T> {
         self.len == 0
     }
 
+    pub fn reserve(&mut self, additional: usize) {
+        self.buffer.reserve(additional * std::mem::size_of::<T>());
+    }
+
     pub fn resize(&mut self, len: usize) {
         self.buffer.resize(len * std::mem::size_of::<T>(), 0);
         self.len = len;
@@ -133,14 +138,8 @@ impl<T: ScalarValue> ScalarBuffer<T> {
         assert!(prefix.is_empty() && suffix.is_empty());
         buf
     }
-}
 
-impl<T: ScalarValue> BufferQueue for ScalarBuffer<T> {
-    type Output = Buffer;
-
-    type Slice = [T];
-
-    fn split_off(&mut self, len: usize) -> Self::Output {
+    pub fn take(&mut self, len: usize) -> Self {
         assert!(len <= self.len);
 
         let num_bytes = len * std::mem::size_of::<T>();
@@ -158,7 +157,39 @@ impl<T: ScalarValue> BufferQueue for ScalarBuffer<T> {
         self.buffer.resize(num_bytes, 0);
         self.len -= len;
 
-        std::mem::replace(&mut self.buffer, remaining).into()
+        Self {
+            buffer: std::mem::replace(&mut self.buffer, remaining),
+            len,
+            _phantom: Default::default(),
+        }
+    }
+}
+
+impl<T: ScalarValue + ToByteSlice> ScalarBuffer<T> {
+    pub fn push(&mut self, v: T) {
+        self.buffer.push(v);
+        self.len += 1;
+    }
+
+    pub fn extend_from_slice(&mut self, v: &[T]) {
+        self.buffer.extend_from_slice(v);
+        self.len += v.len();
+    }
+}
+
+impl<T: ScalarValue> From<ScalarBuffer<T>> for Buffer {
+    fn from(t: ScalarBuffer<T>) -> Self {
+        t.buffer.into()
+    }
+}
+
+impl<T: ScalarValue> BufferQueue for ScalarBuffer<T> {
+    type Output = Buffer;
+
+    type Slice = [T];
+
+    fn split_off(&mut self, len: usize) -> Self::Output {
+        self.take(len).into()
     }
 
     fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice {
@@ -180,20 +211,34 @@ impl<T: ScalarValue> BufferQueue for ScalarBuffer<T> {
 
 /// A [`BufferQueue`] capable of storing column values
 pub trait ValuesBuffer: BufferQueue {
-    /// Iterate through the indexes in `range` in reverse order, moving the 
value at each
-    /// index to the next index returned by `rev_valid_position_iter`
+    ///
+    /// If a column contains nulls, more level data may be read than value 
data, as null
+    /// values are not encoded. Therefore, first the levels data is read, the 
null count
+    /// determined, and then the corresponding number of values read to a 
[`ValuesBuffer`].
+    ///
+    /// It is then necessary to move this values data into positions that 
correspond to
+    /// the non-null level positions. This is what this method does.
+    ///
+    /// It is provided with:
+    ///
+    /// - `read_offset` - the offset in [`ValuesBuffer`] to start null padding 
from
+    /// - `values_read` - the number of values read
+    /// - `levels_read` - the number of levels read
+    /// - `rev_valid_position_iter` - a reverse iterator of the valid level 
positions
     ///
     /// It is required that:
     ///
-    /// - `rev_valid_position_iter` has at least `range.end - range.start` 
elements
+    /// - `rev_valid_position_iter` has at least `values_len` elements
     /// - `rev_valid_position_iter` returns strictly monotonically decreasing 
values
-    /// - the `i`th index returned by `rev_valid_position_iter` is `>= 
range.end - i - 1`
+    /// - `rev_valid_position_iter` returns values in the range 
`read_offset..read_offset+levels_len`
     ///
     /// Implementations may panic or otherwise misbehave if this is not the 
case
     ///
     fn pad_nulls(
         &mut self,
-        range: Range<usize>,
+        read_offset: usize,
+        values_read: usize,
+        levels_read: usize,
         rev_valid_position_iter: impl Iterator<Item = usize>,
     );
 }
@@ -201,12 +246,16 @@ pub trait ValuesBuffer: BufferQueue {
 impl<T: ScalarValue> ValuesBuffer for ScalarBuffer<T> {
     fn pad_nulls(
         &mut self,
-        range: Range<usize>,
+        read_offset: usize,
+        values_read: usize,
+        levels_read: usize,
         rev_valid_position_iter: impl Iterator<Item = usize>,
     ) {
         let slice = self.as_slice_mut();
+        assert!(slice.len() >= read_offset + levels_read);
 
-        for (value_pos, level_pos) in range.rev().zip(rev_valid_position_iter) 
{
+        let values_range = read_offset..read_offset + values_read;
+        for (value_pos, level_pos) in 
values_range.rev().zip(rev_valid_position_iter) {
             debug_assert!(level_pos >= value_pos);
             if level_pos <= value_pos {
                 break;
diff --git a/parquet/src/arrow/record_reader/definition_levels.rs 
b/parquet/src/arrow/record_reader/definition_levels.rs
index 750e118..d53310e 100644
--- a/parquet/src/arrow/record_reader/definition_levels.rs
+++ b/parquet/src/arrow/record_reader/definition_levels.rs
@@ -126,12 +126,8 @@ impl DefinitionLevelBuffer {
         Bitmap::from(std::mem::replace(old_builder, new_builder).finish())
     }
 
-    /// Returns an iterator of the valid positions in `range` in descending 
order
-    pub fn rev_valid_positions_iter(
-        &self,
-        range: Range<usize>,
-    ) -> impl Iterator<Item = usize> + '_ {
-        assert_eq!(range.start, self.len);
+    /// Returns an iterator of the valid positions in descending order
+    pub fn rev_valid_positions_iter(&self) -> impl Iterator<Item = usize> + '_ 
{
         iter_set_bits_rev(self.nulls().as_slice())
     }
 
diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs
index fe0344f..1fc722f 100644
--- a/parquet/src/column/reader.rs
+++ b/parquet/src/column/reader.rs
@@ -258,11 +258,15 @@ where
             // At this point we have read values, definition and repetition 
levels.
             // If both definition and repetition levels are defined, their 
counts
             // should be equal. Values count is always less or equal to 
definition levels.
-            if num_def_levels != 0 && num_rep_levels != 0 {
-                assert_eq!(
-                    num_def_levels, num_rep_levels,
-                    "Number of decoded rep / def levels did not match"
-                );
+            if num_def_levels != 0
+                && num_rep_levels != 0
+                && num_rep_levels != num_def_levels
+            {
+                return Err(general_err!(
+                    "inconsistent number of levels read - def: {}, rep: {}",
+                    num_def_levels,
+                    num_rep_levels
+                ));
             }
 
             // Note that if field is not required, but no definition levels 
are provided,
@@ -275,6 +279,14 @@ where
                 .values_decoder
                 .read(values, values_read..values_read + values_to_read)?;
 
+            if num_def_levels != 0 && curr_values_read != num_def_levels - 
null_count {
+                return Err(general_err!(
+                    "insufficient values read from column - expected: {}, got: 
{}",
+                    num_def_levels - null_count,
+                    curr_values_read
+                ));
+            }
+
             // Update all "return" counters and internal state.
 
             // This is to account for when def or rep levels are not provided
@@ -359,6 +371,7 @@ where
                                 encoding,
                                 buf.start_from(offset),
                                 num_values as usize,
+                                None,
                             )?;
                             return Ok(true);
                         }
@@ -367,13 +380,17 @@ where
                             buf,
                             num_values,
                             encoding,
-                            num_nulls: _,
+                            num_nulls,
                             num_rows: _,
                             def_levels_byte_len,
                             rep_levels_byte_len,
                             is_compressed: _,
                             statistics: _,
                         } => {
+                            if num_nulls > num_values {
+                                return Err(general_err!("more nulls than 
values in page, contained {} values and {} nulls", num_values, num_nulls));
+                            }
+
                             self.num_buffered_values = num_values;
                             self.num_decoded_values = 0;
 
@@ -408,6 +425,7 @@ where
                                     (rep_levels_byte_len + 
def_levels_byte_len) as usize,
                                 ),
                                 num_values as usize,
+                                Some((num_values - num_nulls) as usize),
                             )?;
                             return Ok(true);
                         }
diff --git a/parquet/src/column/reader/decoder.rs 
b/parquet/src/column/reader/decoder.rs
index a9221d9..76b0d07 100644
--- a/parquet/src/column/reader/decoder.rs
+++ b/parquet/src/column/reader/decoder.rs
@@ -95,11 +95,22 @@ pub trait ColumnValueDecoder {
     ) -> Result<()>;
 
     /// Set the current data page
+    ///
+    /// - `encoding` - the encoding of the page
+    /// - `data` - a point to the page's uncompressed value data
+    /// - `num_levels` - the number of levels contained within the page, i.e. 
values including nulls
+    /// - `num_values` - the number of non-null values contained within the 
page (V2 page only)
+    ///
+    /// Note: data encoded with [`Encoding::RLE`] may not know its exact 
length, as the final
+    /// run may be zero-padded. As such if `num_values` is not provided (i.e. 
`None`),
+    /// subsequent calls to `ColumnValueDecoder::read` may yield more values 
than
+    /// non-null definition levels within the page
     fn set_data(
         &mut self,
         encoding: Encoding,
         data: ByteBufferPtr,
-        num_values: usize,
+        num_levels: usize,
+        num_values: Option<usize>,
     ) -> Result<()>;
 
     /// Read values data into `out[range]` returning the number of values read
@@ -170,7 +181,8 @@ impl<T: DataType> ColumnValueDecoder for 
ColumnValueDecoderImpl<T> {
         &mut self,
         mut encoding: Encoding,
         data: ByteBufferPtr,
-        num_values: usize,
+        num_levels: usize,
+        num_values: Option<usize>,
     ) -> Result<()> {
         use std::collections::hash_map::Entry;
 
@@ -193,7 +205,7 @@ impl<T: DataType> ColumnValueDecoder for 
ColumnValueDecoderImpl<T> {
             }
         };
 
-        decoder.set_data(data, num_values)?;
+        decoder.set_data(data, num_values.unwrap_or(num_levels))?;
         self.current_encoding = Some(encoding);
         Ok(())
     }

Reply via email to