jonathanc-n commented on code in PR #16450:
URL: https://github.com/apache/datafusion/pull/16450#discussion_r2155615159
##########
datafusion/core/src/physical_planner.rs:
##########
@@ -1009,95 +1012,99 @@ impl DefaultPhysicalPlanner {
let left_df_schema = left.schema();
let right_df_schema = right.schema();
let execution_props = session_state.execution_props();
- let join_on = keys
- .iter()
- .map(|(l, r)| {
- let l = create_physical_expr(l, left_df_schema,
execution_props)?;
- let r =
- create_physical_expr(r, right_df_schema,
execution_props)?;
- Ok((l, r))
- })
- .collect::<Result<join_utils::JoinOn>>()?;
-
- let join_filter = match filter {
- Some(expr) => {
- // Extract columns from filter expression and saved in
a HashSet
- let cols = expr.column_refs();
- // Collect left & right field indices, the field
indices are sorted in ascending order
- let left_field_indices = cols
- .iter()
- .filter_map(|c|
left_df_schema.index_of_column(c).ok())
- .sorted()
- .collect::<Vec<_>>();
- let right_field_indices = cols
- .iter()
- .filter_map(|c|
right_df_schema.index_of_column(c).ok())
- .sorted()
- .collect::<Vec<_>>();
+ // We declare a threshold here of 5 rows as NestedLoopJoins
tend to better when one
+ // of the tables are small.
+ let threshold = session_state
+ .config_options()
+ .optimizer
+ .nested_loop_equijoin_threshold;
+ let left_rows = *physical_left
+ // We set the partition to None here to draw the num_rows
from the plan
+ .partition_statistics(None)?
+ .num_rows
+ .get_value()
+ .unwrap()
+ <= threshold;
+ let right_rows = *physical_right
+ .partition_statistics(None)?
+ .num_rows
+ .get_value()
+ .unwrap()
+ <= threshold;
+ let use_nested_loop_join_equijoin = left_rows || right_rows;
+
+ // If we can use a nested loop join then `join_on` will be
empty because
+ // the expressions are moved into the join filter.
+ let join_on: JoinOn = if use_nested_loop_join_equijoin {
+ Vec::new()
+ } else {
+ keys.iter()
+ .map(|(l, r)| {
+ let l =
+ create_physical_expr(l, left_df_schema,
execution_props)?;
+ let r = create_physical_expr(
+ r,
+ right_df_schema,
+ execution_props,
+ )?;
+ Ok((l, r))
+ })
+ .collect::<Result<_>>()?
+ };
- // Collect DFFields and Fields required for
intermediate schemas
- let (filter_df_fields, filter_fields): (Vec<_>,
Vec<_>) =
- left_field_indices
- .clone()
- .into_iter()
- .map(|i| {
- (
- left_df_schema.qualified_field(i),
-
physical_left.schema().field(i).clone(),
- )
+ // If we can use nested loop join then we will combine the
expressions in `join_on`
+ // and pass it into the join filter; create your join filters
normally otherwise.
+ let join_filter: Option<JoinFilter> = if
use_nested_loop_join_equijoin {
+ let combined_join_on_expression: Expr = filter
Review Comment:
It seems that combining the `JoinOn` expressions here causes an error when
both sides of an expression have the same unqualified name, because that leads
to duplicate unqualified fields. Is there a function that can qualify them
with the schema? I can't seem to find one.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]