This is an automated email from the ASF dual-hosted git repository. alamb pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push: new dff67c9b78 GH-7686: [Parquet] Fix int96 min/max stats (#7687) dff67c9b78 is described below commit dff67c9b78bbd6f2311f580ecc20e97e71e013db Author: Rahul Sharma <rahul.k...@gmail.com> AuthorDate: Wed Jul 23 00:04:06 2025 +0200 GH-7686: [Parquet] Fix int96 min/max stats (#7687) # Which issue does this PR close? - Closes #7686 # Rationale for this change int96 min/max statistics emitted by arrow-rs are incorrect. # What changes are included in this PR? 1. Fix the int96 stats 2. Add round-trip test to verify the behavior # Not included in this PR: 1. Read stats only from known good writers. This will be implemented after a new arrow-rs release. # Are there any user-facing changes? The int96 min/max statistics will be different and correct. --------- Co-authored-by: Rahul Sharma <rahul.sha...@databricks.com> Co-authored-by: Ed Seidl <etse...@live.com> Co-authored-by: Andrew Lamb <and...@nerdnetworks.org> Co-authored-by: Alkis Evlogimenos <al...@evlogimenos.com> --- parquet/src/column/writer/mod.rs | 4 +- parquet/src/data_type.rs | 38 ++++++++- parquet/src/file/statistics.rs | 3 - parquet/tests/int96_stats_roundtrip.rs | 151 +++++++++++++++++++++++++++++++++ 4 files changed, 187 insertions(+), 9 deletions(-) diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index db7cd31468..9374e226b8 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -2528,8 +2528,8 @@ mod tests { let stats = statistics_roundtrip::<Int96Type>(&input); assert!(!stats.is_min_max_backwards_compatible()); if let Statistics::Int96(stats) = stats { - assert_eq!(stats.min_opt().unwrap(), &Int96::from(vec![0, 20, 30])); - assert_eq!(stats.max_opt().unwrap(), &Int96::from(vec![3, 20, 10])); + assert_eq!(stats.min_opt().unwrap(), &Int96::from(vec![3, 20, 10])); + assert_eq!(stats.max_opt().unwrap(), &Int96::from(vec![2, 20, 30])); } else { panic!("expecting Statistics::Int96, got {stats:?}"); } diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs index 639567f604..6cba02ab3e 100644 --- a/parquet/src/data_type.rs +++ b/parquet/src/data_type.rs @@ -33,7 +33,7 @@ use crate::util::bit_util::FromBytes; /// Rust representation for logical type INT96, value is backed by an array of `u32`. /// The type only takes 12 bytes, without extra padding. -#[derive(Clone, Copy, Debug, PartialOrd, Default, PartialEq, Eq)] +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] pub struct Int96 { value: [u32; 3], } @@ -118,14 +118,44 @@ impl Int96 { .wrapping_add(nanos) } + #[inline] + fn get_days(&self) -> i32 { + self.data()[2] as i32 + } + + #[inline] + fn get_nanos(&self) -> i64 { + ((self.data()[1] as i64) << 32) + self.data()[0] as i64 + } + #[inline] fn data_as_days_and_nanos(&self) -> (i32, i64) { - let day = self.data()[2] as i32; - let nanos = ((self.data()[1] as i64) << 32) + self.data()[0] as i64; - (day, nanos) + (self.get_days(), self.get_nanos()) + } +} + +impl PartialOrd for Int96 { + fn partial_cmp(&self, other: &Self) -> Option<Ordering> { + Some(self.cmp(other)) } } +impl Ord for Int96 { + /// Order `Int96` correctly for (deprecated) timestamp types. + /// + /// Note: this is done even though the Int96 type is deprecated and the + /// [spec does not define the sort order] + /// because some engines, notably Spark and Databricks Photon still write + /// Int96 timestamps and rely on their order for optimization. + /// + /// [spec does not define the sort order]: https://github.com/apache/parquet-format/blob/cf943c197f4fad826b14ba0c40eb0ffdab585285/src/main/thrift/parquet.thrift#L1079 + fn cmp(&self, other: &Self) -> Ordering { + match self.get_days().cmp(&other.get_days()) { + Ordering::Equal => self.get_nanos().cmp(&other.get_nanos()), + ord => ord, + } + } +} impl From<Vec<u32>> for Int96 { fn from(buf: Vec<u32>) -> Self { assert_eq!(buf.len(), 3); diff --git a/parquet/src/file/statistics.rs b/parquet/src/file/statistics.rs index 0cfcb4d925..d0105461f1 100644 --- a/parquet/src/file/statistics.rs +++ b/parquet/src/file/statistics.rs @@ -209,9 +209,6 @@ pub fn from_thrift( old_format, ), Type::INT96 => { - // INT96 statistics may not be correct, because comparison is signed - // byte-wise, not actual timestamps. It is recommended to ignore - // min/max statistics for INT96 columns. let min = if let Some(data) = min { assert_eq!(data.len(), 12); Some(Int96::try_from_le_slice(&data)?) diff --git a/parquet/tests/int96_stats_roundtrip.rs b/parquet/tests/int96_stats_roundtrip.rs new file mode 100644 index 0000000000..d6ba8d419e --- /dev/null +++ b/parquet/tests/int96_stats_roundtrip.rs @@ -0,0 +1,151 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use chrono::{DateTime, NaiveDateTime, Utc}; +use parquet::basic::Type; +use parquet::data_type::{Int96, Int96Type}; +use parquet::file::properties::{EnabledStatistics, WriterProperties}; +use parquet::file::reader::{FileReader, SerializedFileReader}; +use parquet::file::statistics::Statistics; +use parquet::file::writer::SerializedFileWriter; +use parquet::schema::parser::parse_message_type; +use rand::seq::SliceRandom; +use std::fs::File; +use std::sync::Arc; +use tempfile::Builder; + +fn datetime_to_int96(dt: &str) -> Int96 { + let naive = NaiveDateTime::parse_from_str(dt, "%Y-%m-%d %H:%M:%S%.f").unwrap(); + let datetime: DateTime<Utc> = DateTime::from_naive_utc_and_offset(naive, Utc); + let nanos = datetime.timestamp_nanos_opt().unwrap(); + let mut int96 = Int96::new(); + const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588; + const NANOSECONDS_IN_DAY: i64 = 86_400_000_000_000; + let days = nanos / NANOSECONDS_IN_DAY; + let remaining_nanos = nanos % NANOSECONDS_IN_DAY; + let julian_day = (days + JULIAN_DAY_OF_EPOCH) as i32; + let julian_day_u32 = julian_day as u32; + let nanos_low = (remaining_nanos & 0xFFFFFFFF) as u32; + let nanos_high = ((remaining_nanos >> 32) & 0xFFFFFFFF) as u32; + int96.set_data(nanos_low, nanos_high, julian_day_u32); + int96 +} + +fn verify_ordering(data: Vec<Int96>) { + // Create a temporary file + let tmp = Builder::new() + .prefix("test_int96_stats") + .tempfile() + .unwrap(); + let file_path = tmp.path().to_owned(); + + // Create schema with INT96 field + let message_type = " + message test { + REQUIRED INT96 timestamp; + } + "; + let schema = parse_message_type(message_type).unwrap(); + + // Configure writer properties to enable statistics + let props = WriterProperties::builder() + .set_statistics_enabled(EnabledStatistics::Page) + .build(); + + let expected_min = data[0]; + let expected_max = data[data.len() - 1]; + + { + let file = File::create(&file_path).unwrap(); + let mut writer = SerializedFileWriter::new(file, schema.into(), Arc::new(props)).unwrap(); + let mut row_group = writer.next_row_group().unwrap(); + let mut col_writer = row_group.next_column().unwrap().unwrap(); + + { + let writer = col_writer.typed::<Int96Type>(); + let mut shuffled_data = data.clone(); + shuffled_data.shuffle(&mut rand::rng()); + writer.write_batch(&shuffled_data, None, None).unwrap(); + } + col_writer.close().unwrap(); + row_group.close().unwrap(); + writer.close().unwrap(); + } + + let file = File::open(&file_path).unwrap(); + let reader = SerializedFileReader::new(file).unwrap(); + let metadata = reader.metadata(); + let row_group = metadata.row_group(0); + let column = row_group.column(0); + + let stats = column.statistics().unwrap(); + assert_eq!(stats.physical_type(), Type::INT96); + + if let Statistics::Int96(stats) = stats { + let min = stats.min_opt().unwrap(); + let max = stats.max_opt().unwrap(); + + assert_eq!( + *min, expected_min, + "Min value should be {expected_min} but was {min}" + ); + assert_eq!( + *max, expected_max, + "Max value should be {expected_max} but was {max}" + ); + assert_eq!(stats.null_count_opt(), Some(0)); + } else { + panic!("Expected Int96 statistics"); + } +} + +#[test] +fn test_multiple_dates() { + let data = vec![ + datetime_to_int96("2020-01-01 00:00:00.000"), + datetime_to_int96("2020-02-29 23:59:59.000"), + datetime_to_int96("2020-12-31 23:59:59.000"), + datetime_to_int96("2021-01-01 00:00:00.000"), + datetime_to_int96("2023-06-15 12:30:45.000"), + datetime_to_int96("2024-02-29 15:45:30.000"), + datetime_to_int96("2024-12-25 07:00:00.000"), + datetime_to_int96("2025-01-01 00:00:00.000"), + datetime_to_int96("2025-07-04 20:00:00.000"), + datetime_to_int96("2025-12-31 23:59:59.000"), + ]; + verify_ordering(data); +} + +#[test] +fn test_same_day_different_time() { + let data = vec![ + datetime_to_int96("2020-01-01 00:01:00.000"), + datetime_to_int96("2020-01-01 00:02:00.000"), + datetime_to_int96("2020-01-01 00:03:00.000"), + ]; + verify_ordering(data); +} + +#[test] +fn test_increasing_day_decreasing_time() { + let data = vec![ + datetime_to_int96("2020-01-01 12:00:00.000"), + datetime_to_int96("2020-02-01 11:00:00.000"), + datetime_to_int96("2020-03-01 10:00:00.000"), + ]; + verify_ordering(data); +}