alamb commented on code in PR #2966:
URL: https://github.com/apache/arrow-datafusion/pull/2966#discussion_r930368381


##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -462,6 +523,28 @@ macro_rules! get_null_count_values {
     }};
 }
 
+/// Convert parquet column schema to arrow field.
+/// copy from arrow-rs
+/// TODO: consolidate code with arrow-rs
+/// TODO: change this API public in the arrow-rs
+/// crate::schema::parquet_to_arrow_field
+fn parquet_to_arrow_field(parquet_column: &ColumnDescriptor) -> 
Option<DataType> {
+    let type_ptr = parquet_column.self_type_ptr();
+    match type_ptr.get_basic_info().logical_type() {
+        // just handle the decimal type

Review Comment:
   Maybe we could rename the function to `parquet_to_arrow_decimal_field` or 
something similar, to make it clearer that it only applies to decimal types.



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -388,28 +391,82 @@ struct RowGroupPruningStatistics<'a> {
     parquet_schema: &'a Schema,
 }
 
+// TODO: consolidate code with arrow-rs
+// Convert the bytes array to i128.
+// The endian of the input bytes array must be big-endian.
+// Copy from the arrow-rs
+fn from_bytes_to_i128(b: &[u8]) -> i128 {
+    assert!(b.len() <= 16, "Decimal128Array supports only up to size 16");
+    let first_bit = b[0] & 128u8 == 128u8;
+    let mut result = if first_bit { [255u8; 16] } else { [0u8; 16] };
+    for (i, v) in b.iter().enumerate() {
+        result[i + (16 - b.len())] = *v;
+    }
+    // The bytes array are from parquet file and must be the big-endian.
+    // The endian is defined by parquet format, and the reference document
+    // https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66
+    i128::from_be_bytes(result)
+}
+
 /// Extract the min/max statistics from a `ParquetStatistics` object
 macro_rules! get_statistic {
-    ($column_statistics:expr, $func:ident, $bytes_func:ident) => {{
+    ($column_statistics:expr, $func:ident, $bytes_func:ident, 
$target_arrow_type:expr) => {{
         if !$column_statistics.has_min_max_set() {
             return None;
         }
         match $column_statistics {
             ParquetStatistics::Boolean(s) => 
Some(ScalarValue::Boolean(Some(*s.$func()))),
-            ParquetStatistics::Int32(s) => 
Some(ScalarValue::Int32(Some(*s.$func()))),
-            ParquetStatistics::Int64(s) => 
Some(ScalarValue::Int64(Some(*s.$func()))),
+            ParquetStatistics::Int32(s) => {
+                match $target_arrow_type {
+                    // int32 to decimal with the precision and scale
+                    Some(DataType::Decimal(precision, scale)) => {
+                        Some(ScalarValue::Decimal128(
+                            Some(*s.$func() as i128),
+                            precision,
+                            scale,
+                        ))
+                    }
+                    _ => Some(ScalarValue::Int32(Some(*s.$func()))),
+                }
+            }
+            ParquetStatistics::Int64(s) => {
+                match $target_arrow_type {
+                    // int64 to decimal with the precision and scale
+                    Some(DataType::Decimal(precision, scale)) => {
+                        Some(ScalarValue::Decimal128(
+                            Some(*s.$func() as i128),
+                            precision,
+                            scale,
+                        ))
+                    }
+                    _ => Some(ScalarValue::Int64(Some(*s.$func()))),
+                }
+            }
             // 96 bit ints not supported
             ParquetStatistics::Int96(_) => None,
             ParquetStatistics::Float(s) => 
Some(ScalarValue::Float32(Some(*s.$func()))),
             ParquetStatistics::Double(s) => 
Some(ScalarValue::Float64(Some(*s.$func()))),
             ParquetStatistics::ByteArray(s) => {
+                // TODO support decimal type for byte array type

Review Comment:
   Should we track this in a ticket? Or maybe even implement it in this PR?



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -1367,6 +1465,202 @@ mod tests {
         );
     }
 
+    #[test]
+    fn row_group_pruning_predicate_decimal_type() {
+        // For the decimal data type, parquet can use `INT32`, `INT64`, 
`BYTE_ARRAY`, `FIXED_LENGTH_BYTE_ARRAY` to
+        // store the data.
+        // In this case, construct four types of statistics to filtered with 
the decimal predication.
+
+        // INT32: c1 > 5, the c1 is decimal(9,2)
+        let expr = col("c1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2)));
+        let schema = Schema::new(vec![Field::new("c1", DataType::Decimal(9, 
2), false)]);
+        let schema_descr = get_test_schema_descr(vec![(
+            "c1",
+            PhysicalType::INT32,
+            Some(LogicalType::Decimal {
+                scale: 2,
+                precision: 9,
+            }),
+            Some(9),
+            Some(2),
+            None,
+        )]);
+        let pruning_predicate =
+            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+        let rgm1 = get_row_group_meta_data(
+            &schema_descr,
+            // [1.00, 6.00]
+            // c1 > 5, this row group will be included in the results.
+            vec![ParquetStatistics::int32(
+                Some(100),
+                Some(600),
+                None,
+                0,
+                false,
+            )],
+        );
+        let rgm2 = get_row_group_meta_data(
+            &schema_descr,
+            // [0.1, 0.2]
+            // c1 > 5, this row group will not be included in the results.
+            vec![ParquetStatistics::int32(Some(10), Some(20), None, 0, false)],
+        );
+        let metrics = parquet_file_metrics();
+        assert_eq!(
+            prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), 
&metrics),
+            vec![0]
+        );
+
+        // INT32: c1 > 5, but parquet decimal type has different precision or 
scale to arrow decimal

Review Comment:
   👍 this is a good case to test
   
   Though I suggest you use something like `decimal(5,0)`, where the difference 
in decimal types will result in significant errors rather than simple truncation.



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -462,6 +523,28 @@ macro_rules! get_null_count_values {
     }};
 }
 
+/// Convert parquet column schema to arrow field.
+/// Copy from arrow-rs
+/// TODO: consolidate code with arrow-rs
+/// TODO: change this API public in the arrow-rs

Review Comment:
   Can you please file a ticket in arrow-rs describing the change needed and 
add a reference to the new ticket here? 



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -1367,6 +1465,202 @@ mod tests {
         );
     }
 
+    #[test]
+    fn row_group_pruning_predicate_decimal_type() {
+        // For the decimal data type, parquet can use `INT32`, `INT64`, 
`BYTE_ARRAY`, `FIXED_LENGTH_BYTE_ARRAY` to
+        // store the data.
+        // In this case, construct four types of statistics to filtered with 
the decimal predication.
+
+        // INT32: c1 > 5, the c1 is decimal(9,2)
+        let expr = col("c1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2)));
+        let schema = Schema::new(vec![Field::new("c1", DataType::Decimal(9, 
2), false)]);
+        let schema_descr = get_test_schema_descr(vec![(
+            "c1",
+            PhysicalType::INT32,
+            Some(LogicalType::Decimal {
+                scale: 2,
+                precision: 9,
+            }),
+            Some(9),
+            Some(2),
+            None,
+        )]);
+        let pruning_predicate =
+            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+        let rgm1 = get_row_group_meta_data(
+            &schema_descr,
+            // [1.00, 6.00]
+            // c1 > 5, this row group will be included in the results.
+            vec![ParquetStatistics::int32(
+                Some(100),
+                Some(600),
+                None,
+                0,
+                false,
+            )],
+        );
+        let rgm2 = get_row_group_meta_data(
+            &schema_descr,
+            // [0.1, 0.2]
+            // c1 > 5, this row group will not be included in the results.
+            vec![ParquetStatistics::int32(Some(10), Some(20), None, 0, false)],
+        );
+        let metrics = parquet_file_metrics();
+        assert_eq!(
+            prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), 
&metrics),
+            vec![0]
+        );
+
+        // INT32: c1 > 5, but parquet decimal type has different precision or 
scale to arrow decimal
+        // The decimal of arrow is decimal(9,2), the decimal of parquet is 
decimal(9,0)
+        let expr = col("c1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2)));
+        let schema = Schema::new(vec![Field::new("c1", DataType::Decimal(9, 
2), false)]);
+        // The decimal of parquet is decimal(9,0)
+        let schema_descr = get_test_schema_descr(vec![(
+            "c1",
+            PhysicalType::INT32,
+            Some(LogicalType::Decimal {
+                scale: 0,
+                precision: 9,
+            }),
+            Some(9),
+            Some(0),
+            None,
+        )]);
+        let pruning_predicate =
+            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+        let rgm1 = get_row_group_meta_data(
+            &schema_descr,
+            // [100, 600]

Review Comment:
   Shouldn't `100` in the parquet file mean `100` (rather than `1.00`) if the 
type is `Decimal(9, 0)`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to