jackwener commented on code in PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#discussion_r1025978614
##########
datafusion/optimizer/src/eliminate_cross_join.rs:
##########
@@ -44,143 +44,209 @@ impl ReduceCrossJoin {
}
}
+/// Attempt to reorder joins to reduce cross joins to inner joins.
+/// for queries:
+/// 'select ... from a, b where a.x = b.y and b.xx = 100;'
+/// 'select ... from a, b where (a.x = b.y and b.xx = 100) or (a.x = b.y and
b.xx = 200);'
+/// 'select ... from a, b, c where (a.x = b.y and b.xx = 100 and a.z = c.z)
+/// or (a.x = b.y and b.xx = 200 and a.z=c.z);'
+/// For above queries, the join predicate is available in filters and they are
moved to
+/// join nodes appropriately
+/// This fix helps to improve the performance of TPCH Q19. issue#78
+///
impl OptimizerRule for ReduceCrossJoin {
fn optimize(
&self,
plan: &LogicalPlan,
_optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
- let mut possible_join_keys: Vec<(Column, Column)> = vec![];
- let mut all_join_keys = HashSet::new();
+ match plan {
+ LogicalPlan::Filter(filter) => {
+ let input = (**filter.input()).clone();
+
+ let mut possible_join_keys: Vec<(Column, Column)> = vec![];
+ let mut all_inputs: Vec<LogicalPlan> = vec![];
+ match &input {
+ LogicalPlan::Join(join) => {
+ if join.join_type != JoinType::Inner {
+ return utils::optimize_children(
+ self,
+ plan,
+ _optimizer_config,
+ );
+ }
+ flatten_join_inputs(
+ &input,
+ &mut possible_join_keys,
+ &mut all_inputs,
+ )?;
+ }
+ LogicalPlan::CrossJoin(_) => {
+ flatten_join_inputs(
+ &input,
+ &mut possible_join_keys,
+ &mut all_inputs,
+ )?;
+ }
+ _ => {
+ return utils::optimize_children(self, plan,
_optimizer_config);
+ }
+ }
+
+ let predicate = filter.predicate();
+ // join keys are handled locally
+ let mut all_join_keys: HashSet<(Column, Column)> =
HashSet::new();
+
+ extract_possible_join_keys(predicate, &mut possible_join_keys);
+
+ let mut left = all_inputs.remove(0);
+ while !all_inputs.is_empty() {
+ left = find_inner_join(
+ &left,
+ &mut all_inputs,
+ &mut possible_join_keys,
+ &mut all_join_keys,
+ )?;
+ }
- reduce_cross_join(self, plan, &mut possible_join_keys, &mut
all_join_keys)
+ left = utils::optimize_children(self, &left,
_optimizer_config)?;
+ if plan.schema() != left.schema() {
+ left = LogicalPlan::Projection(Projection::new_from_schema(
+ Arc::new(left.clone()),
+ plan.schema().clone(),
+ None,
+ ));
+ }
+
+ // if there are no join keys then do nothing.
+ if all_join_keys.is_empty() {
+ Ok(LogicalPlan::Filter(Filter::try_new(
+ predicate.clone(),
+ Arc::new(left),
+ )?))
+ } else {
+ // remove join expressions from filter
+ match remove_join_expressions(predicate, &all_join_keys)? {
+ Some(filter_expr) =>
Ok(LogicalPlan::Filter(Filter::try_new(
+ filter_expr,
+ Arc::new(left),
+ )?)),
+ _ => Ok(left),
+ }
+ }
+ }
+
+ _ => utils::optimize_children(self, plan, _optimizer_config),
+ }
}
fn name(&self) -> &str {
"reduce_cross_join"
}
}
-/// Attempt to reduce cross joins to inner joins.
-/// for queries:
-/// 'select ... from a, b where a.x = b.y and b.xx = 100;'
-/// 'select ... from a, b where (a.x = b.y and b.xx = 100) or (a.x = b.y and
b.xx = 200);'
-/// 'select ... from a, b, c where (a.x = b.y and b.xx = 100 and a.z = c.z)
-/// or (a.x = b.y and b.xx = 200 and a.z=c.z);'
-/// For above queries, the join predicate is available in filters and they are
moved to
-/// join nodes appropriately
-/// This fix helps to improve the performance of TPCH Q19. issue#78
-///
-fn reduce_cross_join(
- _optimizer: &ReduceCrossJoin,
+fn flatten_join_inputs(
plan: &LogicalPlan,
possible_join_keys: &mut Vec<(Column, Column)>,
- all_join_keys: &mut HashSet<(Column, Column)>,
-) -> Result<LogicalPlan> {
- match plan {
- LogicalPlan::Filter(filter) => {
- let input = filter.input();
- let predicate = filter.predicate();
- // join keys are handled locally
- let mut new_possible_join_keys: Vec<(Column, Column)> = vec![];
- let mut new_all_join_keys = HashSet::new();
-
- extract_possible_join_keys(predicate, &mut new_possible_join_keys);
-
- let new_plan = reduce_cross_join(
- _optimizer,
- input,
- &mut new_possible_join_keys,
- &mut new_all_join_keys,
- )?;
-
- // if there are no join keys then do nothing.
- if new_all_join_keys.is_empty() {
- Ok(LogicalPlan::Filter(Filter::try_new(
- predicate.clone(),
- Arc::new(new_plan),
- )?))
- } else {
- // remove join expressions from filter
- match remove_join_expressions(predicate, &new_all_join_keys)? {
- Some(filter_expr) =>
Ok(LogicalPlan::Filter(Filter::try_new(
- filter_expr,
- Arc::new(new_plan),
- )?)),
- _ => Ok(new_plan),
- }
+ all_inputs: &mut Vec<LogicalPlan>,
+) -> Result<()> {
+ let children = match plan {
+ LogicalPlan::Join(join) => {
+ for join_keys in join.on.iter() {
+ possible_join_keys.push(join_keys.clone());
}
+ let left = &*(join.left);
+ let right = &*(join.right);
+ Ok::<Vec<&LogicalPlan>, DataFusionError>(vec![left, right])
}
- LogicalPlan::CrossJoin(cross_join) => {
- let left_plan = reduce_cross_join(
- _optimizer,
- &cross_join.left,
- possible_join_keys,
- all_join_keys,
- )?;
- let right_plan = reduce_cross_join(
- _optimizer,
- &cross_join.right,
- possible_join_keys,
- all_join_keys,
- )?;
- // can we find a match?
- let left_schema = left_plan.schema();
- let right_schema = right_plan.schema();
- let mut join_keys = vec![];
-
- for (l, r) in possible_join_keys {
- if left_schema.field_from_column(l).is_ok()
- && right_schema.field_from_column(r).is_ok()
- &&
can_hash(left_schema.field_from_column(l).unwrap().data_type())
- {
- join_keys.push((l.clone(), r.clone()));
- } else if left_schema.field_from_column(r).is_ok()
- && right_schema.field_from_column(l).is_ok()
- &&
can_hash(left_schema.field_from_column(r).unwrap().data_type())
- {
- join_keys.push((r.clone(), l.clone()));
+ LogicalPlan::CrossJoin(join) => {
+ let left = &*(join.left);
+ let right = &*(join.right);
+ Ok::<Vec<&LogicalPlan>, DataFusionError>(vec![left, right])
+ }
+ _ => {
+ return Err(DataFusionError::Plan(
+ "flatten_join_inputs just can call
join/cross_join".to_string(),
+ ));
+ }
+ }?;
+
+ for child in children.iter() {
+ match *child {
+ LogicalPlan::Join(left_join) => {
+ if left_join.join_type == JoinType::Inner {
+ flatten_join_inputs(child, possible_join_keys,
all_inputs)?;
+ } else {
+ all_inputs.push((*child).clone());
}
}
+ LogicalPlan::CrossJoin(_) => {
+ flatten_join_inputs(child, possible_join_keys, all_inputs)?;
+ }
+ _ => all_inputs.push((*child).clone()),
+ }
+ }
+ Ok(())
+}
Review Comment:
Yes, I tried it in the past, but it's complex. I can do it in a follow-up job.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]