This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 04f67908f feat: support 'Decimal256' for parquet (#4272)
04f67908f is described below
commit 04f67908fb1f85eb2dbcd129b2d435d189cd658d
Author: Alex Huang <[email protected]>
AuthorDate: Tue May 30 13:15:02 2023 +0200
feat: support 'Decimal256' for parquet (#4272)
Co-authored-by: Raphael Taylor-Davies <[email protected]>
---
parquet/src/arrow/array_reader/byte_array.rs | 24 +++++++++--
.../src/arrow/array_reader/fixed_len_byte_array.rs | 32 +++++++++++----
parquet/src/arrow/array_reader/primitive_array.rs | 31 ++++++++++++++-
parquet/src/arrow/arrow_reader/mod.rs | 6 ++-
parquet/src/arrow/arrow_writer/levels.rs | 1 +
parquet/src/arrow/arrow_writer/mod.rs | 44 ++++++++++++++++++---
parquet/src/arrow/schema/mod.rs | 6 +--
parquet/src/arrow/schema/primitive.rs | 46 +++++++++++++++++-----
8 files changed, 158 insertions(+), 32 deletions(-)
diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs
index 22fa0ab45..43db658d9 100644
--- a/parquet/src/arrow/array_reader/byte_array.rs
+++ b/parquet/src/arrow/array_reader/byte_array.rs
@@ -30,8 +30,10 @@ use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder};
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
use crate::util::memory::ByteBufferPtr;
-use arrow_array::{Array, ArrayRef, BinaryArray, Decimal128Array,
OffsetSizeTrait};
-use arrow_buffer::Buffer;
+use arrow_array::{
+ Array, ArrayRef, BinaryArray, Decimal128Array, Decimal256Array,
OffsetSizeTrait,
+};
+use arrow_buffer::{i256, Buffer};
use arrow_schema::DataType as ArrowType;
use std::any::Any;
use std::ops::Range;
@@ -52,7 +54,10 @@ pub fn make_byte_array_reader(
};
match data_type {
- ArrowType::Binary | ArrowType::Utf8 | ArrowType::Decimal128(_, _) => {
+ ArrowType::Binary
+ | ArrowType::Utf8
+ | ArrowType::Decimal128(_, _)
+ | ArrowType::Decimal256(_, _) => {
let reader = GenericRecordReader::new(column_desc);
Ok(Box::new(ByteArrayReader::<i32>::new(
pages, data_type, reader,
@@ -119,7 +124,7 @@ impl<I: OffsetSizeTrait + ScalarValue> ArrayReader for ByteArrayReader<I> {
self.rep_levels_buffer = self.record_reader.consume_rep_levels();
self.record_reader.reset();
- let array = match self.data_type {
+ let array: ArrayRef = match self.data_type {
ArrowType::Decimal128(p, s) => {
let array = buffer.into_array(null_buffer, ArrowType::Binary);
let binary =
array.as_any().downcast_ref::<BinaryArray>().unwrap();
@@ -131,6 +136,17 @@ impl<I: OffsetSizeTrait + ScalarValue> ArrayReader for ByteArrayReader<I> {
Arc::new(decimal)
}
+ ArrowType::Decimal256(p, s) => {
+ let array = buffer.into_array(null_buffer, ArrowType::Binary);
+ let binary =
array.as_any().downcast_ref::<BinaryArray>().unwrap();
+ let decimal = binary
+ .iter()
+ .map(|opt| Some(i256::from_be_bytes(sign_extend_be(opt?))))
+ .collect::<Decimal256Array>()
+ .with_precision_and_scale(p, s)?;
+
+ Arc::new(decimal)
+ }
_ => buffer.into_array(null_buffer, self.data_type.clone()),
};
diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
index fee032a4d..47bd03a73 100644
--- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
+++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
@@ -28,10 +28,10 @@ use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
use crate::util::memory::ByteBufferPtr;
use arrow_array::{
- ArrayRef, Decimal128Array, FixedSizeBinaryArray, IntervalDayTimeArray,
- IntervalYearMonthArray,
+ ArrayRef, Decimal128Array, Decimal256Array, FixedSizeBinaryArray,
+ IntervalDayTimeArray, IntervalYearMonthArray,
};
-use arrow_buffer::Buffer;
+use arrow_buffer::{i256, Buffer};
use arrow_data::ArrayDataBuilder;
use arrow_schema::{DataType as ArrowType, IntervalUnit};
use std::any::Any;
@@ -61,7 +61,6 @@ pub fn make_fixed_len_byte_array_reader(
))
}
};
-
match &data_type {
ArrowType::FixedSizeBinary(_) => {}
ArrowType::Decimal128(_, _) => {
@@ -72,6 +71,14 @@ pub fn make_fixed_len_byte_array_reader(
));
}
}
+ ArrowType::Decimal256(_, _) => {
+ if byte_length > 32 {
+ return Err(general_err!(
+ "decimal 256 type too large, must be less than 32 bytes,
got {}",
+ byte_length
+ ));
+ }
+ }
ArrowType::Interval(_) => {
if byte_length != 12 {
//
https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#interval
@@ -155,7 +162,7 @@ impl ArrayReader for FixedLenByteArrayReader {
let binary = FixedSizeBinaryArray::from(unsafe {
array_data.build_unchecked() });
// TODO: An improvement might be to do this conversion on read
- let array = match &self.data_type {
+ let array: ArrayRef = match &self.data_type {
ArrowType::Decimal128(p, s) => {
let decimal = binary
.iter()
@@ -165,6 +172,15 @@ impl ArrayReader for FixedLenByteArrayReader {
Arc::new(decimal)
}
+ ArrowType::Decimal256(p, s) => {
+ let decimal = binary
+ .iter()
+ .map(|opt| Some(i256::from_be_bytes(sign_extend_be(opt?))))
+ .collect::<Decimal256Array>()
+ .with_precision_and_scale(*p, *s)?;
+
+ Arc::new(decimal)
+ }
ArrowType::Interval(unit) => {
// An interval is stored as 3x 32-bit unsigned integers
storing months, days,
// and milliseconds
@@ -428,16 +444,18 @@ mod tests {
use super::*;
use crate::arrow::arrow_reader::ParquetRecordBatchReader;
use crate::arrow::ArrowWriter;
- use arrow_array::{Array, Decimal128Array, ListArray};
use arrow::datatypes::Field;
use arrow::error::Result as ArrowResult;
use arrow_array::RecordBatch;
+ use arrow_array::{Array, ListArray};
use bytes::Bytes;
use std::sync::Arc;
#[test]
fn test_decimal_list() {
- let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7,
8]);
+ let decimals = Decimal256Array::from_iter_values(
+ [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
+ );
// [[], [1], [2, 3], null, [4], null, [6, 7, 8]]
let data = ArrayDataBuilder::new(ArrowType::List(Arc::new(Field::new(
diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs
index 772026960..1e2720a4a 100644
--- a/parquet/src/arrow/array_reader/primitive_array.rs
+++ b/parquet/src/arrow/array_reader/primitive_array.rs
@@ -24,12 +24,13 @@ use crate::column::page::PageIterator;
use crate::data_type::{DataType, Int96};
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
+use arrow_array::Decimal256Array;
use arrow_array::{
builder::{BooleanBufferBuilder, TimestampNanosecondBufferBuilder},
ArrayRef, BooleanArray, Decimal128Array, Float32Array, Float64Array,
Int32Array,
Int64Array, TimestampNanosecondArray, UInt32Array, UInt64Array,
};
-use arrow_buffer::Buffer;
+use arrow_buffer::{i256, Buffer};
use arrow_data::ArrayDataBuilder;
use arrow_schema::{DataType as ArrowType, TimeUnit};
use std::any::Any;
@@ -237,6 +238,34 @@ where
Arc::new(array) as ArrayRef
}
+ ArrowType::Decimal256(p, s) => {
+ let array = match array.data_type() {
+ ArrowType::Int32 => array
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap()
+ .iter()
+ .map(|v| v.map(|v| i256::from_i128(v as i128)))
+ .collect::<Decimal256Array>(),
+
+ ArrowType::Int64 => array
+ .as_any()
+ .downcast_ref::<Int64Array>()
+ .unwrap()
+ .iter()
+ .map(|v| v.map(|v| i256::from_i128(v as i128)))
+ .collect::<Decimal256Array>(),
+ _ => {
+ return Err(arrow_err!(
+ "Cannot convert {:?} to decimal",
+ array.data_type()
+ ));
+ }
+ }
+ .with_precision_and_scale(*p, *s)?;
+
+ Arc::new(array) as ArrayRef
+ }
_ => arrow_cast::cast(&array, target_type)?,
};
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 4b14a54c5..819e96c0a 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -528,7 +528,7 @@ mod tests {
use arrow_array::builder::*;
use arrow_array::*;
use arrow_array::{RecordBatch, RecordBatchReader};
- use arrow_buffer::Buffer;
+ use arrow_buffer::{i256, Buffer};
use arrow_data::ArrayDataBuilder;
use arrow_schema::{DataType as ArrowDataType, Field, Fields, Schema};
@@ -928,7 +928,9 @@ mod tests {
#[test]
fn test_decimal_nullable_struct() {
- let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7,
8]);
+ let decimals = Decimal256Array::from_iter_values(
+ [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
+ );
let data =
ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
diff --git a/parquet/src/arrow/arrow_writer/levels.rs b/parquet/src/arrow/arrow_writer/levels.rs
index 21b3e7dff..47b018903 100644
--- a/parquet/src/arrow/arrow_writer/levels.rs
+++ b/parquet/src/arrow/arrow_writer/levels.rs
@@ -88,6 +88,7 @@ fn is_leaf(data_type: &DataType) -> bool {
| DataType::Binary
| DataType::LargeBinary
| DataType::Decimal128(_, _)
+ | DataType::Decimal256(_, _)
| DataType::FixedSizeBinary(_)
)
}
diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index bde21ae85..5f2750a55 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -27,10 +27,7 @@ use std::vec::IntoIter;
use thrift::protocol::{TCompactOutputProtocol, TSerializable};
use arrow_array::cast::AsArray;
-use arrow_array::types::{
- Decimal128Type, Float32Type, Float64Type, Int32Type, Int64Type, UInt32Type,
- UInt64Type,
-};
+use arrow_array::types::*;
use arrow_array::{Array, FixedSizeListArray, RecordBatch, RecordBatchWriter};
use arrow_schema::{ArrowError, DataType as ArrowDataType, IntervalUnit,
SchemaRef};
@@ -551,6 +548,13 @@ fn write_leaf(
.unary::<_, Int32Type>(|v| v as i32);
write_primitive(typed, array.values(), levels)
}
+ ArrowDataType::Decimal256(_, _) => {
+ // use the int32 to represent the decimal with low
precision
+ let array = column
+ .as_primitive::<Decimal256Type>()
+ .unary::<_, Int32Type>(|v| v.as_i128() as i32);
+ write_primitive(typed, array.values(), levels)
+ }
_ => {
let array = arrow_cast::cast(column,
&ArrowDataType::Int32)?;
let array = array.as_primitive::<Int32Type>();
@@ -586,6 +590,13 @@ fn write_leaf(
.unary::<_, Int64Type>(|v| v as i64);
write_primitive(typed, array.values(), levels)
}
+ ArrowDataType::Decimal256(_, _) => {
+ // use the int64 to represent the decimal with low
precision
+ let array = column
+ .as_primitive::<Decimal256Type>()
+ .unary::<_, Int64Type>(|v| v.as_i128() as i64);
+ write_primitive(typed, array.values(), levels)
+ }
_ => {
let array = arrow_cast::cast(column,
&ArrowDataType::Int64)?;
let array = array.as_primitive::<Int64Type>();
@@ -641,7 +652,14 @@ fn write_leaf(
}
ArrowDataType::Decimal128(_, _) => {
let array = column.as_primitive::<Decimal128Type>();
- get_decimal_array_slice(array, indices)
+ get_decimal_128_array_slice(array, indices)
+ }
+ ArrowDataType::Decimal256(_, _) => {
+ let array = column
+ .as_any()
+ .downcast_ref::<arrow_array::Decimal256Array>()
+ .unwrap();
+ get_decimal_256_array_slice(array, indices)
}
_ => {
return Err(ParquetError::NYI(
@@ -715,7 +733,7 @@ fn get_interval_dt_array_slice(
values
}
-fn get_decimal_array_slice(
+fn get_decimal_128_array_slice(
array: &arrow_array::Decimal128Array,
indices: &[usize],
) -> Vec<FixedLenByteArray> {
@@ -729,6 +747,20 @@ fn get_decimal_array_slice(
values
}
+fn get_decimal_256_array_slice(
+ array: &arrow_array::Decimal256Array,
+ indices: &[usize],
+) -> Vec<FixedLenByteArray> {
+ let mut values = Vec::with_capacity(indices.len());
+ let size = decimal_length_from_precision(array.precision());
+ for i in indices {
+ let as_be_bytes = array.value(*i).to_be_bytes();
+ let resized_value = as_be_bytes[(32 - size)..].to_vec();
+ values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
+ }
+ values
+}
+
fn get_fsb_array_slice(
array: &arrow_array::FixedSizeBinaryArray,
indices: &[usize],
diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs
index 399dcba9e..ffae1eae5 100644
--- a/parquet/src/arrow/schema/mod.rs
+++ b/parquet/src/arrow/schema/mod.rs
@@ -589,7 +589,7 @@ mod tests {
let arrow_fields = Fields::from(vec![
Field::new("decimal1", DataType::Decimal128(4, 2), false),
Field::new("decimal2", DataType::Decimal128(12, 2), false),
- Field::new("decimal3", DataType::Decimal128(30, 2), false),
+ Field::new("decimal3", DataType::Decimal256(30, 2), false),
Field::new("decimal4", DataType::Decimal128(33, 2), false),
]);
assert_eq!(&arrow_fields, converted_arrow_schema.fields());
@@ -1443,7 +1443,7 @@ mod tests {
),
Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
- Field::new("decimal_fix_length", DataType::Decimal128(30, 2),
false),
+ Field::new("decimal_fix_length", DataType::Decimal256(30, 2),
false),
];
let arrow_schema = Schema::new(arrow_fields);
let converted_arrow_schema =
arrow_to_parquet_schema(&arrow_schema).unwrap();
@@ -1614,7 +1614,7 @@ mod tests {
// ),
Field::new("c35", DataType::Null, true),
Field::new("c36", DataType::Decimal128(2, 1), false),
- Field::new("c37", DataType::Decimal128(50, 20), false),
+ Field::new("c37", DataType::Decimal256(50, 20), false),
Field::new("c38", DataType::Decimal128(18, 12), true),
Field::new_map(
"c39",
diff --git a/parquet/src/arrow/schema/primitive.rs b/parquet/src/arrow/schema/primitive.rs
index 6565f7eae..d4db28915 100644
--- a/parquet/src/arrow/schema/primitive.rs
+++ b/parquet/src/arrow/schema/primitive.rs
@@ -103,7 +103,7 @@ fn from_parquet(parquet_type: &Type) -> Result<DataType> {
}
}
-fn decimal_type(scale: i32, precision: i32) -> Result<DataType> {
+fn decimal_128_type(scale: i32, precision: i32) -> Result<DataType> {
let scale = scale
.try_into()
.map_err(|_| arrow_err!("scale cannot be negative: {}", scale))?;
@@ -115,6 +115,18 @@ fn decimal_type(scale: i32, precision: i32) -> Result<DataType> {
Ok(DataType::Decimal128(precision, scale))
}
+fn decimal_256_type(scale: i32, precision: i32) -> Result<DataType> {
+ let scale = scale
+ .try_into()
+ .map_err(|_| arrow_err!("scale cannot be negative: {}", scale))?;
+
+ let precision = precision
+ .try_into()
+ .map_err(|_| arrow_err!("precision cannot be negative: {}",
precision))?;
+
+ Ok(DataType::Decimal256(precision, scale))
+}
+
fn from_int32(info: &BasicTypeInfo, scale: i32, precision: i32) ->
Result<DataType> {
match (info.logical_type(), info.converted_type()) {
(None, ConvertedType::NONE) => Ok(DataType::Int32),
@@ -136,7 +148,7 @@ fn from_int32(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataTy
_ => Err(arrow_err!("Cannot create INT32 physical type from {:?}",
t)),
},
(Some(LogicalType::Decimal { scale, precision }), _) => {
- decimal_type(scale, precision)
+ decimal_128_type(scale, precision)
}
(Some(LogicalType::Date), _) => Ok(DataType::Date32),
(Some(LogicalType::Time { unit, .. }), _) => match unit {
@@ -156,7 +168,7 @@ fn from_int32(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataTy
(None, ConvertedType::INT_32) => Ok(DataType::Int32),
(None, ConvertedType::DATE) => Ok(DataType::Date32),
(None, ConvertedType::TIME_MILLIS) =>
Ok(DataType::Time32(TimeUnit::Millisecond)),
- (None, ConvertedType::DECIMAL) => decimal_type(scale, precision),
+ (None, ConvertedType::DECIMAL) => decimal_128_type(scale, precision),
(logical, converted) => Err(arrow_err!(
"Unable to convert parquet INT32 logical type {:?} or converted
type {}",
logical,
@@ -213,9 +225,9 @@ fn from_int64(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataTy
Ok(DataType::Timestamp(TimeUnit::Microsecond, None))
}
(Some(LogicalType::Decimal { scale, precision }), _) => {
- decimal_type(scale, precision)
+ decimal_128_type(scale, precision)
}
- (None, ConvertedType::DECIMAL) => decimal_type(scale, precision),
+ (None, ConvertedType::DECIMAL) => decimal_128_type(scale, precision),
(logical, converted) => Err(arrow_err!(
"Unable to convert parquet INT64 logical type {:?} or converted
type {}",
logical,
@@ -235,8 +247,14 @@ fn from_byte_array(info: &BasicTypeInfo, precision: i32, scale: i32) -> Result<D
(None, ConvertedType::BSON) => Ok(DataType::Binary),
(None, ConvertedType::ENUM) => Ok(DataType::Binary),
(None, ConvertedType::UTF8) => Ok(DataType::Utf8),
- (Some(LogicalType::Decimal { scale: s, precision: p }), _) =>
decimal_type(s, p),
- (None, ConvertedType::DECIMAL) => decimal_type(scale, precision),
+ (
+ Some(LogicalType::Decimal {
+ scale: s,
+ precision: p,
+ }),
+ _,
+ ) => decimal_128_type(s, p),
+ (None, ConvertedType::DECIMAL) => decimal_128_type(scale, precision),
(logical, converted) => Err(arrow_err!(
"Unable to convert parquet BYTE_ARRAY logical type {:?} or
converted type {}",
logical,
@@ -254,9 +272,19 @@ fn from_fixed_len_byte_array(
// TODO: This should check the type length for the decimal and interval
types
match (info.logical_type(), info.converted_type()) {
(Some(LogicalType::Decimal { scale, precision }), _) => {
- decimal_type(scale, precision)
+ if type_length < 16 {
+ decimal_128_type(scale, precision)
+ } else {
+ decimal_256_type(scale, precision)
+ }
+ }
+ (None, ConvertedType::DECIMAL) => {
+ if type_length < 16 {
+ decimal_128_type(scale, precision)
+ } else {
+ decimal_256_type(scale, precision)
+ }
}
- (None, ConvertedType::DECIMAL) => decimal_type(scale, precision),
(None, ConvertedType::INTERVAL) => {
// There is currently no reliable way of determining which
IntervalUnit
// to return. Thus without the original Arrow schema, the results