This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new ac20bfd89 Join cardinality computation for cost-based nested join 
optimizations (#3787)
ac20bfd89 is described below

commit ac20bfd89fed9201f8a02af91f216a66c8924d73
Author: Batuhan Taskaya <[email protected]>
AuthorDate: Thu Oct 13 17:28:03 2022 +0300

    Join cardinality computation for cost-based nested join optimizations 
(#3787)
    
    * Join cardinality computation for enabling cost-based nested join 
optimizations
    
    * Addressing review feedback
    
    * Give up when we don't know the selectivity factor (no distinct count)
---
 .../physical_optimizer/hash_build_probe_order.rs   | 150 +++++++++-
 datafusion/core/src/physical_plan/hash_join.rs     |  10 +-
 datafusion/core/src/physical_plan/join_utils.rs    | 333 +++++++++++++++++++++
 3 files changed, 490 insertions(+), 3 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs 
b/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs
index b4b0deb02..66dfc6e69 100644
--- a/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs
+++ b/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs
@@ -196,7 +196,9 @@ impl PhysicalOptimizerRule for HashBuildProbeOrder {
 #[cfg(test)]
 mod tests {
     use crate::{
-        physical_plan::{hash_join::PartitionMode, Statistics},
+        physical_plan::{
+            displayable, hash_join::PartitionMode, ColumnStatistics, 
Statistics,
+        },
         test::exec::StatisticsExec,
     };
 
@@ -204,6 +206,7 @@ mod tests {
     use std::sync::Arc;
 
     use arrow::datatypes::{DataType, Field, Schema};
+    use datafusion_common::ScalarValue;
 
     fn create_big_and_small() -> (Arc<dyn ExecutionPlan>, Arc<dyn 
ExecutionPlan>) {
         let big = Arc::new(StatisticsExec::new(
@@ -226,6 +229,75 @@ mod tests {
         (big, small)
     }
 
+    /// Create a column statistics vector for a single column
+    /// that has the given min/max/distinct_count properties.
+    ///
+    /// Given min/max will be mapped to a [`ScalarValue`] if
+    /// they are not `None`.
+    fn create_column_stats(
+        min: Option<u64>,
+        max: Option<u64>,
+        distinct_count: Option<usize>,
+    ) -> Option<Vec<ColumnStatistics>> {
+        Some(vec![ColumnStatistics {
+            distinct_count,
+            min_value: min.map(|size| ScalarValue::UInt64(Some(size))),
+            max_value: max.map(|size| ScalarValue::UInt64(Some(size))),
+            ..Default::default()
+        }])
+    }
+
+    /// Returns three plans with statistics of (min, max, distinct_count)
+    /// * big 100K rows @ (0, 50k, 50k)
+    /// * medium 10K rows @ (1k, 5k, 1k)
+    /// * small 1K rows @ (0, 100k, 1k)
+    fn create_nested_with_min_max() -> (
+        Arc<dyn ExecutionPlan>,
+        Arc<dyn ExecutionPlan>,
+        Arc<dyn ExecutionPlan>,
+    ) {
+        let big = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Some(100_000),
+                column_statistics: create_column_stats(
+                    Some(0),
+                    Some(50_000),
+                    Some(50_000),
+                ),
+                ..Default::default()
+            },
+            Schema::new(vec![Field::new("big_col", DataType::Int32, false)]),
+        ));
+
+        let medium = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Some(10_000),
+                column_statistics: create_column_stats(
+                    Some(1000),
+                    Some(5000),
+                    Some(1000),
+                ),
+                ..Default::default()
+            },
+            Schema::new(vec![Field::new("medium_col", DataType::Int32, 
false)]),
+        ));
+
+        let small = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Some(1000),
+                column_statistics: create_column_stats(
+                    Some(0),
+                    Some(100_000),
+                    Some(1000),
+                ),
+                ..Default::default()
+            },
+            Schema::new(vec![Field::new("small_col", DataType::Int32, false)]),
+        ));
+
+        (big, medium, small)
+    }
+
     #[tokio::test]
     async fn test_join_with_swap() {
         let (big, small) = create_big_and_small();
@@ -274,6 +346,82 @@ mod tests {
         );
     }
 
+    /// Compare the input plan with the plan after running the probe order 
optimizer.
+    macro_rules! assert_optimized {
+        ($EXPECTED_LINES: expr, $PLAN: expr) => {
+            let expected_lines =
+                $EXPECTED_LINES.iter().map(|s| *s).collect::<Vec<&str>>();
+
+            let optimized = HashBuildProbeOrder::new()
+                .optimize(Arc::new($PLAN), &SessionConfig::new())
+                .unwrap();
+
+            let plan = displayable(optimized.as_ref()).indent().to_string();
+            let actual_lines = plan.split("\n").collect::<Vec<&str>>();
+
+            assert_eq!(
+                &expected_lines, &actual_lines,
+                "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+                expected_lines, actual_lines
+            );
+        };
+    }
+
+    #[tokio::test]
+    async fn test_nested_join_swap() {
+        let (big, medium, small) = create_nested_with_min_max();
+
+        // Form the inner join: big JOIN small
+        let child_join = HashJoinExec::try_new(
+            Arc::clone(&big),
+            Arc::clone(&small),
+            vec![(
+                Column::new_with_schema("big_col", &big.schema()).unwrap(),
+                Column::new_with_schema("small_col", &small.schema()).unwrap(),
+            )],
+            None,
+            &JoinType::Inner,
+            PartitionMode::CollectLeft,
+            &false,
+        )
+        .unwrap();
+        let child_schema = child_join.schema();
+
+        // Form join tree `medium LEFT JOIN (big JOIN small)`
+        let join = HashJoinExec::try_new(
+            Arc::clone(&medium),
+            Arc::new(child_join),
+            vec![(
+                Column::new_with_schema("medium_col", 
&medium.schema()).unwrap(),
+                Column::new_with_schema("small_col", &child_schema).unwrap(),
+            )],
+            None,
+            &JoinType::Left,
+            PartitionMode::CollectLeft,
+            &false,
+        )
+        .unwrap();
+
+        // Hash join uses the left side to build the hash table, and right 
side to probe it. We want
+        // to keep left as small as possible, so if we can estimate (with a 
reasonable margin of error)
+        // that the left side is smaller than the right side, we should swap 
the sides.
+        //
+        // The first hash join's left is 'small' table (with 1000 rows), and 
the second hash join's
+        // left is the F(small IJ big) which has an estimated cardinality of 
2000 rows (vs medium which
+        // has an exact cardinality of 10_000 rows).
+        let expected = [
+            "ProjectionExec: expr=[medium_col@2 as medium_col, big_col@0 as 
big_col, small_col@1 as small_col]",
+            "  HashJoinExec: mode=CollectLeft, join_type=Right, on=[(Column { 
name: \"small_col\", index: 1 }, Column { name: \"medium_col\", index: 0 })]",
+            "    ProjectionExec: expr=[big_col@1 as big_col, small_col@0 as 
small_col]",
+            "      HashJoinExec: mode=CollectLeft, join_type=Inner, 
on=[(Column { name: \"small_col\", index: 0 }, Column { name: \"big_col\", 
index: 0 })]",
+            "        StatisticsExec: col_count=1, row_count=Some(1000)",
+            "        StatisticsExec: col_count=1, row_count=Some(100000)",
+            "    StatisticsExec: col_count=1, row_count=Some(10000)",
+            ""
+        ];
+        assert_optimized!(expected, join);
+    }
+
     #[tokio::test]
     async fn test_join_no_swap() {
         let (big, small) = create_big_and_small();
diff --git a/datafusion/core/src/physical_plan/hash_join.rs 
b/datafusion/core/src/physical_plan/hash_join.rs
index 6d7fe5828..9c7f9d763 100644
--- a/datafusion/core/src/physical_plan/hash_join.rs
+++ b/datafusion/core/src/physical_plan/hash_join.rs
@@ -59,7 +59,8 @@ use super::{
     coalesce_partitions::CoalescePartitionsExec,
     expressions::PhysicalSortExpr,
     join_utils::{
-        build_join_schema, check_join_is_valid, ColumnIndex, JoinFilter, 
JoinOn, JoinSide,
+        build_join_schema, check_join_is_valid, estimate_join_statistics, 
ColumnIndex,
+        JoinFilter, JoinOn, JoinSide,
     },
 };
 use super::{
@@ -385,7 +386,12 @@ impl ExecutionPlan for HashJoinExec {
         // TODO stats: it is not possible in general to know the output size 
of joins
         // There are some special cases though, for example:
         // - `A LEFT JOIN B ON A.col=B.col` with 
`COUNT_DISTINCT(B.col)=COUNT(B.col)`
-        Statistics::default()
+        estimate_join_statistics(
+            self.left.clone(),
+            self.right.clone(),
+            self.on.clone(),
+            &self.join_type,
+        )
     }
 }
 
diff --git a/datafusion/core/src/physical_plan/join_utils.rs 
b/datafusion/core/src/physical_plan/join_utils.rs
index dc48d2aa8..780a5e96f 100644
--- a/datafusion/core/src/physical_plan/join_utils.rs
+++ b/datafusion/core/src/physical_plan/join_utils.rs
@@ -26,11 +26,14 @@ use datafusion_physical_expr::PhysicalExpr;
 use futures::future::{BoxFuture, Shared};
 use futures::{ready, FutureExt};
 use parking_lot::Mutex;
+use std::cmp::max;
 use std::collections::HashSet;
 use std::future::Future;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
+use super::{ColumnStatistics, ExecutionPlan, Statistics};
+
 /// The on clause of the join, as vector of (left, right) columns.
 pub type JoinOn = Vec<(Column, Column)>;
 /// Reference for JoinOn.
@@ -296,6 +299,154 @@ impl<T> Clone for OnceFut<T> {
     }
 }
 
+/// A shared state between statistic aggregators for a join
+/// operation.
+#[derive(Clone, Debug, Default)]
+struct PartialJoinStatistics {
+    pub num_rows: usize,
+    pub column_statistics: Vec<ColumnStatistics>,
+}
+
+/// Estimate the statistics for the given join's output.
+pub(crate) fn estimate_join_statistics(
+    left: Arc<dyn ExecutionPlan>,
+    right: Arc<dyn ExecutionPlan>,
+    on: JoinOn,
+    join_type: &JoinType,
+) -> Statistics {
+    let left_stats = left.statistics();
+    let right_stats = right.statistics();
+
+    let join_stats = estimate_join_cardinality(join_type, left_stats, 
right_stats, &on);
+    let (num_rows, column_statistics) = match join_stats {
+        Some(stats) => (Some(stats.num_rows), Some(stats.column_statistics)),
+        None => (None, None),
+    };
+    Statistics {
+        num_rows,
+        total_byte_size: None,
+        column_statistics,
+        is_exact: false,
+    }
+}
+
+// Estimate the cardinality for the given join with input statistics.
+fn estimate_join_cardinality(
+    join_type: &JoinType,
+    left_stats: Statistics,
+    right_stats: Statistics,
+    on: &JoinOn,
+) -> Option<PartialJoinStatistics> {
+    match join_type {
+        JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => 
{
+            let left_num_rows = left_stats.num_rows?;
+            let right_num_rows = right_stats.num_rows?;
+
+            // Take the left_col_stats and right_col_stats using the index
+            // obtained from the index() method of each element of 'on'.
+            let all_left_col_stats = left_stats.column_statistics?;
+            let all_right_col_stats = right_stats.column_statistics?;
+            let (left_col_stats, right_col_stats) = on
+                .iter()
+                .map(|(left, right)| {
+                    (
+                        all_left_col_stats[left.index()].clone(),
+                        all_right_col_stats[right.index()].clone(),
+                    )
+                })
+                .unzip::<_, _, Vec<_>, Vec<_>>();
+
+            let ij_cardinality = estimate_inner_join_cardinality(
+                left_num_rows,
+                right_num_rows,
+                left_col_stats,
+                right_col_stats,
+            )?;
+
+            // The cardinality for inner join can also be used to estimate
+            // the cardinality of left/right/full outer joins as long as it
+            // is greater than the minimum cardinality constraints of these
+            // joins (so that we don't underestimate the cardinality).
+            let cardinality = match join_type {
+                JoinType::Inner => ij_cardinality,
+                JoinType::Left => max(ij_cardinality, left_num_rows),
+                JoinType::Right => max(ij_cardinality, right_num_rows),
+                JoinType::Full => {
+                    max(ij_cardinality, left_num_rows)
+                        + max(ij_cardinality, right_num_rows)
+                        - ij_cardinality
+                }
+                _ => unreachable!(),
+            };
+
+            Some(PartialJoinStatistics {
+                num_rows: cardinality,
+                // We don't do anything specific here, just combine the 
existing
+                // statistics which might yield subpar results (although it is
+                // true, esp regarding min/max). For a better estimation, we 
need
+                // filter selectivity analysis first.
+                column_statistics: all_left_col_stats
+                    .into_iter()
+                    .chain(all_right_col_stats.into_iter())
+                    .collect(),
+            })
+        }
+
+        JoinType::Semi => None,
+        JoinType::Anti => None,
+    }
+}
+
+/// Estimate the inner join cardinality by using the basic building blocks of
+/// column-level statistics and the total row count. This is a very naive and
+/// a very conservative implementation that can quickly give up if there is not
+/// enough input statistics.
+fn estimate_inner_join_cardinality(
+    left_num_rows: usize,
+    right_num_rows: usize,
+    left_col_stats: Vec<ColumnStatistics>,
+    right_col_stats: Vec<ColumnStatistics>,
+) -> Option<usize> {
+    // The algorithm here is partly based on the non-histogram selectivity 
estimation
+    // from Spark's Catalyst optimizer.
+
+    let mut join_selectivity = None;
+    for (left_stat, right_stat) in 
left_col_stats.iter().zip(right_col_stats.iter()) {
+        if (left_stat.min_value.clone()? > right_stat.max_value.clone()?)
+            || (left_stat.max_value.clone()? < right_stat.min_value.clone()?)
+        {
+            // If there is no overlap, then we can not accurately estimate
+            // the join cardinality. We could in theory use this information
+            // to point out the join will not produce any rows, but that would
+            // require some extra information (namely whether the statistics 
are
+            // exact). For now, we just give up.
+            return None;
+        }
+
+        let max_distinct = max(left_stat.distinct_count, 
right_stat.distinct_count);
+        if max_distinct > join_selectivity {
+            // Seems like there are a few implementations of this algorithm 
that implement
+            // exponential decay for the selectivity (like Hive's Optiq 
Optimizer). Needs
+            // further exploration.
+            join_selectivity = max_distinct;
+        }
+    }
+
+    // With the assumption that the smaller input's domain is generally 
represented in the bigger
+    // input's domain, we can estimate the inner join's cardinality by taking 
the cartesian product
+    // of the two inputs and normalizing it by the selectivity factor.
+    match join_selectivity {
+        Some(selectivity) if selectivity > 0 => {
+            Some((left_num_rows * right_num_rows) / selectivity)
+        }
+        // Since we don't have any information about the selectivity (which is 
derived
+        // from the number of distinct rows information) we can give up here 
for now.
+        // And let other passes handle this (otherwise we would need to 
produce an
+        // overestimation using just the cartesian product).
+        _ => None,
+    }
+}
+
 enum OnceFutState<T> {
     Pending(OnceFutPending<T>),
     Ready(Arc<Result<T>>),
@@ -347,6 +498,7 @@ impl<T: 'static> OnceFut<T> {
 mod tests {
     use super::*;
     use arrow::datatypes::DataType;
+    use datafusion_common::ScalarValue;
 
     fn check(left: &[Column], right: &[Column], on: &[(Column, Column)]) -> 
Result<()> {
         let left = left
@@ -461,4 +613,185 @@ mod tests {
 
         Ok(())
     }
+
+    fn create_stats(
+        num_rows: Option<usize>,
+        column_stats: Option<Vec<ColumnStatistics>>,
+    ) -> Statistics {
+        Statistics {
+            num_rows,
+            column_statistics: column_stats,
+            ..Default::default()
+        }
+    }
+
+    fn create_column_stats(
+        min: Option<u64>,
+        max: Option<u64>,
+        distinct_count: Option<usize>,
+    ) -> ColumnStatistics {
+        ColumnStatistics {
+            distinct_count,
+            min_value: min.map(|size| ScalarValue::UInt64(Some(size))),
+            max_value: max.map(|size| ScalarValue::UInt64(Some(size))),
+            ..Default::default()
+        }
+    }
+
+    type PartialStats = (usize, u64, u64, Option<usize>);
+
+    // This is mainly for validating all the edge cases of the estimation, but
+    // more advanced (and real-world) test cases are below, where we need some control
+    // over the expected output (since it varies from join type to join type).
+    #[test]
+    fn test_inner_join_cardinality_single_column() -> Result<()> {
+        let cases: Vec<(PartialStats, PartialStats, Option<usize>)> = vec![
+            // 
-----------------------------------------------------------------------------
+            // | left(rows, min, max, distinct), right(rows, min, max, 
distinct), expected |
+            // 
-----------------------------------------------------------------------------
+
+            // distinct(left) is None OR distinct(right) is None
+            //
+            // len(left) = len(right), None
+            ((10, 0, 10, None), (10, 0, 10, None), None),
+            // len(left) > len(right) OR len(left) < len(right), None
+            ((10, 0, 10, None), (5, 0, 10, None), None),
+            ((5, 0, 10, None), (10, 0, 10, None), None),
+            ((10, 0, 10, None), (5, 0, 10, None), None),
+            ((5, 0, 10, None), (10, 0, 10, None), None),
+            // min(left) > max(right) OR min(right) > max(left), None
+            ((10, 0, 10, None), (10, 11, 20, None), None),
+            ((10, 11, 20, None), (10, 0, 10, None), None),
+            ((10, 5, 10, None), (10, 11, 3, None), None),
+            ((10, 10, 5, None), (10, 3, 7, None), None),
+            // distinct(left) is not None AND distinct(right) is not None
+            //
+            // len(left) = len(right), len(left) * len(right) / 
max(distinct(left), distinct(right))
+            ((10, 0, 10, Some(5)), (10, 0, 10, Some(5)), Some(20)),
+            ((10, 0, 10, Some(10)), (10, 0, 10, Some(5)), Some(10)),
+            ((10, 0, 10, Some(5)), (10, 0, 10, Some(10)), Some(10)),
+        ];
+
+        for (left_info, right_info, expected_cardinality) in cases {
+            let left_num_rows = left_info.0;
+            let left_col_stats = vec![create_column_stats(
+                Some(left_info.1),
+                Some(left_info.2),
+                left_info.3,
+            )];
+
+            let right_num_rows = right_info.0;
+            let right_col_stats = vec![create_column_stats(
+                Some(right_info.1),
+                Some(right_info.2),
+                right_info.3,
+            )];
+
+            assert_eq!(
+                estimate_inner_join_cardinality(
+                    left_num_rows,
+                    right_num_rows,
+                    left_col_stats.clone(),
+                    right_col_stats.clone(),
+                ),
+                expected_cardinality
+            );
+
+            // We should also be able to use join_cardinality to get the same 
results
+            let join_type = JoinType::Inner;
+            let join_on = vec![(Column::new("a", 0), Column::new("b", 0))];
+            let partial_join_stats = estimate_join_cardinality(
+                &join_type,
+                create_stats(Some(left_num_rows), 
Some(left_col_stats.clone())),
+                create_stats(Some(right_num_rows), 
Some(right_col_stats.clone())),
+                &join_on,
+            );
+
+            assert_eq!(
+                partial_join_stats.clone().map(|s| s.num_rows),
+                expected_cardinality
+            );
+            assert_eq!(
+                partial_join_stats.map(|s| s.column_statistics),
+                expected_cardinality.map(|_| [left_col_stats, 
right_col_stats].concat())
+            );
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn test_inner_join_cardinality_multiple_column() -> Result<()> {
+        let left_col_stats = vec![
+            create_column_stats(Some(0), Some(100), Some(100)),
+            create_column_stats(Some(100), Some(500), Some(150)),
+        ];
+
+        let right_col_stats = vec![
+            create_column_stats(Some(0), Some(100), Some(50)),
+            create_column_stats(Some(100), Some(500), Some(200)),
+        ];
+
+        // We have statistics about 4 columns, where the highest distinct
+        // count is 200, so we are going to pick it.
+        assert_eq!(
+            estimate_inner_join_cardinality(400, 400, left_col_stats, 
right_col_stats),
+            Some((400 * 400) / 200)
+        );
+        Ok(())
+    }
+
+    #[test]
+    fn test_join_cardinality() -> Result<()> {
+        // Left table (rows=1000)
+        //   a: min=0, max=100, distinct=100
+        //   b: min=0, max=500, distinct=500
+        //   x: min=1000, max=10000, distinct=None
+        //
+        // Right table (rows=2000)
+        //   c: min=0, max=100, distinct=50
+        //   d: min=0, max=2000, distinct=2500 (how? some inexact statistics)
+        //   y: min=0, max=100, distinct=None
+        //
+        // Join on a=c, b=d (ignore x/y)
+        let cases = vec![
+            (JoinType::Inner, 800),
+            (JoinType::Left, 1000),
+            (JoinType::Right, 2000),
+            (JoinType::Full, 2200),
+        ];
+
+        let left_col_stats = vec![
+            create_column_stats(Some(0), Some(100), Some(100)),
+            create_column_stats(Some(0), Some(500), Some(500)),
+            create_column_stats(Some(1000), Some(10000), None),
+        ];
+
+        let right_col_stats = vec![
+            create_column_stats(Some(0), Some(100), Some(50)),
+            create_column_stats(Some(0), Some(2000), Some(2500)),
+            create_column_stats(Some(0), Some(100), None),
+        ];
+
+        for (join_type, expected_num_rows) in cases {
+            let join_on = vec![
+                (Column::new("a", 0), Column::new("c", 0)),
+                (Column::new("b", 1), Column::new("d", 1)),
+            ];
+
+            let partial_join_stats = estimate_join_cardinality(
+                &join_type,
+                create_stats(Some(1000), Some(left_col_stats.clone())),
+                create_stats(Some(2000), Some(right_col_stats.clone())),
+                &join_on,
+            )
+            .unwrap();
+            assert_eq!(partial_join_stats.num_rows, expected_num_rows);
+            assert_eq!(
+                partial_join_stats.column_statistics,
+                [left_col_stats.clone(), right_col_stats.clone()].concat()
+            );
+        }
+
+        Ok(())
+    }
 }

Reply via email to