This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 0cc0c05 parquet: Optimized ByteArrayReader, Add UTF-8 Validation
(#1040) (#1082)
0cc0c05 is described below
commit 0cc0c055e0fb09553fa80186768f9b539a552419
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Tue Jan 18 12:13:21 2022 +0000
parquet: Optimized ByteArrayReader, Add UTF-8 Validation (#1040) (#1082)
* Optimized ByteArrayReader (#1040)
UTF-8 Validation (#786)
* Fix arrow_array_reader benchmark
* Allow running subset of arrow_array_reader benchmarks
* Faster UTF-8 validation
* Tweak null handling
* Add license
* Refine `ValuesBuffer::pad_nulls`
* Tweak error handling
* Use page null count if available
* Doc comments
* Test DELTA_BYTE_ARRAY encoding
* Support legacy Encoding::PLAIN_DICTIONARY
* Add OffsetBuffer unit tests
Review feedback
* More tests
* Fix lint
* Review feedback
---
parquet/benches/arrow_array_reader.rs | 146 +++--
parquet/src/arrow/array_reader.rs | 92 ++-
parquet/src/arrow/array_reader/byte_array.rs | 699 +++++++++++++++++++++
parquet/src/arrow/array_reader/offset_buffer.rs | 338 ++++++++++
parquet/src/arrow/arrow_reader.rs | 39 +-
parquet/src/arrow/record_reader.rs | 10 +-
parquet/src/arrow/record_reader/buffer.rs | 81 ++-
.../src/arrow/record_reader/definition_levels.rs | 8 +-
parquet/src/column/reader.rs | 30 +-
parquet/src/column/reader/decoder.rs | 18 +-
10 files changed, 1299 insertions(+), 162 deletions(-)
diff --git a/parquet/benches/arrow_array_reader.rs
b/parquet/benches/arrow_array_reader.rs
index 5587b52..54f99c4 100644
--- a/parquet/benches/arrow_array_reader.rs
+++ b/parquet/benches/arrow_array_reader.rs
@@ -47,6 +47,7 @@ const PAGES_PER_GROUP: usize = 2;
const VALUES_PER_PAGE: usize = 10_000;
const BATCH_SIZE: usize = 8192;
+use arrow::array::Array;
use rand::{rngs::StdRng, Rng, SeedableRng};
pub fn seedable_rng() -> StdRng {
@@ -273,7 +274,7 @@ fn build_dictionary_encoded_string_page_iterator(
InMemoryPageIterator::new(schema, column_desc, pages)
}
-fn bench_array_reader(mut array_reader: impl ArrayReader) -> usize {
+fn bench_array_reader(mut array_reader: Box<dyn ArrayReader>) -> usize {
// test procedure: read data in batches of 8192 until no more data
let mut total_count = 0;
loop {
@@ -290,49 +291,46 @@ fn bench_array_reader(mut array_reader: impl ArrayReader)
-> usize {
fn create_int32_arrow_array_reader(
page_iterator: impl PageIterator + 'static,
column_desc: ColumnDescPtr,
-) -> impl ArrayReader {
+) -> Box<dyn ArrayReader> {
use parquet::arrow::arrow_array_reader::{ArrowArrayReader,
PrimitiveArrayConverter};
let converter =
PrimitiveArrayConverter::<arrow::datatypes::Int32Type>::new();
- ArrowArrayReader::try_new(page_iterator, column_desc, converter,
None).unwrap()
+ let reader =
+ ArrowArrayReader::try_new(page_iterator, column_desc, converter,
None).unwrap();
+ Box::new(reader)
}
fn create_int32_primitive_array_reader(
page_iterator: impl PageIterator + 'static,
column_desc: ColumnDescPtr,
-) -> impl ArrayReader {
+) -> Box<dyn ArrayReader> {
use parquet::arrow::array_reader::PrimitiveArrayReader;
- PrimitiveArrayReader::<Int32Type>::new_with_options(
+ let reader = PrimitiveArrayReader::<Int32Type>::new_with_options(
Box::new(page_iterator),
column_desc,
None,
true,
)
- .unwrap()
+ .unwrap();
+ Box::new(reader)
}
fn create_string_arrow_array_reader(
page_iterator: impl PageIterator + 'static,
column_desc: ColumnDescPtr,
-) -> impl ArrayReader {
+) -> Box<dyn ArrayReader> {
use parquet::arrow::arrow_array_reader::{ArrowArrayReader,
StringArrayConverter};
let converter = StringArrayConverter::new();
- ArrowArrayReader::try_new(page_iterator, column_desc, converter,
None).unwrap()
+ let reader =
+ ArrowArrayReader::try_new(page_iterator, column_desc, converter,
None).unwrap();
+ Box::new(reader)
}
-fn create_string_complex_array_reader(
+fn create_string_byte_array_reader(
page_iterator: impl PageIterator + 'static,
column_desc: ColumnDescPtr,
-) -> impl ArrayReader {
- use parquet::arrow::array_reader::ComplexObjectArrayReader;
- use parquet::arrow::converter::{Utf8ArrayConverter, Utf8Converter};
- let converter = Utf8Converter::new(Utf8ArrayConverter {});
- ComplexObjectArrayReader::<parquet::data_type::ByteArrayType,
Utf8Converter>::new(
- Box::new(page_iterator),
- column_desc,
- converter,
- None,
- )
- .unwrap()
+) -> Box<dyn ArrayReader> {
+ use parquet::arrow::array_reader::make_byte_array_reader;
+ make_byte_array_reader(Box::new(page_iterator), column_desc, None,
true).unwrap()
}
fn add_benches(c: &mut Criterion) {
@@ -368,10 +366,10 @@ fn add_benches(c: &mut Criterion) {
mandatory_int32_column_desc.clone(),
);
count = bench_array_reader(array_reader);
- })
+ });
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);
- assert_eq!(count, EXPECTED_VALUE_COUNT);
group.bench_function(
"read Int32Array, plain encoded, mandatory, no NULLs - new",
@@ -382,10 +380,10 @@ fn add_benches(c: &mut Criterion) {
mandatory_int32_column_desc.clone(),
);
count = bench_array_reader(array_reader);
- })
+ });
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);
- assert_eq!(count, EXPECTED_VALUE_COUNT);
let plain_int32_no_null_data = build_plain_encoded_int32_page_iterator(
schema.clone(),
@@ -401,10 +399,10 @@ fn add_benches(c: &mut Criterion) {
optional_int32_column_desc.clone(),
);
count = bench_array_reader(array_reader);
- })
+ });
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);
- assert_eq!(count, EXPECTED_VALUE_COUNT);
group.bench_function(
"read Int32Array, plain encoded, optional, no NULLs - new",
@@ -415,10 +413,10 @@ fn add_benches(c: &mut Criterion) {
optional_int32_column_desc.clone(),
);
count = bench_array_reader(array_reader);
- })
+ });
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);
- assert_eq!(count, EXPECTED_VALUE_COUNT);
// int32, plain encoded, half NULLs
let plain_int32_half_null_data = build_plain_encoded_int32_page_iterator(
@@ -435,10 +433,10 @@ fn add_benches(c: &mut Criterion) {
optional_int32_column_desc.clone(),
);
count = bench_array_reader(array_reader);
- })
+ });
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);
- assert_eq!(count, EXPECTED_VALUE_COUNT);
group.bench_function(
"read Int32Array, plain encoded, optional, half NULLs - new",
@@ -449,10 +447,10 @@ fn add_benches(c: &mut Criterion) {
optional_int32_column_desc.clone(),
);
count = bench_array_reader(array_reader);
- })
+ });
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);
- assert_eq!(count, EXPECTED_VALUE_COUNT);
// int32, dictionary encoded, no NULLs
let dictionary_int32_no_null_data =
build_dictionary_encoded_int32_page_iterator(
@@ -469,10 +467,10 @@ fn add_benches(c: &mut Criterion) {
mandatory_int32_column_desc.clone(),
);
count = bench_array_reader(array_reader);
- })
+ });
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);
- assert_eq!(count, EXPECTED_VALUE_COUNT);
group.bench_function(
"read Int32Array, dictionary encoded, mandatory, no NULLs - new",
@@ -483,10 +481,10 @@ fn add_benches(c: &mut Criterion) {
mandatory_int32_column_desc.clone(),
);
count = bench_array_reader(array_reader);
- })
+ });
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);
- assert_eq!(count, EXPECTED_VALUE_COUNT);
let dictionary_int32_no_null_data =
build_dictionary_encoded_int32_page_iterator(
schema.clone(),
@@ -502,10 +500,10 @@ fn add_benches(c: &mut Criterion) {
optional_int32_column_desc.clone(),
);
count = bench_array_reader(array_reader);
- })
+ });
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);
- assert_eq!(count, EXPECTED_VALUE_COUNT);
group.bench_function(
"read Int32Array, dictionary encoded, optional, no NULLs - new",
@@ -516,10 +514,10 @@ fn add_benches(c: &mut Criterion) {
optional_int32_column_desc.clone(),
);
count = bench_array_reader(array_reader);
- })
+ });
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);
- assert_eq!(count, EXPECTED_VALUE_COUNT);
// int32, dictionary encoded, half NULLs
let dictionary_int32_half_null_data =
build_dictionary_encoded_int32_page_iterator(
@@ -536,10 +534,10 @@ fn add_benches(c: &mut Criterion) {
optional_int32_column_desc.clone(),
);
count = bench_array_reader(array_reader);
- })
+ });
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);
- assert_eq!(count, EXPECTED_VALUE_COUNT);
group.bench_function(
"read Int32Array, dictionary encoded, optional, half NULLs - new",
@@ -550,10 +548,10 @@ fn add_benches(c: &mut Criterion) {
optional_int32_column_desc.clone(),
);
count = bench_array_reader(array_reader);
- })
+ });
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);
- assert_eq!(count, EXPECTED_VALUE_COUNT);
// string benchmarks
//==============================
@@ -568,15 +566,15 @@ fn add_benches(c: &mut Criterion) {
"read StringArray, plain encoded, mandatory, no NULLs - old",
|b| {
b.iter(|| {
- let array_reader = create_string_complex_array_reader(
+ let array_reader = create_string_byte_array_reader(
plain_string_no_null_data.clone(),
mandatory_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
- })
+ });
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);
- assert_eq!(count, EXPECTED_VALUE_COUNT);
group.bench_function(
"read StringArray, plain encoded, mandatory, no NULLs - new",
@@ -587,10 +585,10 @@ fn add_benches(c: &mut Criterion) {
mandatory_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
- })
+ });
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);
- assert_eq!(count, EXPECTED_VALUE_COUNT);
let plain_string_no_null_data = build_plain_encoded_string_page_iterator(
schema.clone(),
@@ -601,15 +599,15 @@ fn add_benches(c: &mut Criterion) {
"read StringArray, plain encoded, optional, no NULLs - old",
|b| {
b.iter(|| {
- let array_reader = create_string_complex_array_reader(
+ let array_reader = create_string_byte_array_reader(
plain_string_no_null_data.clone(),
optional_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
- })
+ });
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);
- assert_eq!(count, EXPECTED_VALUE_COUNT);
group.bench_function(
"read StringArray, plain encoded, optional, no NULLs - new",
@@ -620,10 +618,10 @@ fn add_benches(c: &mut Criterion) {
optional_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
- })
+ });
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);
- assert_eq!(count, EXPECTED_VALUE_COUNT);
// string, plain encoded, half NULLs
let plain_string_half_null_data = build_plain_encoded_string_page_iterator(
@@ -635,15 +633,15 @@ fn add_benches(c: &mut Criterion) {
"read StringArray, plain encoded, optional, half NULLs - old",
|b| {
b.iter(|| {
- let array_reader = create_string_complex_array_reader(
+ let array_reader = create_string_byte_array_reader(
plain_string_half_null_data.clone(),
optional_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
- })
+ });
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);
- assert_eq!(count, EXPECTED_VALUE_COUNT);
group.bench_function(
"read StringArray, plain encoded, optional, half NULLs - new",
@@ -654,10 +652,10 @@ fn add_benches(c: &mut Criterion) {
optional_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
- })
+ });
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);
- assert_eq!(count, EXPECTED_VALUE_COUNT);
// string, dictionary encoded, no NULLs
let dictionary_string_no_null_data =
build_dictionary_encoded_string_page_iterator(
@@ -669,15 +667,15 @@ fn add_benches(c: &mut Criterion) {
"read StringArray, dictionary encoded, mandatory, no NULLs - old",
|b| {
b.iter(|| {
- let array_reader = create_string_complex_array_reader(
+ let array_reader = create_string_byte_array_reader(
dictionary_string_no_null_data.clone(),
mandatory_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
- })
+ });
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);
- assert_eq!(count, EXPECTED_VALUE_COUNT);
group.bench_function(
"read StringArray, dictionary encoded, mandatory, no NULLs - new",
@@ -688,10 +686,10 @@ fn add_benches(c: &mut Criterion) {
mandatory_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
- })
+ });
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);
- assert_eq!(count, EXPECTED_VALUE_COUNT);
let dictionary_string_no_null_data =
build_dictionary_encoded_string_page_iterator(
schema.clone(),
@@ -702,15 +700,15 @@ fn add_benches(c: &mut Criterion) {
"read StringArray, dictionary encoded, optional, no NULLs - old",
|b| {
b.iter(|| {
- let array_reader = create_string_complex_array_reader(
+ let array_reader = create_string_byte_array_reader(
dictionary_string_no_null_data.clone(),
optional_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
- })
+ });
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);
- assert_eq!(count, EXPECTED_VALUE_COUNT);
group.bench_function(
"read StringArray, dictionary encoded, optional, no NULLs - new",
@@ -721,10 +719,10 @@ fn add_benches(c: &mut Criterion) {
optional_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
- })
+ });
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);
- assert_eq!(count, EXPECTED_VALUE_COUNT);
// string, dictionary encoded, half NULLs
let dictionary_string_half_null_data =
build_dictionary_encoded_string_page_iterator(
@@ -736,15 +734,15 @@ fn add_benches(c: &mut Criterion) {
"read StringArray, dictionary encoded, optional, half NULLs - old",
|b| {
b.iter(|| {
- let array_reader = create_string_complex_array_reader(
+ let array_reader = create_string_byte_array_reader(
dictionary_string_half_null_data.clone(),
optional_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
- })
+ });
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);
- assert_eq!(count, EXPECTED_VALUE_COUNT);
group.bench_function(
"read StringArray, dictionary encoded, optional, half NULLs - new",
@@ -755,10 +753,10 @@ fn add_benches(c: &mut Criterion) {
optional_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
- })
+ });
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);
- assert_eq!(count, EXPECTED_VALUE_COUNT);
group.finish();
}
diff --git a/parquet/src/arrow/array_reader.rs
b/parquet/src/arrow/array_reader.rs
index 537b4cc..c11bfc2 100644
--- a/parquet/src/arrow/array_reader.rs
+++ b/parquet/src/arrow/array_reader.rs
@@ -60,8 +60,7 @@ use crate::arrow::converter::{
DecimalConverter, FixedLenBinaryConverter, FixedSizeArrayConverter,
Int96ArrayConverter, Int96Converter, IntervalDayTimeArrayConverter,
IntervalDayTimeConverter, IntervalYearMonthArrayConverter,
- IntervalYearMonthConverter, LargeBinaryArrayConverter,
LargeBinaryConverter,
- LargeUtf8ArrayConverter, LargeUtf8Converter,
+ IntervalYearMonthConverter, Utf8ArrayConverter, Utf8Converter,
};
use crate::arrow::record_reader::buffer::{ScalarValue, ValuesBuffer};
use crate::arrow::record_reader::{GenericRecordReader, RecordReader};
@@ -81,6 +80,11 @@ use crate::schema::types::{
};
use crate::schema::visitor::TypeVisitor;
+mod byte_array;
+mod offset_buffer;
+
+pub use byte_array::make_byte_array_reader;
+
/// Array reader reads parquet data into arrow array.
pub trait ArrayReader {
fn as_any(&self) -> &dyn Any;
@@ -1778,57 +1782,43 @@ impl<'a> ArrayReaderBuilder {
null_mask_only,
)?,
)),
- PhysicalType::BYTE_ARRAY => {
- if cur_type.get_basic_info().converted_type() ==
ConvertedType::UTF8 {
- if let Some(ArrowType::LargeUtf8) = arrow_type {
- let converter =
- LargeUtf8Converter::new(LargeUtf8ArrayConverter
{});
- Ok(Box::new(ComplexObjectArrayReader::<
- ByteArrayType,
- LargeUtf8Converter,
- >::new(
- page_iterator,
- column_desc,
- converter,
- arrow_type,
- )?))
- } else {
- use crate::arrow::arrow_array_reader::{
- ArrowArrayReader, StringArrayConverter,
- };
- let converter = StringArrayConverter::new();
- Ok(Box::new(ArrowArrayReader::try_new(
- *page_iterator,
- column_desc,
- converter,
- arrow_type,
- )?))
+ PhysicalType::BYTE_ARRAY => match arrow_type {
+ // TODO: Replace with optimised dictionary reader (#171)
+ Some(ArrowType::Dictionary(_, _)) => {
+ match cur_type.get_basic_info().converted_type() {
+ ConvertedType::UTF8 => {
+ let converter =
Utf8Converter::new(Utf8ArrayConverter {});
+ Ok(Box::new(ComplexObjectArrayReader::<
+ ByteArrayType,
+ Utf8Converter,
+ >::new(
+ page_iterator,
+ column_desc,
+ converter,
+ arrow_type,
+ )?))
+ }
+ _ => {
+ let converter =
BinaryConverter::new(BinaryArrayConverter {});
+ Ok(Box::new(ComplexObjectArrayReader::<
+ ByteArrayType,
+ BinaryConverter,
+ >::new(
+ page_iterator,
+ column_desc,
+ converter,
+ arrow_type,
+ )?))
+ }
}
- } else if let Some(ArrowType::LargeBinary) = arrow_type {
- let converter =
- LargeBinaryConverter::new(LargeBinaryArrayConverter
{});
- Ok(Box::new(ComplexObjectArrayReader::<
- ByteArrayType,
- LargeBinaryConverter,
- >::new(
- page_iterator,
- column_desc,
- converter,
- arrow_type,
- )?))
- } else {
- let converter = BinaryConverter::new(BinaryArrayConverter
{});
- Ok(Box::new(ComplexObjectArrayReader::<
- ByteArrayType,
- BinaryConverter,
- >::new(
- page_iterator,
- column_desc,
- converter,
- arrow_type,
- )?))
}
- }
+ _ => make_byte_array_reader(
+ page_iterator,
+ column_desc,
+ arrow_type,
+ null_mask_only,
+ ),
+ },
PhysicalType::FIXED_LEN_BYTE_ARRAY
if cur_type.get_basic_info().converted_type()
== ConvertedType::DECIMAL =>
diff --git a/parquet/src/arrow/array_reader/byte_array.rs
b/parquet/src/arrow/array_reader/byte_array.rs
new file mode 100644
index 0000000..fc214dd
--- /dev/null
+++ b/parquet/src/arrow/array_reader/byte_array.rs
@@ -0,0 +1,699 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::array_reader::offset_buffer::OffsetBuffer;
+use crate::arrow::array_reader::{read_records, ArrayReader};
+use crate::arrow::record_reader::buffer::ScalarValue;
+use crate::arrow::record_reader::GenericRecordReader;
+use crate::arrow::schema::parquet_to_arrow_field;
+use crate::basic::{ConvertedType, Encoding};
+use crate::column::page::PageIterator;
+use crate::column::reader::decoder::ColumnValueDecoder;
+use crate::data_type::Int32Type;
+use crate::encodings::{
+ decoding::{Decoder, DeltaBitPackDecoder},
+ rle::RleDecoder,
+};
+use crate::errors::{ParquetError, Result};
+use crate::schema::types::ColumnDescPtr;
+use crate::util::memory::ByteBufferPtr;
+use arrow::array::{ArrayRef, OffsetSizeTrait};
+use arrow::buffer::Buffer;
+use arrow::datatypes::DataType as ArrowType;
+use std::any::Any;
+use std::ops::Range;
+
+/// Returns an [`ArrayReader`] that decodes the provided byte array column
+///
+/// When `arrow_type` is `None` the arrow type is derived from the parquet
+/// schema of `column_desc`. Binary/Utf8 use 32-bit offsets, while
+/// LargeBinary/LargeUtf8 use 64-bit offsets.
+///
+/// # Errors
+///
+/// Returns an error if the arrow type cannot be derived, or if it is not one of
+/// the supported binary / string types
+pub fn make_byte_array_reader(
+    pages: Box<dyn PageIterator>,
+    column_desc: ColumnDescPtr,
+    arrow_type: Option<ArrowType>,
+    null_mask_only: bool,
+) -> Result<Box<dyn ArrayReader>> {
+    let data_type = if let Some(requested) = arrow_type {
+        requested
+    } else {
+        let field = parquet_to_arrow_field(column_desc.as_ref())?;
+        field.data_type().clone()
+    };
+
+    // Decide the offset width up front so unsupported types fail before any
+    // reader state is constructed
+    let use_large_offsets = match &data_type {
+        ArrowType::Binary | ArrowType::Utf8 => false,
+        ArrowType::LargeUtf8 | ArrowType::LargeBinary => true,
+        other => {
+            return Err(general_err!(
+                "invalid data type for byte array reader - {}",
+                other
+            ))
+        }
+    };
+
+    let array_reader: Box<dyn ArrayReader> = if use_large_offsets {
+        let record_reader =
+            GenericRecordReader::new_with_options(column_desc, null_mask_only);
+        Box::new(ByteArrayReader::<i64>::new(pages, data_type, record_reader))
+    } else {
+        let record_reader =
+            GenericRecordReader::new_with_options(column_desc, null_mask_only);
+        Box::new(ByteArrayReader::<i32>::new(pages, data_type, record_reader))
+    };
+    Ok(array_reader)
+}
+
+/// An [`ArrayReader`] for variable length byte arrays
+///
+/// `I` is the offset type of the produced arrays (see `make_byte_array_reader`,
+/// which picks `i32` or `i64` based on the requested arrow type)
+struct ByteArrayReader<I: ScalarValue> {
+    /// The arrow type of the arrays returned by `next_batch`
+    data_type: ArrowType,
+    /// The source of column pages to decode
+    pages: Box<dyn PageIterator>,
+    /// Definition levels consumed by the most recent `next_batch`, if any
+    def_levels_buffer: Option<Buffer>,
+    /// Repetition levels consumed by the most recent `next_batch`, if any
+    rep_levels_buffer: Option<Buffer>,
+    /// Assembles records from the decoded levels and values
+    record_reader: GenericRecordReader<OffsetBuffer<I>, ByteArrayColumnValueDecoder<I>>,
+}
+
+impl<I: ScalarValue> ByteArrayReader<I> {
+    /// Creates a reader that decodes `pages` into arrays of `data_type` using
+    /// `record_reader`; no levels are buffered until the first `next_batch`
+    fn new(
+        pages: Box<dyn PageIterator>,
+        data_type: ArrowType,
+        record_reader: GenericRecordReader<
+            OffsetBuffer<I>,
+            ByteArrayColumnValueDecoder<I>,
+        >,
+    ) -> Self {
+        Self {
+            pages,
+            record_reader,
+            data_type,
+            rep_levels_buffer: None,
+            def_levels_buffer: None,
+        }
+    }
+}
+
+impl<I: OffsetSizeTrait + ScalarValue> ArrayReader for ByteArrayReader<I> {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    /// Reads up to `batch_size` records and returns them as one array, keeping
+    /// the batch's definition and repetition levels for the level getters
+    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
+        read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)?;
+
+        // Drain everything the record reader buffered for this batch, then
+        // reset it ready for the next one
+        let values = self.record_reader.consume_record_data()?;
+        let nulls = self.record_reader.consume_bitmap_buffer()?;
+        self.def_levels_buffer = self.record_reader.consume_def_levels()?;
+        self.rep_levels_buffer = self.record_reader.consume_rep_levels()?;
+        self.record_reader.reset();
+
+        let array = values.into_array(nulls, self.data_type.clone());
+        Ok(array)
+    }
+
+    fn get_def_levels(&self) -> Option<&[i16]> {
+        let levels = self.def_levels_buffer.as_ref()?;
+        // SAFETY: relies on the record reader having filled this buffer with i16
+        // levels. NOTE(review): confirm against `consume_def_levels`
+        Some(unsafe { levels.typed_data() })
+    }
+
+    fn get_rep_levels(&self) -> Option<&[i16]> {
+        let levels = self.rep_levels_buffer.as_ref()?;
+        // SAFETY: relies on the record reader having filled this buffer with i16
+        // levels. NOTE(review): confirm against `consume_rep_levels`
+        Some(unsafe { levels.typed_data() })
+    }
+}
+
+/// A [`ColumnValueDecoder`] for variable length byte arrays
+struct ByteArrayColumnValueDecoder<I: ScalarValue> {
+    /// Values decoded from the dictionary page, populated by `set_dict`
+    dict: Option<OffsetBuffer<I>>,
+    /// Decoder for the current data page, populated by `set_data`
+    decoder: Option<ByteArrayDecoder>,
+    /// Whether decoded values must be valid UTF-8; set in `new` when the
+    /// column's converted type is UTF8
+    validate_utf8: bool,
+}
+
+impl<I: OffsetSizeTrait + ScalarValue> ColumnValueDecoder
+    for ByteArrayColumnValueDecoder<I>
+{
+    type Slice = OffsetBuffer<I>;
+
+    /// Creates a decoder for `desc`, enabling UTF-8 validation when the
+    /// column's converted type is UTF8
+    fn new(desc: &ColumnDescPtr) -> Self {
+        Self {
+            validate_utf8: desc.converted_type() == ConvertedType::UTF8,
+            decoder: None,
+            dict: None,
+        }
+    }
+
+    /// Decodes the dictionary page `buf` as PLAIN byte arrays and stores the
+    /// result for use by subsequent dictionary-encoded data pages
+    fn set_dict(
+        &mut self,
+        buf: ByteBufferPtr,
+        num_values: u32,
+        encoding: Encoding,
+        _is_sorted: bool,
+    ) -> Result<()> {
+        let supported = matches!(
+            encoding,
+            Encoding::PLAIN | Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY
+        );
+        if !supported {
+            return Err(nyi_err!(
+                "Invalid/Unsupported encoding type for dictionary: {}",
+                encoding
+            ));
+        }
+
+        // `num_values` is treated as the exact number of dictionary entries
+        let count = num_values as usize;
+        let mut entries = OffsetBuffer::default();
+        ByteArrayDecoderPlain::new(buf, count, Some(count), self.validate_utf8)
+            .read(&mut entries, usize::MAX)?;
+        self.dict = Some(entries);
+        Ok(())
+    }
+
+    /// Prepares to decode the values of a data page encoded with `encoding`
+    fn set_data(
+        &mut self,
+        encoding: Encoding,
+        data: ByteBufferPtr,
+        num_levels: usize,
+        num_values: Option<usize>,
+    ) -> Result<()> {
+        let page_decoder = ByteArrayDecoder::new(
+            encoding,
+            data,
+            num_levels,
+            num_values,
+            self.validate_utf8,
+        )?;
+        self.decoder = Some(page_decoder);
+        Ok(())
+    }
+
+    /// Decodes up to `range.end - range.start` values into `out`
+    ///
+    /// # Panics
+    ///
+    /// Panics if `set_data` has not been called, or if the current page is
+    /// dictionary encoded and no dictionary has been set
+    fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) -> Result<usize> {
+        let page_decoder = self.decoder.as_mut().expect("decoder set");
+        let num_values = range.end - range.start;
+        page_decoder.read(out, num_values, self.dict.as_ref())
+    }
+}
+
+/// A generic decoder from uncompressed parquet value data to [`OffsetBuffer`]
+///
+/// The variant is selected from the page encoding by [`ByteArrayDecoder::new`]
+pub enum ByteArrayDecoder {
+    /// Decodes [`Encoding::PLAIN`] data
+    Plain(ByteArrayDecoderPlain),
+    /// Decodes [`Encoding::RLE_DICTIONARY`] / [`Encoding::PLAIN_DICTIONARY`] data
+    Dictionary(ByteArrayDecoderDictionary),
+    /// Decodes [`Encoding::DELTA_LENGTH_BYTE_ARRAY`] data
+    DeltaLength(ByteArrayDecoderDeltaLength),
+    /// Decodes [`Encoding::DELTA_BYTE_ARRAY`] data
+    DeltaByteArray(ByteArrayDecoderDelta),
+}
+
+impl ByteArrayDecoder {
+    /// Creates a decoder for `data` encoded with `encoding`
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if `encoding` is not supported for byte arrays, or if
+    /// the length headers of a delta encoded page cannot be decoded
+    pub fn new(
+        encoding: Encoding,
+        data: ByteBufferPtr,
+        num_levels: usize,
+        num_values: Option<usize>,
+        validate_utf8: bool,
+    ) -> Result<Self> {
+        Ok(match encoding {
+            Encoding::PLAIN => Self::Plain(ByteArrayDecoderPlain::new(
+                data,
+                num_levels,
+                num_values,
+                validate_utf8,
+            )),
+            Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => Self::Dictionary(
+                ByteArrayDecoderDictionary::new(data, num_levels, num_values),
+            ),
+            Encoding::DELTA_LENGTH_BYTE_ARRAY => {
+                let inner = ByteArrayDecoderDeltaLength::new(data, validate_utf8)?;
+                Self::DeltaLength(inner)
+            }
+            Encoding::DELTA_BYTE_ARRAY => {
+                let inner = ByteArrayDecoderDelta::new(data, validate_utf8)?;
+                Self::DeltaByteArray(inner)
+            }
+            other => {
+                return Err(general_err!(
+                    "unsupported encoding for byte array: {}",
+                    other
+                ))
+            }
+        })
+    }
+
+    /// Read up to `len` values to `out` with the optional dictionary
+    ///
+    /// # Panics
+    ///
+    /// Panics if this is a dictionary decoder and `dict` is `None`
+    pub fn read<I: OffsetSizeTrait + ScalarValue>(
+        &mut self,
+        out: &mut OffsetBuffer<I>,
+        len: usize,
+        dict: Option<&OffsetBuffer<I>>,
+    ) -> Result<usize> {
+        match self {
+            Self::Dictionary(inner) => inner.read(out, dict.expect("dictionary set"), len),
+            Self::Plain(inner) => inner.read(out, len),
+            Self::DeltaLength(inner) => inner.read(out, len),
+            Self::DeltaByteArray(inner) => inner.read(out, len),
+        }
+    }
+}
+
+/// Decoder from [`Encoding::PLAIN`] data to [`OffsetBuffer`]
+///
+/// Each value is read as a 4-byte little-endian length followed by that many bytes
+pub struct ByteArrayDecoderPlain {
+    /// The encoded page data
+    buf: ByteBufferPtr,
+    /// Byte offset into `buf` of the next length prefix to decode
+    offset: usize,
+    /// Whether decoded values must be valid UTF-8
+    validate_utf8: bool,
+
+    /// This is a maximum as the null count is not always known, e.g. value data from
+    /// a v1 data page
+    max_remaining_values: usize,
+}
+
+impl ByteArrayDecoderPlain {
+    /// Creates a decoder over the PLAIN encoded values in `buf`
+    ///
+    /// `num_values` is the number of encoded values when known, otherwise
+    /// `num_levels` is used as an upper bound
+    pub fn new(
+        buf: ByteBufferPtr,
+        num_levels: usize,
+        num_values: Option<usize>,
+        validate_utf8: bool,
+    ) -> Self {
+        Self {
+            buf,
+            validate_utf8,
+            offset: 0,
+            max_remaining_values: num_values.unwrap_or(num_levels),
+        }
+    }
+
+    /// Decodes up to `len` values into `output`, returning the number of values
+    /// actually appended. This is fewer than `len` once the page is exhausted.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`ParquetError::EOF`] if a length prefix or value is truncated, and
+    /// an error if UTF-8 validation is enabled and the appended bytes are invalid
+    pub fn read<I: OffsetSizeTrait + ScalarValue>(
+        &mut self,
+        output: &mut OffsetBuffer<I>,
+        len: usize,
+    ) -> Result<usize> {
+        let initial_values_length = output.values.len();
+
+        let to_read = len.min(self.max_remaining_values);
+        let remaining_bytes = self.buf.len() - self.offset;
+        // Besides skipping pointless work, returning here guarantees
+        // `max_remaining_values > 0` for the estimate below, which would otherwise
+        // divide by zero when bytes remain after the last expected value
+        if to_read == 0 || remaining_bytes == 0 {
+            return Ok(0);
+        }
+
+        output.offsets.reserve(to_read);
+
+        // Assume the remaining bytes are spread evenly across the remaining values
+        let estimated_bytes = remaining_bytes
+            .checked_mul(to_read)
+            .map(|x| x / self.max_remaining_values)
+            .unwrap_or_default();
+        output.values.reserve(estimated_bytes);
+
+        let mut read = 0;
+
+        let buf = self.buf.as_ref();
+        while self.offset < buf.len() && read != to_read {
+            if self.offset + 4 > buf.len() {
+                return Err(ParquetError::EOF("eof decoding byte array".into()));
+            }
+            let len_bytes: [u8; 4] =
+                buf[self.offset..self.offset + 4].try_into().unwrap();
+            let len = u32::from_le_bytes(len_bytes);
+
+            let start_offset = self.offset + 4;
+            let end_offset = start_offset + len as usize;
+            if end_offset > buf.len() {
+                return Err(ParquetError::EOF("eof decoding byte array".into()));
+            }
+
+            output.try_push(&buf[start_offset..end_offset], self.validate_utf8)?;
+
+            self.offset = end_offset;
+            read += 1;
+        }
+        // Account only for values actually decoded so the returned count always
+        // matches what was appended to `output`
+        self.max_remaining_values -= read;
+
+        if self.validate_utf8 {
+            output.check_valid_utf8(initial_values_length)?;
+        }
+        Ok(read)
+    }
+}
+
+/// Decoder from [`Encoding::DELTA_LENGTH_BYTE_ARRAY`] data to [`OffsetBuffer`]
+pub struct ByteArrayDecoderDeltaLength {
+    /// Lengths of every value in the page, decoded up front in `new`
+    lengths: Vec<i32>,
+    /// The encoded page data; value bytes start after the encoded lengths
+    data: ByteBufferPtr,
+    /// Index into `lengths` of the next value to decode
+    length_offset: usize,
+    /// Byte offset into `data` of the next value's bytes
+    data_offset: usize,
+    /// Whether decoded values must be valid UTF-8
+    validate_utf8: bool,
+}
+
+impl ByteArrayDecoderDeltaLength {
+    /// Creates a decoder over `data`, decoding all value lengths from the start of
+    /// the buffer up front
+    fn new(data: ByteBufferPtr, validate_utf8: bool) -> Result<Self> {
+        let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
+        len_decoder.set_data(data.all(), 0)?;
+        let values = len_decoder.values_left();
+
+        let mut lengths = vec![0; values];
+        len_decoder.get(&mut lengths)?;
+
+        Ok(Self {
+            lengths,
+            data,
+            validate_utf8,
+            length_offset: 0,
+            // The value bytes follow directly after the encoded lengths
+            data_offset: len_decoder.get_offset(),
+        })
+    }
+
+    /// Decodes up to `len` values into `output`, returning the number appended
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if a length is negative, [`ParquetError::EOF`] if the page
+    /// holds fewer value bytes than the lengths require, and an error if UTF-8
+    /// validation is enabled and the appended bytes are invalid
+    fn read<I: OffsetSizeTrait + ScalarValue>(
+        &mut self,
+        output: &mut OffsetBuffer<I>,
+        len: usize,
+    ) -> Result<usize> {
+        let initial_values_length = output.values.len();
+
+        let to_read = len.min(self.lengths.len() - self.length_offset);
+        output.offsets.reserve(to_read);
+
+        let src_lengths =
+            &self.lengths[self.length_offset..self.length_offset + to_read];
+
+        // Lengths come straight from the page. Reject negative values, which would
+        // otherwise wrap to enormous offsets, and guard the sum against overflow
+        let total_bytes = src_lengths
+            .iter()
+            .try_fold(0_usize, |acc, &length| {
+                usize::try_from(length).ok().and_then(|l| acc.checked_add(l))
+            })
+            .ok_or_else(|| {
+                general_err!("invalid value length in delta length byte array")
+            })?;
+
+        // Validate before reserving so corrupt lengths cannot force a huge allocation
+        match self.data_offset.checked_add(total_bytes) {
+            Some(end) if end <= self.data.len() => {}
+            _ => {
+                return Err(ParquetError::EOF(
+                    "Insufficient delta length byte array bytes".to_string(),
+                ))
+            }
+        }
+        output.values.reserve(total_bytes);
+
+        let data = self.data.as_ref();
+        let mut start_offset = self.data_offset;
+        for &length in src_lengths {
+            // Lossless: every length was checked to be non-negative above
+            let end_offset = start_offset + length as usize;
+            output.try_push(&data[start_offset..end_offset], self.validate_utf8)?;
+            start_offset = end_offset;
+        }
+
+        self.data_offset = start_offset;
+        self.length_offset += to_read;
+
+        if self.validate_utf8 {
+            output.check_valid_utf8(initial_values_length)?;
+        }
+        Ok(to_read)
+    }
+}
+
+/// Decoder from [`Encoding::DELTA_BYTE_ARRAY`] to [`OffsetBuffer`]
+pub struct ByteArrayDecoderDelta {
+    /// Per value, the number of leading bytes kept from the previous value
+    prefix_lengths: Vec<i32>,
+    /// Per value, the number of new bytes appended after the kept prefix
+    suffix_lengths: Vec<i32>,
+    /// The encoded page data
+    data: ByteBufferPtr,
+    /// Index into the length vectors of the next value to decode
+    length_offset: usize,
+    /// Byte offset into `data` of the next suffix
+    data_offset: usize,
+    /// The most recently decoded value, from which the next prefix is taken
+    last_value: Vec<u8>,
+    /// Whether decoded values must be valid UTF-8
+    validate_utf8: bool,
+}
+
+impl ByteArrayDecoderDelta {
+    /// Creates a decoder over `data`. It decodes the prefix-length block and then
+    /// the suffix-length block, which precede the suffix bytes.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if either length block is malformed or if the two blocks
+    /// contain a different number of lengths
+    fn new(data: ByteBufferPtr, validate_utf8: bool) -> Result<Self> {
+        let mut prefix = DeltaBitPackDecoder::<Int32Type>::new();
+        prefix.set_data(data.all(), 0)?;
+
+        let num_prefix = prefix.values_left();
+        let mut prefix_lengths = vec![0; num_prefix];
+        let decoded_prefix = prefix.get(&mut prefix_lengths)?;
+        if decoded_prefix != num_prefix {
+            return Err(general_err!(
+                "expected {} DELTA_BYTE_ARRAY prefix lengths, decoded {}",
+                num_prefix,
+                decoded_prefix
+            ));
+        }
+
+        let mut suffix = DeltaBitPackDecoder::<Int32Type>::new();
+        suffix.set_data(data.start_from(prefix.get_offset()), 0)?;
+
+        let num_suffix = suffix.values_left();
+        let mut suffix_lengths = vec![0; num_suffix];
+        let decoded_suffix = suffix.get(&mut suffix_lengths)?;
+        if decoded_suffix != num_suffix {
+            return Err(general_err!(
+                "expected {} DELTA_BYTE_ARRAY suffix lengths, decoded {}",
+                num_suffix,
+                decoded_suffix
+            ));
+        }
+
+        if num_prefix != num_suffix {
+            return Err(general_err!(
+                "inconsistent DELTA_BYTE_ARRAY lengths, prefixes: {}, suffixes: {}",
+                num_prefix,
+                num_suffix
+            ));
+        }
+
+        Ok(Self {
+            prefix_lengths,
+            suffix_lengths,
+            data,
+            length_offset: 0,
+            // The suffix decoder's offset is relative to the end of the prefix block
+            data_offset: prefix.get_offset() + suffix.get_offset(),
+            last_value: vec![],
+            validate_utf8,
+        })
+    }
+
+    /// Decodes up to `len` values into `output`, returning the number appended
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if a length is negative or a prefix is longer than the
+    /// previous value, [`ParquetError::EOF`] if suffix bytes are missing, and an
+    /// error if UTF-8 validation is enabled and the appended bytes are invalid
+    fn read<I: OffsetSizeTrait + ScalarValue>(
+        &mut self,
+        output: &mut OffsetBuffer<I>,
+        len: usize,
+    ) -> Result<usize> {
+        let initial_values_length = output.values.len();
+        assert_eq!(self.prefix_lengths.len(), self.suffix_lengths.len());
+
+        let to_read = len.min(self.prefix_lengths.len() - self.length_offset);
+
+        output.offsets.reserve(to_read);
+
+        let length_range = self.length_offset..self.length_offset + to_read;
+        let iter = self.prefix_lengths[length_range.clone()]
+            .iter()
+            .zip(&self.suffix_lengths[length_range]);
+
+        let data = self.data.as_ref();
+
+        for (&prefix_length, &suffix_length) in iter {
+            // Lengths come straight from the page: reject negative values, which
+            // would otherwise wrap to enormous offsets
+            let (prefix_length, suffix_length) = match (
+                usize::try_from(prefix_length),
+                usize::try_from(suffix_length),
+            ) {
+                (Ok(prefix), Ok(suffix)) => (prefix, suffix),
+                _ => {
+                    return Err(general_err!(
+                        "negative length decoding DELTA_BYTE_ARRAY"
+                    ))
+                }
+            };
+
+            // A shared prefix cannot be longer than the previous value; `truncate`
+            // would silently ignore such a length and emit corrupt data
+            if prefix_length > self.last_value.len() {
+                return Err(general_err!(
+                    "DELTA_BYTE_ARRAY prefix length {} exceeds previous value length {}",
+                    prefix_length,
+                    self.last_value.len()
+                ));
+            }
+
+            let end_offset = match self.data_offset.checked_add(suffix_length) {
+                Some(end) if end <= data.len() => end,
+                _ => return Err(ParquetError::EOF("eof decoding byte array".into())),
+            };
+
+            self.last_value.truncate(prefix_length);
+            self.last_value
+                .extend_from_slice(&data[self.data_offset..end_offset]);
+            output.try_push(&self.last_value, self.validate_utf8)?;
+
+            self.data_offset = end_offset;
+        }
+
+        self.length_offset += to_read;
+
+        if self.validate_utf8 {
+            output.check_valid_utf8(initial_values_length)?;
+        }
+        Ok(to_read)
+    }
+}
+
+/// Decoder from [`Encoding::RLE_DICTIONARY`] to [`OffsetBuffer`]
+pub struct ByteArrayDecoderDictionary {
+    /// Decodes the dictionary indices from the page data
+    decoder: RleDecoder,
+
+    /// Scratch buffer of decoded indices not yet copied to the output
+    index_buf: Box<[i32; 1024]>,
+    /// Number of valid indices currently held in `index_buf`
+    index_buf_len: usize,
+    /// Position in `index_buf` of the next index to use
+    index_offset: usize,
+
+    /// This is a maximum as the null count is not always known, e.g. value data from
+    /// a v1 data page
+    max_remaining_values: usize,
+}
+
+impl ByteArrayDecoderDictionary {
+    /// Creates a decoder over dictionary-index `data`, whose first byte is the bit
+    /// width of the encoded indices
+    ///
+    /// An empty `data` buffer yields a decoder that produces no values, rather
+    /// than panicking on the missing bit-width byte
+    fn new(data: ByteBufferPtr, num_levels: usize, num_values: Option<usize>) -> Self {
+        let (decoder, max_remaining_values) = match data.as_ref().first().copied() {
+            Some(bit_width) => {
+                let mut decoder = RleDecoder::new(bit_width);
+                decoder.set_data(data.start_from(1));
+                (decoder, num_values.unwrap_or(num_levels))
+            }
+            // No bytes means no encoded indices. With zero remaining values `read`
+            // never calls into the (data-less) decoder
+            None => (RleDecoder::new(0), 0),
+        };
+
+        Self {
+            decoder,
+            index_buf: Box::new([0; 1024]),
+            index_buf_len: 0,
+            index_offset: 0,
+            max_remaining_values,
+        }
+    }
+
+    /// Looks up up to `len` dictionary indices in `dict` and appends the
+    /// referenced values to `output`, returning the number appended
+    ///
+    /// Indices are decoded in batches into `index_buf` and consumed from there
+    fn read<I: OffsetSizeTrait + ScalarValue>(
+        &mut self,
+        output: &mut OffsetBuffer<I>,
+        dict: &OffsetBuffer<I>,
+        len: usize,
+    ) -> Result<usize> {
+        let mut values_read = 0;
+
+        while values_read != len && self.max_remaining_values != 0 {
+            // Refill the index buffer once every buffered index has been used
+            if self.index_offset == self.index_buf_len {
+                let read = self.decoder.get_batch(self.index_buf.as_mut())?;
+                if read == 0 {
+                    break;
+                }
+                self.index_buf_len = read;
+                self.index_offset = 0;
+            }
+
+            let to_read = (len - values_read)
+                .min(self.index_buf_len - self.index_offset)
+                .min(self.max_remaining_values);
+
+            output.extend_from_dictionary(
+                &self.index_buf[self.index_offset..self.index_offset + to_read],
+                dict.offsets.as_slice(),
+                dict.values.as_slice(),
+            )?;
+
+            self.index_offset += to_read;
+            values_read += to_read;
+            self.max_remaining_values -= to_read;
+        }
+        Ok(values_read)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::arrow::record_reader::buffer::ValuesBuffer;
+    use crate::basic::Type as PhysicalType;
+    use crate::data_type::{ByteArray, ByteArrayType};
+    use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder};
+    use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};
+    use crate::util::memory::MemTracker;
+    use arrow::array::{Array, StringArray};
+    use std::sync::Arc;
+
+    /// Returns a descriptor for a UTF8 `BYTE_ARRAY` column named `col`
+    ///
+    /// NOTE(review): the `1, 0` arguments are presumably the max definition and
+    /// repetition levels - confirm against `ColumnDescriptor::new`
+    fn column() -> ColumnDescPtr {
+        let t = Type::primitive_type_builder("col", PhysicalType::BYTE_ARRAY)
+            .with_converted_type(ConvertedType::UTF8)
+            .build()
+            .unwrap();
+
+        Arc::new(ColumnDescriptor::new(
+            Arc::new(t),
+            1,
+            0,
+            ColumnPath::new(vec![]),
+        ))
+    }
+
+    /// Encodes `data` with `encoding` and returns the flushed page data
+    fn get_encoded(encoding: Encoding, data: &[ByteArray]) -> ByteBufferPtr {
+        let descriptor = column();
+        let mem_tracker = Arc::new(MemTracker::new());
+        let mut encoder =
+            get_encoder::<ByteArrayType>(descriptor, encoding, mem_tracker).unwrap();
+
+        encoder.put(data).unwrap();
+        encoder.flush_buffer().unwrap()
+    }
+
+    /// Decodes the same four values from a page of every supported encoding and
+    /// checks that incremental reads and null padding behave identically
+    #[test]
+    fn test_byte_array_decoder() {
+        let data: Vec<_> = vec!["hello", "world", "a", "b"]
+            .into_iter()
+            .map(ByteArray::from)
+            .collect();
+
+        // The dictionary page and the key page shared by both dictionary encodings
+        let mut dict_encoder =
+            DictEncoder::<ByteArrayType>::new(column(), Arc::new(MemTracker::new()));
+
+        dict_encoder.put(&data).unwrap();
+        let encoded_rle = dict_encoder.flush_buffer().unwrap();
+        let encoded_dictionary = dict_encoder.write_dict().unwrap();
+
+        // A column chunk with all the encodings!
+        let pages = vec![
+            (Encoding::PLAIN, get_encoded(Encoding::PLAIN, &data)),
+            (
+                Encoding::DELTA_BYTE_ARRAY,
+                get_encoded(Encoding::DELTA_BYTE_ARRAY, &data),
+            ),
+            (
+                Encoding::DELTA_LENGTH_BYTE_ARRAY,
+                get_encoded(Encoding::DELTA_LENGTH_BYTE_ARRAY, &data),
+            ),
+            (Encoding::PLAIN_DICTIONARY, encoded_rle.clone()),
+            (Encoding::RLE_DICTIONARY, encoded_rle),
+        ];
+
+        let column_desc = column();
+        let mut decoder = ByteArrayColumnValueDecoder::new(&column_desc);
+
+        decoder
+            .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false)
+            .unwrap();
+
+        for (encoding, page) in pages {
+            let mut output = OffsetBuffer::<i32>::default();
+            // 4 levels, all of them non-null values
+            decoder.set_data(encoding, page, 4, Some(4)).unwrap();
+
+            // Values are read incrementally and appended to the same output
+            assert_eq!(decoder.read(&mut output, 0..1).unwrap(), 1);
+
+            assert_eq!(output.values.as_slice(), "hello".as_bytes());
+            assert_eq!(output.offsets.as_slice(), &[0, 5]);
+
+            assert_eq!(decoder.read(&mut output, 1..2).unwrap(), 1);
+            assert_eq!(output.values.as_slice(), "helloworld".as_bytes());
+            assert_eq!(output.offsets.as_slice(), &[0, 5, 10]);
+
+            assert_eq!(decoder.read(&mut output, 2..4).unwrap(), 2);
+            assert_eq!(output.values.as_slice(), "helloworldab".as_bytes());
+            assert_eq!(output.offsets.as_slice(), &[0, 5, 10, 11, 12]);
+
+            // The page is exhausted, so further reads return nothing
+            assert_eq!(decoder.read(&mut output, 4..8).unwrap(), 0);
+
+            // Spread the 4 values over 9 levels; `valid` doubles as the null buffer
+            let valid = vec![false, false, true, true, false, true, true, false, false];
+            let rev_position_iter = valid
+                .iter()
+                .enumerate()
+                .rev()
+                .filter_map(|(i, valid)| valid.then(|| i));
+
+            let valid_buffer = Buffer::from_iter(valid.iter().cloned());
+
+            output.pad_nulls(0, 4, valid.len(), rev_position_iter);
+            let array = output.into_array(Some(valid_buffer), ArrowType::Utf8);
+            let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
+
+            assert_eq!(
+                strings.iter().collect::<Vec<_>>(),
+                vec![
+                    None,
+                    None,
+                    Some("hello"),
+                    Some("world"),
+                    None,
+                    Some("a"),
+                    Some("b"),
+                    None,
+                    None,
+                ]
+            );
+        }
+    }
+}
diff --git a/parquet/src/arrow/array_reader/offset_buffer.rs b/parquet/src/arrow/array_reader/offset_buffer.rs
new file mode 100644
index 0000000..ddf2ad3
--- /dev/null
+++ b/parquet/src/arrow/array_reader/offset_buffer.rs
@@ -0,0 +1,338 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::record_reader::buffer::{
+ BufferQueue, ScalarBuffer, ScalarValue, ValuesBuffer,
+};
+use crate::column::reader::decoder::ValuesBufferSlice;
+use crate::errors::{ParquetError, Result};
+use arrow::array::{make_array, ArrayDataBuilder, ArrayRef, OffsetSizeTrait};
+use arrow::buffer::Buffer;
+use arrow::datatypes::{ArrowNativeType, DataType as ArrowType};
+
+/// A buffer of variable-sized byte arrays that can be converted into
+/// a corresponding [`ArrayRef`]
+pub struct OffsetBuffer<I: ScalarValue> {
+    /// Offsets into `values`, one longer than the number of byte arrays: the `i`th
+    /// byte array is `values[offsets[i]..offsets[i + 1]]`
+    pub offsets: ScalarBuffer<I>,
+    /// The bytes of all the byte arrays, concatenated
+    pub values: ScalarBuffer<u8>,
+}
+
+impl<I: ScalarValue> Default for OffsetBuffer<I> {
+    /// Creates an empty buffer holding just the leading zero offset
+    fn default() -> Self {
+        let values = ScalarBuffer::new();
+
+        // A single zero-filled offset marks where the first byte array will start
+        let mut offsets = ScalarBuffer::new();
+        offsets.resize(1);
+
+        Self { offsets, values }
+    }
+}
+
+impl<I: OffsetSizeTrait + ScalarValue> OffsetBuffer<I> {
+    /// Returns the number of byte arrays in this buffer
+    pub fn len(&self) -> usize {
+        self.offsets.len() - 1
+    }
+
+    /// Appends `data` as a new byte array
+    ///
+    /// If `validate_utf8` this verifies that the first character of `data` is
+    /// the start of a UTF-8 codepoint
+    ///
+    /// Note: This does not verify that the entirety of `data` is valid
+    /// UTF-8. This should be done by calling [`Self::check_valid_utf8`] after
+    /// all data has been written
+    ///
+    /// Returns an error, leaving the buffer unchanged, if the UTF-8 check fails or
+    /// if the total length of the values would not fit in the offset type `I`
+    pub fn try_push(&mut self, data: &[u8], validate_utf8: bool) -> Result<()> {
+        if validate_utf8 {
+            if let Some(&b) = data.first() {
+                // A valid code-point iff it does not start with 0b10xxxxxx
+                // Bit-magic taken from `std::str::is_char_boundary`
+                if (b as i8) < -0x40 {
+                    return Err(ParquetError::General(
+                        "encountered non UTF-8 data".to_string(),
+                    ));
+                }
+            }
+        }
+
+        // Compute the new end offset before mutating, so that an overflow does not
+        // leave `values` extended without a matching entry in `offsets`
+        let end_offset = self
+            .values
+            .len()
+            .checked_add(data.len())
+            .and_then(I::from_usize)
+            .ok_or_else(|| general_err!("index overflow decoding byte array"))?;
+
+        self.values.extend_from_slice(data);
+        self.offsets.push(end_offset);
+        Ok(())
+    }
+
+    /// Extends this buffer with a list of keys
+    ///
+    /// For each value `key` in `keys` this will insert
+    /// `&dict_values[dict_offsets[key]..dict_offsets[key+1]]`
+    ///
+    /// Returns an error if a key is negative or does not identify a dictionary entry
+    pub fn extend_from_dictionary<K: ArrowNativeType, V: ArrowNativeType>(
+        &mut self,
+        keys: &[K],
+        dict_offsets: &[V],
+        dict_values: &[u8],
+    ) -> Result<()> {
+        // `dict_offsets` holds one more entry than there are dictionary values
+        let dict_len = dict_offsets.len().saturating_sub(1);
+
+        for key in keys {
+            // Keys may come from decoded page data, so an invalid key must be
+            // reported as an error rather than panicking
+            let index = key
+                .to_usize()
+                .ok_or_else(|| general_err!("invalid dictionary key in byte array"))?;
+            if index >= dict_len {
+                return Err(general_err!("invalid offset in byte array: {}", index));
+            }
+            let start_offset = dict_offsets[index].to_usize().unwrap();
+            let end_offset = dict_offsets[index + 1].to_usize().unwrap();
+
+            // Dictionary values are verified when decoding dictionary page
+            self.try_push(&dict_values[start_offset..end_offset], false)?;
+        }
+        Ok(())
+    }
+
+    /// Validates that `&self.values[start_offset..]` is a valid UTF-8 sequence
+    ///
+    /// This MUST be combined with validating that the offsets start on a character
+    /// boundary, otherwise it would be possible for the values array to be a valid UTF-8
+    /// sequence, but not the individual string slices it contains
+    ///
+    /// [`Self::try_push`] can perform this validation check on insertion
+    pub fn check_valid_utf8(&self, start_offset: usize) -> Result<()> {
+        match std::str::from_utf8(&self.values.as_slice()[start_offset..]) {
+            Ok(_) => Ok(()),
+            Err(e) => Err(general_err!("encountered non UTF-8 data: {}", e)),
+        }
+    }
+
+    /// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer`
+    pub fn into_array(
+        self,
+        null_buffer: Option<Buffer>,
+        data_type: ArrowType,
+    ) -> ArrayRef {
+        let mut array_data_builder = ArrayDataBuilder::new(data_type)
+            .len(self.len())
+            .add_buffer(self.offsets.into())
+            .add_buffer(self.values.into());
+
+        if let Some(buffer) = null_buffer {
+            array_data_builder = array_data_builder.null_bit_buffer(buffer);
+        }
+
+        // SAFETY: full validation only runs in debug builds. Release builds rely on the
+        // offsets being monotonic and within `values`, which the methods of this type
+        // maintain, and on the caller having performed any required UTF-8 validation
+        // (via `try_push` / `check_valid_utf8`) for string data types
+        let data = match cfg!(debug_assertions) {
+            true => array_data_builder.build().unwrap(),
+            false => unsafe { array_data_builder.build_unchecked() },
+        };
+
+        make_array(data)
+    }
+}
+
+impl<I: OffsetSizeTrait + ScalarValue> BufferQueue for OffsetBuffer<I> {
+    type Output = Self;
+    type Slice = Self;
+
+    /// Removes the first `len` byte arrays from this buffer and returns them,
+    /// rebasing the offsets of the byte arrays that remain so they start at zero
+    fn split_off(&mut self, len: usize) -> Self::Output {
+        let offset_count = self.offsets.len();
+        assert!(offset_count > len, "{} > {}", offset_count, len);
+
+        let split_point = self.offsets.as_slice()[len];
+
+        // Offsets from `len` onwards describe the retained byte arrays, shifted so
+        // that the first of them starts at zero
+        let mut retained_offsets = ScalarBuffer::new();
+        retained_offsets.reserve(offset_count - len);
+        self.offsets.as_slice()[len..]
+            .iter()
+            .for_each(|&offset| retained_offsets.push(offset - split_point));
+
+        // Keep `len + 1` offsets so the returned buffer describes exactly `len` arrays
+        self.offsets.resize(len + 1);
+        let taken_offsets = std::mem::replace(&mut self.offsets, retained_offsets);
+        let taken_values = self.values.take(split_point.to_usize().unwrap());
+
+        Self {
+            offsets: taken_offsets,
+            values: taken_values,
+        }
+    }
+
+    /// Values are appended directly to this buffer, so it acts as its own slice
+    fn spare_capacity_mut(&mut self, _batch_size: usize) -> &mut Self::Slice {
+        self
+    }
+
+    /// Only asserts that `len` matches the number of byte arrays already written
+    fn set_len(&mut self, len: usize) {
+        assert_eq!(self.offsets.len(), len + 1);
+    }
+}
+
+impl<I: OffsetSizeTrait + ScalarValue> ValuesBuffer for OffsetBuffer<I> {
+    /// Spreads the `values_read` byte arrays starting at `read_offset` across
+    /// `levels_read` slots, so that each lands at its valid level position
+    ///
+    /// Null slots become empty byte arrays, i.e. their start and end offsets are
+    /// equal. Positions are processed from the back so that each offset is read
+    /// before it can be overwritten
+    fn pad_nulls(
+        &mut self,
+        read_offset: usize,
+        values_read: usize,
+        levels_read: usize,
+        rev_position_iter: impl Iterator<Item = usize>,
+    ) {
+        assert_eq!(self.offsets.len(), read_offset + values_read + 1);
+        // One offset per level, plus the trailing end offset
+        self.offsets.resize(read_offset + levels_read + 1);
+
+        let offsets = self.offsets.as_slice_mut();
+
+        // Exclusive upper bound of the offsets still to be filled in
+        let mut last_pos = read_offset + levels_read + 1;
+        // Start offset of the most recently placed value, used for leading nulls
+        let mut last_start_offset = I::from_usize(self.values.len()).unwrap();
+
+        let values_range = read_offset..read_offset + values_read;
+        for (value_pos, level_pos) in values_range.clone().rev().zip(rev_position_iter) {
+            assert!(level_pos >= value_pos);
+            assert!(level_pos < last_pos);
+
+            let end_offset = offsets[value_pos + 1];
+            let start_offset = offsets[value_pos];
+
+            // Fill in any nulls
+            for x in &mut offsets[level_pos + 1..last_pos] {
+                *x = end_offset;
+            }
+
+            // Level positions strictly decrease and never fall below the value
+            // positions, so once they coincide every earlier value is already in place
+            if level_pos == value_pos {
+                return;
+            }
+
+            offsets[level_pos] = start_offset;
+            last_pos = level_pos;
+            last_start_offset = start_offset;
+        }
+
+        // Pad leading nulls up to `last_pos`
+        for x in &mut offsets[values_range.start + 1..last_pos] {
+            *x = last_start_offset
+        }
+    }
+}
+
+impl<I: ScalarValue> ValuesBufferSlice for OffsetBuffer<I> {
+    /// Reports an unbounded capacity, as byte arrays are appended to the buffer
+    /// rather than written into pre-sized slots
+    fn capacity(&self) -> usize {
+        std::usize::MAX
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::array::{Array, LargeStringArray, StringArray};
+
+    /// An empty buffer converts into a zero-length array
+    #[test]
+    fn test_offset_buffer_empty() {
+        let buffer = OffsetBuffer::<i32>::default();
+        let array = buffer.into_array(None, ArrowType::Utf8);
+        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(strings.len(), 0);
+    }
+
+    /// Byte arrays can be appended directly and through dictionary keys
+    #[test]
+    fn test_offset_buffer_append() {
+        let mut buffer = OffsetBuffer::<i64>::default();
+        buffer.try_push("hello".as_bytes(), true).unwrap();
+        buffer.try_push("bar".as_bytes(), true).unwrap();
+        // Dictionary entries are "ab", "cd", "e", "f"; the keys select "cd", "f", "ab", "e"
+        buffer
+            .extend_from_dictionary(&[1, 3, 0, 2], &[0, 2, 4, 5, 6], "abcdef".as_bytes())
+            .unwrap();
+
+        let array = buffer.into_array(None, ArrowType::LargeUtf8);
+        let strings = array.as_any().downcast_ref::<LargeStringArray>().unwrap();
+        assert_eq!(
+            strings.iter().map(|x| x.unwrap()).collect::<Vec<_>>(),
+            vec!["hello", "bar", "cd", "f", "ab", "e"]
+        )
+    }
+
+    /// `split_off` returns the leading byte arrays; the remainder stays usable
+    #[test]
+    fn test_offset_buffer_split() {
+        let mut buffer = OffsetBuffer::<i32>::default();
+        for v in ["hello", "world", "cupcakes", "a", "b", "c"] {
+            buffer.try_push(v.as_bytes(), false).unwrap()
+        }
+        let split = buffer.split_off(3);
+
+        let array = split.into_array(None, ArrowType::Utf8);
+        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(
+            strings.iter().map(|x| x.unwrap()).collect::<Vec<_>>(),
+            vec!["hello", "world", "cupcakes"]
+        );
+
+        // The retained buffer can still be appended to after the split
+        buffer.try_push("test".as_bytes(), false).unwrap();
+        let array = buffer.into_array(None, ArrowType::Utf8);
+        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(
+            strings.iter().map(|x| x.unwrap()).collect::<Vec<_>>(),
+            vec!["a", "b", "c", "test"]
+        );
+    }
+
+    /// Four values read after offset 1 are spread over 10 levels, with valid
+    /// positions 3, 5, 7 and 8 (given in reverse order)
+    #[test]
+    fn test_offset_buffer_pad_nulls() {
+        let mut buffer = OffsetBuffer::<i32>::default();
+        for v in ["a", "b", "c", "def", "gh"] {
+            buffer.try_push(v.as_bytes(), false).unwrap()
+        }
+
+        // Both trailing and leading nulls
+        buffer.pad_nulls(1, 4, 10, [8, 7, 5, 3].into_iter());
+
+        // No null buffer - nulls -> ""
+        let array = buffer.into_array(None, ArrowType::Utf8);
+        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(
+            strings.iter().map(|x| x.unwrap()).collect::<Vec<_>>(),
+            vec!["a", "", "", "b", "", "c", "", "def", "gh", "", ""]
+        );
+    }
+
+    /// `try_push` only rejects data starting mid-codepoint; `check_valid_utf8`
+    /// catches the remaining invalid sequences
+    #[test]
+    fn test_utf8_validation() {
+        let valid_2_byte_utf8 = &[0b11001000, 0b10001000];
+        std::str::from_utf8(valid_2_byte_utf8).unwrap();
+        let valid_3_byte_utf8 = &[0b11101000, 0b10001000, 0b10001000];
+        std::str::from_utf8(valid_3_byte_utf8).unwrap();
+        let valid_4_byte_utf8 = &[0b11110010, 0b10101000, 0b10101001, 0b10100101];
+        std::str::from_utf8(valid_4_byte_utf8).unwrap();
+
+        let mut buffer = OffsetBuffer::<i32>::default();
+        buffer.try_push(valid_2_byte_utf8, true).unwrap();
+        buffer.try_push(valid_3_byte_utf8, true).unwrap();
+        buffer.try_push(valid_4_byte_utf8, true).unwrap();
+
+        // Cannot append string starting with incomplete codepoint
+        buffer.try_push(&valid_2_byte_utf8[1..], true).unwrap_err();
+        buffer.try_push(&valid_3_byte_utf8[1..], true).unwrap_err();
+        buffer.try_push(&valid_3_byte_utf8[2..], true).unwrap_err();
+        buffer.try_push(&valid_4_byte_utf8[1..], true).unwrap_err();
+        buffer.try_push(&valid_4_byte_utf8[2..], true).unwrap_err();
+        buffer.try_push(&valid_4_byte_utf8[3..], true).unwrap_err();
+
+        // Can append data containing an incomplete codepoint
+        buffer.try_push(&[0b01111111, 0b10111111], true).unwrap();
+
+        // 2 + 3 + 4 + 2 bytes; the dangling continuation byte is at index 10
+        assert_eq!(buffer.len(), 4);
+        assert_eq!(buffer.values.len(), 11);
+
+        buffer.try_push(valid_3_byte_utf8, true).unwrap();
+
+        // Should fail due to incomplete codepoint
+        buffer.check_valid_utf8(0).unwrap_err();
+
+        // After broken codepoint -> success
+        buffer.check_valid_utf8(11).unwrap();
+
+        // Fails if run from middle of codepoint
+        buffer.check_valid_utf8(12).unwrap_err();
+    }
+}
diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs
index 94b2e7b..476cf08 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader.rs
@@ -251,6 +251,7 @@ mod tests {
use crate::file::writer::{FileWriter, SerializedFileWriter};
use crate::schema::parser::parse_message_type;
use crate::schema::types::{Type, TypePtr};
+ use crate::util::cursor::SliceableCursor;
use crate::util::test_common::RandGen;
use arrow::array::*;
use arrow::datatypes::{DataType as ArrowDataType, Field};
@@ -410,7 +411,7 @@ mod tests {
let encodings = &[
Encoding::PLAIN,
Encoding::RLE_DICTIONARY,
- //Encoding::DELTA_LENGTH_BYTE_ARRAY,
+ Encoding::DELTA_LENGTH_BYTE_ARRAY,
Encoding::DELTA_BYTE_ARRAY,
];
@@ -968,4 +969,40 @@ mod tests {
assert_eq!(batch.num_rows(), 4);
assert_eq!(batch.column(0).data().null_count(), 2);
}
+
+    /// Reading a column whose values are not valid UTF-8 must fail with an error
+    /// instead of producing a `StringArray` containing invalid data
+    #[test]
+    fn test_invalid_utf8() {
+        // a parquet file with 1 column with invalid utf8
+        //
+        // Column `c1` holds the bytes [104, 101, 255, 108, 111] ("he\xFFlo"); the byte
+        // 0xFF never appears in well-formed UTF-8. The footer's created-by string reads
+        // "Arrow2 - Native Rust implementation of Arrow"
+        let data = vec![
+            80, 65, 82, 49, 21, 6, 21, 22, 21, 22, 92, 21, 2, 21, 0, 21, 2, 21, 0, 21, 4,
+            21, 0, 18, 28, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255,
+            108, 111, 0, 0, 0, 3, 1, 5, 0, 0, 0, 104, 101, 255, 108, 111, 38, 110, 28,
+            21, 12, 25, 37, 6, 0, 25, 24, 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102, 38,
+            8, 60, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111,
+            0, 0, 0, 21, 4, 25, 44, 72, 4, 114, 111, 111, 116, 21, 2, 0, 21, 12, 37, 2,
+            24, 2, 99, 49, 37, 0, 76, 28, 0, 0, 0, 22, 2, 25, 28, 25, 28, 38, 110, 28,
+            21, 12, 25, 37, 6, 0, 25, 24, 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102, 38,
+            8, 60, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111,
+            0, 0, 0, 22, 102, 22, 2, 0, 40, 44, 65, 114, 114, 111, 119, 50, 32, 45, 32,
+            78, 97, 116, 105, 118, 101, 32, 82, 117, 115, 116, 32, 105, 109, 112, 108,
+            101, 109, 101, 110, 116, 97, 116, 105, 111, 110, 32, 111, 102, 32, 65, 114,
+            114, 111, 119, 0, 130, 0, 0, 0, 80, 65, 82, 49,
+        ];
+
+        let file = SliceableCursor::new(data);
+        let file_reader = SerializedFileReader::new(file).unwrap();
+        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
+
+        let mut record_batch_reader = arrow_reader
+            .get_record_reader_by_columns(vec![0], 10)
+            .unwrap();
+
+        // Constructing the reader succeeds; the error surfaces when the first batch is read
+        let error = record_batch_reader.next().unwrap().unwrap_err();
+
+        // NOTE(review): the expected text presumably comes from the `std::str::Utf8Error`
+        // message embedded by `OffsetBuffer::check_valid_utf8`
+        assert!(
+            error.to_string().contains("invalid utf-8 sequence"),
+            "{}",
+            error
+        );
+    }
}
diff --git a/parquet/src/arrow/record_reader.rs b/parquet/src/arrow/record_reader.rs
index df93ebf..ce77db9 100644
--- a/parquet/src/arrow/record_reader.rs
+++ b/parquet/src/arrow/record_reader.rs
@@ -264,12 +264,12 @@ where
)
})?;
- let iter = def_levels.rev_valid_positions_iter(
- self.values_written..self.values_written + levels_read,
+ self.records.pad_nulls(
+ self.values_written,
+ values_read,
+ levels_read,
+ def_levels.rev_valid_positions_iter(),
);
-
- self.records
- .pad_nulls(self.values_written..self.values_written +
values_read, iter);
}
let values_read = max(levels_read, values_read);
diff --git a/parquet/src/arrow/record_reader/buffer.rs b/parquet/src/arrow/record_reader/buffer.rs
index 29e6110..5c69dfa 100644
--- a/parquet/src/arrow/record_reader/buffer.rs
+++ b/parquet/src/arrow/record_reader/buffer.rs
@@ -16,9 +16,9 @@
// under the License.
use std::marker::PhantomData;
-use std::ops::Range;
use arrow::buffer::{Buffer, MutableBuffer};
+use arrow::datatypes::ToByteSlice;
/// A buffer that supports writing new data to the end, and removing data from
the front
///
@@ -74,6 +74,7 @@ pub trait BufferQueue: Sized {
///
pub trait ScalarValue {}
impl ScalarValue for bool {}
+/// Allows a `ScalarBuffer<u8>` to store raw bytes, such as the values of an `OffsetBuffer`
+impl ScalarValue for u8 {}
impl ScalarValue for i16 {}
impl ScalarValue for i32 {}
impl ScalarValue for i64 {}
@@ -114,6 +115,10 @@ impl<T: ScalarValue> ScalarBuffer<T> {
self.len == 0
}
+    /// Reserves space for at least `additional` more values of type `T`
+    pub fn reserve(&mut self, additional: usize) {
+        let additional_bytes = additional * std::mem::size_of::<T>();
+        self.buffer.reserve(additional_bytes);
+    }
+
pub fn resize(&mut self, len: usize) {
self.buffer.resize(len * std::mem::size_of::<T>(), 0);
self.len = len;
@@ -133,14 +138,8 @@ impl<T: ScalarValue> ScalarBuffer<T> {
assert!(prefix.is_empty() && suffix.is_empty());
buf
}
-}
-impl<T: ScalarValue> BufferQueue for ScalarBuffer<T> {
- type Output = Buffer;
-
- type Slice = [T];
-
- fn split_off(&mut self, len: usize) -> Self::Output {
+ pub fn take(&mut self, len: usize) -> Self {
assert!(len <= self.len);
let num_bytes = len * std::mem::size_of::<T>();
@@ -158,7 +157,39 @@ impl<T: ScalarValue> BufferQueue for ScalarBuffer<T> {
self.buffer.resize(num_bytes, 0);
self.len -= len;
- std::mem::replace(&mut self.buffer, remaining).into()
+ Self {
+ buffer: std::mem::replace(&mut self.buffer, remaining),
+ len,
+ _phantom: Default::default(),
+ }
+ }
+}
+
+impl<T: ScalarValue + ToByteSlice> ScalarBuffer<T> {
+    /// Appends `v` to the end of this buffer
+    pub fn push(&mut self, v: T) {
+        self.buffer.push(v);
+        self.len += 1;
+    }
+
+    /// Appends every element of `v`, in order, to the end of this buffer
+    pub fn extend_from_slice(&mut self, v: &[T]) {
+        let appended = v.len();
+        self.buffer.extend_from_slice(v);
+        self.len += appended;
+    }
+}
+
+impl<T: ScalarValue> From<ScalarBuffer<T>> for Buffer {
+    /// Freezes the underlying byte storage into an immutable [`Buffer`]
+    fn from(scalars: ScalarBuffer<T>) -> Self {
+        Buffer::from(scalars.buffer)
+    }
+}
+
+impl<T: ScalarValue> BufferQueue for ScalarBuffer<T> {
+ type Output = Buffer;
+
+ type Slice = [T];
+
+ fn split_off(&mut self, len: usize) -> Self::Output {
+ self.take(len).into()
}
fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice {
@@ -180,20 +211,34 @@ impl<T: ScalarValue> BufferQueue for ScalarBuffer<T> {
/// A [`BufferQueue`] capable of storing column values
pub trait ValuesBuffer: BufferQueue {
- /// Iterate through the indexes in `range` in reverse order, moving the
value at each
- /// index to the next index returned by `rev_valid_position_iter`
+ ///
+ /// If a column contains nulls, more level data may be read than value
data, as null
+ /// values are not encoded. Therefore, first the levels data is read, the
null count
+ /// determined, and then the corresponding number of values read to a
[`ValuesBuffer`].
+ ///
+ /// It is then necessary to move this values data into positions that
correspond to
+ /// the non-null level positions. This is what this method does.
+ ///
+ /// It is provided with:
+ ///
+ /// - `read_offset` - the offset in [`ValuesBuffer`] to start null padding
from
+ /// - `values_read` - the number of values read
+ /// - `levels_read` - the number of levels read
+ /// - `rev_valid_position_iter` - a reverse iterator of the valid level
positions
///
/// It is required that:
///
- /// - `rev_valid_position_iter` has at least `range.end - range.start`
elements
+ /// - `rev_valid_position_iter` has at least `values_len` elements
/// - `rev_valid_position_iter` returns strictly monotonically decreasing
values
- /// - the `i`th index returned by `rev_valid_position_iter` is `>=
range.end - i - 1`
+ /// - `rev_valid_position_iter` returns values in the range
`read_offset..read_offset+levels_len`
///
/// Implementations may panic or otherwise misbehave if this is not the
case
///
fn pad_nulls(
&mut self,
- range: Range<usize>,
+ read_offset: usize,
+ values_read: usize,
+ levels_read: usize,
rev_valid_position_iter: impl Iterator<Item = usize>,
);
}
@@ -201,12 +246,16 @@ pub trait ValuesBuffer: BufferQueue {
impl<T: ScalarValue> ValuesBuffer for ScalarBuffer<T> {
fn pad_nulls(
&mut self,
- range: Range<usize>,
+ read_offset: usize,
+ values_read: usize,
+ levels_read: usize,
rev_valid_position_iter: impl Iterator<Item = usize>,
) {
let slice = self.as_slice_mut();
+ assert!(slice.len() >= read_offset + levels_read);
- for (value_pos, level_pos) in range.rev().zip(rev_valid_position_iter)
{
+ let values_range = read_offset..read_offset + values_read;
+ for (value_pos, level_pos) in
values_range.rev().zip(rev_valid_position_iter) {
debug_assert!(level_pos >= value_pos);
if level_pos <= value_pos {
break;
diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs
index 750e118..d53310e 100644
--- a/parquet/src/arrow/record_reader/definition_levels.rs
+++ b/parquet/src/arrow/record_reader/definition_levels.rs
@@ -126,12 +126,8 @@ impl DefinitionLevelBuffer {
Bitmap::from(std::mem::replace(old_builder, new_builder).finish())
}
-    /// Returns an iterator of the valid positions in `range` in descending order
-    pub fn rev_valid_positions_iter(
-        &self,
-        range: Range<usize>,
-    ) -> impl Iterator<Item = usize> + '_ {
-        assert_eq!(range.start, self.len);
+    /// Returns an iterator of the valid positions in descending order, i.e. the
+    /// indices of the set bits in the `nulls()` bitmap, from highest to lowest
+    pub fn rev_valid_positions_iter(&self) -> impl Iterator<Item = usize> + '_ {
         iter_set_bits_rev(self.nulls().as_slice())
     }
diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs
index fe0344f..1fc722f 100644
--- a/parquet/src/column/reader.rs
+++ b/parquet/src/column/reader.rs
@@ -258,11 +258,15 @@ where
// At this point we have read values, definition and repetition
levels.
// If both definition and repetition levels are defined, their
counts
// should be equal. Values count is always less or equal to
definition levels.
- if num_def_levels != 0 && num_rep_levels != 0 {
- assert_eq!(
- num_def_levels, num_rep_levels,
- "Number of decoded rep / def levels did not match"
- );
+ if num_def_levels != 0
+ && num_rep_levels != 0
+ && num_rep_levels != num_def_levels
+ {
+ return Err(general_err!(
+ "inconsistent number of levels read - def: {}, rep: {}",
+ num_def_levels,
+ num_rep_levels
+ ));
}
// Note that if field is not required, but no definition levels
are provided,
@@ -275,6 +279,14 @@ where
.values_decoder
.read(values, values_read..values_read + values_to_read)?;
+ if num_def_levels != 0 && curr_values_read != num_def_levels -
null_count {
+ return Err(general_err!(
+ "insufficient values read from column - expected: {}, got:
{}",
+ num_def_levels - null_count,
+ curr_values_read
+ ));
+ }
+
// Update all "return" counters and internal state.
// This is to account for when def or rep levels are not provided
@@ -359,6 +371,7 @@ where
encoding,
buf.start_from(offset),
num_values as usize,
+ None,
)?;
return Ok(true);
}
@@ -367,13 +380,17 @@ where
buf,
num_values,
encoding,
- num_nulls: _,
+ num_nulls,
num_rows: _,
def_levels_byte_len,
rep_levels_byte_len,
is_compressed: _,
statistics: _,
} => {
+ if num_nulls > num_values {
+ return Err(general_err!("more nulls than
values in page, contained {} values and {} nulls", num_values, num_nulls));
+ }
+
self.num_buffered_values = num_values;
self.num_decoded_values = 0;
@@ -408,6 +425,7 @@ where
(rep_levels_byte_len +
def_levels_byte_len) as usize,
),
num_values as usize,
+ Some((num_values - num_nulls) as usize),
)?;
return Ok(true);
}
diff --git a/parquet/src/column/reader/decoder.rs
b/parquet/src/column/reader/decoder.rs
index a9221d9..76b0d07 100644
--- a/parquet/src/column/reader/decoder.rs
+++ b/parquet/src/column/reader/decoder.rs
@@ -95,11 +95,22 @@ pub trait ColumnValueDecoder {
) -> Result<()>;
/// Set the current data page
+ ///
+ /// - `encoding` - the encoding of the page
+ /// - `data` - a point to the page's uncompressed value data
+ /// - `num_levels` - the number of levels contained within the page, i.e.
values including nulls
+ /// - `num_values` - the number of non-null values contained within the
page (V2 page only)
+ ///
+ /// Note: data encoded with [`Encoding::RLE`] may not know its exact
length, as the final
+ /// run may be zero-padded. As such if `num_values` is not provided (i.e.
`None`),
+ /// subsequent calls to `ColumnValueDecoder::read` may yield more values
than
+ /// non-null definition levels within the page
fn set_data(
&mut self,
encoding: Encoding,
data: ByteBufferPtr,
- num_values: usize,
+ num_levels: usize,
+ num_values: Option<usize>,
) -> Result<()>;
/// Read values data into `out[range]` returning the number of values read
@@ -170,7 +181,8 @@ impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
&mut self,
mut encoding: Encoding,
data: ByteBufferPtr,
- num_values: usize,
+ num_levels: usize,
+ num_values: Option<usize>,
) -> Result<()> {
use std::collections::hash_map::Entry;
@@ -193,7 +205,7 @@ impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
}
};
- decoder.set_data(data, num_values)?;
+ decoder.set_data(data, num_values.unwrap_or(num_levels))?;
self.current_encoding = Some(encoding);
Ok(())
}