alamb commented on code in PR #7821:
URL: https://github.com/apache/arrow-datafusion/pull/7821#discussion_r1365751727
##########
datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs:
##########
@@ -846,4 +1120,185 @@ mod tests {
let execution_props = ExecutionProps::new();
create_physical_expr(expr, &df_schema, schema,
&execution_props).unwrap()
}
+
+ #[tokio::test]
+ async fn test_row_group_bloom_filter_pruning_predicate_simple_expr() {
+ // load parquet file
+ let testdata = datafusion_common::test_util::parquet_test_data();
+ let file_name = "data_index_bloom_encoding_stats.parquet";
+ let path = format!("{testdata}/{file_name}");
+ let data = bytes::Bytes::from(std::fs::read(path).unwrap());
+
+ // generate pruning predicate
+ let schema = Schema::new(vec![Field::new("String", DataType::Utf8,
false)]);
+ let expr = col(r#""String""#).eq(lit("Hello_Not_Exists"));
+ let expr = logical2physical(&expr, &schema);
+ let pruning_predicate =
+ PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+
+ let row_groups = vec![0];
+ let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
+ file_name,
+ data,
+ &pruning_predicate,
+ &row_groups,
+ )
+ .await
+ .unwrap();
+ assert!(pruned_row_groups.is_empty());
+ }
+
+ #[tokio::test]
+ async fn test_row_group_bloom_filter_pruning_predicate_partial_expr() {
+ // load parquet file
+ let testdata = datafusion_common::test_util::parquet_test_data();
+ let file_name = "data_index_bloom_encoding_stats.parquet";
+ let path = format!("{testdata}/{file_name}");
+ let data = bytes::Bytes::from(std::fs::read(path).unwrap());
+
+ // generate pruning predicate
+ let schema = Schema::new(vec![Field::new("String", DataType::Utf8,
false)]);
+ let expr = col(r#""String""#)
Review Comment:
I wonder how much additional coverage this test and the next one
(`test_row_group_bloom_filter_pruning_predicate_partial_expr` and
`test_row_group_bloom_filter_pruning_predicate_mutiple_expr`) are adding?
In other words, I think only one is required.
##########
datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs:
##########
@@ -92,6 +98,242 @@ pub(crate) fn prune_row_groups(
filtered
}
+/// Prune row groups by bloom filters
+///
+/// Returns a vector of indexes into `groups` which should be scanned.
+///
+/// If an index is NOT present in the returned Vec it means the
+/// predicate filtered all the row group.
+///
+/// If an index IS present in the returned Vec it means the predicate
+/// did not filter out that row group.
+pub(crate) async fn prune_row_groups_by_bloom_filters<
+ T: AsyncFileReader + Send + 'static,
+>(
+ builder: &mut ParquetRecordBatchStreamBuilder<T>,
+ row_groups: &[usize],
+ groups: &[RowGroupMetaData],
+ predicate: &PruningPredicate,
+ metrics: &ParquetFileMetrics,
+) -> Vec<usize> {
+ let bf_predicates = match
BloomFilterPruningPredicate::try_new(predicate.orig_expr())
+ {
+ Ok(predicates) => predicates,
+ Err(_) => {
+ return row_groups.to_vec();
+ }
+ };
+ let mut filtered = Vec::with_capacity(groups.len());
+ for idx in row_groups {
+ let rg_metadata = &groups[*idx];
+ // get all columns bloom filter
+ let mut column_sbbf =
+ HashMap::with_capacity(bf_predicates.required_columns.len());
+ for column_name in bf_predicates.required_columns.iter() {
+ let column_idx = match rg_metadata
+ .columns()
+ .iter()
+ .enumerate()
+ .find(|(_, column)|
column.column_path().string().eq(column_name))
+ {
+ Some((column_idx, _)) => column_idx,
+ None => continue,
+ };
+ let bf = match builder
+ .get_row_group_column_bloom_filter(*idx, column_idx)
+ .await
+ {
+ Ok(bf) => match bf {
+ Some(bf) => bf,
+ None => {
+ continue;
+ }
+ },
+ Err(e) => {
+ log::error!("Error evaluating row group predicate values
when using BloomFilterPruningPredicate {e}");
+ metrics.predicate_evaluation_errors.add(1);
+ continue;
+ }
+ };
+ column_sbbf.insert(column_name.to_owned(), bf);
+ }
+ if bf_predicates.prune(&column_sbbf) {
+ metrics.row_groups_pruned.add(1);
+ continue;
+ }
+ filtered.push(*idx);
+ }
+ filtered
+}
+
+struct BloomFilterPruningPredicate {
Review Comment:
I really like how this interface is similar to
[`PruningPredicate`](https://docs.rs/datafusion/latest/datafusion/physical_optimizer/pruning/struct.PruningPredicate.html)
and encapsulates the logic well.
##########
datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs:
##########
@@ -846,4 +1120,185 @@ mod tests {
let execution_props = ExecutionProps::new();
create_physical_expr(expr, &df_schema, schema,
&execution_props).unwrap()
}
+
Review Comment:
Here is the data in case anyone was wondering:
```sql
❯ select * from 'data_index_bloom_encoding_stats.parquet';
+-----------+
| String |
+-----------+
| Hello |
| This is |
| a |
| test |
| How |
| are you |
| doing |
| today |
| the quick |
| brown fox |
| jumps |
| over |
| the lazy |
| dog |
+-----------+
14 rows in set. Query took 0.003 seconds.
```
##########
datafusion/core/src/datasource/physical_plan/parquet.rs:
##########
@@ -84,6 +84,9 @@ pub struct ParquetExec {
/// Override for `Self::with_enable_page_index`. If None, uses
/// values from base_config
enable_page_index: Option<bool>,
+ /// Override for `Self::with_enable_bloom_filter`. If None, uses
+ /// values from base_config
+ enable_bloom_filter: Option<bool>,
Review Comment:
I think we should also add a session-level configuration for
`enable_bloom_filter` (defaults to on), as we do for `enable_page_index`, so
that users have the ability to turn it off.
https://github.com/apache/arrow-datafusion/blob/c97a0d2ebb8697855edd5b56904accbfa9a80290/datafusion/common/src/config.rs#L284
We can do this as a follow-on PR (it doesn't have to be this one).
##########
datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs:
##########
@@ -92,6 +98,242 @@ pub(crate) fn prune_row_groups(
filtered
}
+/// Prune row groups by bloom filters
+///
+/// Returns a vector of indexes into `groups` which should be scanned.
+///
+/// If an index is NOT present in the returned Vec it means the
+/// predicate filtered all the row group.
+///
+/// If an index IS present in the returned Vec it means the predicate
+/// did not filter out that row group.
+pub(crate) async fn prune_row_groups_by_bloom_filters<
+ T: AsyncFileReader + Send + 'static,
+>(
+ builder: &mut ParquetRecordBatchStreamBuilder<T>,
+ row_groups: &[usize],
+ groups: &[RowGroupMetaData],
+ predicate: &PruningPredicate,
+ metrics: &ParquetFileMetrics,
+) -> Vec<usize> {
+ let bf_predicates = match
BloomFilterPruningPredicate::try_new(predicate.orig_expr())
+ {
+ Ok(predicates) => predicates,
+ Err(_) => {
+ return row_groups.to_vec();
+ }
+ };
+ let mut filtered = Vec::with_capacity(groups.len());
+ for idx in row_groups {
+ let rg_metadata = &groups[*idx];
+ // get all columns bloom filter
+ let mut column_sbbf =
+ HashMap::with_capacity(bf_predicates.required_columns.len());
+ for column_name in bf_predicates.required_columns.iter() {
+ let column_idx = match rg_metadata
+ .columns()
+ .iter()
+ .enumerate()
+ .find(|(_, column)|
column.column_path().string().eq(column_name))
+ {
+ Some((column_idx, _)) => column_idx,
+ None => continue,
+ };
+ let bf = match builder
+ .get_row_group_column_bloom_filter(*idx, column_idx)
+ .await
+ {
+ Ok(bf) => match bf {
+ Some(bf) => bf,
+ None => {
+ continue;
+ }
+ },
+ Err(e) => {
+ log::error!("Error evaluating row group predicate values
when using BloomFilterPruningPredicate {e}");
+ metrics.predicate_evaluation_errors.add(1);
+ continue;
+ }
+ };
+ column_sbbf.insert(column_name.to_owned(), bf);
+ }
+ if bf_predicates.prune(&column_sbbf) {
+ metrics.row_groups_pruned.add(1);
+ continue;
+ }
+ filtered.push(*idx);
+ }
+ filtered
+}
+
+struct BloomFilterPruningPredicate {
+ /// Actual pruning predicate (rewritten in terms of column min/max
statistics)
+ predicate_expr: Option<phys_expr::BinaryExpr>,
+ /// The statistics required to evaluate this predicate
+ required_columns: Vec<String>,
+}
+
+impl BloomFilterPruningPredicate {
+ fn try_new(expr: &Arc<dyn PhysicalExpr>) -> Result<Self> {
+ let expr = expr.as_any().downcast_ref::<phys_expr::BinaryExpr>();
+ match Self::get_predicate_columns(expr) {
+ Some(columns) => Ok(Self {
+ predicate_expr: expr.cloned(),
+ required_columns: columns.into_iter().collect(),
+ }),
+ None => Err(DataFusionError::Execution(
+ "BloomFilterPruningPredicate only support binary
expr".to_string(),
+ )),
+ }
+ }
+
+ fn prune(&self, column_sbbf: &HashMap<String, Sbbf>) -> bool {
+ Self::prune_expr_with_bloom_filter(self.predicate_expr.as_ref(),
column_sbbf)
+ }
+
+ /// filter the expr with bloom filter return true if the expr can be pruned
+ fn prune_expr_with_bloom_filter(
+ expr: Option<&phys_expr::BinaryExpr>,
+ column_sbbf: &HashMap<String, Sbbf>,
+ ) -> bool {
+ if expr.is_none() {
+ return false;
+ }
+ let expr = expr.unwrap();
+ match expr.op() {
+ Operator::And => {
+ let left = Self::prune_expr_with_bloom_filter(
+
expr.left().as_any().downcast_ref::<phys_expr::BinaryExpr>(),
+ column_sbbf,
+ );
+ let right = Self::prune_expr_with_bloom_filter(
+ expr.right()
+ .as_any()
+ .downcast_ref::<phys_expr::BinaryExpr>(),
+ column_sbbf,
+ );
+ left || right
+ }
+ Operator::Or => {
+ let left = Self::prune_expr_with_bloom_filter(
+
expr.left().as_any().downcast_ref::<phys_expr::BinaryExpr>(),
+ column_sbbf,
+ );
+ let right = Self::prune_expr_with_bloom_filter(
+ expr.right()
+ .as_any()
+ .downcast_ref::<phys_expr::BinaryExpr>(),
+ column_sbbf,
+ );
+ left && right
+ }
+ Operator::Eq => {
+ if let Some((col, val)) =
Self::check_expr_is_col_equal_const(expr) {
+ if let Some(sbbf) = column_sbbf.get(col.name()) {
+ match val {
+ ScalarValue::Utf8(Some(v)) =>
!sbbf.check(&v.as_str()),
+ ScalarValue::Boolean(Some(v)) => !sbbf.check(&v),
+ ScalarValue::Float64(Some(v)) => !sbbf.check(&v),
+ ScalarValue::Float32(Some(v)) => !sbbf.check(&v),
+ ScalarValue::Int64(Some(v)) => !sbbf.check(&v),
+ ScalarValue::Int32(Some(v)) => !sbbf.check(&v),
+ ScalarValue::Int16(Some(v)) => !sbbf.check(&v),
+ ScalarValue::Int8(Some(v)) => !sbbf.check(&v),
+ _ => false,
+ }
+ } else {
+ false
+ }
+ } else {
+ false
+ }
+ }
+ _ => false,
+ }
+ }
+
+ fn get_predicate_columns(
+ expr: Option<&phys_expr::BinaryExpr>,
+ ) -> Option<HashSet<String>> {
+ match expr {
Review Comment:
I think you could write this code to be more efficient by using a
[`TreeNode`](https://docs.rs/datafusion/latest/datafusion/common/tree_node/trait.TreeNode.html)
recursion instead. Here is a proposal:
https://github.com/openobserve/arrow-datafusion/pull/5
##########
datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs:
##########
@@ -92,6 +98,242 @@ pub(crate) fn prune_row_groups(
filtered
}
+/// Prune row groups by bloom filters
+///
+/// Returns a vector of indexes into `groups` which should be scanned.
+///
+/// If an index is NOT present in the returned Vec it means the
+/// predicate filtered all the row group.
+///
+/// If an index IS present in the returned Vec it means the predicate
+/// did not filter out that row group.
+pub(crate) async fn prune_row_groups_by_bloom_filters<
+ T: AsyncFileReader + Send + 'static,
+>(
+ builder: &mut ParquetRecordBatchStreamBuilder<T>,
+ row_groups: &[usize],
+ groups: &[RowGroupMetaData],
+ predicate: &PruningPredicate,
+ metrics: &ParquetFileMetrics,
+) -> Vec<usize> {
+ let bf_predicates = match
BloomFilterPruningPredicate::try_new(predicate.orig_expr())
+ {
+ Ok(predicates) => predicates,
+ Err(_) => {
+ return row_groups.to_vec();
+ }
+ };
+ let mut filtered = Vec::with_capacity(groups.len());
+ for idx in row_groups {
+ let rg_metadata = &groups[*idx];
+ // get all columns bloom filter
+ let mut column_sbbf =
+ HashMap::with_capacity(bf_predicates.required_columns.len());
+ for column_name in bf_predicates.required_columns.iter() {
+ let column_idx = match rg_metadata
+ .columns()
+ .iter()
+ .enumerate()
+ .find(|(_, column)|
column.column_path().string().eq(column_name))
+ {
+ Some((column_idx, _)) => column_idx,
+ None => continue,
+ };
+ let bf = match builder
+ .get_row_group_column_bloom_filter(*idx, column_idx)
+ .await
+ {
+ Ok(bf) => match bf {
+ Some(bf) => bf,
+ None => {
+ continue;
+ }
+ },
+ Err(e) => {
+ log::error!("Error evaluating row group predicate values
when using BloomFilterPruningPredicate {e}");
+ metrics.predicate_evaluation_errors.add(1);
+ continue;
+ }
+ };
+ column_sbbf.insert(column_name.to_owned(), bf);
+ }
+ if bf_predicates.prune(&column_sbbf) {
+ metrics.row_groups_pruned.add(1);
+ continue;
+ }
+ filtered.push(*idx);
+ }
+ filtered
+}
+
+struct BloomFilterPruningPredicate {
+ /// Actual pruning predicate (rewritten in terms of column min/max
statistics)
+ predicate_expr: Option<phys_expr::BinaryExpr>,
+ /// The statistics required to evaluate this predicate
+ required_columns: Vec<String>,
+}
+
+impl BloomFilterPruningPredicate {
+ fn try_new(expr: &Arc<dyn PhysicalExpr>) -> Result<Self> {
+ let expr = expr.as_any().downcast_ref::<phys_expr::BinaryExpr>();
+ match Self::get_predicate_columns(expr) {
+ Some(columns) => Ok(Self {
+ predicate_expr: expr.cloned(),
+ required_columns: columns.into_iter().collect(),
+ }),
+ None => Err(DataFusionError::Execution(
+ "BloomFilterPruningPredicate only support binary
expr".to_string(),
+ )),
+ }
+ }
+
+ fn prune(&self, column_sbbf: &HashMap<String, Sbbf>) -> bool {
+ Self::prune_expr_with_bloom_filter(self.predicate_expr.as_ref(),
column_sbbf)
+ }
+
+ /// filter the expr with bloom filter return true if the expr can be pruned
+ fn prune_expr_with_bloom_filter(
+ expr: Option<&phys_expr::BinaryExpr>,
+ column_sbbf: &HashMap<String, Sbbf>,
+ ) -> bool {
+ if expr.is_none() {
+ return false;
+ }
+ let expr = expr.unwrap();
Review Comment:
This will panic if an expression like `x LIKE 'foo'` is passed. Maybe it
should return `false` instead, like the following (or perhaps have a comment
about why this is guaranteed to be `Some`):
```suggestion
let Some(expr) = expr else {
// unsupported predicate
return false;
};
```
##########
datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs:
##########
@@ -846,4 +1120,185 @@ mod tests {
let execution_props = ExecutionProps::new();
create_physical_expr(expr, &df_schema, schema,
&execution_props).unwrap()
}
+
Review Comment:
Here is the data in case anyone was wondering:
```sql
❯ select * from 'data_index_bloom_encoding_stats.parquet';
+-----------+
| String |
+-----------+
| Hello |
| This is |
| a |
| test |
| How |
| are you |
| doing |
| today |
| the quick |
| brown fox |
| jumps |
| over |
| the lazy |
| dog |
+-----------+
14 rows in set. Query took 0.003 seconds.
```
##########
datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs:
##########
@@ -92,6 +98,242 @@ pub(crate) fn prune_row_groups(
filtered
}
+/// Prune row groups by bloom filters
+///
+/// Returns a vector of indexes into `groups` which should be scanned.
+///
+/// If an index is NOT present in the returned Vec it means the
+/// predicate filtered all the row group.
+///
+/// If an index IS present in the returned Vec it means the predicate
+/// did not filter out that row group.
+pub(crate) async fn prune_row_groups_by_bloom_filters<
+ T: AsyncFileReader + Send + 'static,
+>(
+ builder: &mut ParquetRecordBatchStreamBuilder<T>,
+ row_groups: &[usize],
+ groups: &[RowGroupMetaData],
+ predicate: &PruningPredicate,
+ metrics: &ParquetFileMetrics,
+) -> Vec<usize> {
+ let bf_predicates = match
BloomFilterPruningPredicate::try_new(predicate.orig_expr())
+ {
+ Ok(predicates) => predicates,
+ Err(_) => {
+ return row_groups.to_vec();
+ }
+ };
+ let mut filtered = Vec::with_capacity(groups.len());
+ for idx in row_groups {
+ let rg_metadata = &groups[*idx];
+ // get all columns bloom filter
+ let mut column_sbbf =
+ HashMap::with_capacity(bf_predicates.required_columns.len());
+ for column_name in bf_predicates.required_columns.iter() {
+ let column_idx = match rg_metadata
+ .columns()
+ .iter()
+ .enumerate()
+ .find(|(_, column)|
column.column_path().string().eq(column_name))
+ {
+ Some((column_idx, _)) => column_idx,
+ None => continue,
+ };
+ let bf = match builder
+ .get_row_group_column_bloom_filter(*idx, column_idx)
+ .await
+ {
+ Ok(bf) => match bf {
+ Some(bf) => bf,
+ None => {
+ continue;
+ }
+ },
+ Err(e) => {
+ log::error!("Error evaluating row group predicate values
when using BloomFilterPruningPredicate {e}");
+ metrics.predicate_evaluation_errors.add(1);
+ continue;
+ }
+ };
+ column_sbbf.insert(column_name.to_owned(), bf);
+ }
+ if bf_predicates.prune(&column_sbbf) {
+ metrics.row_groups_pruned.add(1);
+ continue;
+ }
+ filtered.push(*idx);
+ }
+ filtered
+}
+
+struct BloomFilterPruningPredicate {
+ /// Actual pruning predicate (rewritten in terms of column min/max
statistics)
+ predicate_expr: Option<phys_expr::BinaryExpr>,
+ /// The statistics required to evaluate this predicate
+ required_columns: Vec<String>,
+}
+
+impl BloomFilterPruningPredicate {
+ fn try_new(expr: &Arc<dyn PhysicalExpr>) -> Result<Self> {
+ let expr = expr.as_any().downcast_ref::<phys_expr::BinaryExpr>();
+ match Self::get_predicate_columns(expr) {
+ Some(columns) => Ok(Self {
+ predicate_expr: expr.cloned(),
+ required_columns: columns.into_iter().collect(),
+ }),
+ None => Err(DataFusionError::Execution(
+ "BloomFilterPruningPredicate only support binary
expr".to_string(),
+ )),
+ }
+ }
+
+ fn prune(&self, column_sbbf: &HashMap<String, Sbbf>) -> bool {
+ Self::prune_expr_with_bloom_filter(self.predicate_expr.as_ref(),
column_sbbf)
+ }
+
+ /// filter the expr with bloom filter return true if the expr can be pruned
Review Comment:
```suggestion
/// filter the expr with bloom filter return true if the `expr` can be proved not `true`
/// based on the bloom filter.
```
##########
datafusion/sqllogictest/test_files/json.slt:
##########
@@ -58,8 +58,10 @@ AggregateExec: mode=Final, gby=[], aggr=[COUNT(*)]
------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------JsonExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/2.json]]}, projection=[a]
-query error DataFusion error: Schema error: No field named mycol\.
+query N
Review Comment:
FWIW this change is related to the arrow update
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]