This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b9217b27 Speed up `pad_nulls` for `FixedLenByteArrayBuffer` (#6297)
9b9217b27 is described below

commit 9b9217b2778195d2f324398a7814f94ce346aae2
Author: Ed Seidl <[email protected]>
AuthorDate: Wed Oct 2 03:21:14 2024 -0700

    Speed up `pad_nulls` for `FixedLenByteArrayBuffer` (#6297)
    
    * optimize pad_nulls for fixed_len_byte_array
    
    * start refactor of benchmarks
    
    * refactor float16
    
    * add fixed_len_byte_array to float16 benches
    
    * replace copy_within with vectorizable copy
    
    * update comment
    
    * move branch on byte_length outside of loop
    
    * reduce code duplication while preserving performance gains
    
    * formatting
    
    * silence clippy
    
    * clippy won again
---
 parquet/benches/arrow_reader.rs                    | 442 +++++++++++++++------
 .../src/arrow/array_reader/fixed_len_byte_array.rs |  56 ++-
 2 files changed, 368 insertions(+), 130 deletions(-)

diff --git a/parquet/benches/arrow_reader.rs b/parquet/benches/arrow_reader.rs
index 18e16f0a4..f165adbe8 100644
--- a/parquet/benches/arrow_reader.rs
+++ b/parquet/benches/arrow_reader.rs
@@ -68,6 +68,14 @@ fn build_test_schema() -> SchemaDescPtr {
             OPTIONAL BYTE_ARRAY optional_binary_leaf;
             REQUIRED FIXED_LEN_BYTE_ARRAY (2) mandatory_f16_leaf (Float16);
             OPTIONAL FIXED_LEN_BYTE_ARRAY (2) optional_f16_leaf (Float16);
+            REQUIRED FIXED_LEN_BYTE_ARRAY (2) mandatory_flba2_leaf;
+            OPTIONAL FIXED_LEN_BYTE_ARRAY (2) optional_flba2_leaf;
+            REQUIRED FIXED_LEN_BYTE_ARRAY (4) mandatory_flba4_leaf;
+            OPTIONAL FIXED_LEN_BYTE_ARRAY (4) optional_flba4_leaf;
+            REQUIRED FIXED_LEN_BYTE_ARRAY (8) mandatory_flba8_leaf;
+            OPTIONAL FIXED_LEN_BYTE_ARRAY (8) optional_flba8_leaf;
+            REQUIRED FIXED_LEN_BYTE_ARRAY (16) mandatory_flba16_leaf;
+            OPTIONAL FIXED_LEN_BYTE_ARRAY (16) optional_flba16_leaf;
         }
         ";
     parse_message_type(message_type)
@@ -209,6 +217,50 @@ where
     InMemoryPageIterator::new(pages)
 }
 
+// support for fixed_len_byte_arrays
+fn build_encoded_flba_bytes_page_iterator<const BYTE_LENGTH: usize>(
+    column_desc: ColumnDescPtr,
+    null_density: f32,
+    encoding: Encoding,
+) -> impl PageIterator + Clone {
+    let max_def_level = column_desc.max_def_level();
+    let max_rep_level = column_desc.max_rep_level();
+    let rep_levels = vec![0; VALUES_PER_PAGE];
+    let mut rng = seedable_rng();
+    let mut pages: Vec<Vec<parquet::column::page::Page>> = Vec::new();
+    for _i in 0..NUM_ROW_GROUPS {
+        let mut column_chunk_pages = Vec::new();
+        for _j in 0..PAGES_PER_GROUP {
+            // generate page
+            let mut values = Vec::with_capacity(VALUES_PER_PAGE);
+            let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE);
+            for _k in 0..VALUES_PER_PAGE {
+                let def_level = if rng.gen::<f32>() < null_density {
+                    max_def_level - 1
+                } else {
+                    max_def_level
+                };
+                if def_level == max_def_level {
+                    // create the FLBA(BYTE_LENGTH) value
+                    let value = (0..BYTE_LENGTH).map(|_| rng.gen()).collect::<Vec<u8>>();
+                    let value =
+                        <FixedLenByteArrayType as parquet::data_type::DataType>::T::from(value);
+                    values.push(value);
+                }
+                def_levels.push(def_level);
+            }
+            let mut page_builder =
+                DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
+            page_builder.add_rep_levels(max_rep_level, &rep_levels);
+            page_builder.add_def_levels(max_def_level, &def_levels);
+            page_builder.add_values::<FixedLenByteArrayType>(encoding, &values);
+            column_chunk_pages.push(page_builder.consume());
+        }
+        pages.push(column_chunk_pages);
+    }
+    InMemoryPageIterator::new(pages)
+}
+
 fn build_encoded_primitive_page_iterator<T>(
     column_desc: ColumnDescPtr,
     null_density: f32,
@@ -584,6 +636,13 @@ fn create_decimal_by_bytes_reader(
     }
 }
 
+fn create_fixed_len_byte_array_reader(
+    page_iterator: impl PageIterator + 'static,
+    column_desc: ColumnDescPtr,
+) -> Box<dyn ArrayReader> {
+    make_fixed_len_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
+}
+
 fn create_byte_array_reader(
     page_iterator: impl PageIterator + 'static,
     column_desc: ColumnDescPtr,
@@ -630,6 +689,7 @@ fn bench_byte_decimal<T>(
     group: &mut BenchmarkGroup<WallTime>,
     mandatory_column_desc: &ColumnDescPtr,
     optional_column_desc: &ColumnDescPtr,
+    encoding: Encoding,
     min: i128,
     max: i128,
 ) where
@@ -639,61 +699,71 @@ fn bench_byte_decimal<T>(
     // all are plain encoding
     let mut count: usize = 0;
 
-    // plain encoded, no NULLs
+    // no NULLs
     let data = build_encoded_decimal_bytes_page_iterator::<T>(
         mandatory_column_desc.clone(),
         0.0,
-        Encoding::PLAIN,
+        encoding,
         min,
         max,
     );
-    group.bench_function("plain encoded, mandatory, no NULLs", |b| {
-        b.iter(|| {
-            let array_reader =
-                create_decimal_by_bytes_reader(data.clone(), mandatory_column_desc.clone());
-            count = bench_array_reader(array_reader);
-        });
-        assert_eq!(count, EXPECTED_VALUE_COUNT);
-    });
+    group.bench_function(
+        encoding.to_string().to_lowercase() + " encoded, mandatory, no NULLs",
+        |b| {
+            b.iter(|| {
+                let array_reader =
+                    create_decimal_by_bytes_reader(data.clone(), mandatory_column_desc.clone());
+                count = bench_array_reader(array_reader);
+            });
+            assert_eq!(count, EXPECTED_VALUE_COUNT);
+        },
+    );
 
     let data = build_encoded_decimal_bytes_page_iterator::<T>(
         optional_column_desc.clone(),
         0.0,
-        Encoding::PLAIN,
+        encoding,
         min,
         max,
     );
-    group.bench_function("plain encoded, optional, no NULLs", |b| {
-        b.iter(|| {
-            let array_reader =
-                create_decimal_by_bytes_reader(data.clone(), optional_column_desc.clone());
-            count = bench_array_reader(array_reader);
-        });
-        assert_eq!(count, EXPECTED_VALUE_COUNT);
-    });
+    group.bench_function(
+        encoding.to_string().to_lowercase() + " encoded, optional, no NULLs",
+        |b| {
+            b.iter(|| {
+                let array_reader =
+                    create_decimal_by_bytes_reader(data.clone(), optional_column_desc.clone());
+                count = bench_array_reader(array_reader);
+            });
+            assert_eq!(count, EXPECTED_VALUE_COUNT);
+        },
+    );
 
     // half null
     let data = build_encoded_decimal_bytes_page_iterator::<T>(
         optional_column_desc.clone(),
         0.5,
-        Encoding::PLAIN,
+        encoding,
         min,
         max,
     );
-    group.bench_function("plain encoded, optional, half NULLs", |b| {
-        b.iter(|| {
-            let array_reader =
-                create_decimal_by_bytes_reader(data.clone(), optional_column_desc.clone());
-            count = bench_array_reader(array_reader);
-        });
-        assert_eq!(count, EXPECTED_VALUE_COUNT);
-    });
+    group.bench_function(
+        encoding.to_string().to_lowercase() + " encoded, optional, half NULLs",
+        |b| {
+            b.iter(|| {
+                let array_reader =
+                    create_decimal_by_bytes_reader(data.clone(), optional_column_desc.clone());
+                count = bench_array_reader(array_reader);
+            });
+            assert_eq!(count, EXPECTED_VALUE_COUNT);
+        },
+    );
 }
 
-fn bench_byte_stream_split_f16<T>(
+fn bench_f16<T>(
     group: &mut BenchmarkGroup<WallTime>,
     mandatory_column_desc: &ColumnDescPtr,
     optional_column_desc: &ColumnDescPtr,
+    encoding: Encoding,
     min: f32,
     max: f32,
 ) where
@@ -706,113 +776,141 @@ fn bench_byte_stream_split_f16<T>(
     let data = build_encoded_f16_bytes_page_iterator::<T>(
         mandatory_column_desc.clone(),
         0.0,
-        Encoding::BYTE_STREAM_SPLIT,
+        encoding,
         min,
         max,
     );
-    group.bench_function("byte_stream_split encoded, mandatory, no NULLs", |b| {
-        b.iter(|| {
-            let array_reader =
-                create_f16_by_bytes_reader(data.clone(), mandatory_column_desc.clone());
-            count = bench_array_reader(array_reader);
-        });
-        assert_eq!(count, EXPECTED_VALUE_COUNT);
-    });
+    group.bench_function(
+        encoding.to_string().to_lowercase() + " encoded, mandatory, no NULLs",
+        |b| {
+            b.iter(|| {
+                let array_reader =
+                    create_f16_by_bytes_reader(data.clone(), mandatory_column_desc.clone());
+                count = bench_array_reader(array_reader);
+            });
+            assert_eq!(count, EXPECTED_VALUE_COUNT);
+        },
+    );
 
     let data = build_encoded_f16_bytes_page_iterator::<T>(
         optional_column_desc.clone(),
         0.0,
-        Encoding::BYTE_STREAM_SPLIT,
+        encoding,
         min,
         max,
     );
-    group.bench_function("byte_stream_split encoded, optional, no NULLs", |b| {
-        b.iter(|| {
-            let array_reader =
-                create_f16_by_bytes_reader(data.clone(), optional_column_desc.clone());
-            count = bench_array_reader(array_reader);
-        });
-        assert_eq!(count, EXPECTED_VALUE_COUNT);
-    });
+    group.bench_function(
+        encoding.to_string().to_lowercase() + " encoded, optional, no NULLs",
+        |b| {
+            b.iter(|| {
+                let array_reader =
+                    create_f16_by_bytes_reader(data.clone(), optional_column_desc.clone());
+                count = bench_array_reader(array_reader);
+            });
+            assert_eq!(count, EXPECTED_VALUE_COUNT);
+        },
+    );
 
     let data = build_encoded_f16_bytes_page_iterator::<T>(
         optional_column_desc.clone(),
         0.5,
-        Encoding::BYTE_STREAM_SPLIT,
+        encoding,
         min,
         max,
     );
-    group.bench_function("byte_stream_split encoded, optional, half NULLs", |b| {
-        b.iter(|| {
-            let array_reader =
-                create_f16_by_bytes_reader(data.clone(), optional_column_desc.clone());
-            count = bench_array_reader(array_reader);
-        });
-        assert_eq!(count, EXPECTED_VALUE_COUNT);
-    });
+    group.bench_function(
+        encoding.to_string().to_lowercase() + " encoded, optional, half NULLs",
+        |b| {
+            b.iter(|| {
+                let array_reader =
+                    create_f16_by_bytes_reader(data.clone(), optional_column_desc.clone());
+                count = bench_array_reader(array_reader);
+            });
+            assert_eq!(count, EXPECTED_VALUE_COUNT);
+        },
+    );
 }
 
-fn bench_byte_stream_split_decimal<T>(
+fn bench_flba<const BYTE_LENGTH: usize>(
     group: &mut BenchmarkGroup<WallTime>,
     mandatory_column_desc: &ColumnDescPtr,
     optional_column_desc: &ColumnDescPtr,
-    min: i128,
-    max: i128,
-) where
-    T: parquet::data_type::DataType,
-    T::T: From<Vec<u8>>,
-{
+    encoding: Encoding,
+) {
     let mut count: usize = 0;
 
-    // byte_stream_split encoded, no NULLs
-    let data = build_encoded_decimal_bytes_page_iterator::<T>(
+    encoding.to_string();
+    // no NULLs
+    let data = build_encoded_flba_bytes_page_iterator::<BYTE_LENGTH>(
         mandatory_column_desc.clone(),
         0.0,
-        Encoding::BYTE_STREAM_SPLIT,
-        min,
-        max,
+        encoding,
+    );
+    group.bench_function(
+        encoding.to_string().to_lowercase() + " encoded, mandatory, no NULLs",
+        |b| {
+            b.iter(|| {
+                let array_reader =
+                    create_fixed_len_byte_array_reader(data.clone(), mandatory_column_desc.clone());
+                count = bench_array_reader(array_reader);
+            });
+            assert_eq!(count, EXPECTED_VALUE_COUNT);
+        },
     );
-    group.bench_function("byte_stream_split encoded, mandatory, no NULLs", |b| {
-        b.iter(|| {
-            let array_reader =
-                create_decimal_by_bytes_reader(data.clone(), mandatory_column_desc.clone());
-            count = bench_array_reader(array_reader);
-        });
-        assert_eq!(count, EXPECTED_VALUE_COUNT);
-    });
 
-    let data = build_encoded_decimal_bytes_page_iterator::<T>(
+    let data = build_encoded_flba_bytes_page_iterator::<BYTE_LENGTH>(
         optional_column_desc.clone(),
         0.0,
-        Encoding::BYTE_STREAM_SPLIT,
-        min,
-        max,
+        encoding,
+    );
+    group.bench_function(
+        encoding.to_string().to_lowercase() + " encoded, optional, no NULLs",
+        |b| {
+            b.iter(|| {
+                let array_reader =
+                    create_fixed_len_byte_array_reader(data.clone(), optional_column_desc.clone());
+                count = bench_array_reader(array_reader);
+            });
+            assert_eq!(count, EXPECTED_VALUE_COUNT);
+        },
     );
-    group.bench_function("byte_stream_split encoded, optional, no NULLs", |b| {
-        b.iter(|| {
-            let array_reader =
-                create_decimal_by_bytes_reader(data.clone(), optional_column_desc.clone());
-            count = bench_array_reader(array_reader);
-        });
-        assert_eq!(count, EXPECTED_VALUE_COUNT);
-    });
 
     // half null
-    let data = build_encoded_decimal_bytes_page_iterator::<T>(
+    let data = build_encoded_flba_bytes_page_iterator::<BYTE_LENGTH>(
         optional_column_desc.clone(),
         0.5,
+        encoding,
+    );
+    group.bench_function(
+        encoding.to_string().to_lowercase() + " encoded, optional, half NULLs",
+        |b| {
+            b.iter(|| {
+                let array_reader =
+                    create_fixed_len_byte_array_reader(data.clone(), optional_column_desc.clone());
+                count = bench_array_reader(array_reader);
+            });
+            assert_eq!(count, EXPECTED_VALUE_COUNT);
+        },
+    );
+}
+
+fn bench_fixed_len_byte_array<const BYTE_LENGTH: usize>(
+    group: &mut BenchmarkGroup<WallTime>,
+    mandatory_column_desc: &ColumnDescPtr,
+    optional_column_desc: &ColumnDescPtr,
+) {
+    bench_flba::<BYTE_LENGTH>(
+        group,
+        mandatory_column_desc,
+        optional_column_desc,
+        Encoding::PLAIN,
+    );
+    bench_flba::<BYTE_LENGTH>(
+        group,
+        mandatory_column_desc,
+        optional_column_desc,
         Encoding::BYTE_STREAM_SPLIT,
-        min,
-        max,
     );
-    group.bench_function("byte_stream_split encoded, optional, half NULLs", |b| {
-        b.iter(|| {
-            let array_reader =
-                create_decimal_by_bytes_reader(data.clone(), optional_column_desc.clone());
-            count = bench_array_reader(array_reader);
-        });
-        assert_eq!(count, EXPECTED_VALUE_COUNT);
-    });
 }
 
 fn bench_primitive<T>(
@@ -994,31 +1092,82 @@ fn bench_primitive<T>(
         });
         assert_eq!(count, EXPECTED_VALUE_COUNT);
     });
+
+    // byte_stream_split encoded, no NULLs
+    let data = build_encoded_primitive_page_iterator::<T>(
+        mandatory_column_desc.clone(),
+        0.0,
+        Encoding::BYTE_STREAM_SPLIT,
+        min,
+        max,
+    );
+    group.bench_function("byte_stream_split encoded, mandatory, no NULLs", |b| {
+        b.iter(|| {
+            let array_reader =
+                create_primitive_array_reader(data.clone(), mandatory_column_desc.clone());
+            count = bench_array_reader(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
+
+    let data = build_encoded_primitive_page_iterator::<T>(
+        optional_column_desc.clone(),
+        0.0,
+        Encoding::BYTE_STREAM_SPLIT,
+        min,
+        max,
+    );
+    group.bench_function("byte_stream_split encoded, optional, no NULLs", |b| {
+        b.iter(|| {
+            let array_reader =
+                create_primitive_array_reader(data.clone(), 
optional_column_desc.clone());
+            count = bench_array_reader(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
+
+    // plain encoded, half NULLs
+    let data = build_encoded_primitive_page_iterator::<T>(
+        optional_column_desc.clone(),
+        0.5,
+        Encoding::BYTE_STREAM_SPLIT,
+        min,
+        max,
+    );
+    group.bench_function("byte_stream_split encoded, optional, half NULLs", |b| {
+        b.iter(|| {
+            let array_reader =
+                create_primitive_array_reader(data.clone(), optional_column_desc.clone());
+            count = bench_array_reader(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
 }
 
-fn byte_stream_split_benches(c: &mut Criterion) {
+fn float16_benches(c: &mut Criterion) {
     let schema = build_test_schema();
 
-    let mut group = c.benchmark_group("arrow_array_reader/BYTE_STREAM_SPLIT/Decimal128Array");
-    let mandatory_decimal4_leaf_desc = schema.column(12);
-    let optional_decimal4_leaf_desc = schema.column(13);
-    bench_byte_stream_split_decimal::<FixedLenByteArrayType>(
+    let mut group = c.benchmark_group("arrow_array_reader/FIXED_LEN_BYTE_ARRAY/Float16Array");
+    let mandatory_f16_leaf_desc = schema.column(17);
+    let optional_f16_leaf_desc = schema.column(18);
+    bench_f16::<FixedLenByteArrayType>(
         &mut group,
-        &mandatory_decimal4_leaf_desc,
-        &optional_decimal4_leaf_desc,
-        // precision is 16: the max is 9999999999999999
-        9999999999999000,
-        9999999999999999,
+        &mandatory_f16_leaf_desc,
+        &optional_f16_leaf_desc,
+        Encoding::PLAIN,
+        -1.0,
+        1.0,
     );
     group.finish();
 
-    let mut group = c.benchmark_group("arrow_array_reader/BYTE_STREAM_SPLIT/Float16Array");
+    let mut group = c.benchmark_group("arrow_array_reader/FIXED_LEN_BYTE_ARRAY/Float16Array");
     let mandatory_f16_leaf_desc = schema.column(17);
     let optional_f16_leaf_desc = schema.column(18);
-    bench_byte_stream_split_f16::<FixedLenByteArrayType>(
+    bench_f16::<FixedLenByteArrayType>(
         &mut group,
         &mandatory_f16_leaf_desc,
         &optional_f16_leaf_desc,
+        Encoding::BYTE_STREAM_SPLIT,
         -1.0,
         1.0,
     );
@@ -1063,19 +1212,36 @@ fn decimal_benches(c: &mut Criterion) {
         &mut group,
         &mandatory_decimal3_leaf_desc,
         &optional_decimal3_leaf_desc,
+        Encoding::PLAIN,
+        // precision is 16: the max is 9999999999999999
+        9999999999999000,
+        9999999999999999,
+    );
+    group.finish();
+
+    // parquet FIXED_LEN_BYTE_ARRAY, logical type decimal(16,2)
+    let mut group = c.benchmark_group("arrow_array_reader/FIXED_LEN_BYTE_ARRAY/Decimal128Array");
+    let mandatory_decimal4_leaf_desc = schema.column(12);
+    let optional_decimal4_leaf_desc = schema.column(13);
+    bench_byte_decimal::<FixedLenByteArrayType>(
+        &mut group,
+        &mandatory_decimal4_leaf_desc,
+        &optional_decimal4_leaf_desc,
+        Encoding::PLAIN,
         // precision is 16: the max is 9999999999999999
         9999999999999000,
         9999999999999999,
     );
     group.finish();
 
-    let mut group = c.benchmark_group("arrow_array_reader/FIXED_LENGTH_BYTE_ARRAY/Decimal128Array");
+    let mut group = c.benchmark_group("arrow_array_reader/FIXED_LEN_BYTE_ARRAY/Decimal128Array");
     let mandatory_decimal4_leaf_desc = schema.column(12);
     let optional_decimal4_leaf_desc = schema.column(13);
     bench_byte_decimal::<FixedLenByteArrayType>(
         &mut group,
         &mandatory_decimal4_leaf_desc,
         &optional_decimal4_leaf_desc,
+        Encoding::BYTE_STREAM_SPLIT,
         // precision is 16: the max is 9999999999999999
         9999999999999000,
         9999999999999999,
@@ -1560,12 +1726,52 @@ fn add_benches(c: &mut Criterion) {
         });
         assert_eq!(count, EXPECTED_VALUE_COUNT);
     });
+
+    group.finish();
+
+    // fixed_len_byte_array benchmarks
+    //==============================
+
+    let mut group = c.benchmark_group("arrow_array_reader/FixedLenByteArray(2)");
+    let mandatory_flba2_leaf_desc = schema.column(19);
+    let optional_flba2_leaf_desc = schema.column(20);
+    bench_fixed_len_byte_array::<2>(
+        &mut group,
+        &mandatory_flba2_leaf_desc,
+        &optional_flba2_leaf_desc,
+    );
+    group.finish();
+
+    let mut group = c.benchmark_group("arrow_array_reader/FixedLenByteArray(4)");
+    let mandatory_flba4_leaf_desc = schema.column(21);
+    let optional_flba4_leaf_desc = schema.column(22);
+    bench_fixed_len_byte_array::<4>(
+        &mut group,
+        &mandatory_flba4_leaf_desc,
+        &optional_flba4_leaf_desc,
+    );
+    group.finish();
+
+    let mut group = c.benchmark_group("arrow_array_reader/FixedLenByteArray(8)");
+    let mandatory_flba8_leaf_desc = schema.column(23);
+    let optional_flba8_leaf_desc = schema.column(24);
+    bench_fixed_len_byte_array::<8>(
+        &mut group,
+        &mandatory_flba8_leaf_desc,
+        &optional_flba8_leaf_desc,
+    );
+    group.finish();
+
+    let mut group = c.benchmark_group("arrow_array_reader/FixedLenByteArray(16)");
+    let mandatory_flba16_leaf_desc = schema.column(25);
+    let optional_flba16_leaf_desc = schema.column(26);
+    bench_fixed_len_byte_array::<16>(
+        &mut group,
+        &mandatory_flba16_leaf_desc,
+        &optional_flba16_leaf_desc,
+    );
+    group.finish();
 }
 
-criterion_group!(
-    benches,
-    add_benches,
-    decimal_benches,
-    byte_stream_split_benches,
-);
+criterion_group!(benches, add_benches, decimal_benches, float16_benches,);
 criterion_main!(benches);
diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
index 01692c242..4be07ed68 100644
--- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
+++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
@@ -36,6 +36,7 @@ use arrow_schema::{DataType as ArrowType, IntervalUnit};
 use bytes::Bytes;
 use half::f16;
 use std::any::Any;
+use std::ops::Range;
 use std::sync::Arc;
 
 /// Returns an [`ArrayReader`] that decodes the provided fixed length byte array column
@@ -233,6 +234,29 @@ struct FixedLenByteArrayBuffer {
     byte_length: Option<usize>,
 }
 
+#[inline]
+fn move_values<F>(
+    buffer: &mut Vec<u8>,
+    byte_length: usize,
+    values_range: Range<usize>,
+    valid_mask: &[u8],
+    mut op: F,
+) where
+    F: FnMut(&mut Vec<u8>, usize, usize, usize),
+{
+    for (value_pos, level_pos) in values_range.rev().zip(iter_set_bits_rev(valid_mask)) {
+        debug_assert!(level_pos >= value_pos);
+        if level_pos <= value_pos {
+            break;
+        }
+
+        let level_pos_bytes = level_pos * byte_length;
+        let value_pos_bytes = value_pos * byte_length;
+
+        op(buffer, level_pos_bytes, value_pos_bytes, byte_length)
+    }
+}
+
 impl ValuesBuffer for FixedLenByteArrayBuffer {
     fn pad_nulls(
         &mut self,
@@ -248,18 +272,26 @@ impl ValuesBuffer for FixedLenByteArrayBuffer {
             .resize((read_offset + levels_read) * byte_length, 0);
 
         let values_range = read_offset..read_offset + values_read;
-        for (value_pos, level_pos) in values_range.rev().zip(iter_set_bits_rev(valid_mask)) {
-            debug_assert!(level_pos >= value_pos);
-            if level_pos <= value_pos {
-                break;
-            }
-
-            let level_pos_bytes = level_pos * byte_length;
-            let value_pos_bytes = value_pos * byte_length;
-
-            for i in 0..byte_length {
-                self.buffer[level_pos_bytes + i] = self.buffer[value_pos_bytes + i]
-            }
+        // Move the bytes from value_pos to level_pos. For values of `byte_length` <= 4,
+        // the simple loop is preferred as the compiler can eliminate the loop via unrolling.
+        // For `byte_length > 4`, we instead copy from non-overlapping slices. This allows
+        // the loop to be vectorized, yielding much better performance.
+        const VEC_CUTOFF: usize = 4;
+        if byte_length > VEC_CUTOFF {
+            let op = |buffer: &mut Vec<u8>, level_pos_bytes, value_pos_bytes, byte_length| {
+                let split = buffer.split_at_mut(level_pos_bytes);
+                let dst = &mut split.1[..byte_length];
+                let src = &split.0[value_pos_bytes..value_pos_bytes + byte_length];
+                dst.copy_from_slice(src);
+            };
+            move_values(&mut self.buffer, byte_length, values_range, valid_mask, op);
+        } else {
+            let op = |buffer: &mut Vec<u8>, level_pos_bytes, value_pos_bytes, byte_length| {
+                for i in 0..byte_length {
+                    buffer[level_pos_bytes + i] = buffer[value_pos_bytes + i]
+                }
+            };
+            move_values(&mut self.buffer, byte_length, values_range, valid_mask, op);
         }
     }
 }

Reply via email to