This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new e10d3e2a02 Rewrite bloom filters to use `contains` API (#8442)
e10d3e2a02 is described below
commit e10d3e2a0267c70bf36373c6811906e5b9b47703
Author: Andrew Lamb <[email protected]>
AuthorDate: Tue Dec 26 06:53:07 2023 -0500
Rewrite bloom filters to use `contains` API (#8442)
---
.../src/datasource/physical_plan/parquet/mod.rs | 1 +
.../datasource/physical_plan/parquet/row_groups.rs | 245 ++++++++-------------
2 files changed, 91 insertions(+), 155 deletions(-)
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index ade149da69..76a6cc297b 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -522,6 +522,7 @@ impl FileOpener for ParquetOpener {
if enable_bloom_filter && !row_groups.is_empty() {
if let Some(predicate) = predicate {
row_groups = row_groups::prune_row_groups_by_bloom_filters(
+ &file_schema,
&mut builder,
&row_groups,
file_metadata.row_groups(),
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
index 09e4907c94..8a1abb7d96 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -18,8 +18,7 @@
use arrow::{array::ArrayRef, datatypes::Schema};
use arrow_array::BooleanArray;
use arrow_schema::FieldRef;
-use datafusion_common::tree_node::{TreeNode, VisitRecursion};
-use datafusion_common::{Column, DataFusionError, Result, ScalarValue};
+use datafusion_common::{Column, ScalarValue};
use parquet::file::metadata::ColumnChunkMetaData;
use parquet::schema::types::SchemaDescriptor;
use parquet::{
@@ -27,19 +26,13 @@ use parquet::{
bloom_filter::Sbbf,
file::metadata::RowGroupMetaData,
};
-use std::{
- collections::{HashMap, HashSet},
- sync::Arc,
-};
+use std::collections::{HashMap, HashSet};
use crate::datasource::listing::FileRange;
use crate::datasource::physical_plan::parquet::statistics::{
max_statistics, min_statistics, parquet_column,
};
-use crate::logical_expr::Operator;
-use crate::physical_expr::expressions as phys_expr;
use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
-use crate::physical_plan::PhysicalExpr;
use super::ParquetFileMetrics;
@@ -118,188 +111,129 @@ pub(crate) fn prune_row_groups_by_statistics(
pub(crate) async fn prune_row_groups_by_bloom_filters<
T: AsyncFileReader + Send + 'static,
>(
+ arrow_schema: &Schema,
builder: &mut ParquetRecordBatchStreamBuilder<T>,
row_groups: &[usize],
groups: &[RowGroupMetaData],
predicate: &PruningPredicate,
metrics: &ParquetFileMetrics,
) -> Vec<usize> {
- let bf_predicates = match
BloomFilterPruningPredicate::try_new(predicate.orig_expr())
- {
- Ok(predicates) => predicates,
- Err(_) => {
- return row_groups.to_vec();
- }
- };
let mut filtered = Vec::with_capacity(groups.len());
for idx in row_groups {
- let rg_metadata = &groups[*idx];
- // get all columns bloom filter
- let mut column_sbbf =
- HashMap::with_capacity(bf_predicates.required_columns.len());
- for column_name in bf_predicates.required_columns.iter() {
- let column_idx = match rg_metadata
- .columns()
- .iter()
- .enumerate()
- .find(|(_, column)|
column.column_path().string().eq(column_name))
- {
- Some((column_idx, _)) => column_idx,
- None => continue,
+ // get all columns in the predicate that we could use a bloom filter with
+ let literal_columns = predicate.literal_columns();
+ let mut column_sbbf = HashMap::with_capacity(literal_columns.len());
+
+ for column_name in literal_columns {
+ let Some((column_idx, _field)) =
+ parquet_column(builder.parquet_schema(), arrow_schema,
&column_name)
+ else {
+ continue;
};
+
let bf = match builder
.get_row_group_column_bloom_filter(*idx, column_idx)
.await
{
- Ok(bf) => match bf {
- Some(bf) => bf,
- None => {
- continue;
- }
- },
+ Ok(Some(bf)) => bf,
+ Ok(None) => continue, // no bloom filter for this column
Err(e) => {
- log::error!("Error evaluating row group predicate values
when using BloomFilterPruningPredicate {e}");
+ log::debug!("Ignoring error reading bloom filter: {e}");
metrics.predicate_evaluation_errors.add(1);
continue;
}
};
- column_sbbf.insert(column_name.to_owned(), bf);
+ column_sbbf.insert(column_name.to_string(), bf);
}
- if bf_predicates.prune(&column_sbbf) {
+
+ let stats = BloomFilterStatistics { column_sbbf };
+
+ // Can this group be pruned?
+ let prune_group = match predicate.prune(&stats) {
+ Ok(values) => !values[0],
+ Err(e) => {
+ log::debug!("Error evaluating row group predicate on bloom
filter: {e}");
+ metrics.predicate_evaluation_errors.add(1);
+ false
+ }
+ };
+
+ if prune_group {
metrics.row_groups_pruned.add(1);
- continue;
+ } else {
+ filtered.push(*idx);
}
- filtered.push(*idx);
}
filtered
}
-struct BloomFilterPruningPredicate {
- /// Actual pruning predicate
- predicate_expr: Option<phys_expr::BinaryExpr>,
- /// The statistics required to evaluate this predicate
- required_columns: Vec<String>,
+/// Implements `PruningStatistics` for Parquet Split Block Bloom Filters (SBBF)
+struct BloomFilterStatistics {
+ /// Maps column name to the parquet bloom filter
+ column_sbbf: HashMap<String, Sbbf>,
}
-impl BloomFilterPruningPredicate {
- fn try_new(expr: &Arc<dyn PhysicalExpr>) -> Result<Self> {
- let binary_expr =
expr.as_any().downcast_ref::<phys_expr::BinaryExpr>();
- match binary_expr {
- Some(binary_expr) => {
- let columns = Self::get_predicate_columns(expr);
- Ok(Self {
- predicate_expr: Some(binary_expr.clone()),
- required_columns: columns.into_iter().collect(),
- })
- }
- None => Err(DataFusionError::Execution(
- "BloomFilterPruningPredicate only support binary
expr".to_string(),
- )),
- }
+impl PruningStatistics for BloomFilterStatistics {
+ fn min_values(&self, _column: &Column) -> Option<ArrayRef> {
+ None
}
- fn prune(&self, column_sbbf: &HashMap<String, Sbbf>) -> bool {
- Self::prune_expr_with_bloom_filter(self.predicate_expr.as_ref(),
column_sbbf)
+ fn max_values(&self, _column: &Column) -> Option<ArrayRef> {
+ None
}
- /// Return true if the `expr` can be proved not `true`
- /// based on the bloom filter.
- ///
- /// We only checked `BinaryExpr` but it also support `InList`,
- /// Because of the `optimizer` will convert `InList` to `BinaryExpr`.
- fn prune_expr_with_bloom_filter(
- expr: Option<&phys_expr::BinaryExpr>,
- column_sbbf: &HashMap<String, Sbbf>,
- ) -> bool {
- let Some(expr) = expr else {
- // unsupported predicate
- return false;
- };
- match expr.op() {
- Operator::And | Operator::Or => {
- let left = Self::prune_expr_with_bloom_filter(
-
expr.left().as_any().downcast_ref::<phys_expr::BinaryExpr>(),
- column_sbbf,
- );
- let right = Self::prune_expr_with_bloom_filter(
- expr.right()
- .as_any()
- .downcast_ref::<phys_expr::BinaryExpr>(),
- column_sbbf,
- );
- match expr.op() {
- Operator::And => left || right,
- Operator::Or => left && right,
- _ => false,
- }
- }
- Operator::Eq => {
- if let Some((col, val)) =
Self::check_expr_is_col_equal_const(expr) {
- if let Some(sbbf) = column_sbbf.get(col.name()) {
- match val {
- ScalarValue::Utf8(Some(v)) =>
!sbbf.check(&v.as_str()),
- ScalarValue::Boolean(Some(v)) => !sbbf.check(&v),
- ScalarValue::Float64(Some(v)) => !sbbf.check(&v),
- ScalarValue::Float32(Some(v)) => !sbbf.check(&v),
- ScalarValue::Int64(Some(v)) => !sbbf.check(&v),
- ScalarValue::Int32(Some(v)) => !sbbf.check(&v),
- ScalarValue::Int16(Some(v)) => !sbbf.check(&v),
- ScalarValue::Int8(Some(v)) => !sbbf.check(&v),
- _ => false,
- }
- } else {
- false
- }
- } else {
- false
- }
- }
- _ => false,
- }
+ fn num_containers(&self) -> usize {
+ 1
}
- fn get_predicate_columns(expr: &Arc<dyn PhysicalExpr>) -> HashSet<String> {
- let mut columns = HashSet::new();
- expr.apply(&mut |expr| {
- if let Some(binary_expr) =
- expr.as_any().downcast_ref::<phys_expr::BinaryExpr>()
- {
- if let Some((column, _)) =
- Self::check_expr_is_col_equal_const(binary_expr)
- {
- columns.insert(column.name().to_string());
- }
- }
- Ok(VisitRecursion::Continue)
- })
- // no way to fail as only Ok(VisitRecursion::Continue) is returned
- .unwrap();
-
- columns
+ fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
+ None
}
- fn check_expr_is_col_equal_const(
- exr: &phys_expr::BinaryExpr,
- ) -> Option<(phys_expr::Column, ScalarValue)> {
- if Operator::Eq.ne(exr.op()) {
- return None;
- }
+ /// Use bloom filters to determine if we are sure this column can not
+ /// possibly contain `values`
+ ///
+ /// The `contained` API returns false if the bloom filter knows that *ALL*
+ /// of the `values` are definitely not present in the column.
+ fn contained(
+ &self,
+ column: &Column,
+ values: &HashSet<ScalarValue>,
+ ) -> Option<BooleanArray> {
+ let sbbf = self.column_sbbf.get(column.name.as_str())?;
- let left_any = exr.left().as_any();
- let right_any = exr.right().as_any();
- if let (Some(col), Some(liter)) = (
- left_any.downcast_ref::<phys_expr::Column>(),
- right_any.downcast_ref::<phys_expr::Literal>(),
- ) {
- return Some((col.clone(), liter.value().clone()));
- }
- if let (Some(liter), Some(col)) = (
- left_any.downcast_ref::<phys_expr::Literal>(),
- right_any.downcast_ref::<phys_expr::Column>(),
- ) {
- return Some((col.clone(), liter.value().clone()));
- }
- None
+ // Bloom filters are probabilistic data structures that can return false
+ // positives (i.e. it might return true even if the value is not
+ // present) however, the bloom filter will return `false` if the value is
+ // definitely not present.
+
+ let known_not_present = values
+ .iter()
+ .map(|value| match value {
+ ScalarValue::Utf8(Some(v)) => sbbf.check(&v.as_str()),
+ ScalarValue::Boolean(Some(v)) => sbbf.check(v),
+ ScalarValue::Float64(Some(v)) => sbbf.check(v),
+ ScalarValue::Float32(Some(v)) => sbbf.check(v),
+ ScalarValue::Int64(Some(v)) => sbbf.check(v),
+ ScalarValue::Int32(Some(v)) => sbbf.check(v),
+ ScalarValue::Int16(Some(v)) => sbbf.check(v),
+ ScalarValue::Int8(Some(v)) => sbbf.check(v),
+ _ => true,
+ })
+ // The row group doesn't contain any of the values if
+ // all the checks are false
+ .all(|v| !v);
+
+ let contains = if known_not_present {
+ Some(false)
+ } else {
+ // Given the bloom filter is probabilistic, we can't be sure that
+ // the row group actually contains the values. Return `None` to
+ // indicate this uncertainty
+ None
+ };
+
+ Some(BooleanArray::from(vec![contains]))
}
}
@@ -1367,6 +1301,7 @@ mod tests {
let metadata = builder.metadata().clone();
let pruned_row_group = prune_row_groups_by_bloom_filters(
+ pruning_predicate.schema(),
&mut builder,
row_groups,
metadata.row_groups(),