This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 5a544d0909 Extract parquet statistics from Time32 and Time64 columns
(#10771)
5a544d0909 is described below
commit 5a544d0909f835a6933027eeef5d4290cbc341ae
Author: Lordworms <[email protected]>
AuthorDate: Tue Jun 4 03:38:03 2024 -0700
Extract parquet statistics from Time32 and Time64 columns (#10771)
Co-authored-by: Andrew Lamb <[email protected]>
---
.../datasource/physical_plan/parquet/statistics.rs | 12 ++
datafusion/core/tests/parquet/arrow_statistics.rs | 102 ++++++++++++-
datafusion/core/tests/parquet/mod.rs | 162 ++++++++++++++++++++-
3 files changed, 269 insertions(+), 7 deletions(-)
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
index 9d59808508..756eb8eb10 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -104,6 +104,12 @@ macro_rules! get_statistic {
Some(DataType::Date64) => {
Some(ScalarValue::Date64(Some(i64::from(*s.$func()) *
24 * 60 * 60 * 1000)))
}
+ Some(DataType::Time32(TimeUnit::Second)) => {
+ Some(ScalarValue::Time32Second(Some((*s.$func()))))
+ }
+ Some(DataType::Time32(TimeUnit::Millisecond)) => {
+ Some(ScalarValue::Time32Millisecond(Some((*s.$func()))))
+ }
_ => Some(ScalarValue::Int32(Some(*s.$func()))),
}
}
@@ -120,6 +126,12 @@ macro_rules! get_statistic {
Some(DataType::UInt64) => {
Some(ScalarValue::UInt64(Some((*s.$func()) as u64)))
}
+ Some(DataType::Time64(TimeUnit::Microsecond)) => {
+ Some(ScalarValue::Time64Microsecond(Some((*s.$func()
as i64))))
+ }
+ Some(DataType::Time64(TimeUnit::Nanosecond)) => {
+ Some(ScalarValue::Time64Nanosecond(Some((*s.$func() as
i64))))
+ }
Some(DataType::Timestamp(unit, timezone)) => {
Some(match unit {
TimeUnit::Second => ScalarValue::TimestampSecond(
diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs
b/datafusion/core/tests/parquet/arrow_statistics.rs
index 3b7961236e..e58bf23705 100644
--- a/datafusion/core/tests/parquet/arrow_statistics.rs
+++ b/datafusion/core/tests/parquet/arrow_statistics.rs
@@ -31,9 +31,10 @@ use arrow_array::{
make_array, Array, ArrayRef, BinaryArray, BooleanArray, Date32Array,
Date64Array,
Decimal128Array, FixedSizeBinaryArray, Float16Array, Float32Array,
Float64Array,
Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray,
LargeStringArray,
- RecordBatch, StringArray, TimestampMicrosecondArray,
TimestampMillisecondArray,
- TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array,
- UInt64Array, UInt8Array,
+ RecordBatch, StringArray, Time32MillisecondArray, Time32SecondArray,
+ Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
+ TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
+ UInt16Array, UInt32Array, UInt64Array, UInt8Array,
};
use arrow_schema::{DataType, Field, Schema};
use datafusion::datasource::physical_plan::parquet::{
@@ -918,6 +919,101 @@ async fn test_dates_32_diff_rg_sizes() {
.run();
}
+#[tokio::test]
+async fn test_time32_second_and_time64_nanosecond_diff_rg_sizes() {
+ let reader_time32 = TestReader {
+ scenario: Scenario::Time32Second,
+ row_per_group: 4,
+ };
+
+ // Test for Time32Second column
+ Test {
+ reader: reader_time32.build().await,
+ // Assuming specific minimum and maximum values for demonstration
+ expected_min: Arc::new(Time32SecondArray::from(vec![18506, 18510,
18514, 18518])),
+ expected_max: Arc::new(Time32SecondArray::from(vec![18509, 18513,
18517, 18521])),
+ expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // no nulls
in any row group
+ expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4]),
+ column_name: "second",
+ }
+ .run();
+
+ let reader_time32_millisecond = TestReader {
+ scenario: Scenario::Time32Millisecond,
+ row_per_group: 4,
+ };
+
+ // Test for Time32Millisecond column
+ Test {
+ reader: reader_time32_millisecond.build().await,
+ // Assuming specific minimum and maximum values for demonstration
+ expected_min: Arc::new(Time32MillisecondArray::from(vec![
+ 3600000, 3600004, 3600008, 3600012,
+ ])),
+ expected_max: Arc::new(Time32MillisecondArray::from(vec![
+ 3600003, 3600007, 3600011, 3600015,
+ ])),
+ expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // no nulls
in any row group
+ expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4]),
+ column_name: "millisecond",
+ }
+ .run();
+
+ let reader_time64_micro = TestReader {
+ scenario: Scenario::Time64Microsecond,
+ row_per_group: 4,
+ };
+
+ // Test for Time64Microsecond column
+ Test {
+ reader: reader_time64_micro.build().await,
+ // Assuming specific minimum and maximum values for demonstration
+ expected_min: Arc::new(Time64MicrosecondArray::from(vec![
+ 1234567890123,
+ 1234567890127,
+ 1234567890131,
+ 1234567890135,
+ ])),
+ expected_max: Arc::new(Time64MicrosecondArray::from(vec![
+ 1234567890126,
+ 1234567890130,
+ 1234567890134,
+ 1234567890138,
+ ])),
+ expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // no nulls
in any row group
+ expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4]),
+ column_name: "microsecond",
+ }
+ .run();
+
+ let reader_time64_nano = TestReader {
+ scenario: Scenario::Time64Nanosecond,
+ row_per_group: 4,
+ };
+
+ // Test for Time64Nanosecond column
+ Test {
+ reader: reader_time64_nano.build().await,
+ // Assuming specific minimum and maximum values for demonstration
+ expected_min: Arc::new(Time64NanosecondArray::from(vec![
+ 987654321012345,
+ 987654321012349,
+ 987654321012353,
+ 987654321012357,
+ ])),
+ expected_max: Arc::new(Time64NanosecondArray::from(vec![
+ 987654321012348,
+ 987654321012352,
+ 987654321012356,
+ 987654321012360,
+ ])),
+ expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // no nulls
in any row group
+ expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4]),
+ column_name: "nanosecond",
+ }
+ .run();
+}
+
#[tokio::test]
async fn test_dates_64_diff_rg_sizes() {
// The file is created by 4 record batches (each has a null row), each has
5 rows but then will be split into 2 row groups with size 13, 7
diff --git a/datafusion/core/tests/parquet/mod.rs
b/datafusion/core/tests/parquet/mod.rs
index fd2f184328..72f659205a 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -22,11 +22,12 @@ use arrow::{
make_array, Array, ArrayRef, BinaryArray, BooleanArray, Date32Array,
Date64Array,
DictionaryArray, FixedSizeBinaryArray, Float16Array, Float32Array,
Float64Array,
Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray,
- LargeStringArray, StringArray, StructArray, TimestampMicrosecondArray,
- TimestampMillisecondArray, TimestampNanosecondArray,
TimestampSecondArray,
- UInt16Array, UInt32Array, UInt64Array, UInt8Array,
+ LargeStringArray, StringArray, StructArray, Time32MillisecondArray,
+ Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray,
+ TimestampMicrosecondArray, TimestampMillisecondArray,
TimestampNanosecondArray,
+ TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array,
UInt8Array,
},
- datatypes::{DataType, Field, Int32Type, Int8Type, Schema},
+ datatypes::{DataType, Field, Int32Type, Int8Type, Schema, TimeUnit},
record_batch::RecordBatch,
util::pretty::pretty_format_batches,
};
@@ -73,6 +74,10 @@ enum Scenario {
Int32Range,
UInt,
UInt32Range,
+ Time32Second,
+ Time32Millisecond,
+ Time64Nanosecond,
+ Time64Microsecond,
/// 7 Rows, for each i8, i16, i32, i64, u8, u16, u32, u64, f32, f64
/// -MIN, -100, -1, 0, 1, 100, MAX
NumericLimits,
@@ -486,6 +491,55 @@ fn make_int_batches(start: i8, end: i8) -> RecordBatch {
.unwrap()
}
+/// Return record batch with Time32Second, Time32Millisecond sequences
+fn make_time32_batches(scenario: Scenario, v: Vec<i32>) -> RecordBatch {
+ match scenario {
+ Scenario::Time32Second => {
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "second",
+ DataType::Time32(TimeUnit::Second),
+ true,
+ )]));
+ let array = Arc::new(Time32SecondArray::from(v)) as ArrayRef;
+ RecordBatch::try_new(schema, vec![array]).unwrap()
+ }
+ Scenario::Time32Millisecond => {
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "millisecond",
+ DataType::Time32(TimeUnit::Millisecond),
+ true,
+ )]));
+ let array = Arc::new(Time32MillisecondArray::from(v)) as ArrayRef;
+ RecordBatch::try_new(schema, vec![array]).unwrap()
+ }
+ _ => panic!("Unsupported scenario for Time32"),
+ }
+}
+
+/// Return record batch with Time64Microsecond, Time64Nanosecond sequences
+fn make_time64_batches(scenario: Scenario, v: Vec<i64>) -> RecordBatch {
+ match scenario {
+ Scenario::Time64Microsecond => {
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "microsecond",
+ DataType::Time64(TimeUnit::Microsecond),
+ true,
+ )]));
+ let array = Arc::new(Time64MicrosecondArray::from(v)) as ArrayRef;
+ RecordBatch::try_new(schema, vec![array]).unwrap()
+ }
+ Scenario::Time64Nanosecond => {
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "nanosecond",
+ DataType::Time64(TimeUnit::Nanosecond),
+ true,
+ )]));
+ let array = Arc::new(Time64NanosecondArray::from(v)) as ArrayRef;
+ RecordBatch::try_new(schema, vec![array]).unwrap()
+ }
+ _ => panic!("Unsupported scenario for Time64"),
+ }
+}
/// Return record batch with u8, u16, u32, and u64 sequences
///
/// Columns are named
@@ -1121,6 +1175,106 @@ fn create_data_batch(scenario: Scenario) ->
Vec<RecordBatch> {
)]));
vec![RecordBatch::try_new(schema,
vec![struct_array_data]).unwrap()]
}
+ Scenario::Time32Second => {
+ vec![
+ make_time32_batches(
+ Scenario::Time32Second,
+ vec![18506, 18507, 18508, 18509],
+ ),
+ make_time32_batches(
+ Scenario::Time32Second,
+ vec![18510, 18511, 18512, 18513],
+ ),
+ make_time32_batches(
+ Scenario::Time32Second,
+ vec![18514, 18515, 18516, 18517],
+ ),
+ make_time32_batches(
+ Scenario::Time32Second,
+ vec![18518, 18519, 18520, 18521],
+ ),
+ ]
+ }
+ Scenario::Time32Millisecond => {
+ vec![
+ make_time32_batches(
+ Scenario::Time32Millisecond,
+ vec![3600000, 3600001, 3600002, 3600003],
+ ),
+ make_time32_batches(
+ Scenario::Time32Millisecond,
+ vec![3600004, 3600005, 3600006, 3600007],
+ ),
+ make_time32_batches(
+ Scenario::Time32Millisecond,
+ vec![3600008, 3600009, 3600010, 3600011],
+ ),
+ make_time32_batches(
+ Scenario::Time32Millisecond,
+ vec![3600012, 3600013, 3600014, 3600015],
+ ),
+ ]
+ }
+ Scenario::Time64Microsecond => {
+ vec![
+ make_time64_batches(
+ Scenario::Time64Microsecond,
+ vec![1234567890123, 1234567890124, 1234567890125,
1234567890126],
+ ),
+ make_time64_batches(
+ Scenario::Time64Microsecond,
+ vec![1234567890127, 1234567890128, 1234567890129,
1234567890130],
+ ),
+ make_time64_batches(
+ Scenario::Time64Microsecond,
+ vec![1234567890131, 1234567890132, 1234567890133,
1234567890134],
+ ),
+ make_time64_batches(
+ Scenario::Time64Microsecond,
+ vec![1234567890135, 1234567890136, 1234567890137,
1234567890138],
+ ),
+ ]
+ }
+ Scenario::Time64Nanosecond => {
+ vec![
+ make_time64_batches(
+ Scenario::Time64Nanosecond,
+ vec![
+ 987654321012345,
+ 987654321012346,
+ 987654321012347,
+ 987654321012348,
+ ],
+ ),
+ make_time64_batches(
+ Scenario::Time64Nanosecond,
+ vec![
+ 987654321012349,
+ 987654321012350,
+ 987654321012351,
+ 987654321012352,
+ ],
+ ),
+ make_time64_batches(
+ Scenario::Time64Nanosecond,
+ vec![
+ 987654321012353,
+ 987654321012354,
+ 987654321012355,
+ 987654321012356,
+ ],
+ ),
+ make_time64_batches(
+ Scenario::Time64Nanosecond,
+ vec![
+ 987654321012357,
+ 987654321012358,
+ 987654321012359,
+ 987654321012360,
+ ],
+ ),
+ ]
+ }
Scenario::UTF8 => {
vec![
make_utf8_batch(vec![Some("a"), Some("b"), Some("c"),
Some("d"), None]),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]