alamb commented on code in PR #5490:
URL: https://github.com/apache/arrow-datafusion/pull/5490#discussion_r1129654905


##########
datafusion/core/src/physical_plan/common.rs:
##########
@@ -39,8 +39,14 @@ use std::task::{Context, Poll};
 use tokio::sync::mpsc;
 use tokio::task::JoinHandle;
 
+/// [`MemoryReservation`] used across query execution streams
 pub(crate) type SharedMemoryReservation = Arc<Mutex<MemoryReservation>>;
 
+/// [`MemoryReservation`] used at query operator level
+/// `Option` wrapper allows to initialize empty reservation in operator constructor,
+/// and set it to actual reservation at stream level.
+pub(crate) type OperatorMemoryReservation = Arc<Mutex<Option<SharedMemoryReservation>>>;

Review Comment:
   ~I wonder if the double layer of Mutex is necessary (Mutex over an Option of 
a Mutex)~
   
   Update: after reading the code, I see why it is necessary.
   
   It might make the code clearer (perhaps as a follow-on PR) if we had a proper `SharedMemoryReservation` that could be cloned and that handled mutability internally. 🤔 
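
To illustrate the idea, here is a minimal sketch of a cloneable handle that keeps its lock private. It is not DataFusion's API: `Reservation` and `SharedReservation` are hypothetical stand-ins, the limit check is simplified, and it uses `std::sync::Mutex` where the PR's code may use a different mutex.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for a memory reservation: bytes used against a limit.
#[derive(Debug)]
struct Reservation {
    used: usize,
    limit: usize,
}

/// Hypothetical cloneable handle. Clones share one underlying reservation,
/// and the lock is an implementation detail that callers never see.
#[derive(Debug, Clone)]
pub struct SharedReservation {
    inner: Arc<Mutex<Reservation>>,
}

impl SharedReservation {
    pub fn new(limit: usize) -> Self {
        Self {
            inner: Arc::new(Mutex::new(Reservation { used: 0, limit })),
        }
    }

    /// Try to reserve `bytes` more. Takes `&self` because locking happens here.
    pub fn try_grow(&self, bytes: usize) -> Result<(), String> {
        let mut r = self.inner.lock().unwrap();
        let new_used = r
            .used
            .checked_add(bytes)
            .ok_or_else(|| "reservation size overflow".to_string())?;
        if new_used > r.limit {
            return Err(format!("Resources exhausted: cannot reserve {bytes} bytes"));
        }
        r.used = new_used;
        Ok(())
    }

    /// Bytes currently reserved across all clones of this handle.
    pub fn size(&self) -> usize {
        self.inner.lock().unwrap().used
    }
}
```

With a handle like this, an operator could hold `Option<SharedReservation>` and hand clones to its streams, so call sites would write `reservation.try_grow(n)?` without locking explicitly.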



##########
datafusion/core/src/physical_plan/joins/hash_join.rs:
##########
@@ -1173,6 +1149,18 @@ impl HashJoinStream {
         };
         build_timer.done();
 
+        // Reserving memory for visited_left_side bitmap in case it hasn't been initialied yet
+        // and join_type requires to store it
+        if self.visited_left_side.is_none()

Review Comment:
   You might be able to make this less verbose by moving the reservation into the initialization of `visited_left_side`:
   
   
   ```
           let visited_left_side = self.visited_left_side.get_or_insert_with(|| {
               let visited_bitmap_size = bit_util::ceil(left_data.1.num_rows(), 8);
               self.reservation.lock().try_grow(visited_bitmap_size)?;
               self.join_metrics.build_mem_used.add(visited_bitmap_size);
               ... 
   ```
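
One caveat with the snippet above: a `?` inside the closure passed to `Option::get_or_insert_with` returns from the closure, not from the enclosing function, and that closure must return the stored value rather than a `Result`. A hedged, self-contained sketch of the same lazy-initialization idea with a fallible reservation might look like the following. `Budget`, `bitmap_bytes`, and `visited_bitmap` are hypothetical names for illustration, not DataFusion code.

```rust
/// Hypothetical budget standing in for a memory reservation.
struct Budget {
    used: usize,
    limit: usize,
}

impl Budget {
    /// Reserve `bytes` more, failing if the limit would be exceeded.
    fn try_grow(&mut self, bytes: usize) -> Result<(), String> {
        if self.used + bytes > self.limit {
            return Err(format!("Resources exhausted: cannot reserve {bytes} bytes"));
        }
        self.used += bytes;
        Ok(())
    }
}

/// Bytes needed for a bitmap of `bits` bits, rounded up.
fn bitmap_bytes(bits: usize) -> usize {
    (bits + 7) / 8
}

/// Return the bitmap, reserving memory and allocating it on first use only.
/// The reservation error propagates to the caller via `?`.
fn visited_bitmap<'a>(
    slot: &'a mut Option<Vec<u8>>,
    budget: &mut Budget,
    num_rows: usize,
) -> Result<&'a mut Vec<u8>, String> {
    if slot.is_none() {
        let size = bitmap_bytes(num_rows);
        budget.try_grow(size)?; // fails before anything is allocated
        *slot = Some(vec![0u8; size]);
    }
    Ok(slot.as_mut().expect("initialized above"))
}
```

The explicit `is_none()` check keeps the reservation and the allocation in one place while still letting the error reach the caller, which is the part `get_or_insert_with` cannot express directly.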
   



##########
datafusion/core/src/physical_plan/joins/cross_join.rs:
##########
@@ -683,6 +692,7 @@ mod tests {
             err.to_string(),
             "External error: Resources exhausted: Failed to allocate additional"
         );
+        assert_contains!(err.to_string(), "CrossJoinExec");

Review Comment:
   👍 



##########
datafusion/core/src/physical_plan/joins/hash_join.rs:
##########
@@ -146,50 +150,6 @@ pub struct HashJoinExec {
     pub(crate) null_equals_null: bool,
 }
 
-/// Metrics for HashJoinExec

Review Comment:
   It is a nice cleanup to use `BuildProbeMetrics` 👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
