alamb commented on a change in pull request #1284:
URL: https://github.com/apache/arrow-rs/pull/1284#discussion_r805342397



##########
File path: parquet/src/encodings/decoding.rs
##########
@@ -431,232 +433,253 @@ pub struct DeltaBitPackDecoder<T: DataType> {
     initialized: bool,
 
     // Header info
-    num_values: usize,
-    num_mini_blocks: i64,
+    // The number of values in each block
+    block_size: usize,
+    /// The number of values in the current page
+    values_left: usize,
+    /// The number of mini-blocks in each block
+    mini_blocks_per_block: usize,
+    /// The number of values in each mini block
     values_per_mini_block: usize,
-    values_current_mini_block: usize,
-    first_value: i64,
-    first_value_read: bool,
 
     // Per block info
-    min_delta: i64,
+    /// The minimum delta in the block
+    min_delta: T::T,
+    /// The byte offset of the end of the current block
+    block_end_offset: usize,
+    /// The index on the current mini block
     mini_block_idx: usize,
-    delta_bit_width: u8,
-    delta_bit_widths: ByteBuffer,
-    deltas_in_mini_block: Vec<T::T>, // eagerly loaded deltas for a mini block
-    use_batch: bool,
-
-    current_value: i64,
-
-    _phantom: PhantomData<T>,
+    /// The bit widths of each mini block in the current block
+    mini_block_bit_widths: Vec<u8>,
+    /// The number of values remaining in the current mini block
+    mini_block_remaining: usize,
+
+    /// The first value from the block header if not consumed
+    first_value: Option<T::T>,
+    /// The last value to compute offsets from
+    last_value: T::T,
 }
 
-impl<T: DataType> DeltaBitPackDecoder<T> {
+impl<T: DataType> DeltaBitPackDecoder<T>
+where
+    T::T: Default + FromPrimitive + WrappingAdd + Copy,
+{
     /// Creates new delta bit packed decoder.
     pub fn new() -> Self {
         Self {
             bit_reader: BitReader::from(vec![]),
             initialized: false,
-            num_values: 0,
-            num_mini_blocks: 0,
+            block_size: 0,
+            values_left: 0,
+            mini_blocks_per_block: 0,
             values_per_mini_block: 0,
-            values_current_mini_block: 0,
-            first_value: 0,
-            first_value_read: false,
-            min_delta: 0,
+            min_delta: Default::default(),
             mini_block_idx: 0,
-            delta_bit_width: 0,
-            delta_bit_widths: ByteBuffer::new(),
-            deltas_in_mini_block: vec![],
-            use_batch: mem::size_of::<T::T>() == 4,
-            current_value: 0,
-            _phantom: PhantomData,
+            mini_block_bit_widths: vec![],
+            mini_block_remaining: 0,
+            block_end_offset: 0,
+            first_value: None,
+            last_value: Default::default(),
         }
     }
 
-    /// Returns underlying bit reader offset.
+    /// Returns the current offset
     pub fn get_offset(&self) -> usize {
         assert!(self.initialized, "Bit reader is not initialized");
-        self.bit_reader.get_byte_offset()
+        match self.values_left {
+            // If we've exhausted this page report the end of the current block
+            // as we may not have consumed the trailing padding
+            //
+            // The max is necessary to handle pages with no blocks
+            0 => self.bit_reader.get_byte_offset().max(self.block_end_offset),
+            _ => self.bit_reader.get_byte_offset(),
+        }
     }
 
-    /// Initializes new mini block.
+    /// Initializes the next block and the first mini block within it
     #[inline]
-    fn init_block(&mut self) -> Result<()> {
-        self.min_delta = self
+    fn next_block(&mut self) -> Result<()> {
+        let min_delta = self
             .bit_reader
             .get_zigzag_vlq_int()
             .ok_or_else(|| eof_err!("Not enough data to decode 'min_delta'"))?;
 
-        self.delta_bit_widths.clear();
-        for _ in 0..self.num_mini_blocks {
-            let w = self
-                .bit_reader
-                .get_aligned::<u8>(1)
-                .ok_or_else(|| eof_err!("Not enough data to decode 'width'"))?;
-            self.delta_bit_widths.push(w);
+        self.min_delta = T::T::from_i64(min_delta)
+            .ok_or_else(|| general_err!("'min_delta' too large"))?;
+
+        self.mini_block_bit_widths.clear();
+        self.bit_reader.get_aligned_bytes(
+            &mut self.mini_block_bit_widths,
+            self.mini_blocks_per_block as usize,
+        );
+
+        let mut offset = self.bit_reader.get_byte_offset();
+        let mut remaining = self.values_left;
+
+        // Compute the end offset of the current block
+        for b in &mut self.mini_block_bit_widths {
+            if remaining == 0 {
+                // Specification requires handling arbitrary bit widths
+                // for trailing mini blocks
+                *b = 0;
+            }
+            remaining = remaining.saturating_sub(self.values_per_mini_block);
+            offset += *b as usize * self.values_per_mini_block / 8;
+        }
+        self.block_end_offset = offset;
+
+        if self.mini_block_bit_widths.len() != self.mini_blocks_per_block {
+            return Err(eof_err!("insufficient mini block bit widths"));
         }
 
+        self.mini_block_remaining = self.values_per_mini_block;
         self.mini_block_idx = 0;
-        self.delta_bit_width = self.delta_bit_widths.data()[0];
-        self.values_current_mini_block = self.values_per_mini_block;
+
         Ok(())
     }
 
-    /// Loads delta into mini block.
+    /// Initializes the next mini block
     #[inline]
-    fn load_deltas_in_mini_block(&mut self) -> Result<()>
-    where
-        T::T: FromBytes,
-    {
-        if self.use_batch {
-            self.deltas_in_mini_block
-                .resize(self.values_current_mini_block, T::T::default());
-            let loaded = self.bit_reader.get_batch::<T::T>(
-                &mut self.deltas_in_mini_block[..],
-                self.delta_bit_width as usize,
-            );
-            assert!(loaded == self.values_current_mini_block);
+    fn next_mini_block(&mut self) -> Result<()> {
+        if self.mini_block_idx + 1 < self.mini_block_bit_widths.len() {
+            self.mini_block_idx += 1;
+            self.mini_block_remaining = self.values_per_mini_block;
+            Ok(())
         } else {
-            self.deltas_in_mini_block.clear();
-            for _ in 0..self.values_current_mini_block {
-                // TODO: load one batch at a time similar to int32
-                let delta = self
-                    .bit_reader
-                    .get_value::<T::T>(self.delta_bit_width as usize)
-                    .ok_or_else(|| eof_err!("Not enough data to decode 
'delta'"))?;
-                self.deltas_in_mini_block.push(delta);
-            }
+            self.next_block()
         }
-
-        Ok(())
     }
 }
 
-impl<T: DataType> Decoder<T> for DeltaBitPackDecoder<T> {
+impl<T: DataType> Decoder<T> for DeltaBitPackDecoder<T>
+where
+    T::T: Default + FromPrimitive + WrappingAdd + Copy,
+{
     // # of total values is derived from encoding
     #[inline]
     fn set_data(&mut self, data: ByteBufferPtr, _index: usize) -> Result<()> {
         self.bit_reader = BitReader::new(data);
         self.initialized = true;
 
-        let block_size = self
+        // Read header information
+        self.block_size = self
             .bit_reader
             .get_vlq_int()
-            .ok_or_else(|| eof_err!("Not enough data to decode 
'block_size'"))?;
-        self.num_mini_blocks = self
+            .ok_or_else(|| eof_err!("Not enough data to decode 'block_size'"))?
+            .try_into()
+            .map_err(|_| general_err!("invalid 'block_size'"))?;
+
+        self.mini_blocks_per_block = self
             .bit_reader
             .get_vlq_int()
-            .ok_or_else(|| eof_err!("Not enough data to decode 
'num_mini_blocks'"))?;
-        self.num_values = self
+            .ok_or_else(|| eof_err!("Not enough data to decode 
'mini_blocks_per_block'"))?
+            .try_into()
+            .map_err(|_| general_err!("invalid 'mini_blocks_per_block'"))?;
+
+        self.values_left = self
             .bit_reader
             .get_vlq_int()
             .ok_or_else(|| eof_err!("Not enough data to decode 'num_values'"))?
-            as usize;
-        self.first_value = self
+            .try_into()
+            .map_err(|_| general_err!("invalid 'num_values'"))?;
+
+        let first_value = self
             .bit_reader
             .get_zigzag_vlq_int()
             .ok_or_else(|| eof_err!("Not enough data to decode 
'first_value'"))?;
 
+        self.first_value = Some(
+            T::T::from_i64(first_value)
+                .ok_or_else(|| general_err!("first value too large"))?,
+        );
+
+        if self.block_size % 128 != 0 {
+            return Err(general_err!(
+                "'block_size' must be a multiple of 128, got {}",
+                self.block_size
+            ));
+        }
+
+        if self.block_size % self.mini_blocks_per_block != 0 {
+            return Err(general_err!(
+                "'block_size' must be a multiple of 'mini_blocks_per_block' 
got {} and {}",
+                self.block_size, self.mini_blocks_per_block
+            ));
+        }
+
         // Reset decoding state
-        self.first_value_read = false;
         self.mini_block_idx = 0;
-        self.delta_bit_widths.clear();
-        self.values_current_mini_block = 0;
+        self.values_per_mini_block = self.block_size / 
self.mini_blocks_per_block;
+        self.mini_block_remaining = 0;
+        self.mini_block_bit_widths.clear();
 
-        self.values_per_mini_block = (block_size / self.num_mini_blocks) as 
usize;
-        assert!(self.values_per_mini_block % 8 == 0);
+        if self.values_per_mini_block % 32 != 0 {

Review comment:
       The real question is if any actual parquet files have this pattern 
(values_per_mini_block be a multiple of 8)

##########
File path: parquet/benches/arrow_reader.rs
##########
@@ -78,16 +89,17 @@ fn build_plain_encoded_int32_page_iterator(
                     max_def_level
                 };
                 if def_level == max_def_level {
-                    int32_value += 1;
-                    values.push(int32_value);
+                    let value =

Review comment:
       this changes the benchmark to use random numbers rather than an 
increasing sequence (`1`, `2`, ...) , right?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to