This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new e10d3e2a02 Rewrite bloom filters to use `contains` API (#8442)
e10d3e2a02 is described below

commit e10d3e2a0267c70bf36373c6811906e5b9b47703
Author: Andrew Lamb <[email protected]>
AuthorDate: Tue Dec 26 06:53:07 2023 -0500

    Rewrite bloom filters to use `contains` API (#8442)
---
 .../src/datasource/physical_plan/parquet/mod.rs    |   1 +
 .../datasource/physical_plan/parquet/row_groups.rs | 245 ++++++++-------------
 2 files changed, 91 insertions(+), 155 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index ade149da69..76a6cc297b 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -522,6 +522,7 @@ impl FileOpener for ParquetOpener {
             if enable_bloom_filter && !row_groups.is_empty() {
                 if let Some(predicate) = predicate {
                     row_groups = row_groups::prune_row_groups_by_bloom_filters(
+                        &file_schema,
                         &mut builder,
                         &row_groups,
                         file_metadata.row_groups(),
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
index 09e4907c94..8a1abb7d96 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -18,8 +18,7 @@
 use arrow::{array::ArrayRef, datatypes::Schema};
 use arrow_array::BooleanArray;
 use arrow_schema::FieldRef;
-use datafusion_common::tree_node::{TreeNode, VisitRecursion};
-use datafusion_common::{Column, DataFusionError, Result, ScalarValue};
+use datafusion_common::{Column, ScalarValue};
 use parquet::file::metadata::ColumnChunkMetaData;
 use parquet::schema::types::SchemaDescriptor;
 use parquet::{
@@ -27,19 +26,13 @@ use parquet::{
     bloom_filter::Sbbf,
     file::metadata::RowGroupMetaData,
 };
-use std::{
-    collections::{HashMap, HashSet},
-    sync::Arc,
-};
+use std::collections::{HashMap, HashSet};
 
 use crate::datasource::listing::FileRange;
 use crate::datasource::physical_plan::parquet::statistics::{
     max_statistics, min_statistics, parquet_column,
 };
-use crate::logical_expr::Operator;
-use crate::physical_expr::expressions as phys_expr;
 use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
-use crate::physical_plan::PhysicalExpr;
 
 use super::ParquetFileMetrics;
 
@@ -118,188 +111,129 @@ pub(crate) fn prune_row_groups_by_statistics(
 pub(crate) async fn prune_row_groups_by_bloom_filters<
     T: AsyncFileReader + Send + 'static,
 >(
+    arrow_schema: &Schema,
     builder: &mut ParquetRecordBatchStreamBuilder<T>,
     row_groups: &[usize],
     groups: &[RowGroupMetaData],
     predicate: &PruningPredicate,
     metrics: &ParquetFileMetrics,
 ) -> Vec<usize> {
-    let bf_predicates = match BloomFilterPruningPredicate::try_new(predicate.orig_expr())
-    {
-        Ok(predicates) => predicates,
-        Err(_) => {
-            return row_groups.to_vec();
-        }
-    };
     let mut filtered = Vec::with_capacity(groups.len());
     for idx in row_groups {
-        let rg_metadata = &groups[*idx];
-        // get all columns bloom filter
-        let mut column_sbbf =
-            HashMap::with_capacity(bf_predicates.required_columns.len());
-        for column_name in bf_predicates.required_columns.iter() {
-            let column_idx = match rg_metadata
-                .columns()
-                .iter()
-                .enumerate()
-                .find(|(_, column)| column.column_path().string().eq(column_name))
-            {
-                Some((column_idx, _)) => column_idx,
-                None => continue,
+        // get all columns in the predicate that we could use a bloom filter with
+        let literal_columns = predicate.literal_columns();
+        let mut column_sbbf = HashMap::with_capacity(literal_columns.len());
+
+        for column_name in literal_columns {
+            let Some((column_idx, _field)) =
+                parquet_column(builder.parquet_schema(), arrow_schema, &column_name)
+            else {
+                continue;
             };
+
             let bf = match builder
                 .get_row_group_column_bloom_filter(*idx, column_idx)
                 .await
             {
-                Ok(bf) => match bf {
-                    Some(bf) => bf,
-                    None => {
-                        continue;
-                    }
-                },
+                Ok(Some(bf)) => bf,
+                Ok(None) => continue, // no bloom filter for this column
                 Err(e) => {
-                    log::error!("Error evaluating row group predicate values when using BloomFilterPruningPredicate {e}");
+                    log::debug!("Ignoring error reading bloom filter: {e}");
                     metrics.predicate_evaluation_errors.add(1);
                     continue;
                 }
             };
-            column_sbbf.insert(column_name.to_owned(), bf);
+            column_sbbf.insert(column_name.to_string(), bf);
         }
-        if bf_predicates.prune(&column_sbbf) {
+
+        let stats = BloomFilterStatistics { column_sbbf };
+
+        // Can this group be pruned?
+        let prune_group = match predicate.prune(&stats) {
+            Ok(values) => !values[0],
+            Err(e) => {
+                log::debug!("Error evaluating row group predicate on bloom filter: {e}");
+                metrics.predicate_evaluation_errors.add(1);
+                false
+            }
+        };
+
+        if prune_group {
             metrics.row_groups_pruned.add(1);
-            continue;
+        } else {
+            filtered.push(*idx);
         }
-        filtered.push(*idx);
     }
     filtered
 }
 
-struct BloomFilterPruningPredicate {
-    /// Actual pruning predicate
-    predicate_expr: Option<phys_expr::BinaryExpr>,
-    /// The statistics required to evaluate this predicate
-    required_columns: Vec<String>,
+/// Implements `PruningStatistics` for Parquet Split Block Bloom Filters (SBBF)
+struct BloomFilterStatistics {
+    /// Maps column name to the parquet bloom filter
+    column_sbbf: HashMap<String, Sbbf>,
 }
 
-impl BloomFilterPruningPredicate {
-    fn try_new(expr: &Arc<dyn PhysicalExpr>) -> Result<Self> {
-        let binary_expr = expr.as_any().downcast_ref::<phys_expr::BinaryExpr>();
-        match binary_expr {
-            Some(binary_expr) => {
-                let columns = Self::get_predicate_columns(expr);
-                Ok(Self {
-                    predicate_expr: Some(binary_expr.clone()),
-                    required_columns: columns.into_iter().collect(),
-                })
-            }
-            None => Err(DataFusionError::Execution(
-                "BloomFilterPruningPredicate only support binary expr".to_string(),
-            )),
-        }
+impl PruningStatistics for BloomFilterStatistics {
+    fn min_values(&self, _column: &Column) -> Option<ArrayRef> {
+        None
     }
 
-    fn prune(&self, column_sbbf: &HashMap<String, Sbbf>) -> bool {
-        Self::prune_expr_with_bloom_filter(self.predicate_expr.as_ref(), column_sbbf)
+    fn max_values(&self, _column: &Column) -> Option<ArrayRef> {
+        None
     }
 
-    /// Return true if the `expr` can be proved not `true`
-    /// based on the bloom filter.
-    ///
-    /// We only checked `BinaryExpr` but it also support `InList`,
-    /// Because of the `optimizer` will convert `InList` to `BinaryExpr`.
-    fn prune_expr_with_bloom_filter(
-        expr: Option<&phys_expr::BinaryExpr>,
-        column_sbbf: &HashMap<String, Sbbf>,
-    ) -> bool {
-        let Some(expr) = expr else {
-            // unsupported predicate
-            return false;
-        };
-        match expr.op() {
-            Operator::And | Operator::Or => {
-                let left = Self::prune_expr_with_bloom_filter(
-                    expr.left().as_any().downcast_ref::<phys_expr::BinaryExpr>(),
-                    column_sbbf,
-                );
-                let right = Self::prune_expr_with_bloom_filter(
-                    expr.right()
-                        .as_any()
-                        .downcast_ref::<phys_expr::BinaryExpr>(),
-                    column_sbbf,
-                );
-                match expr.op() {
-                    Operator::And => left || right,
-                    Operator::Or => left && right,
-                    _ => false,
-                }
-            }
-            Operator::Eq => {
-                if let Some((col, val)) = Self::check_expr_is_col_equal_const(expr) {
-                    if let Some(sbbf) = column_sbbf.get(col.name()) {
-                        match val {
-                            ScalarValue::Utf8(Some(v)) => !sbbf.check(&v.as_str()),
-                            ScalarValue::Boolean(Some(v)) => !sbbf.check(&v),
-                            ScalarValue::Float64(Some(v)) => !sbbf.check(&v),
-                            ScalarValue::Float32(Some(v)) => !sbbf.check(&v),
-                            ScalarValue::Int64(Some(v)) => !sbbf.check(&v),
-                            ScalarValue::Int32(Some(v)) => !sbbf.check(&v),
-                            ScalarValue::Int16(Some(v)) => !sbbf.check(&v),
-                            ScalarValue::Int8(Some(v)) => !sbbf.check(&v),
-                            _ => false,
-                        }
-                    } else {
-                        false
-                    }
-                } else {
-                    false
-                }
-            }
-            _ => false,
-        }
+    fn num_containers(&self) -> usize {
+        1
     }
 
-    fn get_predicate_columns(expr: &Arc<dyn PhysicalExpr>) -> HashSet<String> {
-        let mut columns = HashSet::new();
-        expr.apply(&mut |expr| {
-            if let Some(binary_expr) =
-                expr.as_any().downcast_ref::<phys_expr::BinaryExpr>()
-            {
-                if let Some((column, _)) =
-                    Self::check_expr_is_col_equal_const(binary_expr)
-                {
-                    columns.insert(column.name().to_string());
-                }
-            }
-            Ok(VisitRecursion::Continue)
-        })
-        // no way to fail as only Ok(VisitRecursion::Continue) is returned
-        .unwrap();
-
-        columns
+    fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
+        None
     }
 
-    fn check_expr_is_col_equal_const(
-        exr: &phys_expr::BinaryExpr,
-    ) -> Option<(phys_expr::Column, ScalarValue)> {
-        if Operator::Eq.ne(exr.op()) {
-            return None;
-        }
+    /// Use bloom filters to determine if we are sure this column can not
+    /// possibly contain `values`
+    ///
+    /// The `contained` API returns false if the bloom filters knows that *ALL*
+    /// of the values in a column are not present.
+    fn contained(
+        &self,
+        column: &Column,
+        values: &HashSet<ScalarValue>,
+    ) -> Option<BooleanArray> {
+        let sbbf = self.column_sbbf.get(column.name.as_str())?;
 
-        let left_any = exr.left().as_any();
-        let right_any = exr.right().as_any();
-        if let (Some(col), Some(liter)) = (
-            left_any.downcast_ref::<phys_expr::Column>(),
-            right_any.downcast_ref::<phys_expr::Literal>(),
-        ) {
-            return Some((col.clone(), liter.value().clone()));
-        }
-        if let (Some(liter), Some(col)) = (
-            left_any.downcast_ref::<phys_expr::Literal>(),
-            right_any.downcast_ref::<phys_expr::Column>(),
-        ) {
-            return Some((col.clone(), liter.value().clone()));
-        }
-        None
+        // Bloom filters are probabilistic data structures that can return false
+        // positives (i.e. it might return true even if the value is not
+        // present) however, the bloom filter will return `false` if the value is
+        // definitely not present.
+
+        let known_not_present = values
+            .iter()
+            .map(|value| match value {
+                ScalarValue::Utf8(Some(v)) => sbbf.check(&v.as_str()),
+                ScalarValue::Boolean(Some(v)) => sbbf.check(v),
+                ScalarValue::Float64(Some(v)) => sbbf.check(v),
+                ScalarValue::Float32(Some(v)) => sbbf.check(v),
+                ScalarValue::Int64(Some(v)) => sbbf.check(v),
+                ScalarValue::Int32(Some(v)) => sbbf.check(v),
+                ScalarValue::Int16(Some(v)) => sbbf.check(v),
+                ScalarValue::Int8(Some(v)) => sbbf.check(v),
+                _ => true,
+            })
+            // The row group doesn't contain any of the values if
+            // all the checks are false
+            .all(|v| !v);
+
+        let contains = if known_not_present {
+            Some(false)
+        } else {
+            // Given the bloom filter is probabilistic, we can't be sure that
+            // the row group actually contains the values. Return `None` to
+            // indicate this uncertainty
+            None
+        };
+
+        Some(BooleanArray::from(vec![contains]))
     }
 }
 
@@ -1367,6 +1301,7 @@ mod tests {
 
         let metadata = builder.metadata().clone();
         let pruned_row_group = prune_row_groups_by_bloom_filters(
+            pruning_predicate.schema(),
             &mut builder,
             row_groups,
             metadata.row_groups(),

Reply via email to