alamb commented on code in PR #9077:
URL: https://github.com/apache/arrow-rs/pull/9077#discussion_r2662898772
##########
parquet/src/arrow/array_reader/primitive_array.rs:
##########
@@ -504,6 +200,220 @@ where
}
}
+/// Coerce the parquet physical type array to the target type
+///
+/// This should match the logic in schema::primitive::apply_hint
+fn coerce_array(array: ArrayRef, target_type: &ArrowType) -> Result<ArrayRef> {
+    if let ArrowType::Dictionary(key_type, value_type) = target_type {
+        let dictionary = pack_dictionary(key_type, array.as_ref())?;
Review Comment:
does this imply that we lose the ability to read DictionaryArrays directly
(without unpacking them first)? Or is this the fallback in case the data wasn't
actually dictionary encoded (e.g. it was encoded using `plain` encoding but the
user asked for Dictionary)?
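For reference, a sketch of the second scenario as I understand it (the column name and exact types here are made up): the file stores a plain-encoded INT32 column, but the caller's schema hint requests a `Dictionary` type, so the reader has to pack the values into a dictionary itself:
```rust
use std::{fs::File, sync::Arc};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};

fn read_with_dictionary_hint(file: File) -> parquet::errors::Result<()> {
    // Hypothetical schema hint: ask for Dictionary(Int32, Int32) even though
    // the column may have been written with plain encoding
    let hint = Arc::new(Schema::new(vec![Field::new(
        "col",
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int32)),
        true,
    )]));
    let options = ArrowReaderOptions::new().with_schema(hint);
    let mut reader =
        ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)?.build()?;
    while let Some(batch) = reader.next().transpose()? {
        let _ = batch; // each column comes back as a DictionaryArray
    }
    Ok(())
}
```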
##########
parquet/src/arrow/buffer/dictionary_buffer.rs:
##########
@@ -158,7 +165,12 @@ impl<K: ArrowNativeType + Ord, V: OffsetSizeTrait> DictionaryBuffer<K, V> {
                unreachable!()
            };
            let values = if let ArrowType::FixedSizeBinary(size) = **value_type {
-                arrow_cast::cast(&values, &ArrowType::FixedSizeBinary(size)).unwrap()
+                let binary = values.as_binary::<i32>();
Review Comment:
we could avoid these clones here too, using the same `into_builder` / `make_array` approach (see the sketch below)
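For the record, roughly what I mean (a sketch; the helper name is made up, and it assumes the binary values are contiguous from offset zero with every value exactly `size` bytes, which the path above should guarantee):
```rust
use arrow_array::{make_array, ArrayRef};
use arrow_data::ArrayData;
use arrow_schema::DataType;

// Reinterpret Binary ArrayData as FixedSizeBinary without copying the value
// bytes. Buffer 0 of a Binary array holds the offsets and buffer 1 the values;
// FixedSizeBinary keeps only the values buffer.
fn binary_to_fixed_size(data: ArrayData, size: i32) -> ArrayRef {
    let values = data.buffers()[1].clone(); // cheap refcount bump, not a copy
    let data = data
        .into_builder()
        .data_type(DataType::FixedSizeBinary(size))
        .buffers(vec![values]) // drop the offsets buffer
        .build()
        .expect("valid FixedSizeBinary layout");
    make_array(data)
}
```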
##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -819,7 +821,15 @@ impl ArrowColumnWriter {
    pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
        match &mut self.writer {
            ArrowColumnWriterImpl::Column(c) => {
-                write_leaf(c, &col.0)?;
+                let leaf = col.0.array();
+                match leaf.as_any_dictionary_opt() {
+                    Some(dictionary) => {
+                        let materialized =
+                            arrow_select::take::take(dictionary.values(), dictionary.keys(), None)?;
Review Comment:
not related to this PR necessarily, but this seems to imply we are expanding out previously encoded dictionary data rather than reusing the dictionary. Is that correct? I realize that we would have to handle the case where there are different dictionaries across batches, but it seems like a potential optimization worth considering
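If we pursued it, the writer would need a cheap way to tell when the dictionary changes between batches; a sketch of that check (the helper name is made up, assuming we cache the previous values `ArrayRef`):
```rust
use std::sync::Arc;
use arrow_array::ArrayRef;

// Reuse the previously written dictionary only while consecutive batches carry
// the exact same values array; pointer equality catches the common case where
// the reader hands back clones of the same Arc across batches.
fn can_reuse_dictionary(prev_values: &ArrayRef, next_values: &ArrayRef) -> bool {
    Arc::ptr_eq(prev_values, next_values)
}
```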
##########
parquet/src/arrow/array_reader/primitive_array.rs:
##########
@@ -504,6 +200,220 @@ where
}
}
+/// Coerce the parquet physical type array to the target type
+///
+/// This should match the logic in schema::primitive::apply_hint
+fn coerce_array(array: ArrayRef, target_type: &ArrowType) -> Result<ArrayRef> {
+    if let ArrowType::Dictionary(key_type, value_type) = target_type {
+        let dictionary = pack_dictionary(key_type, array.as_ref())?;
+        let any_dictionary = dictionary.as_any_dictionary();
+
+        let coerced_values =
+            coerce_array(Arc::clone(any_dictionary.values()), value_type.as_ref())?;
+
+        return Ok(any_dictionary.with_values(coerced_values));
+    }
+
+    match array.data_type() {
+        ArrowType::Int32 => coerce_i32(array.as_primitive(), target_type),
+        ArrowType::Int64 => coerce_i64(array.as_primitive(), target_type),
+        ArrowType::Boolean | ArrowType::Float32 | ArrowType::Float64 => Ok(array),
+        _ => unreachable!(),
+    }
+}
+
+fn coerce_i32(array: &Int32Array, target_type: &ArrowType) -> Result<ArrayRef> {
+    Ok(match target_type {
+        ArrowType::UInt8 => {
+            let array: UInt8Array = array.unary(|i| i as u8);
+            Arc::new(array) as ArrayRef
+        }
+        ArrowType::Int8 => {
+            let array: Int8Array = array.unary(|i| i as i8);
+            Arc::new(array) as ArrayRef
+        }
+        ArrowType::UInt16 => {
+            let array: UInt16Array = array.unary(|i| i as u16);
+            Arc::new(array) as ArrayRef
+        }
+        ArrowType::Int16 => {
+            let array: Int16Array = array.unary(|i| i as i16);
+            Arc::new(array) as ArrayRef
+        }
+        ArrowType::Int32 => Arc::new(array.clone()),
+        // follow C++ implementation and use overflow/reinterpret cast from i32 to u32 which will map
+        // `i32::MIN..0` to `(i32::MAX as u32)..u32::MAX`
+        ArrowType::UInt32 => Arc::new(UInt32Array::new(
+            array.values().inner().clone().into(),
+            array.nulls().cloned(),
+        )) as ArrayRef,
+        ArrowType::Date32 => Arc::new(array.reinterpret_cast::<Date32Type>()) as _,
+        ArrowType::Date64 => {
+            let array: Date64Array = array.unary(|x| x as i64 * 86_400_000);
+            Arc::new(array) as ArrayRef
+        }
+        ArrowType::Time32(TimeUnit::Second) => {
+            Arc::new(array.reinterpret_cast::<Time32SecondType>()) as ArrayRef
+        }
+        ArrowType::Time32(TimeUnit::Millisecond) => {
+            Arc::new(array.reinterpret_cast::<Time32MillisecondType>()) as ArrayRef
+        }
+        ArrowType::Timestamp(time_unit, timezone) => match time_unit {
+            TimeUnit::Second => {
+                let array: TimestampSecondArray = array
+                    .unary(|x| x as i64)
+                    .with_timezone_opt(timezone.clone());
+                Arc::new(array) as _
+            }
+            TimeUnit::Millisecond => {
+                let array: TimestampMillisecondArray = array
+                    .unary(|x| x as i64)
+                    .with_timezone_opt(timezone.clone());
+                Arc::new(array) as _
+            }
+            TimeUnit::Microsecond => {
+                let array: TimestampMicrosecondArray = array
+                    .unary(|x| x as i64)
+                    .with_timezone_opt(timezone.clone());
+                Arc::new(array) as _
+            }
+            TimeUnit::Nanosecond => {
+                let array: TimestampNanosecondArray = array
+                    .unary(|x| x as i64)
+                    .with_timezone_opt(timezone.clone());
+                Arc::new(array) as _
+            }
+        },
+        ArrowType::Decimal32(p, s) => {
+            let array = array
+                .reinterpret_cast::<Decimal32Type>()
+                .with_precision_and_scale(*p, *s)?;
+            Arc::new(array) as ArrayRef
+        }
+        ArrowType::Decimal64(p, s) => {
+            let array: Decimal64Array =
+                array.unary(|i| i as i64).with_precision_and_scale(*p, *s)?;
+            Arc::new(array) as ArrayRef
+        }
+        ArrowType::Decimal128(p, s) => {
+            let array: Decimal128Array = array
+                .unary(|i| i as i128)
+                .with_precision_and_scale(*p, *s)?;
+            Arc::new(array) as ArrayRef
+        }
+        ArrowType::Decimal256(p, s) => {
+            let array: Decimal256Array = array
+                .unary(|i| i256::from_i128(i as i128))
+                .with_precision_and_scale(*p, *s)?;
+            Arc::new(array) as ArrayRef
+        }
+        _ => unreachable!("Cannot coerce i32 to {target_type}"),
+    })
+}
+
+fn coerce_i64(array: &Int64Array, target_type: &ArrowType) -> Result<ArrayRef> {
+    Ok(match target_type {
+        ArrowType::Int64 => Arc::new(array.clone()) as _,
+        // follow C++ implementation and use overflow/reinterpret cast from i64 to u64 which will map
+        // `i64::MIN..0` to `(i64::MAX as u64)..u64::MAX`
+        ArrowType::UInt64 => Arc::new(UInt64Array::new(
+            array.values().inner().clone().into(),
Review Comment:
I double checked that this is a clone of the `Buffer` (so relatively cheap).
I wonder if we can avoid all these `Arc::new` / clones / etc. by passing in the `ArrayDataBuilder` directly rather than a `&Int64Array`.
##########
parquet/src/arrow/array_reader/primitive_array.rs:
##########
@@ -504,6 +200,220 @@ where
}
}
+/// Coerce the parquet physical type array to the target type
+///
+/// This should match the logic in schema::primitive::apply_hint
Review Comment:
Can you also please add the rationale why this doesn't use the cast kernel
(and instead reimplements some sort of it) as a comment? As I understand it,
the issue is that
1. We are trying to keep dependencies down
2. The semantics of casting certain types are different
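Something like this on `coerce_array`, perhaps (wording is just a suggestion):
```rust
/// Coerce the parquet physical type array to the target type
///
/// This should match the logic in schema::primitive::apply_hint
///
/// Note: this deliberately does not use the `cast` kernel:
/// 1. it would pull the `arrow-cast` dependency back in, which this PR removes
/// 2. `cast` semantics differ from parquet's coercion rules in places (for
///    example, signed to unsigned follows the C++ implementation and
///    reinterprets the bits, i32 -> u32, rather than performing a checked cast)
```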
##########
parquet/src/arrow/array_reader/primitive_array.rs:
##########
@@ -218,271 +175,10 @@ where
                .null_bit_buffer(self.record_reader.consume_bitmap_buffer());
        let array_data = unsafe { array_data.build_unchecked() };
-        let array: ArrayRef = match T::get_physical_type() {
-            PhysicalType::BOOLEAN => Arc::new(BooleanArray::from(array_data)),
-            PhysicalType::INT32 => match array_data.data_type() {
-                ArrowType::UInt32 => Arc::new(UInt32Array::from(array_data)),
-                ArrowType::Int32 => Arc::new(Int32Array::from(array_data)),
-                ArrowType::Decimal32(_, _) => Arc::new(Decimal32Array::from(array_data)),
-                _ => unreachable!(),
-            },
-            PhysicalType::INT64 => match array_data.data_type() {
-                ArrowType::UInt64 => Arc::new(UInt64Array::from(array_data)),
-                ArrowType::Int64 => Arc::new(Int64Array::from(array_data)),
-                ArrowType::Decimal64(_, _) => Arc::new(Decimal64Array::from(array_data)),
-                _ => unreachable!(),
-            },
-            PhysicalType::FLOAT => Arc::new(Float32Array::from(array_data)),
-            PhysicalType::DOUBLE => Arc::new(Float64Array::from(array_data)),
-            PhysicalType::INT96 => match target_type {
-                ArrowType::Timestamp(TimeUnit::Second, _) => {
-                    Arc::new(TimestampSecondArray::from(array_data))
-                }
-                ArrowType::Timestamp(TimeUnit::Millisecond, _) => {
-                    Arc::new(TimestampMillisecondArray::from(array_data))
-                }
-                ArrowType::Timestamp(TimeUnit::Microsecond, _) => {
-                    Arc::new(TimestampMicrosecondArray::from(array_data))
-                }
-                ArrowType::Timestamp(TimeUnit::Nanosecond, _) => {
-                    Arc::new(TimestampNanosecondArray::from(array_data))
-                }
-                _ => unreachable!("INT96 must be a timestamp."),
-            },
+        let array: ArrayRef = make_array(array_data);
Review Comment:
this is probably one way your PR ends up removing net lines of code
##########
parquet/Cargo.toml:
##########
@@ -39,7 +39,6 @@ ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] }
[dependencies]
arrow-array = { workspace = true, optional = true }
arrow-buffer = { workspace = true, optional = true }
-arrow-cast = { workspace = true, optional = true }
Review Comment:
🎉
##########
parquet/src/arrow/array_reader/primitive_array.rs:
##########
@@ -63,37 +62,23 @@ impl IntoBuffer for Vec<bool> {
impl IntoBuffer for Vec<Int96> {
    fn into_buffer(self, target_type: &ArrowType) -> Buffer {
+        let mut builder = Vec::with_capacity(self.len());
Review Comment:
It probably doesn't matter, but it might be faster to just `collect` the Vec directly:
```rust
let builder: Vec<i64> = match target_type {
    ArrowType::Timestamp(TimeUnit::Second, _) => {
        self.iter().map(|x| x.to_seconds()).collect()
    }
    ArrowType::Timestamp(TimeUnit::Millisecond, _) => {
        self.iter().map(|x| x.to_millis()).collect()
    }
    ArrowType::Timestamp(TimeUnit::Microsecond, _) => {
        self.iter().map(|x| x.to_micros()).collect()
    }
    ArrowType::Timestamp(TimeUnit::Nanosecond, _) => {
        self.iter().map(|x| x.to_nanos()).collect()
    }
    _ => unreachable!("Invalid target_type for Int96."),
};
Buffer::from_vec(builder)
```
##########
parquet/src/arrow/array_reader/primitive_array.rs:
##########
@@ -504,6 +200,220 @@ where
}
}
+/// Coerce the parquet physical type array to the target type
+///
+/// This should match the logic in schema::primitive::apply_hint
+fn coerce_array(array: ArrayRef, target_type: &ArrowType) -> Result<ArrayRef> {
+    if let ArrowType::Dictionary(key_type, value_type) = target_type {
+        let dictionary = pack_dictionary(key_type, array.as_ref())?;
+        let any_dictionary = dictionary.as_any_dictionary();
+
+        let coerced_values =
+            coerce_array(Arc::clone(any_dictionary.values()), value_type.as_ref())?;
+
+        return Ok(any_dictionary.with_values(coerced_values));
+    }
+
+    match array.data_type() {
+        ArrowType::Int32 => coerce_i32(array.as_primitive(), target_type),
+        ArrowType::Int64 => coerce_i64(array.as_primitive(), target_type),
+        ArrowType::Boolean | ArrowType::Float32 | ArrowType::Float64 => Ok(array),
+        _ => unreachable!(),
+    }
+}
+
+fn coerce_i32(array: &Int32Array, target_type: &ArrowType) -> Result<ArrayRef> {
+    Ok(match target_type {
+        ArrowType::UInt8 => {
+            let array: UInt8Array = array.unary(|i| i as u8);
+            Arc::new(array) as ArrayRef
+        }
+        ArrowType::Int8 => {
+            let array: Int8Array = array.unary(|i| i as i8);
+            Arc::new(array) as ArrayRef
+        }
+        ArrowType::UInt16 => {
+            let array: UInt16Array = array.unary(|i| i as u16);
+            Arc::new(array) as ArrayRef
+        }
+        ArrowType::Int16 => {
+            let array: Int16Array = array.unary(|i| i as i16);
+            Arc::new(array) as ArrayRef
+        }
+        ArrowType::Int32 => Arc::new(array.clone()),
+        // follow C++ implementation and use overflow/reinterpret cast from i32 to u32 which will map
+        // `i32::MIN..0` to `(i32::MAX as u32)..u32::MAX`
+        ArrowType::UInt32 => Arc::new(UInt32Array::new(
+            array.values().inner().clone().into(),
+            array.nulls().cloned(),
+        )) as ArrayRef,
+        ArrowType::Date32 => Arc::new(array.reinterpret_cast::<Date32Type>()) as _,
+        ArrowType::Date64 => {
+            let array: Date64Array = array.unary(|x| x as i64 * 86_400_000);
+            Arc::new(array) as ArrayRef
+        }
+        ArrowType::Time32(TimeUnit::Second) => {
+            Arc::new(array.reinterpret_cast::<Time32SecondType>()) as ArrayRef
+        }
+        ArrowType::Time32(TimeUnit::Millisecond) => {
+            Arc::new(array.reinterpret_cast::<Time32MillisecondType>()) as ArrayRef
+        }
+        ArrowType::Timestamp(time_unit, timezone) => match time_unit {
+            TimeUnit::Second => {
+                let array: TimestampSecondArray = array
+                    .unary(|x| x as i64)
+                    .with_timezone_opt(timezone.clone());
+                Arc::new(array) as _
+            }
+            TimeUnit::Millisecond => {
+                let array: TimestampMillisecondArray = array
+                    .unary(|x| x as i64)
+                    .with_timezone_opt(timezone.clone());
+                Arc::new(array) as _
+            }
+            TimeUnit::Microsecond => {
+                let array: TimestampMicrosecondArray = array
+                    .unary(|x| x as i64)
+                    .with_timezone_opt(timezone.clone());
+                Arc::new(array) as _
+            }
+            TimeUnit::Nanosecond => {
+                let array: TimestampNanosecondArray = array
+                    .unary(|x| x as i64)
+                    .with_timezone_opt(timezone.clone());
+                Arc::new(array) as _
+            }
+        },
+        ArrowType::Decimal32(p, s) => {
+            let array = array
+                .reinterpret_cast::<Decimal32Type>()
+                .with_precision_and_scale(*p, *s)?;
+            Arc::new(array) as ArrayRef
+        }
+        ArrowType::Decimal64(p, s) => {
+            let array: Decimal64Array =
+                array.unary(|i| i as i64).with_precision_and_scale(*p, *s)?;
+            Arc::new(array) as ArrayRef
+        }
+        ArrowType::Decimal128(p, s) => {
+            let array: Decimal128Array = array
+                .unary(|i| i as i128)
+                .with_precision_and_scale(*p, *s)?;
+            Arc::new(array) as ArrayRef
+        }
+        ArrowType::Decimal256(p, s) => {
+            let array: Decimal256Array = array
+                .unary(|i| i256::from_i128(i as i128))
+                .with_precision_and_scale(*p, *s)?;
+            Arc::new(array) as ArrayRef
+        }
+        _ => unreachable!("Cannot coerce i32 to {target_type}"),
+    })
+}
+
+fn coerce_i64(array: &Int64Array, target_type: &ArrowType) -> Result<ArrayRef> {
+    Ok(match target_type {
+        ArrowType::Int64 => Arc::new(array.clone()) as _,
+        // follow C++ implementation and use overflow/reinterpret cast from i64 to u64 which will map
+        // `i64::MIN..0` to `(i64::MAX as u64)..u64::MAX`
+        ArrowType::UInt64 => Arc::new(UInt64Array::new(
+            array.values().inner().clone().into(),
Review Comment:
Something like:
```rust
let builder = array.into_data().into_builder();
...
builder = builder.data_type(ArrowType::UInt64);
...
make_array(builder.build()?)
```
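For completeness, a self-contained sketch of that idea for the `UInt64` arm (the function name is made up; it relies on `ArrayDataBuilder::build` accepting the reinterpreted layout, which it should since `Int64` and `UInt64` are laid out identically):
```rust
use arrow_array::{make_array, Array, ArrayRef, Int64Array};
use arrow_schema::DataType;

// Reinterpret Int64 as UInt64 by swapping only the declared data type on the
// underlying ArrayData; the 8-byte values buffer and the null buffer are
// reused as-is, so no values are copied and no new Vec is allocated.
fn reinterpret_i64_as_u64(array: Int64Array) -> ArrayRef {
    let data = array
        .into_data()
        .into_builder()
        .data_type(DataType::UInt64)
        .build()
        .expect("Int64 and UInt64 share the same layout");
    make_array(data)
}
```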
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]