This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 4a91ce91e2 Minor: Improve `HashJoinExec` documentation (#7953)
4a91ce91e2 is described below
commit 4a91ce91e2e82ed33142405a00f619cc714a5a30
Author: Andrew Lamb <[email protected]>
AuthorDate: Sun Oct 29 08:50:10 2023 -0400
Minor: Improve `HashJoinExec` documentation (#7953)
* Minor: Improve `HashJoinExec` documentation
* Apply suggestions from code review
Co-authored-by: Liang-Chi Hsieh <[email protected]>
---------
Co-authored-by: Liang-Chi Hsieh <[email protected]>
---
datafusion/physical-plan/src/joins/hash_join.rs | 138 +++++++++++++++++++++---
1 file changed, 126 insertions(+), 12 deletions(-)
diff --git a/datafusion/physical-plan/src/joins/hash_join.rs
b/datafusion/physical-plan/src/joins/hash_join.rs
index 9aa776fe05..dc0e81a6f3 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -15,8 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-//! Defines the join plan for executing partitions in parallel and then
joining the results
-//! into a set of partitions.
+//! [`HashJoinExec`] Partitioned Hash Join Operator
use std::fmt;
use std::mem::size_of;
@@ -78,29 +77,140 @@ use futures::{ready, Stream, StreamExt, TryStreamExt};
type JoinLeftData = (JoinHashMap, RecordBatch, MemoryReservation);
-/// Join execution plan executes partitions in parallel and combines them into
a set of
-/// partitions.
+/// Join execution plan: Evaluates equijoin predicates in parallel on multiple
+/// partitions using a hash table and an optional filter list to apply post
+/// join.
///
-/// Filter expression expected to contain non-equality predicates that can not
be pushed
-/// down to any of join inputs.
-/// In case of outer join, filter applied to only matched rows.
+/// # Join Expressions
+///
+/// This implementation is optimized for evaluating equijoin predicates
+/// (`<col1> = <col2>` expressions), which are represented as a list of
`Columns`
+/// in [`Self::on`].
+///
+/// Non-equality predicates, which can not be pushed down to the join inputs (e.g.
+/// `<col1> != <col2>`) are known as "filter expressions" and are evaluated
+/// after the equijoin predicates.
+///
+/// # "Build Side" vs "Probe Side"
+///
+/// HashJoin takes two inputs, which are referred to as the "build" and the
+/// "probe". The build side is the first child, and the probe side is the
second
+/// child.
+///
+/// The two inputs are treated differently and it is VERY important that the
+/// *smaller* input is placed on the build side to minimize the work of
creating
+/// the hash table.
+///
+/// ```text
+///         ┌───────────┐
+///         │ HashJoin  │
+///         │           │
+///         └───────────┘
+///             │   │
+///       ┌─────┘   └─────┐
+///       ▼               ▼
+/// ┌────────────┐ ┌─────────────┐
+/// │   Input    │ │    Input    │
+/// │    [0]     │ │     [1]     │
+/// └────────────┘ └─────────────┘
+///
+///  "build side"   "probe side"
+/// ```
+///
+/// Execution proceeds in 2 stages:
+///
+/// 1. the **build phase** where a hash table is created from the tuples of the
+/// build side.
+///
+/// 2. the **probe phase** where the tuples of the probe side are streamed
+/// through, checking for matches of the join keys in the hash table.
+///
+/// ```text
+///                 ┌────────────────┐          ┌────────────────┐
+///                 │ ┌─────────┐    │          │ ┌─────────┐    │
+///                 │ │  Hash   │    │          │ │  Hash   │    │
+///                 │ │  Table  │    │          │ │  Table  │    │
+///                 │ │(keys are│    │          │ │(keys are│    │
+///                 │ │equi join│    │          │ │equi join│    │  Stage 2: batches from
+///  Stage 1: the   │ │columns) │    │          │ │columns) │    │    the probe side are
+/// *entire* build  │ │         │    │          │ │         │    │  streamed through, and
+///  side is read   │ └─────────┘    │          │ └─────────┘    │   checked against the
+/// into the hash   │      ▲         │          │          ▲     │   contents of the hash
+///     table       │       HashJoin │          │  HashJoin      │          table
+///                 └──────┼─────────┘          └──────────┼─────┘
+///              ─ ─ ─ ─ ─ ─                               ─ ─ ─ ─ ─ ─ ─
+///            │                                                         │
+///
+///            │                                                         │
+///     ┌────────────┐                                            ┌────────────┐
+///     │RecordBatch │                                            │RecordBatch │
+///     └────────────┘                                            └────────────┘
+///     ┌────────────┐                                            ┌────────────┐
+///     │RecordBatch │                                            │RecordBatch │
+///     └────────────┘                                            └────────────┘
+///           ...                                                       ...
+///     ┌────────────┐                                            ┌────────────┐
+///     │RecordBatch │                                            │RecordBatch │
+///     └────────────┘                                            └────────────┘
+///
+///       build side                                                probe side
+///
+/// ```
+///
+/// # Example "Optimal" Plans
+///
+/// The differences in the inputs mean that for a classic "Star Schema Query",
+/// the optimal plan will be a **"Right Deep Tree"** . A Star Schema Query is
+/// one where there is one large table and several smaller "dimension" tables,
+/// joined on `Foreign Key = Primary Key` predicates.
+///
+/// A "Right Deep Tree" looks like this, with the large table as the probe side
+/// on the lowest join:
+///
+/// ```text
+///             ┌───────────┐
+///             │ HashJoin  │
+///             │           │
+///             └───────────┘
+///                 │   │
+///         ┌───────┘   └──────────┐
+///         ▼                      ▼
+/// ┌───────────────┐        ┌───────────┐
+/// │ small table 1 │        │ HashJoin  │
+/// │  "dimension"  │        │           │
+/// └───────────────┘        └───┬───┬───┘
+///                   ┌──────────┘   └───────┐
+///                   │                      │
+///                   ▼                      ▼
+///           ┌───────────────┐        ┌───────────┐
+///           │ small table 2 │        │ HashJoin  │
+///           │  "dimension"  │        │           │
+///           └───────────────┘        └───┬───┬───┘
+///                               ┌────────┘   └────────┐
+///                               │                     │
+///                               ▼                     ▼
+///                       ┌───────────────┐     ┌───────────────┐
+///                       │ small table 3 │     │  large table  │
+///                       │  "dimension"  │     │    "fact"     │
+///                       └───────────────┘     └───────────────┘
+/// ```
#[derive(Debug)]
pub struct HashJoinExec {
/// left (build) side which gets hashed
pub left: Arc<dyn ExecutionPlan>,
/// right (probe) side which are filtered by the hash table
pub right: Arc<dyn ExecutionPlan>,
- /// Set of common columns used to join on
+ /// Set of equijoin columns from the relations: `(left_col, right_col)`
pub on: Vec<(Column, Column)>,
/// Filters which are applied while finding matching rows
pub filter: Option<JoinFilter>,
- /// How the join is performed
+ /// How the join is performed (`OUTER`, `INNER`, etc)
pub join_type: JoinType,
- /// The schema once the join is applied
+ /// The output schema for the join
schema: SchemaRef,
/// Build-side data
left_fut: OnceAsync<JoinLeftData>,
- /// Shares the `RandomState` for the hashing algorithm
+ /// Shared `RandomState` for the hashing algorithm
random_state: RandomState,
/// Output order
output_order: Option<Vec<PhysicalSortExpr>>,
@@ -110,12 +220,16 @@ pub struct HashJoinExec {
metrics: ExecutionPlanMetricsSet,
/// Information of index and left / right placement of columns
column_indices: Vec<ColumnIndex>,
- /// If null_equals_null is true, null == null else null != null
+ /// Null matching behavior: If `null_equals_null` is true, rows that have
+ /// `null`s in both left and right equijoin columns will be matched.
+ /// Otherwise, rows that have `null`s in the join columns will not be
+ /// matched and thus will not appear in the output.
pub null_equals_null: bool,
}
impl HashJoinExec {
/// Tries to create a new [HashJoinExec].
+ ///
/// # Error
/// This function errors when it is not possible to join the left and
right sides on keys `on`.
pub fn try_new(