This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 5a544d0909 Extract parquet statistics from Time32 and Time64 columns 
(#10771)
5a544d0909 is described below

commit 5a544d0909f835a6933027eeef5d4290cbc341ae
Author: Lordworms <[email protected]>
AuthorDate: Tue Jun 4 03:38:03 2024 -0700

    Extract parquet statistics from Time32 and Time64 columns (#10771)
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 .../datasource/physical_plan/parquet/statistics.rs |  12 ++
 datafusion/core/tests/parquet/arrow_statistics.rs  | 102 ++++++++++++-
 datafusion/core/tests/parquet/mod.rs               | 162 ++++++++++++++++++++-
 3 files changed, 269 insertions(+), 7 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs 
b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
index 9d59808508..756eb8eb10 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -104,6 +104,12 @@ macro_rules! get_statistic {
                     Some(DataType::Date64) => {
                         Some(ScalarValue::Date64(Some(i64::from(*s.$func()) * 
24 * 60 * 60 * 1000)))
                     }
+                    Some(DataType::Time32(TimeUnit::Second)) => {
+                        Some(ScalarValue::Time32Second(Some((*s.$func()))))
+                    }
+                    Some(DataType::Time32(TimeUnit::Millisecond)) => {
+                        
Some(ScalarValue::Time32Millisecond(Some((*s.$func()))))
+                    }
                     _ => Some(ScalarValue::Int32(Some(*s.$func()))),
                 }
             }
@@ -120,6 +126,12 @@ macro_rules! get_statistic {
                     Some(DataType::UInt64) => {
                         Some(ScalarValue::UInt64(Some((*s.$func()) as u64)))
                     }
+                    Some(DataType::Time64(TimeUnit::Microsecond)) => {
+                        Some(ScalarValue::Time64Microsecond(Some((*s.$func() 
as i64))))
+                    }
+                    Some(DataType::Time64(TimeUnit::Nanosecond)) => {
+                        Some(ScalarValue::Time64Nanosecond(Some((*s.$func() as 
i64))))
+                    }
                     Some(DataType::Timestamp(unit, timezone)) => {
                         Some(match unit {
                             TimeUnit::Second => ScalarValue::TimestampSecond(
diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs 
b/datafusion/core/tests/parquet/arrow_statistics.rs
index 3b7961236e..e58bf23705 100644
--- a/datafusion/core/tests/parquet/arrow_statistics.rs
+++ b/datafusion/core/tests/parquet/arrow_statistics.rs
@@ -31,9 +31,10 @@ use arrow_array::{
     make_array, Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, 
Date64Array,
     Decimal128Array, FixedSizeBinaryArray, Float16Array, Float32Array, 
Float64Array,
     Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray, 
LargeStringArray,
-    RecordBatch, StringArray, TimestampMicrosecondArray, 
TimestampMillisecondArray,
-    TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array,
-    UInt64Array, UInt8Array,
+    RecordBatch, StringArray, Time32MillisecondArray, Time32SecondArray,
+    Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
+    TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
+    UInt16Array, UInt32Array, UInt64Array, UInt8Array,
 };
 use arrow_schema::{DataType, Field, Schema};
 use datafusion::datasource::physical_plan::parquet::{
@@ -918,6 +919,101 @@ async fn test_dates_32_diff_rg_sizes() {
     .run();
 }
 
+#[tokio::test]
+async fn test_time32_second_and_time64_nanosecond_diff_rg_sizes() {
+    let reader_time32 = TestReader {
+        scenario: Scenario::Time32Second,
+        row_per_group: 4,
+    };
+
+    // Test for Time32Second column
+    Test {
+        reader: reader_time32.build().await,
+        // Assuming specific minimum and maximum values for demonstration
+        expected_min: Arc::new(Time32SecondArray::from(vec![18506, 18510, 
18514, 18518])),
+        expected_max: Arc::new(Time32SecondArray::from(vec![18509, 18513, 
18517, 18521])),
+        expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // Assuming 
1 null per row group for simplicity
+        expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4]),
+        column_name: "second",
+    }
+    .run();
+
+    let reader_time32_millisecond = TestReader {
+        scenario: Scenario::Time32Millisecond,
+        row_per_group: 4,
+    };
+
+    // Test for Time32Millisecond column
+    Test {
+        reader: reader_time32_millisecond.build().await,
+        // Assuming specific minimum and maximum values for demonstration
+        expected_min: Arc::new(Time32MillisecondArray::from(vec![
+            3600000, 3600004, 3600008, 3600012,
+        ])),
+        expected_max: Arc::new(Time32MillisecondArray::from(vec![
+            3600003, 3600007, 3600011, 3600015,
+        ])),
+        expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // Assuming 
1 null per row group for simplicity
+        expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4]),
+        column_name: "millisecond",
+    }
+    .run();
+
+    let reader_time64_micro = TestReader {
+        scenario: Scenario::Time64Microsecond,
+        row_per_group: 4,
+    };
+
+    // Test for Time64MicroSecond column
+    Test {
+        reader: reader_time64_micro.build().await,
+        // Assuming specific minimum and maximum values for demonstration
+        expected_min: Arc::new(Time64MicrosecondArray::from(vec![
+            1234567890123,
+            1234567890127,
+            1234567890131,
+            1234567890135,
+        ])),
+        expected_max: Arc::new(Time64MicrosecondArray::from(vec![
+            1234567890126,
+            1234567890130,
+            1234567890134,
+            1234567890138,
+        ])),
+        expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // Assuming 
1 null per row group for simplicity
+        expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4]),
+        column_name: "microsecond",
+    }
+    .run();
+
+    let reader_time64_nano = TestReader {
+        scenario: Scenario::Time64Nanosecond,
+        row_per_group: 4,
+    };
+
+    // Test for Time32Second column
+    Test {
+        reader: reader_time64_nano.build().await,
+        // Assuming specific minimum and maximum values for demonstration
+        expected_min: Arc::new(Time64NanosecondArray::from(vec![
+            987654321012345,
+            987654321012349,
+            987654321012353,
+            987654321012357,
+        ])),
+        expected_max: Arc::new(Time64NanosecondArray::from(vec![
+            987654321012348,
+            987654321012352,
+            987654321012356,
+            987654321012360,
+        ])),
+        expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // Assuming 
1 null per row group for simplicity
+        expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4]),
+        column_name: "nanosecond",
+    }
+    .run();
+}
+
 #[tokio::test]
 async fn test_dates_64_diff_rg_sizes() {
     // The file is created by 4 record batches (each has a null row), each has 
5 rows but then will be split into 2 row groups with size 13, 7
diff --git a/datafusion/core/tests/parquet/mod.rs 
b/datafusion/core/tests/parquet/mod.rs
index fd2f184328..72f659205a 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -22,11 +22,12 @@ use arrow::{
         make_array, Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, 
Date64Array,
         DictionaryArray, FixedSizeBinaryArray, Float16Array, Float32Array, 
Float64Array,
         Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray,
-        LargeStringArray, StringArray, StructArray, TimestampMicrosecondArray,
-        TimestampMillisecondArray, TimestampNanosecondArray, 
TimestampSecondArray,
-        UInt16Array, UInt32Array, UInt64Array, UInt8Array,
+        LargeStringArray, StringArray, StructArray, Time32MillisecondArray,
+        Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray,
+        TimestampMicrosecondArray, TimestampMillisecondArray, 
TimestampNanosecondArray,
+        TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, 
UInt8Array,
     },
-    datatypes::{DataType, Field, Int32Type, Int8Type, Schema},
+    datatypes::{DataType, Field, Int32Type, Int8Type, Schema, TimeUnit},
     record_batch::RecordBatch,
     util::pretty::pretty_format_batches,
 };
@@ -73,6 +74,10 @@ enum Scenario {
     Int32Range,
     UInt,
     UInt32Range,
+    Time32Second,
+    Time32Millisecond,
+    Time64Nanosecond,
+    Time64Microsecond,
     /// 7 Rows, for each i8, i16, i32, i64, u8, u16, u32, u64, f32, f64
     /// -MIN, -100, -1, 0, 1, 100, MAX
     NumericLimits,
@@ -486,6 +491,55 @@ fn make_int_batches(start: i8, end: i8) -> RecordBatch {
     .unwrap()
 }
 
+/// Return record batch with Time32Second, Time32Millisecond sequences
+fn make_time32_batches(scenario: Scenario, v: Vec<i32>) -> RecordBatch {
+    match scenario {
+        Scenario::Time32Second => {
+            let schema = Arc::new(Schema::new(vec![Field::new(
+                "second",
+                DataType::Time32(TimeUnit::Second),
+                true,
+            )]));
+            let array = Arc::new(Time32SecondArray::from(v)) as ArrayRef;
+            RecordBatch::try_new(schema, vec![array]).unwrap()
+        }
+        Scenario::Time32Millisecond => {
+            let schema = Arc::new(Schema::new(vec![Field::new(
+                "millisecond",
+                DataType::Time32(TimeUnit::Millisecond),
+                true,
+            )]));
+            let array = Arc::new(Time32MillisecondArray::from(v)) as ArrayRef;
+            RecordBatch::try_new(schema, vec![array]).unwrap()
+        }
+        _ => panic!("Unsupported scenario for Time32"),
+    }
+}
+
+/// Return record batch with Time64Microsecond, Time64Nanosecond sequences
+fn make_time64_batches(scenario: Scenario, v: Vec<i64>) -> RecordBatch {
+    match scenario {
+        Scenario::Time64Microsecond => {
+            let schema = Arc::new(Schema::new(vec![Field::new(
+                "microsecond",
+                DataType::Time64(TimeUnit::Microsecond),
+                true,
+            )]));
+            let array = Arc::new(Time64MicrosecondArray::from(v)) as ArrayRef;
+            RecordBatch::try_new(schema, vec![array]).unwrap()
+        }
+        Scenario::Time64Nanosecond => {
+            let schema = Arc::new(Schema::new(vec![Field::new(
+                "nanosecond",
+                DataType::Time64(TimeUnit::Nanosecond),
+                true,
+            )]));
+            let array = Arc::new(Time64NanosecondArray::from(v)) as ArrayRef;
+            RecordBatch::try_new(schema, vec![array]).unwrap()
+        }
+        _ => panic!("Unsupported scenario for Time64"),
+    }
+}
 /// Return record batch with u8, u16, u32, and u64 sequences
 ///
 /// Columns are named
@@ -1121,6 +1175,106 @@ fn create_data_batch(scenario: Scenario) -> 
Vec<RecordBatch> {
             )]));
             vec![RecordBatch::try_new(schema, 
vec![struct_array_data]).unwrap()]
         }
+        Scenario::Time32Second => {
+            vec![
+                make_time32_batches(
+                    Scenario::Time32Second,
+                    vec![18506, 18507, 18508, 18509],
+                ),
+                make_time32_batches(
+                    Scenario::Time32Second,
+                    vec![18510, 18511, 18512, 18513],
+                ),
+                make_time32_batches(
+                    Scenario::Time32Second,
+                    vec![18514, 18515, 18516, 18517],
+                ),
+                make_time32_batches(
+                    Scenario::Time32Second,
+                    vec![18518, 18519, 18520, 18521],
+                ),
+            ]
+        }
+        Scenario::Time32Millisecond => {
+            vec![
+                make_time32_batches(
+                    Scenario::Time32Millisecond,
+                    vec![3600000, 3600001, 3600002, 3600003],
+                ),
+                make_time32_batches(
+                    Scenario::Time32Millisecond,
+                    vec![3600004, 3600005, 3600006, 3600007],
+                ),
+                make_time32_batches(
+                    Scenario::Time32Millisecond,
+                    vec![3600008, 3600009, 3600010, 3600011],
+                ),
+                make_time32_batches(
+                    Scenario::Time32Millisecond,
+                    vec![3600012, 3600013, 3600014, 3600015],
+                ),
+            ]
+        }
+        Scenario::Time64Microsecond => {
+            vec![
+                make_time64_batches(
+                    Scenario::Time64Microsecond,
+                    vec![1234567890123, 1234567890124, 1234567890125, 
1234567890126],
+                ),
+                make_time64_batches(
+                    Scenario::Time64Microsecond,
+                    vec![1234567890127, 1234567890128, 1234567890129, 
1234567890130],
+                ),
+                make_time64_batches(
+                    Scenario::Time64Microsecond,
+                    vec![1234567890131, 1234567890132, 1234567890133, 
1234567890134],
+                ),
+                make_time64_batches(
+                    Scenario::Time64Microsecond,
+                    vec![1234567890135, 1234567890136, 1234567890137, 
1234567890138],
+                ),
+            ]
+        }
+        Scenario::Time64Nanosecond => {
+            vec![
+                make_time64_batches(
+                    Scenario::Time64Nanosecond,
+                    vec![
+                        987654321012345,
+                        987654321012346,
+                        987654321012347,
+                        987654321012348,
+                    ],
+                ),
+                make_time64_batches(
+                    Scenario::Time64Nanosecond,
+                    vec![
+                        987654321012349,
+                        987654321012350,
+                        987654321012351,
+                        987654321012352,
+                    ],
+                ),
+                make_time64_batches(
+                    Scenario::Time64Nanosecond,
+                    vec![
+                        987654321012353,
+                        987654321012354,
+                        987654321012355,
+                        987654321012356,
+                    ],
+                ),
+                make_time64_batches(
+                    Scenario::Time64Nanosecond,
+                    vec![
+                        987654321012357,
+                        987654321012358,
+                        987654321012359,
+                        987654321012360,
+                    ],
+                ),
+            ]
+        }
         Scenario::UTF8 => {
             vec![
                 make_utf8_batch(vec![Some("a"), Some("b"), Some("c"), 
Some("d"), None]),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to