ozankabak commented on code in PR #8234:
URL: https://github.com/apache/arrow-datafusion/pull/8234#discussion_r1399411079
##########
datafusion/physical-plan/src/joins/stream_join_utils.rs:
##########
@@ -740,20 +617,338 @@ pub fn record_visited_indices<T: ArrowPrimitiveType>(
}
}
+#[macro_export]
+macro_rules! handle_state {
+ ($match_case:expr) => {
+ match $match_case {
+ Ok(StreamJoinStateResult::Continue) => continue,
+ Ok(StreamJoinStateResult::Ready(res)) =>
Poll::Ready(Ok(res).transpose()),
+ Err(e) => Poll::Ready(Some(Err(e))),
+ }
+ };
+}
+
+#[macro_export]
+macro_rules! handle_async_state {
+ ($state_func:expr, $cx:expr) => {
+ $crate::handle_state!(ready!($state_func.poll_unpin($cx)))
+ };
+}
+
+/// Represents the possible results of a state in the join stream.
+pub enum StreamJoinStateResult<T> {
+ Ready(T),
+ Continue,
+}
+
+/// Represents the various states of an eager join stream operation.
+///
+/// This enum is used to track the current state of streaming during a join
+/// operation. It provides indicators as to which side of the join needs to be
+/// pulled next or if one (or both) sides have been exhausted. This allows
+/// for efficient management of resources and optimal performance during the
+/// join process.
+#[derive(Clone, Debug)]
+pub enum EagerJoinStreamState {
+ /// Indicates that the next step should pull from the right side of the join.
+ PullRight,
+
+ /// Indicates that the next step should pull from the left side of the join.
+ PullLeft,
+
+ /// State representing that the right side of the join has been fully processed.
+ RightExhausted,
+
+ /// State representing that the left side of the join has been fully processed.
+ LeftExhausted,
+
+ /// Represents a state where both sides of the join are exhausted.
+ ///
+ /// The `final_result` field indicates whether the join operation has
+ /// produced a final result or not.
+ BothExhausted { final_result: bool },
+}
+
+/// Represents the asynchronous trait for an eager join stream.
+/// This trait defines the core methods for handling asynchronous join operations
+/// between two streams (left and right).
+#[async_trait]
+pub trait EagerJoinStream {
+ /// Implements the main polling logic for the join stream.
+ ///
+ /// This method continuously checks the state of the join stream and
+ /// acts accordingly by delegating the handling to appropriate sub-methods
+ /// depending on the current state.
+ ///
+ /// # Arguments
+ ///
+ /// * `cx` - A context that facilitates cooperative non-blocking execution within a task.
+ ///
+ /// # Returns
+ ///
+ /// * `Poll<Option<Result<RecordBatch>>>` - A polled result, either a `RecordBatch` or None.
+ fn poll_next_impl(
+ &mut self,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Result<RecordBatch>>>
+ where
+ Self: Send,
+ {
+ loop {
+ return match self.state() {
+ EagerJoinStreamState::PullRight => {
+ handle_async_state!(self.fetch_next_from_right_stream(),
cx)
+ }
+ EagerJoinStreamState::PullLeft => {
+ handle_async_state!(self.fetch_next_from_left_stream(), cx)
+ }
+ EagerJoinStreamState::RightExhausted => {
+ handle_async_state!(self.handle_right_stream_end(), cx)
+ }
+ EagerJoinStreamState::LeftExhausted => {
+ handle_async_state!(self.handle_left_stream_end(), cx)
+ }
+ EagerJoinStreamState::BothExhausted {
+ final_result: false,
+ } => {
+
handle_state!(self.prepare_for_final_results_after_exhaustion())
+ }
+ EagerJoinStreamState::BothExhausted { final_result: true } => {
+ Poll::Ready(None)
+ }
+ };
+ }
+ }
+ /// Asynchronously pulls the next batch from the right (probe) stream.
+ ///
+ /// This default implementation checks for the next value in the right stream.
+ /// If a batch is found, the state is switched to `PullLeft`, and the batch handling
+ /// is delegated to `process_batch_from_right`. If the stream ends, the state is set to `RightExhausted`.
+ ///
+ /// # Returns
+ ///
+ /// * `Result<StreamJoinStateResult<Option<RecordBatch>>>` - The state result after pulling the batch.
+ async fn fetch_next_from_right_stream(
Review Comment:
> Are these plans described anywhere?
Not yet, but hopefully soon. We will continue publishing blog posts about
this stuff in DataFusion and will talk about future goodies there :)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]