UBarney commented on code in PR #16443: URL: https://github.com/apache/datafusion/pull/16443#discussion_r2203230458
########## datafusion/physical-plan/src/joins/utils.rs: ########## @@ -843,24 +844,56 @@ pub(crate) fn apply_join_filter_to_indices( probe_indices: UInt32Array, filter: &JoinFilter, build_side: JoinSide, + max_intermediate_size: Option<usize>, ) -> Result<(UInt64Array, UInt32Array)> { if build_indices.is_empty() && probe_indices.is_empty() { return Ok((build_indices, probe_indices)); }; - let intermediate_batch = build_batch_from_indices( - filter.schema(), - build_input_buffer, - probe_batch, - &build_indices, - &probe_indices, - filter.column_indices(), - build_side, - )?; - let filter_result = filter - .expression() - .evaluate(&intermediate_batch)? - .into_array(intermediate_batch.num_rows())?; + let filter_result = if let Some(max_size) = max_intermediate_size { + let mut filter_results = + Vec::with_capacity(build_indices.len().div_ceil(max_size)); + + for i in (0..build_indices.len()).step_by(max_size) { + let end = min(build_indices.len(), i + max_size); + let len = end - i; + let intermediate_batch = build_batch_from_indices( + filter.schema(), + build_input_buffer, + probe_batch, + &build_indices.slice(i, len), + &probe_indices.slice(i, len), + filter.column_indices(), + build_side, + )?; + let filter_result = filter + .expression() + .evaluate(&intermediate_batch)? + .into_array(intermediate_batch.num_rows())?; + filter_results.push(filter_result); Review Comment: However, after replacing `concat` with `coalescer.push_batch_with_filter`, performance actually decreased. <details> <summary>code</summary> ``` pub(crate) fn apply_join_filter_to_indices( build_input_buffer: &RecordBatch, probe_batch: &RecordBatch, build_indices: UInt64Array, probe_indices: UInt32Array, filter: &JoinFilter, build_side: JoinSide, max_intermediate_size: Option<usize>, ) -> Result<(UInt64Array, UInt32Array)> { if build_indices.is_empty() && probe_indices.is_empty() { return Ok((build_indices, probe_indices)); }; if let Some(max_size) = max_intermediate_size { let indices_schema = Arc::new(Schema::new(vec![ Field::new("build_indices", arrow::datatypes::DataType::UInt64, false), Field::new("probe_indices", arrow::datatypes::DataType::UInt32, false), ])); let build_indices = Arc::new(build_indices); let probe_indices = Arc::new(probe_indices); let indices_batch = RecordBatch::try_new( indices_schema, vec![ Arc::clone(&build_indices) as Arc<dyn Array>, Arc::clone(&probe_indices) as Arc<dyn Array>, ], )?; let mut coalescer = BatchCoalescer::new(indices_batch.schema(), indices_batch.num_rows()); for i in (0..build_indices.len()).step_by(max_size) { let end = min(build_indices.len(), i + max_size); let len = end - i; let intermediate_batch = build_batch_from_indices( filter.schema(), build_input_buffer, probe_batch, &build_indices.slice(i, len), &probe_indices.slice(i, len), filter.column_indices(), build_side, )?; let filter_result = filter .expression() .evaluate(&intermediate_batch)? .into_array(intermediate_batch.num_rows())?; coalescer.push_batch_with_filter( indices_batch.slice(i, len), as_boolean_array(&filter_result)?, )?; } coalescer.finish_buffered_batch()?; let result = coalescer.next_completed_batch(); if result.is_none() { return Ok((build_indices.slice(0, 0), probe_indices.slice(0, 0))); } if coalescer.has_completed_batch() { return internal_err!("should not have completed_batch"); } let (_, arrays, _) = result.unwrap().into_parts(); return Ok(( downcast_array(arrays[0].as_ref()), downcast_array(arrays[1].as_ref()), )); } let intermediate_batch = build_batch_from_indices( filter.schema(), build_input_buffer, probe_batch, &build_indices, &probe_indices, filter.column_indices(), build_side, )?; let filter_result = filter .expression() .evaluate(&intermediate_batch)? .into_array(intermediate_batch.num_rows())?; let mask = as_boolean_array(&filter_result)?; let left_filtered = compute::filter(&build_indices, mask)?; let right_filtered = compute::filter(&probe_indices, mask)?; Ok(( downcast_array(left_filtered.as_ref()), downcast_array(right_filtered.as_ref()), )) } ``` </details> <details> <summary>bench result</summary> | ID | SQL | join_limit_join_batch_size Time(s) | use_BatchCoalescer Time(s) | Performance Change | |----|-----|-------------|------------|-------------------| | 1 | select t1.value from range(8192) t1 join range(8192) t2 on t1.value + t2.value < t1.value * t2.value; | 0.559 | 0.671 | 1.20x slower 🐌 | | 2 | select t1.value from range(8192) t1 join range(8192) t2 on t1.value + t2.value > t1.value * t2.value; | 0.377 | 0.371 | +1.02x faster 🚀 | | 3 | select t1.value from range(8192) t1 left join range(8192) t2 on t1.value + t2.value > t1.value * t2.value; | 0.363 | 0.363 | +1.00x faster 🚀 | | 4 | select t1.value from range(8192) t1 join range(81920) t2 on t1.value + t2.value < t1.value * t2.value; | 1.556 | 2.031 | 1.30x slower 🐌 | | 5 | select t1.value from range(100) t1 join range(819200) t2 on t1.value + t2.value > t1.value * t2.value; | 0.063 | 0.057 | +1.11x faster 🚀 | | 6 | select t1.value from range(100) t1 join range(819200) t2 on t1.value + t2.value < t1.value * t2.value; | 0.153 | 0.194 | 1.27x slower 🐌 | | SQL Query | join_limit_join_batch_size Memory | use_BatchCoalescer Memory | Improvement | | ---------------------------------------------------------------------------------------------------------- | --------------------------------- | ------------------------- | --------------- | | select t1.value from range(8192) t1 join range(8192) t2 on t1.value + t2.value < t1.value * t2.value; | 1.57 GB | 2.31 GB | 1.47x more 🐌 | | select t1.value from range(8192) t1 join range(8192) t2 on t1.value + t2.value > t1.value * t2.value; | 841.5 MB | 824.9 MB | +1.02x saved 🚀 | | select t1.value from range(8192) t1 left join range(8192) t2 on t1.value + t2.value > t1.value * t2.value; | 845.3 MB | 824.6 MB | +1.03x saved 🚀 | | select t1.value from range(8192) t1 join range(81920) t2 on t1.value + t2.value < t1.value * t2.value; | 15.00 GB | 20.36 GB | 1.36x more 🐌 | | select t1.value from range(100) t1 join range(819200) t2 on t1.value + t2.value > t1.value * t2.value; | 328.1 MB | 327.6 MB | +1.00x saved 🚀 | | select t1.value from range(100) t1 join range(819200) t2 on t1.value + t2.value < t1.value * t2.value; | 659.8 MB | 810.4 MB | 1.23x more 🐌 | </details> -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org