This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 04f67908f feat: support 'Decimal256' for parquet (#4272)
04f67908f is described below

commit 04f67908fb1f85eb2dbcd129b2d435d189cd658d
Author: Alex Huang <[email protected]>
AuthorDate: Tue May 30 13:15:02 2023 +0200

    feat: support 'Decimal256' for parquet (#4272)
    
    Co-authored-by: Raphael Taylor-Davies <[email protected]>
---
 parquet/src/arrow/array_reader/byte_array.rs       | 24 +++++++++--
 .../src/arrow/array_reader/fixed_len_byte_array.rs | 32 +++++++++++----
 parquet/src/arrow/array_reader/primitive_array.rs  | 31 ++++++++++++++-
 parquet/src/arrow/arrow_reader/mod.rs              |  6 ++-
 parquet/src/arrow/arrow_writer/levels.rs           |  1 +
 parquet/src/arrow/arrow_writer/mod.rs              | 44 ++++++++++++++++++---
 parquet/src/arrow/schema/mod.rs                    |  6 +--
 parquet/src/arrow/schema/primitive.rs              | 46 +++++++++++++++++-----
 8 files changed, 158 insertions(+), 32 deletions(-)

diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs
index 22fa0ab45..43db658d9 100644
--- a/parquet/src/arrow/array_reader/byte_array.rs
+++ b/parquet/src/arrow/array_reader/byte_array.rs
@@ -30,8 +30,10 @@ use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder};
 use crate::errors::{ParquetError, Result};
 use crate::schema::types::ColumnDescPtr;
 use crate::util::memory::ByteBufferPtr;
-use arrow_array::{Array, ArrayRef, BinaryArray, Decimal128Array, OffsetSizeTrait};
-use arrow_buffer::Buffer;
+use arrow_array::{
+    Array, ArrayRef, BinaryArray, Decimal128Array, Decimal256Array, OffsetSizeTrait,
+};
+use arrow_buffer::{i256, Buffer};
 use arrow_schema::DataType as ArrowType;
 use std::any::Any;
 use std::ops::Range;
@@ -52,7 +54,10 @@ pub fn make_byte_array_reader(
     };

     match data_type {
-        ArrowType::Binary | ArrowType::Utf8 | ArrowType::Decimal128(_, _) => {
+        ArrowType::Binary
+        | ArrowType::Utf8
+        | ArrowType::Decimal128(_, _)
+        | ArrowType::Decimal256(_, _) => {
             let reader = GenericRecordReader::new(column_desc);
             Ok(Box::new(ByteArrayReader::<i32>::new(
                 pages, data_type, reader,
@@ -119,7 +124,7 @@ impl<I: OffsetSizeTrait + ScalarValue> ArrayReader for ByteArrayReader<I> {
         self.rep_levels_buffer = self.record_reader.consume_rep_levels();
         self.record_reader.reset();

-        let array = match self.data_type {
+        let array: ArrayRef = match self.data_type {
             ArrowType::Decimal128(p, s) => {
                 let array = buffer.into_array(null_buffer, ArrowType::Binary);
                 let binary = array.as_any().downcast_ref::<BinaryArray>().unwrap();
@@ -131,6 +136,17 @@ impl<I: OffsetSizeTrait + ScalarValue> ArrayReader for ByteArrayReader<I> {

                 Arc::new(decimal)
             }
+            ArrowType::Decimal256(p, s) => {
+                let array = buffer.into_array(null_buffer, ArrowType::Binary);
+                let binary = array.as_any().downcast_ref::<BinaryArray>().unwrap();
+                let decimal = binary
+                    .iter()
+                    .map(|opt| Some(i256::from_be_bytes(sign_extend_be(opt?))))
+                    .collect::<Decimal256Array>()
+                    .with_precision_and_scale(p, s)?;
+
+                Arc::new(decimal)
+            }
             _ => buffer.into_array(null_buffer, self.data_type.clone()),
         };

diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
index fee032a4d..47bd03a73 100644
--- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
+++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
@@ -28,10 +28,10 @@ use crate::errors::{ParquetError, Result};
 use crate::schema::types::ColumnDescPtr;
 use crate::util::memory::ByteBufferPtr;
 use arrow_array::{
-    ArrayRef, Decimal128Array, FixedSizeBinaryArray, IntervalDayTimeArray,
-    IntervalYearMonthArray,
+    ArrayRef, Decimal128Array, Decimal256Array, FixedSizeBinaryArray,
+    IntervalDayTimeArray, IntervalYearMonthArray,
 };
-use arrow_buffer::Buffer;
+use arrow_buffer::{i256, Buffer};
 use arrow_data::ArrayDataBuilder;
 use arrow_schema::{DataType as ArrowType, IntervalUnit};
 use std::any::Any;
@@ -61,7 +61,6 @@ pub fn make_fixed_len_byte_array_reader(
             ))
         }
     };
-
     match &data_type {
         ArrowType::FixedSizeBinary(_) => {}
         ArrowType::Decimal128(_, _) => {
@@ -72,6 +71,14 @@ pub fn make_fixed_len_byte_array_reader(
                 ));
             }
         }
+        ArrowType::Decimal256(_, _) => {
+            if byte_length > 32 {
+                return Err(general_err!(
+                    "decimal 256 type too large, must be at most 32 bytes, got {}",
+                    byte_length
+                ));
+            }
+        }
         ArrowType::Interval(_) => {
             if byte_length != 12 {
                 // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#interval
@@ -155,7 +162,7 @@ impl ArrayReader for FixedLenByteArrayReader {
         let binary = FixedSizeBinaryArray::from(unsafe { array_data.build_unchecked() });

         // TODO: An improvement might be to do this conversion on read
-        let array = match &self.data_type {
+        let array: ArrayRef = match &self.data_type {
             ArrowType::Decimal128(p, s) => {
                 let decimal = binary
                     .iter()
@@ -165,6 +172,15 @@ impl ArrayReader for FixedLenByteArrayReader {

                 Arc::new(decimal)
             }
+            ArrowType::Decimal256(p, s) => {
+                let decimal = binary
+                    .iter()
+                    .map(|opt| Some(i256::from_be_bytes(sign_extend_be(opt?))))
+                    .collect::<Decimal256Array>()
+                    .with_precision_and_scale(*p, *s)?;
+
+                Arc::new(decimal)
+            }
             ArrowType::Interval(unit) => {
                 // An interval is stored as 3x 32-bit unsigned integers storing months, days,
                 // and milliseconds
@@ -428,16 +444,18 @@ mod tests {
     use super::*;
     use crate::arrow::arrow_reader::ParquetRecordBatchReader;
     use crate::arrow::ArrowWriter;
-    use arrow_array::{Array, Decimal128Array, ListArray};
     use arrow::datatypes::Field;
     use arrow::error::Result as ArrowResult;
     use arrow_array::RecordBatch;
+    use arrow_array::{Array, ListArray};
     use bytes::Bytes;
     use std::sync::Arc;

     #[test]
     fn test_decimal_list() {
-        let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
+        let decimals = Decimal256Array::from_iter_values(
+            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
+        );

         // [[], [1], [2, 3], null, [4], null, [6, 7, 8]]
         let data = ArrayDataBuilder::new(ArrowType::List(Arc::new(Field::new(
diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs
index 772026960..1e2720a4a 100644
--- a/parquet/src/arrow/array_reader/primitive_array.rs
+++ b/parquet/src/arrow/array_reader/primitive_array.rs
@@ -24,12 +24,13 @@ use crate::column::page::PageIterator;
 use crate::data_type::{DataType, Int96};
 use crate::errors::{ParquetError, Result};
 use crate::schema::types::ColumnDescPtr;
+use arrow_array::Decimal256Array;
 use arrow_array::{
     builder::{BooleanBufferBuilder, TimestampNanosecondBufferBuilder},
     ArrayRef, BooleanArray, Decimal128Array, Float32Array, Float64Array, Int32Array,
     Int64Array, TimestampNanosecondArray, UInt32Array, UInt64Array,
 };
-use arrow_buffer::Buffer;
+use arrow_buffer::{i256, Buffer};
 use arrow_data::ArrayDataBuilder;
 use arrow_schema::{DataType as ArrowType, TimeUnit};
 use std::any::Any;
@@ -237,6 +238,34 @@ where

                 Arc::new(array) as ArrayRef
             }
+            ArrowType::Decimal256(p, s) => {
+                let array = match array.data_type() {
+                    ArrowType::Int32 => array
+                        .as_any()
+                        .downcast_ref::<Int32Array>()
+                        .unwrap()
+                        .iter()
+                        .map(|v| v.map(|v| i256::from_i128(v as i128)))
+                        .collect::<Decimal256Array>(),
+
+                    ArrowType::Int64 => array
+                        .as_any()
+                        .downcast_ref::<Int64Array>()
+                        .unwrap()
+                        .iter()
+                        .map(|v| v.map(|v| i256::from_i128(v as i128)))
+                        .collect::<Decimal256Array>(),
+                    _ => {
+                        return Err(arrow_err!(
+                            "Cannot convert {:?} to decimal",
+                            array.data_type()
+                        ));
+                    }
+                }
+                .with_precision_and_scale(*p, *s)?;
+
+                Arc::new(array) as ArrayRef
+            }
             _ => arrow_cast::cast(&array, target_type)?,
         };

diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 4b14a54c5..819e96c0a 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -528,7 +528,7 @@ mod tests {
     use arrow_array::builder::*;
     use arrow_array::*;
     use arrow_array::{RecordBatch, RecordBatchReader};
-    use arrow_buffer::Buffer;
+    use arrow_buffer::{i256, Buffer};
     use arrow_data::ArrayDataBuilder;
     use arrow_schema::{DataType as ArrowDataType, Field, Fields, Schema};

@@ -928,7 +928,9 @@ mod tests {

     #[test]
     fn test_decimal_nullable_struct() {
-        let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
+        let decimals = Decimal256Array::from_iter_values(
+            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
+        );

         let data =
             ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
diff --git a/parquet/src/arrow/arrow_writer/levels.rs b/parquet/src/arrow/arrow_writer/levels.rs
index 21b3e7dff..47b018903 100644
--- a/parquet/src/arrow/arrow_writer/levels.rs
+++ b/parquet/src/arrow/arrow_writer/levels.rs
@@ -88,6 +88,7 @@ fn is_leaf(data_type: &DataType) -> bool {
             | DataType::Binary
             | DataType::LargeBinary
             | DataType::Decimal128(_, _)
+            | DataType::Decimal256(_, _)
             | DataType::FixedSizeBinary(_)
     )
 }
diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index bde21ae85..5f2750a55 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -27,10 +27,7 @@ use std::vec::IntoIter;
 use thrift::protocol::{TCompactOutputProtocol, TSerializable};

 use arrow_array::cast::AsArray;
-use arrow_array::types::{
-    Decimal128Type, Float32Type, Float64Type, Int32Type, Int64Type, UInt32Type,
-    UInt64Type,
-};
+use arrow_array::types::*;
 use arrow_array::{Array, FixedSizeListArray, RecordBatch, RecordBatchWriter};
 use arrow_schema::{ArrowError, DataType as ArrowDataType, IntervalUnit, SchemaRef};

@@ -551,6 +548,13 @@ fn write_leaf(
                         .unary::<_, Int32Type>(|v| v as i32);
                     write_primitive(typed, array.values(), levels)
                 }
+                ArrowDataType::Decimal256(_, _) => {
+                    // use the int32 to represent the decimal with low precision
+                    let array = column
+                        .as_primitive::<Decimal256Type>()
+                        .unary::<_, Int32Type>(|v| v.as_i128() as i32);
+                    write_primitive(typed, array.values(), levels)
+                }
                 _ => {
                     let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
                     let array = array.as_primitive::<Int32Type>();
@@ -586,6 +590,13 @@ fn write_leaf(
                         .unary::<_, Int64Type>(|v| v as i64);
                     write_primitive(typed, array.values(), levels)
                 }
+                ArrowDataType::Decimal256(_, _) => {
+                    // use the int64 to represent the decimal with low precision
+                    let array = column
+                        .as_primitive::<Decimal256Type>()
+                        .unary::<_, Int64Type>(|v| v.as_i128() as i64);
+                    write_primitive(typed, array.values(), levels)
+                }
                 _ => {
                     let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
                     let array = array.as_primitive::<Int64Type>();
@@ -641,7 +652,11 @@ fn write_leaf(
                 }
                 ArrowDataType::Decimal128(_, _) => {
                     let array = column.as_primitive::<Decimal128Type>();
-                    get_decimal_array_slice(array, indices)
+                    get_decimal_128_array_slice(array, indices)
+                }
+                ArrowDataType::Decimal256(_, _) => {
+                    let array = column.as_primitive::<Decimal256Type>();
+                    get_decimal_256_array_slice(array, indices)
                 }
                 _ => {
                     return Err(ParquetError::NYI(
@@ -715,7 +730,7 @@ fn get_interval_dt_array_slice(
     values
 }

-fn get_decimal_array_slice(
+fn get_decimal_128_array_slice(
     array: &arrow_array::Decimal128Array,
     indices: &[usize],
 ) -> Vec<FixedLenByteArray> {
@@ -729,6 +744,23 @@ fn get_decimal_array_slice(
     values
 }

+/// Converts the Decimal256 values at `indices` into fixed-length byte arrays,
+/// keeping the trailing `decimal_length_from_precision(precision)` bytes of each
+/// value's 32-byte big-endian representation.
+fn get_decimal_256_array_slice(
+    array: &arrow_array::Decimal256Array,
+    indices: &[usize],
+) -> Vec<FixedLenByteArray> {
+    let mut values = Vec::with_capacity(indices.len());
+    let size = decimal_length_from_precision(array.precision());
+    for i in indices {
+        let as_be_bytes = array.value(*i).to_be_bytes();
+        let resized_value = as_be_bytes[(32 - size)..].to_vec();
+        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
+    }
+    values
+}
+
 fn get_fsb_array_slice(
     array: &arrow_array::FixedSizeBinaryArray,
     indices: &[usize],
diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs
index 399dcba9e..ffae1eae5 100644
--- a/parquet/src/arrow/schema/mod.rs
+++ b/parquet/src/arrow/schema/mod.rs
@@ -1443,7 +1443,7 @@ mod tests {
             ),
             Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
             Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
-            Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
+            Field::new("decimal_fix_length", DataType::Decimal256(30, 2), false),
         ];
         let arrow_schema = Schema::new(arrow_fields);
         let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema).unwrap();
@@ -1614,7 +1614,7 @@ mod tests {
                 // ),
                 Field::new("c35", DataType::Null, true),
                 Field::new("c36", DataType::Decimal128(2, 1), false),
-                Field::new("c37", DataType::Decimal128(50, 20), false),
+                Field::new("c37", DataType::Decimal256(50, 20), false),
                 Field::new("c38", DataType::Decimal128(18, 12), true),
                 Field::new_map(
                     "c39",
diff --git a/parquet/src/arrow/schema/primitive.rs b/parquet/src/arrow/schema/primitive.rs
index 6565f7eae..d4db28915 100644
--- a/parquet/src/arrow/schema/primitive.rs
+++ b/parquet/src/arrow/schema/primitive.rs
@@ -103,7 +103,7 @@ fn from_parquet(parquet_type: &Type) -> Result<DataType> {
     }
 }

-fn decimal_type(scale: i32, precision: i32) -> Result<DataType> {
+fn decimal_128_type(scale: i32, precision: i32) -> Result<DataType> {
     let scale = scale
         .try_into()
         .map_err(|_| arrow_err!("scale cannot be negative: {}", scale))?;
@@ -115,6 +115,19 @@ fn decimal_type(scale: i32, precision: i32) -> Result<DataType> {
     Ok(DataType::Decimal128(precision, scale))
 }

+/// Like `decimal_128_type`, but produces a `DataType::Decimal256`
+fn decimal_256_type(scale: i32, precision: i32) -> Result<DataType> {
+    let scale = scale
+        .try_into()
+        .map_err(|_| arrow_err!("scale cannot be negative: {}", scale))?;
+
+    let precision = precision
+        .try_into()
+        .map_err(|_| arrow_err!("precision cannot be negative: {}", precision))?;
+
+    Ok(DataType::Decimal256(precision, scale))
+}
+
 fn from_int32(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataType> {
     match (info.logical_type(), info.converted_type()) {
         (None, ConvertedType::NONE) => Ok(DataType::Int32),
@@ -136,7 +149,7 @@ fn from_int32(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataTy
             _ => Err(arrow_err!("Cannot create INT32 physical type from {:?}", t)),
         },
         (Some(LogicalType::Decimal { scale, precision }), _) => {
-            decimal_type(scale, precision)
+            decimal_128_type(scale, precision)
         }
         (Some(LogicalType::Date), _) => Ok(DataType::Date32),
         (Some(LogicalType::Time { unit, .. }), _) => match unit {
@@ -156,7 +169,7 @@ fn from_int32(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataTy
         (None, ConvertedType::INT_32) => Ok(DataType::Int32),
         (None, ConvertedType::DATE) => Ok(DataType::Date32),
         (None, ConvertedType::TIME_MILLIS) => Ok(DataType::Time32(TimeUnit::Millisecond)),
-        (None, ConvertedType::DECIMAL) => decimal_type(scale, precision),
+        (None, ConvertedType::DECIMAL) => decimal_128_type(scale, precision),
         (logical, converted) => Err(arrow_err!(
             "Unable to convert parquet INT32 logical type {:?} or converted type {}",
             logical,
@@ -213,9 +226,9 @@ fn from_int64(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataTy
             Ok(DataType::Timestamp(TimeUnit::Microsecond, None))
         }
         (Some(LogicalType::Decimal { scale, precision }), _) => {
-            decimal_type(scale, precision)
+            decimal_128_type(scale, precision)
         }
-        (None, ConvertedType::DECIMAL) => decimal_type(scale, precision),
+        (None, ConvertedType::DECIMAL) => decimal_128_type(scale, precision),
         (logical, converted) => Err(arrow_err!(
             "Unable to convert parquet INT64 logical type {:?} or converted type {}",
             logical,
@@ -235,8 +248,18 @@ fn from_byte_array(info: &BasicTypeInfo, precision: i32, scale: i32) -> Result<D
         (None, ConvertedType::BSON) => Ok(DataType::Binary),
         (None, ConvertedType::ENUM) => Ok(DataType::Binary),
         (None, ConvertedType::UTF8) => Ok(DataType::Utf8),
-        (Some(LogicalType::Decimal { scale: s, precision: p }), _) => decimal_type(s, p),
-        (None, ConvertedType::DECIMAL) => decimal_type(scale, precision),
+        // BYTE_ARRAY decimals have no fixed width, so choose the Arrow decimal
+        // type by precision: 38 is the largest precision Decimal128 can hold
+        (Some(LogicalType::Decimal { scale: s, precision: p }), _) if p <= 38 => {
+            decimal_128_type(s, p)
+        }
+        (Some(LogicalType::Decimal { scale: s, precision: p }), _) => {
+            decimal_256_type(s, p)
+        }
+        (None, ConvertedType::DECIMAL) if precision <= 38 => {
+            decimal_128_type(scale, precision)
+        }
+        (None, ConvertedType::DECIMAL) => decimal_256_type(scale, precision),
         (logical, converted) => Err(arrow_err!(
             "Unable to convert parquet BYTE_ARRAY logical type {:?} or converted type {}",
             logical,
@@ -254,9 +277,20 @@ fn from_fixed_len_byte_array(
     // TODO: This should check the type length for the decimal and interval types
     match (info.logical_type(), info.converted_type()) {
         (Some(LogicalType::Decimal { scale, precision }), _) => {
-            decimal_type(scale, precision)
+            // FIXED_LEN_BYTE_ARRAY(16) still fits the i128 backing Decimal128
+            if type_length <= 16 {
+                decimal_128_type(scale, precision)
+            } else {
+                decimal_256_type(scale, precision)
+            }
+        }
+        (None, ConvertedType::DECIMAL) => {
+            if type_length <= 16 {
+                decimal_128_type(scale, precision)
+            } else {
+                decimal_256_type(scale, precision)
+            }
         }
-        (None, ConvertedType::DECIMAL) => decimal_type(scale, precision),
         (None, ConvertedType::INTERVAL) => {
             // There is currently no reliable way of determining which IntervalUnit
             // to return. Thus without the original Arrow schema, the results

Reply via email to