alamb commented on code in PR #3787:
URL: https://github.com/apache/arrow-datafusion/pull/3787#discussion_r993710273


##########
datafusion/core/src/physical_optimizer/hash_build_probe_order.rs:
##########
@@ -226,6 +229,66 @@ mod tests {
         (big, small)
     }
 
+    fn create_column_stats(
+        min: Option<u64>,
+        max: Option<u64>,
+        distinct_count: Option<usize>,
+    ) -> Option<Vec<ColumnStatistics>> {
+        Some(vec![ColumnStatistics {
+            distinct_count,
+            min_value: min.map(|size| ScalarValue::UInt64(Some(size))),
+            max_value: max.map(|size| ScalarValue::UInt64(Some(size))),
+            ..Default::default()
+        }])
+    }
+
+    fn create_nested_with_min_max() -> (

Review Comment:
   ```suggestion
       /// Returns three plans with statistics of (min, max, distinct_count)
       /// * big 100K rows @ (0, 50k, 50k)
       /// * medium 10K rows @ (1k, 5k, 1k)
       /// * small 1K rows @ (0, 10k, 1k)
       fn create_nested_with_min_max() -> (
   ```



##########
datafusion/core/src/physical_optimizer/hash_build_probe_order.rs:
##########
@@ -274,6 +337,76 @@ mod tests {
         );
     }
 
+    /// Compare the input plan with the plan after running the probe order 
optimizer.
+    macro_rules! assert_optimized {
+        ($EXPECTED_LINES: expr, $PLAN: expr) => {
+            let expected_lines =
+                $EXPECTED_LINES.iter().map(|s| *s).collect::<Vec<&str>>();
+
+            let optimized = HashBuildProbeOrder::new()
+                .optimize(Arc::new($PLAN), &SessionConfig::new())
+                .unwrap();
+
+            let plan = displayable(optimized.as_ref()).indent().to_string();
+            let actual_lines = plan.split("\n").collect::<Vec<&str>>();
+
+            assert_eq!(
+                &expected_lines, &actual_lines,
+                "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+                expected_lines, actual_lines
+            );
+        };
+    }
+
+    #[tokio::test]
+    async fn test_nested_join_swap() {
+        let (big, medium, small) = create_nested_with_min_max();
+
+        let child_join = HashJoinExec::try_new(
+            Arc::clone(&big),
+            Arc::clone(&small),
+            vec![(
+                Column::new_with_schema("big_col", &big.schema()).unwrap(),
+                Column::new_with_schema("small_col", &small.schema()).unwrap(),
+            )],
+            None,
+            &JoinType::Inner,
+            PartitionMode::CollectLeft,
+            &false,
+        )
+        .unwrap();
+        let child_schema = child_join.schema();
+
+        let join = HashJoinExec::try_new(
+            Arc::clone(&medium),
+            Arc::new(child_join),
+            vec![(
+                Column::new_with_schema("medium_col", 
&medium.schema()).unwrap(),
+                Column::new_with_schema("small_col", &child_schema).unwrap(),
+            )],
+            None,
+            &JoinType::Left,
+            PartitionMode::CollectLeft,
+            &false,
+        )
+        .unwrap();
+
+        // The first hash join's left is 'small' table (with 1000 rows), and 
the second hash join's
+        // left is the F(small IJ big) which has an estimated cardinality of 
2000 rows (vs medium which
+        // has an exact cardinality of 10_000 rows).
+        let expected = [
+            "ProjectionExec: expr=[medium_col@2 as medium_col, big_col@0 as 
big_col, small_col@1 as small_col]",
+            "  HashJoinExec: mode=CollectLeft, join_type=Right, on=[(Column { 
name: \"small_col\", index: 1 }, Column { name: \"medium_col\", index: 0 })]",
+            "    ProjectionExec: expr=[big_col@1 as big_col, small_col@0 as 
small_col]",
+            "      HashJoinExec: mode=CollectLeft, join_type=Inner, 
on=[(Column { name: \"small_col\", index: 0 }, Column { name: \"big_col\", 
index: 0 })]",

Review Comment:
   This plan has the small relation on the "left" side -- which is the build side
   (see https://github.com/apache/arrow-datafusion/blob/master/datafusion/core/src/physical_plan/hash_join.rs#L237-L240) 👍
   
   (I am used to query plans where the build side is the "right" side, so it took me some time to convince myself that this is correct.)



##########
datafusion/core/src/physical_optimizer/hash_build_probe_order.rs:
##########
@@ -274,6 +337,76 @@ mod tests {
         );
     }
 
+    /// Compare the input plan with the plan after running the probe order 
optimizer.
+    macro_rules! assert_optimized {
+        ($EXPECTED_LINES: expr, $PLAN: expr) => {
+            let expected_lines =
+                $EXPECTED_LINES.iter().map(|s| *s).collect::<Vec<&str>>();
+
+            let optimized = HashBuildProbeOrder::new()
+                .optimize(Arc::new($PLAN), &SessionConfig::new())
+                .unwrap();
+
+            let plan = displayable(optimized.as_ref()).indent().to_string();
+            let actual_lines = plan.split("\n").collect::<Vec<&str>>();
+
+            assert_eq!(
+                &expected_lines, &actual_lines,
+                "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+                expected_lines, actual_lines
+            );
+        };
+    }
+
+    #[tokio::test]
+    async fn test_nested_join_swap() {
+        let (big, medium, small) = create_nested_with_min_max();
+
+        let child_join = HashJoinExec::try_new(

Review Comment:
   ```suggestion
           // Form join tree `medium LEFT JOIN (big JOIN small)`
           let child_join = HashJoinExec::try_new(
   ```



##########
datafusion/core/src/physical_plan/join_utils.rs:
##########
@@ -461,4 +613,185 @@ mod tests {
 
         Ok(())
     }
+
+    fn create_stats(
+        num_rows: Option<usize>,
+        column_stats: Option<Vec<ColumnStatistics>>,
+    ) -> Statistics {
+        Statistics {
+            num_rows,
+            column_statistics: column_stats,
+            ..Default::default()
+        }
+    }
+
+    fn create_column_stats(
+        min: Option<u64>,
+        max: Option<u64>,
+        distinct_count: Option<usize>,
+    ) -> ColumnStatistics {
+        ColumnStatistics {
+            distinct_count,
+            min_value: min.map(|size| ScalarValue::UInt64(Some(size))),
+            max_value: max.map(|size| ScalarValue::UInt64(Some(size))),
+            ..Default::default()
+        }
+    }
+
+    type PartialStats = (usize, u64, u64, Option<usize>);
+
+    // This is mainly for validating the all edge cases of the calculation, but
+    // more advanced (and real world test cases) are below where we need some 
control
+    // over the expected output (since it depends on join type to join type).
+    #[test]
+    fn test_inner_join_cardinality_single_column() -> Result<()> {
+        let cases: Vec<(PartialStats, PartialStats, Option<usize>)> = vec![

Review Comment:
   👨‍🍳 👌  -- very nice



##########
datafusion/core/src/physical_plan/join_utils.rs:
##########
@@ -296,6 +299,154 @@ impl<T> Clone for OnceFut<T> {
     }
 }
 
+/// A shared state between statistic aggregators for a join
+/// operation.
+#[derive(Clone, Debug, Default)]
+struct PartialJoinStatistics {
+    pub num_rows: usize,
+    pub column_statistics: Vec<ColumnStatistics>,
+}
+
+/// Calculate the statistics for the given join's output.

Review Comment:
   ```suggestion
   /// Estimate the statistics for the given join's output.
   ```



##########
datafusion/core/src/physical_plan/join_utils.rs:
##########
@@ -296,6 +299,154 @@ impl<T> Clone for OnceFut<T> {
     }
 }
 
+/// A shared state between statistic aggregators for a join
+/// operation.
+#[derive(Clone, Debug, Default)]
+struct PartialJoinStatistics {
+    pub num_rows: usize,
+    pub column_statistics: Vec<ColumnStatistics>,
+}
+
+/// Calculate the statistics for the given join's output.
+pub(crate) fn join_statistics(
+    left: Arc<dyn ExecutionPlan>,
+    right: Arc<dyn ExecutionPlan>,
+    on: JoinOn,
+    join_type: &JoinType,
+) -> Statistics {
+    let left_stats = left.statistics();
+    let right_stats = right.statistics();
+
+    let join_stats = join_cardinality(join_type, left_stats, right_stats, &on);
+    let (num_rows, column_statistics) = match join_stats {
+        Some(stats) => (Some(stats.num_rows), Some(stats.column_statistics)),
+        None => (None, None),
+    };
+    Statistics {
+        num_rows,
+        total_byte_size: None,
+        column_statistics,
+        is_exact: false,
+    }
+}
+
+// Estimate the cardinality for the given join with input statistics.
+fn join_cardinality(
+    join_type: &JoinType,
+    left_stats: Statistics,
+    right_stats: Statistics,
+    on: &JoinOn,
+) -> Option<PartialJoinStatistics> {
+    match join_type {
+        JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => 
{
+            let left_num_rows = left_stats.num_rows?;
+            let right_num_rows = right_stats.num_rows?;
+
+            // Take the left_col_stats and right_col_stats using the index
+            // obtained from index() method of the each element of 'on'.
+            let all_left_col_stats = left_stats.column_statistics?;
+            let all_right_col_stats = right_stats.column_statistics?;
+            let (left_col_stats, right_col_stats) = on
+                .iter()
+                .map(|(left, right)| {
+                    (
+                        all_left_col_stats[left.index()].clone(),
+                        all_right_col_stats[right.index()].clone(),
+                    )
+                })
+                .unzip::<_, _, Vec<_>, Vec<_>>();
+
+            let ij_cardinality = inner_join_cardinality(
+                left_num_rows,
+                right_num_rows,
+                left_col_stats,
+                right_col_stats,
+            )?;
+
+            // The cardinality for inner join can also be used to estimate
+            // the cardinality of left/right/full outer joins as long as it
+            // it is greater than the minimum cardinality constraints of these
+            // joins (so that we don't underestimate the cardinality).
+            let cardinality = match join_type {
+                JoinType::Inner => ij_cardinality,
+                JoinType::Left => max(ij_cardinality, left_num_rows),
+                JoinType::Right => max(ij_cardinality, right_num_rows),
+                JoinType::Full => {
+                    max(ij_cardinality, left_num_rows)
+                        + max(ij_cardinality, right_num_rows)
+                        - ij_cardinality
+                }
+                _ => unreachable!(),
+            };
+
+            Some(PartialJoinStatistics {
+                num_rows: cardinality,
+                // We don't do anything specific here, just combine the 
existing
+                // statistics which might yield subpar results (although it is
+                // true, esp regarding min/max). For a better estimation, we 
need
+                // filter selectivity analysis first.
+                column_statistics: all_left_col_stats
+                    .into_iter()
+                    .chain(all_right_col_stats.into_iter())
+                    .collect(),
+            })
+        }
+
+        JoinType::Semi => None,
+        JoinType::Anti => None,
+    }
+}
+
+/// Estimate the inner join cardinality by using the basic building blocks of
+/// column-level statistics and the total row count. This is a very naive and
+/// a very conservative implementation that can quickly give up if there is not
+/// enough input statistics.
+fn inner_join_cardinality(
+    left_num_rows: usize,
+    right_num_rows: usize,
+    left_col_stats: Vec<ColumnStatistics>,
+    right_col_stats: Vec<ColumnStatistics>,
+) -> Option<usize> {
+    // The algorithm here is partly based on the non-histogram selectivity 
estimation
+    // from Spark's Catalyst optimizer.
+
+    let mut join_selectivity = None;
+    for (left_stat, right_stat) in 
left_col_stats.iter().zip(right_col_stats.iter()) {
+        if (left_stat.min_value.clone()? > right_stat.max_value.clone()?)
+            || (left_stat.max_value.clone()? < right_stat.min_value.clone()?)
+        {
+            // If there is no overlap, then we can not accurately estimate
+            // the join cardinality. We could in theory use this information
+            // to point out the join will not produce any rows, but that would
+            // require some extra information (namely whether the statistics 
are
+            // exact). For now, we just give up.
+            return None;
+        }
+
+        let col_selectivity = max(left_stat.distinct_count, 
right_stat.distinct_count);

Review Comment:
   I found the term `col_selectivity` confusing here (as it is really the maximum of the left and right distinct counts for this join column, not a selectivity). Could we rename `col_selectivity` to something like `max_distinct`?



##########
datafusion/core/src/physical_plan/join_utils.rs:
##########
@@ -296,6 +299,154 @@ impl<T> Clone for OnceFut<T> {
     }
 }
 
+/// A shared state between statistic aggregators for a join
+/// operation.
+#[derive(Clone, Debug, Default)]
+struct PartialJoinStatistics {
+    pub num_rows: usize,
+    pub column_statistics: Vec<ColumnStatistics>,
+}
+
+/// Calculate the statistics for the given join's output.
+pub(crate) fn join_statistics(
+    left: Arc<dyn ExecutionPlan>,
+    right: Arc<dyn ExecutionPlan>,
+    on: JoinOn,
+    join_type: &JoinType,
+) -> Statistics {
+    let left_stats = left.statistics();
+    let right_stats = right.statistics();
+
+    let join_stats = join_cardinality(join_type, left_stats, right_stats, &on);
+    let (num_rows, column_statistics) = match join_stats {
+        Some(stats) => (Some(stats.num_rows), Some(stats.column_statistics)),
+        None => (None, None),
+    };
+    Statistics {
+        num_rows,
+        total_byte_size: None,
+        column_statistics,
+        is_exact: false,
+    }
+}
+
+// Estimate the cardinality for the given join with input statistics.
+fn join_cardinality(
+    join_type: &JoinType,
+    left_stats: Statistics,
+    right_stats: Statistics,
+    on: &JoinOn,
+) -> Option<PartialJoinStatistics> {
+    match join_type {
+        JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => 
{
+            let left_num_rows = left_stats.num_rows?;
+            let right_num_rows = right_stats.num_rows?;
+
+            // Take the left_col_stats and right_col_stats using the index
+            // obtained from index() method of the each element of 'on'.
+            let all_left_col_stats = left_stats.column_statistics?;
+            let all_right_col_stats = right_stats.column_statistics?;
+            let (left_col_stats, right_col_stats) = on
+                .iter()
+                .map(|(left, right)| {
+                    (
+                        all_left_col_stats[left.index()].clone(),
+                        all_right_col_stats[right.index()].clone(),
+                    )
+                })
+                .unzip::<_, _, Vec<_>, Vec<_>>();
+
+            let ij_cardinality = inner_join_cardinality(
+                left_num_rows,
+                right_num_rows,
+                left_col_stats,
+                right_col_stats,
+            )?;
+
+            // The cardinality for inner join can also be used to estimate
+            // the cardinality of left/right/full outer joins as long as it
+            // it is greater than the minimum cardinality constraints of these
+            // joins (so that we don't underestimate the cardinality).
+            let cardinality = match join_type {
+                JoinType::Inner => ij_cardinality,
+                JoinType::Left => max(ij_cardinality, left_num_rows),

Review Comment:
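   This is good (taking the max of the inner join estimate and the preserved side's row count, so an outer join is never estimated below the number of rows it must output).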



##########
datafusion/core/src/physical_plan/join_utils.rs:
##########
@@ -296,6 +299,154 @@ impl<T> Clone for OnceFut<T> {
     }
 }
 
+/// A shared state between statistic aggregators for a join
+/// operation.
+#[derive(Clone, Debug, Default)]
+struct PartialJoinStatistics {
+    pub num_rows: usize,
+    pub column_statistics: Vec<ColumnStatistics>,
+}
+
+/// Calculate the statistics for the given join's output.
+pub(crate) fn join_statistics(
+    left: Arc<dyn ExecutionPlan>,
+    right: Arc<dyn ExecutionPlan>,
+    on: JoinOn,
+    join_type: &JoinType,
+) -> Statistics {
+    let left_stats = left.statistics();
+    let right_stats = right.statistics();
+
+    let join_stats = join_cardinality(join_type, left_stats, right_stats, &on);
+    let (num_rows, column_statistics) = match join_stats {
+        Some(stats) => (Some(stats.num_rows), Some(stats.column_statistics)),
+        None => (None, None),
+    };
+    Statistics {
+        num_rows,
+        total_byte_size: None,
+        column_statistics,
+        is_exact: false,
+    }
+}
+
+// Estimate the cardinality for the given join with input statistics.
+fn join_cardinality(

Review Comment:
   Sorry -- I am being pedantic, but I would suggest naming these functions in a way that makes it super clear they are estimates. 😆
   
   ```suggestion
   fn estimate_join_cardinality(
   ```



##########
datafusion/core/src/physical_optimizer/hash_build_probe_order.rs:
##########
@@ -274,6 +337,76 @@ mod tests {
         );
     }
 
+    /// Compare the input plan with the plan after running the probe order 
optimizer.
+    macro_rules! assert_optimized {
+        ($EXPECTED_LINES: expr, $PLAN: expr) => {
+            let expected_lines =
+                $EXPECTED_LINES.iter().map(|s| *s).collect::<Vec<&str>>();
+
+            let optimized = HashBuildProbeOrder::new()
+                .optimize(Arc::new($PLAN), &SessionConfig::new())
+                .unwrap();
+
+            let plan = displayable(optimized.as_ref()).indent().to_string();
+            let actual_lines = plan.split("\n").collect::<Vec<&str>>();
+
+            assert_eq!(
+                &expected_lines, &actual_lines,
+                "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+                expected_lines, actual_lines
+            );
+        };
+    }
+
+    #[tokio::test]
+    async fn test_nested_join_swap() {
+        let (big, medium, small) = create_nested_with_min_max();
+
+        let child_join = HashJoinExec::try_new(
+            Arc::clone(&big),
+            Arc::clone(&small),
+            vec![(
+                Column::new_with_schema("big_col", &big.schema()).unwrap(),
+                Column::new_with_schema("small_col", &small.schema()).unwrap(),
+            )],
+            None,
+            &JoinType::Inner,
+            PartitionMode::CollectLeft,
+            &false,
+        )
+        .unwrap();
+        let child_schema = child_join.schema();
+
+        let join = HashJoinExec::try_new(
+            Arc::clone(&medium),
+            Arc::new(child_join),
+            vec![(
+                Column::new_with_schema("medium_col", 
&medium.schema()).unwrap(),
+                Column::new_with_schema("small_col", &child_schema).unwrap(),
+            )],
+            None,
+            &JoinType::Left,
+            PartitionMode::CollectLeft,
+            &false,
+        )
+        .unwrap();
+
+        // The first hash join's left is 'small' table (with 1000 rows), and 
the second hash join's
+        // left is the F(small IJ big) which has an estimated cardinality of 
2000 rows (vs medium which
+        // has an exact cardinality of 10_000 rows).
+        let expected = [
+            "ProjectionExec: expr=[medium_col@2 as medium_col, big_col@0 as 
big_col, small_col@1 as small_col]",

Review Comment:
   👍  plan looks good



##########
datafusion/core/src/physical_plan/join_utils.rs:
##########
@@ -296,6 +299,154 @@ impl<T> Clone for OnceFut<T> {
     }
 }
 
+/// A shared state between statistic aggregators for a join
+/// operation.
+#[derive(Clone, Debug, Default)]
+struct PartialJoinStatistics {
+    pub num_rows: usize,
+    pub column_statistics: Vec<ColumnStatistics>,
+}
+
+/// Calculate the statistics for the given join's output.
+pub(crate) fn join_statistics(
+    left: Arc<dyn ExecutionPlan>,
+    right: Arc<dyn ExecutionPlan>,
+    on: JoinOn,
+    join_type: &JoinType,
+) -> Statistics {
+    let left_stats = left.statistics();
+    let right_stats = right.statistics();
+
+    let join_stats = join_cardinality(join_type, left_stats, right_stats, &on);
+    let (num_rows, column_statistics) = match join_stats {
+        Some(stats) => (Some(stats.num_rows), Some(stats.column_statistics)),
+        None => (None, None),
+    };
+    Statistics {
+        num_rows,
+        total_byte_size: None,
+        column_statistics,
+        is_exact: false,
+    }
+}
+
+// Estimate the cardinality for the given join with input statistics.
+fn join_cardinality(
+    join_type: &JoinType,
+    left_stats: Statistics,
+    right_stats: Statistics,
+    on: &JoinOn,
+) -> Option<PartialJoinStatistics> {
+    match join_type {
+        JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => 
{
+            let left_num_rows = left_stats.num_rows?;
+            let right_num_rows = right_stats.num_rows?;
+
+            // Take the left_col_stats and right_col_stats using the index
+            // obtained from index() method of the each element of 'on'.
+            let all_left_col_stats = left_stats.column_statistics?;
+            let all_right_col_stats = right_stats.column_statistics?;
+            let (left_col_stats, right_col_stats) = on
+                .iter()
+                .map(|(left, right)| {
+                    (
+                        all_left_col_stats[left.index()].clone(),
+                        all_right_col_stats[right.index()].clone(),
+                    )
+                })
+                .unzip::<_, _, Vec<_>, Vec<_>>();
+
+            let ij_cardinality = inner_join_cardinality(
+                left_num_rows,
+                right_num_rows,
+                left_col_stats,
+                right_col_stats,
+            )?;
+
+            // The cardinality for inner join can also be used to estimate
+            // the cardinality of left/right/full outer joins as long as it
+            // it is greater than the minimum cardinality constraints of these
+            // joins (so that we don't underestimate the cardinality).
+            let cardinality = match join_type {
+                JoinType::Inner => ij_cardinality,
+                JoinType::Left => max(ij_cardinality, left_num_rows),
+                JoinType::Right => max(ij_cardinality, right_num_rows),
+                JoinType::Full => {
+                    max(ij_cardinality, left_num_rows)
+                        + max(ij_cardinality, right_num_rows)
+                        - ij_cardinality
+                }
+                _ => unreachable!(),
+            };
+
+            Some(PartialJoinStatistics {
+                num_rows: cardinality,
+                // We don't do anything specific here, just combine the 
existing
+                // statistics which might yield subpar results (although it is
+                // true, esp regarding min/max). For a better estimation, we 
need
+                // filter selectivity analysis first.
+                column_statistics: all_left_col_stats
+                    .into_iter()
+                    .chain(all_right_col_stats.into_iter())
+                    .collect(),
+            })
+        }
+
+        JoinType::Semi => None,
+        JoinType::Anti => None,
+    }
+}
+
+/// Estimate the inner join cardinality by using the basic building blocks of
+/// column-level statistics and the total row count. This is a very naive and
+/// a very conservative implementation that can quickly give up if there is not
+/// enough input statistics.
+fn inner_join_cardinality(
+    left_num_rows: usize,
+    right_num_rows: usize,
+    left_col_stats: Vec<ColumnStatistics>,
+    right_col_stats: Vec<ColumnStatistics>,
+) -> Option<usize> {
+    // The algorithm here is partly based on the non-histogram selectivity 
estimation
+    // from Spark's Catalyst optimizer.
+
+    let mut join_selectivity = None;
+    for (left_stat, right_stat) in 
left_col_stats.iter().zip(right_col_stats.iter()) {
+        if (left_stat.min_value.clone()? > right_stat.max_value.clone()?)
+            || (left_stat.max_value.clone()? < right_stat.min_value.clone()?)
+        {
+            // If there is no overlap, then we can not accurately estimate
+            // the join cardinality. We could in theory use this information
+            // to point out the join will not produce any rows, but that would
+            // require some extra information (namely whether the statistics 
are
+            // exact). For now, we just give up.
+            return None;
+        }
+
+        let col_selectivity = max(left_stat.distinct_count, 
right_stat.distinct_count);
+        if col_selectivity > join_selectivity {
+            // Seems like there are a few implementations of this algorithm 
that implement
+            // exponential decay for the selectivity (like Hive's Optiq 
Optimizer). Needs
+            // further exploration.
+            join_selectivity = col_selectivity;
+        }
+    }
+
+    // With the assumption that the smaller input's domain is generally 
represented in the bigger
+    // input's domain, we can calculate the inner join's cardinality by taking 
the cartesian product
+    // of the two inputs and normalizing it by the selectivity factor.
+    let cardinality = match join_selectivity {
+        Some(selectivity) if selectivity > 0 => {
+            (left_num_rows * right_num_rows) / selectivity
+        }

Review Comment:
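   When I went over some boundary conditions, I found this model fairly simplistic but reasonable:
   
   * 1:1 join (where each input row has exactly zero or one match on the non-probe side) with a single predicate column -- if the distinct counts equal the row counts, the estimate is `(left_rows * right_rows) / max(left_rows, right_rows) = min(left_rows, right_rows)`, i.e. the model assumes the join does not filter any rows from the smaller input.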
   
   



##########
datafusion/core/src/physical_plan/join_utils.rs:
##########
@@ -296,6 +299,154 @@ impl<T> Clone for OnceFut<T> {
     }
 }
 
+/// A shared state between statistic aggregators for a join
+/// operation.
+#[derive(Clone, Debug, Default)]
+struct PartialJoinStatistics {
+    pub num_rows: usize,
+    pub column_statistics: Vec<ColumnStatistics>,
+}
+
+/// Calculate the statistics for the given join's output.
+pub(crate) fn join_statistics(
+    left: Arc<dyn ExecutionPlan>,
+    right: Arc<dyn ExecutionPlan>,
+    on: JoinOn,
+    join_type: &JoinType,
+) -> Statistics {
+    let left_stats = left.statistics();
+    let right_stats = right.statistics();
+
+    let join_stats = join_cardinality(join_type, left_stats, right_stats, &on);
+    let (num_rows, column_statistics) = match join_stats {
+        Some(stats) => (Some(stats.num_rows), Some(stats.column_statistics)),
+        None => (None, None),
+    };
+    Statistics {
+        num_rows,
+        total_byte_size: None,
+        column_statistics,
+        is_exact: false,
+    }
+}
+
+// Estimate the cardinality for the given join with input statistics.
+fn join_cardinality(
+    join_type: &JoinType,
+    left_stats: Statistics,
+    right_stats: Statistics,
+    on: &JoinOn,
+) -> Option<PartialJoinStatistics> {
+    match join_type {
+        JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => 
{
+            let left_num_rows = left_stats.num_rows?;
+            let right_num_rows = right_stats.num_rows?;
+
+            // Take the left_col_stats and right_col_stats using the index
+            // obtained from index() method of the each element of 'on'.
+            let all_left_col_stats = left_stats.column_statistics?;
+            let all_right_col_stats = right_stats.column_statistics?;
+            let (left_col_stats, right_col_stats) = on
+                .iter()
+                .map(|(left, right)| {
+                    (
+                        all_left_col_stats[left.index()].clone(),
+                        all_right_col_stats[right.index()].clone(),
+                    )
+                })
+                .unzip::<_, _, Vec<_>, Vec<_>>();
+
+            let ij_cardinality = inner_join_cardinality(
+                left_num_rows,
+                right_num_rows,
+                left_col_stats,
+                right_col_stats,
+            )?;
+
+            // The cardinality for inner join can also be used to estimate
+            // the cardinality of left/right/full outer joins as long as it
+            // it is greater than the minimum cardinality constraints of these
+            // joins (so that we don't underestimate the cardinality).
+            let cardinality = match join_type {
+                JoinType::Inner => ij_cardinality,
+                JoinType::Left => max(ij_cardinality, left_num_rows),
+                JoinType::Right => max(ij_cardinality, right_num_rows),
+                JoinType::Full => {
+                    max(ij_cardinality, left_num_rows)
+                        + max(ij_cardinality, right_num_rows)
+                        - ij_cardinality
+                }
+                _ => unreachable!(),
+            };
+
+            Some(PartialJoinStatistics {
+                num_rows: cardinality,
+                // We don't do anything specific here, just combine the 
existing
+                // statistics which might yield subpar results (although it is
+                // true, esp regarding min/max). For a better estimation, we 
need
+                // filter selectivity analysis first.
+                column_statistics: all_left_col_stats
+                    .into_iter()
+                    .chain(all_right_col_stats.into_iter())
+                    .collect(),
+            })
+        }
+
+        JoinType::Semi => None,
+        JoinType::Anti => None,
+    }
+}
+
+/// Estimate the inner join cardinality by using the basic building blocks of
+/// column-level statistics and the total row count. This is a very naive and
+/// a very conservative implementation that can quickly give up if there is not
+/// enough input statistics.
+fn inner_join_cardinality(
+    left_num_rows: usize,
+    right_num_rows: usize,
+    left_col_stats: Vec<ColumnStatistics>,
+    right_col_stats: Vec<ColumnStatistics>,
+) -> Option<usize> {
+    // The algorithm here is partly based on the non-histogram selectivity 
estimation
+    // from Spark's Catalyst optimizer.
+
+    let mut join_selectivity = None;
+    for (left_stat, right_stat) in 
left_col_stats.iter().zip(right_col_stats.iter()) {
+        if (left_stat.min_value.clone()? > right_stat.max_value.clone()?)
+            || (left_stat.max_value.clone()? < right_stat.min_value.clone()?)
+        {
+            // If there is no overlap, then we can not accurately estimate
+            // the join cardinality. We could in theory use this information
+            // to point out the join will not produce any rows, but that would
+            // require some extra information (namely whether the statistics 
are
+            // exact). For now, we just give up.
+            return None;
+        }
+
+        let col_selectivity = max(left_stat.distinct_count, 
right_stat.distinct_count);
+        if col_selectivity > join_selectivity {
+            // Seems like there are a few implementations of this algorithm 
that implement
+            // exponential decay for the selectivity (like Hive's Optiq 
Optimizer). Needs
+            // further exploration.
+            join_selectivity = col_selectivity;
+        }
+    }
+
+    // With the assumption that the smaller input's domain is generally 
represented in the bigger
+    // input's domain, we can calculate the inner join's cardinality by taking 
the cartesian product
+    // of the two inputs and normalizing it by the selectivity factor.
+    let cardinality = match join_selectivity {
+        Some(selectivity) if selectivity > 0 => {
+            (left_num_rows * right_num_rows) / selectivity
+        }
+        // Since we don't have any information about the selectivity,
+        // we can only assume that the join will produce the cartesian
+        // product.
+        _ => left_num_rows * right_num_rows,

Review Comment:
   For what it is worth, in practice joins where the cost matters are almost *never* cartesian products (because if they were, the query would never finish for large inputs, and if the inputs are small it doesn't really matter where in the join tree they go).



##########
datafusion/core/src/physical_plan/join_utils.rs:
##########
@@ -296,6 +299,154 @@ impl<T> Clone for OnceFut<T> {
     }
 }
 
+/// A shared state between statistic aggregators for a join
+/// operation.
+#[derive(Clone, Debug, Default)]
+struct PartialJoinStatistics {
+    pub num_rows: usize,
+    pub column_statistics: Vec<ColumnStatistics>,
+}
+
+/// Calculate the statistics for the given join's output.
+pub(crate) fn join_statistics(
+    left: Arc<dyn ExecutionPlan>,
+    right: Arc<dyn ExecutionPlan>,
+    on: JoinOn,
+    join_type: &JoinType,
+) -> Statistics {
+    let left_stats = left.statistics();
+    let right_stats = right.statistics();
+
+    let join_stats = join_cardinality(join_type, left_stats, right_stats, &on);
+    let (num_rows, column_statistics) = match join_stats {
+        Some(stats) => (Some(stats.num_rows), Some(stats.column_statistics)),
+        None => (None, None),
+    };
+    Statistics {
+        num_rows,
+        total_byte_size: None,
+        column_statistics,
+        is_exact: false,
+    }
+}
+
+// Estimate the cardinality for the given join with input statistics.
+fn join_cardinality(
+    join_type: &JoinType,
+    left_stats: Statistics,
+    right_stats: Statistics,
+    on: &JoinOn,
+) -> Option<PartialJoinStatistics> {
+    match join_type {
+        JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => 
{
+            let left_num_rows = left_stats.num_rows?;
+            let right_num_rows = right_stats.num_rows?;
+
+            // Take the left_col_stats and right_col_stats using the index
+            // obtained from index() method of the each element of 'on'.
+            let all_left_col_stats = left_stats.column_statistics?;
+            let all_right_col_stats = right_stats.column_statistics?;
+            let (left_col_stats, right_col_stats) = on
+                .iter()
+                .map(|(left, right)| {
+                    (
+                        all_left_col_stats[left.index()].clone(),
+                        all_right_col_stats[right.index()].clone(),
+                    )
+                })
+                .unzip::<_, _, Vec<_>, Vec<_>>();
+
+            let ij_cardinality = inner_join_cardinality(
+                left_num_rows,
+                right_num_rows,
+                left_col_stats,
+                right_col_stats,
+            )?;
+
+            // The cardinality for inner join can also be used to estimate
+            // the cardinality of left/right/full outer joins as long as it
+            // it is greater than the minimum cardinality constraints of these
+            // joins (so that we don't underestimate the cardinality).
+            let cardinality = match join_type {
+                JoinType::Inner => ij_cardinality,
+                JoinType::Left => max(ij_cardinality, left_num_rows),
+                JoinType::Right => max(ij_cardinality, right_num_rows),
+                JoinType::Full => {
+                    max(ij_cardinality, left_num_rows)
+                        + max(ij_cardinality, right_num_rows)
+                        - ij_cardinality
+                }
+                _ => unreachable!(),
+            };
+
+            Some(PartialJoinStatistics {
+                num_rows: cardinality,
+                // We don't do anything specific here, just combine the 
existing
+                // statistics which might yield subpar results (although it is
+                // true, esp regarding min/max). For a better estimation, we 
need
+                // filter selectivity analysis first.
+                column_statistics: all_left_col_stats
+                    .into_iter()
+                    .chain(all_right_col_stats.into_iter())
+                    .collect(),
+            })
+        }
+
+        JoinType::Semi => None,
+        JoinType::Anti => None,
+    }
+}
+
+/// Estimate the inner join cardinality by using the basic building blocks of
+/// column-level statistics and the total row count. This is a very naive and
+/// a very conservative implementation that can quickly give up if there is not
+/// enough input statistics.
+fn inner_join_cardinality(
+    left_num_rows: usize,
+    right_num_rows: usize,
+    left_col_stats: Vec<ColumnStatistics>,
+    right_col_stats: Vec<ColumnStatistics>,
+) -> Option<usize> {
+    // The algorithm here is partly based on the non-histogram selectivity 
estimation
+    // from Spark's Catalyst optimizer.
+
+    let mut join_selectivity = None;
+    for (left_stat, right_stat) in 
left_col_stats.iter().zip(right_col_stats.iter()) {
+        if (left_stat.min_value.clone()? > right_stat.max_value.clone()?)
+            || (left_stat.max_value.clone()? < right_stat.min_value.clone()?)
+        {
+            // If there is no overlap, then we can not accurately estimate
+            // the join cardinality. We could in theory use this information
+            // to point out the join will not produce any rows, but that would
+            // require some extra information (namely whether the statistics 
are
+            // exact). For now, we just give up.
+            return None;
+        }
+
+        let col_selectivity = max(left_stat.distinct_count, 
right_stat.distinct_count);
+        if col_selectivity > join_selectivity {
+            // Seems like there are a few implementations of this algorithm 
that implement
+            // exponential decay for the selectivity (like Hive's Optiq 
Optimizer). Needs
+            // further exploration.
+            join_selectivity = col_selectivity;

Review Comment:
   If the join columns aren't correlated, then a better estimate might be to combine the per-column selectivities multiplicatively:
   
   ```rust
   join_selectivity = col_selectivity * join_selectivity;
   ```



##########
datafusion/core/src/physical_plan/join_utils.rs:
##########
@@ -296,6 +299,154 @@ impl<T> Clone for OnceFut<T> {
     }
 }
 
+/// A shared state between statistic aggregators for a join
+/// operation.
+#[derive(Clone, Debug, Default)]
+struct PartialJoinStatistics {
+    pub num_rows: usize,
+    pub column_statistics: Vec<ColumnStatistics>,
+}
+
+/// Calculate the statistics for the given join's output.
+pub(crate) fn join_statistics(
+    left: Arc<dyn ExecutionPlan>,
+    right: Arc<dyn ExecutionPlan>,
+    on: JoinOn,
+    join_type: &JoinType,
+) -> Statistics {
+    let left_stats = left.statistics();
+    let right_stats = right.statistics();
+
+    let join_stats = join_cardinality(join_type, left_stats, right_stats, &on);
+    let (num_rows, column_statistics) = match join_stats {
+        Some(stats) => (Some(stats.num_rows), Some(stats.column_statistics)),
+        None => (None, None),
+    };
+    Statistics {
+        num_rows,
+        total_byte_size: None,
+        column_statistics,
+        is_exact: false,
+    }
+}
+
+// Estimate the cardinality for the given join with input statistics.
+fn join_cardinality(
+    join_type: &JoinType,
+    left_stats: Statistics,
+    right_stats: Statistics,
+    on: &JoinOn,
+) -> Option<PartialJoinStatistics> {
+    match join_type {
+        JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => 
{
+            let left_num_rows = left_stats.num_rows?;
+            let right_num_rows = right_stats.num_rows?;
+
+            // Take the left_col_stats and right_col_stats using the index
+            // obtained from index() method of the each element of 'on'.
+            let all_left_col_stats = left_stats.column_statistics?;
+            let all_right_col_stats = right_stats.column_statistics?;
+            let (left_col_stats, right_col_stats) = on
+                .iter()
+                .map(|(left, right)| {
+                    (
+                        all_left_col_stats[left.index()].clone(),
+                        all_right_col_stats[right.index()].clone(),
+                    )
+                })
+                .unzip::<_, _, Vec<_>, Vec<_>>();
+
+            let ij_cardinality = inner_join_cardinality(
+                left_num_rows,
+                right_num_rows,
+                left_col_stats,
+                right_col_stats,
+            )?;
+
+            // The cardinality for inner join can also be used to estimate
+            // the cardinality of left/right/full outer joins as long as it
+            // it is greater than the minimum cardinality constraints of these
+            // joins (so that we don't underestimate the cardinality).
+            let cardinality = match join_type {
+                JoinType::Inner => ij_cardinality,
+                JoinType::Left => max(ij_cardinality, left_num_rows),

Review Comment:
   👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to