UBarney commented on code in PR #16443:
URL: https://github.com/apache/datafusion/pull/16443#discussion_r2182032475
########## datafusion/physical-plan/src/joins/utils.rs ##########

```diff
@@ -843,24 +844,56 @@ pub(crate) fn apply_join_filter_to_indices(
     probe_indices: UInt32Array,
     filter: &JoinFilter,
     build_side: JoinSide,
+    max_intermediate_size: Option<usize>,
 ) -> Result<(UInt64Array, UInt32Array)> {
     if build_indices.is_empty() && probe_indices.is_empty() {
         return Ok((build_indices, probe_indices));
     };
-    let intermediate_batch = build_batch_from_indices(
-        filter.schema(),
-        build_input_buffer,
-        probe_batch,
-        &build_indices,
-        &probe_indices,
-        filter.column_indices(),
-        build_side,
-    )?;
-    let filter_result = filter
-        .expression()
-        .evaluate(&intermediate_batch)?
-        .into_array(intermediate_batch.num_rows())?;
+    let filter_result = if let Some(max_size) = max_intermediate_size {
```

Review Comment:

> Shouldn't we have this done in this pull request?

Yes, sorry, I misunderstood. After moving this logic into `build_join_indices`, performance got worse. Is it possible that there is an extra `concat` operation?

| ID | SQL | join_limit_join_batch_size time (s) | mv_limit_to_build time (s) | Performance change |
|----|-----|-------------------------------------|----------------------------|--------------------|
| 1 | select t1.value from range(8192) t1 join range(8192) t2 on t1.value + t2.value < t1.value * t2.value; | 0.560 | 0.639 | -14.03% |
| 2 | select t1.value from range(8192) t1 join range(8192) t2 on t1.value + t2.value > t1.value * t2.value; | 0.389 | 0.370 | +4.78% |
| 3 | select t1.value from range(8192) t1 left join range(8192) t2 on t1.value + t2.value > t1.value * t2.value; | 0.373 | 0.372 | +0.11% |
| 4 | select t1.value from range(8192) t1 join range(81920) t2 on t1.value + t2.value < t1.value * t2.value; | 1.552 | 2.162 | -39.30% |
| 5 | select t1.value from range(100) t1 join range(819200) t2 on t1.value + t2.value > t1.value * t2.value; | 0.061 | 0.061 | +0.98% |
| 6 | select t1.value from range(100) t1 join range(819200) t2 on t1.value + t2.value < t1.value * t2.value; | 0.156 | 0.197 | -26.41% |

<details>

```rust
if let Some(filter) = filter {
    let num_indices = left_indices.len();
    // One output chunk per max_intermediate_batch_size-sized slice of the indices.
    let mut left_chunks =
        Vec::with_capacity(num_indices.div_ceil(max_intermediate_batch_size));
    let mut right_chunks =
        Vec::with_capacity(num_indices.div_ceil(max_intermediate_batch_size));

    for i in (0..num_indices).step_by(max_intermediate_batch_size) {
        let end = min(num_indices, i + max_intermediate_batch_size);
        let len = end - i;
        let left_slice = left_indices.slice(i, len);
        let right_slice = right_indices.slice(i, len);

        // Apply the join filter chunk by chunk so each intermediate batch stays bounded.
        let (left_filtered, right_filtered) = apply_join_filter_to_indices(
            left_batch,
            right_batch,
            left_slice,
            right_slice,
            filter,
            JoinSide::Left,
            None,
        )?;

        left_chunks.push(left_filtered);
        right_chunks.push(right_filtered);
    }

    // Concatenate the per-chunk filtered index arrays back into single arrays.
    let left_ref: Vec<&dyn Array> = left_chunks.iter().map(|x| x as &dyn Array).collect();
    let right_ref: Vec<&dyn Array> = right_chunks.iter().map(|x| x as &dyn Array).collect();

    Ok((
        downcast_array(compute::concat(&left_ref)?.as_ref()),
        downcast_array(compute::concat(&right_ref)?.as_ref()),
    ))
}
```

</details>

I think it is better to apply this limit within `apply_join_filter_to_indices`. The reason is that this function may also be called by other operators, such as `hash_join`, and the `(left_indices, right_indices)` they pass in could be very large as well.
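For readers following the alternative argued for in the last paragraph, below is a minimal sketch of what chunking inside `apply_join_filter_to_indices` itself could look like, driven by the new `max_intermediate_size` parameter added in this PR. The diff above is truncated right after the `if let Some(max_size) = max_intermediate_size` line, so the loop body here is an assumption rather than the PR's actual code. It is a fragment intended to sit inside the function shown in the diff (existing imports, parameters, and the `build_batch_from_indices` helper assumed), and it only replaces the computation of `filter_result`.

```rust
// Hypothetical sketch (not the PR's exact code): evaluate the join filter
// over bounded-size intermediate batches and concatenate the per-chunk
// results, so any caller passing huge index arrays stays within the limit.
// Assumes max_size > 0 and that the rest of the function uses
// `filter_result` as a boolean mask over the index arrays, as before.
let filter_result = if let Some(max_size) = max_intermediate_size {
    let num_rows = build_indices.len();
    let mut masks = Vec::with_capacity(num_rows.div_ceil(max_size));
    for start in (0..num_rows).step_by(max_size) {
        let len = std::cmp::min(max_size, num_rows - start);
        // Build an intermediate batch of at most max_size rows per iteration.
        let intermediate_batch = build_batch_from_indices(
            filter.schema(),
            build_input_buffer,
            probe_batch,
            &build_indices.slice(start, len),
            &probe_indices.slice(start, len),
            filter.column_indices(),
            build_side,
        )?;
        masks.push(
            filter
                .expression()
                .evaluate(&intermediate_batch)?
                .into_array(intermediate_batch.num_rows())?,
        );
    }
    // Concatenate the per-chunk masks into one mask covering all indices.
    let mask_refs: Vec<&dyn Array> = masks.iter().map(|m| m.as_ref()).collect();
    compute::concat(&mask_refs)?
} else {
    // Unchunked path: identical to the pre-existing code shown in the diff.
    let intermediate_batch = build_batch_from_indices(
        filter.schema(),
        build_input_buffer,
        probe_batch,
        &build_indices,
        &probe_indices,
        filter.column_indices(),
        build_side,
    )?;
    filter
        .expression()
        .evaluate(&intermediate_batch)?
        .into_array(intermediate_batch.num_rows())?
};
```

Compared with the `build_join_indices` snippet in the `<details>` block above, this placement concatenates one boolean mask instead of two filtered index arrays per side, which may be related to the extra `concat` overhead discussed here, though that is speculation without profiling.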