This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new dff67c9b78 GH-7686: [Parquet] Fix int96 min/max stats (#7687)
dff67c9b78 is described below

commit dff67c9b78bbd6f2311f580ecc20e97e71e013db
Author: Rahul Sharma <rahul.k...@gmail.com>
AuthorDate: Wed Jul 23 00:04:06 2025 +0200

    GH-7686: [Parquet] Fix int96 min/max stats (#7687)
    
    # Which issue does this PR close?
    
    - Closes #7686
    
    # Rationale for this change
    
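    INT96 min/max statistics emitted by arrow-rs are incorrect: values were
    compared with the derived ordering on the raw `[u32; 3]` words rather than
    as timestamps (Julian day first, then nanoseconds within the day).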
    
    # What changes are included in this PR?
    
    1. Fix the int96 stats
    2. Add round-trip test to verify the behavior
    
    # Not included in this PR:
    1. Reading INT96 statistics only from known-good writers. This will be
    implemented after a new arrow-rs release.
    
    # Are there any user-facing changes?
    Yes. The INT96 min/max statistics written by arrow-rs will differ from
    those written by earlier releases, and will now be correct.
    
    ---------
    
    Co-authored-by: Rahul Sharma <rahul.sha...@databricks.com>
    Co-authored-by: Ed Seidl <etse...@live.com>
    Co-authored-by: Andrew Lamb <and...@nerdnetworks.org>
    Co-authored-by: Alkis Evlogimenos <al...@evlogimenos.com>
---
 parquet/src/column/writer/mod.rs       |   4 +-
 parquet/src/data_type.rs               |  38 ++++++++-
 parquet/src/file/statistics.rs         |   3 -
 parquet/tests/int96_stats_roundtrip.rs | 151 +++++++++++++++++++++++++++++++++
 4 files changed, 187 insertions(+), 9 deletions(-)

diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index db7cd31468..9374e226b8 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -2528,8 +2528,8 @@ mod tests {
         let stats = statistics_roundtrip::<Int96Type>(&input);
         assert!(!stats.is_min_max_backwards_compatible());
         if let Statistics::Int96(stats) = stats {
-            assert_eq!(stats.min_opt().unwrap(), &Int96::from(vec![0, 20, 30]));
-            assert_eq!(stats.max_opt().unwrap(), &Int96::from(vec![3, 20, 10]));
+            assert_eq!(stats.min_opt().unwrap(), &Int96::from(vec![3, 20, 10]));
+            assert_eq!(stats.max_opt().unwrap(), &Int96::from(vec![2, 20, 30]));
         } else {
             panic!("expecting Statistics::Int96, got {stats:?}");
         }
diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs
index 639567f604..6cba02ab3e 100644
--- a/parquet/src/data_type.rs
+++ b/parquet/src/data_type.rs
@@ -33,7 +33,7 @@ use crate::util::bit_util::FromBytes;
 
 /// Rust representation for logical type INT96, value is backed by an array of `u32`.
 /// The type only takes 12 bytes, without extra padding.
-#[derive(Clone, Copy, Debug, PartialOrd, Default, PartialEq, Eq)]
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
 pub struct Int96 {
     value: [u32; 3],
 }
@@ -118,14 +118,44 @@ impl Int96 {
             .wrapping_add(nanos)
     }
 
+    #[inline]
+    fn get_days(&self) -> i32 {
+        self.data()[2] as i32
+    }
+
+    #[inline]
+    fn get_nanos(&self) -> i64 {
+        ((self.data()[1] as i64) << 32) + self.data()[0] as i64
+    }
+
     #[inline]
     fn data_as_days_and_nanos(&self) -> (i32, i64) {
-        let day = self.data()[2] as i32;
-        let nanos = ((self.data()[1] as i64) << 32) + self.data()[0] as i64;
-        (day, nanos)
+        (self.get_days(), self.get_nanos())
+    }
+}
+
+impl PartialOrd for Int96 {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
     }
 }
 
+impl Ord for Int96 {
+    /// Order `Int96` correctly for (deprecated) timestamp types.
+    ///
+    /// Note: this is done even though the Int96 type is deprecated and the
+    /// [spec does not define the sort order]
+    /// because some engines, notably Spark and Databricks Photon still write
+    /// Int96 timestamps and rely on their order for optimization.
+    ///
+    /// [spec does not define the sort order]: https://github.com/apache/parquet-format/blob/cf943c197f4fad826b14ba0c40eb0ffdab585285/src/main/thrift/parquet.thrift#L1079
+    fn cmp(&self, other: &Self) -> Ordering {
+        match self.get_days().cmp(&other.get_days()) {
+            Ordering::Equal => self.get_nanos().cmp(&other.get_nanos()),
+            ord => ord,
+        }
+    }
+}
 impl From<Vec<u32>> for Int96 {
     fn from(buf: Vec<u32>) -> Self {
         assert_eq!(buf.len(), 3);
diff --git a/parquet/src/file/statistics.rs b/parquet/src/file/statistics.rs
index 0cfcb4d925..d0105461f1 100644
--- a/parquet/src/file/statistics.rs
+++ b/parquet/src/file/statistics.rs
@@ -209,9 +209,6 @@ pub fn from_thrift(
                     old_format,
                 ),
                 Type::INT96 => {
-                    // INT96 statistics may not be correct, because comparison is signed
-                    // byte-wise, not actual timestamps. It is recommended to ignore
-                    // min/max statistics for INT96 columns.
                     let min = if let Some(data) = min {
                         assert_eq!(data.len(), 12);
                         Some(Int96::try_from_le_slice(&data)?)
diff --git a/parquet/tests/int96_stats_roundtrip.rs b/parquet/tests/int96_stats_roundtrip.rs
new file mode 100644
index 0000000000..d6ba8d419e
--- /dev/null
+++ b/parquet/tests/int96_stats_roundtrip.rs
@@ -0,0 +1,151 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use chrono::{DateTime, NaiveDateTime, Utc};
+use parquet::basic::Type;
+use parquet::data_type::{Int96, Int96Type};
+use parquet::file::properties::{EnabledStatistics, WriterProperties};
+use parquet::file::reader::{FileReader, SerializedFileReader};
+use parquet::file::statistics::Statistics;
+use parquet::file::writer::SerializedFileWriter;
+use parquet::schema::parser::parse_message_type;
+use rand::seq::SliceRandom;
+use std::fs::File;
+use std::sync::Arc;
+use tempfile::Builder;
+
+fn datetime_to_int96(dt: &str) -> Int96 {
+    let naive = NaiveDateTime::parse_from_str(dt, "%Y-%m-%d %H:%M:%S%.f").unwrap();
+    let datetime: DateTime<Utc> = DateTime::from_naive_utc_and_offset(naive, Utc);
+    let nanos = datetime.timestamp_nanos_opt().unwrap();
+    let mut int96 = Int96::new();
+    const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;
+    const NANOSECONDS_IN_DAY: i64 = 86_400_000_000_000;
+    let days = nanos / NANOSECONDS_IN_DAY;
+    let remaining_nanos = nanos % NANOSECONDS_IN_DAY;
+    let julian_day = (days + JULIAN_DAY_OF_EPOCH) as i32;
+    let julian_day_u32 = julian_day as u32;
+    let nanos_low = (remaining_nanos & 0xFFFFFFFF) as u32;
+    let nanos_high = ((remaining_nanos >> 32) & 0xFFFFFFFF) as u32;
+    int96.set_data(nanos_low, nanos_high, julian_day_u32);
+    int96
+}
+
+fn verify_ordering(data: Vec<Int96>) {
+    // Create a temporary file
+    let tmp = Builder::new()
+        .prefix("test_int96_stats")
+        .tempfile()
+        .unwrap();
+    let file_path = tmp.path().to_owned();
+
+    // Create schema with INT96 field
+    let message_type = "
+        message test {
+            REQUIRED INT96 timestamp;
+        }
+    ";
+    let schema = parse_message_type(message_type).unwrap();
+
+    // Configure writer properties to enable statistics
+    let props = WriterProperties::builder()
+        .set_statistics_enabled(EnabledStatistics::Page)
+        .build();
+
+    let expected_min = data[0];
+    let expected_max = data[data.len() - 1];
+
+    {
+        let file = File::create(&file_path).unwrap();
+        let mut writer = SerializedFileWriter::new(file, schema.into(), Arc::new(props)).unwrap();
+        let mut row_group = writer.next_row_group().unwrap();
+        let mut col_writer = row_group.next_column().unwrap().unwrap();
+
+        {
+            let writer = col_writer.typed::<Int96Type>();
+            let mut shuffled_data = data.clone();
+            shuffled_data.shuffle(&mut rand::rng());
+            writer.write_batch(&shuffled_data, None, None).unwrap();
+        }
+        col_writer.close().unwrap();
+        row_group.close().unwrap();
+        writer.close().unwrap();
+    }
+
+    let file = File::open(&file_path).unwrap();
+    let reader = SerializedFileReader::new(file).unwrap();
+    let metadata = reader.metadata();
+    let row_group = metadata.row_group(0);
+    let column = row_group.column(0);
+
+    let stats = column.statistics().unwrap();
+    assert_eq!(stats.physical_type(), Type::INT96);
+
+    if let Statistics::Int96(stats) = stats {
+        let min = stats.min_opt().unwrap();
+        let max = stats.max_opt().unwrap();
+
+        assert_eq!(
+            *min, expected_min,
+            "Min value should be {expected_min} but was {min}"
+        );
+        assert_eq!(
+            *max, expected_max,
+            "Max value should be {expected_max} but was {max}"
+        );
+        assert_eq!(stats.null_count_opt(), Some(0));
+    } else {
+        panic!("Expected Int96 statistics");
+    }
+}
+
+#[test]
+fn test_multiple_dates() {
+    let data = vec![
+        datetime_to_int96("2020-01-01 00:00:00.000"),
+        datetime_to_int96("2020-02-29 23:59:59.000"),
+        datetime_to_int96("2020-12-31 23:59:59.000"),
+        datetime_to_int96("2021-01-01 00:00:00.000"),
+        datetime_to_int96("2023-06-15 12:30:45.000"),
+        datetime_to_int96("2024-02-29 15:45:30.000"),
+        datetime_to_int96("2024-12-25 07:00:00.000"),
+        datetime_to_int96("2025-01-01 00:00:00.000"),
+        datetime_to_int96("2025-07-04 20:00:00.000"),
+        datetime_to_int96("2025-12-31 23:59:59.000"),
+    ];
+    verify_ordering(data);
+}
+
+#[test]
+fn test_same_day_different_time() {
+    let data = vec![
+        datetime_to_int96("2020-01-01 00:01:00.000"),
+        datetime_to_int96("2020-01-01 00:02:00.000"),
+        datetime_to_int96("2020-01-01 00:03:00.000"),
+    ];
+    verify_ordering(data);
+}
+
+#[test]
+fn test_increasing_day_decreasing_time() {
+    let data = vec![
+        datetime_to_int96("2020-01-01 12:00:00.000"),
+        datetime_to_int96("2020-02-01 11:00:00.000"),
+        datetime_to_int96("2020-03-01 10:00:00.000"),
+    ];
+    verify_ordering(data);
+}

Reply via email to