This is an automated email from the ASF dual-hosted git repository.

korowa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new eeb9d58a69 Minor: Improve documentation about `OnceAsync` (#13223)
eeb9d58a69 is described below

commit eeb9d58a69f98ef931e23d292f82b44c1c6907e1
Author: Andrew Lamb <[email protected]>
AuthorDate: Tue Nov 5 00:05:46 2024 -0500

    Minor: Improve documentation about `OnceAsync` (#13223)
    
    * Minor: add documentation about OnceAsync
    
    * More refinement
    
    * Fix docs CI
    
    * Update datafusion/physical-plan/src/joins/hash_join.rs
    
    Co-authored-by: Eduard Karacharov <[email protected]>
    
    ---------
    
    Co-authored-by: Eduard Karacharov <[email protected]>
---
 datafusion/physical-plan/src/joins/cross_join.rs   | 28 ++++++++++++++++------
 datafusion/physical-plan/src/joins/hash_join.rs    | 13 +++++++---
 .../physical-plan/src/joins/nested_loop_join.rs    | 19 +++++++++------
 datafusion/physical-plan/src/joins/utils.rs        | 16 +++++++++----
 4 files changed, 55 insertions(+), 21 deletions(-)

diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs
index 8c8921eba6..7f785006f7 100644
--- a/datafusion/physical-plan/src/joins/cross_join.rs
+++ b/datafusion/physical-plan/src/joins/cross_join.rs
@@ -46,7 +46,7 @@ use datafusion_physical_expr::equivalence::join_equivalence_properties;
 use async_trait::async_trait;
 use futures::{ready, Stream, StreamExt, TryStreamExt};
 
-/// Data of the left side
+/// Data of the left side that is buffered into memory
 #[derive(Debug)]
 struct JoinLeftData {
     /// Single RecordBatch with all rows from the left side
@@ -58,12 +58,20 @@ struct JoinLeftData {
 }
 
 #[allow(rustdoc::private_intra_doc_links)]
-/// executes partitions in parallel and combines them into a set of
-/// partitions by combining all values from the left with all values on the right
+/// Cross Join Execution Plan
 ///
-/// Note that the `Clone` trait is not implemented for this struct due to the
-/// `left_fut` [`OnceAsync`], which is used to coordinate the loading of the
-/// left side with the processing in each output stream.
+/// This operator is used when there are no predicates between two tables and
+/// returns the Cartesian product of the two tables.
+///
+/// Buffers the left input into memory and then streams batches from each
+/// partition on the right input combining them with the buffered left input
+/// to generate the output.
+///
+/// # Clone / Shared State
+///
+/// Note this structure includes a [`OnceAsync`] that is used to coordinate the
+/// loading of the left side with the processing in each output stream.
+/// Therefore it cannot be [`Clone`].
 #[derive(Debug)]
 pub struct CrossJoinExec {
     /// left (build) side which gets loaded in memory
@@ -72,10 +80,16 @@ pub struct CrossJoinExec {
     pub right: Arc<dyn ExecutionPlan>,
     /// The schema once the join is applied
     schema: SchemaRef,
-    /// Build-side data
+    /// Buffered copy of left (build) side in memory.
+    ///
+    /// This structure is *shared* across all output streams.
+    ///
+    /// Each output stream waits on the `OnceAsync` to signal the completion of
+    /// the left side loading.
     left_fut: OnceAsync<JoinLeftData>,
     /// Execution plan metrics
     metrics: ExecutionPlanMetricsSet,
+    /// Properties such as schema, equivalence properties, ordering, partitioning, etc.
     cache: PlanProperties,
 }
 
diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs
index ae872e13a9..32267b1181 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -295,9 +295,11 @@ impl JoinLeftData {
 ///                       └───────────────┘     └───────────────┘
 /// ```
 ///
-/// Note that the `Clone` trait is not implemented for this struct due to the
-/// `left_fut` [`OnceAsync`], which is used to coordinate the loading of the
-/// left side with the processing in each output stream.
+/// # Clone / Shared State
+///
+/// Note this structure includes a [`OnceAsync`] that is used to coordinate the
+/// loading of the left side with the processing in each output stream.
+/// Therefore it cannot be [`Clone`].
 #[derive(Debug)]
 pub struct HashJoinExec {
     /// left (build) side which gets hashed
@@ -314,6 +316,11 @@ pub struct HashJoinExec {
     /// if there is a projection, the schema isn't the same as the output schema.
     join_schema: SchemaRef,
     /// Future that consumes left input and builds the hash table
+    ///
+    /// For the `CollectLeft` partition mode, this structure is *shared* across all output streams.
+    ///
+    /// Each output stream waits on the `OnceAsync` to signal the completion of
+    /// the hash table creation.
     left_fut: OnceAsync<JoinLeftData>,
     /// Shared the `RandomState` for the hashing algorithm
     random_state: RandomState,
diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs
index f36c2395e2..71c617a963 100644
--- a/datafusion/physical-plan/src/joins/nested_loop_join.rs
+++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -15,9 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Defines the nested loop join plan, it supports all [`JoinType`].
-//! The nested loop join can execute in parallel by partitions and it is
-//! determined by the [`JoinType`].
+//! [`NestedLoopJoinExec`]: joins without equijoin (equality predicates).
 
 use std::any::Any;
 use std::fmt::Formatter;
@@ -141,9 +139,11 @@ impl JoinLeftData {
 /// "reports" about probe phase completion (which means that "visited" bitmap won't be
 /// updated anymore), and only the last thread, reporting about completion, will return output.
 ///
-/// Note that the `Clone` trait is not implemented for this struct due to the
-/// `left_fut` [`OnceAsync`], which is used to coordinate the loading of the
-/// left side with the processing in each output stream.
+/// # Clone / Shared State
+///
+/// Note this structure includes a [`OnceAsync`] that is used to coordinate the
+/// loading of the left side with the processing in each output stream.
+/// Therefore it cannot be [`Clone`].
 #[derive(Debug)]
 pub struct NestedLoopJoinExec {
     /// left side
@@ -156,7 +156,12 @@ pub struct NestedLoopJoinExec {
     pub(crate) join_type: JoinType,
     /// The schema once the join is applied
     schema: SchemaRef,
-    /// Build-side data
+    /// Future that consumes left input and buffers it in memory
+    ///
+    /// This structure is *shared* across all output streams.
+    ///
+    /// Each output stream waits on the `OnceAsync` to signal the completion of
+    /// the left side loading.
     inner_table: OnceAsync<JoinLeftData>,
     /// Information of index and left / right placement of columns
     column_indices: Vec<ColumnIndex>,
diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs
index d3fa37c2ac..a257119a8b 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -700,11 +700,19 @@ pub fn build_join_schema(
     (fields.finish().with_metadata(metadata), column_indices)
 }
 
-/// A [`OnceAsync`] can be used to run an async closure once, with subsequent calls
-/// to [`OnceAsync::once`] returning a [`OnceFut`] to the same asynchronous computation
+/// A [`OnceAsync`] runs an `async` closure once, where multiple calls to
+/// [`OnceAsync::once`] return a [`OnceFut`] that resolves to the result of the
+/// same computation.
 ///
-/// This is useful for joins where the results of one child are buffered in memory
-/// and shared across potentially multiple output partitions
+/// This is useful for joins where the results of one child are needed to proceed
+/// with multiple output streams.
+///
+///
+/// For example, in a hash join, one input is buffered and shared across
+/// potentially multiple output partitions. Each output partition must wait for
+/// the hash table to be built before proceeding.
+///
+/// Each output partition waits on the same `OnceAsync` before proceeding.
 pub(crate) struct OnceAsync<T> {
     fut: Mutex<Option<OnceFut<T>>>,
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to