mingmwang commented on code in PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#discussion_r1020916128
##########
datafusion/optimizer/src/eliminate_cross_join.rs:
##########
@@ -44,143 +44,235 @@ impl ReduceCrossJoin {
}
}
+/// Attempt to reorder joins to reduce cross joins to inner joins.
+/// for queries:
+/// 'select ... from a, b where a.x = b.y and b.xx = 100;'
+/// 'select ... from a, b where (a.x = b.y and b.xx = 100) or (a.x = b.y and b.xx = 200);'
+/// 'select ... from a, b, c where (a.x = b.y and b.xx = 100 and a.z = c.z)
+/// or (a.x = b.y and b.xx = 200 and a.z=c.z);'
+/// For above queries, the join predicate is available in filters and they are moved to
+/// join nodes appropriately
+/// This fix helps to improve the performance of TPCH Q19. issue#78
+///
impl OptimizerRule for ReduceCrossJoin {
fn optimize(
&self,
plan: &LogicalPlan,
_optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
- let mut possible_join_keys: Vec<(Column, Column)> = vec![];
- let mut all_join_keys = HashSet::new();
+ match plan {
+ LogicalPlan::Filter(filter) => {
+ let mut input = (**filter.input()).clone();
+
+ // optimize children.
+ input = self.optimize(&input, _optimizer_config)?;
+
+ let mut possible_join_keys: Vec<(Column, Column)> = vec![];
+ let mut all_inputs: Vec<LogicalPlan> = vec![];
+ match &input {
+ LogicalPlan::Join(join) => {
+ if join.join_type != JoinType::Inner {
+ return utils::optimize_children(
+ self,
+ plan,
+ _optimizer_config,
+ );
+ }
+ collect_all_inputs_from_inner(
+ join,
+ &mut possible_join_keys,
+ &mut all_inputs,
+ );
+ }
+ LogicalPlan::CrossJoin(join) => {
+ collect_all_inputs_from_cross(
+ join,
+ &mut possible_join_keys,
+ &mut all_inputs,
+ );
+ }
+ _ => {
+ let new_exprs = plan.expressions();
+ let new_inputs = [input];
+ return from_plan(plan, &new_exprs, &new_inputs);
+ }
+ }
+
+ let predicate = filter.predicate();
+ // join keys are handled locally
+ let mut all_join_keys: HashSet<(Column, Column)> =
HashSet::new();
- reduce_cross_join(self, plan, &mut possible_join_keys, &mut
all_join_keys)
+ extract_possible_join_keys(predicate, &mut possible_join_keys);
+
+ let mut left = all_inputs.remove(0);
+ while !all_inputs.is_empty() {
+ left = find_inner_join(
+ &left,
+ &mut all_inputs,
+ &mut possible_join_keys,
+ &mut all_join_keys,
+ )?;
+ }
+
+ // if there are no join keys then do nothing.
+ if all_join_keys.is_empty() {
+ Ok(LogicalPlan::Filter(Filter::try_new(
+ predicate.clone(),
+ Arc::new(left),
+ )?))
+ } else {
+ // remove join expressions from filter
+ match remove_join_expressions(predicate, &all_join_keys)? {
+ Some(filter_expr) =>
Ok(LogicalPlan::Filter(Filter::try_new(
+ filter_expr,
+ Arc::new(left),
+ )?)),
+ _ => Ok(left),
+ }
+ }
+ }
+
+ _ => utils::optimize_children(self, plan, _optimizer_config),
+ }
}
fn name(&self) -> &str {
"reduce_cross_join"
}
}
-/// Attempt to reduce cross joins to inner joins.
-/// for queries:
-/// 'select ... from a, b where a.x = b.y and b.xx = 100;'
-/// 'select ... from a, b where (a.x = b.y and b.xx = 100) or (a.x = b.y and b.xx = 200);'
-/// 'select ... from a, b, c where (a.x = b.y and b.xx = 100 and a.z = c.z)
-/// or (a.x = b.y and b.xx = 200 and a.z=c.z);'
-/// For above queries, the join predicate is available in filters and they are moved to
-/// join nodes appropriately
-/// This fix helps to improve the performance of TPCH Q19. issue#78
-///
-fn reduce_cross_join(
- _optimizer: &ReduceCrossJoin,
- plan: &LogicalPlan,
- possible_join_keys: &mut Vec<(Column, Column)>,
- all_join_keys: &mut HashSet<(Column, Column)>,
-) -> Result<LogicalPlan> {
- match plan {
- LogicalPlan::Filter(filter) => {
- let input = filter.input();
- let predicate = filter.predicate();
- // join keys are handled locally
- let mut new_possible_join_keys: Vec<(Column, Column)> = vec![];
- let mut new_all_join_keys = HashSet::new();
-
- extract_possible_join_keys(predicate, &mut new_possible_join_keys);
-
- let new_plan = reduce_cross_join(
- _optimizer,
- input,
- &mut new_possible_join_keys,
- &mut new_all_join_keys,
- )?;
-
- // if there are no join keys then do nothing.
- if new_all_join_keys.is_empty() {
- Ok(LogicalPlan::Filter(Filter::try_new(
- predicate.clone(),
- Arc::new(new_plan),
- )?))
+fn collect_all_inputs_from_inner(
+ join: &Join,
+ all_join_keys: &mut Vec<(Column, Column)>,
Review Comment:
It seems `collect_all_inputs_from_inner()` and
`collect_all_inputs_from_cross()` have a very similar structure. Can we combine
the two methods into one?
Maybe we can name it `flatten_join_inputs()`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]