metesynnada commented on code in PR #8234:
URL: https://github.com/apache/arrow-datafusion/pull/8234#discussion_r1398775207
##########
datafusion/physical-plan/src/joins/stream_join_utils.rs:
##########
@@ -740,20 +617,338 @@ pub fn record_visited_indices<T: ArrowPrimitiveType>(
}
}
+/// Converts the result of one join-stream state step into a `Poll` for the
+/// enclosing polling function.
+///
+/// - `Ok(StreamJoinStateResult::Continue)` expands to `continue`, so this
+///   macro may only be used inside a loop body.
+/// - `Ok(StreamJoinStateResult::Ready(res))` becomes
+///   `Poll::Ready(Ok(res).transpose())`; because of the `transpose`, the
+///   `Ready` payload must be an `Option<_>`, which is turned into
+///   `Option<Result<_>>`.
+/// - `Err(e)` becomes `Poll::Ready(Some(Err(e)))`.
+///
+/// NOTE(review): the expansion names `StreamJoinStateResult` and `Poll`
+/// without a `$crate::` / `std::` path, so both must be in scope at every
+/// call site even though the macro is `#[macro_export]`ed.
+#[macro_export]
+macro_rules! handle_state {
+ ($match_case:expr) => {
+ match $match_case {
+ // The state made progress but has nothing to emit: run the loop again.
+ Ok(StreamJoinStateResult::Continue) => continue,
+ Ok(StreamJoinStateResult::Ready(res)) => Poll::Ready(Ok(res).transpose()),
+ Err(e) => Poll::Ready(Some(Err(e))),
+ }
+ };
+}
+
+/// Polls the future produced by `$state_func` with the task context `$cx` and
+/// forwards its output to `handle_state!`.
+///
+/// Assuming `ready!` is the usual `futures::ready` / `std::task::ready` macro,
+/// a pending future makes the enclosing function return `Poll::Pending`.
+/// Otherwise the output is handled exactly as by `handle_state!`, including
+/// its `continue`, so this macro must also be used inside a loop.
+///
+/// NOTE(review): `ready!` and the `poll_unpin` method (presumably from a
+/// `FutureExt`-style trait) are referenced unqualified and must be in scope
+/// at the call site. TODO: confirm which imports callers rely on.
+#[macro_export]
+macro_rules! handle_async_state {
+ ($state_func:expr, $cx:expr) => {
+ $crate::handle_state!(ready!($state_func.poll_unpin($cx)))
+ };
+}
+
+/// Represents the possible results of a state in the join stream.
+///
+/// Consumed by `handle_state!`: `Ready` ends the current poll with a value,
+/// while `Continue` makes the polling loop perform another state step.
+pub enum StreamJoinStateResult<T> {
+ /// The state produced a value that is returned to the poller.
+ Ready(T),
+ /// The state produced nothing to return; the state machine is driven again.
+ Continue,
+}
+
+/// Represents the various states of an eager join stream operation.
+///
+/// This enum tracks the current state of streaming during a join operation:
+/// which side of the join should be pulled next, or whether one (or both)
+/// sides have been exhausted.
+#[derive(Clone, Debug)]
+pub enum EagerJoinStreamState {
+ /// Indicates that the next step should pull from the right side of the join.
+ PullRight,
+
+ /// Indicates that the next step should pull from the left side of the join.
+ PullLeft,
+
+ /// State representing that the right side of the join has been fully processed.
+ RightExhausted,
+
+ /// State representing that the left side of the join has been fully processed.
+ LeftExhausted,
+
+ /// Represents a state where both sides of the join are exhausted.
+ ///
+ /// The `final_result` field indicates whether the join operation has
+ /// already produced its final result: while it is `false` the final results
+ /// still have to be prepared, and once it is `true` the stream is finished.
+ BothExhausted { final_result: bool },
+}
+
+/// Represents the asynchronous trait for an eager join stream.
+/// This trait defines the core methods for handling asynchronous join
operations
+/// between two streams (left and right).
+#[async_trait]
+pub trait EagerJoinStream {
+ /// Implements the main polling logic for the join stream.
+ ///
+ /// This method continuously checks the state of the join stream and
+ /// acts accordingly by delegating the handling to appropriate sub-methods
+ /// depending on the current state.
+ ///
+ /// # Arguments
+ ///
+ /// * `cx` - A context that facilitates cooperative non-blocking execution
within a task.
+ ///
+ /// # Returns
+ ///
+ /// * `Poll<Option<Result<RecordBatch>>>` - A polled result, either a
`RecordBatch` or None.
+ fn poll_next_impl(
+ &mut self,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Result<RecordBatch>>>
+ where
+ Self: Send,
+ {
+ loop {
+ return match self.state() {
+ EagerJoinStreamState::PullRight => {
+ handle_async_state!(self.fetch_next_from_right_stream(),
cx)
+ }
+ EagerJoinStreamState::PullLeft => {
+ handle_async_state!(self.fetch_next_from_left_stream(), cx)
+ }
+ EagerJoinStreamState::RightExhausted => {
+ handle_async_state!(self.handle_right_stream_end(), cx)
+ }
+ EagerJoinStreamState::LeftExhausted => {
+ handle_async_state!(self.handle_left_stream_end(), cx)
+ }
+ EagerJoinStreamState::BothExhausted {
+ final_result: false,
+ } => {
+
handle_state!(self.prepare_for_final_results_after_exhaustion())
+ }
+ EagerJoinStreamState::BothExhausted { final_result: true } => {
+ Poll::Ready(None)
+ }
+ };
+ }
+ }
+ /// Asynchronously pulls the next batch from the right (probe) stream.
Review Comment:
I made a commenting mistake here: there shouldn't be a "(probe)" here, since
SHJ (symmetric hash join) alternates the build and probe sides by creating two hash tables.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]