This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 231cf78 Improve parquet reading performance for columns with nulls by
preserving bitmask when possible (#1037) (#1054)
231cf78 is described below
commit 231cf788879f12b3cfbc6da776e2360117567d17
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Thu Jan 13 15:18:55 2022 +0000
Improve parquet reading performance for columns with nulls by preserving
bitmask when possible (#1037) (#1054)
* Preserve bitmask (#1037)
* Remove now unnecessary box (#1061)
* Fix handling of empty bitmasks
* More docs
* Add nested nullability test case
* Add packed decoder test
---
arrow/src/array/builder.rs | 5 +
parquet/benches/arrow_array_reader.rs | 9 +-
parquet/src/arrow/array_reader.rs | 92 +++--
parquet/src/arrow/arrow_reader.rs | 55 ++-
parquet/src/arrow/record_reader.rs | 37 +-
parquet/src/arrow/record_reader/buffer.rs | 5 +
.../src/arrow/record_reader/definition_levels.rs | 449 +++++++++++++++++++--
parquet/src/column/reader/decoder.rs | 5 +-
8 files changed, 571 insertions(+), 86 deletions(-)
diff --git a/arrow/src/array/builder.rs b/arrow/src/array/builder.rs
index 1f7e91f..85c013c 100644
--- a/arrow/src/array/builder.rs
+++ b/arrow/src/array/builder.rs
@@ -419,6 +419,11 @@ impl BooleanBufferBuilder {
);
}
+ /// Returns the packed bits
+ pub fn as_slice(&self) -> &[u8] {
+ self.buffer.as_slice()
+ }
+
#[inline]
pub fn finish(&mut self) -> Buffer {
let buf = std::mem::replace(&mut self.buffer, MutableBuffer::new(0));
diff --git a/parquet/benches/arrow_array_reader.rs
b/parquet/benches/arrow_array_reader.rs
index acc5141..5587b52 100644
--- a/parquet/benches/arrow_array_reader.rs
+++ b/parquet/benches/arrow_array_reader.rs
@@ -301,8 +301,13 @@ fn create_int32_primitive_array_reader(
column_desc: ColumnDescPtr,
) -> impl ArrayReader {
use parquet::arrow::array_reader::PrimitiveArrayReader;
- PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator),
column_desc, None)
- .unwrap()
+ PrimitiveArrayReader::<Int32Type>::new_with_options(
+ Box::new(page_iterator),
+ column_desc,
+ None,
+ true,
+ )
+ .unwrap()
}
fn create_string_arrow_array_reader(
diff --git a/parquet/src/arrow/array_reader.rs
b/parquet/src/arrow/array_reader.rs
index d166be7..6ba08f9 100644
--- a/parquet/src/arrow/array_reader.rs
+++ b/parquet/src/arrow/array_reader.rs
@@ -248,6 +248,17 @@ where
column_desc: ColumnDescPtr,
arrow_type: Option<ArrowType>,
) -> Result<Self> {
+ Self::new_with_options(pages, column_desc, arrow_type, false)
+ }
+
+ /// Construct primitive array reader with ability to only compute null
mask and not
+ /// buffer level data
+ pub fn new_with_options(
+ pages: Box<dyn PageIterator>,
+ column_desc: ColumnDescPtr,
+ arrow_type: Option<ArrowType>,
+ null_mask_only: bool,
+ ) -> Result<Self> {
// Check if Arrow type is specified, else create it from Parquet type
let data_type = match arrow_type {
Some(t) => t,
@@ -256,7 +267,7 @@ where
.clone(),
};
- let record_reader = RecordReader::<T>::new(column_desc.clone());
+ let record_reader =
RecordReader::<T>::new_with_options(column_desc.clone(), null_mask_only);
Ok(Self {
data_type,
@@ -1350,19 +1361,26 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a
ArrayReaderBuilderContext
let mut new_context = context.clone();
new_context.path.append(vec![cur_type.name().to_string()]);
- match cur_type.get_basic_info().repetition() {
+ let null_mask_only = match cur_type.get_basic_info().repetition() {
Repetition::REPEATED => {
new_context.def_level += 1;
new_context.rep_level += 1;
+ false
}
Repetition::OPTIONAL => {
new_context.def_level += 1;
+
+ // Can just compute null mask if no parent
+ context.def_level == 0 && context.rep_level == 0
}
- _ => (),
- }
+ _ => false,
+ };
- let reader =
- self.build_for_primitive_type_inner(cur_type.clone(),
&new_context)?;
+ let reader = self.build_for_primitive_type_inner(
+ cur_type.clone(),
+ &new_context,
+ null_mask_only,
+ )?;
if cur_type.get_basic_info().repetition() == Repetition::REPEATED {
Err(ArrowError(
@@ -1641,6 +1659,7 @@ impl<'a> ArrayReaderBuilder {
&self,
cur_type: TypePtr,
context: &'a ArrayReaderBuilderContext,
+ null_mask_only: bool,
) -> Result<Box<dyn ArrayReader>> {
let column_desc = Arc::new(ColumnDescriptor::new(
cur_type.clone(),
@@ -1658,11 +1677,14 @@ impl<'a> ArrayReaderBuilder {
.map(|f| f.data_type().clone());
match cur_type.get_physical_type() {
- PhysicalType::BOOLEAN =>
Ok(Box::new(PrimitiveArrayReader::<BoolType>::new(
- page_iterator,
- column_desc,
- arrow_type,
- )?)),
+ PhysicalType::BOOLEAN => Ok(Box::new(
+ PrimitiveArrayReader::<BoolType>::new_with_options(
+ page_iterator,
+ column_desc,
+ arrow_type,
+ null_mask_only,
+ )?,
+ )),
PhysicalType::INT32 => {
if let Some(ArrowType::Null) = arrow_type {
Ok(Box::new(NullArrayReader::<Int32Type>::new(
@@ -1670,18 +1692,24 @@ impl<'a> ArrayReaderBuilder {
column_desc,
)?))
} else {
- Ok(Box::new(PrimitiveArrayReader::<Int32Type>::new(
- page_iterator,
- column_desc,
- arrow_type,
- )?))
+ Ok(Box::new(
+ PrimitiveArrayReader::<Int32Type>::new_with_options(
+ page_iterator,
+ column_desc,
+ arrow_type,
+ null_mask_only,
+ )?,
+ ))
}
}
- PhysicalType::INT64 =>
Ok(Box::new(PrimitiveArrayReader::<Int64Type>::new(
- page_iterator,
- column_desc,
- arrow_type,
- )?)),
+ PhysicalType::INT64 => Ok(Box::new(
+ PrimitiveArrayReader::<Int64Type>::new_with_options(
+ page_iterator,
+ column_desc,
+ arrow_type,
+ null_mask_only,
+ )?,
+ )),
PhysicalType::INT96 => {
// get the optional timezone information from arrow type
let timezone = arrow_type
@@ -1705,18 +1733,22 @@ impl<'a> ArrayReaderBuilder {
arrow_type,
)?))
}
- PhysicalType::FLOAT =>
Ok(Box::new(PrimitiveArrayReader::<FloatType>::new(
- page_iterator,
- column_desc,
- arrow_type,
- )?)),
- PhysicalType::DOUBLE => {
- Ok(Box::new(PrimitiveArrayReader::<DoubleType>::new(
+ PhysicalType::FLOAT => Ok(Box::new(
+ PrimitiveArrayReader::<FloatType>::new_with_options(
page_iterator,
column_desc,
arrow_type,
- )?))
- }
+ null_mask_only,
+ )?,
+ )),
+ PhysicalType::DOUBLE => Ok(Box::new(
+ PrimitiveArrayReader::<DoubleType>::new_with_options(
+ page_iterator,
+ column_desc,
+ arrow_type,
+ null_mask_only,
+ )?,
+ )),
PhysicalType::BYTE_ARRAY => {
if cur_type.get_basic_info().converted_type() ==
ConvertedType::UTF8 {
if let Some(ArrowType::LargeUtf8) = arrow_type {
diff --git a/parquet/src/arrow/arrow_reader.rs
b/parquet/src/arrow/arrow_reader.rs
index fd09aa9..066f86c 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader.rs
@@ -249,10 +249,11 @@ mod tests {
use crate::file::properties::{WriterProperties, WriterVersion};
use crate::file::reader::{FileReader, SerializedFileReader};
use crate::file::writer::{FileWriter, SerializedFileWriter};
+ use crate::schema::parser::parse_message_type;
use crate::schema::types::{Type, TypePtr};
- use crate::util::test_common::{get_temp_filename, RandGen};
+ use crate::util::test_common::{get_temp_file, get_temp_filename, RandGen};
use arrow::array::*;
- use arrow::datatypes::DataType as ArrowDataType;
+ use arrow::datatypes::{DataType as ArrowDataType, Field};
use arrow::record_batch::RecordBatchReader;
use rand::{thread_rng, RngCore};
use serde_json::json;
@@ -916,4 +917,54 @@ mod tests {
batch.unwrap();
}
}
+
+ #[test]
+ fn test_nested_nullability() {
+ let message_type = "message nested {
+ OPTIONAL Group group {
+ REQUIRED INT32 leaf;
+ }
+ }";
+
+ let file = get_temp_file("nested_nullability.parquet", &[]);
+ let schema = Arc::new(parse_message_type(message_type).unwrap());
+
+ {
+ // Write using low-level parquet API (#1167)
+ let writer_props = Arc::new(WriterProperties::builder().build());
+ let mut writer = SerializedFileWriter::new(
+ file.try_clone().unwrap(),
+ schema,
+ writer_props,
+ )
+ .unwrap();
+
+ let mut row_group_writer = writer.next_row_group().unwrap();
+ let mut column_writer =
row_group_writer.next_column().unwrap().unwrap();
+
+ get_typed_column_writer_mut::<Int32Type>(&mut column_writer)
+ .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
+ .unwrap();
+
+ row_group_writer.close_column(column_writer).unwrap();
+ writer.close_row_group(row_group_writer).unwrap();
+
+ writer.close().unwrap();
+ }
+
+ let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
+ let mut batch = ParquetFileArrowReader::new(file_reader);
+ let reader = batch.get_record_reader_by_columns(vec![0],
1024).unwrap();
+
+ let expected_schema = arrow::datatypes::Schema::new(vec![Field::new(
+ "group",
+ ArrowDataType::Struct(vec![Field::new("leaf",
ArrowDataType::Int32, false)]),
+ true,
+ )]);
+
+ let batch = reader.into_iter().next().unwrap().unwrap();
+ assert_eq!(batch.schema().as_ref(), &expected_schema);
+ assert_eq!(batch.num_rows(), 4);
+ assert_eq!(batch.column(0).data().null_count(), 2);
+ }
}
diff --git a/parquet/src/arrow/record_reader.rs
b/parquet/src/arrow/record_reader.rs
index 4913e14..df93ebf 100644
--- a/parquet/src/arrow/record_reader.rs
+++ b/parquet/src/arrow/record_reader.rs
@@ -73,9 +73,35 @@ where
V: ValuesBuffer + Default,
CV: ColumnValueDecoder<Slice = V::Slice>,
{
+ /// Create a new [`GenericRecordReader`]
pub fn new(desc: ColumnDescPtr) -> Self {
- let def_levels =
- (desc.max_def_level() > 0).then(||
DefinitionLevelBuffer::new(&desc));
+ Self::new_with_options(desc, false)
+ }
+
+ /// Create a new [`GenericRecordReader`] with the ability to only generate
the bitmask
+ ///
+ /// If `null_mask_only` is true only the null bitmask will be generated and
+ /// [`Self::consume_def_levels`] and [`Self::consume_rep_levels`] will
always return `None`
+ ///
+ /// It is insufficient to solely check that the max definition level
is 1 as we
+ /// need there to be no nullable parent array that will require decoded
definition levels
+ ///
+ /// In particular consider the case of:
+ ///
+ /// ```ignore
+ /// message nested {
+ /// OPTIONAL Group group {
+ /// REQUIRED INT32 leaf;
+ /// }
+ /// }
+ /// ```
+ ///
+ /// The maximum definition level of leaf is 1, however, we still need to
decode the
+ /// definition levels so that the parent group can be constructed correctly
+ ///
+ pub(crate) fn new_with_options(desc: ColumnDescPtr, null_mask_only: bool)
-> Self {
+ let def_levels = (desc.max_def_level() > 0)
+ .then(|| DefinitionLevelBuffer::new(&desc, null_mask_only));
let rep_levels = (desc.max_rep_level() > 0).then(ScalarBuffer::new);
@@ -171,7 +197,7 @@ where
/// as record values, e.g. those from `self.num_values` to
`self.values_written`.
pub fn consume_def_levels(&mut self) -> Result<Option<Buffer>> {
Ok(match self.def_levels.as_mut() {
- Some(x) => Some(x.split_off(self.num_values)),
+ Some(x) => x.split_levels(self.num_values),
None => None,
})
}
@@ -221,10 +247,7 @@ where
.as_mut()
.map(|levels| levels.spare_capacity_mut(batch_size));
- let def_levels = self
- .def_levels
- .as_mut()
- .map(|levels| levels.spare_capacity_mut(batch_size));
+ let def_levels = self.def_levels.as_mut();
let values = self.records.spare_capacity_mut(batch_size);
diff --git a/parquet/src/arrow/record_reader/buffer.rs
b/parquet/src/arrow/record_reader/buffer.rs
index 7dbf2d1..29e6110 100644
--- a/parquet/src/arrow/record_reader/buffer.rs
+++ b/parquet/src/arrow/record_reader/buffer.rs
@@ -114,6 +114,11 @@ impl<T: ScalarValue> ScalarBuffer<T> {
self.len == 0
}
+ pub fn resize(&mut self, len: usize) {
+ self.buffer.resize(len * std::mem::size_of::<T>(), 0);
+ self.len = len;
+ }
+
#[inline]
pub fn as_slice(&self) -> &[T] {
let (prefix, buf, suffix) = unsafe {
self.buffer.as_slice().align_to::<T>() };
diff --git a/parquet/src/arrow/record_reader/definition_levels.rs
b/parquet/src/arrow/record_reader/definition_levels.rs
index 86c089f..c38505a 100644
--- a/parquet/src/arrow/record_reader/definition_levels.rs
+++ b/parquet/src/arrow/record_reader/definition_levels.rs
@@ -15,74 +15,114 @@
// specific language governing permissions and limitations
// under the License.
+use std::ops::Range;
+
use arrow::array::BooleanBufferBuilder;
use arrow::bitmap::Bitmap;
use arrow::buffer::Buffer;
-use std::ops::Range;
+use arrow::util::bit_chunk_iterator::BitChunks;
-use crate::column::reader::decoder::ColumnLevelDecoderImpl;
+use crate::arrow::record_reader::buffer::BufferQueue;
+use crate::basic::Encoding;
+use crate::column::reader::decoder::{
+ ColumnLevelDecoder, ColumnLevelDecoderImpl, LevelsBufferSlice,
+};
+use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
+use crate::util::memory::ByteBufferPtr;
-use super::{
- buffer::{BufferQueue, ScalarBuffer},
- MIN_BATCH_SIZE,
-};
+use super::{buffer::ScalarBuffer, MIN_BATCH_SIZE};
+
+enum BufferInner {
+ /// Compute levels and null mask
+ Full {
+ levels: ScalarBuffer<i16>,
+ nulls: BooleanBufferBuilder,
+ max_level: i16,
+ },
+ /// Only compute null bitmask - requires max level to be 1
+ ///
+ /// This is an optimisation for the common case of a nullable scalar
column, as decoding
+ /// the definition level data is only required when decoding nested
structures
+ ///
+ Mask { nulls: BooleanBufferBuilder },
+}
pub struct DefinitionLevelBuffer {
- buffer: ScalarBuffer<i16>,
- builder: BooleanBufferBuilder,
- max_level: i16,
+ inner: BufferInner,
+
+ /// The length of this buffer
+ ///
+ /// Note: `buffer` and `builder` may contain more elements
+ len: usize,
}
-impl BufferQueue for DefinitionLevelBuffer {
- type Output = Buffer;
- type Slice = [i16];
+impl DefinitionLevelBuffer {
+ pub fn new(desc: &ColumnDescPtr, null_mask_only: bool) -> Self {
+ let inner = match null_mask_only {
+ true => {
+ assert_eq!(
+ desc.max_def_level(),
+ 1,
+ "max definition level must be 1 to only compute null
bitmask"
+ );
- fn split_off(&mut self, len: usize) -> Self::Output {
- self.buffer.split_off(len)
- }
+ assert_eq!(
+ desc.max_rep_level(),
+ 0,
+ "max repetition level must be 0 to only compute null
bitmask"
+ );
- fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice {
- assert_eq!(self.buffer.len(), self.builder.len());
- self.buffer.spare_capacity_mut(batch_size)
- }
+ BufferInner::Mask {
+ nulls: BooleanBufferBuilder::new(0),
+ }
+ }
+ false => BufferInner::Full {
+ levels: ScalarBuffer::new(),
+ nulls: BooleanBufferBuilder::new(0),
+ max_level: desc.max_def_level(),
+ },
+ };
- fn set_len(&mut self, len: usize) {
- self.buffer.set_len(len);
- let buf = self.buffer.as_slice();
+ Self { inner, len: 0 }
+ }
- let range = self.builder.len()..len;
- self.builder.reserve(range.end - range.start);
- for i in &buf[range] {
- self.builder.append(*i == self.max_level)
+ pub fn split_levels(&mut self, len: usize) -> Option<Buffer> {
+ match &mut self.inner {
+ BufferInner::Full { levels, .. } => {
+ let out = levels.split_off(len);
+ self.len = levels.len();
+ Some(out)
+ }
+ BufferInner::Mask { .. } => None,
}
}
-}
-impl DefinitionLevelBuffer {
- pub fn new(desc: &ColumnDescPtr) -> Self {
- Self {
- buffer: ScalarBuffer::new(),
- builder: BooleanBufferBuilder::new(0),
- max_level: desc.max_def_level(),
- }
+ pub fn set_len(&mut self, len: usize) {
+ assert_eq!(self.nulls().len(), len);
+ self.len = len;
}
/// Split `len` levels out of `self`
pub fn split_bitmask(&mut self, len: usize) -> Bitmap {
- let old_len = self.builder.len();
+ let builder = match &mut self.inner {
+ BufferInner::Full { nulls, .. } => nulls,
+ BufferInner::Mask { nulls } => nulls,
+ };
+
+ let old_len = builder.len();
let num_left_values = old_len - len;
let new_bitmap_builder =
BooleanBufferBuilder::new(MIN_BATCH_SIZE.max(num_left_values));
- let old_bitmap =
- std::mem::replace(&mut self.builder, new_bitmap_builder).finish();
+ let old_bitmap = std::mem::replace(builder,
new_bitmap_builder).finish();
let old_bitmap = Bitmap::from(old_bitmap);
for i in len..old_len {
- self.builder.append(old_bitmap.is_set(i));
+ builder.append(old_bitmap.is_set(i));
}
+ self.len = builder.len();
old_bitmap
}
@@ -91,10 +131,335 @@ impl DefinitionLevelBuffer {
&self,
range: Range<usize>,
) -> impl Iterator<Item = usize> + '_ {
- let max_def_level = self.max_level;
- let slice = self.buffer.as_slice();
- range.rev().filter(move |x| slice[*x] == max_def_level)
+ assert_eq!(range.start, self.len);
+ iter_set_bits_rev(self.nulls().as_slice())
+ }
+
+ fn nulls(&self) -> &BooleanBufferBuilder {
+ match &self.inner {
+ BufferInner::Full { nulls, .. } => nulls,
+ BufferInner::Mask { nulls } => nulls,
+ }
+ }
+}
+
+impl LevelsBufferSlice for DefinitionLevelBuffer {
+ fn capacity(&self) -> usize {
+ usize::MAX
+ }
+
+ fn count_nulls(&self, range: Range<usize>, _max_level: i16) -> usize {
+ let total_count = range.end - range.start;
+ let range = range.start + self.len..range.end + self.len;
+ total_count - count_set_bits(self.nulls().as_slice(), range)
+ }
+}
+
+pub struct DefinitionLevelDecoder {
+ max_level: i16,
+ encoding: Encoding,
+ data: Option<ByteBufferPtr>,
+ column_decoder: Option<ColumnLevelDecoderImpl>,
+ packed_decoder: Option<PackedDecoder>,
+}
+
+impl ColumnLevelDecoder for DefinitionLevelDecoder {
+ type Slice = DefinitionLevelBuffer;
+
+ fn new(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self {
+ Self {
+ max_level,
+ encoding,
+ data: Some(data),
+ column_decoder: None,
+ packed_decoder: None,
+ }
+ }
+
+ fn read(
+ &mut self,
+ writer: &mut Self::Slice,
+ range: Range<usize>,
+ ) -> crate::errors::Result<usize> {
+ match &mut writer.inner {
+ BufferInner::Full {
+ levels,
+ nulls,
+ max_level,
+ } => {
+ assert_eq!(self.max_level, *max_level);
+ assert_eq!(range.start + writer.len, nulls.len());
+
+ let decoder = match self.data.take() {
+ Some(data) => self.column_decoder.insert(
+ ColumnLevelDecoderImpl::new(self.max_level,
self.encoding, data),
+ ),
+ None => self
+ .column_decoder
+ .as_mut()
+ .expect("consistent null_mask_only"),
+ };
+
+ levels.resize(range.end + writer.len);
+
+ let slice = &mut levels.as_slice_mut()[writer.len..];
+ let levels_read = decoder.read(slice, range.clone())?;
+
+ nulls.reserve(levels_read);
+ for i in &slice[range.start..range.start + levels_read] {
+ nulls.append(i == max_level)
+ }
+
+ Ok(levels_read)
+ }
+ BufferInner::Mask { nulls } => {
+ assert_eq!(self.max_level, 1);
+ assert_eq!(range.start + writer.len, nulls.len());
+
+ let decoder = match self.data.take() {
+ Some(data) => self
+ .packed_decoder
+ .insert(PackedDecoder::new(self.encoding, data)),
+ None => self
+ .packed_decoder
+ .as_mut()
+ .expect("consistent null_mask_only"),
+ };
+
+ decoder.read(nulls, range.end - range.start)
+ }
+ }
+ }
+}
+
+/// An optimized decoder for decoding [RLE] and [BIT_PACKED] data with a bit
width of 1
+/// directly into a bitmask
+///
+/// This is significantly faster than decoding the data into `[i16]` and then
computing
+/// a bitmask from this, as not only can it skip this buffer allocation and
construction,
+/// but it can exploit properties of the encoded data to reduce work further
+///
+/// In particular:
+///
+/// * Packed runs are already bitmask encoded and can simply be appended
+/// * Runs of 1 or 0 bits can be efficiently appended with byte (or larger)
operations
+///
+/// [RLE]:
https://github.com/apache/parquet-format/blob/master/Encodings.md#run-length-encoding--bit-packing-hybrid-rle--3
+/// [BIT_PACKED]:
https://github.com/apache/parquet-format/blob/master/Encodings.md#bit-packed-deprecated-bit_packed--4
+struct PackedDecoder {
+ data: ByteBufferPtr,
+ data_offset: usize,
+ rle_left: usize,
+ rle_value: bool,
+ packed_count: usize,
+ packed_offset: usize,
+}
+
+impl PackedDecoder {
+ fn next_rle_block(&mut self) -> Result<()> {
+ let indicator_value = self.decode_header()?;
+ if indicator_value & 1 == 1 {
+ let len = (indicator_value >> 1) as usize;
+ self.packed_count = len * 8;
+ self.packed_offset = 0;
+ } else {
+ self.rle_left = (indicator_value >> 1) as usize;
+ let byte = *self.data.as_ref().get(self.data_offset).ok_or_else(||
{
+ ParquetError::EOF(
+ "unexpected end of file whilst decoding definition levels
rle value"
+ .into(),
+ )
+ })?;
+
+ self.data_offset += 1;
+ self.rle_value = byte != 0;
+ }
+ Ok(())
+ }
+
+ /// Decodes a VLQ encoded little endian integer and returns it
+ fn decode_header(&mut self) -> Result<i64> {
+ let mut offset = 0;
+ let mut v: i64 = 0;
+ while offset < 10 {
+ let byte = *self
+ .data
+ .as_ref()
+ .get(self.data_offset + offset)
+ .ok_or_else(|| {
+ ParquetError::EOF(
+ "unexpected end of file whilst decoding definition
levels rle header"
+ .into(),
+ )
+ })?;
+
+ v |= ((byte & 0x7F) as i64) << (offset * 7);
+ offset += 1;
+ if byte & 0x80 == 0 {
+ self.data_offset += offset;
+ return Ok(v);
+ }
+ }
+ Err(general_err!("too many bytes for VLQ"))
+ }
+}
+
+impl PackedDecoder {
+ fn new(encoding: Encoding, data: ByteBufferPtr) -> Self {
+ match encoding {
+ Encoding::RLE => Self {
+ data,
+ data_offset: 0,
+ rle_left: 0,
+ rle_value: false,
+ packed_count: 0,
+ packed_offset: 0,
+ },
+ Encoding::BIT_PACKED => Self {
+ data_offset: 0,
+ rle_left: 0,
+ rle_value: false,
+ packed_count: data.len() * 8,
+ packed_offset: 0,
+ data,
+ },
+ _ => unreachable!("invalid level encoding: {}", encoding),
+ }
+ }
+
+ fn read(&mut self, buffer: &mut BooleanBufferBuilder, len: usize) ->
Result<usize> {
+ let mut read = 0;
+ while read != len {
+ if self.rle_left != 0 {
+ let to_read = self.rle_left.min(len - read);
+ buffer.append_n(to_read, self.rle_value);
+ self.rle_left -= to_read;
+ read += to_read;
+ } else if self.packed_count != self.packed_offset {
+ let to_read = (self.packed_count - self.packed_offset).min(len
- read);
+ let offset = self.data_offset * 8 + self.packed_offset;
+ buffer.append_packed_range(offset..offset + to_read,
self.data.as_ref());
+ self.packed_offset += to_read;
+ read += to_read;
+
+ if self.packed_offset == self.packed_count {
+ self.data_offset += self.packed_count / 8;
+ }
+ } else if self.data_offset == self.data.len() {
+ break;
+ } else {
+ self.next_rle_block()?
+ }
+ }
+ Ok(read)
}
}
-pub type DefinitionLevelDecoder = ColumnLevelDecoderImpl;
+/// Counts the number of set bits in the provided range
+pub fn count_set_bits(bytes: &[u8], range: Range<usize>) -> usize {
+ let mut count = 0_usize;
+ let chunks = BitChunks::new(bytes, range.start, range.end - range.start);
+ chunks.iter().for_each(|chunk| {
+ count += chunk.count_ones() as usize;
+ });
+ count += chunks.remainder_bits().count_ones() as usize;
+ count
+}
+
+fn iter_set_bits_rev(bytes: &[u8]) -> impl Iterator<Item = usize> + '_ {
+ let (mut byte_idx, mut in_progress) = match bytes.len() {
+ 0 => (0, 0),
+ len => (len - 1, bytes[len - 1]),
+ };
+
+ std::iter::from_fn(move || loop {
+ if in_progress != 0 {
+ let bit_pos = 7 - in_progress.leading_zeros();
+ in_progress ^= 1 << bit_pos;
+ return Some((byte_idx << 3) + (bit_pos as usize));
+ }
+
+ if byte_idx == 0 {
+ return None;
+ }
+
+ byte_idx -= 1;
+ in_progress = bytes[byte_idx];
+ })
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ use crate::encodings::rle::RleEncoder;
+ use rand::{thread_rng, Rng, RngCore};
+
+ #[test]
+ fn test_packed_decoder() {
+ let mut rng = thread_rng();
+ let len: usize = rng.gen_range(512..1024);
+
+ let mut expected = BooleanBufferBuilder::new(len);
+ let mut encoder = RleEncoder::new(1, 1024);
+ for _ in 0..len {
+ let bool = rng.gen_bool(0.8);
+ assert!(encoder.put(bool as u64).unwrap());
+ expected.append(bool);
+ }
+ assert_eq!(expected.len(), len);
+
+ let encoded = encoder.consume().unwrap();
+ let mut decoder = PackedDecoder::new(Encoding::RLE,
ByteBufferPtr::new(encoded));
+
+ // Decode data in random length intervals
+ let mut decoded = BooleanBufferBuilder::new(len);
+ loop {
+ let remaining = len - decoded.len();
+ if remaining == 0 {
+ break;
+ }
+
+ let to_read = rng.gen_range(1..=remaining);
+ decoder.read(&mut decoded, to_read).unwrap();
+ }
+
+ assert_eq!(decoded.len(), len);
+ assert_eq!(decoded.as_slice(), expected.as_slice());
+ }
+
+ #[test]
+ fn test_bit_fns() {
+ let mut rng = thread_rng();
+ let mask_length = rng.gen_range(1..20);
+ let bools: Vec<_> = std::iter::from_fn(|| Some(rng.next_u32() & 1 ==
0))
+ .take(mask_length)
+ .collect();
+
+ let mut nulls = BooleanBufferBuilder::new(mask_length);
+ bools.iter().for_each(|b| nulls.append(*b));
+
+ let actual: Vec<_> = iter_set_bits_rev(nulls.as_slice()).collect();
+ let expected: Vec<_> = bools
+ .iter()
+ .enumerate()
+ .rev()
+ .filter_map(|(x, y)| y.then(|| x))
+ .collect();
+ assert_eq!(actual, expected);
+
+ assert_eq!(iter_set_bits_rev(&[]).count(), 0);
+ assert_eq!(count_set_bits(&[], 0..0), 0);
+ assert_eq!(count_set_bits(&[0xFF], 1..1), 0);
+
+ for _ in 0..20 {
+ let start = rng.gen_range(0..bools.len());
+ let end = rng.gen_range(start..bools.len());
+
+ let actual = count_set_bits(nulls.as_slice(), start..end);
+ let expected = bools[start..end].iter().filter(|x| **x).count();
+
+ assert_eq!(actual, expected);
+ }
+ }
+}
diff --git a/parquet/src/column/reader/decoder.rs
b/parquet/src/column/reader/decoder.rs
index 52caba2..a9221d9 100644
--- a/parquet/src/column/reader/decoder.rs
+++ b/parquet/src/column/reader/decoder.rs
@@ -219,8 +219,7 @@ pub struct ColumnLevelDecoderImpl {
enum LevelDecoderInner {
Packed(BitReader, u8),
- /// Boxed as `RleDecoder` contains an inline buffer
- Rle(Box<RleDecoder>),
+ Rle(RleDecoder),
}
impl ColumnLevelDecoder for ColumnLevelDecoderImpl {
@@ -230,7 +229,7 @@ impl ColumnLevelDecoder for ColumnLevelDecoderImpl {
let bit_width = crate::util::bit_util::log2(max_level as u64 + 1) as
u8;
match encoding {
Encoding::RLE => {
- let mut decoder = Box::new(RleDecoder::new(bit_width));
+ let mut decoder = RleDecoder::new(bit_width);
decoder.set_data(data);
Self {
inner: LevelDecoderInner::Rle(decoder),