This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 2482ff45d0 Minor: make LeftJoinData into a struct in CrossJoinExec (#13227)
2482ff45d0 is described below

commit 2482ff45d0109c4a576e4cbfdd5769107fd9ede2
Author: Andrew Lamb <[email protected]>
AuthorDate: Mon Nov 4 08:07:11 2024 -0500

    Minor: make LeftJoinData into a struct in CrossJoinExec (#13227)
---
 datafusion/physical-plan/src/joins/cross_join.rs | 22 +++++++++++++++++-----
 1 file changed, 17 insertions(+), 5 deletions(-)

diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs
index a67e1df47b..8c8921eba6 100644
--- a/datafusion/physical-plan/src/joins/cross_join.rs
+++ b/datafusion/physical-plan/src/joins/cross_join.rs
@@ -47,7 +47,15 @@ use async_trait::async_trait;
 use futures::{ready, Stream, StreamExt, TryStreamExt};
 
 /// Data of the left side
-type JoinLeftData = (RecordBatch, MemoryReservation);
+#[derive(Debug)]
+struct JoinLeftData {
+    /// Single RecordBatch with all rows from the left side
+    merged_batch: RecordBatch,
+    /// Track memory reservation for merged_batch. Relies on drop
+    /// semantics to release reservation when JoinLeftData is dropped.
+    #[allow(dead_code)]
+    reservation: MemoryReservation,
+}
 
 #[allow(rustdoc::private_intra_doc_links)]
 /// executes partitions in parallel and combines them into a set of
@@ -185,7 +193,10 @@ async fn load_left_input(
 
     let merged_batch = concat_batches(&left_schema, &batches)?;
 
-    Ok((merged_batch, reservation))
+    Ok(JoinLeftData {
+        merged_batch,
+        reservation,
+    })
 }
 
 impl DisplayAs for CrossJoinExec {
@@ -357,7 +368,7 @@ struct CrossJoinStream<T> {
     join_metrics: BuildProbeJoinMetrics,
     /// State of the stream
     state: CrossJoinStreamState,
-    /// Left data
+    /// Left data (copy of the entire buffered left side)
     left_data: RecordBatch,
     /// Batch transformer
     batch_transformer: T,
@@ -457,16 +468,17 @@ impl<T: BatchTransformer> CrossJoinStream<T> {
         cx: &mut std::task::Context<'_>,
     ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
         let build_timer = self.join_metrics.build_time.timer();
-        let (left_data, _) = match ready!(self.left_fut.get(cx)) {
+        let left_data = match ready!(self.left_fut.get(cx)) {
             Ok(left_data) => left_data,
             Err(e) => return Poll::Ready(Err(e)),
         };
         build_timer.done();
 
+        let left_data = left_data.merged_batch.clone();
         let result = if left_data.num_rows() == 0 {
             StatefulStreamResult::Ready(None)
         } else {
-            self.left_data = left_data.clone();
+            self.left_data = left_data;
             self.state = CrossJoinStreamState::FetchProbeBatch;
             StatefulStreamResult::Continue
         };


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to