alihan-synnada commented on code in PR #12531:
URL: https://github.com/apache/datafusion/pull/12531#discussion_r1768086606


##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -456,21 +458,72 @@ struct NestedLoopJoinStream {
     // null_equals_null: bool
     /// Join execution metrics
     join_metrics: BuildProbeJoinMetrics,
+    /// Cache for join indices calculations
+    indices_cache: (UInt64Array, UInt32Array),
 }
 
+/// Creates a Cartesian product of two input batches, preserving the order of the right batch,
+/// and applying a join filter if provided.
+///
+/// # Example
+/// Input:
+/// left = [0, 1], right = [0, 1, 2]
+///
+/// Output:
+/// left_indices = [0, 1, 0, 1, 0, 1], right_indices = [0, 0, 1, 1, 2, 2]
+///
+/// Input:
+/// left = [0, 1, 2], right = [0, 1, 2, 3], filter = left.a != right.a
+///
+/// Output:
+/// left_indices = [1, 2, 0, 2, 0, 1, 0, 1, 2], right_indices = [0, 0, 1, 1, 2, 2, 3, 3, 3]
 fn build_join_indices(
-    right_row_index: usize,
     left_batch: &RecordBatch,
     right_batch: &RecordBatch,
     filter: Option<&JoinFilter>,
+    indices_cache: &mut (UInt64Array, UInt32Array),
 ) -> Result<(UInt64Array, UInt32Array)> {
-    // left indices: [0, 1, 2, 3, 4, ..., left_row_count]
-    // right indices: [right_index, right_index, ..., right_index]
-
     let left_row_count = left_batch.num_rows();
-    let left_indices = UInt64Array::from_iter_values(0..(left_row_count as u64));
-    let right_indices = UInt32Array::from(vec![right_row_index as u32; left_row_count]);
-    // in the nested loop join, the filter can contain non-equal and equal condition.
+    let right_row_count = right_batch.num_rows();
+    let output_row_count = left_row_count * right_row_count;
+
+    // We always use the same indices before applying the filter, so we can cache them
+    let (left_indices_cache, right_indices_cache) = indices_cache;
+    let cached_output_row_count = left_indices_cache.len();

Review Comment:
   I guess we can do away with the cache or make it optional. If we remove the cache, we could create the indices and apply the filter in chunks, similar to before. If we pass in a range from which to calculate the indices, instead of creating `right_batch.num_rows()` chunks, we can also control the size of the intermediate batches. Something like `(0..output_row_count).chunks(CHUNK_SIZE)` should do the trick, now that we create the indices by mapping the current row index; see the sketch below.
   
   I believe this can bring the no-cache performance back to a level similar to before the regression, maybe even better. I'll run a few benchmarks with this setup (without a cache) and update the benchmarks table.
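   
   As an illustration only (not code from this PR; `CHUNK_SIZE` and `chunk_indices` are hypothetical names, and the driving loop uses `step_by` where itertools' `(0..output_row_count).chunks(CHUNK_SIZE)` would be equivalent), the per-chunk indices could be derived from the flat row index roughly like this:
   
   ```rust
   use arrow::array::{UInt32Array, UInt64Array};
   
   /// Hypothetical chunk size; in practice this would likely come from the
   /// session's batch size configuration.
   const CHUNK_SIZE: usize = 4096;
   
   /// Builds the (left, right) join indices for one chunk of the Cartesian
   /// product, preserving the order of the right batch. `chunk_start` is the
   /// flat row index of the first pair in the chunk, and `left_row_count` is
   /// the number of rows buffered on the left side.
   fn chunk_indices(
       chunk_start: usize,
       chunk_len: usize,
       left_row_count: usize,
   ) -> (UInt64Array, UInt32Array) {
       let range = chunk_start..chunk_start + chunk_len;
       // For flat index i: the left index cycles through 0..left_row_count,
       // while the right index advances every `left_row_count` rows, matching
       // the ordering shown in the doc comment examples above.
       let left_indices =
           UInt64Array::from_iter_values(range.clone().map(|i| (i % left_row_count) as u64));
       let right_indices =
           UInt32Array::from_iter_values(range.map(|i| (i / left_row_count) as u32));
       (left_indices, right_indices)
   }
   
   fn main() {
       // left = [0, 1], right = [0, 1, 2] -> 6 output rows, processed in chunks
       let (left_row_count, right_row_count) = (2usize, 3usize);
       let output_row_count = left_row_count * right_row_count;
       for chunk_start in (0..output_row_count).step_by(CHUNK_SIZE) {
           let chunk_len = CHUNK_SIZE.min(output_row_count - chunk_start);
           let (left_indices, right_indices) =
               chunk_indices(chunk_start, chunk_len, left_row_count);
           // The join filter would be applied to just this chunk before
           // take()-ing matching rows, bounding the intermediate batch size.
           println!("{left_indices:?} {right_indices:?}");
       }
   }
   ```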



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

