alamb commented on code in PR #8234:
URL: https://github.com/apache/arrow-datafusion/pull/8234#discussion_r1397715453
##########
datafusion/physical-plan/src/joins/stream_join_utils.rs:
##########
@@ -740,20 +617,338 @@ pub fn record_visited_indices<T: ArrowPrimitiveType>(
}
}
+#[macro_export]
+macro_rules! handle_state {
Review Comment:
I think it would help to add some comments here explaining what this macro
is used for.
##########
datafusion/physical-plan/src/joins/stream_join_utils.rs:
##########
@@ -740,20 +617,338 @@ pub fn record_visited_indices<T: ArrowPrimitiveType>(
}
}
+#[macro_export]
+macro_rules! handle_state {
+ ($match_case:expr) => {
+ match $match_case {
+ Ok(StreamJoinStateResult::Continue) => continue,
+ Ok(StreamJoinStateResult::Ready(res)) =>
Poll::Ready(Ok(res).transpose()),
+ Err(e) => Poll::Ready(Some(Err(e))),
+ }
+ };
+}
+
+#[macro_export]
+macro_rules! handle_async_state {
+ ($state_func:expr, $cx:expr) => {
+ $crate::handle_state!(ready!($state_func.poll_unpin($cx)))
+ };
+}
+
+/// Represents the possible results of a state in the join stream.
+pub enum StreamJoinStateResult<T> {
+ Ready(T),
+ Continue,
+}
+
+/// Represents the various states of an eager join stream operation.
+///
+/// This enum is used to track the current state of streaming during a join
+/// operation. It provides indicators as to which side of the join needs to be
+/// pulled next or if one (or both) sides have been exhausted. This allows
+/// for efficient management of resources and optimal performance during the
+/// join process.
+#[derive(Clone, Debug)]
+pub enum EagerJoinStreamState {
+ /// Indicates that the next step should pull from the right side of the
join.
+ PullRight,
+
+ /// Indicates that the next step should pull from the left side of the
join.
+ PullLeft,
+
+ /// State representing that the right side of the join has been fully
processed.
+ RightExhausted,
+
+ /// State representing that the left side of the join has been fully
processed.
+ LeftExhausted,
+
+ /// Represents a state where both sides of the join are exhausted.
+ ///
+ /// The `final_result` field indicates whether the join operation has
+ /// produced a final result or not.
+ BothExhausted { final_result: bool },
+}
+
+/// Represents the asynchronous trait for an eager join stream.
Review Comment:
Could we please define in these comments what an "eager join stream" is? I
am not familiar with the concept, and I suspect others may not be either.
Perhaps we can include some types of joins that have this property (I think
Symmetric Hash Join and Merge Join?).
Is the key feature of an eager stream that it produces output as soon as
possible while it reads input, rather than buffering? Is it true that an eager
join stream buffers as little data as possible, but potentially sacrifices some
performance?
##########
datafusion/physical-plan/src/joins/stream_join_utils.rs:
##########
@@ -740,20 +617,338 @@ pub fn record_visited_indices<T: ArrowPrimitiveType>(
}
}
+#[macro_export]
+macro_rules! handle_state {
+ ($match_case:expr) => {
+ match $match_case {
+ Ok(StreamJoinStateResult::Continue) => continue,
+ Ok(StreamJoinStateResult::Ready(res)) =>
Poll::Ready(Ok(res).transpose()),
+ Err(e) => Poll::Ready(Some(Err(e))),
+ }
+ };
+}
+
+#[macro_export]
+macro_rules! handle_async_state {
+ ($state_func:expr, $cx:expr) => {
+ $crate::handle_state!(ready!($state_func.poll_unpin($cx)))
+ };
+}
+
+/// Represents the possible results of a state in the join stream.
+pub enum StreamJoinStateResult<T> {
+ Ready(T),
+ Continue,
+}
+
+/// Represents the various states of an eager join stream operation.
+///
+/// This enum is used to track the current state of streaming during a join
+/// operation. It provides indicators as to which side of the join needs to be
+/// pulled next or if one (or both) sides have been exhausted. This allows
+/// for efficient management of resources and optimal performance during the
+/// join process.
+#[derive(Clone, Debug)]
+pub enum EagerJoinStreamState {
+ /// Indicates that the next step should pull from the right side of the
join.
+ PullRight,
+
+ /// Indicates that the next step should pull from the left side of the
join.
+ PullLeft,
+
+ /// State representing that the right side of the join has been fully
processed.
+ RightExhausted,
+
+ /// State representing that the left side of the join has been fully
processed.
+ LeftExhausted,
+
+ /// Represents a state where both sides of the join are exhausted.
+ ///
+ /// The `final_result` field indicates whether the join operation has
+ /// produced a final result or not.
+ BothExhausted { final_result: bool },
+}
+
+/// Represents the asynchronous trait for an eager join stream.
Review Comment:
Something else that might help would be to document some examples of how
this is used (e.g., what the control flow / state transitions could be).
However, I don't have a great idea of how to do this specifically.
##########
datafusion/physical-plan/src/joins/stream_join_utils.rs:
##########
@@ -740,20 +617,338 @@ pub fn record_visited_indices<T: ArrowPrimitiveType>(
}
}
+#[macro_export]
+macro_rules! handle_state {
+ ($match_case:expr) => {
+ match $match_case {
+ Ok(StreamJoinStateResult::Continue) => continue,
+ Ok(StreamJoinStateResult::Ready(res)) =>
Poll::Ready(Ok(res).transpose()),
+ Err(e) => Poll::Ready(Some(Err(e))),
+ }
+ };
+}
+
+#[macro_export]
+macro_rules! handle_async_state {
+ ($state_func:expr, $cx:expr) => {
+ $crate::handle_state!(ready!($state_func.poll_unpin($cx)))
+ };
+}
+
+/// Represents the possible results of a state in the join stream.
+pub enum StreamJoinStateResult<T> {
Review Comment:
Could you add some documentation about what the join would be "ready" for? I
think it means "ready for output", but I am not sure 🤔
##########
datafusion/physical-plan/src/joins/stream_join_utils.rs:
##########
@@ -740,20 +617,338 @@ pub fn record_visited_indices<T: ArrowPrimitiveType>(
}
}
+#[macro_export]
+macro_rules! handle_state {
+ ($match_case:expr) => {
+ match $match_case {
+ Ok(StreamJoinStateResult::Continue) => continue,
+ Ok(StreamJoinStateResult::Ready(res)) =>
Poll::Ready(Ok(res).transpose()),
+ Err(e) => Poll::Ready(Some(Err(e))),
+ }
+ };
+}
+
+#[macro_export]
+macro_rules! handle_async_state {
+ ($state_func:expr, $cx:expr) => {
+ $crate::handle_state!(ready!($state_func.poll_unpin($cx)))
+ };
+}
+
+/// Represents the possible results of a state in the join stream.
+pub enum StreamJoinStateResult<T> {
Review Comment:
Could you add some documentation about what the join would be "ready" for? I
think it means "ready for output", but I am not sure 🤔
##########
datafusion/physical-plan/src/joins/stream_join_utils.rs:
##########
@@ -740,20 +617,338 @@ pub fn record_visited_indices<T: ArrowPrimitiveType>(
}
}
+#[macro_export]
+macro_rules! handle_state {
+ ($match_case:expr) => {
+ match $match_case {
+ Ok(StreamJoinStateResult::Continue) => continue,
+ Ok(StreamJoinStateResult::Ready(res)) =>
Poll::Ready(Ok(res).transpose()),
+ Err(e) => Poll::Ready(Some(Err(e))),
+ }
+ };
+}
+
+#[macro_export]
+macro_rules! handle_async_state {
+ ($state_func:expr, $cx:expr) => {
+ $crate::handle_state!(ready!($state_func.poll_unpin($cx)))
+ };
+}
+
+/// Represents the possible results of a state in the join stream.
+pub enum StreamJoinStateResult<T> {
+ Ready(T),
+ Continue,
+}
+
+/// Represents the various states of an eager join stream operation.
+///
+/// This enum is used to track the current state of streaming during a join
+/// operation. It provides indicators as to which side of the join needs to be
+/// pulled next or if one (or both) sides have been exhausted. This allows
+/// for efficient management of resources and optimal performance during the
+/// join process.
+#[derive(Clone, Debug)]
+pub enum EagerJoinStreamState {
+ /// Indicates that the next step should pull from the right side of the
join.
+ PullRight,
+
+ /// Indicates that the next step should pull from the left side of the
join.
+ PullLeft,
+
+ /// State representing that the right side of the join has been fully
processed.
+ RightExhausted,
+
+ /// State representing that the left side of the join has been fully
processed.
+ LeftExhausted,
+
+ /// Represents a state where both sides of the join are exhausted.
+ ///
+ /// The `final_result` field indicates whether the join operation has
+ /// produced a final result or not.
+ BothExhausted { final_result: bool },
+}
+
+/// Represents the asynchronous trait for an eager join stream.
+/// This trait defines the core methods for handling asynchronous join
operations
+/// between two streams (left and right).
+#[async_trait]
+pub trait EagerJoinStream {
+ /// Implements the main polling logic for the join stream.
+ ///
+ /// This method continuously checks the state of the join stream and
+ /// acts accordingly by delegating the handling to appropriate sub-methods
+ /// depending on the current state.
+ ///
+ /// # Arguments
+ ///
+ /// * `cx` - A context that facilitates cooperative non-blocking execution
within a task.
+ ///
+ /// # Returns
+ ///
+ /// * `Poll<Option<Result<RecordBatch>>>` - A polled result, either a
`RecordBatch` or None.
+ fn poll_next_impl(
+ &mut self,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Result<RecordBatch>>>
+ where
+ Self: Send,
+ {
+ loop {
+ return match self.state() {
+ EagerJoinStreamState::PullRight => {
+ handle_async_state!(self.fetch_next_from_right_stream(),
cx)
+ }
+ EagerJoinStreamState::PullLeft => {
+ handle_async_state!(self.fetch_next_from_left_stream(), cx)
+ }
+ EagerJoinStreamState::RightExhausted => {
+ handle_async_state!(self.handle_right_stream_end(), cx)
+ }
+ EagerJoinStreamState::LeftExhausted => {
+ handle_async_state!(self.handle_left_stream_end(), cx)
+ }
+ EagerJoinStreamState::BothExhausted {
+ final_result: false,
+ } => {
+
handle_state!(self.prepare_for_final_results_after_exhaustion())
+ }
+ EagerJoinStreamState::BothExhausted { final_result: true } => {
+ Poll::Ready(None)
+ }
+ };
+ }
+ }
+ /// Asynchronously pulls the next batch from the right (probe) stream.
Review Comment:
I am somewhat confused about the term "probe" here -- as I normally think
of the "probe" side of a hash join. However, I thought stream joins aren't
necessarily hash joins 🤔
##########
datafusion/physical-plan/src/joins/utils.rs:
##########
@@ -50,8 +52,135 @@ use datafusion_physical_expr::{
use futures::future::{BoxFuture, Shared};
use futures::{ready, FutureExt};
+use hashbrown::raw::RawTable;
use parking_lot::Mutex;
+/// Maps a `u64` hash value based on the build side ["on" values] to a list of
indices with this key's value.
+///
+/// By allocating a `HashMap` with capacity for *at least* the number of rows
for entries at the build side,
+/// we make sure that we don't have to re-hash the hashmap, which needs access
to the key (the hash in this case) value.
+///
+/// E.g. 1 -> [3, 6, 8] indicates that the column values map to rows 3, 6 and
8 for hash value 1
+/// As the key is a hash value, we need to check possible hash collisions in
the probe stage
+/// During this stage it might be the case that a row is contained the same
hashmap value,
+/// but the values don't match. Those are checked in the
[`equal_rows_arr`](crate::joins::hash_join::equal_rows_arr) method.
+///
+/// The indices (values) are stored in a separate chained list stored in the
`Vec<u64>`.
+///
+/// The first value (+1) is stored in the hashmap, whereas the next value is
stored in array at the position value.
+///
+/// The chain can be followed until the value "0" has been reached, meaning
the end of the list.
+/// Also see chapter 5.3 of [Balancing vectorized query execution with
bandwidth-optimized
storage](https://dare.uva.nl/search?identifier=5ccbb60a-38b8-4eeb-858a-e7735dd37487)
+///
+/// # Example
+///
+/// ``` text
+/// See the example below:
+///
+/// Insert (10,1) <-- insert hash value 10 with row index 1
+/// map:
+/// ----------
+/// | 10 | 2 |
+/// ----------
+/// next:
+/// ---------------------
+/// | 0 | 0 | 0 | 0 | 0 |
+/// ---------------------
+/// Insert (20,2)
+/// map:
+/// ----------
+/// | 10 | 2 |
+/// | 20 | 3 |
+/// ----------
+/// next:
+/// ---------------------
+/// | 0 | 0 | 0 | 0 | 0 |
+/// ---------------------
+/// Insert (10,3) <-- collision! row index 3 has a hash value of 10
as well
+/// map:
+/// ----------
+/// | 10 | 4 |
+/// | 20 | 3 |
+/// ----------
+/// next:
+/// ---------------------
+/// | 0 | 0 | 0 | 2 | 0 | <--- hash value 10 maps to 4,2 (which means indices
values 3,1)
+/// ---------------------
+/// Insert (10,4) <-- another collision! row index 4 ALSO has a hash
value of 10
+/// map:
+/// ---------
+/// | 10 | 5 |
+/// | 20 | 3 |
+/// ---------
+/// next:
+/// ---------------------
+/// | 0 | 0 | 0 | 2 | 4 | <--- hash value 10 maps to 5,4,2 (which means
indices values 4,3,1)
+/// ---------------------
+/// ```
+pub struct JoinHashMap {
Review Comment:
If we are going to move this structure anyway, perhaps we can put it into
its own module (e.g. `datafusion/physical-plan/src/joins/join_hash_map.rs` or
something).
##########
datafusion/physical-plan/src/joins/stream_join_utils.rs:
##########
@@ -740,20 +617,338 @@ pub fn record_visited_indices<T: ArrowPrimitiveType>(
}
}
+#[macro_export]
+macro_rules! handle_state {
+ ($match_case:expr) => {
+ match $match_case {
+ Ok(StreamJoinStateResult::Continue) => continue,
+ Ok(StreamJoinStateResult::Ready(res)) =>
Poll::Ready(Ok(res).transpose()),
+ Err(e) => Poll::Ready(Some(Err(e))),
+ }
+ };
+}
+
+#[macro_export]
+macro_rules! handle_async_state {
+ ($state_func:expr, $cx:expr) => {
+ $crate::handle_state!(ready!($state_func.poll_unpin($cx)))
+ };
+}
+
+/// Represents the possible results of a state in the join stream.
+pub enum StreamJoinStateResult<T> {
+ Ready(T),
+ Continue,
+}
+
+/// Represents the various states of an eager join stream operation.
+///
+/// This enum is used to track the current state of streaming during a join
+/// operation. It provides indicators as to which side of the join needs to be
+/// pulled next or if one (or both) sides have been exhausted. This allows
+/// for efficient management of resources and optimal performance during the
+/// join process.
+#[derive(Clone, Debug)]
+pub enum EagerJoinStreamState {
+ /// Indicates that the next step should pull from the right side of the
join.
+ PullRight,
+
+ /// Indicates that the next step should pull from the left side of the
join.
+ PullLeft,
+
+ /// State representing that the right side of the join has been fully
processed.
+ RightExhausted,
+
+ /// State representing that the left side of the join has been fully
processed.
+ LeftExhausted,
+
+ /// Represents a state where both sides of the join are exhausted.
+ ///
+ /// The `final_result` field indicates whether the join operation has
+ /// produced a final result or not.
+ BothExhausted { final_result: bool },
+}
+
+/// Represents the asynchronous trait for an eager join stream.
+/// This trait defines the core methods for handling asynchronous join
operations
+/// between two streams (left and right).
+#[async_trait]
+pub trait EagerJoinStream {
+ /// Implements the main polling logic for the join stream.
+ ///
+ /// This method continuously checks the state of the join stream and
+ /// acts accordingly by delegating the handling to appropriate sub-methods
+ /// depending on the current state.
+ ///
+ /// # Arguments
+ ///
+ /// * `cx` - A context that facilitates cooperative non-blocking execution
within a task.
+ ///
+ /// # Returns
+ ///
+ /// * `Poll<Option<Result<RecordBatch>>>` - A polled result, either a
`RecordBatch` or None.
+ fn poll_next_impl(
+ &mut self,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Result<RecordBatch>>>
+ where
+ Self: Send,
+ {
+ loop {
+ return match self.state() {
+ EagerJoinStreamState::PullRight => {
+ handle_async_state!(self.fetch_next_from_right_stream(),
cx)
+ }
+ EagerJoinStreamState::PullLeft => {
+ handle_async_state!(self.fetch_next_from_left_stream(), cx)
+ }
+ EagerJoinStreamState::RightExhausted => {
+ handle_async_state!(self.handle_right_stream_end(), cx)
+ }
+ EagerJoinStreamState::LeftExhausted => {
+ handle_async_state!(self.handle_left_stream_end(), cx)
+ }
+ EagerJoinStreamState::BothExhausted {
+ final_result: false,
+ } => {
+
handle_state!(self.prepare_for_final_results_after_exhaustion())
+ }
+ EagerJoinStreamState::BothExhausted { final_result: true } => {
+ Poll::Ready(None)
+ }
+ };
+ }
+ }
+ /// Asynchronously pulls the next batch from the right (probe) stream.
+ ///
+ /// This default implementation checks for the next value in the right
stream.
+ /// If a batch is found, the state is switched to `PullLeft`, and the
batch handling
+ /// is delegated to `process_batch_from_right`. If the stream ends, the
state is set to `RightExhausted`.
+ ///
+ /// # Returns
+ ///
+ /// * `Result<StreamJoinStateResult<Option<RecordBatch>>>` - The state
result after pulling the batch.
+ async fn fetch_next_from_right_stream(
Review Comment:
I am somewhat confused by this default implementation, as it implies that the
join will always "ping-pong" back and forth between fetching left and right
inputs, while in reality I think the details of how the stream is implemented
and how the join keys are distributed across batches could require fetching
multiple batches from one (or both) inputs before progress can be made.
I am thinking of a join on `a = b` where all the rows in the batch have the
same join key, for example:
Batch 1
| `a` |
|--------|
| 100 |
| 100 |
Batch 2
| `a` |
|--------|
| 100 |
| 100 |
Batch 3
| `a` |
|--------|
| 100 |
| 200 |
Wouldn't the symmetric hash join have to read all three batches to find the
next join key (`200`) before reading a batch from the other input / producing
output?
I didn't see how the symmetric hash join handles this case, so I must be missing
something.
##########
datafusion/physical-plan/src/joins/stream_join_utils.rs:
##########
@@ -740,20 +617,338 @@ pub fn record_visited_indices<T: ArrowPrimitiveType>(
}
}
+#[macro_export]
+macro_rules! handle_state {
+ ($match_case:expr) => {
+ match $match_case {
+ Ok(StreamJoinStateResult::Continue) => continue,
+ Ok(StreamJoinStateResult::Ready(res)) =>
Poll::Ready(Ok(res).transpose()),
+ Err(e) => Poll::Ready(Some(Err(e))),
+ }
+ };
+}
+
+#[macro_export]
+macro_rules! handle_async_state {
+ ($state_func:expr, $cx:expr) => {
+ $crate::handle_state!(ready!($state_func.poll_unpin($cx)))
+ };
+}
+
+/// Represents the possible results of a state in the join stream.
+pub enum StreamJoinStateResult<T> {
+ Ready(T),
+ Continue,
+}
+
+/// Represents the various states of an eager join stream operation.
+///
+/// This enum is used to track the current state of streaming during a join
+/// operation. It provides indicators as to which side of the join needs to be
+/// pulled next or if one (or both) sides have been exhausted. This allows
+/// for efficient management of resources and optimal performance during the
+/// join process.
+#[derive(Clone, Debug)]
+pub enum EagerJoinStreamState {
+ /// Indicates that the next step should pull from the right side of the
join.
+ PullRight,
+
+ /// Indicates that the next step should pull from the left side of the
join.
+ PullLeft,
+
+ /// State representing that the right side of the join has been fully
processed.
+ RightExhausted,
+
+ /// State representing that the left side of the join has been fully
processed.
+ LeftExhausted,
+
+ /// Represents a state where both sides of the join are exhausted.
+ ///
+ /// The `final_result` field indicates whether the join operation has
+ /// produced a final result or not.
+ BothExhausted { final_result: bool },
+}
+
+/// Represents the asynchronous trait for an eager join stream.
+/// This trait defines the core methods for handling asynchronous join
operations
+/// between two streams (left and right).
+#[async_trait]
+pub trait EagerJoinStream {
+ /// Implements the main polling logic for the join stream.
+ ///
+ /// This method continuously checks the state of the join stream and
+ /// acts accordingly by delegating the handling to appropriate sub-methods
+ /// depending on the current state.
+ ///
+ /// # Arguments
+ ///
+ /// * `cx` - A context that facilitates cooperative non-blocking execution
within a task.
+ ///
+ /// # Returns
+ ///
+ /// * `Poll<Option<Result<RecordBatch>>>` - A polled result, either a
`RecordBatch` or None.
+ fn poll_next_impl(
+ &mut self,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Result<RecordBatch>>>
+ where
+ Self: Send,
+ {
+ loop {
+ return match self.state() {
+ EagerJoinStreamState::PullRight => {
+ handle_async_state!(self.fetch_next_from_right_stream(),
cx)
+ }
+ EagerJoinStreamState::PullLeft => {
+ handle_async_state!(self.fetch_next_from_left_stream(), cx)
+ }
+ EagerJoinStreamState::RightExhausted => {
+ handle_async_state!(self.handle_right_stream_end(), cx)
+ }
+ EagerJoinStreamState::LeftExhausted => {
+ handle_async_state!(self.handle_left_stream_end(), cx)
+ }
+ EagerJoinStreamState::BothExhausted {
+ final_result: false,
+ } => {
+
handle_state!(self.prepare_for_final_results_after_exhaustion())
+ }
+ EagerJoinStreamState::BothExhausted { final_result: true } => {
+ Poll::Ready(None)
+ }
+ };
+ }
+ }
+ /// Asynchronously pulls the next batch from the right (probe) stream.
Review Comment:
I am somewhat confused about the term "probe" here -- as I normally think
of the "probe" side of a hash join. However, I thought stream joins aren't
necessarily hash joins 🤔
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]