This is an automated email from the ASF dual-hosted git repository.
korowa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new eeb9d58a69 Minor: Improve documentation about `OnceAsync` (#13223)
eeb9d58a69 is described below
commit eeb9d58a69f98ef931e23d292f82b44c1c6907e1
Author: Andrew Lamb <[email protected]>
AuthorDate: Tue Nov 5 00:05:46 2024 -0500
Minor: Improve documentation about `OnceAsync` (#13223)
* Minor: add documentation about OnceAsync
* More refinement
* Fix docs CI
* Update datafusion/physical-plan/src/joins/hash_join.rs
Co-authored-by: Eduard Karacharov <[email protected]>
---------
Co-authored-by: Eduard Karacharov <[email protected]>
---
datafusion/physical-plan/src/joins/cross_join.rs | 28 ++++++++++++++++------
datafusion/physical-plan/src/joins/hash_join.rs | 13 +++++++---
.../physical-plan/src/joins/nested_loop_join.rs | 19 +++++++++------
datafusion/physical-plan/src/joins/utils.rs | 16 +++++++++----
4 files changed, 55 insertions(+), 21 deletions(-)
diff --git a/datafusion/physical-plan/src/joins/cross_join.rs
b/datafusion/physical-plan/src/joins/cross_join.rs
index 8c8921eba6..7f785006f7 100644
--- a/datafusion/physical-plan/src/joins/cross_join.rs
+++ b/datafusion/physical-plan/src/joins/cross_join.rs
@@ -46,7 +46,7 @@ use
datafusion_physical_expr::equivalence::join_equivalence_properties;
use async_trait::async_trait;
use futures::{ready, Stream, StreamExt, TryStreamExt};
-/// Data of the left side
+/// Data of the left side that is buffered into memory
#[derive(Debug)]
struct JoinLeftData {
/// Single RecordBatch with all rows from the left side
@@ -58,12 +58,20 @@ struct JoinLeftData {
}
#[allow(rustdoc::private_intra_doc_links)]
-/// executes partitions in parallel and combines them into a set of
-/// partitions by combining all values from the left with all values on the
right
+/// Cross Join Execution Plan
///
-/// Note that the `Clone` trait is not implemented for this struct due to the
-/// `left_fut` [`OnceAsync`], which is used to coordinate the loading of the
-/// left side with the processing in each output stream.
+/// This operator is used when there are no predicates between two tables and
+/// returns the Cartesian product of the two tables.
+///
+/// Buffers the left input into memory and then streams batches from each
+/// partition on the right input combining them with the buffered left input
+/// to generate the output.
+///
+/// # Clone / Shared State
+///
+/// Note this structure includes a [`OnceAsync`] that is used to coordinate the
+/// loading of the left side with the processing in each output stream.
+/// Therefore it cannot be [`Clone`].
#[derive(Debug)]
pub struct CrossJoinExec {
/// left (build) side which gets loaded in memory
@@ -72,10 +80,16 @@ pub struct CrossJoinExec {
pub right: Arc<dyn ExecutionPlan>,
/// The schema once the join is applied
schema: SchemaRef,
- /// Build-side data
+ /// Buffered copy of left (build) side in memory.
+ ///
+ /// This structure is *shared* across all output streams.
+ ///
+ /// Each output stream waits on the `OnceAsync` to signal the completion of
+ /// the left side loading.
left_fut: OnceAsync<JoinLeftData>,
/// Execution plan metrics
metrics: ExecutionPlanMetricsSet,
+ /// Properties such as schema, equivalence properties, ordering,
partitioning, etc.
cache: PlanProperties,
}
diff --git a/datafusion/physical-plan/src/joins/hash_join.rs
b/datafusion/physical-plan/src/joins/hash_join.rs
index ae872e13a9..32267b1181 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -295,9 +295,11 @@ impl JoinLeftData {
/// └───────────────┘ └───────────────┘
/// ```
///
-/// Note that the `Clone` trait is not implemented for this struct due to the
-/// `left_fut` [`OnceAsync`], which is used to coordinate the loading of the
-/// left side with the processing in each output stream.
+/// # Clone / Shared State
+///
+/// Note this structure includes a [`OnceAsync`] that is used to coordinate the
+/// loading of the left side with the processing in each output stream.
+/// Therefore it cannot be [`Clone`].
#[derive(Debug)]
pub struct HashJoinExec {
/// left (build) side which gets hashed
@@ -314,6 +316,11 @@ pub struct HashJoinExec {
/// if there is a projection, the schema isn't the same as the output
schema.
join_schema: SchemaRef,
/// Future that consumes left input and builds the hash table
+ ///
+ /// For CollectLeft partition mode, this structure is *shared* across all
output streams.
+ ///
+ /// Each output stream waits on the `OnceAsync` to signal the completion of
+ /// the hash table creation.
left_fut: OnceAsync<JoinLeftData>,
/// Shared the `RandomState` for the hashing algorithm
random_state: RandomState,
diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs
b/datafusion/physical-plan/src/joins/nested_loop_join.rs
index f36c2395e2..71c617a963 100644
--- a/datafusion/physical-plan/src/joins/nested_loop_join.rs
+++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -15,9 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-//! Defines the nested loop join plan, it supports all [`JoinType`].
-//! The nested loop join can execute in parallel by partitions and it is
-//! determined by the [`JoinType`].
+//! [`NestedLoopJoinExec`]: joins without equijoin (equality) predicates.
use std::any::Any;
use std::fmt::Formatter;
@@ -141,9 +139,11 @@ impl JoinLeftData {
/// "reports" about probe phase completion (which means that "visited" bitmap
won't be
/// updated anymore), and only the last thread, reporting about completion,
will return output.
///
-/// Note that the `Clone` trait is not implemented for this struct due to the
-/// `left_fut` [`OnceAsync`], which is used to coordinate the loading of the
-/// left side with the processing in each output stream.
+/// # Clone / Shared State
+///
+/// Note this structure includes a [`OnceAsync`] that is used to coordinate the
+/// loading of the left side with the processing in each output stream.
+/// Therefore it cannot be [`Clone`].
#[derive(Debug)]
pub struct NestedLoopJoinExec {
/// left side
@@ -156,7 +156,12 @@ pub struct NestedLoopJoinExec {
pub(crate) join_type: JoinType,
/// The schema once the join is applied
schema: SchemaRef,
- /// Build-side data
+ /// Future that consumes left input and buffers it in memory
+ ///
+ /// This structure is *shared* across all output streams.
+ ///
+ /// Each output stream waits on the `OnceAsync` to signal the completion of
+ /// the left side loading.
inner_table: OnceAsync<JoinLeftData>,
/// Information of index and left / right placement of columns
column_indices: Vec<ColumnIndex>,
diff --git a/datafusion/physical-plan/src/joins/utils.rs
b/datafusion/physical-plan/src/joins/utils.rs
index d3fa37c2ac..a257119a8b 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -700,11 +700,19 @@ pub fn build_join_schema(
(fields.finish().with_metadata(metadata), column_indices)
}
-/// A [`OnceAsync`] can be used to run an async closure once, with subsequent
calls
-/// to [`OnceAsync::once`] returning a [`OnceFut`] to the same asynchronous
computation
+/// A [`OnceAsync`] runs an `async` closure once, where multiple calls to
+/// [`OnceAsync::once`] return a [`OnceFut`] that resolves to the result of the
+/// same computation.
///
-/// This is useful for joins where the results of one child are buffered in
memory
-/// and shared across potentially multiple output partitions
+/// This is useful for joins where the results of one child are needed to
proceed
+/// with multiple output streams.
+///
+///
+/// For example, in a hash join, one input is buffered and shared across
+/// potentially multiple output partitions. Each output partition must wait for
+/// the hash table to be built before proceeding.
+///
+/// Each output partition waits on the same `OnceAsync` before proceeding.
pub(crate) struct OnceAsync<T> {
fut: Mutex<Option<OnceFut<T>>>,
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]