This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new d11b388a0 Implement Skip for DeltaBitPackDecoder (#2393)
d11b388a0 is described below
commit d11b388a0dfad3cfe0ca30b6a26fff0d88006ca0
Author: Yang Jiang <[email protected]>
AuthorDate: Fri Aug 12 16:26:02 2022 +0800
Implement Skip for DeltaBitPackDecoder (#2393)
* Implement Skip for DeltaBitPackDecoder
* move check out of loop
* add bench
* change to use batch read.
---
parquet/benches/arrow_reader.rs | 53 ++++++++++++++++++++++++++++++++++++
parquet/src/encodings/decoding.rs | 57 +++++++++++++++++++++++++++++++++++++--
2 files changed, 108 insertions(+), 2 deletions(-)
diff --git a/parquet/benches/arrow_reader.rs b/parquet/benches/arrow_reader.rs
index dc2ed8355..a3c904505 100644
--- a/parquet/benches/arrow_reader.rs
+++ b/parquet/benches/arrow_reader.rs
@@ -300,6 +300,26 @@ fn bench_array_reader(mut array_reader: Box<dyn ArrayReader>) -> usize {
total_count
}
+fn bench_array_reader_skip(mut array_reader: Box<dyn ArrayReader>) -> usize {
+    // test procedure: alternately read and skip batches of BATCH_SIZE records,
+    // starting with a read, until a call yields fewer than BATCH_SIZE records.
+    // Returns the number of records read plus the number skipped.
+    let mut total_count = 0;
+    let mut skip = false;
+    loop {
+        let array_len = if skip {
+            array_reader.skip_records(BATCH_SIZE).unwrap()
+        } else {
+            array_reader.next_batch(BATCH_SIZE).unwrap().len()
+        };
+        total_count += array_len;
+        skip = !skip;
+        if array_len < BATCH_SIZE {
+            break;
+        }
+    }
+    total_count
+}
fn create_primitive_array_reader(
page_iterator: impl PageIterator + 'static,
column_desc: ColumnDescPtr,
@@ -445,6 +465,39 @@ fn bench_primitive<T>(
assert_eq!(count, EXPECTED_VALUE_COUNT);
});
+ // binary packed skip , no NULLs
+ let data = build_encoded_primitive_page_iterator::<T>(
+ schema.clone(),
+ mandatory_column_desc.clone(),
+ 0.0,
+ Encoding::DELTA_BINARY_PACKED,
+ );
+ group.bench_function("binary packed skip, mandatory, no NULLs", |b| {
+ b.iter(|| {
+ let array_reader = create_primitive_array_reader(
+ data.clone(),
+ mandatory_column_desc.clone(),
+ );
+ count = bench_array_reader_skip(array_reader);
+ });
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
+ });
+
+ let data = build_encoded_primitive_page_iterator::<T>(
+ schema.clone(),
+ optional_column_desc.clone(),
+ 0.0,
+ Encoding::DELTA_BINARY_PACKED,
+ );
+ group.bench_function("binary packed skip, optional, no NULLs", |b| {
+ b.iter(|| {
+ let array_reader =
+ create_primitive_array_reader(data.clone(),
optional_column_desc.clone());
+ count = bench_array_reader_skip(array_reader);
+ });
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
+ });
+
// binary packed, half NULLs
let data = build_encoded_primitive_page_iterator::<T>(
schema.clone(),
diff --git a/parquet/src/encodings/decoding.rs b/parquet/src/encodings/decoding.rs
index bb1e7137a..86941ffe0 100644
--- a/parquet/src/encodings/decoding.rs
+++ b/parquet/src/encodings/decoding.rs
@@ -736,8 +736,61 @@ where
}
fn skip(&mut self, num_values: usize) -> Result<usize> {
-        let mut buffer = vec![T::T::default(); num_values];
-        self.get(&mut buffer)
+        let mut skip = 0;
+        let to_skip = num_values.min(self.values_left);
+        if to_skip == 0 {
+            return Ok(0);
+        }
+
+        // try to consume first value in header.
+        if let Some(value) = self.first_value.take() {
+            self.last_value = value;
+            skip += 1;
+            self.values_left -= 1;
+        }
+
+        let mini_block_batch_size = match T::T::PHYSICAL_TYPE {
+            Type::INT32 => 32,
+            Type::INT64 => 64,
+            _ => unreachable!(),
+        };
+        // Skipped deltas must still be decoded so that `last_value` is correct for
+        // later reads; this scratch buffer holds one chunk of them at a time.
+        let mut skip_buffer = vec![T::T::default(); mini_block_batch_size];
+        while skip < to_skip {
+            if self.mini_block_remaining == 0 {
+                self.next_mini_block()?;
+            }
+
+            let bit_width = self.mini_block_bit_widths[self.mini_block_idx] as usize;
+            // Clamp to the scratch buffer, which may be shorter than a mini block.
+            let mini_block_to_skip = self
+                .mini_block_remaining
+                .min(to_skip - skip)
+                .min(skip_buffer.len());
+            let skip_count = self
+                .bit_reader
+                .get_batch(&mut skip_buffer[0..mini_block_to_skip], bit_width);
+            if skip_count != mini_block_to_skip {
+                return Err(general_err!(
+                    "Expected to skip {} values from mini block got {}.",
+                    mini_block_to_skip,
+                    skip_count
+                ));
+            }
+
+            for v in &mut skip_buffer[0..skip_count] {
+                *v = v
+                    .wrapping_add(&self.min_delta)
+                    .wrapping_add(&self.last_value);
+                self.last_value = *v;
+            }
+
+            skip += mini_block_to_skip;
+            self.mini_block_remaining -= mini_block_to_skip;
+            self.values_left -= mini_block_to_skip;
+        }
+        Ok(to_skip)
}
}