This is an automated email from the ASF dual-hosted git repository.
comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 79481839da Remove Eager Trait for Joins (#10721)
79481839da is described below
commit 79481839da993623e899f8835a145ddd8bfc210e
Author: Berkay Şahin <[email protected]>
AuthorDate: Thu May 30 23:41:43 2024 +0300
Remove Eager Trait for Joins (#10721)
* Remove eager trait
* Update helpers.rs
---
.../physical-plan/src/joins/stream_join_utils.rs | 354 +--------------------
.../physical-plan/src/joins/symmetric_hash_join.rs | 268 +++++++++++++++-
datafusion/physical-plan/src/joins/utils.rs | 28 +-
3 files changed, 263 insertions(+), 387 deletions(-)
diff --git a/datafusion/physical-plan/src/joins/stream_join_utils.rs
b/datafusion/physical-plan/src/joins/stream_join_utils.rs
index f82eb31f96..0a01d84141 100644
--- a/datafusion/physical-plan/src/joins/stream_join_utils.rs
+++ b/datafusion/physical-plan/src/joins/stream_join_utils.rs
@@ -20,12 +20,11 @@
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
-use std::task::{Context, Poll};
use std::usize;
-use crate::joins::utils::{JoinFilter, JoinHashMapType, StatefulStreamResult};
+use crate::joins::utils::{JoinFilter, JoinHashMapType};
use crate::metrics::{ExecutionPlanMetricsSet, MetricBuilder};
-use crate::{handle_async_state, handle_state, metrics, ExecutionPlan};
+use crate::{metrics, ExecutionPlan};
use arrow::compute::concat_batches;
use arrow_array::{ArrowPrimitiveType, NativeAdapter, PrimitiveArray,
RecordBatch};
@@ -36,15 +35,12 @@ use datafusion_common::{
arrow_datafusion_err, plan_datafusion_err, DataFusionError, JoinSide,
Result,
ScalarValue,
};
-use datafusion_execution::SendableRecordBatchStream;
use datafusion_expr::interval_arithmetic::Interval;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::intervals::cp_solver::ExprIntervalGraph;
use datafusion_physical_expr::utils::collect_columns;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
-use async_trait::async_trait;
-use futures::{ready, FutureExt, StreamExt};
use hashbrown::raw::RawTable;
use hashbrown::HashSet;
@@ -629,352 +625,6 @@ pub fn record_visited_indices<T: ArrowPrimitiveType>(
}
}
-/// Represents the various states of an eager join stream operation.
-///
-/// This enum is used to track the current state of streaming during a join
-/// operation. It provides indicators as to which side of the join needs to be
-/// pulled next or if one (or both) sides have been exhausted. This allows
-/// for efficient management of resources and optimal performance during the
-/// join process.
-#[derive(Clone, Debug)]
-pub enum EagerJoinStreamState {
- /// Indicates that the next step should pull from the right side of the
join.
- PullRight,
-
- /// Indicates that the next step should pull from the left side of the
join.
- PullLeft,
-
- /// State representing that the right side of the join has been fully
processed.
- RightExhausted,
-
- /// State representing that the left side of the join has been fully
processed.
- LeftExhausted,
-
- /// Represents a state where both sides of the join are exhausted.
- ///
- /// The `final_result` field indicates whether the join operation has
- /// produced a final result or not.
- BothExhausted { final_result: bool },
-}
-
-/// `EagerJoinStream` is an asynchronous trait designed for managing
incremental
-/// join operations between two streams, such as those used in
`SymmetricHashJoinExec`
-/// and `SortMergeJoinExec`. Unlike traditional join approaches that need to
scan
-/// one side of the join fully before proceeding, `EagerJoinStream` facilitates
-/// more dynamic join operations by working with streams as they emit data.
This
-/// approach allows for more efficient processing, particularly in scenarios
-/// where waiting for complete data materialization is not feasible or optimal.
-/// The trait provides a framework for handling various states of such a join
-/// process, ensuring that join logic is efficiently executed as data becomes
-/// available from either stream.
-///
-/// Implementors of this trait can perform eager joins of data from two
different
-/// asynchronous streams, typically referred to as left and right streams. The
-/// trait provides a comprehensive set of methods to control and execute the
join
-/// process, leveraging the states defined in `EagerJoinStreamState`. Methods
are
-/// primarily focused on asynchronously fetching data batches from each stream,
-/// processing them, and managing transitions between various states of the
join.
-///
-/// This trait's default implementations use a state machine approach to
navigate
-/// different stages of the join operation, handling data from both streams and
-/// determining when the join completes.
-///
-/// State Transitions:
-/// - From `PullLeft` to `PullRight` or `LeftExhausted`:
-/// - In `fetch_next_from_left_stream`, when fetching a batch from the left
stream:
-/// - On success (`Some(Ok(batch))`), state transitions to `PullRight` for
-/// processing the batch.
-/// - On error (`Some(Err(e))`), the error is returned, and the state
remains
-/// unchanged.
-/// - On no data (`None`), state changes to `LeftExhausted`, returning
`Continue`
-/// to proceed with the join process.
-/// - From `PullRight` to `PullLeft` or `RightExhausted`:
-/// - In `fetch_next_from_right_stream`, when fetching from the right stream:
-/// - If a batch is available, state changes to `PullLeft` for processing.
-/// - On error, the error is returned without changing the state.
-/// - If right stream is exhausted (`None`), state transitions to
`RightExhausted`,
-/// with a `Continue` result.
-/// - Handling `RightExhausted` and `LeftExhausted`:
-/// - Methods `handle_right_stream_end` and `handle_left_stream_end` manage
scenarios
-/// when streams are exhausted:
-/// - They attempt to continue processing with the other stream.
-/// - If both streams are exhausted, state changes to `BothExhausted {
final_result: false }`.
-/// - Transition to `BothExhausted { final_result: true }`:
-/// - Occurs in `prepare_for_final_results_after_exhaustion` when both
streams are
-/// exhausted, indicating completion of processing and availability of
final results.
-#[async_trait]
-pub trait EagerJoinStream {
- /// Implements the main polling logic for the join stream.
- ///
- /// This method continuously checks the state of the join stream and
- /// acts accordingly by delegating the handling to appropriate sub-methods
- /// depending on the current state.
- ///
- /// # Arguments
- ///
- /// * `cx` - A context that facilitates cooperative non-blocking execution
within a task.
- ///
- /// # Returns
- ///
- /// * `Poll<Option<Result<RecordBatch>>>` - A polled result, either a
`RecordBatch` or None.
- fn poll_next_impl(
- &mut self,
- cx: &mut Context<'_>,
- ) -> Poll<Option<Result<RecordBatch>>>
- where
- Self: Send,
- {
- loop {
- return match self.state() {
- EagerJoinStreamState::PullRight => {
- handle_async_state!(self.fetch_next_from_right_stream(),
cx)
- }
- EagerJoinStreamState::PullLeft => {
- handle_async_state!(self.fetch_next_from_left_stream(), cx)
- }
- EagerJoinStreamState::RightExhausted => {
- handle_async_state!(self.handle_right_stream_end(), cx)
- }
- EagerJoinStreamState::LeftExhausted => {
- handle_async_state!(self.handle_left_stream_end(), cx)
- }
- EagerJoinStreamState::BothExhausted {
- final_result: false,
- } => {
-
handle_state!(self.prepare_for_final_results_after_exhaustion())
- }
- EagerJoinStreamState::BothExhausted { final_result: true } => {
- Poll::Ready(None)
- }
- };
- }
- }
- /// Asynchronously pulls the next batch from the right stream.
- ///
- /// This default implementation checks for the next value in the right
stream.
- /// If a batch is found, the state is switched to `PullLeft`, and the
batch handling
- /// is delegated to `process_batch_from_right`. If the stream ends, the
state is set to `RightExhausted`.
- ///
- /// # Returns
- ///
- /// * `Result<StatefulStreamResult<Option<RecordBatch>>>` - The state
result after pulling the batch.
- async fn fetch_next_from_right_stream(
- &mut self,
- ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
- match self.right_stream().next().await {
- Some(Ok(batch)) => {
- if batch.num_rows() == 0 {
- return Ok(StatefulStreamResult::Continue);
- }
- self.set_state(EagerJoinStreamState::PullLeft);
- self.process_batch_from_right(batch)
- }
- Some(Err(e)) => Err(e),
- None => {
- self.set_state(EagerJoinStreamState::RightExhausted);
- Ok(StatefulStreamResult::Continue)
- }
- }
- }
-
- /// Asynchronously pulls the next batch from the left stream.
- ///
- /// This default implementation checks for the next value in the left
stream.
- /// If a batch is found, the state is switched to `PullRight`, and the
batch handling
- /// is delegated to `process_batch_from_left`. If the stream ends, the
state is set to `LeftExhausted`.
- ///
- /// # Returns
- ///
- /// * `Result<StatefulStreamResult<Option<RecordBatch>>>` - The state
result after pulling the batch.
- async fn fetch_next_from_left_stream(
- &mut self,
- ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
- match self.left_stream().next().await {
- Some(Ok(batch)) => {
- if batch.num_rows() == 0 {
- return Ok(StatefulStreamResult::Continue);
- }
- self.set_state(EagerJoinStreamState::PullRight);
- self.process_batch_from_left(batch)
- }
- Some(Err(e)) => Err(e),
- None => {
- self.set_state(EagerJoinStreamState::LeftExhausted);
- Ok(StatefulStreamResult::Continue)
- }
- }
- }
-
- /// Asynchronously handles the scenario when the right stream is exhausted.
- ///
- /// In this default implementation, when the right stream is exhausted, it
attempts
- /// to pull from the left stream. If a batch is found in the left stream,
it delegates
- /// the handling to `process_batch_from_left`. If both streams are
exhausted, the state is set
- /// to indicate both streams are exhausted without final results yet.
- ///
- /// # Returns
- ///
- /// * `Result<StatefulStreamResult<Option<RecordBatch>>>` - The state
result after checking the exhaustion state.
- async fn handle_right_stream_end(
- &mut self,
- ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
- match self.left_stream().next().await {
- Some(Ok(batch)) => {
- if batch.num_rows() == 0 {
- return Ok(StatefulStreamResult::Continue);
- }
- self.process_batch_after_right_end(batch)
- }
- Some(Err(e)) => Err(e),
- None => {
- self.set_state(EagerJoinStreamState::BothExhausted {
- final_result: false,
- });
- Ok(StatefulStreamResult::Continue)
- }
- }
- }
-
- /// Asynchronously handles the scenario when the left stream is exhausted.
- ///
- /// When the left stream is exhausted, this default
- /// implementation tries to pull from the right stream and delegates the
batch
- /// handling to `process_batch_after_left_end`. If both streams are
exhausted, the state
- /// is updated to indicate so.
- ///
- /// # Returns
- ///
- /// * `Result<StatefulStreamResult<Option<RecordBatch>>>` - The state
result after checking the exhaustion state.
- async fn handle_left_stream_end(
- &mut self,
- ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
- match self.right_stream().next().await {
- Some(Ok(batch)) => {
- if batch.num_rows() == 0 {
- return Ok(StatefulStreamResult::Continue);
- }
- self.process_batch_after_left_end(batch)
- }
- Some(Err(e)) => Err(e),
- None => {
- self.set_state(EagerJoinStreamState::BothExhausted {
- final_result: false,
- });
- Ok(StatefulStreamResult::Continue)
- }
- }
- }
-
- /// Handles the state when both streams are exhausted and final results
are yet to be produced.
- ///
- /// This default implementation switches the state to indicate both
streams are
- /// exhausted with final results and then invokes the handling for this
specific
- /// scenario via `process_batches_before_finalization`.
- ///
- /// # Returns
- ///
- /// * `Result<StatefulStreamResult<Option<RecordBatch>>>` - The state
result after both streams are exhausted.
- fn prepare_for_final_results_after_exhaustion(
- &mut self,
- ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
- self.set_state(EagerJoinStreamState::BothExhausted { final_result:
true });
- self.process_batches_before_finalization()
- }
-
- /// Handles a pulled batch from the right stream.
- ///
- /// # Arguments
- ///
- /// * `batch` - The pulled `RecordBatch` from the right stream.
- ///
- /// # Returns
- ///
- /// * `Result<StatefulStreamResult<Option<RecordBatch>>>` - The state
result after processing the batch.
- fn process_batch_from_right(
- &mut self,
- batch: RecordBatch,
- ) -> Result<StatefulStreamResult<Option<RecordBatch>>>;
-
- /// Handles a pulled batch from the left stream.
- ///
- /// # Arguments
- ///
- /// * `batch` - The pulled `RecordBatch` from the left stream.
- ///
- /// # Returns
- ///
- /// * `Result<StatefulStreamResult<Option<RecordBatch>>>` - The state
result after processing the batch.
- fn process_batch_from_left(
- &mut self,
- batch: RecordBatch,
- ) -> Result<StatefulStreamResult<Option<RecordBatch>>>;
-
- /// Handles the situation when only the left stream is exhausted.
- ///
- /// # Arguments
- ///
- /// * `right_batch` - The `RecordBatch` from the right stream.
- ///
- /// # Returns
- ///
- /// * `Result<StatefulStreamResult<Option<RecordBatch>>>` - The state
result after the left stream is exhausted.
- fn process_batch_after_left_end(
- &mut self,
- right_batch: RecordBatch,
- ) -> Result<StatefulStreamResult<Option<RecordBatch>>>;
-
- /// Handles the situation when only the right stream is exhausted.
- ///
- /// # Arguments
- ///
- /// * `left_batch` - The `RecordBatch` from the left stream.
- ///
- /// # Returns
- ///
- /// * `Result<StatefulStreamResult<Option<RecordBatch>>>` - The state
result after the right stream is exhausted.
- fn process_batch_after_right_end(
- &mut self,
- left_batch: RecordBatch,
- ) -> Result<StatefulStreamResult<Option<RecordBatch>>>;
-
- /// Handles the final state after both streams are exhausted.
- ///
- /// # Returns
- ///
- /// * `Result<StatefulStreamResult<Option<RecordBatch>>>` - The final
state result after processing.
- fn process_batches_before_finalization(
- &mut self,
- ) -> Result<StatefulStreamResult<Option<RecordBatch>>>;
-
- /// Provides mutable access to the right stream.
- ///
- /// # Returns
- ///
- /// * `&mut SendableRecordBatchStream` - Returns a mutable reference to
the right stream.
- fn right_stream(&mut self) -> &mut SendableRecordBatchStream;
-
- /// Provides mutable access to the left stream.
- ///
- /// # Returns
- ///
- /// * `&mut SendableRecordBatchStream` - Returns a mutable reference to
the left stream.
- fn left_stream(&mut self) -> &mut SendableRecordBatchStream;
-
- /// Sets the current state of the join stream.
- ///
- /// # Arguments
- ///
- /// * `state` - The new state to be set.
- fn set_state(&mut self, state: EagerJoinStreamState);
-
- /// Fetches the current state of the join stream.
- ///
- /// # Returns
- ///
- /// * `EagerJoinStreamState` - The current state of the join stream.
- fn state(&mut self) -> EagerJoinStreamState;
-}
-
#[derive(Debug)]
pub struct StreamJoinSideMetrics {
/// Number of batches consumed by this operator
diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
index 0d902af9c6..449c42d697 100644
--- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
+++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
@@ -28,17 +28,17 @@
use std::any::Any;
use std::fmt::{self, Debug};
use std::sync::Arc;
-use std::task::Poll;
+use std::task::{Context, Poll};
use std::{usize, vec};
use crate::common::SharedMemoryReservation;
+use crate::handle_state;
use crate::joins::hash_join::{equal_rows_arr, update_hash};
use crate::joins::stream_join_utils::{
calculate_filter_expr_intervals, combine_two_batches,
convert_sort_expr_with_filter_schema, get_pruning_anti_indices,
get_pruning_semi_indices, prepare_sorted_exprs, record_visited_indices,
- EagerJoinStream, EagerJoinStreamState, PruningJoinHashMap,
SortedFilterExpr,
- StreamJoinMetrics,
+ PruningJoinHashMap, SortedFilterExpr, StreamJoinMetrics,
};
use crate::joins::utils::{
apply_join_filter_to_indices, build_batch_from_indices, build_join_schema,
@@ -72,7 +72,7 @@ use
datafusion_physical_expr::intervals::cp_solver::ExprIntervalGraph;
use datafusion_physical_expr::{PhysicalExprRef, PhysicalSortRequirement};
use ahash::RandomState;
-use futures::Stream;
+use futures::{ready, Stream, StreamExt};
use hashbrown::HashSet;
use parking_lot::Mutex;
@@ -522,7 +522,7 @@ impl ExecutionPlan for SymmetricHashJoinExec {
left_sorted_filter_expr,
right_sorted_filter_expr,
null_equals_null: self.null_equals_null,
- state: EagerJoinStreamState::PullRight,
+ state: SHJStreamState::PullRight,
reservation,
}))
}
@@ -560,7 +560,7 @@ struct SymmetricHashJoinStream {
/// Memory reservation
reservation: SharedMemoryReservation,
/// State machine for input execution
- state: EagerJoinStreamState,
+ state: SHJStreamState,
}
impl RecordBatchStream for SymmetricHashJoinStream {
@@ -1103,7 +1103,227 @@ impl OneSideHashJoiner {
}
}
-impl EagerJoinStream for SymmetricHashJoinStream {
+/// `SymmetricHashJoinStream` manages incremental join operations between two
+/// streams. Unlike traditional join approaches that need to scan one side of
+/// the join fully before proceeding, `SymmetricHashJoinStream` facilitates
+/// more dynamic join operations by working with streams as they emit data.
This
+/// approach allows for more efficient processing, particularly in scenarios
+/// where waiting for complete data materialization is not feasible or optimal.
+/// The implementation provides a framework for handling various states of such a join
+/// process, ensuring that join logic is efficiently executed as data becomes
+/// available from either stream.
+///
+/// This implementation performs eager joins of data from two different
asynchronous
+/// streams, typically referred to as left and right streams. The
implementation
+/// provides a comprehensive set of methods to control and execute the join
+/// process, leveraging the states defined in `SHJStreamState`. Methods are
+/// primarily focused on asynchronously fetching data batches from each stream,
+/// processing them, and managing transitions between various states of the
join.
+///
+/// This implementation uses a state machine approach to navigate different
+/// stages of the join operation, handling data from both streams and
determining
+/// when the join completes.
+///
+/// State Transitions:
+/// - From `PullLeft` to `PullRight` or `LeftExhausted`:
+/// - In `fetch_next_from_left_stream`, when fetching a batch from the left
stream:
+/// - On success (`Some(Ok(batch))`), state transitions to `PullRight` for
+/// processing the batch.
+/// - On error (`Some(Err(e))`), the error is returned, and the state
remains
+/// unchanged.
+/// - On no data (`None`), state changes to `LeftExhausted`, returning
`Continue`
+/// to proceed with the join process.
+/// - From `PullRight` to `PullLeft` or `RightExhausted`:
+/// - In `fetch_next_from_right_stream`, when fetching from the right stream:
+/// - If a batch is available, state changes to `PullLeft` for processing.
+/// - On error, the error is returned without changing the state.
+/// - If right stream is exhausted (`None`), state transitions to
`RightExhausted`,
+/// with a `Continue` result.
+/// - Handling `RightExhausted` and `LeftExhausted`:
+/// - Methods `handle_right_stream_end` and `handle_left_stream_end` manage
scenarios
+/// when streams are exhausted:
+/// - They attempt to continue processing with the other stream.
+/// - If both streams are exhausted, state changes to `BothExhausted {
final_result: false }`.
+/// - Transition to `BothExhausted { final_result: true }`:
+/// - Occurs in `prepare_for_final_results_after_exhaustion` when both
streams are
+/// exhausted, indicating completion of processing and availability of
final results.
+impl SymmetricHashJoinStream {
+ /// Implements the main polling logic for the join stream.
+ ///
+ /// This method continuously checks the state of the join stream and
+ /// acts accordingly by delegating the handling to appropriate sub-methods
+ /// depending on the current state.
+ ///
+ /// # Arguments
+ ///
+ /// * `cx` - A context that facilitates cooperative non-blocking execution
within a task.
+ ///
+ /// # Returns
+ ///
+ /// * `Poll<Option<Result<RecordBatch>>>` - A polled result, either a
`RecordBatch` or None.
+ fn poll_next_impl(
+ &mut self,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Result<RecordBatch>>> {
+ loop {
+ return match self.state() {
+ SHJStreamState::PullRight => {
+
handle_state!(ready!(self.fetch_next_from_right_stream(cx)))
+ }
+ SHJStreamState::PullLeft => {
+ handle_state!(ready!(self.fetch_next_from_left_stream(cx)))
+ }
+ SHJStreamState::RightExhausted => {
+ handle_state!(ready!(self.handle_right_stream_end(cx)))
+ }
+ SHJStreamState::LeftExhausted => {
+ handle_state!(ready!(self.handle_left_stream_end(cx)))
+ }
+ SHJStreamState::BothExhausted {
+ final_result: false,
+ } => {
+
handle_state!(self.prepare_for_final_results_after_exhaustion())
+ }
+ SHJStreamState::BothExhausted { final_result: true } =>
Poll::Ready(None),
+ };
+ }
+ }
+ /// Asynchronously pulls the next batch from the right stream.
+ ///
+ /// This implementation checks for the next value in the right
stream.
+ /// If a batch is found, the state is switched to `PullLeft`, and the
batch handling
+ /// is delegated to `process_batch_from_right`. If the stream ends, the
state is set to `RightExhausted`.
+ ///
+ /// # Returns
+ ///
+ /// * `Poll<Result<StatefulStreamResult<Option<RecordBatch>>>>` - The state
result after pulling the batch.
+ fn fetch_next_from_right_stream(
+ &mut self,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
+ match ready!(self.right_stream().poll_next_unpin(cx)) {
+ Some(Ok(batch)) => {
+ if batch.num_rows() == 0 {
+ return Poll::Ready(Ok(StatefulStreamResult::Continue));
+ }
+ self.set_state(SHJStreamState::PullLeft);
+ Poll::Ready(self.process_batch_from_right(batch))
+ }
+ Some(Err(e)) => Poll::Ready(Err(e)),
+ None => {
+ self.set_state(SHJStreamState::RightExhausted);
+ Poll::Ready(Ok(StatefulStreamResult::Continue))
+ }
+ }
+ }
+
+ /// Asynchronously pulls the next batch from the left stream.
+ ///
+ /// This implementation checks for the next value in the left
stream.
+ /// If a batch is found, the state is switched to `PullRight`, and the
batch handling
+ /// is delegated to `process_batch_from_left`. If the stream ends, the
state is set to `LeftExhausted`.
+ ///
+ /// # Returns
+ ///
+ /// * `Poll<Result<StatefulStreamResult<Option<RecordBatch>>>>` - The state
result after pulling the batch.
+ fn fetch_next_from_left_stream(
+ &mut self,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
+ match ready!(self.left_stream().poll_next_unpin(cx)) {
+ Some(Ok(batch)) => {
+ if batch.num_rows() == 0 {
+ return Poll::Ready(Ok(StatefulStreamResult::Continue));
+ }
+ self.set_state(SHJStreamState::PullRight);
+ Poll::Ready(self.process_batch_from_left(batch))
+ }
+ Some(Err(e)) => Poll::Ready(Err(e)),
+ None => {
+ self.set_state(SHJStreamState::LeftExhausted);
+ Poll::Ready(Ok(StatefulStreamResult::Continue))
+ }
+ }
+ }
+
+ /// Asynchronously handles the scenario when the right stream is exhausted.
+ ///
+ /// In this implementation, when the right stream is exhausted, it
attempts
+ /// to pull from the left stream. If a batch is found in the left stream,
it delegates
+ /// the handling to `process_batch_after_right_end`. If both streams are
exhausted, the state is set
+ /// to indicate both streams are exhausted without final results yet.
+ ///
+ /// # Returns
+ ///
+ /// * `Poll<Result<StatefulStreamResult<Option<RecordBatch>>>>` - The state
result after checking the exhaustion state.
+ fn handle_right_stream_end(
+ &mut self,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
+ match ready!(self.left_stream().poll_next_unpin(cx)) {
+ Some(Ok(batch)) => {
+ if batch.num_rows() == 0 {
+ return Poll::Ready(Ok(StatefulStreamResult::Continue));
+ }
+ Poll::Ready(self.process_batch_after_right_end(batch))
+ }
+ Some(Err(e)) => Poll::Ready(Err(e)),
+ None => {
+ self.set_state(SHJStreamState::BothExhausted {
+ final_result: false,
+ });
+ Poll::Ready(Ok(StatefulStreamResult::Continue))
+ }
+ }
+ }
+
+ /// Asynchronously handles the scenario when the left stream is exhausted.
+ ///
+ /// When the left stream is exhausted, this
+ /// implementation tries to pull from the right stream and delegates the
batch
+ /// handling to `process_batch_after_left_end`. If both streams are
exhausted, the state
+ /// is updated to indicate so.
+ ///
+ /// # Returns
+ ///
+ /// * `Poll<Result<StatefulStreamResult<Option<RecordBatch>>>>` - The state
result after checking the exhaustion state.
+ fn handle_left_stream_end(
+ &mut self,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
+ match ready!(self.right_stream().poll_next_unpin(cx)) {
+ Some(Ok(batch)) => {
+ if batch.num_rows() == 0 {
+ return Poll::Ready(Ok(StatefulStreamResult::Continue));
+ }
+ Poll::Ready(self.process_batch_after_left_end(batch))
+ }
+ Some(Err(e)) => Poll::Ready(Err(e)),
+ None => {
+ self.set_state(SHJStreamState::BothExhausted {
+ final_result: false,
+ });
+ Poll::Ready(Ok(StatefulStreamResult::Continue))
+ }
+ }
+ }
+
+ /// Handles the state when both streams are exhausted and final results
are yet to be produced.
+ ///
+ /// This implementation switches the state to indicate both
streams are
+ /// exhausted with final results and then invokes the handling for this
specific
+ /// scenario via `process_batches_before_finalization`.
+ ///
+ /// # Returns
+ ///
+ /// * `Result<StatefulStreamResult<Option<RecordBatch>>>` - The state
result after both streams are exhausted.
+ fn prepare_for_final_results_after_exhaustion(
+ &mut self,
+ ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
+ self.set_state(SHJStreamState::BothExhausted { final_result: true });
+ self.process_batches_before_finalization()
+ }
+
fn process_batch_from_right(
&mut self,
batch: RecordBatch,
@@ -1189,16 +1409,14 @@ impl EagerJoinStream for SymmetricHashJoinStream {
&mut self.left_stream
}
- fn set_state(&mut self, state: EagerJoinStreamState) {
+ fn set_state(&mut self, state: SHJStreamState) {
self.state = state;
}
- fn state(&mut self) -> EagerJoinStreamState {
+ fn state(&mut self) -> SHJStreamState {
self.state.clone()
}
-}
-impl SymmetricHashJoinStream {
fn size(&self) -> usize {
let mut size = 0;
size += std::mem::size_of_val(&self.schema);
@@ -1321,6 +1539,34 @@ impl SymmetricHashJoinStream {
}
}
+/// Represents the various states of a symmetric hash join stream operation.
+///
+/// This enum is used to track the current state of streaming during a join
+/// operation. It provides indicators as to which side of the join needs to be
+/// pulled next or if one (or both) sides have been exhausted. This allows
+/// for efficient management of resources and optimal performance during the
+/// join process.
+#[derive(Clone, Debug)]
+pub enum SHJStreamState {
+ /// Indicates that the next step should pull from the right side of the
join.
+ PullRight,
+
+ /// Indicates that the next step should pull from the left side of the
join.
+ PullLeft,
+
+ /// State representing that the right side of the join has been fully
processed.
+ RightExhausted,
+
+ /// State representing that the left side of the join has been fully
processed.
+ LeftExhausted,
+
+ /// Represents a state where both sides of the join are exhausted.
+ ///
+ /// The `final_result` field indicates whether the join operation has
+ /// produced a final result or not.
+ BothExhausted { final_result: bool },
+}
+
#[cfg(test)]
mod tests {
use std::collections::HashMap;
diff --git a/datafusion/physical-plan/src/joins/utils.rs
b/datafusion/physical-plan/src/joins/utils.rs
index acf9ed4d7e..0d99d7a163 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -1494,10 +1494,9 @@ impl BuildProbeJoinMetrics {
}
/// The `handle_state` macro is designed to process the result of a
state-changing
-/// operation, encountered e.g. in implementations of `EagerJoinStream`. It
-/// operates on a `StatefulStreamResult` by matching its variants and executing
-/// corresponding actions. This macro is used to streamline code that deals
with
-/// state transitions, reducing boilerplate and improving readability.
+/// operation. It operates on a `StatefulStreamResult` by matching its
variants and
+/// executing corresponding actions. This macro is used to streamline code
that deals
+/// with state transitions, reducing boilerplate and improving readability.
///
/// # Cases
///
@@ -1525,26 +1524,7 @@ macro_rules! handle_state {
};
}
-/// The `handle_async_state` macro adapts the `handle_state` macro for use in
-/// asynchronous operations, particularly when dealing with `Poll` results
within
-/// async traits like `EagerJoinStream`. It polls the asynchronous
state-changing
-/// function using `poll_unpin` and then passes the result to `handle_state`
for
-/// further processing.
-///
-/// # Arguments
-///
-/// * `$state_func`: An async function or future that returns a
-/// `Result<StatefulStreamResult<_>>`.
-/// * `$cx`: The context to be passed for polling, usually of type `&mut
Context`.
-///
-#[macro_export]
-macro_rules! handle_async_state {
- ($state_func:expr, $cx:expr) => {
- $crate::handle_state!(ready!($state_func.poll_unpin($cx)))
- };
-}
-
-/// Represents the result of an operation on stateful join stream.
+/// Represents the result of a stateful operation.
///
/// This enumeration indicates whether the state produced a result that is
/// ready for use (`Ready`) or if the operation requires continuation
(`Continue`).
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]