This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new ace14018e Add benchmarks for `BYTE_STREAM_SPLIT` encoded Parquet `FIXED_LEN_BYTE_ARRAY` data  (#6204)
ace14018e is described below

commit ace14018ed3f6571b313e5e34761128242853fda
Author: Ed Seidl <[email protected]>
AuthorDate: Thu Aug 8 04:45:12 2024 -0700

    Add benchmarks for `BYTE_STREAM_SPLIT` encoded Parquet `FIXED_LEN_BYTE_ARRAY` data  (#6204)
    
    * save type_width for fixed_len_byte_array
    
    * add decimal128 and float16 byte_stream_split benches
    
    * add f16
    
    * add decimal128 flba(16) bench
---
 parquet/benches/arrow_reader.rs           | 235 +++++++++++++++++++++++++++++-
 parquet/benches/encoding.rs               |  27 +++-
 parquet/src/util/test_common/page_util.rs |   6 +-
 3 files changed, 261 insertions(+), 7 deletions(-)

diff --git a/parquet/benches/arrow_reader.rs b/parquet/benches/arrow_reader.rs
index 814e75c24..18e16f0a4 100644
--- a/parquet/benches/arrow_reader.rs
+++ b/parquet/benches/arrow_reader.rs
@@ -20,6 +20,7 @@ use arrow::datatypes::DataType;
 use arrow_schema::Field;
 use criterion::measurement::WallTime;
 use criterion::{criterion_group, criterion_main, BenchmarkGroup, Criterion};
+use half::f16;
 use num::FromPrimitive;
 use num_bigint::BigInt;
 use parquet::arrow::array_reader::{
@@ -65,6 +66,8 @@ fn build_test_schema() -> SchemaDescPtr {
             }
             REQUIRED BYTE_ARRAY mandatory_binary_leaf;
             OPTIONAL BYTE_ARRAY optional_binary_leaf;
+            REQUIRED FIXED_LEN_BYTE_ARRAY (2) mandatory_f16_leaf (Float16);
+            OPTIONAL FIXED_LEN_BYTE_ARRAY (2) optional_f16_leaf (Float16);
         }
         ";
     parse_message_type(message_type)
@@ -84,6 +87,64 @@ pub fn seedable_rng() -> StdRng {
     StdRng::seed_from_u64(42)
 }
 
+// support byte array for float16
+fn build_encoded_f16_bytes_page_iterator<T>(
+    column_desc: ColumnDescPtr,
+    null_density: f32,
+    encoding: Encoding,
+    min: f32,
+    max: f32,
+) -> impl PageIterator + Clone
+where
+    T: parquet::data_type::DataType,
+    T::T: From<Vec<u8>>,
+{
+    let max_def_level = column_desc.max_def_level();
+    let max_rep_level = column_desc.max_rep_level();
+    let rep_levels = vec![0; VALUES_PER_PAGE];
+    let mut rng = seedable_rng();
+    let mut pages: Vec<Vec<parquet::column::page::Page>> = Vec::new();
+    for _i in 0..NUM_ROW_GROUPS {
+        let mut column_chunk_pages = Vec::new();
+        for _j in 0..PAGES_PER_GROUP {
+            // generate page
+            let mut values = Vec::with_capacity(VALUES_PER_PAGE);
+            let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE);
+            for _k in 0..VALUES_PER_PAGE {
+                let def_level = if rng.gen::<f32>() < null_density {
+                    max_def_level - 1
+                } else {
+                    max_def_level
+                };
+                if def_level == max_def_level {
+                    // create the Float16 value
+                    let value = f16::from_f32(rng.gen_range(min..max));
+                    // Float16 in parquet is stored little-endian
+                    let bytes = match column_desc.physical_type() {
+                        Type::FIXED_LEN_BYTE_ARRAY => {
+                            // Float16 annotates FIXED_LEN_BYTE_ARRAY(2)
+                            assert_eq!(column_desc.type_length(), 2);
+                            value.to_le_bytes().to_vec()
+                        }
+                        _ => unimplemented!(),
+                    };
+                    let value = T::T::from(bytes);
+                    values.push(value);
+                }
+                def_levels.push(def_level);
+            }
+            let mut page_builder =
+                DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
+            page_builder.add_rep_levels(max_rep_level, &rep_levels);
+            page_builder.add_def_levels(max_def_level, &def_levels);
+            page_builder.add_values::<T>(encoding, &values);
+            column_chunk_pages.push(page_builder.consume());
+        }
+        pages.push(column_chunk_pages);
+    }
+    InMemoryPageIterator::new(pages)
+}
+
 // support byte array for decimal
 fn build_encoded_decimal_bytes_page_iterator<T>(
     column_desc: ColumnDescPtr,
@@ -494,6 +555,19 @@ fn create_primitive_array_reader(
     }
 }
 
+fn create_f16_by_bytes_reader(
+    page_iterator: impl PageIterator + 'static,
+    column_desc: ColumnDescPtr,
+) -> Box<dyn ArrayReader> {
+    let physical_type = column_desc.physical_type();
+    match physical_type {
+        Type::FIXED_LEN_BYTE_ARRAY => {
+            make_fixed_len_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
+        }
+        _ => unimplemented!(),
+    }
+}
+
 fn create_decimal_by_bytes_reader(
     page_iterator: impl PageIterator + 'static,
     column_desc: ColumnDescPtr,
@@ -616,6 +690,131 @@ fn bench_byte_decimal<T>(
     });
 }
 
+fn bench_byte_stream_split_f16<T>(
+    group: &mut BenchmarkGroup<WallTime>,
+    mandatory_column_desc: &ColumnDescPtr,
+    optional_column_desc: &ColumnDescPtr,
+    min: f32,
+    max: f32,
+) where
+    T: parquet::data_type::DataType,
+    T::T: From<Vec<u8>>,
+{
+    let mut count: usize = 0;
+
+    // byte_stream_split encoded, no NULLs
+    let data = build_encoded_f16_bytes_page_iterator::<T>(
+        mandatory_column_desc.clone(),
+        0.0,
+        Encoding::BYTE_STREAM_SPLIT,
+        min,
+        max,
+    );
+    group.bench_function("byte_stream_split encoded, mandatory, no NULLs", |b| {
+        b.iter(|| {
+            let array_reader =
+                create_f16_by_bytes_reader(data.clone(), mandatory_column_desc.clone());
+            count = bench_array_reader(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
+
+    let data = build_encoded_f16_bytes_page_iterator::<T>(
+        optional_column_desc.clone(),
+        0.0,
+        Encoding::BYTE_STREAM_SPLIT,
+        min,
+        max,
+    );
+    group.bench_function("byte_stream_split encoded, optional, no NULLs", |b| {
+        b.iter(|| {
+            let array_reader =
+                create_f16_by_bytes_reader(data.clone(), optional_column_desc.clone());
+            count = bench_array_reader(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
+
+    let data = build_encoded_f16_bytes_page_iterator::<T>(
+        optional_column_desc.clone(),
+        0.5,
+        Encoding::BYTE_STREAM_SPLIT,
+        min,
+        max,
+    );
+    group.bench_function("byte_stream_split encoded, optional, half NULLs", |b| {
+        b.iter(|| {
+            let array_reader =
+                create_f16_by_bytes_reader(data.clone(), optional_column_desc.clone());
+            count = bench_array_reader(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
+}
+
+fn bench_byte_stream_split_decimal<T>(
+    group: &mut BenchmarkGroup<WallTime>,
+    mandatory_column_desc: &ColumnDescPtr,
+    optional_column_desc: &ColumnDescPtr,
+    min: i128,
+    max: i128,
+) where
+    T: parquet::data_type::DataType,
+    T::T: From<Vec<u8>>,
+{
+    let mut count: usize = 0;
+
+    // byte_stream_split encoded, no NULLs
+    let data = build_encoded_decimal_bytes_page_iterator::<T>(
+        mandatory_column_desc.clone(),
+        0.0,
+        Encoding::BYTE_STREAM_SPLIT,
+        min,
+        max,
+    );
+    group.bench_function("byte_stream_split encoded, mandatory, no NULLs", |b| {
+        b.iter(|| {
+            let array_reader =
+                create_decimal_by_bytes_reader(data.clone(), mandatory_column_desc.clone());
+            count = bench_array_reader(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
+
+    let data = build_encoded_decimal_bytes_page_iterator::<T>(
+        optional_column_desc.clone(),
+        0.0,
+        Encoding::BYTE_STREAM_SPLIT,
+        min,
+        max,
+    );
+    group.bench_function("byte_stream_split encoded, optional, no NULLs", |b| {
+        b.iter(|| {
+            let array_reader =
+                create_decimal_by_bytes_reader(data.clone(), optional_column_desc.clone());
+            count = bench_array_reader(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
+
+    // half null
+    let data = build_encoded_decimal_bytes_page_iterator::<T>(
+        optional_column_desc.clone(),
+        0.5,
+        Encoding::BYTE_STREAM_SPLIT,
+        min,
+        max,
+    );
+    group.bench_function("byte_stream_split encoded, optional, half NULLs", |b| {
+        b.iter(|| {
+            let array_reader =
+                create_decimal_by_bytes_reader(data.clone(), optional_column_desc.clone());
+            count = bench_array_reader(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
+}
+
 fn bench_primitive<T>(
     group: &mut BenchmarkGroup<WallTime>,
     mandatory_column_desc: &ColumnDescPtr,
@@ -797,6 +996,35 @@ fn bench_primitive<T>(
     });
 }
 
+fn byte_stream_split_benches(c: &mut Criterion) {
+    let schema = build_test_schema();
+
+    let mut group = c.benchmark_group("arrow_array_reader/BYTE_STREAM_SPLIT/Decimal128Array");
+    let mandatory_decimal4_leaf_desc = schema.column(12);
+    let optional_decimal4_leaf_desc = schema.column(13);
+    bench_byte_stream_split_decimal::<FixedLenByteArrayType>(
+        &mut group,
+        &mandatory_decimal4_leaf_desc,
+        &optional_decimal4_leaf_desc,
+        // precision is 16: the max is 9999999999999999
+        9999999999999000,
+        9999999999999999,
+    );
+    group.finish();
+
+    let mut group = c.benchmark_group("arrow_array_reader/BYTE_STREAM_SPLIT/Float16Array");
+    let mandatory_f16_leaf_desc = schema.column(17);
+    let optional_f16_leaf_desc = schema.column(18);
+    bench_byte_stream_split_f16::<FixedLenByteArrayType>(
+        &mut group,
+        &mandatory_f16_leaf_desc,
+        &optional_f16_leaf_desc,
+        -1.0,
+        1.0,
+    );
+    group.finish();
+}
+
 fn decimal_benches(c: &mut Criterion) {
     let schema = build_test_schema();
     // parquet int32, logical type decimal(8,2)
@@ -1334,5 +1562,10 @@ fn add_benches(c: &mut Criterion) {
     });
 }
 
-criterion_group!(benches, add_benches, decimal_benches,);
+criterion_group!(
+    benches,
+    add_benches,
+    decimal_benches,
+    byte_stream_split_benches,
+);
 criterion_main!(benches);
diff --git a/parquet/benches/encoding.rs b/parquet/benches/encoding.rs
index 80befe8da..bc18a49da 100644
--- a/parquet/benches/encoding.rs
+++ b/parquet/benches/encoding.rs
@@ -16,15 +16,23 @@
 // under the License.
 
 use criterion::*;
+use half::f16;
 use parquet::basic::Encoding;
-use parquet::data_type::{DataType, DoubleType, FloatType};
+use parquet::data_type::{
+    DataType, DoubleType, FixedLenByteArray, FixedLenByteArrayType, FloatType,
+};
 use parquet::decoding::{get_decoder, Decoder};
 use parquet::encoding::get_encoder;
 use parquet::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type};
 use rand::prelude::*;
 use std::sync::Arc;
 
-fn bench_typed<T: DataType>(c: &mut Criterion, values: &[T::T], encoding: Encoding) {
+fn bench_typed<T: DataType>(
+    c: &mut Criterion,
+    values: &[T::T],
+    encoding: Encoding,
+    type_length: i32,
+) {
     let name = format!(
         "dtype={}, encoding={:?}",
         std::any::type_name::<T::T>(),
@@ -33,6 +41,7 @@ fn bench_typed<T: DataType>(c: &mut Criterion, values: &[T::T], encoding: Encodi
     let column_desc_ptr = ColumnDescPtr::new(ColumnDescriptor::new(
         Arc::new(
             Type::primitive_type_builder("", T::get_physical_type())
+                .with_length(type_length)
                 .build()
                 .unwrap(),
         ),
@@ -68,15 +77,25 @@ fn criterion_benchmark(c: &mut Criterion) {
     let mut rng = StdRng::seed_from_u64(0);
     let n = 16 * 1024;
 
+    let mut f16s = Vec::new();
     let mut f32s = Vec::new();
     let mut f64s = Vec::new();
+    let mut d128s = Vec::new();
     for _ in 0..n {
+        f16s.push(FixedLenByteArray::from(
+            f16::from_f32(rng.gen::<f32>()).to_le_bytes().to_vec(),
+        ));
         f32s.push(rng.gen::<f32>());
         f64s.push(rng.gen::<f64>());
+        d128s.push(FixedLenByteArray::from(
+            rng.gen::<i128>().to_be_bytes().to_vec(),
+        ));
     }
 
-    bench_typed::<FloatType>(c, &f32s, Encoding::BYTE_STREAM_SPLIT);
-    bench_typed::<DoubleType>(c, &f64s, Encoding::BYTE_STREAM_SPLIT);
+    bench_typed::<FloatType>(c, &f32s, Encoding::BYTE_STREAM_SPLIT, 0);
+    bench_typed::<DoubleType>(c, &f64s, Encoding::BYTE_STREAM_SPLIT, 0);
+    bench_typed::<FixedLenByteArrayType>(c, &f16s, Encoding::BYTE_STREAM_SPLIT, 2);
+    bench_typed::<FixedLenByteArrayType>(c, &d128s, Encoding::BYTE_STREAM_SPLIT, 16);
 }
 
 criterion_group!(benches, criterion_benchmark);
diff --git a/parquet/src/util/test_common/page_util.rs b/parquet/src/util/test_common/page_util.rs
index 3db43aef0..a1709efa9 100644
--- a/parquet/src/util/test_common/page_util.rs
+++ b/parquet/src/util/test_common/page_util.rs
@@ -51,13 +51,14 @@ pub struct DataPageBuilderImpl {
     rep_levels_byte_len: u32,
     def_levels_byte_len: u32,
     datapage_v2: bool,
+    type_width: i32,
 }
 
 impl DataPageBuilderImpl {
     // `num_values` is the number of non-null values to put in the data page.
     // `datapage_v2` flag is used to indicate if the generated data page should use V2
     // format or not.
-    pub fn new(_desc: ColumnDescPtr, num_values: u32, datapage_v2: bool) -> Self {
+    pub fn new(desc: ColumnDescPtr, num_values: u32, datapage_v2: bool) -> Self {
         DataPageBuilderImpl {
             encoding: None,
             num_values,
@@ -65,6 +66,7 @@ impl DataPageBuilderImpl {
             rep_levels_byte_len: 0,
             def_levels_byte_len: 0,
             datapage_v2,
+            type_width: desc.type_length(),
         }
     }
 
@@ -111,7 +113,7 @@ impl DataPageBuilder for DataPageBuilderImpl {
         // Create test column descriptor.
         let desc = {
             let ty = SchemaType::primitive_type_builder("t", T::get_physical_type())
-                .with_length(0)
+                .with_length(self.type_width)
                 .build()
                 .unwrap();
             Arc::new(ColumnDescriptor::new(

Reply via email to