alamb commented on code in PR #12969:
URL: https://github.com/apache/datafusion/pull/12969#discussion_r1805153470
##########
datafusion/common/src/config.rs:
##########
@@ -338,6 +338,12 @@ config_namespace! {
/// if the source of statistics is accurate.
/// We plan to make this the default in the future.
pub use_row_number_estimates_to_optimize_partitioning: bool, default = false
+
+ /// Should DataFusion enforce batch size in joins or not. By default,
+ /// DataFusion will not enforce batch size in joins. Enforcing batch size
+ /// in joins can help to avoid out-of-memory errors when joining large
+ /// tables with a highly-selective join filter.
Review Comment:
I think it might help to also explain why this option is not on by default.
Something like this perhaps:
```suggestion
/// Should DataFusion enforce batch size in joins or not. By default,
/// DataFusion will not enforce batch size in joins. Enforcing batch size
/// in joins can reduce memory usage when joining large
/// tables with a highly-selective join filter, but is also slightly slower.
```
##########
datafusion/physical-plan/src/joins/utils.rs:
##########
@@ -1635,6 +1643,87 @@ pub(crate) fn asymmetric_join_output_partitioning(
}
}
+pub(crate) trait BatchTransformer: Debug + Clone {
Review Comment:
I think some context would help here. Something like this perhaps:
```suggestion
/// Trait for incrementally generating Join output.
///
/// This trait is used to limit the output of some joins
/// so that they do not produce single large batches.
pub(crate) trait BatchTransformer: Debug + Clone {
```
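To make the "incrementally generating Join output" idea concrete, here is a minimal, std-only sketch. It is not the PR's code: `Batch` is a plain `Vec<i32>` standing in for an Arrow `RecordBatch`, and the method names `set_batch` and `next`, the `NoopBatchTransformer` type, and the `drain` helper are assumptions made for illustration.

```rust
use std::fmt::Debug;

/// Stand-in for an Arrow `RecordBatch` (assumption): a vector of row values.
type Batch = Vec<i32>;

/// Simplified, hypothetical version of the trait under review.
trait BatchTransformer: Debug + Clone {
    /// Hand the transformer the next (possibly very large) join output.
    fn set_batch(&mut self, batch: Batch);
    /// Return the next chunk plus a flag that is `true` for the last chunk
    /// of the current batch, or `None` when no batch is pending.
    fn next(&mut self) -> Option<(Batch, bool)>;
}

/// Passes each batch through unchanged (batch size not enforced).
#[derive(Debug, Clone, Default)]
struct NoopBatchTransformer {
    batch: Option<Batch>,
}

impl BatchTransformer for NoopBatchTransformer {
    fn set_batch(&mut self, batch: Batch) {
        self.batch = Some(batch);
    }
    fn next(&mut self) -> Option<(Batch, bool)> {
        self.batch.take().map(|b| (b, true))
    }
}

/// Emits at most `batch_size` rows per call to `next`.
#[derive(Debug, Clone)]
struct BatchSplitter {
    batch: Option<Batch>,
    offset: usize,
    batch_size: usize,
}

impl BatchSplitter {
    fn new(batch_size: usize) -> Self {
        assert!(batch_size > 0, "batch_size must be positive");
        Self { batch: None, offset: 0, batch_size }
    }
}

impl BatchTransformer for BatchSplitter {
    fn set_batch(&mut self, batch: Batch) {
        self.batch = Some(batch);
        self.offset = 0;
    }
    fn next(&mut self) -> Option<(Batch, bool)> {
        let batch = self.batch.as_ref()?;
        let end = (self.offset + self.batch_size).min(batch.len());
        let chunk = batch[self.offset..end].to_vec();
        let last = end == batch.len();
        if last {
            // Current batch fully emitted: wait for the next `set_batch`.
            self.batch = None;
            self.offset = 0;
        } else {
            self.offset = end;
        }
        Some((chunk, last))
    }
}

/// Hypothetical helper: feed one batch in and collect every chunk produced.
fn drain<T: BatchTransformer>(transformer: &mut T, batch: Batch) -> Vec<Batch> {
    transformer.set_batch(batch);
    let mut out = Vec::new();
    while let Some((chunk, _is_last)) = transformer.next() {
        out.push(chunk);
    }
    out
}
```

With `BatchSplitter::new(2)`, draining a five-row batch yields chunks of two, two, and one rows, while the no-op transformer returns the batch whole.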
##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -345,25 +351,49 @@ impl ExecutionPlan for NestedLoopJoinExec {
)
});
+ let batch_size = context.session_config().batch_size();
+ let enforce_batch_size_in_joins =
+ context.session_config().enforce_batch_size_in_joins();
+
let outer_table = self.right.execute(partition, context)?;
let indices_cache = (UInt64Array::new_null(0),
UInt32Array::new_null(0));
// Right side has an order and it is maintained during operation.
let right_side_ordered =
self.maintains_input_order()[1] &&
self.right.output_ordering().is_some();
- Ok(Box::pin(NestedLoopJoinStream {
- schema: Arc::clone(&self.schema),
- filter: self.filter.clone(),
- join_type: self.join_type,
- outer_table,
- inner_table,
- is_exhausted: false,
- column_indices: self.column_indices.clone(),
- join_metrics,
- indices_cache,
- right_side_ordered,
- }))
+
+ if enforce_batch_size_in_joins {
+ Ok(Box::pin(NestedLoopJoinStream {
+ schema: Arc::clone(&self.schema),
+ filter: self.filter.clone(),
+ join_type: self.join_type,
+ outer_table,
+ inner_table,
+ column_indices: self.column_indices.clone(),
+ join_metrics,
+ indices_cache,
+ right_side_ordered,
+ state: NestedLoopJoinStreamState::WaitBuildSide,
+ batch_transformer: BatchSplitter::new(batch_size),
Review Comment:
This is a clever idea to parameterize the join streams on the transformer 👍
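A rough, std-only sketch of what parameterizing a stream on the transformer looks like. All names here (`Transformer`, `Noop`, `Splitter`, `JoinStream`, `execute`) are hypothetical, and rows are plain `u32` values rather than Arrow arrays. Because each transformer gives the stream a different concrete type, the constructor needs one branch per type, which matches the `if enforce_batch_size_in_joins` shape in the diff.

```rust
use std::collections::VecDeque;

/// Hypothetical, simplified transformer: turns one batch into output batches.
trait Transformer {
    fn transform(&self, rows: Vec<u32>) -> Vec<Vec<u32>>;
}

/// Leaves batches untouched.
struct Noop;

/// Splits batches into chunks of at most `batch_size` rows (must be > 0).
struct Splitter {
    batch_size: usize,
}

impl Transformer for Noop {
    fn transform(&self, rows: Vec<u32>) -> Vec<Vec<u32>> {
        vec![rows]
    }
}

impl Transformer for Splitter {
    fn transform(&self, rows: Vec<u32>) -> Vec<Vec<u32>> {
        rows.chunks(self.batch_size).map(|c| c.to_vec()).collect()
    }
}

/// Stream that is generic over its transformer, so the call to
/// `transform` is statically dispatched.
struct JoinStream<T: Transformer> {
    input: std::vec::IntoIter<Vec<u32>>,
    buffered: VecDeque<Vec<u32>>,
    transformer: T,
}

impl<T: Transformer> JoinStream<T> {
    fn new(input: Vec<Vec<u32>>, transformer: T) -> Self {
        Self { input: input.into_iter(), buffered: VecDeque::new(), transformer }
    }
}

impl<T: Transformer> Iterator for JoinStream<T> {
    type Item = Vec<u32>;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // Emit any chunk left over from the previous input batch first.
            if let Some(chunk) = self.buffered.pop_front() {
                return Some(chunk);
            }
            // Otherwise pull the next input batch, or finish if exhausted.
            let rows = self.input.next()?;
            self.buffered.extend(self.transformer.transform(rows));
        }
    }
}

/// `JoinStream<Splitter>` and `JoinStream<Noop>` are distinct types, so the
/// choice is made once here and the result is boxed behind a common trait.
fn execute(
    enforce: bool,
    batch_size: usize,
    input: Vec<Vec<u32>>,
) -> Box<dyn Iterator<Item = Vec<u32>>> {
    if enforce {
        Box::new(JoinStream::new(input, Splitter { batch_size }))
    } else {
        Box::new(JoinStream::new(input, Noop))
    }
}
```

The cost of this design is the duplicated construction branch. The benefit is that the stream body is monomorphized per transformer.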
##########
datafusion/physical-plan/src/joins/symmetric_hash_join.rs:
##########
@@ -502,29 +510,52 @@ impl ExecutionPlan for SymmetricHashJoinExec {
reservation.lock().try_grow(g.size())?;
}
- Ok(Box::pin(SymmetricHashJoinStream {
- left_stream,
- right_stream,
- schema: self.schema(),
- filter: self.filter.clone(),
- join_type: self.join_type,
- random_state: self.random_state.clone(),
- left: left_side_joiner,
- right: right_side_joiner,
- column_indices: self.column_indices.clone(),
- metrics: StreamJoinMetrics::new(partition, &self.metrics),
- graph,
- left_sorted_filter_expr,
- right_sorted_filter_expr,
- null_equals_null: self.null_equals_null,
- state: SHJStreamState::PullRight,
- reservation,
- }))
+ if enforce_batch_size_in_joins {
+ Ok(Box::pin(SymmetricHashJoinStream {
+ left_stream,
+ right_stream,
+ schema: self.schema(),
+ filter: self.filter.clone(),
+ join_type: self.join_type,
+ random_state: self.random_state.clone(),
+ left: left_side_joiner,
+ right: right_side_joiner,
+ column_indices: self.column_indices.clone(),
+ metrics: StreamJoinMetrics::new(partition, &self.metrics),
+ graph,
+ left_sorted_filter_expr,
+ right_sorted_filter_expr,
+ null_equals_null: self.null_equals_null,
+ state: SHJStreamState::PullRight,
+ reservation,
+ batch_transformer: BatchSplitter::new(batch_size),
Review Comment:
Given that the overhead of a BatchTransformer is likely small (one function call
per output batch), I suggest trying a `dyn` trait object here and in the
other joins instead (e.g. `batch_transformer: Box<dyn BatchTransformer>`).
I suspect it would not make any noticeable performance difference.
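For comparison, a minimal std-only sketch of the trait-object variant. The names and the single `split` method are hypothetical simplifications, not the PR's API. One caveat worth stating as a Rust language fact: a trait with `Clone` as a supertrait cannot be used as `dyn Trait`, because `Clone` requires `Sized`, so this sketch assumes the `Clone` bound would be dropped or replaced.

```rust
use std::fmt::Debug;

/// Hypothetical, simplified trait. `Clone` is deliberately NOT a supertrait:
/// `Clone: Sized` would rule out `dyn BatchTransformer`.
trait BatchTransformer: Debug {
    fn split(&self, rows: Vec<u32>) -> Vec<Vec<u32>>;
}

/// Leaves batches untouched.
#[derive(Debug)]
struct NoopBatchTransformer;

/// Splits batches into chunks of at most `batch_size` rows (must be > 0).
#[derive(Debug)]
struct BatchSplitter {
    batch_size: usize,
}

impl BatchTransformer for NoopBatchTransformer {
    fn split(&self, rows: Vec<u32>) -> Vec<Vec<u32>> {
        vec![rows]
    }
}

impl BatchTransformer for BatchSplitter {
    fn split(&self, rows: Vec<u32>) -> Vec<Vec<u32>> {
        rows.chunks(self.batch_size).map(|c| c.to_vec()).collect()
    }
}

/// A single, non-generic stream type: the transformer is chosen at runtime
/// and called through dynamic dispatch, once per output batch.
#[derive(Debug)]
struct JoinStream {
    batch_transformer: Box<dyn BatchTransformer>,
}

impl JoinStream {
    fn emit(&self, rows: Vec<u32>) -> Vec<Vec<u32>> {
        self.batch_transformer.split(rows)
    }
}

/// Only the transformer differs between the two configurations, so the
/// stream itself is constructed in one place.
fn make_stream(enforce: bool, batch_size: usize) -> JoinStream {
    let batch_transformer: Box<dyn BatchTransformer> = if enforce {
        Box::new(BatchSplitter { batch_size })
    } else {
        Box::new(NoopBatchTransformer)
    };
    JoinStream { batch_transformer }
}
```

Here the branch shrinks to choosing the boxed transformer, at the price of one virtual call per output batch.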