This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new faaab55f0 Remove Int96Converter (#2480) (#2481)
faaab55f0 is described below

commit faaab55f0b4c3733b92c3e494452ee8079ffc1d9
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Mon Aug 22 08:48:08 2022 +0100

    Remove Int96Converter (#2480) (#2481)
    
    * Remove Int96Converter (#2480)
    
    * Format
---
 parquet/src/arrow/array_reader/builder.rs         | 31 ++++--------
 parquet/src/arrow/array_reader/primitive_array.rs | 58 +++++++++++++++--------
 parquet/src/arrow/arrow_reader/mod.rs             | 18 ++++++-
 parquet/src/arrow/buffer/converter.rs             | 23 +--------
 parquet/src/arrow/record_reader/buffer.rs         |  2 +
 parquet/src/data_type.rs                          | 22 +++++++--
 parquet/src/record/triplet.rs                     |  2 +-
 7 files changed, 86 insertions(+), 70 deletions(-)

diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs
index 3fb167a20..d944ff2dc 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -27,9 +27,9 @@ use crate::arrow::array_reader::{
 };
 use crate::arrow::buffer::converter::{
     DecimalArrayConverter, DecimalFixedLengthByteArrayConverter,
-    FixedLenBinaryConverter, FixedSizeArrayConverter, Int96ArrayConverter,
-    Int96Converter, IntervalDayTimeArrayConverter, IntervalDayTimeConverter,
-    IntervalYearMonthArrayConverter, IntervalYearMonthConverter,
+    FixedLenBinaryConverter, FixedSizeArrayConverter, 
IntervalDayTimeArrayConverter,
+    IntervalDayTimeConverter, IntervalYearMonthArrayConverter,
+    IntervalYearMonthConverter,
 };
 use crate::arrow::schema::{convert_schema, ParquetField, ParquetFieldType};
 use crate::arrow::ProjectionMask;
@@ -183,26 +183,11 @@ fn build_primitive_reader(
             column_desc,
             arrow_type,
         )?)),
-        PhysicalType::INT96 => {
-            // get the optional timezone information from arrow type
-            let timezone = arrow_type.as_ref().and_then(|data_type| {
-                if let DataType::Timestamp(_, tz) = data_type {
-                    tz.clone()
-                } else {
-                    None
-                }
-            });
-            let converter = Int96Converter::new(Int96ArrayConverter { timezone 
});
-            Ok(Box::new(ComplexObjectArrayReader::<
-                Int96Type,
-                Int96Converter,
-            >::new(
-                page_iterator,
-                column_desc,
-                converter,
-                arrow_type,
-            )?))
-        }
+        PhysicalType::INT96 => 
Ok(Box::new(PrimitiveArrayReader::<Int96Type>::new(
+            page_iterator,
+            column_desc,
+            arrow_type,
+        )?)),
         PhysicalType::FLOAT => 
Ok(Box::new(PrimitiveArrayReader::<FloatType>::new(
             page_iterator,
             column_desc,
diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs
index d3f71dbab..d4f96e6a8 100644
--- a/parquet/src/arrow/array_reader/primitive_array.rs
+++ b/parquet/src/arrow/array_reader/primitive_array.rs
@@ -21,15 +21,15 @@ use crate::arrow::record_reader::RecordReader;
 use crate::arrow::schema::parquet_to_arrow_field;
 use crate::basic::Type as PhysicalType;
 use crate::column::page::PageIterator;
-use crate::data_type::DataType;
+use crate::data_type::{DataType, Int96};
 use crate::errors::{ParquetError, Result};
 use crate::schema::types::ColumnDescPtr;
 use arrow::array::{
     ArrayDataBuilder, ArrayRef, BooleanArray, BooleanBufferBuilder, 
Decimal128Array,
-    Float32Array, Float64Array, Int32Array, Int64Array,
+    Float32Array, Float64Array, Int32Array, 
Int64Array,TimestampNanosecondArray, TimestampNanosecondBufferBuilder,
 };
 use arrow::buffer::Buffer;
-use arrow::datatypes::DataType as ArrowType;
+use arrow::datatypes::{DataType as ArrowType, TimeUnit};
 use std::any::Any;
 use std::sync::Arc;
 
@@ -98,7 +98,7 @@ where
     }
 
     fn consume_batch(&mut self) -> Result<ArrayRef> {
-        let target_type = self.get_data_type().clone();
+        let target_type = &self.data_type;
         let arrow_data_type = match T::get_physical_type() {
             PhysicalType::BOOLEAN => ArrowType::Boolean,
             PhysicalType::INT32 => {
@@ -123,9 +123,11 @@ where
             }
             PhysicalType::FLOAT => ArrowType::Float32,
             PhysicalType::DOUBLE => ArrowType::Float64,
-            PhysicalType::INT96
-            | PhysicalType::BYTE_ARRAY
-            | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
+            PhysicalType::INT96 => match target_type {
+                ArrowType::Timestamp(TimeUnit::Nanosecond, _) => 
target_type.clone(),
+                _ => unreachable!("INT96 must be timestamp nanosecond"),
+            },
+            PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
                 unreachable!(
                     "PrimitiveArrayReaders don't support complex physical 
types"
                 );
@@ -135,16 +137,31 @@ where
         // Convert to arrays by using the Parquet physical type.
         // The physical types are then cast to Arrow types if necessary
 
-        let mut record_data = self.record_reader.consume_record_data();
+        let record_data = self.record_reader.consume_record_data();
+        let record_data = match T::get_physical_type() {
+            PhysicalType::BOOLEAN => {
+                let mut boolean_buffer = 
BooleanBufferBuilder::new(record_data.len());
 
-        if T::get_physical_type() == PhysicalType::BOOLEAN {
-            let mut boolean_buffer = 
BooleanBufferBuilder::new(record_data.len());
+                for e in record_data.as_slice() {
+                    boolean_buffer.append(*e > 0);
+                }
+                boolean_buffer.finish()
+            }
+            PhysicalType::INT96 => {
+                // SAFETY - record_data is an aligned buffer of Int96
+                let (prefix, slice, suffix) =
+                    unsafe { record_data.as_slice().align_to::<Int96>() };
+                assert!(prefix.is_empty() && suffix.is_empty());
+
+                let mut builder = 
TimestampNanosecondBufferBuilder::new(slice.len());
+                for v in slice {
+                    builder.append(v.to_nanos())
+                }
 
-            for e in record_data.as_slice() {
-                boolean_buffer.append(*e > 0);
+                builder.finish()
             }
-            record_data = boolean_buffer.finish();
-        }
+            _ => record_data,
+        };
 
         let array_data = ArrayDataBuilder::new(arrow_data_type)
             .len(self.record_reader.num_values())
@@ -158,9 +175,10 @@ where
             PhysicalType::INT64 => Arc::new(Int64Array::from(array_data)) as 
ArrayRef,
             PhysicalType::FLOAT => Arc::new(Float32Array::from(array_data)) as 
ArrayRef,
             PhysicalType::DOUBLE => Arc::new(Float64Array::from(array_data)) 
as ArrayRef,
-            PhysicalType::INT96
-            | PhysicalType::BYTE_ARRAY
-            | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
+            PhysicalType::INT96 => {
+                Arc::new(TimestampNanosecondArray::from(array_data)) as 
ArrayRef
+            }
+            PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
                 unreachable!(
                     "PrimitiveArrayReaders don't support complex physical 
types"
                 );
@@ -181,7 +199,7 @@ where
             ArrowType::Date64 => {
                 // this is cheap as it internally reinterprets the data
                 let a = arrow::compute::cast(&array, &ArrowType::Date32)?;
-                arrow::compute::cast(&a, &target_type)?
+                arrow::compute::cast(&a, target_type)?
             }
             ArrowType::Decimal128(p, s) => {
                 let array = match array.data_type() {
@@ -207,11 +225,11 @@ where
                         ));
                     }
                 }
-                .with_precision_and_scale(p, s)?;
+                .with_precision_and_scale(*p, *s)?;
 
                 Arc::new(array) as ArrayRef
             }
-            _ => arrow::compute::cast(&array, &target_type)?,
+            _ => arrow::compute::cast(&array, target_type)?,
         };
 
         // save definition and repetition buffers
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 6476751e6..347c38cb3 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -635,7 +635,7 @@ mod tests {
     use crate::basic::{ConvertedType, Encoding, Repetition, Type as 
PhysicalType};
     use crate::data_type::{
         BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray,
-        FixedLenByteArrayType, Int32Type, Int64Type,
+        FixedLenByteArrayType, Int32Type, Int64Type, Int96Type,
     };
     use crate::errors::Result;
     use crate::file::properties::{EnabledStatistics, WriterProperties, 
WriterVersion};
@@ -858,6 +858,22 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_int96_single_column_reader_test() {
+        let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];
+        run_single_column_reader_tests::<Int96Type, _, Int96Type>(
+            2,
+            ConvertedType::NONE,
+            None,
+            |vals| {
+                Arc::new(TimestampNanosecondArray::from_iter(
+                    vals.iter().map(|x| x.map(|x| x.to_nanos())),
+                )) as _
+            },
+            encodings,
+        );
+    }
+
     struct RandUtf8Gen {}
 
     impl RandGen<ByteArrayType> for RandUtf8Gen {
diff --git a/parquet/src/arrow/buffer/converter.rs b/parquet/src/arrow/buffer/converter.rs
index 975fe51d2..eb0c58418 100644
--- a/parquet/src/arrow/buffer/converter.rs
+++ b/parquet/src/arrow/buffer/converter.rs
@@ -15,11 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::data_type::{FixedLenByteArray, Int96};
+use crate::data_type::FixedLenByteArray;
 use arrow::array::{
     Array, ArrayRef, Decimal128Array, FixedSizeBinaryArray, 
FixedSizeBinaryBuilder,
     IntervalDayTimeArray, IntervalDayTimeBuilder, IntervalYearMonthArray,
-    IntervalYearMonthBuilder, TimestampNanosecondArray,
+    IntervalYearMonthBuilder,
 };
 use std::sync::Arc;
 
@@ -156,22 +156,6 @@ impl Converter<Vec<Option<FixedLenByteArray>>, 
IntervalDayTimeArray>
     }
 }
 
-pub struct Int96ArrayConverter {
-    pub timezone: Option<String>,
-}
-
-impl Converter<Vec<Option<Int96>>, TimestampNanosecondArray> for 
Int96ArrayConverter {
-    fn convert(&self, source: Vec<Option<Int96>>) -> 
Result<TimestampNanosecondArray> {
-        Ok(TimestampNanosecondArray::from_opt_vec(
-            source
-                .into_iter()
-                .map(|int96| int96.map(|val| val.to_i64() * 1_000_000))
-                .collect(),
-            self.timezone.clone(),
-        ))
-    }
-}
-
 #[cfg(test)]
 pub struct Utf8ArrayConverter {}
 
@@ -199,9 +183,6 @@ impl Converter<Vec<Option<ByteArray>>, StringArray> for 
Utf8ArrayConverter {
 pub type Utf8Converter =
     ArrayRefConverter<Vec<Option<ByteArray>>, StringArray, Utf8ArrayConverter>;
 
-pub type Int96Converter =
-    ArrayRefConverter<Vec<Option<Int96>>, TimestampNanosecondArray, 
Int96ArrayConverter>;
-
 pub type FixedLenBinaryConverter = ArrayRefConverter<
     Vec<Option<FixedLenByteArray>>,
     FixedSizeBinaryArray,
diff --git a/parquet/src/arrow/record_reader/buffer.rs b/parquet/src/arrow/record_reader/buffer.rs
index 7101eaa9c..64ea38f80 100644
--- a/parquet/src/arrow/record_reader/buffer.rs
+++ b/parquet/src/arrow/record_reader/buffer.rs
@@ -18,6 +18,7 @@
 use std::marker::PhantomData;
 
 use crate::arrow::buffer::bit_util::iter_set_bits_rev;
+use crate::data_type::Int96;
 use arrow::buffer::{Buffer, MutableBuffer};
 use arrow::datatypes::ArrowNativeType;
 
@@ -85,6 +86,7 @@ impl ScalarValue for u64 {}
 impl ScalarValue for i64 {}
 impl ScalarValue for f32 {}
 impl ScalarValue for f64 {}
+impl ScalarValue for Int96 {}
 
 /// A typed buffer similar to [`Vec<T>`] but using [`MutableBuffer`] for 
storage
 #[derive(Debug)]
diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs
index 7bb713b62..35fec60a0 100644
--- a/parquet/src/data_type.rs
+++ b/parquet/src/data_type.rs
@@ -34,7 +34,7 @@ use crate::util::{
 
 /// Rust representation for logical type INT96, value is backed by an array of 
`u32`.
 /// The type only takes 12 bytes, without extra padding.
-#[derive(Clone, Debug, PartialOrd, Default, PartialEq, Eq)]
+#[derive(Clone, Copy, Debug, PartialOrd, Default, PartialEq, Eq)]
 pub struct Int96 {
     value: [u32; 3],
 }
@@ -59,15 +59,29 @@ impl Int96 {
 
     /// Converts this INT96 into an i64 representing the number of 
MILLISECONDS since Epoch
     pub fn to_i64(&self) -> i64 {
+        let (seconds, nanoseconds) = self.to_seconds_and_nanos();
+        seconds * 1_000 + nanoseconds / 1_000_000
+    }
+
+    /// Converts this INT96 into an i64 representing the number of NANOSECONDS 
since EPOCH
+    ///
+    /// Will wrap around on overflow
+    pub fn to_nanos(&self) -> i64 {
+        let (seconds, nanoseconds) = self.to_seconds_and_nanos();
+        seconds
+            .wrapping_mul(1_000_000_000)
+            .wrapping_add(nanoseconds)
+    }
+
+    /// Converts this INT96 to a number of seconds and nanoseconds since EPOCH
+    pub fn to_seconds_and_nanos(&self) -> (i64, i64) {
         const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;
         const SECONDS_PER_DAY: i64 = 86_400;
-        const MILLIS_PER_SECOND: i64 = 1_000;
 
         let day = self.data()[2] as i64;
         let nanoseconds = ((self.data()[1] as i64) << 32) + self.data()[0] as 
i64;
         let seconds = (day - JULIAN_DAY_OF_EPOCH) * SECONDS_PER_DAY;
-
-        seconds * MILLIS_PER_SECOND + nanoseconds / 1_000_000
+        (seconds, nanoseconds)
     }
 }
 
diff --git a/parquet/src/record/triplet.rs b/parquet/src/record/triplet.rs
index 5a7e2a0ca..b4b4ea2f4 100644
--- a/parquet/src/record/triplet.rs
+++ b/parquet/src/record/triplet.rs
@@ -151,7 +151,7 @@ impl TripletIter {
                 Field::convert_int64(typed.column_descr(), 
*typed.current_value())
             }
             TripletIter::Int96TripletIter(ref typed) => {
-                Field::convert_int96(typed.column_descr(), 
typed.current_value().clone())
+                Field::convert_int96(typed.column_descr(), 
*typed.current_value())
             }
             TripletIter::FloatTripletIter(ref typed) => {
                 Field::convert_float(typed.column_descr(), 
*typed.current_value())

Reply via email to