alamb commented on code in PR #6982:
URL: https://github.com/apache/arrow-datafusion/pull/6982#discussion_r1267191022


##########
datafusion/physical-expr/src/intervals/interval_aritmetic.rs:
##########
@@ -451,6 +453,75 @@ impl Interval {
         lower: IntervalBound::new(ScalarValue::Boolean(Some(true)), false),
         upper: IntervalBound::new(ScalarValue::Boolean(Some(true)), false),
     };
+
+    // Cardinality is the number of all points included by the interval, 
considering its bounds.

Review Comment:
   Something that might be worth considering in the long term that @tustvold 
mentioned the other day is to vectorize these calculations -- at the moment 
they are doin in the context of a single expression, but eventually if we want 
to use this logic to prune large numbers of files / etc based on statistics it 
may take too long
   
   No change is needed here, I am just planting a seed of an idea in case this 
was on your list too



##########
datafusion/physical-expr/src/intervals/interval_aritmetic.rs:
##########
@@ -508,6 +580,83 @@ fn cast_scalar_value(
     ScalarValue::try_from_array(&cast_array, 0)
 }
 
+/// This function calculates the final cardinality result by inspecting the 
endpoints of the interval.
+fn calculate_cardinality_based_on_bounds(
+    lower_open: bool,
+    upper_open: bool,
+    diff: u64,
+) -> u64 {
+    match (lower_open, upper_open) {
+        (false, false) => diff + 1,
+        (true, true) => diff - 1,
+        _ => diff,
+    }
+}
+
+trait OneTrait: Sized + std::ops::Add + std::ops::Sub {
+    fn one() -> Self;
+}
+
+macro_rules! impl_OneTrait{
+    ($($m:ty),*) => {$( impl OneTrait for $m  { fn one() -> Self { 1 as $m } 
})*}
+}
+impl_OneTrait! {u8, u16, u32, u64, i8, i16, i32, i64, f32, f64}
+
+/// This function either increments or decrements its argument, depending on 
the `DIR` value. If `true`, it increments; otherwise it decrements the argument.
+fn increment_decrement<const DIR: bool, T: OneTrait + SubAssign + AddAssign>(
+    mut val: T,
+) -> T {
+    if DIR {
+        val.add_assign(T::one());
+    } else {
+        val.sub_assign(T::one());
+    }
+    val
+}
+
+/// This function returns the next/previous value depending on the `DIR` value.
+/// If `true`, it returns the next value; otherwise it returns the previous 
value.
+fn get_next_value<const DIR: bool>(value: ScalarValue) -> ScalarValue {
+    use ScalarValue::*;
+    match value {
+        Float32(Some(val)) => {
+            let incremented_bits = increment_decrement::<DIR, 
u32>(val.to_bits());
+            Float32(Some(f32::from_bits(incremented_bits)))
+        }
+        Float64(Some(val)) => {
+            let incremented_bits = increment_decrement::<DIR, 
u64>(val.to_bits());
+            Float64(Some(f64::from_bits(incremented_bits)))
+        }
+        Int8(Some(val)) => Int8(Some(increment_decrement::<DIR, i8>(val))),
+        Int16(Some(val)) => Int16(Some(increment_decrement::<DIR, i16>(val))),
+        Int32(Some(val)) => Int32(Some(increment_decrement::<DIR, i32>(val))),
+        Int64(Some(val)) => Int64(Some(increment_decrement::<DIR, i64>(val))),
+        UInt8(Some(val)) => UInt8(Some(increment_decrement::<DIR, u8>(val))),
+        UInt16(Some(val)) => UInt16(Some(increment_decrement::<DIR, 
u16>(val))),
+        UInt32(Some(val)) => UInt32(Some(increment_decrement::<DIR, 
u32>(val))),
+        UInt64(Some(val)) => UInt64(Some(increment_decrement::<DIR, 
u64>(val))),
+        _ => value, // Infinite bounds or unsupported datatypes
+    }
+}
+
+/// This function takes an interval, and if it has any open bound(s), it
+/// converts them to closed bound(s) preserving the interval endpoints.
+pub fn interval_with_closed_bounds(mut interval: Interval) -> Interval {

Review Comment:
   This might be more elegant if it was a method on  a `Interval` itelf
   
   ```rust
   impl Interval 
     pub fn iwith_closed_bounds(mut self -> Interval {
   ...
   }
   }
   ```
   
   



##########
datafusion/physical-expr/src/intervals/interval_aritmetic.rs:
##########
@@ -508,6 +580,83 @@ fn cast_scalar_value(
     ScalarValue::try_from_array(&cast_array, 0)
 }
 
+/// This function calculates the final cardinality result by inspecting the 
endpoints of the interval.
+fn calculate_cardinality_based_on_bounds(
+    lower_open: bool,
+    upper_open: bool,
+    diff: u64,
+) -> u64 {
+    match (lower_open, upper_open) {
+        (false, false) => diff + 1,
+        (true, true) => diff - 1,
+        _ => diff,
+    }
+}
+
+trait OneTrait: Sized + std::ops::Add + std::ops::Sub {

Review Comment:
   Perhaps you could use ScalarValue::new_one here: 
https://docs.rs/datafusion/latest/datafusion/common/enum.ScalarValue.html#method.new_one



##########
datafusion/physical-expr/src/intervals/interval_aritmetic.rs:
##########
@@ -508,6 +580,83 @@ fn cast_scalar_value(
     ScalarValue::try_from_array(&cast_array, 0)
 }
 
+/// This function calculates the final cardinality result by inspecting the 
endpoints of the interval.
+fn calculate_cardinality_based_on_bounds(
+    lower_open: bool,
+    upper_open: bool,
+    diff: u64,
+) -> u64 {
+    match (lower_open, upper_open) {
+        (false, false) => diff + 1,
+        (true, true) => diff - 1,
+        _ => diff,
+    }
+}
+
+trait OneTrait: Sized + std::ops::Add + std::ops::Sub {
+    fn one() -> Self;
+}
+
+macro_rules! impl_OneTrait{
+    ($($m:ty),*) => {$( impl OneTrait for $m  { fn one() -> Self { 1 as $m } 
})*}
+}
+impl_OneTrait! {u8, u16, u32, u64, i8, i16, i32, i64, f32, f64}
+
+/// This function either increments or decrements its argument, depending on 
the `DIR` value. If `true`, it increments; otherwise it decrements the argument.
+fn increment_decrement<const DIR: bool, T: OneTrait + SubAssign + AddAssign>(
+    mut val: T,
+) -> T {
+    if DIR {
+        val.add_assign(T::one());
+    } else {
+        val.sub_assign(T::one());
+    }
+    val
+}
+
+/// This function returns the next/previous value depending on the `DIR` value.
+/// If `true`, it returns the next value; otherwise it returns the previous 
value.
+fn get_next_value<const DIR: bool>(value: ScalarValue) -> ScalarValue {

Review Comment:
   Perhaps this would be more consistent if it were added to ScalarValue itself
   
   ```rust
   impl ScalarValue  {
     fn next_value(&self) -> ScalarValue {
   ..
   }
   ```



##########
datafusion/core/src/physical_plan/filter.rs:
##########
@@ -672,4 +675,309 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_filter_statistics_multiple_columns() -> Result<()> {

Review Comment:
   👍  thank you for these tests
   
   Stylistically they might be easier to write if you could avoid the setup of 
StatisticsExec and FilterExec (and just make the input / predicate / output. 
However, that is just a minor suggestion 



##########
datafusion/physical-expr/src/intervals/interval_aritmetic.rs:
##########
@@ -1172,4 +1321,70 @@ mod tests {
         let upper = 1.5;
         capture_mode_change_f32((lower, upper), true, true);
     }
+
+    #[test]
+    fn test_cardinality_of_intervals() -> Result<()> {
+        // In IEEE 754 standard for floating-point arithmetic, if we keep the 
sign and exponent fields same,
+        // we can represent 4503599627370496 different numbers by changing the 
mantissa
+        // (4503599627370496 = 2^52, since there are 52 bits in mantissa, and 
2^23 = 8388608 for f32).
+        let distinct_f64 = 4503599627370496;
+        let distinct_f32 = 8388608;
+        let intervals = [
+            Interval::new(
+                IntervalBound::new(ScalarValue::from(0.25), false),
+                IntervalBound::new(ScalarValue::from(0.50), true),
+            ),
+            Interval::new(
+                IntervalBound::new(ScalarValue::from(0.5), false),
+                IntervalBound::new(ScalarValue::from(1.0), true),
+            ),
+            Interval::new(
+                IntervalBound::new(ScalarValue::from(1.0), false),
+                IntervalBound::new(ScalarValue::from(2.0), true),
+            ),
+            Interval::new(
+                IntervalBound::new(ScalarValue::from(32.0), false),
+                IntervalBound::new(ScalarValue::from(64.0), true),
+            ),
+            Interval::new(
+                IntervalBound::new(ScalarValue::from(-0.50), false),
+                IntervalBound::new(ScalarValue::from(-0.25), true),
+            ),
+            Interval::new(
+                IntervalBound::new(ScalarValue::from(-32.0), false),
+                IntervalBound::new(ScalarValue::from(-16.0), true),
+            ),
+        ];
+        for interval in intervals {
+            assert_eq!(interval.cardinality()?, distinct_f64);
+        }
+
+        let intervals = [
+            Interval::new(
+                IntervalBound::new(ScalarValue::from(0.25_f32), false),
+                IntervalBound::new(ScalarValue::from(0.50_f32), true),
+            ),
+            Interval::new(
+                IntervalBound::new(ScalarValue::from(-1_f32), false),
+                IntervalBound::new(ScalarValue::from(-0.5_f32), true),
+            ),
+        ];
+        for interval in intervals {
+            assert_eq!(interval.cardinality()?, distinct_f32);
+        }
+
+        let interval = Interval::new(
+            IntervalBound::new(ScalarValue::from(-0.0625), false),
+            IntervalBound::new(ScalarValue::from(0.0625), true),
+        );
+        assert_eq!(interval.cardinality()?, distinct_f64 * 2_048);

Review Comment:
   While I understand the rationale behind this choice, I think in practice 
this is not likely to provide much meaningful information -- to estimate 
cardinality in such cases, one approach is to use distinct values / estimates 
from the input -- like if the input's cardinality is 100, but the range is 
`-0.0625` to `0.0625` then output cardinality of stable expressions is likely 
to be bounded by 100



##########
datafusion/core/src/physical_plan/filter.rs:
##########
@@ -672,4 +675,309 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_filter_statistics_multiple_columns() -> Result<()> {

Review Comment:
   👍  thank you for these tests
   
   Stylistically they might be easier to write if you could avoid the setup of 
StatisticsExec and FilterExec (and just make the input / predicate / output. 
However, that is just a minor suggestion 



##########
datafusion/physical-expr/src/intervals/interval_aritmetic.rs:
##########
@@ -451,6 +453,75 @@ impl Interval {
         lower: IntervalBound::new(ScalarValue::Boolean(Some(true)), false),
         upper: IntervalBound::new(ScalarValue::Boolean(Some(true)), false),
     };
+
+    // Cardinality is the number of all points included by the interval, 
considering its bounds.
+    pub fn cardinality(&self) -> Result<u64> {
+        match self.get_datatype() {
+            Ok(data_type) if data_type.is_integer() => {
+                if let Some(diff) = 
self.upper.value.distance(&self.lower.value) {
+                    Ok(calculate_cardinality_based_on_bounds(
+                        self.lower.open,
+                        self.upper.open,
+                        diff as u64,
+                    ))
+                } else {
+                    Err(DataFusionError::Execution(format!(
+                        "Cardinality cannot be calculated for {:?}",
+                        self
+                    )))
+                }
+            }
+            // Since the floating-point numbers are ordered in the same order 
as their binary representation,
+            // we can consider their binary representations as "indices" and 
subtract them.
+            // 
https://stackoverflow.com/questions/8875064/how-many-distinct-floating-point-numbers-in-a-specific-range
+            Ok(data_type) if data_type.is_floating() => {

Review Comment:
   Not sure it matters but `is_floating` also includes `Float16` but the code 
below only handles Float32 / Float64



##########
datafusion/physical-expr/src/physical_expr.rs:
##########
@@ -152,124 +283,82 @@ pub type PhysicalExprRef = Arc<dyn PhysicalExpr>;
 /// the boundaries for all known columns.
 #[derive(Clone, Debug, PartialEq)]
 pub struct AnalysisContext {
-    /// A list of known column boundaries, ordered by the index
-    /// of the column in the current schema.
-    pub column_boundaries: Vec<Option<ExprBoundaries>>,
-    // Result of the current analysis.
-    pub boundaries: Option<ExprBoundaries>,
+    // A list of known column boundaries, ordered by the index
+    // of the column in the current schema.
+    pub boundaries: Option<Vec<ExprBoundaries>>,
+    /// The estimated percentage of rows that this expression would select, if
+    /// it were to be used as a boolean predicate on a filter. The value will 
be
+    /// between 0.0 (selects nothing) and 1.0 (selects everything).
+    pub selectivity: Option<f64>,
 }
 
 impl AnalysisContext {
-    pub fn new(
-        input_schema: &Schema,
-        column_boundaries: Vec<Option<ExprBoundaries>>,
-    ) -> Self {
-        assert_eq!(input_schema.fields().len(), column_boundaries.len());
+    pub fn new(boundaries: Vec<ExprBoundaries>) -> Self {
         Self {
-            column_boundaries,
-            boundaries: None,
+            boundaries: Some(boundaries),
+            selectivity: None,
         }
     }
 
-    /// Create a new analysis context from column statistics.
-    pub fn from_statistics(input_schema: &Schema, statistics: &Statistics) -> 
Self {
-        // Even if the underlying statistics object doesn't have any column 
level statistics,
-        // we can still create an analysis context with the same number of 
columns and see whether
-        // we can infer it during the way.
-        let column_boundaries = match &statistics.column_statistics {
-            Some(columns) => columns
-                .iter()
-                .map(ExprBoundaries::from_column)
-                .collect::<Vec<_>>(),
-            None => vec![None; input_schema.fields().len()],
-        };
-        Self::new(input_schema, column_boundaries)
-    }
-
-    pub fn boundaries(&self) -> Option<&ExprBoundaries> {
-        self.boundaries.as_ref()
-    }
-
-    /// Set the result of the current analysis.
-    pub fn with_boundaries(mut self, result: Option<ExprBoundaries>) -> Self {
-        self.boundaries = result;
-        self
+    pub fn new_with_selectivity(
+        boundaries: Vec<ExprBoundaries>,
+        selectivity: f64,
+    ) -> Self {
+        Self {
+            boundaries: Some(boundaries),
+            selectivity: Some(selectivity),
+        }

Review Comment:
   An alternate form that might be more pleasing to the eye might be 
   
   ```suggestion
       pub fn with_selectivity(
           mut self,
           selectivity: f64,
       ) -> Self {
           self.selectivity = Some(selectivity);
           self
          }
   ```
   
   then instead of 
   
   ```rust
       Ok(AnalysisContext::new_with_selectivity(
           target_boundaries,
           selectivity,
       ))
   ```
   
   you could write
   
   ```rust
       Ok(AnalysisContext::new(target_boundaries)
           .with_selectivity(selectivity)
       )
   ```



##########
datafusion/physical-expr/src/physical_expr.rs:
##########
@@ -139,6 +133,143 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug + 
PartialEq<dyn Any> {
     fn dyn_hash(&self, _state: &mut dyn Hasher);
 }
 
+/// Attempts to refine column boundaries and compute a selectivity value.
+///
+/// The function accepts boundaries of the input columns in the `context` 
parameter.
+/// It then tries to tighten these boundaries based on the provided `expr`.
+/// The resulting selectivity value is calculated by comparing the initial and 
final boundaries.
+/// The computation assumes that the data within the column is uniformly 
distributed and not sorted.
+///
+/// # Arguments
+///
+/// * `context` - The context holding input column boundaries.
+/// * `expr` - The expression used to shrink the column boundaries.
+///
+/// # Returns
+///
+/// * `AnalysisContext` constructed by pruned boundaries and a selectivity 
value.
+pub fn analyze(
+    expr: &Arc<dyn PhysicalExpr>,
+    context: AnalysisContext,
+) -> Result<AnalysisContext> {
+    let target_boundaries = context.boundaries.ok_or_else(|| {
+        DataFusionError::Internal("No column exists at the input to 
filter".to_string())
+    })?;
+
+    let mut graph = ExprIntervalGraph::try_new(expr.clone())?;
+
+    let columns: Vec<Arc<dyn PhysicalExpr>> = collect_columns(expr)
+        .into_iter()
+        .map(|c| Arc::new(c) as Arc<dyn PhysicalExpr>)
+        .collect();
+
+    let target_expr_and_indices: Vec<(Arc<dyn PhysicalExpr>, usize)> =
+        graph.gather_node_indices(columns.as_slice());
+
+    let mut target_indices_and_boundaries: Vec<(usize, Interval)> =
+        target_expr_and_indices
+            .iter()
+            .filter_map(|(expr, i)| {
+                target_boundaries.iter().find_map(|bound| {
+                    expr.as_any()
+                        .downcast_ref::<Column>()
+                        .filter(|expr_column| bound.column.eq(*expr_column))
+                        .map(|_| (*i, bound.interval.clone()))
+                })
+            })
+            .collect();
+
+    match graph.update_ranges(&mut target_indices_and_boundaries)? {
+        PropagationResult::Success => {
+            shrink_boundaries(expr, graph, target_boundaries, 
target_expr_and_indices)
+        }
+        PropagationResult::Infeasible => 
Ok(AnalysisContext::new_with_selectivity(
+            target_boundaries,
+            0.0,
+        )),
+        PropagationResult::CannotPropagate => 
Ok(AnalysisContext::new_with_selectivity(
+            target_boundaries,
+            1.0,
+        )),
+    }
+}
+
+/// If the `PropagationResult` indicates success, this function calculates the 
selectivity value by comparing the initial
+/// and final column boundaries. Following this, it constructs and returns a 
new `AnalysisContext`, with
+/// the updated parameters.
+fn shrink_boundaries(
+    expr: &Arc<dyn PhysicalExpr>,
+    mut graph: ExprIntervalGraph,
+    mut target_boundaries: Vec<ExprBoundaries>,
+    target_expr_and_indices: Vec<(Arc<dyn PhysicalExpr>, usize)>,
+) -> Result<AnalysisContext> {
+    let initial_boundaries = target_boundaries.clone();
+    target_expr_and_indices.iter().for_each(|(expr, i)| {
+        if let Some(column) = expr.as_any().downcast_ref::<Column>() {
+            if let Some(bound) = target_boundaries
+                .iter_mut()
+                .find(|bound| bound.column.eq(column))
+            {
+                bound.update_interval(graph.get_interval(*i))
+            };
+        }
+    });
+    let graph_nodes = graph.gather_node_indices(&[expr.clone()]);
+    let (_, root_index) = graph_nodes.first().ok_or_else(|| {
+        DataFusionError::Internal("Error in constructing predicate 
graph".to_string())
+    })?;
+    let final_result = graph.get_interval(*root_index);
+
+    let selectivity = calculate_selectivity(
+        &final_result.lower.value,
+        &final_result.upper.value,
+        &target_boundaries,
+        &initial_boundaries,
+    )?;
+
+    if !(0.0..=1.0).contains(&selectivity) {
+        return Err(DataFusionError::Internal(format!(
+            "Selectivity is out of limit: {}",
+            selectivity
+        )));
+    }
+
+    Ok(AnalysisContext::new_with_selectivity(
+        target_boundaries,
+        selectivity,
+    ))
+}
+
+/// This function calculates the filter predicate's selectivity by comparing 
the initial and pruned column boundaries.
+/// Selectivity is defined as the ratio of rows in a table that satisfy the 
filter's predicate. An exact propagation result
+// at the root, i.e. `[true, true]` or `[false, false]`, leads to early exit 
(returning a selectivity value of either 1.0 or 0.0).
+// In such a case, `[true, true]` indicates that all data values satisfy the 
predicate (hence, selectivity is 1.0), and `[false, false]`
+// suggests that no data value meets the predicate (therefore, selectivity is 
0.0).
+fn calculate_selectivity(
+    lower_value: &ScalarValue,
+    upper_value: &ScalarValue,
+    target_boundaries: &[ExprBoundaries],
+    initial_boundaries: &[ExprBoundaries],
+) -> Result<f64> {
+    match (lower_value, upper_value) {
+        (ScalarValue::Boolean(Some(true)), ScalarValue::Boolean(Some(true))) 
=> Ok(1.0),
+        (ScalarValue::Boolean(Some(false)), ScalarValue::Boolean(Some(false))) 
=> Ok(0.0),
+        _ => {
+            // Since the intervals are assumed as uniform and we do not

Review Comment:
   I think another key assumption in this calculation is that the underlying 
column values that are filtered are not correlated. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to