This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 0f1999072 test: add test for decimal and pruning for decimal column
(#2960)
0f1999072 is described below
commit 0f1999072e803d6f9f0a7c2b0cfa538d225a9eef
Author: Kun Liu <[email protected]>
AuthorDate: Wed Jul 27 03:54:13 2022 +0800
test: add test for decimal and pruning for decimal column (#2960)
* add read decimal parquet test and prune test for decimal
* Update datafusion/core/src/datasource/file_format/parquet.rs
Co-authored-by: Andrew Lamb <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
---
.../core/src/datasource/file_format/parquet.rs | 48 +++++++++-
datafusion/core/src/physical_optimizer/pruning.rs | 102 +++++++++++++++++++++
2 files changed, 148 insertions(+), 2 deletions(-)
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs
b/datafusion/core/src/datasource/file_format/parquet.rs
index d23602c4b..abff811c2 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -525,8 +525,8 @@ mod tests {
use crate::physical_plan::metrics::MetricValue;
use crate::prelude::{SessionConfig, SessionContext};
use arrow::array::{
- ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array,
Int32Array,
- StringArray, TimestampNanosecondArray,
+ Array, ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array,
+ Int32Array, StringArray, TimestampNanosecondArray,
};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
@@ -1023,6 +1023,50 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn read_decimal_parquet() -> Result<()> {
+ let session_ctx = SessionContext::new();
+ let task_ctx = session_ctx.task_ctx();
+
+ // parquet uses the int32 physical type to store decimal
+ let exec = get_exec("int32_decimal.parquet", None, None).await?;
+ let batches = collect(exec, task_ctx.clone()).await?;
+ assert_eq!(1, batches.len());
+ assert_eq!(1, batches[0].num_columns());
+ let column = batches[0].column(0);
+ assert_eq!(&DataType::Decimal(4, 2), column.data_type());
+
+ // parquet uses the int64 physical type to store decimal
+ let exec = get_exec("int64_decimal.parquet", None, None).await?;
+ let batches = collect(exec, task_ctx.clone()).await?;
+ assert_eq!(1, batches.len());
+ assert_eq!(1, batches[0].num_columns());
+ let column = batches[0].column(0);
+ assert_eq!(&DataType::Decimal(10, 2), column.data_type());
+
+ // parquet uses the fixed length binary physical type to store
decimal
+ let exec = get_exec("fixed_length_decimal.parquet", None, None).await?;
+ let batches = collect(exec, task_ctx.clone()).await?;
+ assert_eq!(1, batches.len());
+ assert_eq!(1, batches[0].num_columns());
+ let column = batches[0].column(0);
+ assert_eq!(&DataType::Decimal(25, 2), column.data_type());
+
+ let exec = get_exec("fixed_length_decimal_legacy.parquet", None,
None).await?;
+ let batches = collect(exec, task_ctx.clone()).await?;
+ assert_eq!(1, batches.len());
+ assert_eq!(1, batches[0].num_columns());
+ let column = batches[0].column(0);
+ assert_eq!(&DataType::Decimal(13, 2), column.data_type());
+
+ // parquet uses the fixed length binary physical type to store
decimal
+ // TODO: arrow-rs doesn't support converting the binary physical type to
decimal
+ // https://github.com/apache/arrow-rs/pull/2160
+ // let exec = get_exec("byte_array_decimal.parquet", None,
None).await?;
+
+ Ok(())
+ }
+
fn assert_bytes_scanned(exec: Arc<dyn ExecutionPlan>, expected: usize) {
let actual = exec
.metrics()
diff --git a/datafusion/core/src/physical_optimizer/pruning.rs
b/datafusion/core/src/physical_optimizer/pruning.rs
index 4b0d04b54..2265675b3 100644
--- a/datafusion/core/src/physical_optimizer/pruning.rs
+++ b/datafusion/core/src/physical_optimizer/pruning.rs
@@ -800,10 +800,12 @@ mod tests {
use crate::from_slice::FromSlice;
use crate::logical_plan::{col, lit};
use crate::{assert_batches_eq,
physical_optimizer::pruning::StatisticsType};
+ use arrow::array::DecimalArray;
use arrow::{
array::{BinaryArray, Int32Array, Int64Array, StringArray},
datatypes::{DataType, TimeUnit},
};
+ use datafusion_common::ScalarValue;
use std::collections::HashMap;
#[derive(Debug)]
@@ -814,6 +816,38 @@ mod tests {
}
impl ContainerStats {
+ fn new_decimal128(
+ min: impl IntoIterator<Item = Option<i128>>,
+ max: impl IntoIterator<Item = Option<i128>>,
+ precision: usize,
+ scale: usize,
+ ) -> Self {
+ Self {
+ min: Arc::new(
+ min.into_iter()
+ .collect::<DecimalArray>()
+ .with_precision_and_scale(precision, scale)
+ .unwrap(),
+ ),
+ max: Arc::new(
+ max.into_iter()
+ .collect::<DecimalArray>()
+ .with_precision_and_scale(precision, scale)
+ .unwrap(),
+ ),
+ }
+ }
+
+ fn new_i64(
+ min: impl IntoIterator<Item = Option<i64>>,
+ max: impl IntoIterator<Item = Option<i64>>,
+ ) -> Self {
+ Self {
+ min: Arc::new(min.into_iter().collect::<Int64Array>()),
+ max: Arc::new(max.into_iter().collect::<Int64Array>()),
+ }
+ }
+
fn new_i32(
min: impl IntoIterator<Item = Option<i32>>,
max: impl IntoIterator<Item = Option<i32>>,
@@ -1418,6 +1452,74 @@ mod tests {
Ok(())
}
+ #[test]
+ fn prune_decimal_data() {
+ // decimal(9,2)
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "s1",
+ DataType::Decimal(9, 2),
+ true,
+ )]));
+ // s1 > 5
+ let expr = col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2)));
+ // If the data is written by spark, the physical data type is INT32 in
the parquet
+ // So we use the INT32 type of statistic.
+ let statistics = TestStatistics::new().with(
+ "s1",
+ ContainerStats::new_i32(
+ vec![Some(0), Some(4), None, Some(3)], // min
+ vec![Some(5), Some(6), Some(4), None], // max
+ ),
+ );
+ let p = PruningPredicate::try_new(expr, schema).unwrap();
+ let result = p.prune(&statistics).unwrap();
+ let expected = vec![false, true, false, true];
+ assert_eq!(result, expected);
+
+ // decimal(18,2)
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "s1",
+ DataType::Decimal(18, 2),
+ true,
+ )]));
+ // s1 > 5
+ let expr = col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 18,
2)));
+ // If the data is written by spark, the physical data type is INT64 in
the parquet
+ // So we use the INT64 type of statistic.
+ let statistics = TestStatistics::new().with(
+ "s1",
+ ContainerStats::new_i64(
+ vec![Some(0), Some(4), None, Some(3)], // min
+ vec![Some(5), Some(6), Some(4), None], // max
+ ),
+ );
+ let p = PruningPredicate::try_new(expr, schema).unwrap();
+ let result = p.prune(&statistics).unwrap();
+ let expected = vec![false, true, false, true];
+ assert_eq!(result, expected);
+
+ // decimal(23,2)
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "s1",
+ DataType::Decimal(23, 2),
+ true,
+ )]));
+ // s1 > 5
+ let expr = col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 23,
2)));
+ let statistics = TestStatistics::new().with(
+ "s1",
+ ContainerStats::new_decimal128(
+ vec![Some(0), Some(400), None, Some(300)], // min
+ vec![Some(500), Some(600), Some(400), None], // max
+ 23,
+ 2,
+ ),
+ );
+ let p = PruningPredicate::try_new(expr, schema).unwrap();
+ let result = p.prune(&statistics).unwrap();
+ let expected = vec![false, true, false, true];
+ assert_eq!(result, expected);
+ }
#[test]
fn prune_api() {
let schema = Arc::new(Schema::new(vec![