This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new ad8d552b9f parquet: Add support for row group pruning on FixedSizeBinary  (#9646)
ad8d552b9f is described below

commit ad8d552b9f150c3c066b0764e84f72b667a649ff
Author: Val Lorentz <[email protected]>
AuthorDate: Tue Mar 19 22:09:20 2024 +0100

    parquet: Add support for row group pruning on FixedSizeBinary  (#9646)
    
    * Add support for row group pruning on FixedSizeBinary
    
    * Check statistics values are valid for their type
---
 .../datasource/physical_plan/parquet/row_groups.rs |   1 +
 .../datasource/physical_plan/parquet/statistics.rs |  27 +++++-
 datafusion/core/tests/parquet/row_group_pruning.rs | 101 +++++++++++++++++++++
 3 files changed, 127 insertions(+), 2 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
index 9cd4699496..a82c5d97a2 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -226,6 +226,7 @@ impl PruningStatistics for BloomFilterStatistics {
                 match value {
                     ScalarValue::Utf8(Some(v)) => sbbf.check(&v.as_str()),
                     ScalarValue::Binary(Some(v)) => sbbf.check(v),
+                    ScalarValue::FixedSizeBinary(_size, Some(v)) => sbbf.check(v),
                     ScalarValue::Boolean(Some(v)) => sbbf.check(v),
                     ScalarValue::Float64(Some(v)) => sbbf.check(v),
                     ScalarValue::Float32(Some(v)) => sbbf.check(v),
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
index 4e472606da..aac5aff80f 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -105,14 +105,20 @@ macro_rules! get_statistic {
                         let s = std::str::from_utf8(s.$bytes_func())
                             .map(|s| s.to_string())
                             .ok();
+                        if s.is_none() {
+                            log::debug!(
+                                "Utf8 statistics is a non-UTF8 value, ignoring it."
+                            );
+                        }
                         Some(ScalarValue::Utf8(s))
                     }
                 }
             }
-            // type not supported yet
+            // type not fully supported yet
             ParquetStatistics::FixedLenByteArray(s) => {
                 match $target_arrow_type {
-                    // just support the decimal data type
+                    // just support specific logical data types, there are others each
+                    // with their own ordering
                     Some(DataType::Decimal128(precision, scale)) => {
                         Some(ScalarValue::Decimal128(
                             Some(from_bytes_to_i128(s.$bytes_func())),
@@ -120,6 +126,23 @@ macro_rules! get_statistic {
                             *scale,
                         ))
                     }
+                    Some(DataType::FixedSizeBinary(size)) => {
+                        let value = s.$bytes_func().to_vec();
+                        let value = if value.len().try_into() == Ok(*size) {
+                            Some(value)
+                        } else {
+                            log::debug!(
+                                "FixedSizeBinary({}) statistics is a binary of size {}, ignoring it.",
+                                size,
+                                value.len(),
+                            );
+                            None
+                        };
+                        Some(ScalarValue::FixedSizeBinary(
+                            *size,
+                            value,
+                        ))
+                    }
                     _ => None,
                 }
             }
diff --git a/datafusion/core/tests/parquet/row_group_pruning.rs b/datafusion/core/tests/parquet/row_group_pruning.rs
index 5511219350..ed48d04064 100644
--- a/datafusion/core/tests/parquet/row_group_pruning.rs
+++ b/datafusion/core/tests/parquet/row_group_pruning.rs
@@ -948,6 +948,107 @@ async fn prune_binary_lt() {
         .await;
 }
 
+#[tokio::test]
+async fn prune_fixedsizebinary_eq_match() {
+    RowGroupPruningTest::new()
+        .with_scenario(Scenario::ByteArray)
+        .with_query(
+            "SELECT name, service_fixedsize FROM t WHERE service_fixedsize = ARROW_CAST(CAST('fe6' AS bytea), 'FixedSizeBinary(3)')",
+        )
+        .with_expected_errors(Some(0))
+        // false positive on 'all frontends' batch: 'fe1' < 'fe6' < 'fe7'
+        .with_matched_by_stats(Some(2))
+        .with_pruned_by_stats(Some(1))
+        .with_matched_by_bloom_filter(Some(1))
+        .with_pruned_by_bloom_filter(Some(1))
+        .with_expected_rows(1)
+        .test_row_group_prune()
+        .await;
+
+    RowGroupPruningTest::new()
+        .with_scenario(Scenario::ByteArray)
+        .with_query(
+            "SELECT name, service_fixedsize FROM t WHERE service_fixedsize = ARROW_CAST(CAST('fe6' AS bytea), 'FixedSizeBinary(3)')",
+        )
+        .with_expected_errors(Some(0))
+        // false positive on 'all frontends' batch: 'fe1' < 'fe6' < 'fe7'
+        .with_matched_by_stats(Some(2))
+        .with_pruned_by_stats(Some(1))
+        .with_matched_by_bloom_filter(Some(1))
+        .with_pruned_by_bloom_filter(Some(1))
+        .with_expected_rows(1)
+        .test_row_group_prune()
+        .await;
+}
+
+#[tokio::test]
+async fn prune_fixedsizebinary_eq_no_match() {
+    RowGroupPruningTest::new()
+        .with_scenario(Scenario::ByteArray)
+        .with_query(
+            "SELECT name, service_fixedsize FROM t WHERE service_fixedsize = ARROW_CAST(CAST('be9' AS bytea), 'FixedSizeBinary(3)')",
+        )
+        .with_expected_errors(Some(0))
+        // false positive on 'mixed' batch: 'be1' < 'be9' < 'fe4'
+        .with_matched_by_stats(Some(1))
+        .with_pruned_by_stats(Some(2))
+        .with_matched_by_bloom_filter(Some(0))
+        .with_pruned_by_bloom_filter(Some(1))
+        .with_expected_rows(0)
+        .test_row_group_prune()
+        .await;
+}
+
+#[tokio::test]
+async fn prune_fixedsizebinary_neq() {
+    RowGroupPruningTest::new()
+        .with_scenario(Scenario::ByteArray)
+        .with_query(
+            "SELECT name, service_fixedsize FROM t WHERE service_fixedsize != ARROW_CAST(CAST('be1' AS bytea), 'FixedSizeBinary(3)')",
+        )
+        .with_expected_errors(Some(0))
+        .with_matched_by_stats(Some(3))
+        .with_pruned_by_stats(Some(0))
+        .with_matched_by_bloom_filter(Some(3))
+        .with_pruned_by_bloom_filter(Some(0))
+        .with_expected_rows(14)
+        .test_row_group_prune()
+        .await;
+}
+
+#[tokio::test]
+async fn prune_fixedsizebinary_lt() {
+    RowGroupPruningTest::new()
+        .with_scenario(Scenario::ByteArray)
+        .with_query(
+            "SELECT name, service_fixedsize FROM t WHERE service_fixedsize < ARROW_CAST(CAST('be3' AS bytea), 'FixedSizeBinary(3)')",
+        )
+        .with_expected_errors(Some(0))
+        // matches 'all backends' only
+        .with_matched_by_stats(Some(1))
+        .with_pruned_by_stats(Some(2))
+        .with_matched_by_bloom_filter(Some(0))
+        .with_pruned_by_bloom_filter(Some(0))
+        .with_expected_rows(2)
+        .test_row_group_prune()
+        .await;
+
+    RowGroupPruningTest::new()
+        .with_scenario(Scenario::ByteArray)
+        .with_query(
+            "SELECT name, service_fixedsize FROM t WHERE service_fixedsize < ARROW_CAST(CAST('be9' AS bytea), 'FixedSizeBinary(3)')",
+        )
+        .with_expected_errors(Some(0))
+        .with_matched_by_stats(Some(2))
+        .with_pruned_by_stats(Some(1))
+        .with_matched_by_bloom_filter(Some(0))
+        .with_pruned_by_bloom_filter(Some(0))
+        // all backends from 'mixed' and 'all backends'
+        .with_expected_rows(8)
+        .test_row_group_prune()
+        .await;
+}
+
 #[tokio::test]
 async fn prune_periods_in_column_names() {
     // There are three row groups for "service.name", each with 5 rows = 15 rows total

Reply via email to