This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new c345f6d70 fix: support decimal statistic for row group prune (#2966)
c345f6d70 is described below
commit c345f6d707a9bf8ed3395af997e8b3495ed6de7d
Author: Kun Liu <[email protected]>
AuthorDate: Thu Jul 28 02:04:21 2022 +0800
fix: support decimal statistic for row group prune (#2966)
    * support getting the decimal statistics for pruning row groups
    * address comment: change function name and modify the unit tests
---
.../core/src/physical_plan/file_format/parquet.rs | 358 ++++++++++++++++++++-
1 file changed, 342 insertions(+), 16 deletions(-)
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs
b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 40f2e3304..d79fd2673 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -24,6 +24,7 @@ use std::ops::Range;
use std::sync::Arc;
use std::{any::Any, convert::TryInto};
+use arrow::datatypes::DataType;
use arrow::{
array::ArrayRef,
datatypes::{Schema, SchemaRef},
@@ -36,12 +37,14 @@ use log::debug;
use object_store::{ObjectMeta, ObjectStore};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder,
ProjectionMask};
+use parquet::basic::{ConvertedType, LogicalType};
use parquet::errors::ParquetError;
use parquet::file::{
metadata::{ParquetMetaData, RowGroupMetaData},
properties::WriterProperties,
statistics::Statistics as ParquetStatistics,
};
+use parquet::schema::types::ColumnDescriptor;
use datafusion_common::Column;
use datafusion_expr::Expr;
@@ -394,28 +397,82 @@ struct RowGroupPruningStatistics<'a> {
parquet_schema: &'a Schema,
}
+// TODO: consolidate code with arrow-rs
+// Convert the bytes array to i128.
+// The input bytes array must be big-endian.
+// Copied from arrow-rs
+fn from_bytes_to_i128(b: &[u8]) -> i128 {
+ assert!(b.len() <= 16, "Decimal128Array supports only up to size 16");
+ let first_bit = b[0] & 128u8 == 128u8;
+ let mut result = if first_bit { [255u8; 16] } else { [0u8; 16] };
+ for (i, v) in b.iter().enumerate() {
+ result[i + (16 - b.len())] = *v;
+ }
+    // The bytes array is from the parquet file and must be big-endian.
+ // The endian is defined by parquet format, and the reference document
+ //
https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66
+ i128::from_be_bytes(result)
+}
+
/// Extract the min/max statistics from a `ParquetStatistics` object
macro_rules! get_statistic {
- ($column_statistics:expr, $func:ident, $bytes_func:ident) => {{
+ ($column_statistics:expr, $func:ident, $bytes_func:ident,
$target_arrow_type:expr) => {{
if !$column_statistics.has_min_max_set() {
return None;
}
match $column_statistics {
ParquetStatistics::Boolean(s) =>
Some(ScalarValue::Boolean(Some(*s.$func()))),
- ParquetStatistics::Int32(s) =>
Some(ScalarValue::Int32(Some(*s.$func()))),
- ParquetStatistics::Int64(s) =>
Some(ScalarValue::Int64(Some(*s.$func()))),
+ ParquetStatistics::Int32(s) => {
+ match $target_arrow_type {
+ // int32 to decimal with the precision and scale
+ Some(DataType::Decimal(precision, scale)) => {
+ Some(ScalarValue::Decimal128(
+ Some(*s.$func() as i128),
+ precision,
+ scale,
+ ))
+ }
+ _ => Some(ScalarValue::Int32(Some(*s.$func()))),
+ }
+ }
+ ParquetStatistics::Int64(s) => {
+ match $target_arrow_type {
+ // int64 to decimal with the precision and scale
+ Some(DataType::Decimal(precision, scale)) => {
+ Some(ScalarValue::Decimal128(
+ Some(*s.$func() as i128),
+ precision,
+ scale,
+ ))
+ }
+ _ => Some(ScalarValue::Int64(Some(*s.$func()))),
+ }
+ }
// 96 bit ints not supported
ParquetStatistics::Int96(_) => None,
ParquetStatistics::Float(s) =>
Some(ScalarValue::Float32(Some(*s.$func()))),
ParquetStatistics::Double(s) =>
Some(ScalarValue::Float64(Some(*s.$func()))),
ParquetStatistics::ByteArray(s) => {
+ // TODO support decimal type for byte array type
let s = std::str::from_utf8(s.$bytes_func())
.map(|s| s.to_string())
.ok();
Some(ScalarValue::Utf8(s))
}
// type not supported yet
- ParquetStatistics::FixedLenByteArray(_) => None,
+ ParquetStatistics::FixedLenByteArray(s) => {
+ match $target_arrow_type {
+ // just support the decimal data type
+ Some(DataType::Decimal(precision, scale)) => {
+ Some(ScalarValue::Decimal128(
+ Some(from_bytes_to_i128(s.$bytes_func())),
+ precision,
+ scale,
+ ))
+ }
+ _ => None,
+ }
+ }
}
}};
}
@@ -439,8 +496,12 @@ macro_rules! get_min_max_values {
.columns()
.iter()
.find(|c| c.column_descr().name() == &$column.name)
- .and_then(|c| c.statistics())
- .map(|stats| get_statistic!(stats, $func, $bytes_func))
+ .and_then(|c| if c.statistics().is_some()
{Some((c.statistics().unwrap(), c.column_descr()))} else {None})
+ .map(|(stats, column_descr)|
+ {
+ let target_data_type =
parquet_to_arrow_decimal_type(column_descr);
+ get_statistic!(stats, $func, $bytes_func, target_data_type)
+ })
.flatten()
// column either didn't have statistics at all or didn't have
min/max values
.or_else(|| Some(null_scalar.clone()))
@@ -468,6 +529,24 @@ macro_rules! get_null_count_values {
}};
}
+// Convert parquet column schema to arrow data type, and just consider the
+// decimal data type.
+fn parquet_to_arrow_decimal_type(parquet_column: &ColumnDescriptor) ->
Option<DataType> {
+ let type_ptr = parquet_column.self_type_ptr();
+ match type_ptr.get_basic_info().logical_type() {
+ Some(LogicalType::Decimal { scale, precision }) => {
+ Some(DataType::Decimal(precision as usize, scale as usize))
+ }
+ _ => match type_ptr.get_basic_info().converted_type() {
+ ConvertedType::DECIMAL => Some(DataType::Decimal(
+ type_ptr.get_precision() as usize,
+ type_ptr.get_scale() as usize,
+ )),
+ _ => None,
+ },
+ }
+}
+
impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
fn min_values(&self, column: &Column) -> Option<ArrayRef> {
get_min_max_values!(self, column, min, min_bytes)
@@ -600,6 +679,8 @@ mod tests {
use object_store::local::LocalFileSystem;
use object_store::path::Path;
use object_store::ObjectMeta;
+ use parquet::basic::LogicalType;
+ use parquet::data_type::{ByteArray, FixedLenByteArray};
use parquet::{
basic::Type as PhysicalType,
file::{metadata::RowGroupMetaData, statistics::Statistics as
ParquetStatistics},
@@ -1208,8 +1289,14 @@ mod tests {
let schema = Schema::new(vec![Field::new("c1", DataType::Int32,
false)]);
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
-
- let schema_descr = get_test_schema_descr(vec![("c1",
PhysicalType::INT32)]);
+ let schema_descr = get_test_schema_descr(vec![(
+ "c1",
+ PhysicalType::INT32,
+ None,
+ None,
+ None,
+ None,
+ )]);
let rgm1 = get_row_group_meta_data(
&schema_descr,
vec![ParquetStatistics::int32(Some(1), Some(10), None, 0, false)],
@@ -1235,7 +1322,14 @@ mod tests {
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
- let schema_descr = get_test_schema_descr(vec![("c1",
PhysicalType::INT32)]);
+ let schema_descr = get_test_schema_descr(vec![(
+ "c1",
+ PhysicalType::INT32,
+ None,
+ None,
+ None,
+ None,
+ )]);
let rgm1 = get_row_group_meta_data(
&schema_descr,
vec![ParquetStatistics::int32(None, None, None, 0, false)],
@@ -1266,8 +1360,8 @@ mod tests {
let pruning_predicate = PruningPredicate::try_new(expr,
schema.clone()).unwrap();
let schema_descr = get_test_schema_descr(vec![
- ("c1", PhysicalType::INT32),
- ("c2", PhysicalType::INT32),
+ ("c1", PhysicalType::INT32, None, None, None, None),
+ ("c2", PhysicalType::INT32, None, None, None, None),
]);
let rgm1 = get_row_group_meta_data(
&schema_descr,
@@ -1308,8 +1402,8 @@ mod tests {
fn gen_row_group_meta_data_for_pruning_predicate() ->
Vec<RowGroupMetaData> {
let schema_descr = get_test_schema_descr(vec![
- ("c1", PhysicalType::INT32),
- ("c2", PhysicalType::BOOLEAN),
+ ("c1", PhysicalType::INT32, None, None, None, None),
+ ("c2", PhysicalType::BOOLEAN, None, None, None, None),
]);
let rgm1 = get_row_group_meta_data(
&schema_descr,
@@ -1373,6 +1467,202 @@ mod tests {
);
}
+ #[test]
+ fn row_group_pruning_predicate_decimal_type() {
+ // For the decimal data type, parquet can use `INT32`, `INT64`,
`BYTE_ARRAY`, `FIXED_LENGTH_BYTE_ARRAY` to
+ // store the data.
+    // In this case, construct four types of statistics to be filtered with
the decimal predicate.
+
+ // INT32: c1 > 5, the c1 is decimal(9,2)
+ let expr = col("c1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2)));
+ let schema = Schema::new(vec![Field::new("c1", DataType::Decimal(9,
2), false)]);
+ let schema_descr = get_test_schema_descr(vec![(
+ "c1",
+ PhysicalType::INT32,
+ Some(LogicalType::Decimal {
+ scale: 2,
+ precision: 9,
+ }),
+ Some(9),
+ Some(2),
+ None,
+ )]);
+ let pruning_predicate =
+ PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+ let rgm1 = get_row_group_meta_data(
+ &schema_descr,
+ // [1.00, 6.00]
+ // c1 > 5, this row group will be included in the results.
+ vec![ParquetStatistics::int32(
+ Some(100),
+ Some(600),
+ None,
+ 0,
+ false,
+ )],
+ );
+ let rgm2 = get_row_group_meta_data(
+ &schema_descr,
+ // [0.1, 0.2]
+ // c1 > 5, this row group will not be included in the results.
+ vec![ParquetStatistics::int32(Some(10), Some(20), None, 0, false)],
+ );
+ let metrics = parquet_file_metrics();
+ assert_eq!(
+ prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate),
&metrics),
+ vec![0]
+ );
+
+ // INT32: c1 > 5, but parquet decimal type has different precision or
scale to arrow decimal
+ // The decimal of arrow is decimal(5,2), the decimal of parquet is
decimal(9,0)
+ let expr = col("c1").gt(lit(ScalarValue::Decimal128(Some(500), 5, 2)));
+ let schema = Schema::new(vec![Field::new("c1", DataType::Decimal(5,
2), false)]);
+ // The decimal of parquet is decimal(9,0)
+ let schema_descr = get_test_schema_descr(vec![(
+ "c1",
+ PhysicalType::INT32,
+ Some(LogicalType::Decimal {
+ scale: 0,
+ precision: 9,
+ }),
+ Some(9),
+ Some(0),
+ None,
+ )]);
+ let pruning_predicate =
+ PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+ let rgm1 = get_row_group_meta_data(
+ &schema_descr,
+ // [100, 600]
+ // c1 > 5, this row group will be included in the results.
+ vec![ParquetStatistics::int32(
+ Some(100),
+ Some(600),
+ None,
+ 0,
+ false,
+ )],
+ );
+ let rgm2 = get_row_group_meta_data(
+ &schema_descr,
+ // [10, 20]
+ // c1 > 5, this row group will be included in the results.
+ vec![ParquetStatistics::int32(Some(10), Some(20), None, 0, false)],
+ );
+ let rgm3 = get_row_group_meta_data(
+ &schema_descr,
+ // [0, 2]
+ // c1 > 5, this row group will not be included in the results.
+ vec![ParquetStatistics::int32(Some(0), Some(2), None, 0, false)],
+ );
+ let metrics = parquet_file_metrics();
+ assert_eq!(
+ prune_row_groups(
+ &[rgm1, rgm2, rgm3],
+ None,
+ Some(pruning_predicate),
+ &metrics
+ ),
+ vec![0, 1]
+ );
+
+ // INT64: c1 < 5, the c1 is decimal(18,2)
+ let expr = col("c1").lt(lit(ScalarValue::Decimal128(Some(500), 18,
2)));
+ let schema = Schema::new(vec![Field::new("c1", DataType::Decimal(18,
2), false)]);
+ let schema_descr = get_test_schema_descr(vec![(
+ "c1",
+ PhysicalType::INT64,
+ Some(LogicalType::Decimal {
+ scale: 2,
+ precision: 18,
+ }),
+ Some(18),
+ Some(2),
+ None,
+ )]);
+ let pruning_predicate =
+ PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+ let rgm1 = get_row_group_meta_data(
+ &schema_descr,
+ // [6.00, 8.00]
+ vec![ParquetStatistics::int32(
+ Some(600),
+ Some(800),
+ None,
+ 0,
+ false,
+ )],
+ );
+ let rgm2 = get_row_group_meta_data(
+ &schema_descr,
+ // [0.1, 0.2]
+ vec![ParquetStatistics::int64(Some(10), Some(20), None, 0, false)],
+ );
+ let metrics = parquet_file_metrics();
+ assert_eq!(
+ prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate),
&metrics),
+ vec![1]
+ );
+
+        // FIXED_LENGTH_BYTE_ARRAY: c1 = 100, the c1 is decimal(28,3)
+ // the type of parquet is decimal(18,2)
+ let expr = col("c1").eq(lit(ScalarValue::Decimal128(Some(100000), 28,
3)));
+ let schema = Schema::new(vec![Field::new("c1", DataType::Decimal(18,
3), false)]);
+ let schema_descr = get_test_schema_descr(vec![(
+ "c1",
+ PhysicalType::FIXED_LEN_BYTE_ARRAY,
+ Some(LogicalType::Decimal {
+ scale: 2,
+ precision: 18,
+ }),
+ Some(18),
+ Some(2),
+ Some(16),
+ )]);
+ let pruning_predicate =
+ PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+ // we must use the big-endian when encode the i128 to bytes or vec[u8].
+ let rgm1 = get_row_group_meta_data(
+ &schema_descr,
+ vec![ParquetStatistics::fixed_len_byte_array(
+ // 5.00
+ Some(FixedLenByteArray::from(ByteArray::from(
+ 500i128.to_be_bytes().to_vec(),
+ ))),
+ // 80.00
+ Some(FixedLenByteArray::from(ByteArray::from(
+ 8000i128.to_be_bytes().to_vec(),
+ ))),
+ None,
+ 0,
+ false,
+ )],
+ );
+ let rgm2 = get_row_group_meta_data(
+ &schema_descr,
+ vec![ParquetStatistics::fixed_len_byte_array(
+ // 5.00
+ Some(FixedLenByteArray::from(ByteArray::from(
+ 500i128.to_be_bytes().to_vec(),
+ ))),
+ // 200.00
+ Some(FixedLenByteArray::from(ByteArray::from(
+ 20000i128.to_be_bytes().to_vec(),
+ ))),
+ None,
+ 0,
+ false,
+ )],
+ );
+ let metrics = parquet_file_metrics();
+ assert_eq!(
+ prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate),
&metrics),
+ vec![1]
+ );
+
+        // TODO: support reading decimal statistics from parquet BYTE_ARRAY after the
20.0.0 arrow-rs release
+ }
+
fn get_row_group_meta_data(
schema_descr: &SchemaDescPtr,
column_statistics: Vec<ParquetStatistics>,
@@ -1394,12 +1684,48 @@ mod tests {
.unwrap()
}
- fn get_test_schema_descr(fields: Vec<(&str, PhysicalType)>) ->
SchemaDescPtr {
+ #[allow(clippy::type_complexity)]
+ fn get_test_schema_descr(
+ fields: Vec<(
+ &str,
+ PhysicalType,
+ Option<LogicalType>,
+ Option<i32>, // precision
+ Option<i32>, // scale
+ Option<i32>, // length of bytes
+ )>,
+ ) -> SchemaDescPtr {
use parquet::schema::types::{SchemaDescriptor, Type as SchemaType};
let mut schema_fields = fields
.iter()
- .map(|(n, t)| {
- Arc::new(SchemaType::primitive_type_builder(n,
*t).build().unwrap())
+ .map(|(n, t, logical, precision, scale, length)| {
+ let mut builder = SchemaType::primitive_type_builder(n, *t);
+ // add logical type for the parquet field
+ match logical {
+ None => {}
+ Some(logical_type) => {
+ builder =
builder.with_logical_type(Some(logical_type.clone()));
+ }
+ };
+ match precision {
+ None => {}
+ Some(v) => {
+ builder = builder.with_precision(*v);
+ }
+ };
+ match scale {
+ None => {}
+ Some(v) => {
+ builder = builder.with_scale(*v);
+ }
+ }
+ match length {
+ None => {}
+ Some(v) => {
+ builder = builder.with_length(*v);
+ }
+ }
+ Arc::new(builder.build().unwrap())
})
.collect::<Vec<_>>();
let schema = SchemaType::group_type_builder("schema")