isidentical commented on code in PR #3787:
URL: https://github.com/apache/arrow-datafusion/pull/3787#discussion_r1005377970


##########
datafusion/core/src/physical_plan/join_utils.rs:
##########
@@ -296,6 +299,154 @@ impl<T> Clone for OnceFut<T> {
     }
 }
 
+/// Intermediate ("partial") statistics shared between the statistic
+/// aggregators while estimating the output of a join operation.
+#[derive(Clone, Debug, Default)]
+struct PartialJoinStatistics {
+    /// Estimated number of rows produced by the join.
+    pub num_rows: usize,
+    /// Estimated column statistics of the join output; as built by
+    /// `estimate_join_cardinality`, all left-input column statistics
+    /// followed by all right-input column statistics.
+    pub column_statistics: Vec<ColumnStatistics>,
+}
+
+/// Estimate the statistics for the given join's output.
+///
+/// Fetches the statistics of both inputs via `statistics()` and delegates the
+/// row-count and column-level estimation to `estimate_join_cardinality`. When
+/// that estimation is not possible, both `num_rows` and `column_statistics`
+/// are left as `None`. `total_byte_size` is never estimated, and the result is
+/// always marked as inexact (`is_exact: false`).
+// NOTE(review): `on` is taken by value but only ever borrowed (`&on`);
+// accepting `&JoinOn` would avoid forcing callers to clone it.
+pub(crate) fn estimate_join_statistics(
+    left: Arc<dyn ExecutionPlan>,
+    right: Arc<dyn ExecutionPlan>,
+    on: JoinOn,
+    join_type: &JoinType,
+) -> Statistics {
+    let left_stats = left.statistics();
+    let right_stats = right.statistics();
+
+    let join_stats = estimate_join_cardinality(join_type, left_stats, right_stats, &on);
+    // Split the (optional) partial estimate into the two optional fields of
+    // `Statistics`; either both are known or neither is.
+    let (num_rows, column_statistics) = match join_stats {
+        Some(stats) => (Some(stats.num_rows), Some(stats.column_statistics)),
+        None => (None, None),
+    };
+    Statistics {
+        num_rows,
+        total_byte_size: None,
+        column_statistics,
+        is_exact: false,
+    }
+}
+
+/// Estimate the cardinality (and the output column statistics) of a join
+/// from the statistics of its inputs.
+///
+/// Returns `None` when no estimate can be produced: for `Semi`/`Anti` joins,
+/// when either input is missing its row count or its column statistics, or
+/// when `estimate_inner_join_cardinality` gives up.
+fn estimate_join_cardinality(
+    join_type: &JoinType,
+    left_stats: Statistics,
+    right_stats: Statistics,
+    on: &JoinOn,
+) -> Option<PartialJoinStatistics> {
+    match join_type {
+        JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
+            // Both row counts are required; bail out with `None` otherwise.
+            let left_num_rows = left_stats.num_rows?;
+            let right_num_rows = right_stats.num_rows?;
+
+            // Take the left_col_stats and right_col_stats using the index
+            // obtained from the index() method of each element of 'on'.
+            // NOTE(review): direct indexing panics if a join key's index is out
+            // of range for the corresponding column statistics; `.get()` would
+            // let the estimate fall back to `None` instead.
+            let all_left_col_stats = left_stats.column_statistics?;
+            let all_right_col_stats = right_stats.column_statistics?;
+            let (left_col_stats, right_col_stats) = on
+                .iter()
+                .map(|(left, right)| {
+                    (
+                        all_left_col_stats[left.index()].clone(),
+                        all_right_col_stats[right.index()].clone(),
+                    )
+                })
+                .unzip::<_, _, Vec<_>, Vec<_>>();
+
+            let ij_cardinality = estimate_inner_join_cardinality(
+                left_num_rows,
+                right_num_rows,
+                left_col_stats,
+                right_col_stats,
+            )?;
+
+            // The cardinality for inner join can also be used to estimate
+            // the cardinality of left/right/full outer joins as long as it
+            // is greater than the minimum cardinality constraints of these
+            // joins (so that we don't underestimate the cardinality).
+            let cardinality = match join_type {
+                JoinType::Inner => ij_cardinality,
+                // A left outer join emits at least one row per left input row.
+                JoinType::Left => max(ij_cardinality, left_num_rows),
+                // A right outer join emits at least one row per right input row.
+                JoinType::Right => max(ij_cardinality, right_num_rows),
+                // Left-outer estimate + right-outer estimate - inner estimate,
+                // so the matched rows are not counted twice. This cannot
+                // underflow: each `max(..)` term is >= `ij_cardinality`.
+                JoinType::Full => {
+                    max(ij_cardinality, left_num_rows)
+                        + max(ij_cardinality, right_num_rows)
+                        - ij_cardinality
+                }
+                // The enclosing match arm only admits the four join types above.
+                _ => unreachable!(),
+            };
+
+            Some(PartialJoinStatistics {
+                num_rows: cardinality,
+                // We don't do anything specific here, just concatenate the
+                // existing statistics (left columns first, then right), which
+                // might yield subpar results (although they remain true, esp.
+                // regarding min/max). For a better estimation, we need filter
+                // selectivity analysis first.
+                column_statistics: all_left_col_stats
+                    .into_iter()
+                    .chain(all_right_col_stats.into_iter())
+                    .collect(),
+            })
+        }
+
+        // No cardinality estimation is implemented for semi/anti joins yet.
+        JoinType::Semi => None,
+        JoinType::Anti => None,
+    }
+}
+
+/// Estimate the inner join cardinality by using the basic building blocks of
+/// column-level statistics and the total row count. This is a very naive and
+/// a very conservative implementation that can quickly give up if there is not
+/// enough input statistics.
+fn estimate_inner_join_cardinality(
+    left_num_rows: usize,
+    right_num_rows: usize,
+    left_col_stats: Vec<ColumnStatistics>,
+    right_col_stats: Vec<ColumnStatistics>,
+) -> Option<usize> {
+    // The algorithm here is partly based on the non-histogram selectivity 
estimation
+    // from Spark's Catalyst optimizer.
+
+    let mut join_selectivity = None;
+    for (left_stat, right_stat) in 
left_col_stats.iter().zip(right_col_stats.iter()) {
+        if (left_stat.min_value.clone()? > right_stat.max_value.clone()?)
+            || (left_stat.max_value.clone()? < right_stat.min_value.clone()?)
+        {
+            // If there is no overlap, then we can not accurately estimate
+            // the join cardinality. We could in theory use this information
+            // to point out the join will not produce any rows, but that would
+            // require some extra information (namely whether the statistics 
are
+            // exact). For now, we just give up.
+            return None;
+        }
+
+        let max_distinct = max(left_stat.distinct_count, 
right_stat.distinct_count);
+        if max_distinct > join_selectivity {
+            // Seems like there are a few implementations of this algorithm 
that implement
+            // exponential decay for the selectivity (like Hive's Optiq 
Optimizer). Needs
+            // further exploration.
+            join_selectivity = max_distinct;
+        }
+    }
+
+    // With the assumption that the smaller input's domain is generally 
represented in the bigger
+    // input's domain, we can estimate the inner join's cardinality by taking 
the cartesian product
+    // of the two inputs and normalizing it by the selectivity factor.

Review Comment:
   Since this is an inner join, only rows whose join-column values fall in the 
intersection of both inputs will be present in the result. The conditions above 
have already verified that the ranges overlap, so (assuming a uniform 
distribution) the number of distinct values is the most reliable way to estimate 
the number of matching rows.
   
   E.g. if `a.col1` spans `[1, 10000]` (all distinct) and `b.col1` spans 
`[1, 1000]` (all distinct), then for an inner join on `a.col1 = b.col1` we can 
estimate the result cardinality as `10000 × 1000 / max(10000, 1000) = 1000`. The 
same reasoning applies when the distinct counts are lower than the ranges 
suggest: if `a.col1` had only `1000` distinct values instead of `10000`, the 
estimate becomes `10000 × 1000 / max(1000, 1000) = 10000` rows, hence the 
normalization above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to