alamb commented on code in PR #6982:
URL: https://github.com/apache/arrow-datafusion/pull/6982#discussion_r1267191022
##########
datafusion/physical-expr/src/intervals/interval_aritmetic.rs:
##########
@@ -451,6 +453,75 @@ impl Interval {
lower: IntervalBound::new(ScalarValue::Boolean(Some(true)), false),
upper: IntervalBound::new(ScalarValue::Boolean(Some(true)), false),
};
+
+ // Cardinality is the number of all points included by the interval,
considering its bounds.
Review Comment:
Something that might be worth considering in the long term, which @tustvold
mentioned the other day, is to vectorize these calculations -- at the moment
they are done in the context of a single expression, but eventually, if we want
to use this logic to prune large numbers of files / etc. based on statistics, it
may take too long.
No change is needed here; I am just planting the seed of an idea in case this
was on your list too.
##########
datafusion/physical-expr/src/intervals/interval_aritmetic.rs:
##########
@@ -508,6 +580,83 @@ fn cast_scalar_value(
ScalarValue::try_from_array(&cast_array, 0)
}
+/// This function calculates the final cardinality result by inspecting the
endpoints of the interval.
+fn calculate_cardinality_based_on_bounds(
+ lower_open: bool,
+ upper_open: bool,
+ diff: u64,
+) -> u64 {
+ match (lower_open, upper_open) {
+ (false, false) => diff + 1,
+ (true, true) => diff - 1,
+ _ => diff,
+ }
+}
+
+trait OneTrait: Sized + std::ops::Add + std::ops::Sub {
+ fn one() -> Self;
+}
+
+macro_rules! impl_OneTrait{
+ ($($m:ty),*) => {$( impl OneTrait for $m { fn one() -> Self { 1 as $m }
})*}
+}
+impl_OneTrait! {u8, u16, u32, u64, i8, i16, i32, i64, f32, f64}
+
+/// This function either increments or decrements its argument, depending on
the `DIR` value. If `true`, it increments; otherwise it decrements the argument.
+fn increment_decrement<const DIR: bool, T: OneTrait + SubAssign + AddAssign>(
+ mut val: T,
+) -> T {
+ if DIR {
+ val.add_assign(T::one());
+ } else {
+ val.sub_assign(T::one());
+ }
+ val
+}
+
+/// This function returns the next/previous value depending on the `DIR` value.
+/// If `true`, it returns the next value; otherwise it returns the previous
value.
+fn get_next_value<const DIR: bool>(value: ScalarValue) -> ScalarValue {
+ use ScalarValue::*;
+ match value {
+ Float32(Some(val)) => {
+ let incremented_bits = increment_decrement::<DIR,
u32>(val.to_bits());
+ Float32(Some(f32::from_bits(incremented_bits)))
+ }
+ Float64(Some(val)) => {
+ let incremented_bits = increment_decrement::<DIR,
u64>(val.to_bits());
+ Float64(Some(f64::from_bits(incremented_bits)))
+ }
+ Int8(Some(val)) => Int8(Some(increment_decrement::<DIR, i8>(val))),
+ Int16(Some(val)) => Int16(Some(increment_decrement::<DIR, i16>(val))),
+ Int32(Some(val)) => Int32(Some(increment_decrement::<DIR, i32>(val))),
+ Int64(Some(val)) => Int64(Some(increment_decrement::<DIR, i64>(val))),
+ UInt8(Some(val)) => UInt8(Some(increment_decrement::<DIR, u8>(val))),
+ UInt16(Some(val)) => UInt16(Some(increment_decrement::<DIR,
u16>(val))),
+ UInt32(Some(val)) => UInt32(Some(increment_decrement::<DIR,
u32>(val))),
+ UInt64(Some(val)) => UInt64(Some(increment_decrement::<DIR,
u64>(val))),
+ _ => value, // Infinite bounds or unsupported datatypes
+ }
+}
+
+/// This function takes an interval, and if it has any open bound(s), it
+/// converts them to closed bound(s) preserving the interval endpoints.
+pub fn interval_with_closed_bounds(mut interval: Interval) -> Interval {
Review Comment:
This might be more elegant if it were a method on `Interval` itself:
```rust
impl Interval
pub fn iwith_closed_bounds(mut self -> Interval {
...
}
}
```
##########
datafusion/physical-expr/src/intervals/interval_aritmetic.rs:
##########
@@ -508,6 +580,83 @@ fn cast_scalar_value(
ScalarValue::try_from_array(&cast_array, 0)
}
+/// This function calculates the final cardinality result by inspecting the
endpoints of the interval.
+fn calculate_cardinality_based_on_bounds(
+ lower_open: bool,
+ upper_open: bool,
+ diff: u64,
+) -> u64 {
+ match (lower_open, upper_open) {
+ (false, false) => diff + 1,
+ (true, true) => diff - 1,
+ _ => diff,
+ }
+}
+
+trait OneTrait: Sized + std::ops::Add + std::ops::Sub {
Review Comment:
Perhaps you could use `ScalarValue::new_one` here:
https://docs.rs/datafusion/latest/datafusion/common/enum.ScalarValue.html#method.new_one
##########
datafusion/physical-expr/src/intervals/interval_aritmetic.rs:
##########
@@ -508,6 +580,83 @@ fn cast_scalar_value(
ScalarValue::try_from_array(&cast_array, 0)
}
+/// This function calculates the final cardinality result by inspecting the
endpoints of the interval.
+fn calculate_cardinality_based_on_bounds(
+ lower_open: bool,
+ upper_open: bool,
+ diff: u64,
+) -> u64 {
+ match (lower_open, upper_open) {
+ (false, false) => diff + 1,
+ (true, true) => diff - 1,
+ _ => diff,
+ }
+}
+
+trait OneTrait: Sized + std::ops::Add + std::ops::Sub {
+ fn one() -> Self;
+}
+
+macro_rules! impl_OneTrait{
+ ($($m:ty),*) => {$( impl OneTrait for $m { fn one() -> Self { 1 as $m }
})*}
+}
+impl_OneTrait! {u8, u16, u32, u64, i8, i16, i32, i64, f32, f64}
+
+/// This function either increments or decrements its argument, depending on
the `DIR` value. If `true`, it increments; otherwise it decrements the argument.
+fn increment_decrement<const DIR: bool, T: OneTrait + SubAssign + AddAssign>(
+ mut val: T,
+) -> T {
+ if DIR {
+ val.add_assign(T::one());
+ } else {
+ val.sub_assign(T::one());
+ }
+ val
+}
+
+/// This function returns the next/previous value depending on the `DIR` value.
+/// If `true`, it returns the next value; otherwise it returns the previous
value.
+fn get_next_value<const DIR: bool>(value: ScalarValue) -> ScalarValue {
Review Comment:
Perhaps this would be more consistent if it were added to `ScalarValue` itself:
```rust
impl ScalarValue {
fn next_value(&self) -> ScalarValue {
..
}
```
##########
datafusion/core/src/physical_plan/filter.rs:
##########
@@ -672,4 +675,309 @@ mod tests {
Ok(())
}
+
+ #[tokio::test]
+ async fn test_filter_statistics_multiple_columns() -> Result<()> {
Review Comment:
👍 thank you for these tests
Stylistically, they might be easier to write if you could avoid the setup of
`StatisticsExec` and `FilterExec` (and just construct the input / predicate / output).
However, that is just a minor suggestion.
##########
datafusion/physical-expr/src/intervals/interval_aritmetic.rs:
##########
@@ -1172,4 +1321,70 @@ mod tests {
let upper = 1.5;
capture_mode_change_f32((lower, upper), true, true);
}
+
+ #[test]
+ fn test_cardinality_of_intervals() -> Result<()> {
+ // In IEEE 754 standard for floating-point arithmetic, if we keep the
sign and exponent fields same,
+ // we can represent 4503599627370496 different numbers by changing the
mantissa
+ // (4503599627370496 = 2^52, since there are 52 bits in mantissa, and
2^23 = 8388608 for f32).
+ let distinct_f64 = 4503599627370496;
+ let distinct_f32 = 8388608;
+ let intervals = [
+ Interval::new(
+ IntervalBound::new(ScalarValue::from(0.25), false),
+ IntervalBound::new(ScalarValue::from(0.50), true),
+ ),
+ Interval::new(
+ IntervalBound::new(ScalarValue::from(0.5), false),
+ IntervalBound::new(ScalarValue::from(1.0), true),
+ ),
+ Interval::new(
+ IntervalBound::new(ScalarValue::from(1.0), false),
+ IntervalBound::new(ScalarValue::from(2.0), true),
+ ),
+ Interval::new(
+ IntervalBound::new(ScalarValue::from(32.0), false),
+ IntervalBound::new(ScalarValue::from(64.0), true),
+ ),
+ Interval::new(
+ IntervalBound::new(ScalarValue::from(-0.50), false),
+ IntervalBound::new(ScalarValue::from(-0.25), true),
+ ),
+ Interval::new(
+ IntervalBound::new(ScalarValue::from(-32.0), false),
+ IntervalBound::new(ScalarValue::from(-16.0), true),
+ ),
+ ];
+ for interval in intervals {
+ assert_eq!(interval.cardinality()?, distinct_f64);
+ }
+
+ let intervals = [
+ Interval::new(
+ IntervalBound::new(ScalarValue::from(0.25_f32), false),
+ IntervalBound::new(ScalarValue::from(0.50_f32), true),
+ ),
+ Interval::new(
+ IntervalBound::new(ScalarValue::from(-1_f32), false),
+ IntervalBound::new(ScalarValue::from(-0.5_f32), true),
+ ),
+ ];
+ for interval in intervals {
+ assert_eq!(interval.cardinality()?, distinct_f32);
+ }
+
+ let interval = Interval::new(
+ IntervalBound::new(ScalarValue::from(-0.0625), false),
+ IntervalBound::new(ScalarValue::from(0.0625), true),
+ );
+ assert_eq!(interval.cardinality()?, distinct_f64 * 2_048);
Review Comment:
While I understand the rationale behind this choice, I think in practice
this is not likely to provide much meaningful information. To estimate
cardinality in such cases, one approach is to use distinct values / estimates
from the input: for example, if the input's cardinality is 100 but the range is
`-0.0625` to `0.0625`, then the output cardinality of stable expressions is likely
to be bounded by 100.
##########
datafusion/core/src/physical_plan/filter.rs:
##########
@@ -672,4 +675,309 @@ mod tests {
Ok(())
}
+
+ #[tokio::test]
+ async fn test_filter_statistics_multiple_columns() -> Result<()> {
Review Comment:
👍 thank you for these tests
Stylistically, they might be easier to write if you could avoid the setup of
`StatisticsExec` and `FilterExec` (and just construct the input / predicate / output).
However, that is just a minor suggestion.
##########
datafusion/physical-expr/src/intervals/interval_aritmetic.rs:
##########
@@ -451,6 +453,75 @@ impl Interval {
lower: IntervalBound::new(ScalarValue::Boolean(Some(true)), false),
upper: IntervalBound::new(ScalarValue::Boolean(Some(true)), false),
};
+
+ // Cardinality is the number of all points included by the interval,
considering its bounds.
+ pub fn cardinality(&self) -> Result<u64> {
+ match self.get_datatype() {
+ Ok(data_type) if data_type.is_integer() => {
+ if let Some(diff) =
self.upper.value.distance(&self.lower.value) {
+ Ok(calculate_cardinality_based_on_bounds(
+ self.lower.open,
+ self.upper.open,
+ diff as u64,
+ ))
+ } else {
+ Err(DataFusionError::Execution(format!(
+ "Cardinality cannot be calculated for {:?}",
+ self
+ )))
+ }
+ }
+ // Since the floating-point numbers are ordered in the same order
as their binary representation,
+ // we can consider their binary representations as "indices" and
subtract them.
+ //
https://stackoverflow.com/questions/8875064/how-many-distinct-floating-point-numbers-in-a-specific-range
+ Ok(data_type) if data_type.is_floating() => {
Review Comment:
Not sure it matters, but `is_floating` also includes `Float16`, while the code
below only handles `Float32` / `Float64`.
##########
datafusion/physical-expr/src/physical_expr.rs:
##########
@@ -152,124 +283,82 @@ pub type PhysicalExprRef = Arc<dyn PhysicalExpr>;
/// the boundaries for all known columns.
#[derive(Clone, Debug, PartialEq)]
pub struct AnalysisContext {
- /// A list of known column boundaries, ordered by the index
- /// of the column in the current schema.
- pub column_boundaries: Vec<Option<ExprBoundaries>>,
- // Result of the current analysis.
- pub boundaries: Option<ExprBoundaries>,
+ // A list of known column boundaries, ordered by the index
+ // of the column in the current schema.
+ pub boundaries: Option<Vec<ExprBoundaries>>,
+ /// The estimated percentage of rows that this expression would select, if
+ /// it were to be used as a boolean predicate on a filter. The value will
be
+ /// between 0.0 (selects nothing) and 1.0 (selects everything).
+ pub selectivity: Option<f64>,
}
impl AnalysisContext {
- pub fn new(
- input_schema: &Schema,
- column_boundaries: Vec<Option<ExprBoundaries>>,
- ) -> Self {
- assert_eq!(input_schema.fields().len(), column_boundaries.len());
+ pub fn new(boundaries: Vec<ExprBoundaries>) -> Self {
Self {
- column_boundaries,
- boundaries: None,
+ boundaries: Some(boundaries),
+ selectivity: None,
}
}
- /// Create a new analysis context from column statistics.
- pub fn from_statistics(input_schema: &Schema, statistics: &Statistics) ->
Self {
- // Even if the underlying statistics object doesn't have any column
level statistics,
- // we can still create an analysis context with the same number of
columns and see whether
- // we can infer it during the way.
- let column_boundaries = match &statistics.column_statistics {
- Some(columns) => columns
- .iter()
- .map(ExprBoundaries::from_column)
- .collect::<Vec<_>>(),
- None => vec![None; input_schema.fields().len()],
- };
- Self::new(input_schema, column_boundaries)
- }
-
- pub fn boundaries(&self) -> Option<&ExprBoundaries> {
- self.boundaries.as_ref()
- }
-
- /// Set the result of the current analysis.
- pub fn with_boundaries(mut self, result: Option<ExprBoundaries>) -> Self {
- self.boundaries = result;
- self
+ pub fn new_with_selectivity(
+ boundaries: Vec<ExprBoundaries>,
+ selectivity: f64,
+ ) -> Self {
+ Self {
+ boundaries: Some(boundaries),
+ selectivity: Some(selectivity),
+ }
Review Comment:
An alternate form that might be more pleasing to the eye is:
```suggestion
pub fn with_selectivity(
mut self,
selectivity: f64,
) -> Self {
self.selectivity = Some(selectivity);
self
}
```
then instead of
```rust
Ok(AnalysisContext::new_with_selectivity(
target_boundaries,
selectivity,
))
```
you could write
```rust
Ok(AnalysisContext::new(target_boundaries)
.with_selectivity(selectivity)
)
```
##########
datafusion/physical-expr/src/physical_expr.rs:
##########
@@ -139,6 +133,143 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug +
PartialEq<dyn Any> {
fn dyn_hash(&self, _state: &mut dyn Hasher);
}
+/// Attempts to refine column boundaries and compute a selectivity value.
+///
+/// The function accepts boundaries of the input columns in the `context`
parameter.
+/// It then tries to tighten these boundaries based on the provided `expr`.
+/// The resulting selectivity value is calculated by comparing the initial and
final boundaries.
+/// The computation assumes that the data within the column is uniformly
distributed and not sorted.
+///
+/// # Arguments
+///
+/// * `context` - The context holding input column boundaries.
+/// * `expr` - The expression used to shrink the column boundaries.
+///
+/// # Returns
+///
+/// * `AnalysisContext` constructed by pruned boundaries and a selectivity
value.
+pub fn analyze(
+ expr: &Arc<dyn PhysicalExpr>,
+ context: AnalysisContext,
+) -> Result<AnalysisContext> {
+ let target_boundaries = context.boundaries.ok_or_else(|| {
+ DataFusionError::Internal("No column exists at the input to
filter".to_string())
+ })?;
+
+ let mut graph = ExprIntervalGraph::try_new(expr.clone())?;
+
+ let columns: Vec<Arc<dyn PhysicalExpr>> = collect_columns(expr)
+ .into_iter()
+ .map(|c| Arc::new(c) as Arc<dyn PhysicalExpr>)
+ .collect();
+
+ let target_expr_and_indices: Vec<(Arc<dyn PhysicalExpr>, usize)> =
+ graph.gather_node_indices(columns.as_slice());
+
+ let mut target_indices_and_boundaries: Vec<(usize, Interval)> =
+ target_expr_and_indices
+ .iter()
+ .filter_map(|(expr, i)| {
+ target_boundaries.iter().find_map(|bound| {
+ expr.as_any()
+ .downcast_ref::<Column>()
+ .filter(|expr_column| bound.column.eq(*expr_column))
+ .map(|_| (*i, bound.interval.clone()))
+ })
+ })
+ .collect();
+
+ match graph.update_ranges(&mut target_indices_and_boundaries)? {
+ PropagationResult::Success => {
+ shrink_boundaries(expr, graph, target_boundaries,
target_expr_and_indices)
+ }
+ PropagationResult::Infeasible =>
Ok(AnalysisContext::new_with_selectivity(
+ target_boundaries,
+ 0.0,
+ )),
+ PropagationResult::CannotPropagate =>
Ok(AnalysisContext::new_with_selectivity(
+ target_boundaries,
+ 1.0,
+ )),
+ }
+}
+
+/// If the `PropagationResult` indicates success, this function calculates the
selectivity value by comparing the initial
+/// and final column boundaries. Following this, it constructs and returns a
new `AnalysisContext`, with
+/// the updated parameters.
+fn shrink_boundaries(
+ expr: &Arc<dyn PhysicalExpr>,
+ mut graph: ExprIntervalGraph,
+ mut target_boundaries: Vec<ExprBoundaries>,
+ target_expr_and_indices: Vec<(Arc<dyn PhysicalExpr>, usize)>,
+) -> Result<AnalysisContext> {
+ let initial_boundaries = target_boundaries.clone();
+ target_expr_and_indices.iter().for_each(|(expr, i)| {
+ if let Some(column) = expr.as_any().downcast_ref::<Column>() {
+ if let Some(bound) = target_boundaries
+ .iter_mut()
+ .find(|bound| bound.column.eq(column))
+ {
+ bound.update_interval(graph.get_interval(*i))
+ };
+ }
+ });
+ let graph_nodes = graph.gather_node_indices(&[expr.clone()]);
+ let (_, root_index) = graph_nodes.first().ok_or_else(|| {
+ DataFusionError::Internal("Error in constructing predicate
graph".to_string())
+ })?;
+ let final_result = graph.get_interval(*root_index);
+
+ let selectivity = calculate_selectivity(
+ &final_result.lower.value,
+ &final_result.upper.value,
+ &target_boundaries,
+ &initial_boundaries,
+ )?;
+
+ if !(0.0..=1.0).contains(&selectivity) {
+ return Err(DataFusionError::Internal(format!(
+ "Selectivity is out of limit: {}",
+ selectivity
+ )));
+ }
+
+ Ok(AnalysisContext::new_with_selectivity(
+ target_boundaries,
+ selectivity,
+ ))
+}
+
+/// This function calculates the filter predicate's selectivity by comparing
the initial and pruned column boundaries.
+/// Selectivity is defined as the ratio of rows in a table that satisfy the
filter's predicate. An exact propagation result
+// at the root, i.e. `[true, true]` or `[false, false]`, leads to early exit
(returning a selectivity value of either 1.0 or 0.0).
+// In such a case, `[true, true]` indicates that all data values satisfy the
predicate (hence, selectivity is 1.0), and `[false, false]`
+// suggests that no data value meets the predicate (therefore, selectivity is
0.0).
+fn calculate_selectivity(
+ lower_value: &ScalarValue,
+ upper_value: &ScalarValue,
+ target_boundaries: &[ExprBoundaries],
+ initial_boundaries: &[ExprBoundaries],
+) -> Result<f64> {
+ match (lower_value, upper_value) {
+ (ScalarValue::Boolean(Some(true)), ScalarValue::Boolean(Some(true)))
=> Ok(1.0),
+ (ScalarValue::Boolean(Some(false)), ScalarValue::Boolean(Some(false)))
=> Ok(0.0),
+ _ => {
+ // Since the intervals are assumed as uniform and we do not
Review Comment:
I think another key assumption in this calculation is that the values of the
underlying columns being filtered are not correlated with each other.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]