This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 4a91ce91e2 Minor: Improve `HashJoinExec` documentation (#7953)
4a91ce91e2 is described below

commit 4a91ce91e2e82ed33142405a00f619cc714a5a30
Author: Andrew Lamb <[email protected]>
AuthorDate: Sun Oct 29 08:50:10 2023 -0400

    Minor: Improve `HashJoinExec` documentation (#7953)
    
    * Minor: Improve `HashJoinExec` documentation
    
    * Apply suggestions from code review
    
    Co-authored-by: Liang-Chi Hsieh <[email protected]>
    
    ---------
    
    Co-authored-by: Liang-Chi Hsieh <[email protected]>
---
 datafusion/physical-plan/src/joins/hash_join.rs | 138 +++++++++++++++++++++---
 1 file changed, 126 insertions(+), 12 deletions(-)

diff --git a/datafusion/physical-plan/src/joins/hash_join.rs 
b/datafusion/physical-plan/src/joins/hash_join.rs
index 9aa776fe05..dc0e81a6f3 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -15,8 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Defines the join plan for executing partitions in parallel and then 
joining the results
-//! into a set of partitions.
+//! [`HashJoinExec`] Partitioned Hash Join Operator
 
 use std::fmt;
 use std::mem::size_of;
@@ -78,29 +77,140 @@ use futures::{ready, Stream, StreamExt, TryStreamExt};
 
 type JoinLeftData = (JoinHashMap, RecordBatch, MemoryReservation);
 
-/// Join execution plan executes partitions in parallel and combines them into 
a set of
-/// partitions.
+/// Join execution plan: Evaluates equijoin predicates in parallel on multiple
+/// partitions using a hash table and an optional filter list to apply post
+/// join.
 ///
-/// Filter expression expected to contain non-equality predicates that can not 
be pushed
-/// down to any of join inputs.
-/// In case of outer join, filter applied to only matched rows.
+/// # Join Expressions
+///
+/// This implementation is optimized for evaluating equijoin predicates (
+/// `<col1> = <col2>`) expressions, which are represented as a list of 
`Columns`
+/// in [`Self::on`].
+///
+/// Non-equality predicates, which cannot be pushed down to the join inputs (e.g.
+/// `<col1> != <col2>`) are known as "filter expressions" and are evaluated
+/// after the equijoin predicates.
+///
+/// # "Build Side" vs "Probe Side"
+///
+/// HashJoin takes two inputs, which are referred to as the "build" and the
+/// "probe". The build side is the first child, and the probe side is the 
second
+/// child.
+///
+/// The two inputs are treated differently and it is VERY important that the
+/// *smaller* input is placed on the build side to minimize the work of 
creating
+/// the hash table.
+///
+/// ```text
+///          ┌───────────┐
+///          │ HashJoin  │
+///          │           │
+///          └───────────┘
+///              │   │
+///        ┌─────┘   └─────┐
+///        ▼               ▼
+/// ┌────────────┐  ┌─────────────┐
+/// │   Input    │  │    Input    │
+/// │    [0]     │  │     [1]     │
+/// └────────────┘  └─────────────┘
+///
+///  "build side"    "probe side"
+/// ```
+///
+/// Execution proceeds in 2 stages:
+///
+/// 1. the **build phase** where a hash table is created from the tuples of the
+/// build side.
+///
+/// 2. the **probe phase** where the tuples of the probe side are streamed
+/// through, checking for matches of the join keys in the hash table.
+///
+/// ```text
+///                 ┌────────────────┐          ┌────────────────┐
+///                 │ ┌─────────┐    │          │ ┌─────────┐    │
+///                 │ │  Hash   │    │          │ │  Hash   │    │
+///                 │ │  Table  │    │          │ │  Table  │    │
+///                 │ │(keys are│    │          │ │(keys are│    │
+///                 │ │equi join│    │          │ │equi join│    │  Stage 2: 
batches from
+///  Stage 1: the   │ │columns) │    │          │ │columns) │    │    the 
probe side are
+/// *entire* build  │ │         │    │          │ │         │    │  streamed 
through, and
+///  side is read   │ └─────────┘    │          │ └─────────┘    │   checked 
against the
+/// into the hash   │      ▲         │          │          ▲     │   contents 
of the hash
+///     table       │       HashJoin │          │  HashJoin      │          
table
+///                 └──────┼─────────┘          └──────────┼─────┘
+///             ─ ─ ─ ─ ─ ─                                 ─ ─ ─ ─ ─ ─ ─
+///            │                                                         │
+///
+///            │                                                         │
+///     ┌────────────┐                                            
┌────────────┐
+///     │RecordBatch │                                            │RecordBatch 
│
+///     └────────────┘                                            
└────────────┘
+///     ┌────────────┐                                            
┌────────────┐
+///     │RecordBatch │                                            │RecordBatch 
│
+///     └────────────┘                                            
└────────────┘
+///           ...                                                       ...
+///     ┌────────────┐                                            
┌────────────┐
+///     │RecordBatch │                                            │RecordBatch 
│
+///     └────────────┘                                            
└────────────┘
+///
+///        build side                                                probe side
+///
+/// ```
+///
+/// # Example "Optimal" Plans
+///
+/// The differences in the inputs mean that for a classic "Star Schema Query",
+/// the optimal plan will be a **"Right Deep Tree"**. A Star Schema Query is
+/// one where there is one large table and several smaller "dimension" tables,
+/// joined on `Foreign Key = Primary Key` predicates.
+///
+/// A "Right Deep Tree" looks like this, with the large table as the probe side
+/// on the lowest join:
+///
+/// ```text
+///             ┌───────────┐
+///             │ HashJoin  │
+///             │           │
+///             └───────────┘
+///                 │   │
+///         ┌───────┘   └──────────┐
+///         ▼                      ▼
+/// ┌───────────────┐        ┌───────────┐
+/// │ small table 1 │        │ HashJoin  │
+/// │  "dimension"  │        │           │
+/// └───────────────┘        └───┬───┬───┘
+///                   ┌──────────┘   └───────┐
+///                   │                      │
+///                   ▼                      ▼
+///           ┌───────────────┐        ┌───────────┐
+///           │ small table 2 │        │ HashJoin  │
+///           │  "dimension"  │        │           │
+///           └───────────────┘        └───┬───┬───┘
+///                               ┌────────┘   └────────┐
+///                               │                     │
+///                               ▼                     ▼
+///                       ┌───────────────┐     ┌───────────────┐
+///                       │ small table 3 │     │  large table  │
+///                       │  "dimension"  │     │    "fact"     │
+///                       └───────────────┘     └───────────────┘
+/// ```
 #[derive(Debug)]
 pub struct HashJoinExec {
     /// left (build) side which gets hashed
     pub left: Arc<dyn ExecutionPlan>,
     /// right (probe) side which are filtered by the hash table
     pub right: Arc<dyn ExecutionPlan>,
-    /// Set of common columns used to join on
+    /// Set of equijoin columns from the relations: `(left_col, right_col)`
     pub on: Vec<(Column, Column)>,
     /// Filters which are applied while finding matching rows
     pub filter: Option<JoinFilter>,
-    /// How the join is performed
+    /// How the join is performed (`OUTER`, `INNER`, etc)
     pub join_type: JoinType,
-    /// The schema once the join is applied
+    /// The output schema for the join
     schema: SchemaRef,
     /// Build-side data
     left_fut: OnceAsync<JoinLeftData>,
-    /// Shares the `RandomState` for the hashing algorithm
+    /// Shares the `RandomState` for the hashing algorithm
     random_state: RandomState,
     /// Output order
     output_order: Option<Vec<PhysicalSortExpr>>,
@@ -110,12 +220,16 @@ pub struct HashJoinExec {
     metrics: ExecutionPlanMetricsSet,
     /// Information of index and left / right placement of columns
     column_indices: Vec<ColumnIndex>,
-    /// If null_equals_null is true, null == null else null != null
+    /// Null matching behavior: If `null_equals_null` is true, rows that have
+    /// `null`s in both left and right equijoin columns will be matched.
+    /// Otherwise, rows that have `null`s in the join columns will not be
+    /// matched and thus will not appear in the output.
     pub null_equals_null: bool,
 }
 
 impl HashJoinExec {
     /// Tries to create a new [HashJoinExec].
+    ///
     /// # Error
     /// This function errors when it is not possible to join the left and 
right sides on keys `on`.
     pub fn try_new(

Reply via email to