This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 6dd4a5f59 Use `unary()` for array conversion in Parquet array readers, speed up `Decimal128`, `Decimal256` and `Float16` (#6252)
6dd4a5f59 is described below

commit 6dd4a5f59090803d5c54343fadff4d2eb1fff813
Author: Ed Seidl <[email protected]>
AuthorDate: Thu Aug 22 14:55:48 2024 -0700

    Use `unary()` for array conversion in Parquet array readers, speed up `Decimal128`, `Decimal256` and `Float16` (#6252)
    
    * add unary to FixedSizeBinaryArray; use unary for transformations
    
    * clean up documentation some
    
    * add from_unary to PrimitiveArray
    
    * use from_unary for converting byte array to decimal
    
    * rework from_unary to skip vector initialization
    
    * add example to from_unary docstring
    
    * fix broken link
    
    * add comments per review suggestion
---
 arrow-array/src/array/primitive_array.rs           | 30 ++++++++
 parquet/src/arrow/array_reader/byte_array.rs       | 29 ++++----
 .../src/arrow/array_reader/fixed_len_byte_array.rs | 64 ++++++-----------
 parquet/src/arrow/array_reader/primitive_array.rs  | 84 ++++++++--------------
 4 files changed, 95 insertions(+), 112 deletions(-)

diff --git a/arrow-array/src/array/primitive_array.rs b/arrow-array/src/array/primitive_array.rs
index db14845b0..521ef088e 100644
--- a/arrow-array/src/array/primitive_array.rs
+++ b/arrow-array/src/array/primitive_array.rs
@@ -1016,6 +1016,36 @@ impl<T: ArrowPrimitiveType> PrimitiveArray<T> {
         PrimitiveArray::new(values, Some(nulls))
     }
 
+    /// Applies a unary infallible function to each value in an array, producing a
+    /// new primitive array.
+    ///
+    /// # Null Handling
+    ///
+    /// `op` is called on null slots too; the result reuses `left.logical_nulls()`. See [`Self::unary`].
+    ///
+    /// # Example: create an [`Int16Array`] from an [`ArrayAccessor`] with item type `&[u8]`
+    /// ```
+    /// use arrow_array::{Array, FixedSizeBinaryArray, Int16Array};
+    /// let input_arg = vec![ vec![1, 0], vec![2, 0], vec![3, 0] ];
+    /// let arr = FixedSizeBinaryArray::try_from_iter(input_arg.into_iter()).unwrap();
+    /// let c = Int16Array::from_unary(&arr, |x| i16::from_le_bytes(x[..2].try_into().unwrap()));
+    /// assert_eq!(c, Int16Array::from(vec![Some(1i16), Some(2i16), Some(3i16)]));
+    /// ```
+    pub fn from_unary<U: ArrayAccessor, F>(left: U, mut op: F) -> Self
+    where
+        F: FnMut(U::Item) -> T::Native,
+    {
+        let nulls = left.logical_nulls();
+        let buffer = unsafe {
+            // SAFETY: every i is drawn from 0..left.len(), so value_unchecked(i) is in bounds
+            let iter = (0..left.len()).map(|i| op(left.value_unchecked(i)));
+            // SAFETY: `iter` maps over a Range, so its reported upper bound is exact
+            Buffer::from_trusted_len_iter(iter)
+        };
+
+        PrimitiveArray::new(buffer.into(), nulls)
+    }
+
     /// Returns a `PrimitiveBuilder` for this array, suitable for mutating 
values
     /// in place.
     ///
diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs
index ed5961586..925831556 100644
--- a/parquet/src/arrow/array_reader/byte_array.rs
+++ b/parquet/src/arrow/array_reader/byte_array.rs
@@ -120,26 +120,31 @@ impl<I: OffsetSizeTrait> ArrayReader for ByteArrayReader<I> {
         self.record_reader.reset();
 
         let array: ArrayRef = match self.data_type {
+            // Apply conversion to all elements regardless of null slots as 
the conversions
+            // are infallible. This improves performance by avoiding a branch 
in the inner
+            // loop (see docs for `PrimitiveArray::from_unary`).
             ArrowType::Decimal128(p, s) => {
                 let array = buffer.into_array(null_buffer, ArrowType::Binary);
                 let binary = 
array.as_any().downcast_ref::<BinaryArray>().unwrap();
-                let decimal = binary
-                    .iter()
-                    .map(|opt| Some(i128::from_be_bytes(sign_extend_be(opt?))))
-                    .collect::<Decimal128Array>()
-                    .with_precision_and_scale(p, s)?;
-
+                // Null slots will have 0 length, so we need to check for that 
in the lambda
+                // or sign_extend_be will panic.
+                let decimal = Decimal128Array::from_unary(binary, |x| match 
x.len() {
+                    0 => i128::default(),
+                    _ => i128::from_be_bytes(sign_extend_be(x)),
+                })
+                .with_precision_and_scale(p, s)?;
                 Arc::new(decimal)
             }
             ArrowType::Decimal256(p, s) => {
                 let array = buffer.into_array(null_buffer, ArrowType::Binary);
                 let binary = 
array.as_any().downcast_ref::<BinaryArray>().unwrap();
-                let decimal = binary
-                    .iter()
-                    .map(|opt| Some(i256::from_be_bytes(sign_extend_be(opt?))))
-                    .collect::<Decimal256Array>()
-                    .with_precision_and_scale(p, s)?;
-
+                // Null slots will have 0 length, so we need to check for that 
in the lambda
+                // or sign_extend_be will panic.
+                let decimal = Decimal256Array::from_unary(binary, |x| match 
x.len() {
+                    0 => i256::default(),
+                    _ => i256::from_be_bytes(sign_extend_be(x)),
+                })
+                .with_precision_and_scale(p, s)?;
                 Arc::new(decimal)
             }
             _ => buffer.into_array(null_buffer, self.data_type.clone()),
diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
index 3b2600c54..01692c242 100644
--- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
+++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
@@ -27,7 +27,7 @@ use crate::column::reader::decoder::ColumnValueDecoder;
 use crate::errors::{ParquetError, Result};
 use crate::schema::types::ColumnDescPtr;
 use arrow_array::{
-    Array, ArrayRef, Decimal128Array, Decimal256Array, FixedSizeBinaryArray, 
Float16Array,
+    ArrayRef, Decimal128Array, Decimal256Array, FixedSizeBinaryArray, 
Float16Array,
     IntervalDayTimeArray, IntervalYearMonthArray,
 };
 use arrow_buffer::{i256, Buffer, IntervalDayTime};
@@ -163,55 +163,36 @@ impl ArrayReader for FixedLenByteArrayReader {
         let binary = FixedSizeBinaryArray::from(unsafe { 
array_data.build_unchecked() });
 
         // TODO: An improvement might be to do this conversion on read
+        // Note the conversions below apply to all elements regardless of null 
slots as the
+        // conversion lambdas are all infallible. This improves performance by 
avoiding a branch in
+        // the inner loop (see docs for `PrimitiveArray::from_unary`).
         let array: ArrayRef = match &self.data_type {
             ArrowType::Decimal128(p, s) => {
-                // We can simply reuse the null buffer from `binary` rather 
than recomputing it
-                // (as was the case when we simply used `collect` to produce 
the new array).
-                // The same applies to the transformations below.
-                let nulls = binary.nulls().cloned();
-                let decimal = binary.iter().map(|o| match o {
-                    Some(b) => i128::from_be_bytes(sign_extend_be(b)),
-                    None => i128::default(),
-                });
-                let decimal = 
Decimal128Array::from_iter_values_with_nulls(decimal, nulls)
-                    .with_precision_and_scale(*p, *s)?;
-                Arc::new(decimal)
+                let f = |b: &[u8]| i128::from_be_bytes(sign_extend_be(b));
+                Arc::new(Decimal128Array::from_unary(&binary, 
f).with_precision_and_scale(*p, *s)?)
+                    as ArrayRef
             }
             ArrowType::Decimal256(p, s) => {
-                let nulls = binary.nulls().cloned();
-                let decimal = binary.iter().map(|o| match o {
-                    Some(b) => i256::from_be_bytes(sign_extend_be(b)),
-                    None => i256::default(),
-                });
-                let decimal = 
Decimal256Array::from_iter_values_with_nulls(decimal, nulls)
-                    .with_precision_and_scale(*p, *s)?;
-                Arc::new(decimal)
+                let f = |b: &[u8]| i256::from_be_bytes(sign_extend_be(b));
+                Arc::new(Decimal256Array::from_unary(&binary, 
f).with_precision_and_scale(*p, *s)?)
+                    as ArrayRef
             }
             ArrowType::Interval(unit) => {
-                let nulls = binary.nulls().cloned();
                 // An interval is stored as 3x 32-bit unsigned integers 
storing months, days,
                 // and milliseconds
                 match unit {
                     IntervalUnit::YearMonth => {
-                        let iter = binary.iter().map(|o| match o {
-                            Some(b) => 
i32::from_le_bytes(b[0..4].try_into().unwrap()),
-                            None => i32::default(),
-                        });
-                        let interval =
-                            
IntervalYearMonthArray::from_iter_values_with_nulls(iter, nulls);
-                        Arc::new(interval) as ArrayRef
+                        let f = |b: &[u8]| 
i32::from_le_bytes(b[0..4].try_into().unwrap());
+                        Arc::new(IntervalYearMonthArray::from_unary(&binary, 
f)) as ArrayRef
                     }
                     IntervalUnit::DayTime => {
-                        let iter = binary.iter().map(|o| match o {
-                            Some(b) => IntervalDayTime::new(
+                        let f = |b: &[u8]| {
+                            IntervalDayTime::new(
                                 
i32::from_le_bytes(b[4..8].try_into().unwrap()),
                                 
i32::from_le_bytes(b[8..12].try_into().unwrap()),
-                            ),
-                            None => IntervalDayTime::default(),
-                        });
-                        let interval =
-                            
IntervalDayTimeArray::from_iter_values_with_nulls(iter, nulls);
-                        Arc::new(interval) as ArrayRef
+                            )
+                        };
+                        Arc::new(IntervalDayTimeArray::from_unary(&binary, f)) 
as ArrayRef
                     }
                     IntervalUnit::MonthDayNano => {
                         return Err(nyi_err!("MonthDayNano intervals not 
supported"));
@@ -219,13 +200,8 @@ impl ArrayReader for FixedLenByteArrayReader {
                 }
             }
             ArrowType::Float16 => {
-                let nulls = binary.nulls().cloned();
-                let f16s = binary.iter().map(|o| match o {
-                    Some(b) => f16::from_le_bytes(b[..2].try_into().unwrap()),
-                    None => f16::default(),
-                });
-                let f16s = Float16Array::from_iter_values_with_nulls(f16s, 
nulls);
-                Arc::new(f16s) as ArrayRef
+                let f = |b: &[u8]| 
f16::from_le_bytes(b[..2].try_into().unwrap());
+                Arc::new(Float16Array::from_unary(&binary, f)) as ArrayRef
             }
             _ => Arc::new(binary) as ArrayRef,
         };
@@ -488,8 +464,8 @@ mod tests {
     use crate::arrow::ArrowWriter;
     use arrow::datatypes::Field;
     use arrow::error::Result as ArrowResult;
-    use arrow_array::RecordBatch;
     use arrow_array::{Array, ListArray};
+    use arrow_array::{Decimal256Array, RecordBatch};
     use bytes::Bytes;
     use std::sync::Arc;
 
diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs
index 5e0e09212..010e9c2ee 100644
--- a/parquet/src/arrow/array_reader/primitive_array.rs
+++ b/parquet/src/arrow/array_reader/primitive_array.rs
@@ -217,35 +217,22 @@ where
                 arrow_cast::cast(&a, target_type)?
             }
             ArrowType::Decimal128(p, s) => {
-                // We can simply reuse the null buffer from `array` rather 
than recomputing it
-                // (as was the case when we simply used `collect` to produce 
the new array).
-                let nulls = array.nulls().cloned();
+                // Apply conversion to all elements regardless of null slots 
as the conversion
+                // to `i128` is infallible. This improves performance by 
avoiding a branch in
+                // the inner loop (see docs for `PrimitiveArray::unary`).
                 let array = match array.data_type() {
-                    ArrowType::Int32 => {
-                        let decimal = array
-                            .as_any()
-                            .downcast_ref::<Int32Array>()
-                            .unwrap()
-                            .iter()
-                            .map(|v| match v {
-                                Some(i) => i as i128,
-                                None => i128::default(),
-                            });
-                        Decimal128Array::from_iter_values_with_nulls(decimal, 
nulls)
-                    }
-
-                    ArrowType::Int64 => {
-                        let decimal = array
-                            .as_any()
-                            .downcast_ref::<Int64Array>()
-                            .unwrap()
-                            .iter()
-                            .map(|v| match v {
-                                Some(i) => i as i128,
-                                None => i128::default(),
-                            });
-                        Decimal128Array::from_iter_values_with_nulls(decimal, 
nulls)
-                    }
+                    ArrowType::Int32 => array
+                        .as_any()
+                        .downcast_ref::<Int32Array>()
+                        .unwrap()
+                        .unary(|i| i as i128)
+                        as Decimal128Array,
+                    ArrowType::Int64 => array
+                        .as_any()
+                        .downcast_ref::<Int64Array>()
+                        .unwrap()
+                        .unary(|i| i as i128)
+                        as Decimal128Array,
                     _ => {
                         return Err(arrow_err!(
                             "Cannot convert {:?} to decimal",
@@ -258,35 +245,20 @@ where
                 Arc::new(array) as ArrayRef
             }
             ArrowType::Decimal256(p, s) => {
-                // We can simply reuse the null buffer from `array` rather 
than recomputing it
-                // (as was the case when we simply used `collect` to produce 
the new array).
-                let nulls = array.nulls().cloned();
+                // See above comment. Conversion to `i256` is likewise 
infallible.
                 let array = match array.data_type() {
-                    ArrowType::Int32 => {
-                        let decimal = array
-                            .as_any()
-                            .downcast_ref::<Int32Array>()
-                            .unwrap()
-                            .iter()
-                            .map(|v| match v {
-                                Some(i) => i256::from_i128(i as i128),
-                                None => i256::default(),
-                            });
-                        Decimal256Array::from_iter_values_with_nulls(decimal, 
nulls)
-                    }
-
-                    ArrowType::Int64 => {
-                        let decimal = array
-                            .as_any()
-                            .downcast_ref::<Int64Array>()
-                            .unwrap()
-                            .iter()
-                            .map(|v| match v {
-                                Some(i) => i256::from_i128(i as i128),
-                                None => i256::default(),
-                            });
-                        Decimal256Array::from_iter_values_with_nulls(decimal, 
nulls)
-                    }
+                    ArrowType::Int32 => array
+                        .as_any()
+                        .downcast_ref::<Int32Array>()
+                        .unwrap()
+                        .unary(|i| i256::from_i128(i as i128))
+                        as Decimal256Array,
+                    ArrowType::Int64 => array
+                        .as_any()
+                        .downcast_ref::<Int64Array>()
+                        .unwrap()
+                        .unary(|i| i256::from_i128(i as i128))
+                        as Decimal256Array,
                     _ => {
                         return Err(arrow_err!(
                             "Cannot convert {:?} to decimal",

Reply via email to