tustvold commented on a change in pull request #1284:
URL: https://github.com/apache/arrow-rs/pull/1284#discussion_r805158878



##########
File path: parquet/src/encodings/decoding.rs
##########
@@ -431,232 +433,253 @@ pub struct DeltaBitPackDecoder<T: DataType> {
     initialized: bool,
 
     // Header info
-    num_values: usize,
-    num_mini_blocks: i64,
+    // The number of values in each block
+    block_size: usize,
+    /// The number of values in the current page
+    values_left: usize,
+    /// The number of mini-blocks in each block
+    mini_blocks_per_block: usize,
+    /// The number of values in each mini block
     values_per_mini_block: usize,
-    values_current_mini_block: usize,
-    first_value: i64,
-    first_value_read: bool,
 
     // Per block info
-    min_delta: i64,
+    /// The minimum delta in the block
+    min_delta: T::T,
+    /// The byte offset of the end of the current block
+    block_end_offset: usize,
+    /// The index on the current mini block
     mini_block_idx: usize,
-    delta_bit_width: u8,
-    delta_bit_widths: ByteBuffer,
-    deltas_in_mini_block: Vec<T::T>, // eagerly loaded deltas for a mini block
-    use_batch: bool,
-
-    current_value: i64,
-
-    _phantom: PhantomData<T>,
+    /// The bit widths of each mini block in the current block
+    mini_block_bit_widths: Vec<u8>,
+    /// The number of values remaining in the current mini block
+    mini_block_remaining: usize,
+
+    /// The first value from the block header if not consumed
+    first_value: Option<T::T>,
+    /// The last value to compute offsets from
+    last_value: T::T,
 }
 
-impl<T: DataType> DeltaBitPackDecoder<T> {
+impl<T: DataType> DeltaBitPackDecoder<T>
+where
+    T::T: Default + FromPrimitive + WrappingAdd + Copy,
+{
     /// Creates new delta bit packed decoder.
     pub fn new() -> Self {
         Self {
             bit_reader: BitReader::from(vec![]),
             initialized: false,
-            num_values: 0,
-            num_mini_blocks: 0,
+            block_size: 0,
+            values_left: 0,
+            mini_blocks_per_block: 0,
             values_per_mini_block: 0,
-            values_current_mini_block: 0,
-            first_value: 0,
-            first_value_read: false,
-            min_delta: 0,
+            min_delta: Default::default(),
             mini_block_idx: 0,
-            delta_bit_width: 0,
-            delta_bit_widths: ByteBuffer::new(),
-            deltas_in_mini_block: vec![],
-            use_batch: mem::size_of::<T::T>() == 4,
-            current_value: 0,
-            _phantom: PhantomData,
+            mini_block_bit_widths: vec![],
+            mini_block_remaining: 0,
+            block_end_offset: 0,
+            first_value: None,
+            last_value: Default::default(),
         }
     }
 
-    /// Returns underlying bit reader offset.
+    /// Returns the current offset
     pub fn get_offset(&self) -> usize {
         assert!(self.initialized, "Bit reader is not initialized");
-        self.bit_reader.get_byte_offset()
+        match self.values_left {
+            // If we've exhausted this page report the end of the current block
+            // as we may not have consumed the trailing padding
+            //
+            // The max is necessary to handle pages with no blocks

Review comment:
       Yup, the page header will be present, but there will be no blocks :smile: 
Or at least that is what the encoder currently does... 
`test_delta_byte_array_single_array` fails if the `max` isn't here. Will update 
the comment.

##########
File path: parquet/src/encodings/decoding.rs
##########
@@ -431,232 +433,253 @@ pub struct DeltaBitPackDecoder<T: DataType> {
     initialized: bool,
 
     // Header info
-    num_values: usize,
-    num_mini_blocks: i64,
+    // The number of values in each block
+    block_size: usize,
+    /// The number of values in the current page
+    values_left: usize,
+    /// The number of mini-blocks in each block
+    mini_blocks_per_block: usize,
+    /// The number of values in each mini block
     values_per_mini_block: usize,
-    values_current_mini_block: usize,
-    first_value: i64,
-    first_value_read: bool,
 
     // Per block info
-    min_delta: i64,
+    /// The minimum delta in the block
+    min_delta: T::T,
+    /// The byte offset of the end of the current block
+    block_end_offset: usize,
+    /// The index on the current mini block
     mini_block_idx: usize,
-    delta_bit_width: u8,
-    delta_bit_widths: ByteBuffer,
-    deltas_in_mini_block: Vec<T::T>, // eagerly loaded deltas for a mini block
-    use_batch: bool,
-
-    current_value: i64,
-
-    _phantom: PhantomData<T>,
+    /// The bit widths of each mini block in the current block
+    mini_block_bit_widths: Vec<u8>,
+    /// The number of values remaining in the current mini block
+    mini_block_remaining: usize,
+
+    /// The first value from the block header if not consumed
+    first_value: Option<T::T>,
+    /// The last value to compute offsets from
+    last_value: T::T,
 }
 
-impl<T: DataType> DeltaBitPackDecoder<T> {
+impl<T: DataType> DeltaBitPackDecoder<T>
+where
+    T::T: Default + FromPrimitive + WrappingAdd + Copy,
+{
     /// Creates new delta bit packed decoder.
     pub fn new() -> Self {
         Self {
             bit_reader: BitReader::from(vec![]),
             initialized: false,
-            num_values: 0,
-            num_mini_blocks: 0,
+            block_size: 0,
+            values_left: 0,
+            mini_blocks_per_block: 0,
             values_per_mini_block: 0,
-            values_current_mini_block: 0,
-            first_value: 0,
-            first_value_read: false,
-            min_delta: 0,
+            min_delta: Default::default(),
             mini_block_idx: 0,
-            delta_bit_width: 0,
-            delta_bit_widths: ByteBuffer::new(),
-            deltas_in_mini_block: vec![],
-            use_batch: mem::size_of::<T::T>() == 4,
-            current_value: 0,
-            _phantom: PhantomData,
+            mini_block_bit_widths: vec![],
+            mini_block_remaining: 0,
+            block_end_offset: 0,
+            first_value: None,
+            last_value: Default::default(),
         }
     }
 
-    /// Returns underlying bit reader offset.
+    /// Returns the current offset
     pub fn get_offset(&self) -> usize {
         assert!(self.initialized, "Bit reader is not initialized");
-        self.bit_reader.get_byte_offset()
+        match self.values_left {
+            // If we've exhausted this page report the end of the current block
+            // as we may not have consumed the trailing padding
+            //
+            // The max is necessary to handle pages with no blocks

Review comment:
       Yup, the page header will be present, but there will be no blocks :smile: 
Or at least that is what the encoder currently does... 
`test_delta_byte_array_single_array` fails if the `max` isn't here. Will update 
the comment.
   
   Edit: I think the no-blocks case actually occurs when there is only a single 
value, but I'm not 100% sure :sweat_smile: 

##########
File path: parquet/src/encodings/decoding.rs
##########
@@ -431,232 +433,253 @@ pub struct DeltaBitPackDecoder<T: DataType> {
     initialized: bool,
 
     // Header info
-    num_values: usize,
-    num_mini_blocks: i64,
+    // The number of values in each block
+    block_size: usize,
+    /// The number of values in the current page
+    values_left: usize,
+    /// The number of mini-blocks in each block
+    mini_blocks_per_block: usize,
+    /// The number of values in each mini block
     values_per_mini_block: usize,
-    values_current_mini_block: usize,
-    first_value: i64,
-    first_value_read: bool,
 
     // Per block info
-    min_delta: i64,
+    /// The minimum delta in the block
+    min_delta: T::T,
+    /// The byte offset of the end of the current block
+    block_end_offset: usize,
+    /// The index on the current mini block
     mini_block_idx: usize,
-    delta_bit_width: u8,
-    delta_bit_widths: ByteBuffer,
-    deltas_in_mini_block: Vec<T::T>, // eagerly loaded deltas for a mini block
-    use_batch: bool,
-
-    current_value: i64,
-
-    _phantom: PhantomData<T>,
+    /// The bit widths of each mini block in the current block
+    mini_block_bit_widths: Vec<u8>,
+    /// The number of values remaining in the current mini block
+    mini_block_remaining: usize,
+
+    /// The first value from the block header if not consumed
+    first_value: Option<T::T>,
+    /// The last value to compute offsets from
+    last_value: T::T,
 }
 
-impl<T: DataType> DeltaBitPackDecoder<T> {
+impl<T: DataType> DeltaBitPackDecoder<T>
+where
+    T::T: Default + FromPrimitive + WrappingAdd + Copy,
+{
     /// Creates new delta bit packed decoder.
     pub fn new() -> Self {
         Self {
             bit_reader: BitReader::from(vec![]),
             initialized: false,
-            num_values: 0,
-            num_mini_blocks: 0,
+            block_size: 0,
+            values_left: 0,
+            mini_blocks_per_block: 0,
             values_per_mini_block: 0,
-            values_current_mini_block: 0,
-            first_value: 0,
-            first_value_read: false,
-            min_delta: 0,
+            min_delta: Default::default(),
             mini_block_idx: 0,
-            delta_bit_width: 0,
-            delta_bit_widths: ByteBuffer::new(),
-            deltas_in_mini_block: vec![],
-            use_batch: mem::size_of::<T::T>() == 4,
-            current_value: 0,
-            _phantom: PhantomData,
+            mini_block_bit_widths: vec![],
+            mini_block_remaining: 0,
+            block_end_offset: 0,
+            first_value: None,
+            last_value: Default::default(),
         }
     }
 
-    /// Returns underlying bit reader offset.
+    /// Returns the current offset
     pub fn get_offset(&self) -> usize {
         assert!(self.initialized, "Bit reader is not initialized");
-        self.bit_reader.get_byte_offset()
+        match self.values_left {
+            // If we've exhausted this page report the end of the current block
+            // as we may not have consumed the trailing padding
+            //
+            // The max is necessary to handle pages with no blocks
+            0 => self.bit_reader.get_byte_offset().max(self.block_end_offset),
+            _ => self.bit_reader.get_byte_offset(),
+        }
     }
 
-    /// Initializes new mini block.
+    /// Initializes the next block and the first mini block within it
     #[inline]
-    fn init_block(&mut self) -> Result<()> {
-        self.min_delta = self
+    fn next_block(&mut self) -> Result<()> {
+        let min_delta = self
             .bit_reader
             .get_zigzag_vlq_int()
             .ok_or_else(|| eof_err!("Not enough data to decode 'min_delta'"))?;
 
-        self.delta_bit_widths.clear();
-        for _ in 0..self.num_mini_blocks {
-            let w = self
-                .bit_reader
-                .get_aligned::<u8>(1)
-                .ok_or_else(|| eof_err!("Not enough data to decode 'width'"))?;
-            self.delta_bit_widths.push(w);
+        self.min_delta = T::T::from_i64(min_delta)
+            .ok_or_else(|| general_err!("'min_delta' too large"))?;
+
+        self.mini_block_bit_widths.clear();
+        self.bit_reader.get_aligned_bytes(
+            &mut self.mini_block_bit_widths,
+            self.mini_blocks_per_block as usize,
+        );
+
+        let mut offset = self.bit_reader.get_byte_offset();
+        let mut remaining = self.values_left;
+
+        // Compute the end offset of the current block
+        for b in &mut self.mini_block_bit_widths {
+            if remaining == 0 {
+                // Specification requires handling arbitrary bit widths
+                // for trailing mini blocks
+                *b = 0;
+            }
+            remaining = remaining.saturating_sub(self.values_per_mini_block);
+            offset += *b as usize * self.values_per_mini_block / 8;
+        }
+        self.block_end_offset = offset;
+
+        if self.mini_block_bit_widths.len() != self.mini_blocks_per_block {
+            return Err(eof_err!("insufficient mini block bit widths"));
         }
 
+        self.mini_block_remaining = self.values_per_mini_block;
         self.mini_block_idx = 0;
-        self.delta_bit_width = self.delta_bit_widths.data()[0];
-        self.values_current_mini_block = self.values_per_mini_block;
+
         Ok(())
     }
 
-    /// Loads delta into mini block.
+    /// Initializes the next mini block
     #[inline]
-    fn load_deltas_in_mini_block(&mut self) -> Result<()>
-    where
-        T::T: FromBytes,
-    {
-        if self.use_batch {
-            self.deltas_in_mini_block
-                .resize(self.values_current_mini_block, T::T::default());
-            let loaded = self.bit_reader.get_batch::<T::T>(
-                &mut self.deltas_in_mini_block[..],
-                self.delta_bit_width as usize,
-            );
-            assert!(loaded == self.values_current_mini_block);
+    fn next_mini_block(&mut self) -> Result<()> {
+        if self.mini_block_idx + 1 < self.mini_block_bit_widths.len() {
+            self.mini_block_idx += 1;
+            self.mini_block_remaining = self.values_per_mini_block;
+            Ok(())
         } else {
-            self.deltas_in_mini_block.clear();
-            for _ in 0..self.values_current_mini_block {
-                // TODO: load one batch at a time similar to int32
-                let delta = self
-                    .bit_reader
-                    .get_value::<T::T>(self.delta_bit_width as usize)
-                    .ok_or_else(|| eof_err!("Not enough data to decode 
'delta'"))?;
-                self.deltas_in_mini_block.push(delta);
-            }
+            self.next_block()
         }
-
-        Ok(())
     }
 }
 
-impl<T: DataType> Decoder<T> for DeltaBitPackDecoder<T> {
+impl<T: DataType> Decoder<T> for DeltaBitPackDecoder<T>
+where
+    T::T: Default + FromPrimitive + WrappingAdd + Copy,
+{
     // # of total values is derived from encoding
     #[inline]
     fn set_data(&mut self, data: ByteBufferPtr, _index: usize) -> Result<()> {
         self.bit_reader = BitReader::new(data);
         self.initialized = true;
 
-        let block_size = self
+        // Read header information
+        self.block_size = self
             .bit_reader
             .get_vlq_int()
-            .ok_or_else(|| eof_err!("Not enough data to decode 
'block_size'"))?;
-        self.num_mini_blocks = self
+            .ok_or_else(|| eof_err!("Not enough data to decode 'block_size'"))?
+            .try_into()
+            .map_err(|_| general_err!("invalid 'block_size'"))?;
+
+        self.mini_blocks_per_block = self
             .bit_reader
             .get_vlq_int()
-            .ok_or_else(|| eof_err!("Not enough data to decode 
'num_mini_blocks'"))?;
-        self.num_values = self
+            .ok_or_else(|| eof_err!("Not enough data to decode 
'mini_blocks_per_block'"))?
+            .try_into()
+            .map_err(|_| general_err!("invalid 'mini_blocks_per_block'"))?;
+
+        self.values_left = self
             .bit_reader
             .get_vlq_int()
             .ok_or_else(|| eof_err!("Not enough data to decode 'num_values'"))?
-            as usize;
-        self.first_value = self
+            .try_into()
+            .map_err(|_| general_err!("invalid 'num_values'"))?;
+
+        let first_value = self
             .bit_reader
             .get_zigzag_vlq_int()
             .ok_or_else(|| eof_err!("Not enough data to decode 
'first_value'"))?;
 
+        self.first_value = Some(
+            T::T::from_i64(first_value)
+                .ok_or_else(|| general_err!("first value too large"))?,
+        );
+
+        if self.block_size % 128 != 0 {
+            return Err(general_err!(
+                "'block_size' must be a multiple of 128, got {}",
+                self.block_size
+            ));
+        }
+
+        if self.block_size % self.mini_blocks_per_block != 0 {
+            return Err(general_err!(
+                "'block_size' must be a multiple of 'mini_blocks_per_block' 
got {} and {}",
+                self.block_size, self.mini_blocks_per_block
+            ));
+        }
+
         // Reset decoding state
-        self.first_value_read = false;
         self.mini_block_idx = 0;
-        self.delta_bit_widths.clear();
-        self.values_current_mini_block = 0;
+        self.values_per_mini_block = self.block_size / 
self.mini_blocks_per_block;
+        self.mini_block_remaining = 0;
+        self.mini_block_bit_widths.clear();
 
-        self.values_per_mini_block = (block_size / self.num_mini_blocks) as 
usize;
-        assert!(self.values_per_mini_block % 8 == 0);
+        if self.values_per_mini_block % 32 != 0 {

Review comment:
       This feels to me like a bug in parquet-mr :sweat_smile: 

##########
File path: parquet/benches/arrow_reader.rs
##########
@@ -17,15 +17,19 @@
 
 use arrow::array::Array;
 use arrow::datatypes::DataType;
-use criterion::{criterion_group, criterion_main, Criterion};
+use criterion::measurement::WallTime;

Review comment:
       The benchmarks are reworked to allow using the same code for different 
encodings of integer primitives, along with different primitive types (Int32, 
Int64). It should be fairly mechanical to extend to other types, should we wish 
to in the future.

##########
File path: parquet/benches/arrow_reader.rs
##########
@@ -351,162 +379,224 @@ fn create_complex_object_byte_array_dictionary_reader(
     )
 }
 
-fn add_benches(c: &mut Criterion) {
-    const EXPECTED_VALUE_COUNT: usize =
-        NUM_ROW_GROUPS * PAGES_PER_GROUP * VALUES_PER_PAGE;
-    let mut group = c.benchmark_group("arrow_array_reader");
-
+fn bench_primitive<T>(
+    group: &mut BenchmarkGroup<WallTime>,
+    schema: &SchemaDescPtr,
+    mandatory_column_desc: &ColumnDescPtr,
+    optional_column_desc: &ColumnDescPtr,
+) where
+    T: parquet::data_type::DataType,
+    T::T: SampleUniform + FromPrimitive + Copy,
+{
     let mut count: usize = 0;
 
-    let schema = build_test_schema();
-    let mandatory_int32_column_desc = schema.column(0);
-    let optional_int32_column_desc = schema.column(1);
-    let mandatory_string_column_desc = schema.column(2);
-    let optional_string_column_desc = schema.column(3);
-    // primitive / int32 benchmarks
-    // =============================
-
-    // int32, plain encoded, no NULLs
-    let plain_int32_no_null_data = build_plain_encoded_int32_page_iterator(
+    // plain encoded, no NULLs
+    let data = build_encoded_primitive_page_iterator::<T>(
         schema.clone(),
-        mandatory_int32_column_desc.clone(),
+        mandatory_column_desc.clone(),
         0.0,
+        Encoding::PLAIN,
     );
-    group.bench_function("read Int32Array, plain encoded, mandatory, no 
NULLs", |b| {
+    group.bench_function("plain encoded, mandatory, no NULLs", |b| {
         b.iter(|| {
-            let array_reader = create_int32_primitive_array_reader(
-                plain_int32_no_null_data.clone(),
-                mandatory_int32_column_desc.clone(),
+            let array_reader = create_primitive_array_reader(
+                data.clone(),
+                mandatory_column_desc.clone(),
             );
             count = bench_array_reader(array_reader);
         });
         assert_eq!(count, EXPECTED_VALUE_COUNT);
     });
 
-    let plain_int32_no_null_data = build_plain_encoded_int32_page_iterator(
+    let data = build_encoded_primitive_page_iterator::<T>(
         schema.clone(),
-        optional_int32_column_desc.clone(),
+        optional_column_desc.clone(),
         0.0,
+        Encoding::PLAIN,
     );
-    group.bench_function("read Int32Array, plain encoded, optional, no NULLs", 
|b| {
+    group.bench_function("plain encoded, optional, no NULLs", |b| {
         b.iter(|| {
-            let array_reader = create_int32_primitive_array_reader(
-                plain_int32_no_null_data.clone(),
-                optional_int32_column_desc.clone(),
-            );
+            let array_reader =
+                create_primitive_array_reader(data.clone(), 
optional_column_desc.clone());
             count = bench_array_reader(array_reader);
         });
         assert_eq!(count, EXPECTED_VALUE_COUNT);
     });
 
-    // int32, plain encoded, half NULLs
-    let plain_int32_half_null_data = build_plain_encoded_int32_page_iterator(
+    // plain encoded, half NULLs
+    let data = build_encoded_primitive_page_iterator::<T>(
         schema.clone(),
-        optional_int32_column_desc.clone(),
+        optional_column_desc.clone(),
         0.5,
+        Encoding::PLAIN,
     );
-    group.bench_function(
-        "read Int32Array, plain encoded, optional, half NULLs",
-        |b| {
-            b.iter(|| {
-                let array_reader = create_int32_primitive_array_reader(
-                    plain_int32_half_null_data.clone(),
-                    optional_int32_column_desc.clone(),
-                );
-                count = bench_array_reader(array_reader);
-            });
-            assert_eq!(count, EXPECTED_VALUE_COUNT);
-        },
+    group.bench_function("plain encoded, optional, half NULLs", |b| {
+        b.iter(|| {
+            let array_reader =
+                create_primitive_array_reader(data.clone(), 
optional_column_desc.clone());
+            count = bench_array_reader(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
+
+    // binary packed, no NULLs
+    let data = build_encoded_primitive_page_iterator::<T>(
+        schema.clone(),
+        mandatory_column_desc.clone(),
+        0.0,
+        Encoding::DELTA_BINARY_PACKED,
     );
+    group.bench_function("binary packed, mandatory, no NULLs", |b| {
+        b.iter(|| {
+            let array_reader = create_primitive_array_reader(
+                data.clone(),
+                mandatory_column_desc.clone(),
+            );
+            count = bench_array_reader(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
 
-    // int32, dictionary encoded, no NULLs
-    let dictionary_int32_no_null_data = 
build_dictionary_encoded_int32_page_iterator(
+    let data = build_encoded_primitive_page_iterator::<T>(
         schema.clone(),
-        mandatory_int32_column_desc.clone(),
+        optional_column_desc.clone(),
         0.0,
+        Encoding::DELTA_BINARY_PACKED,
     );
-    group.bench_function(
-        "read Int32Array, dictionary encoded, mandatory, no NULLs",
-        |b| {
-            b.iter(|| {
-                let array_reader = create_int32_primitive_array_reader(
-                    dictionary_int32_no_null_data.clone(),
-                    mandatory_int32_column_desc.clone(),
-                );
-                count = bench_array_reader(array_reader);
-            });
-            assert_eq!(count, EXPECTED_VALUE_COUNT);
-        },
+    group.bench_function("binary packed, optional, no NULLs", |b| {
+        b.iter(|| {
+            let array_reader =
+                create_primitive_array_reader(data.clone(), 
optional_column_desc.clone());
+            count = bench_array_reader(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
+
+    // binary packed, half NULLs
+    let data = build_encoded_primitive_page_iterator::<T>(
+        schema.clone(),
+        optional_column_desc.clone(),
+        0.5,
+        Encoding::DELTA_BINARY_PACKED,
     );
+    group.bench_function("binary packed, optional, half NULLs", |b| {
+        b.iter(|| {
+            let array_reader =
+                create_primitive_array_reader(data.clone(), 
optional_column_desc.clone());
+            count = bench_array_reader(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
 
-    let dictionary_int32_no_null_data = 
build_dictionary_encoded_int32_page_iterator(
+    // dictionary encoded, no NULLs
+    let data = build_dictionary_encoded_primitive_page_iterator::<T>(
         schema.clone(),
-        optional_int32_column_desc.clone(),
+        mandatory_column_desc.clone(),
         0.0,
     );
-    group.bench_function(
-        "read Int32Array, dictionary encoded, optional, no NULLs",
-        |b| {
-            b.iter(|| {
-                let array_reader = create_int32_primitive_array_reader(
-                    dictionary_int32_no_null_data.clone(),
-                    optional_int32_column_desc.clone(),
-                );
-                count = bench_array_reader(array_reader);
-            });
-            assert_eq!(count, EXPECTED_VALUE_COUNT);
-        },
+    group.bench_function("dictionary encoded, mandatory, no NULLs", |b| {
+        b.iter(|| {
+            let array_reader = create_primitive_array_reader(
+                data.clone(),
+                mandatory_column_desc.clone(),
+            );
+            count = bench_array_reader(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
+
+    let data = build_dictionary_encoded_primitive_page_iterator::<T>(
+        schema.clone(),
+        optional_column_desc.clone(),
+        0.0,
     );
+    group.bench_function("dictionary encoded, optional, no NULLs", |b| {
+        b.iter(|| {
+            let array_reader =
+                create_primitive_array_reader(data.clone(), 
optional_column_desc.clone());
+            count = bench_array_reader(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
 
-    // int32, dictionary encoded, half NULLs
-    let dictionary_int32_half_null_data = 
build_dictionary_encoded_int32_page_iterator(
+    // dictionary encoded, half NULLs
+    let data = build_dictionary_encoded_primitive_page_iterator::<T>(
         schema.clone(),
-        optional_int32_column_desc.clone(),
+        optional_column_desc.clone(),
         0.5,
     );
-    group.bench_function(
-        "read Int32Array, dictionary encoded, optional, half NULLs",
-        |b| {
-            b.iter(|| {
-                let array_reader = create_int32_primitive_array_reader(
-                    dictionary_int32_half_null_data.clone(),
-                    optional_int32_column_desc.clone(),
-                );
-                count = bench_array_reader(array_reader);
-            });
-            assert_eq!(count, EXPECTED_VALUE_COUNT);
-        },
+    group.bench_function("dictionary encoded, optional, half NULLs", |b| {
+        b.iter(|| {
+            let array_reader =
+                create_primitive_array_reader(data.clone(), 
optional_column_desc.clone());
+            count = bench_array_reader(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
+}
+
+fn add_benches(c: &mut Criterion) {
+    let mut count: usize = 0;
+
+    let schema = build_test_schema();
+    let mandatory_int32_column_desc = schema.column(0);
+    let optional_int32_column_desc = schema.column(1);
+    let mandatory_string_column_desc = schema.column(2);
+    let optional_string_column_desc = schema.column(3);
+    let mandatory_int64_column_desc = schema.column(4);
+    let optional_int64_column_desc = schema.column(5);
+    // primitive / int32 benchmarks
+    // =============================
+
+    let mut group = c.benchmark_group("arrow_array_reader/Int32Array");
+    bench_primitive::<Int32Type>(
+        &mut group,
+        &schema,
+        &mandatory_int32_column_desc,
+        &optional_int32_column_desc,
+    );
+    group.finish();
+
+    // primitive / int64 benchmarks
+    // =============================
+
+    let mut group = c.benchmark_group("arrow_array_reader/Int64Array");
+    bench_primitive::<Int64Type>(
+        &mut group,
+        &schema,
+        &mandatory_int64_column_desc,
+        &optional_int64_column_desc,
     );
+    group.finish();
 
     // string benchmarks
     //==============================
 
+    let mut group = c.benchmark_group("arrow_array_reader/StringArray");
+
     // string, plain encoded, no NULLs
     let plain_string_no_null_data = build_plain_encoded_string_page_iterator(
         schema.clone(),
         mandatory_string_column_desc.clone(),
         0.0,
     );
-    group.bench_function(
-        "read StringArray, plain encoded, mandatory, no NULLs",
-        |b| {
-            b.iter(|| {
-                let array_reader = create_string_byte_array_reader(
-                    plain_string_no_null_data.clone(),
-                    mandatory_string_column_desc.clone(),
-                );
-                count = bench_array_reader(array_reader);
-            });
-            assert_eq!(count, EXPECTED_VALUE_COUNT);
-        },
-    );
+    group.bench_function("plain encoded, mandatory, no NULLs", |b| {

Review comment:
       The type, i.e. `StringArray`, is now encoded in the benchmark group name

##########
File path: parquet/benches/arrow_reader.rs
##########
@@ -78,16 +89,17 @@ fn build_plain_encoded_int32_page_iterator(
                     max_def_level
                 };
                 if def_level == max_def_level {
-                    int32_value += 1;
-                    values.push(int32_value);
+                    let value =

Review comment:
       Yes, it's otherwise too optimal 😂




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to