This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new ac20bfd89 Join cardinality computation for cost-based nested join
optimizations (#3787)
ac20bfd89 is described below
commit ac20bfd89fed9201f8a02af91f216a66c8924d73
Author: Batuhan Taskaya <[email protected]>
AuthorDate: Thu Oct 13 17:28:03 2022 +0300
Join cardinality computation for cost-based nested join optimizations
(#3787)
* Join cardinality computation for enabling cost-based nested join
optimizations
* Addressing review feedback
* Give up when we don't know the selectivity factor (no distinct count)
---
.../physical_optimizer/hash_build_probe_order.rs | 150 +++++++++-
datafusion/core/src/physical_plan/hash_join.rs | 10 +-
datafusion/core/src/physical_plan/join_utils.rs | 333 +++++++++++++++++++++
3 files changed, 490 insertions(+), 3 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs
b/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs
index b4b0deb02..66dfc6e69 100644
--- a/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs
+++ b/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs
@@ -196,7 +196,9 @@ impl PhysicalOptimizerRule for HashBuildProbeOrder {
#[cfg(test)]
mod tests {
use crate::{
- physical_plan::{hash_join::PartitionMode, Statistics},
+ physical_plan::{
+ displayable, hash_join::PartitionMode, ColumnStatistics,
Statistics,
+ },
test::exec::StatisticsExec,
};
@@ -204,6 +206,7 @@ mod tests {
use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema};
+ use datafusion_common::ScalarValue;
fn create_big_and_small() -> (Arc<dyn ExecutionPlan>, Arc<dyn
ExecutionPlan>) {
let big = Arc::new(StatisticsExec::new(
@@ -226,6 +229,75 @@ mod tests {
(big, small)
}
+ /// Create a column statistics vector for a single column
+ /// that has the given min/max/distinct_count properties.
+ ///
+ /// Given min/max will be mapped to a [`ScalarValue`] if
+ /// they are not `None`.
+ fn create_column_stats(
+ min: Option<u64>,
+ max: Option<u64>,
+ distinct_count: Option<usize>,
+ ) -> Option<Vec<ColumnStatistics>> {
+ Some(vec![ColumnStatistics {
+ distinct_count,
+ min_value: min.map(|size| ScalarValue::UInt64(Some(size))),
+ max_value: max.map(|size| ScalarValue::UInt64(Some(size))),
+ ..Default::default()
+ }])
+ }
+
+ /// Returns three plans with statistics of (min, max, distinct_count)
+ /// * big 100K rows @ (0, 50k, 50k)
+ /// * medium 10K rows @ (1k, 5k, 1k)
+ /// * small 1K rows @ (0, 100k, 1k)
+ fn create_nested_with_min_max() -> (
+ Arc<dyn ExecutionPlan>,
+ Arc<dyn ExecutionPlan>,
+ Arc<dyn ExecutionPlan>,
+ ) {
+ let big = Arc::new(StatisticsExec::new(
+ Statistics {
+ num_rows: Some(100_000),
+ column_statistics: create_column_stats(
+ Some(0),
+ Some(50_000),
+ Some(50_000),
+ ),
+ ..Default::default()
+ },
+ Schema::new(vec![Field::new("big_col", DataType::Int32, false)]),
+ ));
+
+ let medium = Arc::new(StatisticsExec::new(
+ Statistics {
+ num_rows: Some(10_000),
+ column_statistics: create_column_stats(
+ Some(1000),
+ Some(5000),
+ Some(1000),
+ ),
+ ..Default::default()
+ },
+ Schema::new(vec![Field::new("medium_col", DataType::Int32,
false)]),
+ ));
+
+ let small = Arc::new(StatisticsExec::new(
+ Statistics {
+ num_rows: Some(1000),
+ column_statistics: create_column_stats(
+ Some(0),
+ Some(100_000),
+ Some(1000),
+ ),
+ ..Default::default()
+ },
+ Schema::new(vec![Field::new("small_col", DataType::Int32, false)]),
+ ));
+
+ (big, medium, small)
+ }
+
#[tokio::test]
async fn test_join_with_swap() {
let (big, small) = create_big_and_small();
@@ -274,6 +346,82 @@ mod tests {
);
}
+ /// Compare the input plan with the plan after running the probe order
optimizer.
+ macro_rules! assert_optimized {
+ ($EXPECTED_LINES: expr, $PLAN: expr) => {
+ let expected_lines =
+ $EXPECTED_LINES.iter().map(|s| *s).collect::<Vec<&str>>();
+
+ let optimized = HashBuildProbeOrder::new()
+ .optimize(Arc::new($PLAN), &SessionConfig::new())
+ .unwrap();
+
+ let plan = displayable(optimized.as_ref()).indent().to_string();
+ let actual_lines = plan.split("\n").collect::<Vec<&str>>();
+
+ assert_eq!(
+ &expected_lines, &actual_lines,
+ "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+ expected_lines, actual_lines
+ );
+ };
+ }
+
+ #[tokio::test]
+ async fn test_nested_join_swap() {
+ let (big, medium, small) = create_nested_with_min_max();
+
+ // Form the inner join: big JOIN small
+ let child_join = HashJoinExec::try_new(
+ Arc::clone(&big),
+ Arc::clone(&small),
+ vec![(
+ Column::new_with_schema("big_col", &big.schema()).unwrap(),
+ Column::new_with_schema("small_col", &small.schema()).unwrap(),
+ )],
+ None,
+ &JoinType::Inner,
+ PartitionMode::CollectLeft,
+ &false,
+ )
+ .unwrap();
+ let child_schema = child_join.schema();
+
+ // Form join tree `medium LEFT JOIN (big JOIN small)`
+ let join = HashJoinExec::try_new(
+ Arc::clone(&medium),
+ Arc::new(child_join),
+ vec![(
+ Column::new_with_schema("medium_col",
&medium.schema()).unwrap(),
+ Column::new_with_schema("small_col", &child_schema).unwrap(),
+ )],
+ None,
+ &JoinType::Left,
+ PartitionMode::CollectLeft,
+ &false,
+ )
+ .unwrap();
+
+ // Hash join uses the left side to build the hash table, and right
side to probe it. We want
+ // to keep left as small as possible, so if we can estimate (with a
reasonable margin of error)
+ // that the left side is smaller than the right side, we should swap
the sides.
+ //
+ // The first hash join's left is 'small' table (with 1000 rows), and
the second hash join's
+ // left is the F(small IJ big) which has an estimated cardinality of
2000 rows (vs medium which
+ // has an exact cardinality of 10_000 rows).
+ let expected = [
+ "ProjectionExec: expr=[medium_col@2 as medium_col, big_col@0 as
big_col, small_col@1 as small_col]",
+ " HashJoinExec: mode=CollectLeft, join_type=Right, on=[(Column {
name: \"small_col\", index: 1 }, Column { name: \"medium_col\", index: 0 })]",
+ " ProjectionExec: expr=[big_col@1 as big_col, small_col@0 as
small_col]",
+ " HashJoinExec: mode=CollectLeft, join_type=Inner,
on=[(Column { name: \"small_col\", index: 0 }, Column { name: \"big_col\",
index: 0 })]",
+ " StatisticsExec: col_count=1, row_count=Some(1000)",
+ " StatisticsExec: col_count=1, row_count=Some(100000)",
+ " StatisticsExec: col_count=1, row_count=Some(10000)",
+ ""
+ ];
+ assert_optimized!(expected, join);
+ }
+
#[tokio::test]
async fn test_join_no_swap() {
let (big, small) = create_big_and_small();
diff --git a/datafusion/core/src/physical_plan/hash_join.rs
b/datafusion/core/src/physical_plan/hash_join.rs
index 6d7fe5828..9c7f9d763 100644
--- a/datafusion/core/src/physical_plan/hash_join.rs
+++ b/datafusion/core/src/physical_plan/hash_join.rs
@@ -59,7 +59,8 @@ use super::{
coalesce_partitions::CoalescePartitionsExec,
expressions::PhysicalSortExpr,
join_utils::{
- build_join_schema, check_join_is_valid, ColumnIndex, JoinFilter,
JoinOn, JoinSide,
+ build_join_schema, check_join_is_valid, estimate_join_statistics,
ColumnIndex,
+ JoinFilter, JoinOn, JoinSide,
},
};
use super::{
@@ -385,7 +386,12 @@ impl ExecutionPlan for HashJoinExec {
// TODO stats: it is not possible in general to know the output size
of joins
// There are some special cases though, for example:
// - `A LEFT JOIN B ON A.col=B.col` with
`COUNT_DISTINCT(B.col)=COUNT(B.col)`
- Statistics::default()
+ estimate_join_statistics(
+ self.left.clone(),
+ self.right.clone(),
+ self.on.clone(),
+ &self.join_type,
+ )
}
}
diff --git a/datafusion/core/src/physical_plan/join_utils.rs
b/datafusion/core/src/physical_plan/join_utils.rs
index dc48d2aa8..780a5e96f 100644
--- a/datafusion/core/src/physical_plan/join_utils.rs
+++ b/datafusion/core/src/physical_plan/join_utils.rs
@@ -26,11 +26,14 @@ use datafusion_physical_expr::PhysicalExpr;
use futures::future::{BoxFuture, Shared};
use futures::{ready, FutureExt};
use parking_lot::Mutex;
+use std::cmp::max;
use std::collections::HashSet;
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll};
+use super::{ColumnStatistics, ExecutionPlan, Statistics};
+
/// The on clause of the join, as vector of (left, right) columns.
pub type JoinOn = Vec<(Column, Column)>;
/// Reference for JoinOn.
@@ -296,6 +299,154 @@ impl<T> Clone for OnceFut<T> {
}
}
+/// A shared state between statistic aggregators for a join
+/// operation.
+#[derive(Clone, Debug, Default)]
+struct PartialJoinStatistics {
+ pub num_rows: usize,
+ pub column_statistics: Vec<ColumnStatistics>,
+}
+
+/// Estimate the statistics for the given join's output.
+pub(crate) fn estimate_join_statistics(
+ left: Arc<dyn ExecutionPlan>,
+ right: Arc<dyn ExecutionPlan>,
+ on: JoinOn,
+ join_type: &JoinType,
+) -> Statistics {
+ let left_stats = left.statistics();
+ let right_stats = right.statistics();
+
+ let join_stats = estimate_join_cardinality(join_type, left_stats,
right_stats, &on);
+ let (num_rows, column_statistics) = match join_stats {
+ Some(stats) => (Some(stats.num_rows), Some(stats.column_statistics)),
+ None => (None, None),
+ };
+ Statistics {
+ num_rows,
+ total_byte_size: None,
+ column_statistics,
+ is_exact: false,
+ }
+}
+
+// Estimate the cardinality for the given join with input statistics.
+fn estimate_join_cardinality(
+ join_type: &JoinType,
+ left_stats: Statistics,
+ right_stats: Statistics,
+ on: &JoinOn,
+) -> Option<PartialJoinStatistics> {
+ match join_type {
+ JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full =>
{
+ let left_num_rows = left_stats.num_rows?;
+ let right_num_rows = right_stats.num_rows?;
+
+ // Take the left_col_stats and right_col_stats using the index
+            // obtained from the index() method of each element of 'on'.
+ let all_left_col_stats = left_stats.column_statistics?;
+ let all_right_col_stats = right_stats.column_statistics?;
+ let (left_col_stats, right_col_stats) = on
+ .iter()
+ .map(|(left, right)| {
+ (
+ all_left_col_stats[left.index()].clone(),
+ all_right_col_stats[right.index()].clone(),
+ )
+ })
+ .unzip::<_, _, Vec<_>, Vec<_>>();
+
+ let ij_cardinality = estimate_inner_join_cardinality(
+ left_num_rows,
+ right_num_rows,
+ left_col_stats,
+ right_col_stats,
+ )?;
+
+ // The cardinality for inner join can also be used to estimate
+            // the cardinality of left/right/full outer joins as long as
+ // it is greater than the minimum cardinality constraints of these
+ // joins (so that we don't underestimate the cardinality).
+ let cardinality = match join_type {
+ JoinType::Inner => ij_cardinality,
+ JoinType::Left => max(ij_cardinality, left_num_rows),
+ JoinType::Right => max(ij_cardinality, right_num_rows),
+ JoinType::Full => {
+ max(ij_cardinality, left_num_rows)
+ + max(ij_cardinality, right_num_rows)
+ - ij_cardinality
+ }
+ _ => unreachable!(),
+ };
+
+ Some(PartialJoinStatistics {
+ num_rows: cardinality,
+ // We don't do anything specific here, just combine the
existing
+ // statistics which might yield subpar results (although it is
+ // true, esp regarding min/max). For a better estimation, we
need
+ // filter selectivity analysis first.
+ column_statistics: all_left_col_stats
+ .into_iter()
+ .chain(all_right_col_stats.into_iter())
+ .collect(),
+ })
+ }
+
+ JoinType::Semi => None,
+ JoinType::Anti => None,
+ }
+}
+
+/// Estimate the inner join cardinality by using the basic building blocks of
+/// column-level statistics and the total row count. This is a very naive and
+/// a very conservative implementation that can quickly give up if there are not
+/// enough input statistics.
+fn estimate_inner_join_cardinality(
+ left_num_rows: usize,
+ right_num_rows: usize,
+ left_col_stats: Vec<ColumnStatistics>,
+ right_col_stats: Vec<ColumnStatistics>,
+) -> Option<usize> {
+ // The algorithm here is partly based on the non-histogram selectivity
estimation
+ // from Spark's Catalyst optimizer.
+
+ let mut join_selectivity = None;
+ for (left_stat, right_stat) in
left_col_stats.iter().zip(right_col_stats.iter()) {
+ if (left_stat.min_value.clone()? > right_stat.max_value.clone()?)
+ || (left_stat.max_value.clone()? < right_stat.min_value.clone()?)
+ {
+ // If there is no overlap, then we can not accurately estimate
+ // the join cardinality. We could in theory use this information
+ // to point out the join will not produce any rows, but that would
+ // require some extra information (namely whether the statistics
are
+ // exact). For now, we just give up.
+ return None;
+ }
+
+ let max_distinct = max(left_stat.distinct_count,
right_stat.distinct_count);
+ if max_distinct > join_selectivity {
+ // Seems like there are a few implementations of this algorithm
that implement
+ // exponential decay for the selectivity (like Hive's Optiq
Optimizer). Needs
+ // further exploration.
+ join_selectivity = max_distinct;
+ }
+ }
+
+ // With the assumption that the smaller input's domain is generally
represented in the bigger
+ // input's domain, we can estimate the inner join's cardinality by taking
the cartesian product
+ // of the two inputs and normalizing it by the selectivity factor.
+ match join_selectivity {
+ Some(selectivity) if selectivity > 0 => {
+ Some((left_num_rows * right_num_rows) / selectivity)
+ }
+ // Since we don't have any information about the selectivity (which is
derived
+ // from the number of distinct rows information) we can give up here
for now.
+ // And let other passes handle this (otherwise we would need to
produce an
+ // overestimation using just the cartesian product).
+ _ => None,
+ }
+}
+
enum OnceFutState<T> {
Pending(OnceFutPending<T>),
Ready(Arc<Result<T>>),
@@ -347,6 +498,7 @@ impl<T: 'static> OnceFut<T> {
mod tests {
use super::*;
use arrow::datatypes::DataType;
+ use datafusion_common::ScalarValue;
fn check(left: &[Column], right: &[Column], on: &[(Column, Column)]) ->
Result<()> {
let left = left
@@ -461,4 +613,185 @@ mod tests {
Ok(())
}
+
+ fn create_stats(
+ num_rows: Option<usize>,
+ column_stats: Option<Vec<ColumnStatistics>>,
+ ) -> Statistics {
+ Statistics {
+ num_rows,
+ column_statistics: column_stats,
+ ..Default::default()
+ }
+ }
+
+ fn create_column_stats(
+ min: Option<u64>,
+ max: Option<u64>,
+ distinct_count: Option<usize>,
+ ) -> ColumnStatistics {
+ ColumnStatistics {
+ distinct_count,
+ min_value: min.map(|size| ScalarValue::UInt64(Some(size))),
+ max_value: max.map(|size| ScalarValue::UInt64(Some(size))),
+ ..Default::default()
+ }
+ }
+
+ type PartialStats = (usize, u64, u64, Option<usize>);
+
+ // This is mainly for validating the all edge cases of the estimation, but
+ // more advanced (and real world test cases) are below where we need some
control
+ // over the expected output (since it depends on join type to join type).
+ #[test]
+ fn test_inner_join_cardinality_single_column() -> Result<()> {
+ let cases: Vec<(PartialStats, PartialStats, Option<usize>)> = vec![
+ //
-----------------------------------------------------------------------------
+ // | left(rows, min, max, distinct), right(rows, min, max,
distinct), expected |
+ //
-----------------------------------------------------------------------------
+
+ // distinct(left) is None OR distinct(right) is None
+ //
+ // len(left) = len(right), len(left) * len(right)
+ ((10, 0, 10, None), (10, 0, 10, None), None),
+ // len(left) > len(right) OR len(left) < len(right), len(left) *
len(right)
+ ((10, 0, 10, None), (5, 0, 10, None), None),
+ ((5, 0, 10, None), (10, 0, 10, None), None),
+ ((10, 0, 10, None), (5, 0, 10, None), None),
+ ((5, 0, 10, None), (10, 0, 10, None), None),
+ // min(left) > max(right) OR min(right) > max(left), None
+ ((10, 0, 10, None), (10, 11, 20, None), None),
+ ((10, 11, 20, None), (10, 0, 10, None), None),
+ ((10, 5, 10, None), (10, 11, 3, None), None),
+ ((10, 10, 5, None), (10, 3, 7, None), None),
+ // distinct(left) is not None AND distinct(right) is not None
+ //
+ // len(left) = len(right), len(left) * len(right) /
max(distinct(left), distinct(right))
+ ((10, 0, 10, Some(5)), (10, 0, 10, Some(5)), Some(20)),
+ ((10, 0, 10, Some(10)), (10, 0, 10, Some(5)), Some(10)),
+ ((10, 0, 10, Some(5)), (10, 0, 10, Some(10)), Some(10)),
+ ];
+
+ for (left_info, right_info, expected_cardinality) in cases {
+ let left_num_rows = left_info.0;
+ let left_col_stats = vec![create_column_stats(
+ Some(left_info.1),
+ Some(left_info.2),
+ left_info.3,
+ )];
+
+ let right_num_rows = right_info.0;
+ let right_col_stats = vec![create_column_stats(
+ Some(right_info.1),
+ Some(right_info.2),
+ right_info.3,
+ )];
+
+ assert_eq!(
+ estimate_inner_join_cardinality(
+ left_num_rows,
+ right_num_rows,
+ left_col_stats.clone(),
+ right_col_stats.clone(),
+ ),
+ expected_cardinality
+ );
+
+ // We should also be able to use join_cardinality to get the same
results
+ let join_type = JoinType::Inner;
+ let join_on = vec![(Column::new("a", 0), Column::new("b", 0))];
+ let partial_join_stats = estimate_join_cardinality(
+ &join_type,
+ create_stats(Some(left_num_rows),
Some(left_col_stats.clone())),
+ create_stats(Some(right_num_rows),
Some(right_col_stats.clone())),
+ &join_on,
+ );
+
+ assert_eq!(
+ partial_join_stats.clone().map(|s| s.num_rows),
+ expected_cardinality
+ );
+ assert_eq!(
+ partial_join_stats.map(|s| s.column_statistics),
+ expected_cardinality.map(|_| [left_col_stats,
right_col_stats].concat())
+ );
+ }
+ Ok(())
+ }
+
+ #[test]
+ fn test_inner_join_cardinality_multiple_column() -> Result<()> {
+ let left_col_stats = vec![
+ create_column_stats(Some(0), Some(100), Some(100)),
+ create_column_stats(Some(100), Some(500), Some(150)),
+ ];
+
+ let right_col_stats = vec![
+ create_column_stats(Some(0), Some(100), Some(50)),
+ create_column_stats(Some(100), Some(500), Some(200)),
+ ];
+
+ // We have statistics about 4 columns, where the highest distinct
+ // count is 200, so we are going to pick it.
+ assert_eq!(
+ estimate_inner_join_cardinality(400, 400, left_col_stats,
right_col_stats),
+ Some((400 * 400) / 200)
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn test_join_cardinality() -> Result<()> {
+ // Left table (rows=1000)
+ // a: min=0, max=100, distinct=100
+ // b: min=0, max=500, distinct=500
+ // x: min=1000, max=10000, distinct=None
+ //
+ // Right table (rows=2000)
+ // c: min=0, max=100, distinct=50
+ // d: min=0, max=2000, distinct=2500 (how? some inexact statistics)
+ // y: min=0, max=100, distinct=None
+ //
+ // Join on a=c, b=d (ignore x/y)
+ let cases = vec![
+ (JoinType::Inner, 800),
+ (JoinType::Left, 1000),
+ (JoinType::Right, 2000),
+ (JoinType::Full, 2200),
+ ];
+
+ let left_col_stats = vec![
+ create_column_stats(Some(0), Some(100), Some(100)),
+ create_column_stats(Some(0), Some(500), Some(500)),
+ create_column_stats(Some(1000), Some(10000), None),
+ ];
+
+ let right_col_stats = vec![
+ create_column_stats(Some(0), Some(100), Some(50)),
+ create_column_stats(Some(0), Some(2000), Some(2500)),
+ create_column_stats(Some(0), Some(100), None),
+ ];
+
+ for (join_type, expected_num_rows) in cases {
+ let join_on = vec![
+ (Column::new("a", 0), Column::new("c", 0)),
+ (Column::new("b", 1), Column::new("d", 1)),
+ ];
+
+ let partial_join_stats = estimate_join_cardinality(
+ &join_type,
+ create_stats(Some(1000), Some(left_col_stats.clone())),
+ create_stats(Some(2000), Some(right_col_stats.clone())),
+ &join_on,
+ )
+ .unwrap();
+ assert_eq!(partial_join_stats.num_rows, expected_num_rows);
+ assert_eq!(
+ partial_join_stats.column_statistics,
+ [left_col_stats.clone(), right_col_stats.clone()].concat()
+ );
+ }
+
+ Ok(())
+ }
}