This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new faaab55f0 Remove Int96Converter (#2480) (#2481)
faaab55f0 is described below
commit faaab55f0b4c3733b92c3e494452ee8079ffc1d9
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Mon Aug 22 08:48:08 2022 +0100
Remove Int96Converter (#2480) (#2481)
* Remove Int96Converter (#2480)
* Format
---
parquet/src/arrow/array_reader/builder.rs | 31 ++++--------
parquet/src/arrow/array_reader/primitive_array.rs | 58 +++++++++++++++--------
parquet/src/arrow/arrow_reader/mod.rs | 18 ++++++-
parquet/src/arrow/buffer/converter.rs | 23 +--------
parquet/src/arrow/record_reader/buffer.rs | 2 +
parquet/src/data_type.rs | 22 +++++++--
parquet/src/record/triplet.rs | 2 +-
7 files changed, 86 insertions(+), 70 deletions(-)
diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs
index 3fb167a20..d944ff2dc 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -27,9 +27,9 @@ use crate::arrow::array_reader::{
};
use crate::arrow::buffer::converter::{
DecimalArrayConverter, DecimalFixedLengthByteArrayConverter,
- FixedLenBinaryConverter, FixedSizeArrayConverter, Int96ArrayConverter,
- Int96Converter, IntervalDayTimeArrayConverter, IntervalDayTimeConverter,
- IntervalYearMonthArrayConverter, IntervalYearMonthConverter,
+ FixedLenBinaryConverter, FixedSizeArrayConverter,
IntervalDayTimeArrayConverter,
+ IntervalDayTimeConverter, IntervalYearMonthArrayConverter,
+ IntervalYearMonthConverter,
};
use crate::arrow::schema::{convert_schema, ParquetField, ParquetFieldType};
use crate::arrow::ProjectionMask;
@@ -183,26 +183,11 @@ fn build_primitive_reader(
column_desc,
arrow_type,
)?)),
- PhysicalType::INT96 => {
- // get the optional timezone information from arrow type
- let timezone = arrow_type.as_ref().and_then(|data_type| {
- if let DataType::Timestamp(_, tz) = data_type {
- tz.clone()
- } else {
- None
- }
- });
- let converter = Int96Converter::new(Int96ArrayConverter { timezone
});
- Ok(Box::new(ComplexObjectArrayReader::<
- Int96Type,
- Int96Converter,
- >::new(
- page_iterator,
- column_desc,
- converter,
- arrow_type,
- )?))
- }
+ PhysicalType::INT96 =>
Ok(Box::new(PrimitiveArrayReader::<Int96Type>::new(
+ page_iterator,
+ column_desc,
+ arrow_type,
+ )?)),
PhysicalType::FLOAT =>
Ok(Box::new(PrimitiveArrayReader::<FloatType>::new(
page_iterator,
column_desc,
diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs
index d3f71dbab..d4f96e6a8 100644
--- a/parquet/src/arrow/array_reader/primitive_array.rs
+++ b/parquet/src/arrow/array_reader/primitive_array.rs
@@ -21,15 +21,15 @@ use crate::arrow::record_reader::RecordReader;
use crate::arrow::schema::parquet_to_arrow_field;
use crate::basic::Type as PhysicalType;
use crate::column::page::PageIterator;
-use crate::data_type::DataType;
+use crate::data_type::{DataType, Int96};
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
use arrow::array::{
ArrayDataBuilder, ArrayRef, BooleanArray, BooleanBufferBuilder,
Decimal128Array,
- Float32Array, Float64Array, Int32Array, Int64Array,
+ Float32Array, Float64Array, Int32Array,
Int64Array,TimestampNanosecondArray, TimestampNanosecondBufferBuilder,
};
use arrow::buffer::Buffer;
-use arrow::datatypes::DataType as ArrowType;
+use arrow::datatypes::{DataType as ArrowType, TimeUnit};
use std::any::Any;
use std::sync::Arc;
@@ -98,7 +98,7 @@ where
}
fn consume_batch(&mut self) -> Result<ArrayRef> {
- let target_type = self.get_data_type().clone();
+ let target_type = &self.data_type;
let arrow_data_type = match T::get_physical_type() {
PhysicalType::BOOLEAN => ArrowType::Boolean,
PhysicalType::INT32 => {
@@ -123,9 +123,11 @@ where
}
PhysicalType::FLOAT => ArrowType::Float32,
PhysicalType::DOUBLE => ArrowType::Float64,
- PhysicalType::INT96
- | PhysicalType::BYTE_ARRAY
- | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
+ PhysicalType::INT96 => match target_type {
+ ArrowType::Timestamp(TimeUnit::Nanosecond, _) =>
target_type.clone(),
+ _ => unreachable!("INT96 must be timestamp nanosecond"),
+ },
+ PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
unreachable!(
"PrimitiveArrayReaders don't support complex physical
types"
);
@@ -135,16 +137,31 @@ where
// Convert to arrays by using the Parquet physical type.
// The physical types are then cast to Arrow types if necessary
- let mut record_data = self.record_reader.consume_record_data();
+ let record_data = self.record_reader.consume_record_data();
+ let record_data = match T::get_physical_type() {
+ PhysicalType::BOOLEAN => {
+ let mut boolean_buffer =
BooleanBufferBuilder::new(record_data.len());
- if T::get_physical_type() == PhysicalType::BOOLEAN {
- let mut boolean_buffer =
BooleanBufferBuilder::new(record_data.len());
+ for e in record_data.as_slice() {
+ boolean_buffer.append(*e > 0);
+ }
+ boolean_buffer.finish()
+ }
+ PhysicalType::INT96 => {
+ // SAFETY - record_data is an aligned buffer of Int96
+ let (prefix, slice, suffix) =
+ unsafe { record_data.as_slice().align_to::<Int96>() };
+ assert!(prefix.is_empty() && suffix.is_empty());
+
+ let mut builder =
TimestampNanosecondBufferBuilder::new(slice.len());
+ for v in slice {
+ builder.append(v.to_nanos())
+ }
- for e in record_data.as_slice() {
- boolean_buffer.append(*e > 0);
+ builder.finish()
}
- record_data = boolean_buffer.finish();
- }
+ _ => record_data,
+ };
let array_data = ArrayDataBuilder::new(arrow_data_type)
.len(self.record_reader.num_values())
@@ -158,9 +175,10 @@ where
PhysicalType::INT64 => Arc::new(Int64Array::from(array_data)) as
ArrayRef,
PhysicalType::FLOAT => Arc::new(Float32Array::from(array_data)) as
ArrayRef,
PhysicalType::DOUBLE => Arc::new(Float64Array::from(array_data))
as ArrayRef,
- PhysicalType::INT96
- | PhysicalType::BYTE_ARRAY
- | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
+ PhysicalType::INT96 => {
+ Arc::new(TimestampNanosecondArray::from(array_data)) as
ArrayRef
+ }
+ PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
unreachable!(
"PrimitiveArrayReaders don't support complex physical
types"
);
@@ -181,7 +199,7 @@ where
ArrowType::Date64 => {
// this is cheap as it internally reinterprets the data
let a = arrow::compute::cast(&array, &ArrowType::Date32)?;
- arrow::compute::cast(&a, &target_type)?
+ arrow::compute::cast(&a, target_type)?
}
ArrowType::Decimal128(p, s) => {
let array = match array.data_type() {
@@ -207,11 +225,11 @@ where
));
}
}
- .with_precision_and_scale(p, s)?;
+ .with_precision_and_scale(*p, *s)?;
Arc::new(array) as ArrayRef
}
- _ => arrow::compute::cast(&array, &target_type)?,
+ _ => arrow::compute::cast(&array, target_type)?,
};
// save definition and repetition buffers
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 6476751e6..347c38cb3 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -635,7 +635,7 @@ mod tests {
use crate::basic::{ConvertedType, Encoding, Repetition, Type as
PhysicalType};
use crate::data_type::{
BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray,
- FixedLenByteArrayType, Int32Type, Int64Type,
+ FixedLenByteArrayType, Int32Type, Int64Type, Int96Type,
};
use crate::errors::Result;
use crate::file::properties::{EnabledStatistics, WriterProperties,
WriterVersion};
@@ -858,6 +858,22 @@ mod tests {
);
}
+ #[test]
+ fn test_int96_single_column_reader_test() {
+ let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];
+ run_single_column_reader_tests::<Int96Type, _, Int96Type>(
+ 2,
+ ConvertedType::NONE,
+ None,
+ |vals| {
+ Arc::new(TimestampNanosecondArray::from_iter(
+ vals.iter().map(|x| x.map(|x| x.to_nanos())),
+ )) as _
+ },
+ encodings,
+ );
+ }
+
struct RandUtf8Gen {}
impl RandGen<ByteArrayType> for RandUtf8Gen {
diff --git a/parquet/src/arrow/buffer/converter.rs b/parquet/src/arrow/buffer/converter.rs
index 975fe51d2..eb0c58418 100644
--- a/parquet/src/arrow/buffer/converter.rs
+++ b/parquet/src/arrow/buffer/converter.rs
@@ -15,11 +15,11 @@
// specific language governing permissions and limitations
// under the License.
-use crate::data_type::{FixedLenByteArray, Int96};
+use crate::data_type::FixedLenByteArray;
use arrow::array::{
Array, ArrayRef, Decimal128Array, FixedSizeBinaryArray,
FixedSizeBinaryBuilder,
IntervalDayTimeArray, IntervalDayTimeBuilder, IntervalYearMonthArray,
- IntervalYearMonthBuilder, TimestampNanosecondArray,
+ IntervalYearMonthBuilder,
};
use std::sync::Arc;
@@ -156,22 +156,6 @@ impl Converter<Vec<Option<FixedLenByteArray>>, IntervalDayTimeArray>
}
}
-pub struct Int96ArrayConverter {
- pub timezone: Option<String>,
-}
-
-impl Converter<Vec<Option<Int96>>, TimestampNanosecondArray> for
Int96ArrayConverter {
- fn convert(&self, source: Vec<Option<Int96>>) ->
Result<TimestampNanosecondArray> {
- Ok(TimestampNanosecondArray::from_opt_vec(
- source
- .into_iter()
- .map(|int96| int96.map(|val| val.to_i64() * 1_000_000))
- .collect(),
- self.timezone.clone(),
- ))
- }
-}
-
#[cfg(test)]
pub struct Utf8ArrayConverter {}
@@ -199,9 +183,6 @@ impl Converter<Vec<Option<ByteArray>>, StringArray> for Utf8ArrayConverter {
pub type Utf8Converter =
ArrayRefConverter<Vec<Option<ByteArray>>, StringArray, Utf8ArrayConverter>;
-pub type Int96Converter =
- ArrayRefConverter<Vec<Option<Int96>>, TimestampNanosecondArray,
Int96ArrayConverter>;
-
pub type FixedLenBinaryConverter = ArrayRefConverter<
Vec<Option<FixedLenByteArray>>,
FixedSizeBinaryArray,
diff --git a/parquet/src/arrow/record_reader/buffer.rs b/parquet/src/arrow/record_reader/buffer.rs
index 7101eaa9c..64ea38f80 100644
--- a/parquet/src/arrow/record_reader/buffer.rs
+++ b/parquet/src/arrow/record_reader/buffer.rs
@@ -18,6 +18,7 @@
use std::marker::PhantomData;
use crate::arrow::buffer::bit_util::iter_set_bits_rev;
+use crate::data_type::Int96;
use arrow::buffer::{Buffer, MutableBuffer};
use arrow::datatypes::ArrowNativeType;
@@ -85,6 +86,7 @@ impl ScalarValue for u64 {}
impl ScalarValue for i64 {}
impl ScalarValue for f32 {}
impl ScalarValue for f64 {}
+impl ScalarValue for Int96 {}
/// A typed buffer similar to [`Vec<T>`] but using [`MutableBuffer`] for
storage
#[derive(Debug)]
diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs
index 7bb713b62..35fec60a0 100644
--- a/parquet/src/data_type.rs
+++ b/parquet/src/data_type.rs
@@ -34,7 +34,7 @@ use crate::util::{
/// Rust representation for logical type INT96, value is backed by an array of
`u32`.
/// The type only takes 12 bytes, without extra padding.
-#[derive(Clone, Debug, PartialOrd, Default, PartialEq, Eq)]
+#[derive(Clone, Copy, Debug, PartialOrd, Default, PartialEq, Eq)]
pub struct Int96 {
value: [u32; 3],
}
@@ -59,15 +59,29 @@ impl Int96 {
/// Converts this INT96 into an i64 representing the number of
MILLISECONDS since Epoch
pub fn to_i64(&self) -> i64 {
+ let (seconds, nanoseconds) = self.to_seconds_and_nanos();
+ seconds * 1_000 + nanoseconds / 1_000_000
+ }
+
+ /// Converts this INT96 into an i64 representing the number of NANOSECONDS
since EPOCH
+ ///
+ /// Will wrap around on overflow
+ pub fn to_nanos(&self) -> i64 {
+ let (seconds, nanoseconds) = self.to_seconds_and_nanos();
+ seconds
+ .wrapping_mul(1_000_000_000)
+ .wrapping_add(nanoseconds)
+ }
+
+ /// Converts this INT96 to a number of seconds and nanoseconds since EPOCH
+ pub fn to_seconds_and_nanos(&self) -> (i64, i64) {
const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;
const SECONDS_PER_DAY: i64 = 86_400;
- const MILLIS_PER_SECOND: i64 = 1_000;
let day = self.data()[2] as i64;
let nanoseconds = ((self.data()[1] as i64) << 32) + self.data()[0] as
i64;
let seconds = (day - JULIAN_DAY_OF_EPOCH) * SECONDS_PER_DAY;
-
- seconds * MILLIS_PER_SECOND + nanoseconds / 1_000_000
+ (seconds, nanoseconds)
}
}
diff --git a/parquet/src/record/triplet.rs b/parquet/src/record/triplet.rs
index 5a7e2a0ca..b4b4ea2f4 100644
--- a/parquet/src/record/triplet.rs
+++ b/parquet/src/record/triplet.rs
@@ -151,7 +151,7 @@ impl TripletIter {
Field::convert_int64(typed.column_descr(),
*typed.current_value())
}
TripletIter::Int96TripletIter(ref typed) => {
- Field::convert_int96(typed.column_descr(),
typed.current_value().clone())
+ Field::convert_int96(typed.column_descr(),
*typed.current_value())
}
TripletIter::FloatTripletIter(ref typed) => {
Field::convert_float(typed.column_descr(),
*typed.current_value())