Weijun-H commented on code in PR #7821:
URL: https://github.com/apache/arrow-datafusion/pull/7821#discussion_r1365870176


##########
datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs:
##########
@@ -92,6 +98,242 @@ pub(crate) fn prune_row_groups(
     filtered
 }
 
+/// Prune row groups by bloom filters
+///
+/// Returns a vector of indexes into `groups` which should be scanned.
+///
+/// If an index is NOT present in the returned Vec it means the
+/// predicate filtered out all rows in that row group.
+///
+/// If an index IS present in the returned Vec it means the predicate
+/// did not filter out that row group.
+pub(crate) async fn prune_row_groups_by_bloom_filters<
+    T: AsyncFileReader + Send + 'static,
+>(
+    builder: &mut ParquetRecordBatchStreamBuilder<T>,
+    row_groups: &[usize],
+    groups: &[RowGroupMetaData],
+    predicate: &PruningPredicate,
+    metrics: &ParquetFileMetrics,
+) -> Vec<usize> {
+    let bf_predicates = match BloomFilterPruningPredicate::try_new(predicate.orig_expr())
+    {
+        Ok(predicates) => predicates,
+        Err(_) => {
+            return row_groups.to_vec();
+        }
+    };
+    let mut filtered = Vec::with_capacity(groups.len());
+    for idx in row_groups {
+        let rg_metadata = &groups[*idx];
+        // collect the bloom filter for each required column
+        let mut column_sbbf =
+            HashMap::with_capacity(bf_predicates.required_columns.len());
+        for column_name in bf_predicates.required_columns.iter() {
+            let column_idx = match rg_metadata
+                .columns()
+                .iter()
+                .enumerate()
+                .find(|(_, column)| column.column_path().string().eq(column_name))
+            {
+                Some((column_idx, _)) => column_idx,
+                None => continue,
+            };
+            let bf = match builder
+                .get_row_group_column_bloom_filter(*idx, column_idx)
+                .await
+            {
+                Ok(bf) => match bf {
+                    Some(bf) => bf,
+                    None => {
+                        continue;
+                    }
+                },
+                Err(e) => {
+                    log::error!("Error evaluating row group predicate values when using BloomFilterPruningPredicate {e}");
+                    metrics.predicate_evaluation_errors.add(1);
+                    continue;
+                }
+            };
+            column_sbbf.insert(column_name.to_owned(), bf);
+        }
+        if bf_predicates.prune(&column_sbbf) {
+            metrics.row_groups_pruned.add(1);
+            continue;
+        }
+        filtered.push(*idx);
+    }
+    filtered
+}
+
+struct BloomFilterPruningPredicate {
+    /// The pruning predicate expression, evaluated against column bloom filters
+    predicate_expr: Option<phys_expr::BinaryExpr>,
+    /// The columns required to evaluate this predicate
+    required_columns: Vec<String>,
+}
+
+impl BloomFilterPruningPredicate {
+    fn try_new(expr: &Arc<dyn PhysicalExpr>) -> Result<Self> {
+        let expr = expr.as_any().downcast_ref::<phys_expr::BinaryExpr>();
+        match Self::get_predicate_columns(expr) {
+            Some(columns) => Ok(Self {
+                predicate_expr: expr.cloned(),
+                required_columns: columns.into_iter().collect(),
+            }),
+            None => Err(DataFusionError::Execution(
+                "BloomFilterPruningPredicate only support binary 
expr".to_string(),
+            )),
+        }
+    }
+
+    fn prune(&self, column_sbbf: &HashMap<String, Sbbf>) -> bool {
+        Self::prune_expr_with_bloom_filter(self.predicate_expr.as_ref(), column_sbbf)
+    }
+
+    /// filter the expr with bloom filter return true if the expr can be pruned

Review Comment:
   ```suggestion
       /// Return true if the expr can be pruned by the bloom filter
   ```
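
For context on this suggestion: a bloom filter can prove a value is *absent* from a row group but never that it is present, which is why pruning is only sound in the "definitely not present" direction. A minimal sketch of the per-column check, assuming an `i64` column; the helper `can_prune_eq` is illustrative, not part of this PR:

```rust
use parquet::bloom_filter::Sbbf;

// Illustrative helper (not from this PR): returns true if `col = value`
// can be pruned for a row group. `Sbbf::check` returning false means the
// value is definitely not present; returning true only means "possibly
// present", so the row group must still be scanned in that case.
fn can_prune_eq(sbbf: &Sbbf, value: i64) -> bool {
    !sbbf.check(&value)
}
```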



##########
datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs:
##########
@@ -92,6 +98,242 @@ pub(crate) fn prune_row_groups(
     filtered
 }
 
+    /// filter the expr with bloom filter return true if the expr can be pruned
+    fn prune_expr_with_bloom_filter(
+        expr: Option<&phys_expr::BinaryExpr>,
+        column_sbbf: &HashMap<String, Sbbf>,
+    ) -> bool {
+        if expr.is_none() {
+            return false;
+        }
+        let expr = expr.unwrap();
+        match expr.op() {
+            Operator::And => {
+                let left = Self::prune_expr_with_bloom_filter(
+                    expr.left().as_any().downcast_ref::<phys_expr::BinaryExpr>(),
+                    column_sbbf,
+                );
+                let right = Self::prune_expr_with_bloom_filter(
+                    expr.right()
+                        .as_any()
+                        .downcast_ref::<phys_expr::BinaryExpr>(),
+                    column_sbbf,
+                );
+                left || right
+            }
+            Operator::Or => {
+                let left = Self::prune_expr_with_bloom_filter(
+                    expr.left().as_any().downcast_ref::<phys_expr::BinaryExpr>(),
+                    column_sbbf,
+                );
+                let right = Self::prune_expr_with_bloom_filter(
+                    expr.right()
+                        .as_any()
+                        .downcast_ref::<phys_expr::BinaryExpr>(),
+                    column_sbbf,
+                );
+                left && right
+            }

Review Comment:
   ```suggestion
               Operator::And | Operator::Or => {
                let left = Self::prune_expr_with_bloom_filter(
                    expr.left().as_any().downcast_ref::<phys_expr::BinaryExpr>(),
                    column_sbbf,
                );
                   let right = Self::prune_expr_with_bloom_filter(
                       expr.right()
                           .as_any()
                           .downcast_ref::<phys_expr::BinaryExpr>(),
                       column_sbbf,
                   );
   
                   match expr.op() {
                       Operator::And => left || right,
                       Operator::Or => left && right,
                       _ => false,
                   }
               }
   ```
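
The asymmetry this suggestion preserves is worth spelling out: for `a AND b`, the row group can be skipped if *either* side alone proves no rows match (hence `left || right`), while for `a OR b` it can be skipped only if *both* sides prove it (hence `left && right`). A standalone illustration of that combination rule; the names below are ours, not from the PR:

```rust
// `left`/`right` mean "this sub-expression alone proves the row group
// contains no matching rows".
fn combine_prunable(op_is_and: bool, left: bool, right: bool) -> bool {
    if op_is_and {
        left || right // AND is unsatisfiable if either side is
    } else {
        left && right // OR is unsatisfiable only if both sides are
    }
}

fn main() {
    assert!(combine_prunable(true, true, false)); // AND: one side suffices
    assert!(!combine_prunable(false, true, false)); // OR: needs both sides
}
```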



##########
datafusion/core/src/datasource/physical_plan/parquet.rs:
##########
@@ -246,6 +250,18 @@ impl ParquetExec {
             .unwrap_or(config_options.execution.parquet.enable_page_index)
     }
 
+    /// If enabled, the reader will read the bloom filter

Review Comment:
   ```suggestion
       /// If enabled, the reader will read by the bloom filter
   ```
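
For anyone trying this out: the option presumably flows through `ConfigOptions` the same way `enable_page_index` does in the hunk above. A sketch under that assumption; the `bloom_filter_enabled` key is a guess modeled on the existing option, so check the PR for the actual name:

```rust
use datafusion::config::ConfigOptions;

fn parquet_scan_config() -> ConfigOptions {
    let mut config = ConfigOptions::new();
    // Existing option, visible in the hunk above:
    config.execution.parquet.enable_page_index = true;
    // Hypothetical option added by this PR (name guessed, verify before use):
    // config.execution.parquet.bloom_filter_enabled = true;
    config
}
```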


