korowa commented on code in PR #5490:
URL: https://github.com/apache/arrow-datafusion/pull/5490#discussion_r1129895159


##########
datafusion/core/src/physical_plan/joins/hash_join.rs:
##########
@@ -1173,6 +1149,18 @@ impl HashJoinStream {
         };
         build_timer.done();
 
+        // Reserving memory for visited_left_side bitmap in case it hasn't been initialied yet
+        // and join_type requires to store it
+        if self.visited_left_side.is_none()

Review Comment:
   It was my first intention, but then I realized that `Option` doesn't have 
fallible versions of `get_or_insert`-like methods (e.g. `try_get_or_insert_with` 
would be just right in this case), and the only alternative I saw was storing 
`visited_left_side` as a `Result<_, DataFusionError>`, which, I guess, would not 
be very clean either.
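A minimal sketch of the fallible lazy initialization described above, written as a free function because `std` offers no `try_get_or_insert_with` on `Option`. The helper `try_get_or_insert_with` and the `Reservation` type with its `try_grow` method are hypothetical stand-ins, not DataFusion's API; the point is that the error from reserving memory is propagated to the caller instead of being stored in the field.

```rust
/// Hypothetical helper (not part of `std`): a fallible counterpart to
/// `Option::get_or_insert_with`. `init` runs only when `slot` is `None`,
/// and its error is returned without modifying `slot`.
fn try_get_or_insert_with<T, E>(
    slot: &mut Option<T>,
    init: impl FnOnce() -> Result<T, E>,
) -> Result<&mut T, E> {
    if slot.is_none() {
        *slot = Some(init()?);
    }
    // At this point `slot` is guaranteed to be `Some`.
    Ok(slot.as_mut().expect("slot was initialized above"))
}

/// Hypothetical stand-in for a memory reservation with a fixed budget.
struct Reservation {
    used: usize,
    limit: usize,
}

impl Reservation {
    /// Fails instead of exceeding the budget.
    fn try_grow(&mut self, bytes: usize) -> Result<(), String> {
        if self.used + bytes > self.limit {
            return Err(format!("cannot reserve {bytes} bytes"));
        }
        self.used += bytes;
        Ok(())
    }
}
```

With such a helper, `visited_left_side` can stay a plain `Option`, memory is reserved only on the first call, and a failed reservation leaves the field `None` while the caller propagates the error with `?`.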



##########
datafusion/core/src/physical_plan/common.rs:
##########
@@ -39,8 +39,14 @@ use std::task::{Context, Poll};
 use tokio::sync::mpsc;
 use tokio::task::JoinHandle;
 
+/// [`MemoryReservation`] used across query execution streams
 pub(crate) type SharedMemoryReservation = Arc<Mutex<MemoryReservation>>;
 
+/// [`MemoryReservation`] used at query operator level
+/// `Option` wrapper allows to initialize empty reservation in operator constructor,
+/// and set it to actual reservation at stream level.
+pub(crate) type OperatorMemoryReservation = Arc<Mutex<Option<SharedMemoryReservation>>>;

Review Comment:
   Unfortunately, I haven't found a better solution yet, but I'll check whether 
there is a better way to wrap `MemoryReservation` into something that lets 
multiple streams attempt to create (or initialize) the reservation.
   
   Another solution is probably to allow creating a `MemoryReservation` without 
providing a context (in `HashJoinExec::new`) and registering it in the 
`MemoryPool` later (in `HashJoinExec.execute`, when the context is available).
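A minimal sketch of the lazily initialized, shared reservation that `OperatorMemoryReservation` models, using only `std`. `Reservation`, `SharedReservation`, `OperatorReservation` and `get_or_create` are hypothetical names for illustration, and `create` is a placeholder for whatever registers the reservation with the memory pool once a `TaskContext` is available. `std::sync::Mutex` is used here, so `lock()` returns a `Result` that is unwrapped.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for `MemoryReservation`: it only tracks a byte count.
#[derive(Debug, Default)]
struct Reservation {
    size: usize,
}

/// Same shape as `SharedMemoryReservation` in the diff.
type SharedReservation = Arc<Mutex<Reservation>>;

/// Same shape as `OperatorMemoryReservation` in the diff: empty when the
/// operator is constructed, filled in by whichever stream gets there first.
type OperatorReservation = Arc<Mutex<Option<SharedReservation>>>;

/// Returns the shared reservation, creating it on first use.
fn get_or_create(
    operator: &OperatorReservation,
    create: impl FnOnce() -> Reservation,
) -> SharedReservation {
    // Holding the outer lock makes "check, then initialize" atomic, so at
    // most one stream runs `create`; the others clone the existing handle.
    let mut slot = operator.lock().unwrap();
    let shared = slot
        .get_or_insert_with(|| Arc::new(Mutex::new(create())))
        .clone();
    shared
}
```

The trade-off of this shape is that every stream takes the outer lock just to obtain its handle, which is the kind of double wrapping the comment above hopes to avoid.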



##########
datafusion/core/src/physical_plan/joins/cross_join.rs:
##########
@@ -452,10 +464,7 @@ impl CrossJoinStream {
 
                     Some(result)
                 }
-                other => {
-                    self.reservation.lock().free();
-                    other
-                }
+                other => other,

Review Comment:
   True. It now seems more reliable to store reservations along with the 
left-side data they account for, and to rely on [this 
feature](https://github.com/apache/arrow-datafusion/blob/deeaa5632ed99a58b91767261570756db736d158/datafusion/execution/src/memory_pool/mod.rs#L159).
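Assuming the linked line refers to a reservation releasing its bytes back to the pool when it is dropped, the sketch below shows why storing the reservation inside the left-side data makes the explicit `self.reservation.lock().free()` removed in the diff unnecessary. `Pool`, `Reservation`, `LeftData` and `build_left` are hypothetical names, and the pool here only counts bytes.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical memory pool that only counts reserved bytes.
#[derive(Debug, Default)]
struct Pool {
    reserved: AtomicUsize,
}

/// Hypothetical reservation that hands its bytes back to the pool on drop.
struct Reservation {
    pool: Arc<Pool>,
    size: usize,
}

impl Reservation {
    fn new(pool: &Arc<Pool>) -> Self {
        Self { pool: Arc::clone(pool), size: 0 }
    }

    fn grow(&mut self, bytes: usize) {
        self.pool.reserved.fetch_add(bytes, Ordering::SeqCst);
        self.size += bytes;
    }
}

impl Drop for Reservation {
    fn drop(&mut self) {
        // Release-on-drop: stream code no longer needs an explicit `free()`.
        self.pool.reserved.fetch_sub(self.size, Ordering::SeqCst);
    }
}

/// Build-side data stored together with the reservation that accounts for it.
struct LeftData {
    rows: Vec<u64>,
    _reservation: Reservation,
}

/// Reserves memory for `rows` and bundles both into shareable left data.
fn build_left(pool: &Arc<Pool>, rows: Vec<u64>) -> Arc<LeftData> {
    let mut reservation = Reservation::new(pool);
    reservation.grow(rows.len() * std::mem::size_of::<u64>());
    Arc::new(LeftData { rows, _reservation: reservation })
}
```

Because the left data is shared through an `Arc`, the memory is returned to the pool only when the last stream holding it is dropped, regardless of which stream finishes first.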



##########
datafusion/core/src/physical_plan/joins/hash_join.rs:
##########
@@ -493,90 +491,73 @@ impl ExecutionPlan for HashJoinExec {
 }
 
 async fn collect_left_input(
+    partition: Option<usize>,
     random_state: RandomState,
     left: Arc<dyn ExecutionPlan>,
     on_left: Vec<Column>,
     context: Arc<TaskContext>,
+    metrics: BuildProbeJoinMetrics,
+    reservation: SharedMemoryReservation,
 ) -> Result<JoinLeftData> {
     let schema = left.schema();
-    let start = Instant::now();
-    // merge all left parts into a single stream
-    let merge = {
-        if left.output_partitioning().partition_count() != 1 {
-            Arc::new(CoalescePartitionsExec::new(left))
-        } else {
-            left
-        }
-    };
-    let stream = merge.execute(0, context)?;

Review Comment:
   Correct: these two functions shared the same logic, except for how the 
left-side stream was sourced.
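The removed lines show the collect-everything path: wrap the left input in `CoalescePartitionsExec` when it has more than one partition, then execute partition 0. A minimal sketch, with hypothetical names (`LeftSource`, `left_source`), of how the new `partition: Option<usize>` parameter could select the stream source in the merged function, assuming `Some(p)` means reading partition `p` of the left input directly:

```rust
/// Hypothetical description of how the build (left) side is read.
#[derive(Debug, PartialEq)]
enum LeftSource {
    /// Merge all input partitions into one stream (`CoalescePartitionsExec`),
    /// then read its single output partition.
    CoalesceAll,
    /// Read exactly one partition of the left input directly.
    Partition(usize),
}

/// Decides where the left-side stream comes from.
/// `None` mirrors the collect-everything path removed in the diff;
/// `Some(p)` is assumed to be the per-partition path.
fn left_source(partition: Option<usize>, left_partition_count: usize) -> LeftSource {
    match partition {
        // Per-partition build side: read only the matching partition.
        Some(p) => LeftSource::Partition(p),
        // A single input partition needs no coalescing: read partition 0.
        None if left_partition_count == 1 => LeftSource::Partition(0),
        // Several input partitions: coalesce them first.
        None => LeftSource::CoalesceAll,
    }
}
```

Everything after choosing the source (hashing `on_left`, building the map, recording metrics, growing the reservation) can then be shared by both callers.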


