This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new d801ac2 Preserve dictionary encoding when decoding parquet into Arrow arrays, 60x perf improvement (#171) (#1180)
d801ac2 is described below
commit d801ac20311f3dfa905702aee752c8ca91be5297
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Mon Jan 24 12:00:44 2022 +0000
Preserve dictionary encoding when decoding parquet into Arrow arrays, 60x perf improvement (#171) (#1180)
* Preserve dictionary encoding from parquet (#171)
* Use OffsetBuffer::into_array for dictionary
* Fix and test handling of empty dictionaries
Don't panic if missing dictionary page
* Use ArrayRef instead of Arc<ArrayData>
* Update doc comments
* Add integration test
Tweak RecordReader buffering logic
* Add benchmark
* Set write batch size in parquet fuzz tests
Fix bug in column writer with small page sizes
* Fix test_dictionary_preservation
* Add batch_size comment
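For illustration, a rough sketch of a read path that benefits from this change, using the high-level arrow reader (the file name and batch size are placeholders; the dictionary encoding is preserved when the column is read as an arrow Dictionary type, e.g. via the embedded arrow schema):

    use std::fs::File;
    use std::sync::Arc;

    use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
    use parquet::file::reader::SerializedFileReader;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let file = File::open("data.parquet")?;
        let file_reader = Arc::new(SerializedFileReader::new(file)?);
        let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
        // Dictionary encoded string columns are now decoded directly to
        // DictionaryArray batches rather than hydrated to StringArray
        let batches = arrow_reader.get_record_reader(8192)?;
        for batch in batches {
            println!("read {} rows", batch?.num_rows());
        }
        Ok(())
    }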
---
parquet/benches/arrow_array_reader.rs | 135 ++++-
parquet/src/arrow/array_reader.rs | 68 +--
parquet/src/arrow/array_reader/byte_array.rs | 106 ++--
.../arrow/array_reader/byte_array_dictionary.rs | 552 +++++++++++++++++++++
.../src/arrow/array_reader/dictionary_buffer.rs | 383 ++++++++++++++
parquet/src/arrow/array_reader/offset_buffer.rs | 63 ++-
parquet/src/arrow/array_reader/test_util.rs | 89 ++++
parquet/src/arrow/arrow_reader.rs | 243 +++++++--
parquet/src/arrow/bit_util.rs | 95 ++++
parquet/src/arrow/mod.rs | 1 +
parquet/src/arrow/record_reader.rs | 27 +-
parquet/src/arrow/record_reader/buffer.rs | 24 +-
.../src/arrow/record_reader/definition_levels.rs | 79 +--
parquet/src/column/writer.rs | 8 +
14 files changed, 1615 insertions(+), 258 deletions(-)
diff --git a/parquet/benches/arrow_array_reader.rs b/parquet/benches/arrow_array_reader.rs
index 54f99c4..9db5b4c 100644
--- a/parquet/benches/arrow_array_reader.rs
+++ b/parquet/benches/arrow_array_reader.rs
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+use arrow::array::Array;
+use arrow::datatypes::DataType;
use criterion::{criterion_group, criterion_main, Criterion};
use parquet::util::{DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator};
use parquet::{
@@ -24,6 +26,7 @@ use parquet::{
data_type::{ByteArrayType, Int32Type},
schema::types::{ColumnDescPtr, SchemaDescPtr},
};
+use rand::{rngs::StdRng, Rng, SeedableRng};
use std::{collections::VecDeque, sync::Arc};
fn build_test_schema() -> SchemaDescPtr {
@@ -47,9 +50,6 @@ const PAGES_PER_GROUP: usize = 2;
const VALUES_PER_PAGE: usize = 10_000;
const BATCH_SIZE: usize = 8192;
-use arrow::array::Array;
-use rand::{rngs::StdRng, Rng, SeedableRng};
-
pub fn seedable_rng() -> StdRng {
StdRng::seed_from_u64(42)
}
@@ -333,6 +333,46 @@ fn create_string_byte_array_reader(
make_byte_array_reader(Box::new(page_iterator), column_desc, None, true).unwrap()
}
+fn create_string_byte_array_dictionary_reader(
+ page_iterator: impl PageIterator + 'static,
+ column_desc: ColumnDescPtr,
+) -> Box<dyn ArrayReader> {
+ use parquet::arrow::array_reader::make_byte_array_dictionary_reader;
+ let arrow_type =
+        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
+
+ make_byte_array_dictionary_reader(
+ Box::new(page_iterator),
+ column_desc,
+ Some(arrow_type),
+ true,
+ )
+ .unwrap()
+}
+
+fn create_complex_object_byte_array_dictionary_reader(
+ page_iterator: impl PageIterator + 'static,
+ column_desc: ColumnDescPtr,
+) -> Box<dyn ArrayReader> {
+ use parquet::arrow::array_reader::{
+ make_byte_array_dictionary_reader, ComplexObjectArrayReader,
+ };
+ use parquet::arrow::converter::{Utf8ArrayConverter, Utf8Converter};
+ let arrow_type =
+        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
+
+ let converter = Utf8Converter::new(Utf8ArrayConverter {});
+ Box::new(
+ ComplexObjectArrayReader::<ByteArrayType, Utf8Converter>::new(
+ Box::new(page_iterator),
+ column_desc,
+ converter,
+ Some(arrow_type),
+ )
+ .unwrap(),
+ )
+}
+
fn add_benches(c: &mut Criterion) {
const EXPECTED_VALUE_COUNT: usize =
NUM_ROW_GROUPS * PAGES_PER_GROUP * VALUES_PER_PAGE;
@@ -344,10 +384,7 @@ fn add_benches(c: &mut Criterion) {
let mandatory_int32_column_desc = schema.column(0);
let optional_int32_column_desc = schema.column(1);
let mandatory_string_column_desc = schema.column(2);
- // println!("mandatory_string_column_desc: {:?}",
mandatory_string_column_desc);
let optional_string_column_desc = schema.column(3);
- // println!("optional_string_column_desc: {:?}",
optional_string_column_desc);
-
// primitive / int32 benchmarks
// =============================
@@ -726,7 +763,7 @@ fn add_benches(c: &mut Criterion) {
// string, dictionary encoded, half NULLs
let dictionary_string_half_null_data =
build_dictionary_encoded_string_page_iterator(
- schema,
+ schema.clone(),
optional_string_column_desc.clone(),
0.5,
);
@@ -758,6 +795,90 @@ fn add_benches(c: &mut Criterion) {
},
);
+ group.bench_function(
+ "read StringDictionary, dictionary encoded, mandatory, no NULLs - old",
+ |b| {
+ b.iter(|| {
+            let array_reader = create_complex_object_byte_array_dictionary_reader(
+ dictionary_string_no_null_data.clone(),
+ mandatory_string_column_desc.clone(),
+ );
+ count = bench_array_reader(array_reader);
+ });
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
+ },
+ );
+
+ group.bench_function(
+ "read StringDictionary, dictionary encoded, mandatory, no NULLs - new",
+ |b| {
+ b.iter(|| {
+ let array_reader = create_string_byte_array_dictionary_reader(
+ dictionary_string_no_null_data.clone(),
+ mandatory_string_column_desc.clone(),
+ );
+ count = bench_array_reader(array_reader);
+ });
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
+ },
+ );
+
+ group.bench_function(
+ "read StringDictionary, dictionary encoded, optional, no NULLs - old",
+ |b| {
+ b.iter(|| {
+            let array_reader = create_complex_object_byte_array_dictionary_reader(
+ dictionary_string_no_null_data.clone(),
+ optional_string_column_desc.clone(),
+ );
+ count = bench_array_reader(array_reader);
+ });
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
+ },
+ );
+
+ group.bench_function(
+ "read StringDictionary, dictionary encoded, optional, no NULLs - new",
+ |b| {
+ b.iter(|| {
+ let array_reader = create_string_byte_array_dictionary_reader(
+ dictionary_string_no_null_data.clone(),
+ optional_string_column_desc.clone(),
+ );
+ count = bench_array_reader(array_reader);
+ });
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
+ },
+ );
+
+ group.bench_function(
+ "read StringDictionary, dictionary encoded, optional, half NULLs -
old",
+ |b| {
+ b.iter(|| {
+            let array_reader = create_complex_object_byte_array_dictionary_reader(
+ dictionary_string_half_null_data.clone(),
+ optional_string_column_desc.clone(),
+ );
+ count = bench_array_reader(array_reader);
+ });
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
+ },
+ );
+
+ group.bench_function(
+ "read StringDictionary, dictionary encoded, optional, half NULLs -
new",
+ |b| {
+ b.iter(|| {
+ let array_reader = create_string_byte_array_dictionary_reader(
+ dictionary_string_half_null_data.clone(),
+ optional_string_column_desc.clone(),
+ );
+ count = bench_array_reader(array_reader);
+ });
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
+ },
+ );
+
group.finish();
}
diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs
index c11bfc2..01e54f6 100644
--- a/parquet/src/arrow/array_reader.rs
+++ b/parquet/src/arrow/array_reader.rs
@@ -56,11 +56,10 @@ use arrow::datatypes::{
use arrow::util::bit_util;
use crate::arrow::converter::{
- BinaryArrayConverter, BinaryConverter, Converter, DecimalArrayConverter,
- DecimalConverter, FixedLenBinaryConverter, FixedSizeArrayConverter,
- Int96ArrayConverter, Int96Converter, IntervalDayTimeArrayConverter,
- IntervalDayTimeConverter, IntervalYearMonthArrayConverter,
- IntervalYearMonthConverter, Utf8ArrayConverter, Utf8Converter,
+    Converter, DecimalArrayConverter, DecimalConverter, FixedLenBinaryConverter,
+ FixedSizeArrayConverter, Int96ArrayConverter, Int96Converter,
+ IntervalDayTimeArrayConverter, IntervalDayTimeConverter,
+ IntervalYearMonthArrayConverter, IntervalYearMonthConverter,
};
use crate::arrow::record_reader::buffer::{ScalarValue, ValuesBuffer};
use crate::arrow::record_reader::{GenericRecordReader, RecordReader};
@@ -70,8 +69,8 @@ use crate::column::page::PageIterator;
use crate::column::reader::decoder::ColumnValueDecoder;
use crate::column::reader::ColumnReaderImpl;
use crate::data_type::{
-    BoolType, ByteArrayType, DataType, DoubleType, FixedLenByteArrayType, FloatType,
- Int32Type, Int64Type, Int96Type,
+    BoolType, DataType, DoubleType, FixedLenByteArrayType, FloatType, Int32Type,
+ Int64Type, Int96Type,
};
use crate::errors::{ParquetError, ParquetError::ArrowError, Result};
use crate::file::reader::{FilePageIterator, FileReader};
@@ -81,9 +80,15 @@ use crate::schema::types::{
use crate::schema::visitor::TypeVisitor;
mod byte_array;
+mod byte_array_dictionary;
+mod dictionary_buffer;
mod offset_buffer;
+#[cfg(test)]
+mod test_util;
+
pub use byte_array::make_byte_array_reader;
+pub use byte_array_dictionary::make_byte_array_dictionary_reader;
/// Array reader reads parquet data into arrow array.
pub trait ArrayReader {
@@ -271,7 +276,8 @@ where
.clone(),
};
-    let record_reader = RecordReader::<T>::new_with_options(column_desc.clone(), null_mask_only);
+ let record_reader =
+            RecordReader::<T>::new_with_options(column_desc.clone(), null_mask_only);
Ok(Self {
data_type,
@@ -829,17 +835,18 @@ fn remove_indices(
size
),
ArrowType::Struct(fields) => {
- let struct_array = arr.as_any()
+ let struct_array = arr
+ .as_any()
.downcast_ref::<StructArray>()
.expect("Array should be a struct");
// Recursively call remove indices on each of the structs fields
- let new_columns = fields.into_iter()
+ let new_columns = fields
+ .into_iter()
.zip(struct_array.columns())
.map(|(field, column)| {
let dt = field.data_type().clone();
- Ok((field,
- remove_indices(column.clone(), dt, indices.clone())?))
+                    Ok((field, remove_indices(column.clone(), dt, indices.clone())?))
})
.collect::<Result<Vec<_>>>()?;
@@ -1783,35 +1790,12 @@ impl<'a> ArrayReaderBuilder {
)?,
)),
PhysicalType::BYTE_ARRAY => match arrow_type {
- // TODO: Replace with optimised dictionary reader (#171)
- Some(ArrowType::Dictionary(_, _)) => {
- match cur_type.get_basic_info().converted_type() {
- ConvertedType::UTF8 => {
-                        let converter = Utf8Converter::new(Utf8ArrayConverter {});
- Ok(Box::new(ComplexObjectArrayReader::<
- ByteArrayType,
- Utf8Converter,
- >::new(
- page_iterator,
- column_desc,
- converter,
- arrow_type,
- )?))
- }
- _ => {
-                    let converter = BinaryConverter::new(BinaryArrayConverter {});
- Ok(Box::new(ComplexObjectArrayReader::<
- ByteArrayType,
- BinaryConverter,
- >::new(
- page_iterator,
- column_desc,
- converter,
- arrow_type,
- )?))
- }
- }
- }
+            Some(ArrowType::Dictionary(_, _)) => make_byte_array_dictionary_reader(
+ page_iterator,
+ column_desc,
+ arrow_type,
+ null_mask_only,
+ ),
_ => make_byte_array_reader(
page_iterator,
column_desc,
@@ -2025,7 +2009,7 @@ mod tests {
use crate::arrow::schema::parquet_to_arrow_schema;
use crate::basic::{Encoding, Type as PhysicalType};
use crate::column::page::{Page, PageReader};
- use crate::data_type::{ByteArray, DataType, Int32Type, Int64Type};
+    use crate::data_type::{ByteArray, ByteArrayType, DataType, Int32Type, Int64Type};
use crate::errors::Result;
use crate::file::reader::{FileReader, SerializedFileReader};
use crate::schema::parser::parse_message_type;
diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs
index fc214dd..b3606a7 100644
--- a/parquet/src/arrow/array_reader/byte_array.rs
+++ b/parquet/src/arrow/array_reader/byte_array.rs
@@ -203,11 +203,12 @@ impl<I: OffsetSizeTrait + ScalarValue> ColumnValueDecoder
}
fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) -> Result<usize> {
- self.decoder.as_mut().expect("decoder set").read(
- out,
- range.end - range.start,
- self.dict.as_ref(),
- )
+ let decoder = self
+ .decoder
+ .as_mut()
+ .ok_or_else(|| general_err!("no decoder set"))?;
+
+ decoder.read(out, range.end - range.start, self.dict.as_ref())
}
}
@@ -266,7 +267,9 @@ impl ByteArrayDecoder {
match self {
ByteArrayDecoder::Plain(d) => d.read(out, len),
ByteArrayDecoder::Dictionary(d) => {
- let dict = dict.expect("dictionary set");
+ let dict = dict
+                    .ok_or_else(|| general_err!("missing dictionary page for column"))?;
+
d.read(out, dict, len)
}
ByteArrayDecoder::DeltaLength(d) => d.read(out, len),
@@ -546,6 +549,10 @@ impl ByteArrayDecoderDictionary {
dict: &OffsetBuffer<I>,
len: usize,
) -> Result<usize> {
+ if dict.is_empty() {
+ return Ok(0); // All data must be NULL
+ }
+
let mut values_read = 0;
while values_read != len && self.max_remaining_values != 0 {
@@ -579,69 +586,16 @@ impl ByteArrayDecoderDictionary {
#[cfg(test)]
mod tests {
use super::*;
+    use crate::arrow::array_reader::test_util::{byte_array_all_encodings, utf8_column};
use crate::arrow::record_reader::buffer::ValuesBuffer;
- use crate::basic::Type as PhysicalType;
- use crate::data_type::{ByteArray, ByteArrayType};
- use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder};
- use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};
- use crate::util::memory::MemTracker;
use arrow::array::{Array, StringArray};
- use std::sync::Arc;
-
- fn column() -> ColumnDescPtr {
- let t = Type::primitive_type_builder("col", PhysicalType::BYTE_ARRAY)
- .with_converted_type(ConvertedType::UTF8)
- .build()
- .unwrap();
-
- Arc::new(ColumnDescriptor::new(
- Arc::new(t),
- 1,
- 0,
- ColumnPath::new(vec![]),
- ))
- }
-
- fn get_encoded(encoding: Encoding, data: &[ByteArray]) -> ByteBufferPtr {
- let descriptor = column();
- let mem_tracker = Arc::new(MemTracker::new());
- let mut encoder =
-            get_encoder::<ByteArrayType>(descriptor, encoding, mem_tracker).unwrap();
-
- encoder.put(data).unwrap();
- encoder.flush_buffer().unwrap()
- }
#[test]
fn test_byte_array_decoder() {
- let data: Vec<_> = vec!["hello", "world", "a", "b"]
- .into_iter()
- .map(ByteArray::from)
- .collect();
-
- let mut dict_encoder =
-            DictEncoder::<ByteArrayType>::new(column(), Arc::new(MemTracker::new()));
-
- dict_encoder.put(&data).unwrap();
- let encoded_rle = dict_encoder.flush_buffer().unwrap();
- let encoded_dictionary = dict_encoder.write_dict().unwrap();
-
- // A column chunk with all the encodings!
- let pages = vec![
- (Encoding::PLAIN, get_encoded(Encoding::PLAIN, &data)),
- (
- Encoding::DELTA_BYTE_ARRAY,
- get_encoded(Encoding::DELTA_BYTE_ARRAY, &data),
- ),
- (
- Encoding::DELTA_LENGTH_BYTE_ARRAY,
- get_encoded(Encoding::DELTA_LENGTH_BYTE_ARRAY, &data),
- ),
- (Encoding::PLAIN_DICTIONARY, encoded_rle.clone()),
- (Encoding::RLE_DICTIONARY, encoded_rle),
- ];
+ let (pages, encoded_dictionary) =
+ byte_array_all_encodings(vec!["hello", "world", "a", "b"]);
- let column_desc = column();
+ let column_desc = utf8_column();
let mut decoder = ByteArrayColumnValueDecoder::new(&column_desc);
decoder
@@ -668,15 +622,9 @@ mod tests {
assert_eq!(decoder.read(&mut output, 4..8).unwrap(), 0);
let valid = vec![false, false, true, true, false, true, true, false, false];
- let rev_position_iter = valid
- .iter()
- .enumerate()
- .rev()
- .filter_map(|(i, valid)| valid.then(|| i));
-
let valid_buffer = Buffer::from_iter(valid.iter().cloned());
- output.pad_nulls(0, 4, valid.len(), rev_position_iter);
+ output.pad_nulls(0, 4, valid.len(), valid_buffer.as_slice());
let array = output.into_array(Some(valid_buffer), ArrowType::Utf8);
let strings =
array.as_any().downcast_ref::<StringArray>().unwrap();
@@ -696,4 +644,22 @@ mod tests {
);
}
}
+
+ #[test]
+ fn test_byte_array_decoder_nulls() {
+        let (pages, encoded_dictionary) = byte_array_all_encodings(Vec::<&str>::new());
+
+ let column_desc = utf8_column();
+ let mut decoder = ByteArrayColumnValueDecoder::new(&column_desc);
+
+ decoder
+ .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false)
+ .unwrap();
+
+ for (encoding, page) in pages {
+ let mut output = OffsetBuffer::<i32>::default();
+ decoder.set_data(encoding, page, 4, None).unwrap();
+ assert_eq!(decoder.read(&mut output, 0..1024).unwrap(), 0);
+ }
+ }
}
diff --git a/parquet/src/arrow/array_reader/byte_array_dictionary.rs b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
new file mode 100644
index 0000000..eddb3a6
--- /dev/null
+++ b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
@@ -0,0 +1,552 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::marker::PhantomData;
+use std::ops::Range;
+use std::sync::Arc;
+
+use arrow::array::{Array, ArrayRef, OffsetSizeTrait};
+use arrow::buffer::Buffer;
+use arrow::datatypes::{ArrowNativeType, DataType as ArrowType};
+
+use crate::arrow::array_reader::dictionary_buffer::DictionaryBuffer;
+use crate::arrow::array_reader::{
+ byte_array::{ByteArrayDecoder, ByteArrayDecoderPlain},
+ offset_buffer::OffsetBuffer,
+};
+use crate::arrow::array_reader::{read_records, ArrayReader};
+use crate::arrow::record_reader::buffer::{BufferQueue, ScalarValue};
+use crate::arrow::record_reader::GenericRecordReader;
+use crate::arrow::schema::parquet_to_arrow_field;
+use crate::basic::{ConvertedType, Encoding};
+use crate::column::page::PageIterator;
+use crate::column::reader::decoder::ColumnValueDecoder;
+use crate::encodings::rle::RleDecoder;
+use crate::errors::{ParquetError, Result};
+use crate::schema::types::ColumnDescPtr;
+use crate::util::bit_util::FromBytes;
+use crate::util::memory::ByteBufferPtr;
+
+/// A macro to reduce verbosity of [`make_byte_array_dictionary_reader`]
+macro_rules! make_reader {
+ (
+        ($pages:expr, $column_desc:expr, $data_type:expr, $null_mask_only:expr) => match ($k:expr, $v:expr) {
+        $(($key_arrow:pat, $value_arrow:pat) => ($key_type:ty, $value_type:ty),)+
+ }
+ ) => {
+ match (($k, $v)) {
+ $(
+ ($key_arrow, $value_arrow) => {
+ let reader = GenericRecordReader::new_with_options(
+ $column_desc,
+ $null_mask_only,
+ );
+                    Ok(Box::new(ByteArrayDictionaryReader::<$key_type, $value_type>::new(
+ $pages, $data_type, reader,
+ )))
+ }
+ )+
+ _ => Err(general_err!(
+ "unsupported data type for byte array dictionary reader - {}",
+ $data_type
+ )),
+ }
+ }
+}
+
+/// Returns an [`ArrayReader`] that decodes the provided byte array column
+///
+/// This will attempt to preserve any dictionary encoding present in the parquet data
+///
+/// It will be unable to preserve the dictionary encoding if:
+///
+/// * A single read spans across multiple column chunks
+/// * A column chunk contains non-dictionary encoded pages
+///
+/// It is therefore recommended that if `pages` contains data from multiple column chunks,
+/// the read batch size used is a divisor of the row group size
+///
+pub fn make_byte_array_dictionary_reader(
+ pages: Box<dyn PageIterator>,
+ column_desc: ColumnDescPtr,
+ arrow_type: Option<ArrowType>,
+ null_mask_only: bool,
+) -> Result<Box<dyn ArrayReader>> {
+ // Check if Arrow type is specified, else create it from Parquet type
+ let data_type = match arrow_type {
+ Some(t) => t,
+ None => parquet_to_arrow_field(column_desc.as_ref())?
+ .data_type()
+ .clone(),
+ };
+
+ match &data_type {
+ ArrowType::Dictionary(key_type, value_type) => {
+ make_reader! {
+            (pages, column_desc, data_type, null_mask_only) => match (key_type.as_ref(), value_type.as_ref()) {
+                (ArrowType::UInt8, ArrowType::Binary | ArrowType::Utf8) => (u8, i32),
+                (ArrowType::UInt8, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (u8, i64),
+                (ArrowType::Int8, ArrowType::Binary | ArrowType::Utf8) => (i8, i32),
+                (ArrowType::Int8, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (i8, i64),
+                (ArrowType::UInt16, ArrowType::Binary | ArrowType::Utf8) => (u16, i32),
+                (ArrowType::UInt16, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (u16, i64),
+                (ArrowType::Int16, ArrowType::Binary | ArrowType::Utf8) => (i16, i32),
+                (ArrowType::Int16, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (i16, i64),
+                (ArrowType::UInt32, ArrowType::Binary | ArrowType::Utf8) => (u32, i32),
+                (ArrowType::UInt32, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (u32, i64),
+                (ArrowType::Int32, ArrowType::Binary | ArrowType::Utf8) => (i32, i32),
+                (ArrowType::Int32, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (i32, i64),
+                (ArrowType::UInt64, ArrowType::Binary | ArrowType::Utf8) => (u64, i32),
+                (ArrowType::UInt64, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (u64, i64),
+                (ArrowType::Int64, ArrowType::Binary | ArrowType::Utf8) => (i64, i32),
+                (ArrowType::Int64, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (i64, i64),
+ }
+ }
+ }
+ _ => Err(general_err!(
+ "invalid non-dictionary data type for byte array dictionary reader
- {}",
+ data_type
+ )),
+ }
+}
+
+/// An [`ArrayReader`] for dictionary encoded variable length byte arrays
+///
+/// Will attempt to preserve any dictionary encoding present in the parquet data
+struct ByteArrayDictionaryReader<K: ScalarValue, V: ScalarValue> {
+ data_type: ArrowType,
+ pages: Box<dyn PageIterator>,
+ def_levels_buffer: Option<Buffer>,
+ rep_levels_buffer: Option<Buffer>,
+    record_reader: GenericRecordReader<DictionaryBuffer<K, V>, DictionaryDecoder<K, V>>,
+}
+
+impl<K, V> ByteArrayDictionaryReader<K, V>
+where
+ K: FromBytes + ScalarValue + Ord + ArrowNativeType,
+ V: ScalarValue + OffsetSizeTrait,
+{
+ fn new(
+ pages: Box<dyn PageIterator>,
+ data_type: ArrowType,
+ record_reader: GenericRecordReader<
+ DictionaryBuffer<K, V>,
+ DictionaryDecoder<K, V>,
+ >,
+ ) -> Self {
+ Self {
+ data_type,
+ pages,
+ def_levels_buffer: None,
+ rep_levels_buffer: None,
+ record_reader,
+ }
+ }
+}
+
+impl<K, V> ArrayReader for ByteArrayDictionaryReader<K, V>
+where
+ K: FromBytes + ScalarValue + Ord + ArrowNativeType,
+ V: ScalarValue + OffsetSizeTrait,
+{
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn get_data_type(&self) -> &ArrowType {
+ &self.data_type
+ }
+
+ fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
+        read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)?;
+ let buffer = self.record_reader.consume_record_data()?;
+ let null_buffer = self.record_reader.consume_bitmap_buffer()?;
+ let array = buffer.into_array(null_buffer, &self.data_type)?;
+
+ self.def_levels_buffer = self.record_reader.consume_def_levels()?;
+ self.rep_levels_buffer = self.record_reader.consume_rep_levels()?;
+ self.record_reader.reset();
+
+ Ok(array)
+ }
+
+ fn get_def_levels(&self) -> Option<&[i16]> {
+ self.def_levels_buffer
+ .as_ref()
+ .map(|buf| unsafe { buf.typed_data() })
+ }
+
+ fn get_rep_levels(&self) -> Option<&[i16]> {
+ self.rep_levels_buffer
+ .as_ref()
+ .map(|buf| unsafe { buf.typed_data() })
+ }
+}
+
+/// If the data is dictionary encoded, decode the key data directly, so that the dictionary
+/// encoding can be preserved. Otherwise fall back to decoding using [`ByteArrayDecoder`]
+/// and compute a fresh dictionary in [`ByteArrayDictionaryReader::next_batch`]
+enum MaybeDictionaryDecoder {
+ Dict {
+ decoder: RleDecoder,
+        /// This is a maximum as the null count is not always known, e.g. value data from
+ /// a v1 data page
+ max_remaining_values: usize,
+ },
+ Fallback(ByteArrayDecoder),
+}
+
+/// A [`ColumnValueDecoder`] for dictionary encoded variable length byte arrays
+struct DictionaryDecoder<K, V> {
+ /// The current dictionary
+ dict: Option<ArrayRef>,
+
+ /// Dictionary decoder
+ decoder: Option<MaybeDictionaryDecoder>,
+
+ validate_utf8: bool,
+
+ value_type: ArrowType,
+
+ phantom: PhantomData<(K, V)>,
+}
+
+impl<K, V> ColumnValueDecoder for DictionaryDecoder<K, V>
+where
+ K: FromBytes + ScalarValue + Ord + ArrowNativeType,
+ V: ScalarValue + OffsetSizeTrait,
+{
+ type Slice = DictionaryBuffer<K, V>;
+
+ fn new(col: &ColumnDescPtr) -> Self {
+ let validate_utf8 = col.converted_type() == ConvertedType::UTF8;
+
+ let value_type =
+            match (V::is_large(), col.converted_type() == ConvertedType::UTF8) {
+ (true, true) => ArrowType::LargeUtf8,
+ (true, false) => ArrowType::LargeBinary,
+ (false, true) => ArrowType::Utf8,
+ (false, false) => ArrowType::Binary,
+ };
+
+ Self {
+ dict: None,
+ decoder: None,
+ validate_utf8,
+ value_type,
+ phantom: Default::default(),
+ }
+ }
+
+ fn set_dict(
+ &mut self,
+ buf: ByteBufferPtr,
+ num_values: u32,
+ encoding: Encoding,
+ _is_sorted: bool,
+ ) -> Result<()> {
+ if !matches!(
+ encoding,
+            Encoding::PLAIN | Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY
+ ) {
+ return Err(nyi_err!(
+ "Invalid/Unsupported encoding type for dictionary: {}",
+ encoding
+ ));
+ }
+
+ if K::from_usize(num_values as usize).is_none() {
+ return Err(general_err!("dictionary too large for index type"));
+ }
+
+ let len = num_values as usize;
+ let mut buffer = OffsetBuffer::<V>::default();
+ let mut decoder =
+            ByteArrayDecoderPlain::new(buf, len, Some(len), self.validate_utf8);
+ decoder.read(&mut buffer, usize::MAX)?;
+
+ let array = buffer.into_array(None, self.value_type.clone());
+ self.dict = Some(Arc::new(array));
+ Ok(())
+ }
+
+ fn set_data(
+ &mut self,
+ encoding: Encoding,
+ data: ByteBufferPtr,
+ num_levels: usize,
+ num_values: Option<usize>,
+ ) -> Result<()> {
+ let decoder = match encoding {
+ Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
+ let bit_width = data[0];
+ let mut decoder = RleDecoder::new(bit_width);
+ decoder.set_data(data.start_from(1));
+ MaybeDictionaryDecoder::Dict {
+ decoder,
+ max_remaining_values: num_values.unwrap_or(num_levels),
+ }
+ }
+ _ => MaybeDictionaryDecoder::Fallback(ByteArrayDecoder::new(
+ encoding,
+ data,
+ num_levels,
+ num_values,
+ self.validate_utf8,
+ )?),
+ };
+
+ self.decoder = Some(decoder);
+ Ok(())
+ }
+
+    fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) -> Result<usize> {
+ match self.decoder.as_mut().expect("decoder set") {
+ MaybeDictionaryDecoder::Fallback(decoder) => {
+                decoder.read(out.spill_values()?, range.end - range.start, None)
+ }
+ MaybeDictionaryDecoder::Dict {
+ decoder,
+ max_remaining_values,
+ } => {
+ let len = (range.end - range.start).min(*max_remaining_values);
+
+ let dict = self
+ .dict
+ .as_ref()
+                    .ok_or_else(|| general_err!("missing dictionary page for column"))?;
+
+ assert_eq!(dict.data_type(), &self.value_type);
+
+ if dict.is_empty() {
+ return Ok(0); // All data must be NULL
+ }
+
+ match out.as_keys(dict) {
+ Some(keys) => {
+ // Happy path - can just copy keys
+ // Keys will be validated on conversion to arrow
+                        let keys_slice = keys.spare_capacity_mut(range.start + len);
+                        let len = decoder.get_batch(&mut keys_slice[range.start..])?;
+ Ok(len)
+ }
+ None => {
+ // Sad path - need to recompute dictionary
+ //
+                        // This either means we crossed into a new column chunk whilst
+                        // reading this batch, or encountered non-dictionary encoded data
+ let values = out.spill_values()?;
+ let mut keys = vec![K::default(); len];
+ let len = decoder.get_batch(&mut keys)?;
+
+ assert_eq!(dict.data_type(), &self.value_type);
+
+ let dict_buffers = dict.data().buffers();
+                        let dict_offsets = unsafe { dict_buffers[0].typed_data::<V>() };
+ let dict_values = dict_buffers[1].as_slice();
+
+ values.extend_from_dictionary(
+ &keys[..len],
+ dict_offsets,
+ dict_values,
+ )?;
+
+ Ok(len)
+ }
+ }
+ }
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use arrow::array::{Array, StringArray};
+ use arrow::compute::cast;
+
+ use crate::arrow::array_reader::test_util::{
+ byte_array_all_encodings, encode_dictionary, utf8_column,
+ };
+ use crate::arrow::record_reader::buffer::ValuesBuffer;
+ use crate::data_type::ByteArray;
+
+ use super::*;
+
+ fn utf8_dictionary() -> ArrowType {
+        ArrowType::Dictionary(Box::new(ArrowType::Int32), Box::new(ArrowType::Utf8))
+ }
+
+ #[test]
+ fn test_dictionary_preservation() {
+ let data_type = utf8_dictionary();
+
+ let data: Vec<_> = vec!["0", "1", "0", "1", "2", "1", "2"]
+ .into_iter()
+ .map(ByteArray::from)
+ .collect();
+ let (dict, encoded) = encode_dictionary(&data);
+
+ let column_desc = utf8_column();
+ let mut decoder = DictionaryDecoder::<i32, i32>::new(&column_desc);
+
+ decoder
+ .set_dict(dict, 3, Encoding::RLE_DICTIONARY, false)
+ .unwrap();
+
+ decoder
+ .set_data(Encoding::RLE_DICTIONARY, encoded, 14, Some(data.len()))
+ .unwrap();
+
+ let mut output = DictionaryBuffer::<i32, i32>::default();
+ assert_eq!(decoder.read(&mut output, 0..3).unwrap(), 3);
+
+ let mut valid = vec![false, false, true, true, false, true];
+ let valid_buffer = Buffer::from_iter(valid.iter().cloned());
+ output.pad_nulls(0, 3, valid.len(), valid_buffer.as_slice());
+
+ assert!(matches!(output, DictionaryBuffer::Dict { .. }));
+
+ assert_eq!(decoder.read(&mut output, 0..4).unwrap(), 4);
+
+        valid.extend_from_slice(&[false, false, true, true, false, true, true, false]);
+ let valid_buffer = Buffer::from_iter(valid.iter().cloned());
+ output.pad_nulls(6, 4, 8, valid_buffer.as_slice());
+
+ assert!(matches!(output, DictionaryBuffer::Dict { .. }));
+
+ let array = output.into_array(Some(valid_buffer), &data_type).unwrap();
+ assert_eq!(array.data_type(), &data_type);
+
+ let array = cast(&array, &ArrowType::Utf8).unwrap();
+ let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
+ assert_eq!(strings.len(), 14);
+
+ assert_eq!(
+ strings.iter().collect::<Vec<_>>(),
+ vec![
+ None,
+ None,
+ Some("0"),
+ Some("1"),
+ None,
+ Some("0"),
+ None,
+ None,
+ Some("1"),
+ Some("2"),
+ None,
+ Some("1"),
+ Some("2"),
+ None
+ ]
+ )
+ }
+
+ #[test]
+ fn test_dictionary_fallback() {
+ let data_type = utf8_dictionary();
+ let data = vec!["hello", "world", "a", "b"];
+
+        let (pages, encoded_dictionary) = byte_array_all_encodings(data.clone());
+ let num_encodings = pages.len();
+
+ let column_desc = utf8_column();
+ let mut decoder = DictionaryDecoder::<i32, i32>::new(&column_desc);
+
+ decoder
+ .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false)
+ .unwrap();
+
+ // Read all pages into single buffer
+ let mut output = DictionaryBuffer::<i32, i32>::default();
+
+ for (encoding, page) in pages {
+ decoder.set_data(encoding, page, 4, Some(4)).unwrap();
+ assert_eq!(decoder.read(&mut output, 0..1024).unwrap(), 4);
+ }
+ let array = output.into_array(None, &data_type).unwrap();
+ assert_eq!(array.data_type(), &data_type);
+
+ let array = cast(&array, &ArrowType::Utf8).unwrap();
+ let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
+ assert_eq!(strings.len(), data.len() * num_encodings);
+
+ // Should have a copy of `data` for each encoding
+ for i in 0..num_encodings {
+ assert_eq!(
+ strings
+ .iter()
+ .skip(i * data.len())
+ .take(data.len())
+ .map(|x| x.unwrap())
+ .collect::<Vec<_>>(),
+ data
+ )
+ }
+ }
+
+ #[test]
+ fn test_too_large_dictionary() {
+ let data: Vec<_> = (0..128)
+ .map(|x| ByteArray::from(x.to_string().as_str()))
+ .collect();
+ let (dictionary, _) = encode_dictionary(&data);
+
+ let column_desc = utf8_column();
+
+ let mut decoder = DictionaryDecoder::<i8, i32>::new(&column_desc);
+ let err = decoder
+ .set_dict(dictionary.clone(), 128, Encoding::RLE_DICTIONARY, false)
+ .unwrap_err()
+ .to_string();
+
+ assert!(err.contains("dictionary too large for index type"));
+
+ let mut decoder = DictionaryDecoder::<i16, i32>::new(&column_desc);
+ decoder
+ .set_dict(dictionary, 128, Encoding::RLE_DICTIONARY, false)
+ .unwrap();
+ }
+
+ #[test]
+ fn test_nulls() {
+ let data_type = utf8_dictionary();
+        let (pages, encoded_dictionary) = byte_array_all_encodings(Vec::<&str>::new());
+
+ let column_desc = utf8_column();
+ let mut decoder = DictionaryDecoder::new(&column_desc);
+
+ decoder
+ .set_dict(encoded_dictionary, 4, Encoding::PLAIN_DICTIONARY, false)
+ .unwrap();
+
+ for (encoding, page) in pages {
+ let mut output = DictionaryBuffer::<i32, i32>::default();
+ decoder.set_data(encoding, page, 8, None).unwrap();
+ assert_eq!(decoder.read(&mut output, 0..1024).unwrap(), 0);
+
+ output.pad_nulls(0, 0, 8, &[0]);
+ let array = output
+ .into_array(Some(Buffer::from(&[0])), &data_type)
+ .unwrap();
+
+ assert_eq!(array.len(), 8);
+ assert_eq!(array.null_count(), 8);
+ }
+ }
+}
diff --git a/parquet/src/arrow/array_reader/dictionary_buffer.rs b/parquet/src/arrow/array_reader/dictionary_buffer.rs
new file mode 100644
index 0000000..6bb9603
--- /dev/null
+++ b/parquet/src/arrow/array_reader/dictionary_buffer.rs
@@ -0,0 +1,383 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::array_reader::offset_buffer::OffsetBuffer;
+use crate::arrow::record_reader::buffer::{
+ BufferQueue, ScalarBuffer, ScalarValue, ValuesBuffer,
+};
+use crate::column::reader::decoder::ValuesBufferSlice;
+use crate::errors::{ParquetError, Result};
+use arrow::array::{make_array, ArrayDataBuilder, ArrayRef, OffsetSizeTrait};
+use arrow::buffer::Buffer;
+use arrow::datatypes::{ArrowNativeType, DataType as ArrowType};
+use std::sync::Arc;
+
+/// An array of variable length byte arrays that are potentially dictionary encoded
+/// and can be converted into a corresponding [`ArrayRef`]
+pub enum DictionaryBuffer<K: ScalarValue, V: ScalarValue> {
+ Dict {
+ keys: ScalarBuffer<K>,
+ values: ArrayRef,
+ },
+ Values {
+ values: OffsetBuffer<V>,
+ },
+}
+
+impl<K: ScalarValue, V: ScalarValue> Default for DictionaryBuffer<K, V> {
+ fn default() -> Self {
+ Self::Values {
+ values: Default::default(),
+ }
+ }
+}
+
+impl<K: ScalarValue + ArrowNativeType + Ord, V: ScalarValue + OffsetSizeTrait>
+ DictionaryBuffer<K, V>
+{
+ pub fn len(&self) -> usize {
+ match self {
+ Self::Dict { keys, .. } => keys.len(),
+ Self::Values { values } => values.len(),
+ }
+ }
+
+ /// Returns a mutable reference to a keys array
+ ///
+ /// Returns None if the dictionary needs to be recomputed
+ ///
+ /// # Panic
+ ///
+ /// Panics if the dictionary is too large for `K`
+    pub fn as_keys(&mut self, dictionary: &ArrayRef) -> Option<&mut ScalarBuffer<K>> {
+ assert!(K::from_usize(dictionary.len()).is_some());
+
+ match self {
+ Self::Dict { keys, values } => {
+ // Need to discard fat pointer for equality check
+ // - https://stackoverflow.com/a/67114787
+ // - https://github.com/rust-lang/rust/issues/46139
+ let values_ptr = values.as_ref() as *const _ as *const ();
+ let dict_ptr = dictionary.as_ref() as *const _ as *const ();
+ if values_ptr == dict_ptr {
+ Some(keys)
+ } else if keys.is_empty() {
+ *values = Arc::clone(dictionary);
+ Some(keys)
+ } else {
+ None
+ }
+ }
+ Self::Values { values } if values.is_empty() => {
+ *self = Self::Dict {
+ keys: Default::default(),
+ values: Arc::clone(dictionary),
+ };
+ match self {
+ Self::Dict { keys, .. } => Some(keys),
+ _ => unreachable!(),
+ }
+ }
+ _ => None,
+ }
+ }
+
+ /// Returns a mutable reference to a values array
+ ///
+ /// If this is currently dictionary encoded, this will convert from the
+ /// dictionary encoded representation
+ pub fn spill_values(&mut self) -> Result<&mut OffsetBuffer<V>> {
+ match self {
+ Self::Values { values } => Ok(values),
+ Self::Dict { keys, values } => {
+ let mut spilled = OffsetBuffer::default();
+ let dict_buffers = values.data().buffers();
+                let dict_offsets = unsafe { dict_buffers[0].typed_data::<V>() };
+ let dict_values = dict_buffers[1].as_slice();
+
+ if values.is_empty() {
+ // If dictionary is empty, zero pad offsets
+ spilled.offsets.resize(keys.len() + 1);
+ } else {
+                    // Note: at this point null positions will have arbitrary dictionary keys
+                    // and this will hydrate them to the corresponding byte array. This is
+                    // likely sub-optimal, as we would prefer zero length null "slots", but
+                    // spilling is already a degenerate case and so it is unclear if this is
+ // worth optimising for, e.g. by keeping a null mask around
+ spilled.extend_from_dictionary(
+ keys.as_slice(),
+ dict_offsets,
+ dict_values,
+ )?;
+ }
+
+ *self = Self::Values { values: spilled };
+ match self {
+ Self::Values { values } => Ok(values),
+ _ => unreachable!(),
+ }
+ }
+ }
+ }
+
+    /// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer`
+ pub fn into_array(
+ self,
+ null_buffer: Option<Buffer>,
+ data_type: &ArrowType,
+ ) -> Result<ArrayRef> {
+ assert!(matches!(data_type, ArrowType::Dictionary(_, _)));
+
+ match self {
+ Self::Dict { keys, values } => {
+ // Validate keys unless dictionary is empty
+ if !values.is_empty() {
+ let min = K::from_usize(0).unwrap();
+ let max = K::from_usize(values.len()).unwrap();
+
+ // It may be possible to use SIMD here
+ if keys.as_slice().iter().any(|x| *x < min || *x >= max) {
+ return Err(general_err!(
+ "dictionary key beyond bounds of dictionary:
0..{}",
+ values.len()
+ ));
+ }
+ }
+
+ let mut builder = ArrayDataBuilder::new(data_type.clone())
+ .len(keys.len())
+ .add_buffer(keys.into())
+ .add_child_data(values.data().clone());
+
+ if let Some(buffer) = null_buffer {
+ builder = builder.null_bit_buffer(buffer);
+ }
+
+ let data = match cfg!(debug_assertions) {
+ true => builder.build().unwrap(),
+ false => unsafe { builder.build_unchecked() },
+ };
+
+ Ok(make_array(data))
+ }
+ Self::Values { values } => {
+ let value_type = match data_type {
+ ArrowType::Dictionary(_, v) => v.as_ref().clone(),
+ _ => unreachable!(),
+ };
+
+ // This will compute a new dictionary
+ let array = arrow::compute::cast(
+ &values.into_array(null_buffer, value_type),
+ data_type,
+ )
+ .expect("cast should be infallible");
+
+ Ok(array)
+ }
+ }
+ }
+}
+
+impl<K: ScalarValue, V: ScalarValue> ValuesBufferSlice for DictionaryBuffer<K, V> {
+ fn capacity(&self) -> usize {
+ usize::MAX
+ }
+}
+
+impl<K: ScalarValue, V: ScalarValue + OffsetSizeTrait> ValuesBuffer
+ for DictionaryBuffer<K, V>
+{
+ fn pad_nulls(
+ &mut self,
+ read_offset: usize,
+ values_read: usize,
+ levels_read: usize,
+ valid_mask: &[u8],
+ ) {
+ match self {
+ Self::Dict { keys, .. } => {
+ keys.resize(read_offset + levels_read);
+                keys.pad_nulls(read_offset, values_read, levels_read, valid_mask)
+ }
+ Self::Values { values, .. } => {
+                values.pad_nulls(read_offset, values_read, levels_read, valid_mask)
+ }
+ }
+ }
+}
+
+impl<K: ScalarValue, V: ScalarValue + OffsetSizeTrait> BufferQueue
+ for DictionaryBuffer<K, V>
+{
+ type Output = Self;
+ type Slice = Self;
+
+ fn split_off(&mut self, len: usize) -> Self::Output {
+ match self {
+ Self::Dict { keys, values } => Self::Dict {
+ keys: keys.take(len),
+ values: values.clone(),
+ },
+ Self::Values { values } => Self::Values {
+ values: values.split_off(len),
+ },
+ }
+ }
+
+ fn spare_capacity_mut(&mut self, _batch_size: usize) -> &mut Self::Slice {
+ self
+ }
+
+ fn set_len(&mut self, len: usize) {
+ match self {
+ Self::Dict { keys, .. } => keys.set_len(len),
+ Self::Values { values } => values.set_len(len),
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use arrow::array::{Array, StringArray};
+ use arrow::compute::cast;
+
+ #[test]
+ fn test_dictionary_buffer() {
+ let dict_type =
+            ArrowType::Dictionary(Box::new(ArrowType::Int32), Box::new(ArrowType::Utf8));
+
+ let d1: ArrayRef =
+ Arc::new(StringArray::from(vec!["hello", "world", "", "a", "b"]));
+
+ let mut buffer = DictionaryBuffer::<i32, i32>::default();
+
+ // Read some data preserving the dictionary
+ let values = &[1, 0, 3, 2, 4];
+ buffer.as_keys(&d1).unwrap().extend_from_slice(values);
+
+        let mut valid = vec![false, false, true, true, false, true, true, true];
+ let valid_buffer = Buffer::from_iter(valid.iter().cloned());
+        buffer.pad_nulls(0, values.len(), valid.len(), valid_buffer.as_slice());
+
+ // Split off some data
+
+ let split = buffer.split_off(4);
+ let null_buffer = Buffer::from_iter(valid.drain(0..4));
+ let array = split.into_array(Some(null_buffer), &dict_type).unwrap();
+ assert_eq!(array.data_type(), &dict_type);
+
+ let strings = cast(&array, &ArrowType::Utf8).unwrap();
+ let strings = strings.as_any().downcast_ref::<StringArray>().unwrap();
+ assert_eq!(
+ strings.iter().collect::<Vec<_>>(),
+ vec![None, None, Some("world"), Some("hello")]
+ );
+
+ // Read some data not preserving the dictionary
+
+ let values = buffer.spill_values().unwrap();
+ let read_offset = values.len();
+ values.try_push("bingo".as_bytes(), false).unwrap();
+ values.try_push("bongo".as_bytes(), false).unwrap();
+
+ valid.extend_from_slice(&[false, false, true, false, true]);
+ let null_buffer = Buffer::from_iter(valid.iter().cloned());
+ buffer.pad_nulls(read_offset, 2, 5, null_buffer.as_slice());
+
+ assert_eq!(buffer.len(), 9);
+ let split = buffer.split_off(9);
+
+ let array = split.into_array(Some(null_buffer), &dict_type).unwrap();
+ assert_eq!(array.data_type(), &dict_type);
+
+ let strings = cast(&array, &ArrowType::Utf8).unwrap();
+ let strings = strings.as_any().downcast_ref::<StringArray>().unwrap();
+ assert_eq!(
+ strings.iter().collect::<Vec<_>>(),
+ vec![
+ None,
+ Some("a"),
+ Some(""),
+ Some("b"),
+ None,
+ None,
+ Some("bingo"),
+ None,
+ Some("bongo")
+ ]
+ );
+
+ // Can recreate with new dictionary as values is empty
+ assert!(matches!(&buffer, DictionaryBuffer::Values { .. }));
+ assert_eq!(buffer.len(), 0);
+ let d2 = Arc::new(StringArray::from(vec!["bingo", ""])) as ArrayRef;
+ buffer
+ .as_keys(&d2)
+ .unwrap()
+ .extend_from_slice(&[0, 1, 0, 1]);
+
+ let array = buffer.split_off(4).into_array(None, &dict_type).unwrap();
+ assert_eq!(array.data_type(), &dict_type);
+
+ let strings = cast(&array, &ArrowType::Utf8).unwrap();
+ let strings = strings.as_any().downcast_ref::<StringArray>().unwrap();
+ assert_eq!(
+ strings.iter().collect::<Vec<_>>(),
+ vec![Some("bingo"), Some(""), Some("bingo"), Some("")]
+ );
+
+ // Can recreate with new dictionary as keys empty
+ assert!(matches!(&buffer, DictionaryBuffer::Dict { .. }));
+ assert_eq!(buffer.len(), 0);
+ let d3 = Arc::new(StringArray::from(vec!["bongo"])) as ArrayRef;
+ buffer.as_keys(&d3).unwrap().extend_from_slice(&[0, 0]);
+
+ // Cannot change dictionary as keys not empty
+ let d4 = Arc::new(StringArray::from(vec!["bananas"])) as ArrayRef;
+ assert!(buffer.as_keys(&d4).is_none());
+ }
+
+ #[test]
+ fn test_validates_keys() {
+ let dict_type =
+            ArrowType::Dictionary(Box::new(ArrowType::Int32), Box::new(ArrowType::Utf8));
+
+ let mut buffer = DictionaryBuffer::<i32, i32>::default();
+ let d = Arc::new(StringArray::from(vec!["", "f"])) as ArrayRef;
+ buffer.as_keys(&d).unwrap().extend_from_slice(&[0, 2, 0]);
+
+ let err = buffer.into_array(None, &dict_type).unwrap_err().to_string();
+ assert!(
+ err.contains("dictionary key beyond bounds of dictionary: 0..2"),
+ "{}",
+ err
+ );
+
+ let mut buffer = DictionaryBuffer::<i32, i32>::default();
+ let d = Arc::new(StringArray::from(vec![""])) as ArrayRef;
+ buffer.as_keys(&d).unwrap().extend_from_slice(&[0, 1, 0]);
+
+ let err = buffer.spill_values().unwrap_err().to_string();
+ assert!(
+ err.contains("dictionary key beyond bounds of dictionary: 0..1"),
+ "{}",
+ err
+ );
+ }
+}
diff --git a/parquet/src/arrow/array_reader/offset_buffer.rs b/parquet/src/arrow/array_reader/offset_buffer.rs
index ddf2ad3..dc35f95 100644
--- a/parquet/src/arrow/array_reader/offset_buffer.rs
+++ b/parquet/src/arrow/array_reader/offset_buffer.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use crate::arrow::bit_util::iter_set_bits_rev;
use crate::arrow::record_reader::buffer::{
BufferQueue, ScalarBuffer, ScalarValue, ValuesBuffer,
};
@@ -26,6 +27,7 @@ use arrow::datatypes::{ArrowNativeType, DataType as ArrowType};
/// A buffer of variable-sized byte arrays that can be converted into
/// a corresponding [`ArrayRef`]
+#[derive(Debug)]
pub struct OffsetBuffer<I: ScalarValue> {
pub offsets: ScalarBuffer<I>,
pub values: ScalarBuffer<u8>,
@@ -48,6 +50,10 @@ impl<I: OffsetSizeTrait + ScalarValue> OffsetBuffer<I> {
self.offsets.len() - 1
}
+ pub fn is_empty(&self) -> bool {
+ self.len() == 0
+ }
+
/// If `validate_utf8` this verifies that the first character of `data` is
/// the start of a UTF-8 codepoint
///
@@ -80,6 +86,8 @@ impl<I: OffsetSizeTrait + ScalarValue> OffsetBuffer<I> {
///
/// For each value `key` in `keys` this will insert
/// `&dict_values[dict_offsets[key]..dict_offsets[key+1]]`
+ ///
+ /// Note: This will validate offsets are valid
pub fn extend_from_dictionary<K: ArrowNativeType, V: ArrowNativeType>(
&mut self,
keys: &[K],
@@ -89,7 +97,10 @@ impl<I: OffsetSizeTrait + ScalarValue> OffsetBuffer<I> {
for key in keys {
let index = key.to_usize().unwrap();
if index + 1 >= dict_offsets.len() {
- return Err(general_err!("invalid offset in byte array: {}",
index));
+ return Err(general_err!(
+ "dictionary key beyond bounds of dictionary: 0..{}",
+ dict_offsets.len().saturating_sub(1)
+ ));
}
let start_offset = dict_offsets[index].to_usize().unwrap();
let end_offset = dict_offsets[index + 1].to_usize().unwrap();
@@ -178,7 +189,7 @@ impl<I: OffsetSizeTrait + ScalarValue> ValuesBuffer for OffsetBuffer<I> {
read_offset: usize,
values_read: usize,
levels_read: usize,
- rev_position_iter: impl Iterator<Item = usize>,
+ valid_mask: &[u8],
) {
assert_eq!(self.offsets.len(), read_offset + values_read + 1);
self.offsets.resize(read_offset + levels_read + 1);
@@ -189,7 +200,11 @@ impl<I: OffsetSizeTrait + ScalarValue> ValuesBuffer for OffsetBuffer<I> {
let mut last_start_offset = I::from_usize(self.values.len()).unwrap();
let values_range = read_offset..read_offset + values_read;
-        for (value_pos, level_pos) in values_range.clone().rev().zip(rev_position_iter) {
+ for (value_pos, level_pos) in values_range
+ .clone()
+ .rev()
+ .zip(iter_set_bits_rev(valid_mask))
+ {
assert!(level_pos >= value_pos);
assert!(level_pos < last_pos);
@@ -280,19 +295,36 @@ mod tests {
#[test]
fn test_offset_buffer_pad_nulls() {
let mut buffer = OffsetBuffer::<i32>::default();
- for v in ["a", "b", "c", "def", "gh"] {
+ let values = ["a", "b", "c", "def", "gh"];
+ for v in &values {
buffer.try_push(v.as_bytes(), false).unwrap()
}
+ let valid = vec![
+            true, false, false, true, false, true, false, true, true, false, false,
+ ];
+ let valid_mask = Buffer::from_iter(valid.iter().cloned());
+
// Both trailing and leading nulls
- buffer.pad_nulls(1, 4, 10, [8, 7, 5, 3].into_iter());
+        buffer.pad_nulls(1, values.len() - 1, valid.len() - 1, valid_mask.as_slice());
- // No null buffer - nulls -> ""
- let array = buffer.into_array(None, ArrowType::Utf8);
+ let array = buffer.into_array(Some(valid_mask), ArrowType::Utf8);
let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
assert_eq!(
- strings.iter().map(|x| x.unwrap()).collect::<Vec<_>>(),
- vec!["a", "", "", "b", "", "c", "", "def", "gh", "", ""]
+ strings.iter().collect::<Vec<_>>(),
+ vec![
+ Some("a"),
+ None,
+ None,
+ Some("b"),
+ None,
+ Some("c"),
+ None,
+ Some("def"),
+ Some("gh"),
+ None,
+ None
+ ]
);
}
@@ -335,4 +367,17 @@ mod tests {
// Fails if run from middle of codepoint
buffer.check_valid_utf8(12).unwrap_err();
}
+
+ #[test]
+ fn test_pad_nulls_empty() {
+ let mut buffer = OffsetBuffer::<i32>::default();
+ let valid_mask = Buffer::from_iter(std::iter::repeat(false).take(9));
+ buffer.pad_nulls(0, 0, 9, valid_mask.as_slice());
+
+ let array = buffer.into_array(Some(valid_mask), ArrowType::Utf8);
+ let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
+
+ assert_eq!(strings.len(), 9);
+ assert!(strings.iter().all(|x| x.is_none()))
+ }
}
diff --git a/parquet/src/arrow/array_reader/test_util.rs b/parquet/src/arrow/array_reader/test_util.rs
new file mode 100644
index 0000000..b04a597
--- /dev/null
+++ b/parquet/src/arrow/array_reader/test_util.rs
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use crate::basic::{ConvertedType, Encoding, Type as PhysicalType};
+use crate::data_type::{ByteArray, ByteArrayType};
+use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder};
+use crate::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type};
+use crate::util::memory::{ByteBufferPtr, MemTracker};
+
+/// Returns a descriptor for a UTF-8 column
+pub fn utf8_column() -> ColumnDescPtr {
+ let t = Type::primitive_type_builder("col", PhysicalType::BYTE_ARRAY)
+ .with_converted_type(ConvertedType::UTF8)
+ .build()
+ .unwrap();
+
+ Arc::new(ColumnDescriptor::new(
+ Arc::new(t),
+ 1,
+ 0,
+ ColumnPath::new(vec![]),
+ ))
+}
+
+/// Encode `data` with the provided `encoding`
+pub fn encode_byte_array(encoding: Encoding, data: &[ByteArray]) -> ByteBufferPtr {
+ let descriptor = utf8_column();
+ let mem_tracker = Arc::new(MemTracker::new());
+ let mut encoder =
+        get_encoder::<ByteArrayType>(descriptor, encoding, mem_tracker).unwrap();
+
+ encoder.put(data).unwrap();
+ encoder.flush_buffer().unwrap()
+}
+
+/// Returns the encoded dictionary and value data
+pub fn encode_dictionary(data: &[ByteArray]) -> (ByteBufferPtr, ByteBufferPtr) {
+ let mut dict_encoder =
+        DictEncoder::<ByteArrayType>::new(utf8_column(), Arc::new(MemTracker::new()));
+
+ dict_encoder.put(data).unwrap();
+ let encoded_rle = dict_encoder.flush_buffer().unwrap();
+ let encoded_dictionary = dict_encoder.write_dict().unwrap();
+
+ (encoded_dictionary, encoded_rle)
+}
+
+/// Encodes `data` in all the possible encodings
+///
+/// Returns an array of data with its associated encoding, along with an encoded dictionary
+pub fn byte_array_all_encodings(
+ data: Vec<impl Into<ByteArray>>,
+) -> (Vec<(Encoding, ByteBufferPtr)>, ByteBufferPtr) {
+ let data: Vec<_> = data.into_iter().map(Into::into).collect();
+ let (encoded_dictionary, encoded_rle) = encode_dictionary(&data);
+
+ // A column chunk with all the encodings!
+ let pages = vec![
+ (Encoding::PLAIN, encode_byte_array(Encoding::PLAIN, &data)),
+ (
+ Encoding::DELTA_BYTE_ARRAY,
+ encode_byte_array(Encoding::DELTA_BYTE_ARRAY, &data),
+ ),
+ (
+ Encoding::DELTA_LENGTH_BYTE_ARRAY,
+ encode_byte_array(Encoding::DELTA_LENGTH_BYTE_ARRAY, &data),
+ ),
+ (Encoding::PLAIN_DICTIONARY, encoded_rle.clone()),
+ (Encoding::RLE_DICTIONARY, encoded_rle),
+ ];
+
+ (pages, encoded_dictionary)
+}
diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs
index 476cf08..259a3c0 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader.rs
@@ -17,6 +17,13 @@
//! Contains reader which reads parquet data into arrow array.
+use std::sync::Arc;
+
+use arrow::datatypes::{DataType as ArrowType, Schema, SchemaRef};
+use arrow::error::Result as ArrowResult;
+use arrow::record_batch::{RecordBatch, RecordBatchReader};
+use arrow::{array::StructArray, error::ArrowError};
+
use crate::arrow::array_reader::{build_array_reader, ArrayReader, StructArrayReader};
use crate::arrow::schema::parquet_to_arrow_schema;
use crate::arrow::schema::{
@@ -25,11 +32,6 @@ use crate::arrow::schema::{
use crate::errors::{ParquetError, Result};
use crate::file::metadata::ParquetMetaData;
use crate::file::reader::FileReader;
-use arrow::datatypes::{DataType as ArrowType, Schema, SchemaRef};
-use arrow::error::Result as ArrowResult;
-use arrow::record_batch::{RecordBatch, RecordBatchReader};
-use arrow::{array::StructArray, error::ArrowError};
-use std::sync::Arc;
/// Arrow reader api.
/// With this api, user can get arrow schema from parquet file, and read parquet data
@@ -233,13 +235,29 @@ impl ParquetRecordBatchReader {
#[cfg(test)]
mod tests {
+ use std::cmp::min;
+ use std::convert::TryFrom;
+ use std::fs::File;
+ use std::io::Seek;
+ use std::path::PathBuf;
+ use std::sync::Arc;
+
+ use rand::{thread_rng, RngCore};
+ use serde_json::json;
+ use serde_json::Value::{Array as JArray, Null as JNull, Object as JObject};
+
+ use arrow::array::*;
+ use arrow::datatypes::{DataType as ArrowDataType, Field};
+ use arrow::error::Result as ArrowResult;
+ use arrow::record_batch::{RecordBatch, RecordBatchReader};
+
use crate::arrow::arrow_reader::{ArrowReader, ParquetFileArrowReader};
use crate::arrow::converter::{
BinaryArrayConverter, Converter, FixedSizeArrayConverter, FromConverter,
IntervalDayTimeArrayConverter, LargeUtf8ArrayConverter, Utf8ArrayConverter,
};
use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
- use crate::basic::{ConvertedType, Encoding, Repetition};
+    use crate::basic::{ConvertedType, Encoding, Repetition, Type as PhysicalType};
use crate::column::writer::get_typed_column_writer_mut;
use crate::data_type::{
BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray,
@@ -253,18 +271,6 @@ mod tests {
use crate::schema::types::{Type, TypePtr};
use crate::util::cursor::SliceableCursor;
use crate::util::test_common::RandGen;
- use arrow::array::*;
- use arrow::datatypes::{DataType as ArrowDataType, Field};
- use arrow::record_batch::RecordBatchReader;
- use rand::{thread_rng, RngCore};
- use serde_json::json;
- use serde_json::Value::{Array as JArray, Null as JNull, Object as JObject};
- use std::cmp::min;
- use std::convert::TryFrom;
- use std::fs::File;
- use std::io::Seek;
- use std::path::PathBuf;
- use std::sync::Arc;
#[test]
fn test_arrow_reader_all_columns() {
@@ -423,13 +429,13 @@ mod tests {
RandUtf8Gen,
>(2, ConvertedType::NONE, None, &converter, encodings);
- let converter = Utf8ArrayConverter {};
+ let utf8_converter = Utf8ArrayConverter {};
run_single_column_reader_tests::<
ByteArrayType,
StringArray,
Utf8ArrayConverter,
RandUtf8Gen,
- >(2, ConvertedType::UTF8, None, &converter, encodings);
+ >(2, ConvertedType::UTF8, None, &utf8_converter, encodings);
run_single_column_reader_tests::<
ByteArrayType,
@@ -440,27 +446,11 @@ mod tests {
2,
ConvertedType::UTF8,
Some(ArrowDataType::Utf8),
- &converter,
- encodings,
- );
-
- run_single_column_reader_tests::<
- ByteArrayType,
- StringArray,
- Utf8ArrayConverter,
- RandUtf8Gen,
- >(
- 2,
- ConvertedType::UTF8,
- Some(ArrowDataType::Dictionary(
- Box::new(ArrowDataType::Int32),
- Box::new(ArrowDataType::Utf8),
- )),
- &converter,
+ &utf8_converter,
encodings,
);
- let converter = LargeUtf8ArrayConverter {};
+ let large_utf8_converter = LargeUtf8ArrayConverter {};
run_single_column_reader_tests::<
ByteArrayType,
LargeStringArray,
@@ -470,9 +460,78 @@ mod tests {
2,
ConvertedType::UTF8,
Some(ArrowDataType::LargeUtf8),
- &converter,
+ &large_utf8_converter,
encodings,
);
+
+ let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
+ for key in &small_key_types {
+ for encoding in encodings {
+            let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
+ opts.encoding = *encoding;
+
+            // Cannot run full test suite as keys overflow, run small test instead
+ single_column_reader_test::<
+ ByteArrayType,
+ StringArray,
+ Utf8ArrayConverter,
+ RandUtf8Gen,
+ >(
+ opts,
+ 2,
+ ConvertedType::UTF8,
+ Some(ArrowDataType::Dictionary(
+ Box::new(key.clone()),
+ Box::new(ArrowDataType::Utf8),
+ )),
+ &utf8_converter,
+ );
+ }
+ }
+
+ let key_types = [
+ ArrowDataType::Int16,
+ ArrowDataType::UInt16,
+ ArrowDataType::Int32,
+ ArrowDataType::UInt32,
+ ArrowDataType::Int64,
+ ArrowDataType::UInt64,
+ ];
+
+ for key in &key_types {
+ run_single_column_reader_tests::<
+ ByteArrayType,
+ StringArray,
+ Utf8ArrayConverter,
+ RandUtf8Gen,
+ >(
+ 2,
+ ConvertedType::UTF8,
+ Some(ArrowDataType::Dictionary(
+ Box::new(key.clone()),
+ Box::new(ArrowDataType::Utf8),
+ )),
+ &utf8_converter,
+ encodings,
+ );
+
+ // https://github.com/apache/arrow-rs/issues/1179
+ // run_single_column_reader_tests::<
+ // ByteArrayType,
+ // LargeStringArray,
+ // LargeUtf8ArrayConverter,
+ // RandUtf8Gen,
+ // >(
+ // 2,
+ // ConvertedType::UTF8,
+ // Some(ArrowDataType::Dictionary(
+ // Box::new(key.clone()),
+ // Box::new(ArrowDataType::LargeUtf8),
+ // )),
+ // &large_utf8_converter,
+ // encodings
+ // );
+ }
}
#[test]
@@ -519,6 +578,11 @@ mod tests {
record_batch_size: usize,
/// Percentage of nulls in column or None if required
null_percent: Option<usize>,
+ /// Set write batch size
+ ///
+ /// This is the number of rows that are written at once to a page and
+ /// therefore acts as a bound on the page granularity of a row group
+ write_batch_size: usize,
/// Maximum size of page in bytes
max_data_page_size: usize,
/// Maximum size of dictionary page in bytes
@@ -536,6 +600,7 @@ mod tests {
num_rows: 100,
record_batch_size: 15,
null_percent: None,
+ write_batch_size: 64,
max_data_page_size: 1024 * 1024,
max_dict_page_size: 1024 * 1024,
writer_version: WriterVersion::PARQUET_1_0,
@@ -578,6 +643,7 @@ mod tests {
fn writer_props(&self) -> WriterProperties {
let builder = WriterProperties::builder()
.set_data_pagesize_limit(self.max_data_page_size)
+ .set_write_batch_size(self.write_batch_size)
.set_writer_version(self.writer_version);
let builder = match self.encoding {
@@ -792,7 +858,7 @@ mod tests {
}
}
assert_eq!(a.data_type(), b.data_type());
- assert_eq!(a.data(), b.data());
+ assert_eq!(a.data(), b.data(), "{:#?} vs {:#?}", a.data(), b.data());
total_read = end;
} else {
@@ -1005,4 +1071,101 @@ mod tests {
error
);
}
+
+ #[test]
+ fn test_dictionary_preservation() {
+ let mut fields = vec![Arc::new(
+ Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
+ .with_repetition(Repetition::OPTIONAL)
+ .with_converted_type(ConvertedType::UTF8)
+ .build()
+ .unwrap(),
+ )];
+
+ let schema = Arc::new(
+ Type::group_type_builder("test_schema")
+ .with_fields(&mut fields)
+ .build()
+ .unwrap(),
+ );
+
+ let dict_type = ArrowDataType::Dictionary(
+ Box::new(ArrowDataType::Int32),
+ Box::new(ArrowDataType::Utf8),
+ );
+
+ let arrow_field = Field::new("leaf", dict_type, true);
+
+ let mut file = tempfile::tempfile().unwrap();
+
+ let values = vec![
+ vec![
+ ByteArray::from("hello"),
+ ByteArray::from("a"),
+ ByteArray::from("b"),
+ ByteArray::from("d"),
+ ],
+ vec![
+ ByteArray::from("c"),
+ ByteArray::from("a"),
+ ByteArray::from("b"),
+ ],
+ ];
+
+ let def_levels = vec![
+ vec![1, 0, 0, 1, 0, 0, 1, 1],
+ vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
+ ];
+
+ let opts = TestOptions {
+ encoding: Encoding::RLE_DICTIONARY,
+ ..Default::default()
+ };
+
+ generate_single_column_file_with_data::<ByteArrayType>(
+ &values,
+ Some(&def_levels),
+ file.try_clone().unwrap(), // Cannot use &mut File (#1163)
+ schema,
+ Some(arrow_field),
+ &opts,
+ )
+ .unwrap();
+
+ file.rewind().unwrap();
+
+ let parquet_reader = SerializedFileReader::try_from(file).unwrap();
+ let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_reader));
+
+ let record_reader = arrow_reader.get_record_reader(3).unwrap();
+
+ let batches = record_reader
+ .collect::<ArrowResult<Vec<RecordBatch>>>()
+ .unwrap();
+
+ assert_eq!(batches.len(), 6);
+ assert!(batches.iter().all(|x| x.num_columns() == 1));
+
+ let row_counts = batches
+ .iter()
+ .map(|x| (x.num_rows(), x.column(0).null_count()))
+ .collect::<Vec<_>>();
+
+ assert_eq!(
+ row_counts,
+ vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
+ );
+
+ let get_dict =
+ |batch: &RecordBatch| batch.column(0).data().child_data()[0].clone();
+
+ // First and second batch in same row group -> same dictionary
+ assert_eq!(get_dict(&batches[0]), get_dict(&batches[1]));
+ // Third batch spans row group -> computed dictionary
+ assert_ne!(get_dict(&batches[1]), get_dict(&batches[2]));
+ assert_ne!(get_dict(&batches[2]), get_dict(&batches[3]));
+ // Fourth, fifth and sixth from same row group -> same dictionary
+ assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
+ assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
+ }
}
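
For illustration, a minimal sketch of what the new test exercises (not part of the patch; `batch` is hypothetical, standing in for any RecordBatch read through ParquetFileArrowReader as above):

    use arrow::array::{Int32DictionaryArray, StringArray};

    // Sketch only: the column was written as Dictionary(Int32, Utf8), so it
    // should now come back as a dictionary array rather than plain Utf8.
    let dict = batch
        .column(0)
        .as_any()
        .downcast_ref::<Int32DictionaryArray>()
        .expect("column should decode as a dictionary array, not plain Utf8");

    // The keys index into a shared StringArray of dictionary values; within a
    // row group the same values array is reused across batches.
    let values = dict
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(dict.keys().len(), batch.num_rows());
    assert!(values.len() <= dict.keys().len());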
diff --git a/parquet/src/arrow/bit_util.rs b/parquet/src/arrow/bit_util.rs
new file mode 100644
index 0000000..881c67d
--- /dev/null
+++ b/parquet/src/arrow/bit_util.rs
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::util::bit_chunk_iterator::BitChunks;
+use std::ops::Range;
+
+/// Counts the number of set bits in the provided range
+pub fn count_set_bits(bytes: &[u8], range: Range<usize>) -> usize {
+ let mut count = 0_usize;
+ let chunks = BitChunks::new(bytes, range.start, range.end - range.start);
+ chunks.iter().for_each(|chunk| {
+ count += chunk.count_ones() as usize;
+ });
+ count += chunks.remainder_bits().count_ones() as usize;
+ count
+}
+
+/// Iterates through the set bit positions in `bytes` in reverse order
+pub fn iter_set_bits_rev(bytes: &[u8]) -> impl Iterator<Item = usize> + '_ {
+ let (mut byte_idx, mut in_progress) = match bytes.len() {
+ 0 => (0, 0),
+ len => (len - 1, bytes[len - 1]),
+ };
+
+ std::iter::from_fn(move || loop {
+ if in_progress != 0 {
+ let bit_pos = 7 - in_progress.leading_zeros();
+ in_progress ^= 1 << bit_pos;
+ return Some((byte_idx << 3) + (bit_pos as usize));
+ }
+
+ if byte_idx == 0 {
+ return None;
+ }
+
+ byte_idx -= 1;
+ in_progress = bytes[byte_idx];
+ })
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use arrow::array::BooleanBufferBuilder;
+ use rand::prelude::*;
+
+ #[test]
+ fn test_bit_fns() {
+ let mut rng = thread_rng();
+ let mask_length = rng.gen_range(1..20);
+ let bools: Vec<_> = std::iter::from_fn(|| Some(rng.next_u32() & 1 == 0))
+ .take(mask_length)
+ .collect();
+
+ let mut nulls = BooleanBufferBuilder::new(mask_length);
+ bools.iter().for_each(|b| nulls.append(*b));
+
+ let actual: Vec<_> = iter_set_bits_rev(nulls.as_slice()).collect();
+ let expected: Vec<_> = bools
+ .iter()
+ .enumerate()
+ .rev()
+ .filter_map(|(x, y)| y.then(|| x))
+ .collect();
+ assert_eq!(actual, expected);
+
+ assert_eq!(iter_set_bits_rev(&[]).count(), 0);
+ assert_eq!(count_set_bits(&[], 0..0), 0);
+ assert_eq!(count_set_bits(&[0xFF], 1..1), 0);
+
+ for _ in 0..20 {
+ let start = rng.gen_range(0..bools.len());
+ let end = rng.gen_range(start..bools.len());
+
+ let actual = count_set_bits(nulls.as_slice(), start..end);
+ let expected = bools[start..end].iter().filter(|x| **x).count();
+
+ assert_eq!(actual, expected);
+ }
+ }
+}
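
Since `bit_util` is a crate-private module this is only an illustrative sketch, but the behaviour of the two helpers can be worked out by hand from the definitions above:

    // 0b0000_0101 has bits 0 and 2 set
    let bytes = [0b0000_0101u8];

    // iter_set_bits_rev yields set-bit positions from highest to lowest
    let rev: Vec<usize> = iter_set_bits_rev(&bytes).collect();
    assert_eq!(rev, vec![2, 0]);

    // count_set_bits counts over a half-open range of bit positions:
    // 0..3 sees both set bits, 1..3 excludes bit 0
    assert_eq!(count_set_bits(&bytes, 0..3), 2);
    assert_eq!(count_set_bits(&bytes, 1..3), 1);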
diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs
index 57ad5b1..fbbf655 100644
--- a/parquet/src/arrow/mod.rs
+++ b/parquet/src/arrow/mod.rs
@@ -122,6 +122,7 @@ experimental_mod!(array_reader);
experimental_mod!(arrow_array_reader);
pub mod arrow_reader;
pub mod arrow_writer;
+mod bit_util;
experimental_mod!(converter);
pub(in crate::arrow) mod levels;
pub(in crate::arrow) mod record_reader;
diff --git a/parquet/src/arrow/record_reader.rs b/parquet/src/arrow/record_reader.rs
index ce77db9..31138c1 100644
--- a/parquet/src/arrow/record_reader.rs
+++ b/parquet/src/arrow/record_reader.rs
@@ -167,7 +167,30 @@ where
break;
}
- let batch_size = max(num_records - records_read, MIN_BATCH_SIZE);
+ // If repetition levels present, we don't know how much more to read
+ // in order to read the requested number of records, therefore read at least
+ // MIN_BATCH_SIZE, otherwise read **exactly** what was requested. This helps
+ // to avoid a degenerate case where the buffers are never fully drained.
+ //
+ // Consider the scenario where the user is requesting batches of MIN_BATCH_SIZE.
+ //
+ // When transitioning across a row group boundary, this will read some remainder
+ // from the row group `r`, before reading MIN_BATCH_SIZE from the next row group,
+ // leaving `MIN_BATCH_SIZE + r` in the buffer.
+ //
+ // The client will then only split off the `MIN_BATCH_SIZE` they actually wanted,
+ // leaving behind `r`. This will continue indefinitely.
+ //
+ // Aside from wasting cycles splitting and shuffling buffers unnecessarily, this
+ // prevents dictionary preservation from functioning correctly as the buffer
+ // will never be emptied, allowing a new dictionary to be registered.
+ //
+ // This degenerate case can still occur for repeated fields, but
+ // it is avoided for the more common case of a non-repeated field
+ let batch_size = match &self.rep_levels {
+ Some(_) => max(num_records - records_read, MIN_BATCH_SIZE),
+ None => num_records - records_read,
+ };
// Try to read more values from parquet pages
let values_read = self.read_one_batch(batch_size)?;
@@ -268,7 +291,7 @@ where
self.values_written,
values_read,
levels_read,
- def_levels.rev_valid_positions_iter(),
+ def_levels.nulls().as_slice(),
);
}
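
To make the degenerate case concrete, a hedged back-of-envelope trace (assuming MIN_BATCH_SIZE = 1024 as defined in this module, and an illustrative row-group remainder of 100 records):

    let min_batch_size = 1024_usize; // assumed value of MIN_BATCH_SIZE
    let num_records = 1024;          // the client requests one MIN_BATCH_SIZE batch
    let remainder = 100;             // records left in row group `r` when it drains

    // Old sizing: after reading the remainder, read at least MIN_BATCH_SIZE
    // more, buffering 100 + 1024 = 1124 records...
    let old = std::cmp::max(num_records - remainder, min_batch_size);
    assert_eq!(remainder + old, 1124);
    // ...of which the client splits off 1024, leaving 100 behind every time.

    // New sizing (no repetition levels): read exactly the 924 still needed,
    // so the buffer drains to empty and a new dictionary can be registered.
    let new = num_records - remainder;
    assert_eq!(remainder + new, 1024);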
diff --git a/parquet/src/arrow/record_reader/buffer.rs b/parquet/src/arrow/record_reader/buffer.rs
index 5c69dfa..3460d11 100644
--- a/parquet/src/arrow/record_reader/buffer.rs
+++ b/parquet/src/arrow/record_reader/buffer.rs
@@ -17,6 +17,7 @@
use std::marker::PhantomData;
+use crate::arrow::bit_util::iter_set_bits_rev;
use arrow::buffer::{Buffer, MutableBuffer};
use arrow::datatypes::ToByteSlice;
@@ -75,13 +76,18 @@ pub trait BufferQueue: Sized {
pub trait ScalarValue {}
impl ScalarValue for bool {}
impl ScalarValue for u8 {}
+impl ScalarValue for i8 {}
+impl ScalarValue for u16 {}
impl ScalarValue for i16 {}
+impl ScalarValue for u32 {}
impl ScalarValue for i32 {}
+impl ScalarValue for u64 {}
impl ScalarValue for i64 {}
impl ScalarValue for f32 {}
impl ScalarValue for f64 {}
/// A typed buffer similar to [`Vec<T>`] but using [`MutableBuffer`] for storage
+#[derive(Debug)]
pub struct ScalarBuffer<T: ScalarValue> {
buffer: MutableBuffer,
@@ -224,22 +230,14 @@ pub trait ValuesBuffer: BufferQueue {
/// - `read_offset` - the offset in [`ValuesBuffer`] to start null padding from
/// - `values_read` - the number of values read
/// - `levels_read` - the number of levels read
- /// - `rev_valid_position_iter` - a reverse iterator of the valid level positions
- ///
- /// It is required that:
- ///
- /// - `rev_valid_position_iter` has at least `values_len` elements
- /// - `rev_valid_position_iter` returns strictly monotonically decreasing values
- /// - `rev_valid_position_iter` returns values in the range `read_offset..read_offset+levels_len`
- ///
- /// Implementations may panic or otherwise misbehave if this is not the case
+ /// - `valid_mask` - a packed mask of valid levels
///
fn pad_nulls(
&mut self,
read_offset: usize,
values_read: usize,
levels_read: usize,
- rev_valid_position_iter: impl Iterator<Item = usize>,
+ valid_mask: &[u8],
);
}
@@ -249,13 +247,15 @@ impl<T: ScalarValue> ValuesBuffer for ScalarBuffer<T> {
read_offset: usize,
values_read: usize,
levels_read: usize,
- rev_valid_position_iter: impl Iterator<Item = usize>,
+ valid_mask: &[u8],
) {
let slice = self.as_slice_mut();
assert!(slice.len() >= read_offset + levels_read);
let values_range = read_offset..read_offset + values_read;
- for (value_pos, level_pos) in values_range.rev().zip(rev_valid_position_iter) {
+ for (value_pos, level_pos) in
+ values_range.rev().zip(iter_set_bits_rev(valid_mask))
+ {
debug_assert!(level_pos >= value_pos);
if level_pos <= value_pos {
break;
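
A hedged standalone sketch of the back-fill this loop performs, specialised to `i32` and ignoring `read_offset` for brevity (`pad_nulls_sketch` is illustrative, not part of the patch):

    fn pad_nulls_sketch(slice: &mut [i32], values_read: usize, valid_mask: &[u8]) {
        // Walk dense values and set bits in lockstep from the back, moving
        // each value out to the position of its validity bit.
        for (value_pos, level_pos) in
            (0..values_read).rev().zip(iter_set_bits_rev(valid_mask))
        {
            if level_pos <= value_pos {
                break; // remaining values are already in place
            }
            slice[level_pos] = slice[value_pos];
        }
    }

    // Values [7, 8] packed at the front, validity 0b0000_0101 (levels 0 and 2
    // valid): 8 moves from slot 1 to slot 2, leaving slot 1 as null padding.
    let mut slice = [7, 8, 0]; // 0 is uninitialised padding
    pad_nulls_sketch(&mut slice, 2, &[0b0000_0101]);
    assert_eq!(slice, [7, 8, 8]); // slot 1 is now a stale null slot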
diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs
index d53310e..bc0de14 100644
--- a/parquet/src/arrow/record_reader/definition_levels.rs
+++ b/parquet/src/arrow/record_reader/definition_levels.rs
@@ -20,8 +20,8 @@ use std::ops::Range;
use arrow::array::BooleanBufferBuilder;
use arrow::bitmap::Bitmap;
use arrow::buffer::Buffer;
-use arrow::util::bit_chunk_iterator::BitChunks;
+use crate::arrow::bit_util::count_set_bits;
use crate::arrow::record_reader::buffer::BufferQueue;
use crate::basic::Encoding;
use crate::column::reader::decoder::{
@@ -126,12 +126,7 @@ impl DefinitionLevelBuffer {
Bitmap::from(std::mem::replace(old_builder, new_builder).finish())
}
- /// Returns an iterator of the valid positions in descending order
- pub fn rev_valid_positions_iter(&self) -> impl Iterator<Item = usize> + '_ {
- iter_set_bits_rev(self.nulls().as_slice())
- }
-
- fn nulls(&self) -> &BooleanBufferBuilder {
+ pub fn nulls(&self) -> &BooleanBufferBuilder {
match &self.inner {
BufferInner::Full { nulls, .. } => nulls,
BufferInner::Mask { nulls } => nulls,
@@ -351,39 +346,6 @@ impl PackedDecoder {
}
}
-/// Counts the number of set bits in the provided range
-pub fn count_set_bits(bytes: &[u8], range: Range<usize>) -> usize {
- let mut count = 0_usize;
- let chunks = BitChunks::new(bytes, range.start, range.end - range.start);
- chunks.iter().for_each(|chunk| {
- count += chunk.count_ones() as usize;
- });
- count += chunks.remainder_bits().count_ones() as usize;
- count
-}
-
-fn iter_set_bits_rev(bytes: &[u8]) -> impl Iterator<Item = usize> + '_ {
- let (mut byte_idx, mut in_progress) = match bytes.len() {
- 0 => (0, 0),
- len => (len - 1, bytes[len - 1]),
- };
-
- std::iter::from_fn(move || loop {
- if in_progress != 0 {
- let bit_pos = 7 - in_progress.leading_zeros();
- in_progress ^= 1 << bit_pos;
- return Some((byte_idx << 3) + (bit_pos as usize));
- }
-
- if byte_idx == 0 {
- return None;
- }
-
- byte_idx -= 1;
- in_progress = bytes[byte_idx];
- })
-}
-
#[cfg(test)]
mod tests {
use super::*;
@@ -392,7 +354,7 @@ mod tests {
use crate::basic::Type as PhysicalType;
use crate::encodings::rle::RleEncoder;
use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};
- use rand::{thread_rng, Rng, RngCore};
+ use rand::{thread_rng, Rng};
#[test]
fn test_packed_decoder() {
@@ -428,41 +390,6 @@ mod tests {
}
#[test]
- fn test_bit_fns() {
- let mut rng = thread_rng();
- let mask_length = rng.gen_range(1..20);
- let bools: Vec<_> = std::iter::from_fn(|| Some(rng.next_u32() & 1 == 0))
- .take(mask_length)
- .collect();
-
- let mut nulls = BooleanBufferBuilder::new(mask_length);
- bools.iter().for_each(|b| nulls.append(*b));
-
- let actual: Vec<_> = iter_set_bits_rev(nulls.as_slice()).collect();
- let expected: Vec<_> = bools
- .iter()
- .enumerate()
- .rev()
- .filter_map(|(x, y)| y.then(|| x))
- .collect();
- assert_eq!(actual, expected);
-
- assert_eq!(iter_set_bits_rev(&[]).count(), 0);
- assert_eq!(count_set_bits(&[], 0..0), 0);
- assert_eq!(count_set_bits(&[0xFF], 1..1), 0);
-
- for _ in 0..20 {
- let start = rng.gen_range(0..bools.len());
- let end = rng.gen_range(start..bools.len());
-
- let actual = count_set_bits(nulls.as_slice(), start..end);
- let expected = bools[start..end].iter().filter(|x| **x).count();
-
- assert_eq!(actual, expected);
- }
- }
-
- #[test]
fn test_split_off() {
let t = Type::primitive_type_builder("col", PhysicalType::INT32)
.build()
diff --git a/parquet/src/column/writer.rs b/parquet/src/column/writer.rs
index 87b25b4..1db0ea0 100644
--- a/parquet/src/column/writer.rs
+++ b/parquet/src/column/writer.rs
@@ -567,6 +567,14 @@ impl<T: DataType> ColumnWriterImpl<T> {
/// Returns true if there is enough data for a data page, false otherwise.
#[inline]
fn should_add_data_page(&self) -> bool {
+ // This is necessary in the event of a much larger dictionary size than page size
+ //
+ // In such a scenario the dictionary encoder may return an estimated encoded
+ // size in excess of the page size limit, even when there are no buffered values
+ if self.num_buffered_values == 0 {
+ return false;
+ }
+
match self.dict_encoder {
Some(ref encoder) => {
encoder.estimated_data_encoded_size() >= self.props.data_pagesize_limit()
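
A hedged illustration of the guard with hypothetical numbers: a dictionary whose estimated encoded size already exceeds a 1 MiB page limit before any values for the next page have been buffered:

    let estimated_size = 2 * 1024 * 1024; // hypothetical dict encoder estimate
    let page_limit = 1024 * 1024;         // hypothetical data_pagesize_limit
    let num_buffered_values = 0;

    // Without the early return, estimated_size >= page_limit would flush an
    // empty data page; with it, nothing is flushed until values arrive.
    let should_add = num_buffered_values > 0 && estimated_size >= page_limit;
    assert!(!should_add);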