Ted-Jiang commented on code in PR #7821:
URL: https://github.com/apache/arrow-datafusion/pull/7821#discussion_r1366386102
##########
datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs:
##########
@@ -92,6 +98,242 @@ pub(crate) fn prune_row_groups(
filtered
}
+/// Prune row groups by bloom filters
+///
+/// Returns a vector of indexes into `groups` which should be scanned.
+///
+/// If an index is NOT present in the returned Vec it means the
+/// predicate filtered all the row group.
+///
+/// If an index IS present in the returned Vec it means the predicate
+/// did not filter out that row group.
+pub(crate) async fn prune_row_groups_by_bloom_filters<
+ T: AsyncFileReader + Send + 'static,
+>(
+ builder: &mut ParquetRecordBatchStreamBuilder<T>,
+ row_groups: &[usize],
+ groups: &[RowGroupMetaData],
+ predicate: &PruningPredicate,
+ metrics: &ParquetFileMetrics,
+) -> Vec<usize> {
+ let bf_predicates = match
BloomFilterPruningPredicate::try_new(predicate.orig_expr())
+ {
+ Ok(predicates) => predicates,
+ Err(_) => {
+ return row_groups.to_vec();
+ }
+ };
+ let mut filtered = Vec::with_capacity(groups.len());
+ for idx in row_groups {
+ let rg_metadata = &groups[*idx];
+ // get all columns bloom filter
+ let mut column_sbbf =
+ HashMap::with_capacity(bf_predicates.required_columns.len());
+ for column_name in bf_predicates.required_columns.iter() {
+ let column_idx = match rg_metadata
+ .columns()
+ .iter()
+ .enumerate()
+ .find(|(_, column)|
column.column_path().string().eq(column_name))
+ {
+ Some((column_idx, _)) => column_idx,
+ None => continue,
+ };
+ let bf = match builder
+ .get_row_group_column_bloom_filter(*idx, column_idx)
+ .await
+ {
+ Ok(bf) => match bf {
+ Some(bf) => bf,
+ None => {
+ continue;
+ }
+ },
+ Err(e) => {
+ log::error!("Error evaluating row group predicate values
when using BloomFilterPruningPredicate {e}");
+ metrics.predicate_evaluation_errors.add(1);
+ continue;
+ }
+ };
+ column_sbbf.insert(column_name.to_owned(), bf);
+ }
+ if bf_predicates.prune(&column_sbbf) {
+ metrics.row_groups_pruned.add(1);
+ continue;
+ }
+ filtered.push(*idx);
+ }
+ filtered
+}
+
+struct BloomFilterPruningPredicate {
+ /// Actual pruning predicate (rewritten in terms of column min/max
statistics)
+ predicate_expr: Option<phys_expr::BinaryExpr>,
+ /// The statistics required to evaluate this predicate
+ required_columns: Vec<String>,
+}
+
+impl BloomFilterPruningPredicate {
+ fn try_new(expr: &Arc<dyn PhysicalExpr>) -> Result<Self> {
+ let expr = expr.as_any().downcast_ref::<phys_expr::BinaryExpr>();
+ match Self::get_predicate_columns(expr) {
+ Some(columns) => Ok(Self {
+ predicate_expr: expr.cloned(),
+ required_columns: columns.into_iter().collect(),
+ }),
+ None => Err(DataFusionError::Execution(
+ "BloomFilterPruningPredicate only support binary
expr".to_string(),
Review Comment:
Sorry, I already handle this here:
https://github.com/apache/arrow-datafusion/blob/3f8c51222fa0aa5bb4ef7fa5cab55875bfde0a63/datafusion/core/src/physical_optimizer/pruning.rs#L759-L76
🤣 Maybe we can add a reference to it here.
##########
datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs:
##########
@@ -92,6 +98,242 @@ pub(crate) fn prune_row_groups(
filtered
}
+/// Prune row groups by bloom filters
+///
+/// Returns a vector of indexes into `groups` which should be scanned.
+///
+/// If an index is NOT present in the returned Vec it means the
+/// predicate filtered all the row group.
+///
+/// If an index IS present in the returned Vec it means the predicate
+/// did not filter out that row group.
+pub(crate) async fn prune_row_groups_by_bloom_filters<
+ T: AsyncFileReader + Send + 'static,
+>(
+ builder: &mut ParquetRecordBatchStreamBuilder<T>,
+ row_groups: &[usize],
+ groups: &[RowGroupMetaData],
+ predicate: &PruningPredicate,
+ metrics: &ParquetFileMetrics,
+) -> Vec<usize> {
+ let bf_predicates = match
BloomFilterPruningPredicate::try_new(predicate.orig_expr())
+ {
+ Ok(predicates) => predicates,
+ Err(_) => {
+ return row_groups.to_vec();
+ }
+ };
+ let mut filtered = Vec::with_capacity(groups.len());
+ for idx in row_groups {
+ let rg_metadata = &groups[*idx];
+ // get all columns bloom filter
+ let mut column_sbbf =
+ HashMap::with_capacity(bf_predicates.required_columns.len());
+ for column_name in bf_predicates.required_columns.iter() {
+ let column_idx = match rg_metadata
+ .columns()
+ .iter()
+ .enumerate()
+ .find(|(_, column)|
column.column_path().string().eq(column_name))
+ {
+ Some((column_idx, _)) => column_idx,
+ None => continue,
+ };
+ let bf = match builder
+ .get_row_group_column_bloom_filter(*idx, column_idx)
+ .await
+ {
+ Ok(bf) => match bf {
+ Some(bf) => bf,
+ None => {
+ continue;
+ }
+ },
+ Err(e) => {
+ log::error!("Error evaluating row group predicate values
when using BloomFilterPruningPredicate {e}");
+ metrics.predicate_evaluation_errors.add(1);
+ continue;
+ }
+ };
+ column_sbbf.insert(column_name.to_owned(), bf);
+ }
+ if bf_predicates.prune(&column_sbbf) {
+ metrics.row_groups_pruned.add(1);
+ continue;
+ }
+ filtered.push(*idx);
+ }
+ filtered
+}
+
+struct BloomFilterPruningPredicate {
+ /// Actual pruning predicate (rewritten in terms of column min/max
statistics)
+ predicate_expr: Option<phys_expr::BinaryExpr>,
+ /// The statistics required to evaluate this predicate
+ required_columns: Vec<String>,
+}
+
+impl BloomFilterPruningPredicate {
+ fn try_new(expr: &Arc<dyn PhysicalExpr>) -> Result<Self> {
+ let expr = expr.as_any().downcast_ref::<phys_expr::BinaryExpr>();
+ match Self::get_predicate_columns(expr) {
+ Some(columns) => Ok(Self {
+ predicate_expr: expr.cloned(),
+ required_columns: columns.into_iter().collect(),
+ }),
+ None => Err(DataFusionError::Execution(
+ "BloomFilterPruningPredicate only support binary
expr".to_string(),
Review Comment:
Sorry, I already handle this here:
https://github.com/apache/arrow-datafusion/blob/3f8c51222fa0aa5bb4ef7fa5cab55875bfde0a63/datafusion/core/src/physical_optimizer/pruning.rs#L759-L76
🤣 Maybe we can add a reference to it here.
##########
datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs:
##########
@@ -92,6 +98,242 @@ pub(crate) fn prune_row_groups(
filtered
}
+/// Prune row groups by bloom filters
+///
+/// Returns a vector of indexes into `groups` which should be scanned.
+///
+/// If an index is NOT present in the returned Vec it means the
+/// predicate filtered all the row group.
+///
+/// If an index IS present in the returned Vec it means the predicate
+/// did not filter out that row group.
+pub(crate) async fn prune_row_groups_by_bloom_filters<
+ T: AsyncFileReader + Send + 'static,
+>(
+ builder: &mut ParquetRecordBatchStreamBuilder<T>,
+ row_groups: &[usize],
+ groups: &[RowGroupMetaData],
+ predicate: &PruningPredicate,
+ metrics: &ParquetFileMetrics,
+) -> Vec<usize> {
+ let bf_predicates = match
BloomFilterPruningPredicate::try_new(predicate.orig_expr())
+ {
+ Ok(predicates) => predicates,
+ Err(_) => {
+ return row_groups.to_vec();
+ }
+ };
+ let mut filtered = Vec::with_capacity(groups.len());
+ for idx in row_groups {
+ let rg_metadata = &groups[*idx];
+ // get all columns bloom filter
+ let mut column_sbbf =
+ HashMap::with_capacity(bf_predicates.required_columns.len());
+ for column_name in bf_predicates.required_columns.iter() {
+ let column_idx = match rg_metadata
+ .columns()
+ .iter()
+ .enumerate()
+ .find(|(_, column)|
column.column_path().string().eq(column_name))
+ {
+ Some((column_idx, _)) => column_idx,
+ None => continue,
+ };
+ let bf = match builder
+ .get_row_group_column_bloom_filter(*idx, column_idx)
+ .await
+ {
+ Ok(bf) => match bf {
+ Some(bf) => bf,
+ None => {
+ continue;
+ }
+ },
+ Err(e) => {
+ log::error!("Error evaluating row group predicate values
when using BloomFilterPruningPredicate {e}");
+ metrics.predicate_evaluation_errors.add(1);
+ continue;
+ }
+ };
+ column_sbbf.insert(column_name.to_owned(), bf);
+ }
+ if bf_predicates.prune(&column_sbbf) {
+ metrics.row_groups_pruned.add(1);
+ continue;
+ }
+ filtered.push(*idx);
+ }
+ filtered
+}
+
+struct BloomFilterPruningPredicate {
+ /// Actual pruning predicate (rewritten in terms of column min/max
statistics)
+ predicate_expr: Option<phys_expr::BinaryExpr>,
+ /// The statistics required to evaluate this predicate
+ required_columns: Vec<String>,
+}
+
+impl BloomFilterPruningPredicate {
+ fn try_new(expr: &Arc<dyn PhysicalExpr>) -> Result<Self> {
+ let expr = expr.as_any().downcast_ref::<phys_expr::BinaryExpr>();
+ match Self::get_predicate_columns(expr) {
+ Some(columns) => Ok(Self {
+ predicate_expr: expr.cloned(),
+ required_columns: columns.into_iter().collect(),
+ }),
+ None => Err(DataFusionError::Execution(
+ "BloomFilterPruningPredicate only support binary
expr".to_string(),
Review Comment:
Sorry, I already handle this here:
https://github.com/apache/arrow-datafusion/blob/3f8c51222fa0aa5bb4ef7fa5cab55875bfde0a63/datafusion/core/src/physical_optimizer/pruning.rs#L759-L76
🤣 Maybe we can add a reference to it here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]