This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new c345f6d70 fix: support decimal statistic for row group prune (#2966)
c345f6d70 is described below

commit c345f6d707a9bf8ed3395af997e8b3495ed6de7d
Author: Kun Liu <[email protected]>
AuthorDate: Thu Jul 28 02:04:21 2022 +0800

    fix: support decimal statistic for row group prune (#2966)
    
    * support get the decimal statistics for pruning row group
    
    * address comment: change function name and modify the ut
---
 .../core/src/physical_plan/file_format/parquet.rs  | 358 ++++++++++++++++++++-
 1 file changed, 342 insertions(+), 16 deletions(-)

diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs 
b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 40f2e3304..d79fd2673 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -24,6 +24,7 @@ use std::ops::Range;
 use std::sync::Arc;
 use std::{any::Any, convert::TryInto};
 
+use arrow::datatypes::DataType;
 use arrow::{
     array::ArrayRef,
     datatypes::{Schema, SchemaRef},
@@ -36,12 +37,14 @@ use log::debug;
 use object_store::{ObjectMeta, ObjectStore};
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, 
ProjectionMask};
+use parquet::basic::{ConvertedType, LogicalType};
 use parquet::errors::ParquetError;
 use parquet::file::{
     metadata::{ParquetMetaData, RowGroupMetaData},
     properties::WriterProperties,
     statistics::Statistics as ParquetStatistics,
 };
+use parquet::schema::types::ColumnDescriptor;
 
 use datafusion_common::Column;
 use datafusion_expr::Expr;
@@ -394,28 +397,82 @@ struct RowGroupPruningStatistics<'a> {
     parquet_schema: &'a Schema,
 }
 
+// TODO: consolidate code with arrow-rs
+// Convert a byte array to an i128.
+// The input byte array must be in big-endian order.
+// Copied from arrow-rs.
+fn from_bytes_to_i128(b: &[u8]) -> i128 {
+    assert!(b.len() <= 16, "Decimal128Array supports only up to size 16");
+    let first_bit = b[0] & 128u8 == 128u8;
+    let mut result = if first_bit { [255u8; 16] } else { [0u8; 16] };
+    for (i, v) in b.iter().enumerate() {
+        result[i + (16 - b.len())] = *v;
+    }
+    // The byte array comes from the parquet file and must be big-endian.
+    // The endianness is defined by the parquet format; see the reference document
+    // 
https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66
+    i128::from_be_bytes(result)
+}
+
 /// Extract the min/max statistics from a `ParquetStatistics` object
 macro_rules! get_statistic {
-    ($column_statistics:expr, $func:ident, $bytes_func:ident) => {{
+    ($column_statistics:expr, $func:ident, $bytes_func:ident, 
$target_arrow_type:expr) => {{
         if !$column_statistics.has_min_max_set() {
             return None;
         }
         match $column_statistics {
             ParquetStatistics::Boolean(s) => 
Some(ScalarValue::Boolean(Some(*s.$func()))),
-            ParquetStatistics::Int32(s) => 
Some(ScalarValue::Int32(Some(*s.$func()))),
-            ParquetStatistics::Int64(s) => 
Some(ScalarValue::Int64(Some(*s.$func()))),
+            ParquetStatistics::Int32(s) => {
+                match $target_arrow_type {
+                    // int32 to decimal with the precision and scale
+                    Some(DataType::Decimal(precision, scale)) => {
+                        Some(ScalarValue::Decimal128(
+                            Some(*s.$func() as i128),
+                            precision,
+                            scale,
+                        ))
+                    }
+                    _ => Some(ScalarValue::Int32(Some(*s.$func()))),
+                }
+            }
+            ParquetStatistics::Int64(s) => {
+                match $target_arrow_type {
+                    // int64 to decimal with the precision and scale
+                    Some(DataType::Decimal(precision, scale)) => {
+                        Some(ScalarValue::Decimal128(
+                            Some(*s.$func() as i128),
+                            precision,
+                            scale,
+                        ))
+                    }
+                    _ => Some(ScalarValue::Int64(Some(*s.$func()))),
+                }
+            }
             // 96 bit ints not supported
             ParquetStatistics::Int96(_) => None,
             ParquetStatistics::Float(s) => 
Some(ScalarValue::Float32(Some(*s.$func()))),
             ParquetStatistics::Double(s) => 
Some(ScalarValue::Float64(Some(*s.$func()))),
             ParquetStatistics::ByteArray(s) => {
+                // TODO support decimal type for byte array type
                 let s = std::str::from_utf8(s.$bytes_func())
                     .map(|s| s.to_string())
                     .ok();
                 Some(ScalarValue::Utf8(s))
             }
             // type not supported yet
-            ParquetStatistics::FixedLenByteArray(_) => None,
+            ParquetStatistics::FixedLenByteArray(s) => {
+                match $target_arrow_type {
+                    // just support the decimal data type
+                    Some(DataType::Decimal(precision, scale)) => {
+                        Some(ScalarValue::Decimal128(
+                            Some(from_bytes_to_i128(s.$bytes_func())),
+                            precision,
+                            scale,
+                        ))
+                    }
+                    _ => None,
+                }
+            }
         }
     }};
 }
@@ -439,8 +496,12 @@ macro_rules! get_min_max_values {
             .columns()
             .iter()
             .find(|c| c.column_descr().name() == &$column.name)
-            .and_then(|c| c.statistics())
-            .map(|stats| get_statistic!(stats, $func, $bytes_func))
+            .and_then(|c| if c.statistics().is_some() 
{Some((c.statistics().unwrap(), c.column_descr()))} else {None})
+            .map(|(stats, column_descr)|
+                {
+                    let target_data_type = 
parquet_to_arrow_decimal_type(column_descr);
+                    get_statistic!(stats, $func, $bytes_func, target_data_type)
+                })
             .flatten()
             // column either didn't have statistics at all or didn't have 
min/max values
             .or_else(|| Some(null_scalar.clone()))
@@ -468,6 +529,24 @@ macro_rules! get_null_count_values {
     }};
 }
 
+// Convert parquet column schema to arrow data type, and just consider the
+// decimal data type.
+fn parquet_to_arrow_decimal_type(parquet_column: &ColumnDescriptor) -> 
Option<DataType> {
+    let type_ptr = parquet_column.self_type_ptr();
+    match type_ptr.get_basic_info().logical_type() {
+        Some(LogicalType::Decimal { scale, precision }) => {
+            Some(DataType::Decimal(precision as usize, scale as usize))
+        }
+        _ => match type_ptr.get_basic_info().converted_type() {
+            ConvertedType::DECIMAL => Some(DataType::Decimal(
+                type_ptr.get_precision() as usize,
+                type_ptr.get_scale() as usize,
+            )),
+            _ => None,
+        },
+    }
+}
+
 impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
     fn min_values(&self, column: &Column) -> Option<ArrayRef> {
         get_min_max_values!(self, column, min, min_bytes)
@@ -600,6 +679,8 @@ mod tests {
     use object_store::local::LocalFileSystem;
     use object_store::path::Path;
     use object_store::ObjectMeta;
+    use parquet::basic::LogicalType;
+    use parquet::data_type::{ByteArray, FixedLenByteArray};
     use parquet::{
         basic::Type as PhysicalType,
         file::{metadata::RowGroupMetaData, statistics::Statistics as 
ParquetStatistics},
@@ -1208,8 +1289,14 @@ mod tests {
         let schema = Schema::new(vec![Field::new("c1", DataType::Int32, 
false)]);
         let pruning_predicate =
             PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
-
-        let schema_descr = get_test_schema_descr(vec![("c1", 
PhysicalType::INT32)]);
+        let schema_descr = get_test_schema_descr(vec![(
+            "c1",
+            PhysicalType::INT32,
+            None,
+            None,
+            None,
+            None,
+        )]);
         let rgm1 = get_row_group_meta_data(
             &schema_descr,
             vec![ParquetStatistics::int32(Some(1), Some(10), None, 0, false)],
@@ -1235,7 +1322,14 @@ mod tests {
         let pruning_predicate =
             PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
 
-        let schema_descr = get_test_schema_descr(vec![("c1", 
PhysicalType::INT32)]);
+        let schema_descr = get_test_schema_descr(vec![(
+            "c1",
+            PhysicalType::INT32,
+            None,
+            None,
+            None,
+            None,
+        )]);
         let rgm1 = get_row_group_meta_data(
             &schema_descr,
             vec![ParquetStatistics::int32(None, None, None, 0, false)],
@@ -1266,8 +1360,8 @@ mod tests {
         let pruning_predicate = PruningPredicate::try_new(expr, 
schema.clone()).unwrap();
 
         let schema_descr = get_test_schema_descr(vec![
-            ("c1", PhysicalType::INT32),
-            ("c2", PhysicalType::INT32),
+            ("c1", PhysicalType::INT32, None, None, None, None),
+            ("c2", PhysicalType::INT32, None, None, None, None),
         ]);
         let rgm1 = get_row_group_meta_data(
             &schema_descr,
@@ -1308,8 +1402,8 @@ mod tests {
 
     fn gen_row_group_meta_data_for_pruning_predicate() -> 
Vec<RowGroupMetaData> {
         let schema_descr = get_test_schema_descr(vec![
-            ("c1", PhysicalType::INT32),
-            ("c2", PhysicalType::BOOLEAN),
+            ("c1", PhysicalType::INT32, None, None, None, None),
+            ("c2", PhysicalType::BOOLEAN, None, None, None, None),
         ]);
         let rgm1 = get_row_group_meta_data(
             &schema_descr,
@@ -1373,6 +1467,202 @@ mod tests {
         );
     }
 
+    #[test]
+    fn row_group_pruning_predicate_decimal_type() {
+        // For the decimal data type, parquet can use `INT32`, `INT64`, 
`BYTE_ARRAY`, `FIXED_LEN_BYTE_ARRAY` to
+        // store the data.
+        // In this case, construct four types of statistics to be filtered with 
the decimal predicate.
+
+        // INT32: c1 > 5, the c1 is decimal(9,2)
+        let expr = col("c1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2)));
+        let schema = Schema::new(vec![Field::new("c1", DataType::Decimal(9, 
2), false)]);
+        let schema_descr = get_test_schema_descr(vec![(
+            "c1",
+            PhysicalType::INT32,
+            Some(LogicalType::Decimal {
+                scale: 2,
+                precision: 9,
+            }),
+            Some(9),
+            Some(2),
+            None,
+        )]);
+        let pruning_predicate =
+            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+        let rgm1 = get_row_group_meta_data(
+            &schema_descr,
+            // [1.00, 6.00]
+            // c1 > 5, this row group will be included in the results.
+            vec![ParquetStatistics::int32(
+                Some(100),
+                Some(600),
+                None,
+                0,
+                false,
+            )],
+        );
+        let rgm2 = get_row_group_meta_data(
+            &schema_descr,
+            // [0.1, 0.2]
+            // c1 > 5, this row group will not be included in the results.
+            vec![ParquetStatistics::int32(Some(10), Some(20), None, 0, false)],
+        );
+        let metrics = parquet_file_metrics();
+        assert_eq!(
+            prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), 
&metrics),
+            vec![0]
+        );
+
+        // INT32: c1 > 5, but parquet decimal type has different precision or 
scale to arrow decimal
+        // The decimal of arrow is decimal(5,2), the decimal of parquet is 
decimal(9,0)
+        let expr = col("c1").gt(lit(ScalarValue::Decimal128(Some(500), 5, 2)));
+        let schema = Schema::new(vec![Field::new("c1", DataType::Decimal(5, 
2), false)]);
+        // The decimal of parquet is decimal(9,0)
+        let schema_descr = get_test_schema_descr(vec![(
+            "c1",
+            PhysicalType::INT32,
+            Some(LogicalType::Decimal {
+                scale: 0,
+                precision: 9,
+            }),
+            Some(9),
+            Some(0),
+            None,
+        )]);
+        let pruning_predicate =
+            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+        let rgm1 = get_row_group_meta_data(
+            &schema_descr,
+            // [100, 600]
+            // c1 > 5, this row group will be included in the results.
+            vec![ParquetStatistics::int32(
+                Some(100),
+                Some(600),
+                None,
+                0,
+                false,
+            )],
+        );
+        let rgm2 = get_row_group_meta_data(
+            &schema_descr,
+            // [10, 20]
+            // c1 > 5, this row group will be included in the results.
+            vec![ParquetStatistics::int32(Some(10), Some(20), None, 0, false)],
+        );
+        let rgm3 = get_row_group_meta_data(
+            &schema_descr,
+            // [0, 2]
+            // c1 > 5, this row group will not be included in the results.
+            vec![ParquetStatistics::int32(Some(0), Some(2), None, 0, false)],
+        );
+        let metrics = parquet_file_metrics();
+        assert_eq!(
+            prune_row_groups(
+                &[rgm1, rgm2, rgm3],
+                None,
+                Some(pruning_predicate),
+                &metrics
+            ),
+            vec![0, 1]
+        );
+
+        // INT64: c1 < 5, the c1 is decimal(18,2)
+        let expr = col("c1").lt(lit(ScalarValue::Decimal128(Some(500), 18, 
2)));
+        let schema = Schema::new(vec![Field::new("c1", DataType::Decimal(18, 
2), false)]);
+        let schema_descr = get_test_schema_descr(vec![(
+            "c1",
+            PhysicalType::INT64,
+            Some(LogicalType::Decimal {
+                scale: 2,
+                precision: 18,
+            }),
+            Some(18),
+            Some(2),
+            None,
+        )]);
+        let pruning_predicate =
+            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+        let rgm1 = get_row_group_meta_data(
+            &schema_descr,
+            // [6.00, 8.00]
+            vec![ParquetStatistics::int32(
+                Some(600),
+                Some(800),
+                None,
+                0,
+                false,
+            )],
+        );
+        let rgm2 = get_row_group_meta_data(
+            &schema_descr,
+            // [0.1, 0.2]
+            vec![ParquetStatistics::int64(Some(10), Some(20), None, 0, false)],
+        );
+        let metrics = parquet_file_metrics();
+        assert_eq!(
+            prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), 
&metrics),
+            vec![1]
+        );
+
+        // FIXED_LEN_BYTE_ARRAY: c1 = 100.000, c1 is decimal(28,3)
+        // the parquet type is decimal(18,2)
+        let expr = col("c1").eq(lit(ScalarValue::Decimal128(Some(100000), 28, 
3)));
+        let schema = Schema::new(vec![Field::new("c1", DataType::Decimal(18, 
3), false)]);
+        let schema_descr = get_test_schema_descr(vec![(
+            "c1",
+            PhysicalType::FIXED_LEN_BYTE_ARRAY,
+            Some(LogicalType::Decimal {
+                scale: 2,
+                precision: 18,
+            }),
+            Some(18),
+            Some(2),
+            Some(16),
+        )]);
+        let pruning_predicate =
+            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+        // we must use big-endian when encoding the i128 into bytes or Vec<u8>.
+        let rgm1 = get_row_group_meta_data(
+            &schema_descr,
+            vec![ParquetStatistics::fixed_len_byte_array(
+                // 5.00
+                Some(FixedLenByteArray::from(ByteArray::from(
+                    500i128.to_be_bytes().to_vec(),
+                ))),
+                // 80.00
+                Some(FixedLenByteArray::from(ByteArray::from(
+                    8000i128.to_be_bytes().to_vec(),
+                ))),
+                None,
+                0,
+                false,
+            )],
+        );
+        let rgm2 = get_row_group_meta_data(
+            &schema_descr,
+            vec![ParquetStatistics::fixed_len_byte_array(
+                // 5.00
+                Some(FixedLenByteArray::from(ByteArray::from(
+                    500i128.to_be_bytes().to_vec(),
+                ))),
+                // 200.00
+                Some(FixedLenByteArray::from(ByteArray::from(
+                    20000i128.to_be_bytes().to_vec(),
+                ))),
+                None,
+                0,
+                false,
+            )],
+        );
+        let metrics = parquet_file_metrics();
+        assert_eq!(
+            prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), 
&metrics),
+            vec![1]
+        );
+
+        // TODO: BYTE_ARRAY support read decimal from parquet, after the 
20.0.0 arrow-rs release
+    }
+
     fn get_row_group_meta_data(
         schema_descr: &SchemaDescPtr,
         column_statistics: Vec<ParquetStatistics>,
@@ -1394,12 +1684,48 @@ mod tests {
             .unwrap()
     }
 
-    fn get_test_schema_descr(fields: Vec<(&str, PhysicalType)>) -> 
SchemaDescPtr {
+    #[allow(clippy::type_complexity)]
+    fn get_test_schema_descr(
+        fields: Vec<(
+            &str,
+            PhysicalType,
+            Option<LogicalType>,
+            Option<i32>, // precision
+            Option<i32>, // scale
+            Option<i32>, // length of bytes
+        )>,
+    ) -> SchemaDescPtr {
         use parquet::schema::types::{SchemaDescriptor, Type as SchemaType};
         let mut schema_fields = fields
             .iter()
-            .map(|(n, t)| {
-                Arc::new(SchemaType::primitive_type_builder(n, 
*t).build().unwrap())
+            .map(|(n, t, logical, precision, scale, length)| {
+                let mut builder = SchemaType::primitive_type_builder(n, *t);
+                // add logical type for the parquet field
+                match logical {
+                    None => {}
+                    Some(logical_type) => {
+                        builder = 
builder.with_logical_type(Some(logical_type.clone()));
+                    }
+                };
+                match precision {
+                    None => {}
+                    Some(v) => {
+                        builder = builder.with_precision(*v);
+                    }
+                };
+                match scale {
+                    None => {}
+                    Some(v) => {
+                        builder = builder.with_scale(*v);
+                    }
+                }
+                match length {
+                    None => {}
+                    Some(v) => {
+                        builder = builder.with_length(*v);
+                    }
+                }
+                Arc::new(builder.build().unwrap())
             })
             .collect::<Vec<_>>();
         let schema = SchemaType::group_type_builder("schema")

Reply via email to