sunchao commented on a change in pull request #1284:
URL: https://github.com/apache/arrow-rs/pull/1284#discussion_r802189981
##########
File path: parquet/src/encodings/decoding.rs
##########
@@ -431,232 +433,253 @@ pub struct DeltaBitPackDecoder<T: DataType> {
initialized: bool,
// Header info
- num_values: usize,
- num_mini_blocks: i64,
+ // The number of values in each block
+ block_size: usize,
+ /// The number of values in the current page
Review comment:
nit: maybe "The number of values remaining to be read in the current page"
##########
File path: parquet/src/util/bit_util.rs
##########
@@ -541,6 +547,17 @@ impl BitReader {
let mut i = 0;
+ if num_bits > 32 {
Review comment:
We can probably add `unpack64` similar to [Arrow
C++](https://github.com/apache/arrow/blob/master/cpp/src/arrow/util/bpacking.h#L31).
It also has SIMD acceleration.
##########
File path: parquet/benches/arrow_reader.rs
##########
@@ -419,6 +419,62 @@ fn add_benches(c: &mut Criterion) {
},
);
+ // int32, binary packed, no NULLs
Review comment:
maybe it's worth adding a benchmark for int64 too.
##########
File path: parquet/src/encodings/decoding.rs
##########
@@ -431,232 +433,253 @@ pub struct DeltaBitPackDecoder<T: DataType> {
initialized: bool,
// Header info
- num_values: usize,
- num_mini_blocks: i64,
+ // The number of values in each block
+ block_size: usize,
+ /// The number of values in the current page
+ values_left: usize,
+ /// The number of mini-blocks in each block
+ mini_blocks_per_block: usize,
+ /// The number of values in each mini block
values_per_mini_block: usize,
- values_current_mini_block: usize,
- first_value: i64,
- first_value_read: bool,
// Per block info
- min_delta: i64,
+ /// The minimum delta in the block
+ min_delta: T::T,
+ /// The byte offset of the end of the current block
+ block_end_offset: usize,
+ /// The index on the current mini block
mini_block_idx: usize,
- delta_bit_width: u8,
- delta_bit_widths: ByteBuffer,
- deltas_in_mini_block: Vec<T::T>, // eagerly loaded deltas for a mini block
- use_batch: bool,
-
- current_value: i64,
-
- _phantom: PhantomData<T>,
+ /// The bit widths of each mini block in the current block
+ mini_block_bit_widths: Vec<u8>,
+ /// The number of values remaining in the current mini block
+ mini_block_remaining: usize,
+
+ /// The first value from the block header if not consumed
+ first_value: Option<T::T>,
+ /// The last value to compute offsets from
+ last_value: T::T,
}
-impl<T: DataType> DeltaBitPackDecoder<T> {
+impl<T: DataType> DeltaBitPackDecoder<T>
+where
+ T::T: Default + FromPrimitive + WrappingAdd + Copy,
+{
/// Creates new delta bit packed decoder.
pub fn new() -> Self {
Self {
bit_reader: BitReader::from(vec![]),
initialized: false,
- num_values: 0,
- num_mini_blocks: 0,
+ block_size: 0,
+ values_left: 0,
+ mini_blocks_per_block: 0,
values_per_mini_block: 0,
- values_current_mini_block: 0,
- first_value: 0,
- first_value_read: false,
- min_delta: 0,
+ min_delta: Default::default(),
mini_block_idx: 0,
- delta_bit_width: 0,
- delta_bit_widths: ByteBuffer::new(),
- deltas_in_mini_block: vec![],
- use_batch: mem::size_of::<T::T>() == 4,
- current_value: 0,
- _phantom: PhantomData,
+ mini_block_bit_widths: vec![],
+ mini_block_remaining: 0,
+ block_end_offset: 0,
+ first_value: None,
+ last_value: Default::default(),
}
}
- /// Returns underlying bit reader offset.
+ /// Returns the current offset
pub fn get_offset(&self) -> usize {
assert!(self.initialized, "Bit reader is not initialized");
- self.bit_reader.get_byte_offset()
+ match self.values_left {
+ // If we've exhausted this page report the end of the current block
+ // as we may not have consumed the trailing padding
+ //
+ // The max is necessary to handle pages with no blocks
+ 0 => self.bit_reader.get_byte_offset().max(self.block_end_offset),
+ _ => self.bit_reader.get_byte_offset(),
+ }
}
- /// Initializes new mini block.
+ /// Initializes the next block and the first mini block within it
#[inline]
- fn init_block(&mut self) -> Result<()> {
- self.min_delta = self
+ fn next_block(&mut self) -> Result<()> {
+ let min_delta = self
.bit_reader
.get_zigzag_vlq_int()
.ok_or_else(|| eof_err!("Not enough data to decode 'min_delta'"))?;
- self.delta_bit_widths.clear();
- for _ in 0..self.num_mini_blocks {
- let w = self
- .bit_reader
- .get_aligned::<u8>(1)
- .ok_or_else(|| eof_err!("Not enough data to decode 'width'"))?;
- self.delta_bit_widths.push(w);
+ self.min_delta = T::T::from_i64(min_delta)
+ .ok_or_else(|| general_err!("'min_delta' too large"))?;
+
+ self.mini_block_bit_widths.clear();
+ self.bit_reader.get_aligned_bytes(
+ &mut self.mini_block_bit_widths,
+ self.mini_blocks_per_block as usize,
+ );
+
+ let mut offset = self.bit_reader.get_byte_offset();
+ let mut remaining = self.values_left;
+
+ // Compute the end offset of the current block
+ for b in &mut self.mini_block_bit_widths {
+ if remaining == 0 {
+ // Specification requires handling arbitrary bit widths
+ // for trailing mini blocks
+ *b = 0;
+ }
+ remaining = remaining.saturating_sub(self.values_per_mini_block);
+ offset += *b as usize * self.values_per_mini_block / 8;
+ }
+ self.block_end_offset = offset;
+
+ if self.mini_block_bit_widths.len() != self.mini_blocks_per_block {
+ return Err(eof_err!("insufficient mini block bit widths"));
}
+ self.mini_block_remaining = self.values_per_mini_block;
self.mini_block_idx = 0;
- self.delta_bit_width = self.delta_bit_widths.data()[0];
- self.values_current_mini_block = self.values_per_mini_block;
+
Ok(())
}
- /// Loads delta into mini block.
+ /// Initializes the next mini block
#[inline]
- fn load_deltas_in_mini_block(&mut self) -> Result<()>
- where
- T::T: FromBytes,
- {
- if self.use_batch {
- self.deltas_in_mini_block
- .resize(self.values_current_mini_block, T::T::default());
- let loaded = self.bit_reader.get_batch::<T::T>(
- &mut self.deltas_in_mini_block[..],
- self.delta_bit_width as usize,
- );
- assert!(loaded == self.values_current_mini_block);
+ fn next_mini_block(&mut self) -> Result<()> {
+ if self.mini_block_idx + 1 < self.mini_block_bit_widths.len() {
+ self.mini_block_idx += 1;
+ self.mini_block_remaining = self.values_per_mini_block;
+ Ok(())
} else {
- self.deltas_in_mini_block.clear();
- for _ in 0..self.values_current_mini_block {
- // TODO: load one batch at a time similar to int32
- let delta = self
- .bit_reader
- .get_value::<T::T>(self.delta_bit_width as usize)
- .ok_or_else(|| eof_err!("Not enough data to decode
'delta'"))?;
- self.deltas_in_mini_block.push(delta);
- }
+ self.next_block()
}
-
- Ok(())
}
}
-impl<T: DataType> Decoder<T> for DeltaBitPackDecoder<T> {
+impl<T: DataType> Decoder<T> for DeltaBitPackDecoder<T>
+where
+ T::T: Default + FromPrimitive + WrappingAdd + Copy,
+{
// # of total values is derived from encoding
#[inline]
fn set_data(&mut self, data: ByteBufferPtr, _index: usize) -> Result<()> {
self.bit_reader = BitReader::new(data);
self.initialized = true;
- let block_size = self
+ // Read header information
+ self.block_size = self
.bit_reader
.get_vlq_int()
- .ok_or_else(|| eof_err!("Not enough data to decode
'block_size'"))?;
- self.num_mini_blocks = self
+ .ok_or_else(|| eof_err!("Not enough data to decode 'block_size'"))?
+ .try_into()
+ .map_err(|_| general_err!("invalid 'block_size'"))?;
+
+ self.mini_blocks_per_block = self
.bit_reader
.get_vlq_int()
- .ok_or_else(|| eof_err!("Not enough data to decode
'num_mini_blocks'"))?;
- self.num_values = self
+ .ok_or_else(|| eof_err!("Not enough data to decode
'mini_blocks_per_block'"))?
+ .try_into()
+ .map_err(|_| general_err!("invalid 'mini_blocks_per_block'"))?;
+
+ self.values_left = self
.bit_reader
.get_vlq_int()
.ok_or_else(|| eof_err!("Not enough data to decode 'num_values'"))?
- as usize;
- self.first_value = self
+ .try_into()
+ .map_err(|_| general_err!("invalid 'num_values'"))?;
Review comment:
nit: maybe we should update `num_values` to `values_left` here too.
##########
File path: parquet/src/encodings/decoding.rs
##########
@@ -431,232 +433,253 @@ pub struct DeltaBitPackDecoder<T: DataType> {
initialized: bool,
// Header info
- num_values: usize,
- num_mini_blocks: i64,
+ // The number of values in each block
Review comment:
nit: `//` -> `///`
##########
File path: parquet/src/util/bit_util.rs
##########
@@ -602,6 +619,26 @@ impl BitReader {
values_to_read
}
+ /// Reads up to `num_bytes` to `buffer` returning the number of bytes read
Review comment:
nit: `buffer` -> `buf`
##########
File path: parquet/src/encodings/decoding.rs
##########
@@ -431,232 +433,253 @@ pub struct DeltaBitPackDecoder<T: DataType> {
initialized: bool,
// Header info
- num_values: usize,
- num_mini_blocks: i64,
+ // The number of values in each block
+ block_size: usize,
+ /// The number of values in the current page
+ values_left: usize,
+ /// The number of mini-blocks in each block
+ mini_blocks_per_block: usize,
+ /// The number of values in each mini block
values_per_mini_block: usize,
- values_current_mini_block: usize,
- first_value: i64,
- first_value_read: bool,
// Per block info
- min_delta: i64,
+ /// The minimum delta in the block
+ min_delta: T::T,
+ /// The byte offset of the end of the current block
+ block_end_offset: usize,
+ /// The index on the current mini block
mini_block_idx: usize,
- delta_bit_width: u8,
- delta_bit_widths: ByteBuffer,
- deltas_in_mini_block: Vec<T::T>, // eagerly loaded deltas for a mini block
- use_batch: bool,
-
- current_value: i64,
-
- _phantom: PhantomData<T>,
+ /// The bit widths of each mini block in the current block
+ mini_block_bit_widths: Vec<u8>,
+ /// The number of values remaining in the current mini block
+ mini_block_remaining: usize,
+
+ /// The first value from the block header if not consumed
+ first_value: Option<T::T>,
+ /// The last value to compute offsets from
+ last_value: T::T,
}
-impl<T: DataType> DeltaBitPackDecoder<T> {
+impl<T: DataType> DeltaBitPackDecoder<T>
+where
+ T::T: Default + FromPrimitive + WrappingAdd + Copy,
+{
/// Creates new delta bit packed decoder.
pub fn new() -> Self {
Self {
bit_reader: BitReader::from(vec![]),
initialized: false,
- num_values: 0,
- num_mini_blocks: 0,
+ block_size: 0,
+ values_left: 0,
+ mini_blocks_per_block: 0,
values_per_mini_block: 0,
- values_current_mini_block: 0,
- first_value: 0,
- first_value_read: false,
- min_delta: 0,
+ min_delta: Default::default(),
mini_block_idx: 0,
- delta_bit_width: 0,
- delta_bit_widths: ByteBuffer::new(),
- deltas_in_mini_block: vec![],
- use_batch: mem::size_of::<T::T>() == 4,
- current_value: 0,
- _phantom: PhantomData,
+ mini_block_bit_widths: vec![],
+ mini_block_remaining: 0,
+ block_end_offset: 0,
+ first_value: None,
+ last_value: Default::default(),
}
}
- /// Returns underlying bit reader offset.
+ /// Returns the current offset
pub fn get_offset(&self) -> usize {
assert!(self.initialized, "Bit reader is not initialized");
- self.bit_reader.get_byte_offset()
+ match self.values_left {
+ // If we've exhausted this page report the end of the current block
+ // as we may not have consumed the trailing padding
+ //
+ // The max is necessary to handle pages with no blocks
+ 0 => self.bit_reader.get_byte_offset().max(self.block_end_offset),
+ _ => self.bit_reader.get_byte_offset(),
+ }
}
- /// Initializes new mini block.
+ /// Initializes the next block and the first mini block within it
#[inline]
- fn init_block(&mut self) -> Result<()> {
- self.min_delta = self
+ fn next_block(&mut self) -> Result<()> {
+ let min_delta = self
.bit_reader
.get_zigzag_vlq_int()
.ok_or_else(|| eof_err!("Not enough data to decode 'min_delta'"))?;
- self.delta_bit_widths.clear();
- for _ in 0..self.num_mini_blocks {
- let w = self
- .bit_reader
- .get_aligned::<u8>(1)
- .ok_or_else(|| eof_err!("Not enough data to decode 'width'"))?;
- self.delta_bit_widths.push(w);
+ self.min_delta = T::T::from_i64(min_delta)
+ .ok_or_else(|| general_err!("'min_delta' too large"))?;
+
+ self.mini_block_bit_widths.clear();
+ self.bit_reader.get_aligned_bytes(
+ &mut self.mini_block_bit_widths,
+ self.mini_blocks_per_block as usize,
+ );
+
+ let mut offset = self.bit_reader.get_byte_offset();
+ let mut remaining = self.values_left;
+
+ // Compute the end offset of the current block
+ for b in &mut self.mini_block_bit_widths {
+ if remaining == 0 {
+ // Specification requires handling arbitrary bit widths
+ // for trailing mini blocks
+ *b = 0;
+ }
+ remaining = remaining.saturating_sub(self.values_per_mini_block);
+ offset += *b as usize * self.values_per_mini_block / 8;
+ }
+ self.block_end_offset = offset;
+
+ if self.mini_block_bit_widths.len() != self.mini_blocks_per_block {
+ return Err(eof_err!("insufficient mini block bit widths"));
}
+ self.mini_block_remaining = self.values_per_mini_block;
self.mini_block_idx = 0;
- self.delta_bit_width = self.delta_bit_widths.data()[0];
- self.values_current_mini_block = self.values_per_mini_block;
+
Ok(())
}
- /// Loads delta into mini block.
+ /// Initializes the next mini block
#[inline]
- fn load_deltas_in_mini_block(&mut self) -> Result<()>
- where
- T::T: FromBytes,
- {
- if self.use_batch {
- self.deltas_in_mini_block
- .resize(self.values_current_mini_block, T::T::default());
- let loaded = self.bit_reader.get_batch::<T::T>(
- &mut self.deltas_in_mini_block[..],
- self.delta_bit_width as usize,
- );
- assert!(loaded == self.values_current_mini_block);
+ fn next_mini_block(&mut self) -> Result<()> {
+ if self.mini_block_idx + 1 < self.mini_block_bit_widths.len() {
+ self.mini_block_idx += 1;
+ self.mini_block_remaining = self.values_per_mini_block;
+ Ok(())
} else {
- self.deltas_in_mini_block.clear();
- for _ in 0..self.values_current_mini_block {
- // TODO: load one batch at a time similar to int32
- let delta = self
- .bit_reader
- .get_value::<T::T>(self.delta_bit_width as usize)
- .ok_or_else(|| eof_err!("Not enough data to decode
'delta'"))?;
- self.deltas_in_mini_block.push(delta);
- }
+ self.next_block()
}
-
- Ok(())
}
}
-impl<T: DataType> Decoder<T> for DeltaBitPackDecoder<T> {
+impl<T: DataType> Decoder<T> for DeltaBitPackDecoder<T>
+where
+ T::T: Default + FromPrimitive + WrappingAdd + Copy,
+{
// # of total values is derived from encoding
#[inline]
fn set_data(&mut self, data: ByteBufferPtr, _index: usize) -> Result<()> {
self.bit_reader = BitReader::new(data);
self.initialized = true;
- let block_size = self
+ // Read header information
+ self.block_size = self
.bit_reader
.get_vlq_int()
- .ok_or_else(|| eof_err!("Not enough data to decode
'block_size'"))?;
- self.num_mini_blocks = self
+ .ok_or_else(|| eof_err!("Not enough data to decode 'block_size'"))?
+ .try_into()
+ .map_err(|_| general_err!("invalid 'block_size'"))?;
+
+ self.mini_blocks_per_block = self
.bit_reader
.get_vlq_int()
- .ok_or_else(|| eof_err!("Not enough data to decode
'num_mini_blocks'"))?;
- self.num_values = self
+ .ok_or_else(|| eof_err!("Not enough data to decode
'mini_blocks_per_block'"))?
+ .try_into()
+ .map_err(|_| general_err!("invalid 'mini_blocks_per_block'"))?;
+
+ self.values_left = self
.bit_reader
.get_vlq_int()
.ok_or_else(|| eof_err!("Not enough data to decode 'num_values'"))?
- as usize;
- self.first_value = self
+ .try_into()
+ .map_err(|_| general_err!("invalid 'num_values'"))?;
+
+ let first_value = self
.bit_reader
.get_zigzag_vlq_int()
.ok_or_else(|| eof_err!("Not enough data to decode
'first_value'"))?;
+ self.first_value = Some(
+ T::T::from_i64(first_value)
+ .ok_or_else(|| general_err!("first value too large"))?,
+ );
+
+ if self.block_size % 128 != 0 {
+ return Err(general_err!(
+ "'block_size' must be a multiple of 128, got {}",
+ self.block_size
+ ));
+ }
+
+ if self.block_size % self.mini_blocks_per_block != 0 {
+ return Err(general_err!(
+ "'block_size' must be a multiple of 'mini_blocks_per_block'
got {} and {}",
+ self.block_size, self.mini_blocks_per_block
+ ));
+ }
+
// Reset decoding state
- self.first_value_read = false;
self.mini_block_idx = 0;
- self.delta_bit_widths.clear();
- self.values_current_mini_block = 0;
+ self.values_per_mini_block = self.block_size /
self.mini_blocks_per_block;
+ self.mini_block_remaining = 0;
+ self.mini_block_bit_widths.clear();
- self.values_per_mini_block = (block_size / self.num_mini_blocks) as
usize;
- assert!(self.values_per_mini_block % 8 == 0);
+ if self.values_per_mini_block % 32 != 0 {
Review comment:
In theory, `parquet-mr` allows `values_per_mini_block` to be a multiple of
8: see https://issues.apache.org/jira/browse/PARQUET-2077, although I think
it's very unlikely to happen.
##########
File path: parquet/src/encodings/decoding.rs
##########
@@ -431,232 +433,253 @@ pub struct DeltaBitPackDecoder<T: DataType> {
initialized: bool,
// Header info
- num_values: usize,
- num_mini_blocks: i64,
+ // The number of values in each block
+ block_size: usize,
+ /// The number of values in the current page
+ values_left: usize,
+ /// The number of mini-blocks in each block
+ mini_blocks_per_block: usize,
+ /// The number of values in each mini block
values_per_mini_block: usize,
- values_current_mini_block: usize,
- first_value: i64,
- first_value_read: bool,
// Per block info
- min_delta: i64,
+ /// The minimum delta in the block
+ min_delta: T::T,
+ /// The byte offset of the end of the current block
+ block_end_offset: usize,
+ /// The index on the current mini block
mini_block_idx: usize,
- delta_bit_width: u8,
- delta_bit_widths: ByteBuffer,
- deltas_in_mini_block: Vec<T::T>, // eagerly loaded deltas for a mini block
- use_batch: bool,
-
- current_value: i64,
-
- _phantom: PhantomData<T>,
+ /// The bit widths of each mini block in the current block
+ mini_block_bit_widths: Vec<u8>,
+ /// The number of values remaining in the current mini block
+ mini_block_remaining: usize,
+
+ /// The first value from the block header if not consumed
+ first_value: Option<T::T>,
+ /// The last value to compute offsets from
+ last_value: T::T,
}
-impl<T: DataType> DeltaBitPackDecoder<T> {
+impl<T: DataType> DeltaBitPackDecoder<T>
+where
+ T::T: Default + FromPrimitive + WrappingAdd + Copy,
+{
/// Creates new delta bit packed decoder.
pub fn new() -> Self {
Self {
bit_reader: BitReader::from(vec![]),
initialized: false,
- num_values: 0,
- num_mini_blocks: 0,
+ block_size: 0,
+ values_left: 0,
+ mini_blocks_per_block: 0,
values_per_mini_block: 0,
- values_current_mini_block: 0,
- first_value: 0,
- first_value_read: false,
- min_delta: 0,
+ min_delta: Default::default(),
mini_block_idx: 0,
- delta_bit_width: 0,
- delta_bit_widths: ByteBuffer::new(),
- deltas_in_mini_block: vec![],
- use_batch: mem::size_of::<T::T>() == 4,
- current_value: 0,
- _phantom: PhantomData,
+ mini_block_bit_widths: vec![],
+ mini_block_remaining: 0,
+ block_end_offset: 0,
+ first_value: None,
+ last_value: Default::default(),
}
}
- /// Returns underlying bit reader offset.
+ /// Returns the current offset
pub fn get_offset(&self) -> usize {
assert!(self.initialized, "Bit reader is not initialized");
- self.bit_reader.get_byte_offset()
+ match self.values_left {
+ // If we've exhausted this page report the end of the current block
+ // as we may not have consumed the trailing padding
+ //
+ // The max is necessary to handle pages with no blocks
Review comment:
Hmm, when will this happen? In this case, is the header still present?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]