ozankabak commented on code in PR #14279:
URL: https://github.com/apache/datafusion/pull/14279#discussion_r1929486166
##########
datafusion/physical-expr/src/analysis.rs:
##########
@@ -191,7 +201,18 @@ pub fn analyze(
shrink_boundaries(graph, target_boundaries, target_expr_and_indices)
}
PropagationResult::Infeasible => {
- Ok(AnalysisContext::new(target_boundaries).with_selectivity(0.0))
+ // If the propagation result is infeasible, map target boundary intervals to None
+ Ok(AnalysisContext::new(
Review Comment:
You already seem to own `target_boundaries`, so there is no need to create a new
`ExprBoundaries` object. Just iterate over `target_boundaries` mutably and
assign `None` to the `interval` attribute.
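Something along these lines should do it (just a sketch; it assumes the function owns
`target_boundaries` as a mutable `Vec<ExprBoundaries>` and that `interval` is now an
`Option<Interval>`):
```rust
PropagationResult::Infeasible => {
    // Clear every target interval in place instead of building a new
    // `ExprBoundaries` collection; selectivity handling stays as on the old line.
    for bound in target_boundaries.iter_mut() {
        bound.interval = None;
    }
    Ok(AnalysisContext::new(target_boundaries).with_selectivity(0.0))
}
```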
##########
datafusion/physical-expr/src/analysis.rs:
##########
@@ -179,7 +179,17 @@ pub fn analyze(
expr.as_any()
.downcast_ref::<Column>()
.filter(|expr_column| bound.column.eq(*expr_column))
- .map(|_| (*i, bound.interval.clone()))
+ .map(|_| {
+ (
+ *i,
+ match bound.interval.clone() {
Review Comment:
This function already returns a `Result`, so you should return an error here
instead of panicking with `unreachable!`. Also, doing the cloning *after* checking for
`None` is slightly better style.
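For illustration, the check could look roughly like this (a sketch only;
`internal_datafusion_err!` is the usual `datafusion_common` error macro, the message is
made up, and the surrounding iterator would still need to propagate the `Result`):
```rust
// Check for `None` first, then clone, and surface an error instead of
// panicking with `unreachable!`.
let interval = bound.interval.as_ref().cloned().ok_or_else(|| {
    internal_datafusion_err!(
        "Interval for column {} should be present before analysis",
        bound.column.name()
    )
})?;
```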
##########
datafusion/physical-expr/src/analysis.rs:
##########
@@ -335,7 +355,9 @@ mod tests {
df_schema.as_ref(),
)
.unwrap();
- let actual = &analysis_result.boundaries[0].interval;
+ let Some(actual) = &analysis_result.boundaries[0].interval else {
+ panic!("Interval should be initialized for all columns");
Review Comment:
```suggestion
panic!("The analysis result should contain non-empty intervals for all columns");
```
AFAICT this is not about initialization; you expect non-empty results.
##########
datafusion/physical-expr/src/analysis.rs:
##########
@@ -82,7 +82,7 @@ impl AnalysisContext {
pub struct ExprBoundaries {
pub column: Column,
/// Minimum and maximum values this expression can have.
Review Comment:
```suggestion
/// Minimum and maximum values this expression can have. A `None` value
/// represents an infeasible expression.
```
##########
datafusion/physical-plan/src/filter.rs:
##########
@@ -421,7 +421,18 @@ fn collect_new_statistics(
..
},
)| {
- let (lower, upper) = interval.into_bounds();
+ let (lower, upper) = match interval {
+     Some(interval) => interval.into_bounds(),
+     // If the interval is None, we can say that there are no rows
+     None => {
+         return ColumnStatistics {
+             null_count: Precision::Exact(0),
+             max_value: Precision::Exact(ScalarValue::Int32(Some(0))),
+             min_value: Precision::Exact(ScalarValue::Int32(Some(0))),
+             distinct_count: Precision::Exact(0),
+         }
+     }
+ };
Review Comment:
```suggestion
let Some(interval) = interval else {
    // If the interval is `None`, we can say that there are no rows:
    return ColumnStatistics {
        null_count: Precision::Exact(0),
        max_value: Precision::Exact(ScalarValue::Int32(Some(0))),
        min_value: Precision::Exact(ScalarValue::Int32(Some(0))),
        distinct_count: Precision::Exact(0),
    };
};
let (lower, upper) = interval.into_bounds();
```
##########
datafusion/physical-expr/src/analysis.rs:
##########
@@ -235,16 +256,25 @@ fn shrink_boundaries(
fn calculate_selectivity(
target_boundaries: &[ExprBoundaries],
initial_boundaries: &[ExprBoundaries],
-) -> f64 {
+) -> Result<f64> {
// Since the intervals are assumed uniform and the values
// are not correlated, we need to multiply the selectivities
// of multiple columns to get the overall selectivity.
+ let mut acc: f64 = 1.0;
initial_boundaries
.iter()
.zip(target_boundaries.iter())
- .fold(1.0, |acc, (initial, target)| {
- acc * cardinality_ratio(&initial.interval, &target.interval)
- })
+ .for_each(|(initial, target)| {
Review Comment:
Similar to above -- let's return a `Result` instead of panicking and do any
checks before clones. Also, given that this is a good old for loop now, it will
increase readability to switch to a simple `for` statement instead of using a
closure within the `for_each` method.
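For example, the whole function could take roughly this shape (a sketch under the
assumption that `interval` is now an `Option<Interval>`; the `internal_err!` message
and the `Ok(0.0)` shortcut for an infeasible target are illustrative):
```rust
fn calculate_selectivity(
    target_boundaries: &[ExprBoundaries],
    initial_boundaries: &[ExprBoundaries],
) -> Result<f64> {
    // Since the intervals are assumed uniform and the values are not
    // correlated, multiply per-column selectivities to get the overall one.
    let mut acc: f64 = 1.0;
    for (initial, target) in initial_boundaries.iter().zip(target_boundaries.iter()) {
        match (initial.interval.as_ref(), target.interval.as_ref()) {
            (Some(initial), Some(target)) => {
                acc *= cardinality_ratio(initial, target);
            }
            // An infeasible target interval selects no rows.
            (_, None) => return Ok(0.0),
            (None, _) => {
                return internal_err!(
                    "Initial boundaries should always contain an interval"
                )
            }
        }
    }
    Ok(acc)
}
```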
##########
datafusion/physical-expr/src/analysis.rs:
##########
@@ -344,6 +366,41 @@ mod tests {
}
}
+ #[test]
+ fn test_analyze_empty_set_boundary_exprs() {
+ let schema = Arc::new(Schema::new(vec![make_field("a", DataType::Int32)]));
+
+ let test_cases: Vec<Expr> = vec![
+ // a > 10 AND a < 10
+ col("a").gt(lit(10)).and(col("a").lt(lit(10))),
+ // a > 5 AND (a < 20 OR a > 20)
+ // a > 10 AND a < 20
+ // (a > 10 AND a < 20) AND (a > 20 AND a < 30)
+ col("a")
+ .gt(lit(10))
+ .and(col("a").lt(lit(20)))
+ .and(col("a").gt(lit(20)))
+ .and(col("a").lt(lit(30))),
+ ];
+
+ for expr in test_cases {
+ let boundaries = ExprBoundaries::try_new_unbounded(&schema).unwrap();
+ let df_schema = DFSchema::try_from(Arc::clone(&schema)).unwrap();
+ let physical_expr =
+     create_physical_expr(&expr, &df_schema, &ExecutionProps::new()).unwrap();
+ let analysis_result = analyze(
+ &physical_expr,
+ AnalysisContext::new(boundaries),
+ df_schema.as_ref(),
+ )
+ .unwrap();
+
+ analysis_result.boundaries.iter().for_each(|bound| {
Review Comment:
A good old `for` loop is preferable in cases like this.
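i.e. something like this (the exact assertion is assumed from context):
```rust
for bound in &analysis_result.boundaries {
    // For these always-false predicates the propagated interval should be `None`.
    assert!(bound.interval.is_none());
}
```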