This is an automated email from the ASF dual-hosted git repository.

comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 79481839da Remove Eager Trait for Joins (#10721)
79481839da is described below

commit 79481839da993623e899f8835a145ddd8bfc210e
Author: Berkay Şahin <[email protected]>
AuthorDate: Thu May 30 23:41:43 2024 +0300

    Remove Eager Trait for Joins (#10721)
    
    * Remove eager trait
    
    * Update helpers.rs
---
 .../physical-plan/src/joins/stream_join_utils.rs   | 354 +--------------------
 .../physical-plan/src/joins/symmetric_hash_join.rs | 268 +++++++++++++++-
 datafusion/physical-plan/src/joins/utils.rs        |  28 +-
 3 files changed, 263 insertions(+), 387 deletions(-)

diff --git a/datafusion/physical-plan/src/joins/stream_join_utils.rs b/datafusion/physical-plan/src/joins/stream_join_utils.rs
index f82eb31f96..0a01d84141 100644
--- a/datafusion/physical-plan/src/joins/stream_join_utils.rs
+++ b/datafusion/physical-plan/src/joins/stream_join_utils.rs
@@ -20,12 +20,11 @@
 
 use std::collections::{HashMap, VecDeque};
 use std::sync::Arc;
-use std::task::{Context, Poll};
 use std::usize;
 
-use crate::joins::utils::{JoinFilter, JoinHashMapType, StatefulStreamResult};
+use crate::joins::utils::{JoinFilter, JoinHashMapType};
 use crate::metrics::{ExecutionPlanMetricsSet, MetricBuilder};
-use crate::{handle_async_state, handle_state, metrics, ExecutionPlan};
+use crate::{metrics, ExecutionPlan};
 
 use arrow::compute::concat_batches;
 use arrow_array::{ArrowPrimitiveType, NativeAdapter, PrimitiveArray, 
RecordBatch};
@@ -36,15 +35,12 @@ use datafusion_common::{
     arrow_datafusion_err, plan_datafusion_err, DataFusionError, JoinSide, 
Result,
     ScalarValue,
 };
-use datafusion_execution::SendableRecordBatchStream;
 use datafusion_expr::interval_arithmetic::Interval;
 use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr::intervals::cp_solver::ExprIntervalGraph;
 use datafusion_physical_expr::utils::collect_columns;
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
 
-use async_trait::async_trait;
-use futures::{ready, FutureExt, StreamExt};
 use hashbrown::raw::RawTable;
 use hashbrown::HashSet;
 
@@ -629,352 +625,6 @@ pub fn record_visited_indices<T: ArrowPrimitiveType>(
     }
 }
 
-/// Represents the various states of an eager join stream operation.
-///
-/// This enum is used to track the current state of streaming during a join
-/// operation. It provides indicators as to which side of the join needs to be
-/// pulled next or if one (or both) sides have been exhausted. This allows
-/// for efficient management of resources and optimal performance during the
-/// join process.
-#[derive(Clone, Debug)]
-pub enum EagerJoinStreamState {
-    /// Indicates that the next step should pull from the right side of the 
join.
-    PullRight,
-
-    /// Indicates that the next step should pull from the left side of the 
join.
-    PullLeft,
-
-    /// State representing that the right side of the join has been fully 
processed.
-    RightExhausted,
-
-    /// State representing that the left side of the join has been fully 
processed.
-    LeftExhausted,
-
-    /// Represents a state where both sides of the join are exhausted.
-    ///
-    /// The `final_result` field indicates whether the join operation has
-    /// produced a final result or not.
-    BothExhausted { final_result: bool },
-}
-
-/// `EagerJoinStream` is an asynchronous trait designed for managing 
incremental
-/// join operations between two streams, such as those used in 
`SymmetricHashJoinExec`
-/// and `SortMergeJoinExec`. Unlike traditional join approaches that need to 
scan
-/// one side of the join fully before proceeding, `EagerJoinStream` facilitates
-/// more dynamic join operations by working with streams as they emit data. 
This
-/// approach allows for more efficient processing, particularly in scenarios
-/// where waiting for complete data materialization is not feasible or optimal.
-/// The trait provides a framework for handling various states of such a join
-/// process, ensuring that join logic is efficiently executed as data becomes
-/// available from either stream.
-///
-/// Implementors of this trait can perform eager joins of data from two 
different
-/// asynchronous streams, typically referred to as left and right streams. The
-/// trait provides a comprehensive set of methods to control and execute the 
join
-/// process, leveraging the states defined in `EagerJoinStreamState`. Methods 
are
-/// primarily focused on asynchronously fetching data batches from each stream,
-/// processing them, and managing transitions between various states of the 
join.
-///
-/// This trait's default implementations use a state machine approach to 
navigate
-/// different stages of the join operation, handling data from both streams and
-/// determining when the join completes.
-///
-/// State Transitions:
-/// - From `PullLeft` to `PullRight` or `LeftExhausted`:
-///   - In `fetch_next_from_left_stream`, when fetching a batch from the left 
stream:
-///     - On success (`Some(Ok(batch))`), state transitions to `PullRight` for
-///       processing the batch.
-///     - On error (`Some(Err(e))`), the error is returned, and the state 
remains
-///       unchanged.
-///     - On no data (`None`), state changes to `LeftExhausted`, returning 
`Continue`
-///       to proceed with the join process.
-/// - From `PullRight` to `PullLeft` or `RightExhausted`:
-///   - In `fetch_next_from_right_stream`, when fetching from the right stream:
-///     - If a batch is available, state changes to `PullLeft` for processing.
-///     - On error, the error is returned without changing the state.
-///     - If right stream is exhausted (`None`), state transitions to 
`RightExhausted`,
-///       with a `Continue` result.
-/// - Handling `RightExhausted` and `LeftExhausted`:
-///   - Methods `handle_right_stream_end` and `handle_left_stream_end` manage 
scenarios
-///     when streams are exhausted:
-///     - They attempt to continue processing with the other stream.
-///     - If both streams are exhausted, state changes to `BothExhausted { 
final_result: false }`.
-/// - Transition to `BothExhausted { final_result: true }`:
-///   - Occurs in `prepare_for_final_results_after_exhaustion` when both 
streams are
-///     exhausted, indicating completion of processing and availability of 
final results.
-#[async_trait]
-pub trait EagerJoinStream {
-    /// Implements the main polling logic for the join stream.
-    ///
-    /// This method continuously checks the state of the join stream and
-    /// acts accordingly by delegating the handling to appropriate sub-methods
-    /// depending on the current state.
-    ///
-    /// # Arguments
-    ///
-    /// * `cx` - A context that facilitates cooperative non-blocking execution 
within a task.
-    ///
-    /// # Returns
-    ///
-    /// * `Poll<Option<Result<RecordBatch>>>` - A polled result, either a 
`RecordBatch` or None.
-    fn poll_next_impl(
-        &mut self,
-        cx: &mut Context<'_>,
-    ) -> Poll<Option<Result<RecordBatch>>>
-    where
-        Self: Send,
-    {
-        loop {
-            return match self.state() {
-                EagerJoinStreamState::PullRight => {
-                    handle_async_state!(self.fetch_next_from_right_stream(), 
cx)
-                }
-                EagerJoinStreamState::PullLeft => {
-                    handle_async_state!(self.fetch_next_from_left_stream(), cx)
-                }
-                EagerJoinStreamState::RightExhausted => {
-                    handle_async_state!(self.handle_right_stream_end(), cx)
-                }
-                EagerJoinStreamState::LeftExhausted => {
-                    handle_async_state!(self.handle_left_stream_end(), cx)
-                }
-                EagerJoinStreamState::BothExhausted {
-                    final_result: false,
-                } => {
-                    
handle_state!(self.prepare_for_final_results_after_exhaustion())
-                }
-                EagerJoinStreamState::BothExhausted { final_result: true } => {
-                    Poll::Ready(None)
-                }
-            };
-        }
-    }
-    /// Asynchronously pulls the next batch from the right stream.
-    ///
-    /// This default implementation checks for the next value in the right 
stream.
-    /// If a batch is found, the state is switched to `PullLeft`, and the 
batch handling
-    /// is delegated to `process_batch_from_right`. If the stream ends, the 
state is set to `RightExhausted`.
-    ///
-    /// # Returns
-    ///
-    /// * `Result<StatefulStreamResult<Option<RecordBatch>>>` - The state 
result after pulling the batch.
-    async fn fetch_next_from_right_stream(
-        &mut self,
-    ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
-        match self.right_stream().next().await {
-            Some(Ok(batch)) => {
-                if batch.num_rows() == 0 {
-                    return Ok(StatefulStreamResult::Continue);
-                }
-                self.set_state(EagerJoinStreamState::PullLeft);
-                self.process_batch_from_right(batch)
-            }
-            Some(Err(e)) => Err(e),
-            None => {
-                self.set_state(EagerJoinStreamState::RightExhausted);
-                Ok(StatefulStreamResult::Continue)
-            }
-        }
-    }
-
-    /// Asynchronously pulls the next batch from the left stream.
-    ///
-    /// This default implementation checks for the next value in the left 
stream.
-    /// If a batch is found, the state is switched to `PullRight`, and the 
batch handling
-    /// is delegated to `process_batch_from_left`. If the stream ends, the 
state is set to `LeftExhausted`.
-    ///
-    /// # Returns
-    ///
-    /// * `Result<StatefulStreamResult<Option<RecordBatch>>>` - The state 
result after pulling the batch.
-    async fn fetch_next_from_left_stream(
-        &mut self,
-    ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
-        match self.left_stream().next().await {
-            Some(Ok(batch)) => {
-                if batch.num_rows() == 0 {
-                    return Ok(StatefulStreamResult::Continue);
-                }
-                self.set_state(EagerJoinStreamState::PullRight);
-                self.process_batch_from_left(batch)
-            }
-            Some(Err(e)) => Err(e),
-            None => {
-                self.set_state(EagerJoinStreamState::LeftExhausted);
-                Ok(StatefulStreamResult::Continue)
-            }
-        }
-    }
-
-    /// Asynchronously handles the scenario when the right stream is exhausted.
-    ///
-    /// In this default implementation, when the right stream is exhausted, it 
attempts
-    /// to pull from the left stream. If a batch is found in the left stream, 
it delegates
-    /// the handling to `process_batch_from_left`. If both streams are 
exhausted, the state is set
-    /// to indicate both streams are exhausted without final results yet.
-    ///
-    /// # Returns
-    ///
-    /// * `Result<StatefulStreamResult<Option<RecordBatch>>>` - The state 
result after checking the exhaustion state.
-    async fn handle_right_stream_end(
-        &mut self,
-    ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
-        match self.left_stream().next().await {
-            Some(Ok(batch)) => {
-                if batch.num_rows() == 0 {
-                    return Ok(StatefulStreamResult::Continue);
-                }
-                self.process_batch_after_right_end(batch)
-            }
-            Some(Err(e)) => Err(e),
-            None => {
-                self.set_state(EagerJoinStreamState::BothExhausted {
-                    final_result: false,
-                });
-                Ok(StatefulStreamResult::Continue)
-            }
-        }
-    }
-
-    /// Asynchronously handles the scenario when the left stream is exhausted.
-    ///
-    /// When the left stream is exhausted, this default
-    /// implementation tries to pull from the right stream and delegates the 
batch
-    /// handling to `process_batch_after_left_end`. If both streams are 
exhausted, the state
-    /// is updated to indicate so.
-    ///
-    /// # Returns
-    ///
-    /// * `Result<StatefulStreamResult<Option<RecordBatch>>>` - The state 
result after checking the exhaustion state.
-    async fn handle_left_stream_end(
-        &mut self,
-    ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
-        match self.right_stream().next().await {
-            Some(Ok(batch)) => {
-                if batch.num_rows() == 0 {
-                    return Ok(StatefulStreamResult::Continue);
-                }
-                self.process_batch_after_left_end(batch)
-            }
-            Some(Err(e)) => Err(e),
-            None => {
-                self.set_state(EagerJoinStreamState::BothExhausted {
-                    final_result: false,
-                });
-                Ok(StatefulStreamResult::Continue)
-            }
-        }
-    }
-
-    /// Handles the state when both streams are exhausted and final results 
are yet to be produced.
-    ///
-    /// This default implementation switches the state to indicate both 
streams are
-    /// exhausted with final results and then invokes the handling for this 
specific
-    /// scenario via `process_batches_before_finalization`.
-    ///
-    /// # Returns
-    ///
-    /// * `Result<StatefulStreamResult<Option<RecordBatch>>>` - The state 
result after both streams are exhausted.
-    fn prepare_for_final_results_after_exhaustion(
-        &mut self,
-    ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
-        self.set_state(EagerJoinStreamState::BothExhausted { final_result: 
true });
-        self.process_batches_before_finalization()
-    }
-
-    /// Handles a pulled batch from the right stream.
-    ///
-    /// # Arguments
-    ///
-    /// * `batch` - The pulled `RecordBatch` from the right stream.
-    ///
-    /// # Returns
-    ///
-    /// * `Result<StatefulStreamResult<Option<RecordBatch>>>` - The state 
result after processing the batch.
-    fn process_batch_from_right(
-        &mut self,
-        batch: RecordBatch,
-    ) -> Result<StatefulStreamResult<Option<RecordBatch>>>;
-
-    /// Handles a pulled batch from the left stream.
-    ///
-    /// # Arguments
-    ///
-    /// * `batch` - The pulled `RecordBatch` from the left stream.
-    ///
-    /// # Returns
-    ///
-    /// * `Result<StatefulStreamResult<Option<RecordBatch>>>` - The state 
result after processing the batch.
-    fn process_batch_from_left(
-        &mut self,
-        batch: RecordBatch,
-    ) -> Result<StatefulStreamResult<Option<RecordBatch>>>;
-
-    /// Handles the situation when only the left stream is exhausted.
-    ///
-    /// # Arguments
-    ///
-    /// * `right_batch` - The `RecordBatch` from the right stream.
-    ///
-    /// # Returns
-    ///
-    /// * `Result<StatefulStreamResult<Option<RecordBatch>>>` - The state 
result after the left stream is exhausted.
-    fn process_batch_after_left_end(
-        &mut self,
-        right_batch: RecordBatch,
-    ) -> Result<StatefulStreamResult<Option<RecordBatch>>>;
-
-    /// Handles the situation when only the right stream is exhausted.
-    ///
-    /// # Arguments
-    ///
-    /// * `left_batch` - The `RecordBatch` from the left stream.
-    ///
-    /// # Returns
-    ///
-    /// * `Result<StatefulStreamResult<Option<RecordBatch>>>` - The state 
result after the right stream is exhausted.
-    fn process_batch_after_right_end(
-        &mut self,
-        left_batch: RecordBatch,
-    ) -> Result<StatefulStreamResult<Option<RecordBatch>>>;
-
-    /// Handles the final state after both streams are exhausted.
-    ///
-    /// # Returns
-    ///
-    /// * `Result<StatefulStreamResult<Option<RecordBatch>>>` - The final 
state result after processing.
-    fn process_batches_before_finalization(
-        &mut self,
-    ) -> Result<StatefulStreamResult<Option<RecordBatch>>>;
-
-    /// Provides mutable access to the right stream.
-    ///
-    /// # Returns
-    ///
-    /// * `&mut SendableRecordBatchStream` - Returns a mutable reference to 
the right stream.
-    fn right_stream(&mut self) -> &mut SendableRecordBatchStream;
-
-    /// Provides mutable access to the left stream.
-    ///
-    /// # Returns
-    ///
-    /// * `&mut SendableRecordBatchStream` - Returns a mutable reference to 
the left stream.
-    fn left_stream(&mut self) -> &mut SendableRecordBatchStream;
-
-    /// Sets the current state of the join stream.
-    ///
-    /// # Arguments
-    ///
-    /// * `state` - The new state to be set.
-    fn set_state(&mut self, state: EagerJoinStreamState);
-
-    /// Fetches the current state of the join stream.
-    ///
-    /// # Returns
-    ///
-    /// * `EagerJoinStreamState` - The current state of the join stream.
-    fn state(&mut self) -> EagerJoinStreamState;
-}
-
 #[derive(Debug)]
 pub struct StreamJoinSideMetrics {
     /// Number of batches consumed by this operator
diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
index 0d902af9c6..449c42d697 100644
--- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
+++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
@@ -28,17 +28,17 @@
 use std::any::Any;
 use std::fmt::{self, Debug};
 use std::sync::Arc;
-use std::task::Poll;
+use std::task::{Context, Poll};
 use std::{usize, vec};
 
 use crate::common::SharedMemoryReservation;
+use crate::handle_state;
 use crate::joins::hash_join::{equal_rows_arr, update_hash};
 use crate::joins::stream_join_utils::{
     calculate_filter_expr_intervals, combine_two_batches,
     convert_sort_expr_with_filter_schema, get_pruning_anti_indices,
     get_pruning_semi_indices, prepare_sorted_exprs, record_visited_indices,
-    EagerJoinStream, EagerJoinStreamState, PruningJoinHashMap, 
SortedFilterExpr,
-    StreamJoinMetrics,
+    PruningJoinHashMap, SortedFilterExpr, StreamJoinMetrics,
 };
 use crate::joins::utils::{
     apply_join_filter_to_indices, build_batch_from_indices, build_join_schema,
@@ -72,7 +72,7 @@ use 
datafusion_physical_expr::intervals::cp_solver::ExprIntervalGraph;
 use datafusion_physical_expr::{PhysicalExprRef, PhysicalSortRequirement};
 
 use ahash::RandomState;
-use futures::Stream;
+use futures::{ready, Stream, StreamExt};
 use hashbrown::HashSet;
 use parking_lot::Mutex;
 
@@ -522,7 +522,7 @@ impl ExecutionPlan for SymmetricHashJoinExec {
             left_sorted_filter_expr,
             right_sorted_filter_expr,
             null_equals_null: self.null_equals_null,
-            state: EagerJoinStreamState::PullRight,
+            state: SHJStreamState::PullRight,
             reservation,
         }))
     }
@@ -560,7 +560,7 @@ struct SymmetricHashJoinStream {
     /// Memory reservation
     reservation: SharedMemoryReservation,
     /// State machine for input execution
-    state: EagerJoinStreamState,
+    state: SHJStreamState,
 }
 
 impl RecordBatchStream for SymmetricHashJoinStream {
@@ -1103,7 +1103,227 @@ impl OneSideHashJoiner {
     }
 }
 
-impl EagerJoinStream for SymmetricHashJoinStream {
+/// `SymmetricHashJoinStream` manages incremental join operations between two
+/// streams. Unlike traditional join approaches that need to scan one side of
+/// the join fully before proceeding, `SymmetricHashJoinStream` facilitates
+/// more dynamic join operations by working with streams as they emit data. This
+/// approach allows for more efficient processing, particularly in scenarios
+/// where waiting for complete data materialization is not feasible or optimal.
+/// The stream provides a framework for handling the various states of such a join
+/// process, ensuring that join logic is efficiently executed as data becomes
+/// available from either stream.
+///
+/// This implementation performs eager joins of data from two different asynchronous
+/// streams, typically referred to as left and right streams. The implementation
+/// provides a comprehensive set of methods to control and execute the join
+/// process, leveraging the states defined in `SHJStreamState`. Methods are
+/// primarily focused on polling data batches from each stream,
+/// processing them, and managing transitions between the various states of the join.
+///
+/// This implementation uses a state machine approach to navigate different
+/// stages of the join operation, handling data from both streams and determining
+/// when the join completes.
+///
+/// State Transitions:
+/// - From `PullLeft` to `PullRight` or `LeftExhausted`:
+///   - In `fetch_next_from_left_stream`, when fetching a batch from the left 
stream:
+///     - On success (`Some(Ok(batch))`), state transitions to `PullRight` for
+///       processing the batch.
+///     - On error (`Some(Err(e))`), the error is returned, and the state 
remains
+///       unchanged.
+///     - On no data (`None`), state changes to `LeftExhausted`, returning 
`Continue`
+///       to proceed with the join process.
+/// - From `PullRight` to `PullLeft` or `RightExhausted`:
+///   - In `fetch_next_from_right_stream`, when fetching from the right stream:
+///     - If a batch is available, state changes to `PullLeft` for processing.
+///     - On error, the error is returned without changing the state.
+///     - If right stream is exhausted (`None`), state transitions to 
`RightExhausted`,
+///       with a `Continue` result.
+/// - Handling `RightExhausted` and `LeftExhausted`:
+///   - Methods `handle_right_stream_end` and `handle_left_stream_end` manage 
scenarios
+///     when streams are exhausted:
+///     - They attempt to continue processing with the other stream.
+///     - If both streams are exhausted, state changes to `BothExhausted { 
final_result: false }`.
+/// - Transition to `BothExhausted { final_result: true }`:
+///   - Occurs in `prepare_for_final_results_after_exhaustion` when both 
streams are
+///     exhausted, indicating completion of processing and availability of 
final results.
+impl SymmetricHashJoinStream {
+    /// Implements the main polling logic for the join stream.
+    ///
+    /// This method continuously checks the state of the join stream and
+    /// acts accordingly by delegating the handling to appropriate sub-methods
+    /// depending on the current state.
+    ///
+    /// # Arguments
+    ///
+    /// * `cx` - A context that facilitates cooperative non-blocking execution 
within a task.
+    ///
+    /// # Returns
+    ///
+    /// * `Poll<Option<Result<RecordBatch>>>` - A polled result, either a 
`RecordBatch` or None.
+    fn poll_next_impl(
+        &mut self,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Result<RecordBatch>>> {
+        loop {
+            return match self.state() {
+                SHJStreamState::PullRight => {
+                    
handle_state!(ready!(self.fetch_next_from_right_stream(cx)))
+                }
+                SHJStreamState::PullLeft => {
+                    handle_state!(ready!(self.fetch_next_from_left_stream(cx)))
+                }
+                SHJStreamState::RightExhausted => {
+                    handle_state!(ready!(self.handle_right_stream_end(cx)))
+                }
+                SHJStreamState::LeftExhausted => {
+                    handle_state!(ready!(self.handle_left_stream_end(cx)))
+                }
+                SHJStreamState::BothExhausted {
+                    final_result: false,
+                } => {
+                    
handle_state!(self.prepare_for_final_results_after_exhaustion())
+                }
+                SHJStreamState::BothExhausted { final_result: true } => 
Poll::Ready(None),
+            };
+        }
+    }
+    /// Polls the right stream for its next batch.
+    ///
+    /// This method checks for the next value in the right stream; empty batches are skipped without a state change.
+    /// If a non-empty batch is found, the state is switched to `PullLeft`, and the batch handling
+    /// is delegated to `process_batch_from_right`. If the stream ends, the state is set to `RightExhausted`.
+    ///
+    /// # Returns
+    ///
+    /// * `Poll<Result<StatefulStreamResult<Option<RecordBatch>>>>` - `Pending` until the right stream yields; then the state result after pulling the batch.
+    fn fetch_next_from_right_stream(
+        &mut self,
+        cx: &mut Context<'_>,
+    ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
+        match ready!(self.right_stream().poll_next_unpin(cx)) {
+            Some(Ok(batch)) => {
+                if batch.num_rows() == 0 {
+                    return Poll::Ready(Ok(StatefulStreamResult::Continue));
+                }
+                self.set_state(SHJStreamState::PullLeft);
+                Poll::Ready(self.process_batch_from_right(batch))
+            }
+            Some(Err(e)) => Poll::Ready(Err(e)),
+            None => {
+                self.set_state(SHJStreamState::RightExhausted);
+                Poll::Ready(Ok(StatefulStreamResult::Continue))
+            }
+        }
+    }
+
+    /// Polls the left stream for its next batch.
+    ///
+    /// This method checks for the next value in the left stream; empty batches are skipped without a state change.
+    /// If a non-empty batch is found, the state is switched to `PullRight`, and the batch handling
+    /// is delegated to `process_batch_from_left`. If the stream ends, the state is set to `LeftExhausted`.
+    ///
+    /// # Returns
+    ///
+    /// * `Poll<Result<StatefulStreamResult<Option<RecordBatch>>>>` - `Pending` until the left stream yields; then the state result after pulling the batch.
+    fn fetch_next_from_left_stream(
+        &mut self,
+        cx: &mut Context<'_>,
+    ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
+        match ready!(self.left_stream().poll_next_unpin(cx)) {
+            Some(Ok(batch)) => {
+                if batch.num_rows() == 0 {
+                    return Poll::Ready(Ok(StatefulStreamResult::Continue));
+                }
+                self.set_state(SHJStreamState::PullRight);
+                Poll::Ready(self.process_batch_from_left(batch))
+            }
+            Some(Err(e)) => Poll::Ready(Err(e)),
+            None => {
+                self.set_state(SHJStreamState::LeftExhausted);
+                Poll::Ready(Ok(StatefulStreamResult::Continue))
+            }
+        }
+    }
+
+    /// Handles the scenario when the right stream is exhausted.
+    ///
+    /// Once the right stream is exhausted, this method polls the left stream.
+    /// If a non-empty batch is found in the left stream, it delegates
+    /// the handling to `process_batch_after_right_end`. If both streams are exhausted, the state is set
+    /// to indicate both streams are exhausted without final results yet.
+    ///
+    /// # Returns
+    ///
+    /// * `Poll<Result<StatefulStreamResult<Option<RecordBatch>>>>` - `Pending` until the left stream yields; then the state result after checking the exhaustion state.
+    fn handle_right_stream_end(
+        &mut self,
+        cx: &mut Context<'_>,
+    ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
+        match ready!(self.left_stream().poll_next_unpin(cx)) {
+            Some(Ok(batch)) => {
+                if batch.num_rows() == 0 {
+                    return Poll::Ready(Ok(StatefulStreamResult::Continue));
+                }
+                Poll::Ready(self.process_batch_after_right_end(batch))
+            }
+            Some(Err(e)) => Poll::Ready(Err(e)),
+            None => {
+                self.set_state(SHJStreamState::BothExhausted {
+                    final_result: false,
+                });
+                Poll::Ready(Ok(StatefulStreamResult::Continue))
+            }
+        }
+    }
+
+    /// Handles the scenario when the left stream is exhausted.
+    ///
+    /// Once the left stream is exhausted, this method
+    /// polls the right stream and delegates the handling of a non-empty batch
+    /// to `process_batch_after_left_end`. If both streams are exhausted, the state
+    /// is updated to indicate so.
+    ///
+    /// # Returns
+    ///
+    /// * `Poll<Result<StatefulStreamResult<Option<RecordBatch>>>>` - `Pending` until the right stream yields; then the state result after checking the exhaustion state.
+    fn handle_left_stream_end(
+        &mut self,
+        cx: &mut Context<'_>,
+    ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
+        match ready!(self.right_stream().poll_next_unpin(cx)) {
+            Some(Ok(batch)) => {
+                if batch.num_rows() == 0 {
+                    return Poll::Ready(Ok(StatefulStreamResult::Continue));
+                }
+                Poll::Ready(self.process_batch_after_left_end(batch))
+            }
+            Some(Err(e)) => Poll::Ready(Err(e)),
+            None => {
+                self.set_state(SHJStreamState::BothExhausted {
+                    final_result: false,
+                });
+                Poll::Ready(Ok(StatefulStreamResult::Continue))
+            }
+        }
+    }
+
+    /// Handles the state when both streams are exhausted and final results are yet to be produced.
+    ///
+    /// This method switches the state to indicate both streams are
+    /// exhausted with final results and then invokes the handling for this specific
+    /// scenario via `process_batches_before_finalization`.
+    ///
+    /// # Returns
+    ///
+    /// * `Result<StatefulStreamResult<Option<RecordBatch>>>` - The state result after both streams are exhausted.
+    fn prepare_for_final_results_after_exhaustion(
+        &mut self,
+    ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
+        self.set_state(SHJStreamState::BothExhausted { final_result: true });
+        self.process_batches_before_finalization()
+    }
+
     fn process_batch_from_right(
         &mut self,
         batch: RecordBatch,
@@ -1189,16 +1409,14 @@ impl EagerJoinStream for SymmetricHashJoinStream {
         &mut self.left_stream
     }
 
-    fn set_state(&mut self, state: EagerJoinStreamState) {
+    fn set_state(&mut self, state: SHJStreamState) {
         self.state = state;
     }
 
-    fn state(&mut self) -> EagerJoinStreamState {
+    fn state(&mut self) -> SHJStreamState {
         self.state.clone()
     }
-}
 
-impl SymmetricHashJoinStream {
     fn size(&self) -> usize {
         let mut size = 0;
         size += std::mem::size_of_val(&self.schema);
@@ -1321,6 +1539,34 @@ impl SymmetricHashJoinStream {
     }
 }
 
+/// Represents the various states of a symmetric hash join stream operation.
+///
+/// This enum is used to track the current state of streaming during a join
+/// operation. It provides indicators as to which side of the join needs to be
+/// pulled next or if one (or both) sides have been exhausted. This allows
+/// for efficient management of resources and optimal performance during the
+/// join process.
+#[derive(Clone, Debug)]
+pub enum SHJStreamState {
+    /// Indicates that the next step should pull from the right side of the 
join.
+    PullRight,
+
+    /// Indicates that the next step should pull from the left side of the 
join.
+    PullLeft,
+
+    /// State representing that the right side of the join has been fully 
processed.
+    RightExhausted,
+
+    /// State representing that the left side of the join has been fully 
processed.
+    LeftExhausted,
+
+    /// Represents a state where both sides of the join are exhausted.
+    ///
+    /// The `final_result` field indicates whether the join operation has
+    /// produced a final result or not.
+    BothExhausted { final_result: bool },
+}
+
 #[cfg(test)]
 mod tests {
     use std::collections::HashMap;
diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs
index acf9ed4d7e..0d99d7a163 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -1494,10 +1494,9 @@ impl BuildProbeJoinMetrics {
 }
 
 /// The `handle_state` macro is designed to process the result of a 
state-changing
-/// operation, encountered e.g. in implementations of `EagerJoinStream`. It
-/// operates on a `StatefulStreamResult` by matching its variants and executing
-/// corresponding actions. This macro is used to streamline code that deals 
with
-/// state transitions, reducing boilerplate and improving readability.
+/// operation. It operates on a `StatefulStreamResult` by matching its 
variants and
+/// executing corresponding actions. This macro is used to streamline code 
that deals
+/// with state transitions, reducing boilerplate and improving readability.
 ///
 /// # Cases
 ///
@@ -1525,26 +1524,7 @@ macro_rules! handle_state {
     };
 }
 
-/// The `handle_async_state` macro adapts the `handle_state` macro for use in
-/// asynchronous operations, particularly when dealing with `Poll` results 
within
-/// async traits like `EagerJoinStream`. It polls the asynchronous 
state-changing
-/// function using `poll_unpin` and then passes the result to `handle_state` 
for
-/// further processing.
-///
-/// # Arguments
-///
-/// * `$state_func`: An async function or future that returns a
-///   `Result<StatefulStreamResult<_>>`.
-/// * `$cx`: The context to be passed for polling, usually of type `&mut 
Context`.
-///
-#[macro_export]
-macro_rules! handle_async_state {
-    ($state_func:expr, $cx:expr) => {
-        $crate::handle_state!(ready!($state_func.poll_unpin($cx)))
-    };
-}
-
-/// Represents the result of an operation on stateful join stream.
+/// Represents the result of a stateful operation.
 ///
 /// This enumueration indicates whether the state produced a result that is
 /// ready for use (`Ready`) or if the operation requires continuation 
(`Continue`).


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to