korowa commented on code in PR #12531:
URL: https://github.com/apache/datafusion/pull/12531#discussion_r1767370510
##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -456,21 +458,72 @@ struct NestedLoopJoinStream {
// null_equals_null: bool
/// Join execution metrics
join_metrics: BuildProbeJoinMetrics,
+ /// Cache for join indices calculations
+ indices_cache: (UInt64Array, UInt32Array),
}
+/// Creates a Cartesian product of two input batches, preserving the order of the right batch,
+/// and applying a join filter if provided.
+///
+/// # Example
+/// Input:
+/// left = [0, 1], right = [0, 1, 2]
+///
+/// Output:
+/// left_indices = [0, 1, 0, 1, 0, 1], right_indices = [0, 0, 1, 1, 2, 2]
+///
+/// Input:
+/// left = [0, 1, 2], right = [0, 1, 2, 3], filter = left.a != right.a
+///
+/// Output:
+/// left_indices = [1, 2, 0, 2, 0, 1, 0, 1, 2], right_indices = [0, 0, 1, 1, 2, 2, 3, 3, 3]
fn build_join_indices(
- right_row_index: usize,
left_batch: &RecordBatch,
right_batch: &RecordBatch,
filter: Option<&JoinFilter>,
+ indices_cache: &mut (UInt64Array, UInt32Array),
) -> Result<(UInt64Array, UInt32Array)> {
- // left indices: [0, 1, 2, 3, 4, ..., left_row_count]
- // right indices: [right_index, right_index, ..., right_index]
-
let left_row_count = left_batch.num_rows();
- let left_indices = UInt64Array::from_iter_values(0..(left_row_count as u64));
- let right_indices = UInt32Array::from(vec![right_row_index as u32; left_row_count]);
- // in the nested loop join, the filter can contain non-equal and equal condition.
+ let right_row_count = right_batch.num_rows();
+ let output_row_count = left_row_count * right_row_count;
+
+ // We always use the same indices before applying the filter, so we can cache them
+ let (left_indices_cache, right_indices_cache) = indices_cache;
+ let cached_output_row_count = left_indices_cache.len();
Review Comment:
With 25 rows on the build side, the cached index arrays hold about 200k
elements each; with 500 rows, about 4M, and so on (I suppose we don't need
much data on the build side for these arrays to reach GBs in size).
I understand that we will still have to create intermediate batches to
apply the filter and produce output batches, but I suppose that, from some
point on, the size of these caches will become significant.
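
To put rough numbers on this, here is a minimal sketch in plain Rust, without Arrow. Both function names are hypothetical. `cartesian_indices` reproduces the unfiltered index layout from the doc comment (ordered by right row). `cache_bytes` counts only the raw values of a `u64` plus a `u32` per output row and ignores any Arrow buffer overhead. The 8192-row probe batch used in the figures below is an assumption that happens to match the 200k and 4M estimates; it is not stated in the PR.

```rust
/// Hypothetical helper: unfiltered Cartesian-product indices, grouped by
/// right row so that the order of the right batch is preserved.
fn cartesian_indices(left_rows: usize, right_rows: usize) -> (Vec<u64>, Vec<u32>) {
    let total = left_rows * right_rows;
    let mut left = Vec::with_capacity(total);
    let mut right = Vec::with_capacity(total);
    for r in 0..right_rows {
        for l in 0..left_rows {
            left.push(l as u64);
            right.push(r as u32);
        }
    }
    (left, right)
}

/// Hypothetical helper: bytes held by the two cached index arrays,
/// counting values only (8 bytes per left index, 4 bytes per right index).
fn cache_bytes(left_rows: usize, right_rows: usize) -> usize {
    let elements = left_rows * right_rows;
    elements * (std::mem::size_of::<u64>() + std::mem::size_of::<u32>())
}
```

Under these assumptions, `cache_bytes(25, 8192)` is 2,457,600 bytes (about 2.4 MB) and `cache_bytes(500, 8192)` is 49,152,000 bytes (about 49 MB). At 12 bytes per output row, the cache would pass 1 GiB at roughly 11k build-side rows per 8192-row probe batch.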
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]