AssHero commented on code in PR #2750:
URL: https://github.com/apache/arrow-datafusion/pull/2750#discussion_r905638161


##########
datafusion/optimizer/src/reduce_outer_join.rs:
##########
@@ -0,0 +1,355 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Optimizer rule to reduce left/right/full join to inner join if possible.
+use crate::{OptimizerConfig, OptimizerRule};
+use datafusion_common::{Column, DFSchema, Result};
+use datafusion_expr::{
+    logical_plan::{Filter, Join, JoinType, LogicalPlan, Projection},
+    utils::from_plan,
+};
+use datafusion_expr::{Expr, Operator};
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+/// Optimizer rule that rewrites `LEFT`, `RIGHT` and `FULL` outer joins into
+/// a less general join type (down to `INNER`) when a filter directly above
+/// the join discards every null-padded row the outer join would produce.
+#[derive(Default)]
+pub struct ReduceOuterJoin;
+
+impl ReduceOuterJoin {
+    /// Creates the rule; equivalent to `ReduceOuterJoin::default()`.
+    pub fn new() -> Self {
+        Self
+    }
+}
+
+impl OptimizerRule for ReduceOuterJoin {
+    /// Runs the outer-join reduction over `plan`, starting from the root with
+    /// no null-rejected columns known yet.
+    fn optimize(
+        &self,
+        plan: &LogicalPlan,
+        optimizer_config: &OptimizerConfig,
+    ) -> Result<LogicalPlan> {
+        let mut collected = Vec::new();
+        reduce_outer_join(self, plan, &mut collected, optimizer_config)
+    }
+
+    /// The rule's identifier as reported to the optimizer.
+    fn name(&self) -> &str {
+        "reduce_outer_join"
+    }
+}
+
+/// Attempt to reduce outer joins to inner joins.
+///
+/// For a query such as `select ... from a left join b on ... where b.xx = 100`:
+/// in every row where `b.xx` is null, `b.xx = 100` is not true, so the filter
+/// removes all of those rows. There is therefore no need to produce the
+/// null-padded rows at all, and an inner join can be used instead of the
+/// left join.
+///
+/// Generally, an outer join can be reduced to an inner join if the predicates
+/// of a filter directly above it return false when any of their inputs are
+/// null, and the columns of those predicates come from the nullable side of
+/// the outer join.
+///
+/// `nonnullable_cols` carries the null-rejected columns collected from filters
+/// above `plan`. They are only propagated where that is safe:
+/// * through filters, which just forward them;
+/// * into each side of a join, restricted to the columns that side produces,
+///   so columns never leak between sibling subtrees;
+/// * through projections, translated to the projected input column. A column
+///   computed by any other expression (e.g. `coalesce(b.x, 0) AS x`) is
+///   dropped, because it may be non-null even when its inputs are null.
+///
+/// Every other operator (e.g. `Limit`, window functions) starts its inputs
+/// with an empty set, because removing rows below such an operator instead of
+/// above it can change its result.
+fn reduce_outer_join(
+    _optimizer: &ReduceOuterJoin,
+    plan: &LogicalPlan,
+    nonnullable_cols: &mut Vec<Column>,
+    _optimizer_config: &OptimizerConfig,
+) -> Result<LogicalPlan> {
+    match plan {
+        LogicalPlan::Filter(Filter { input, predicate }) => {
+            // Only a filter sitting directly on a join contributes new
+            // null-rejected columns; any other filter just forwards the
+            // columns collected above it.
+            if let LogicalPlan::Join(join) = &**input {
+                extract_nonnullable_columns(
+                    predicate,
+                    nonnullable_cols,
+                    join.left.schema(),
+                    join.right.schema(),
+                    true,
+                )?;
+            }
+            Ok(LogicalPlan::Filter(Filter {
+                predicate: predicate.clone(),
+                input: Arc::new(reduce_outer_join(
+                    _optimizer,
+                    input,
+                    nonnullable_cols,
+                    _optimizer_config,
+                )?),
+            }))
+        }
+        LogicalPlan::Join(join) => {
+            // Split the null-rejected columns by the join input that produces
+            // them. Each child only receives its own side's columns, so
+            // nothing collected for one subtree is matched against the other.
+            let mut left_cols: Vec<Column> = nonnullable_cols
+                .iter()
+                .filter(|col| join.left.schema().field_from_column(col).is_ok())
+                .cloned()
+                .collect();
+            let mut right_cols: Vec<Column> = nonnullable_cols
+                .iter()
+                .filter(|col| join.right.schema().field_from_column(col).is_ok())
+                .cloned()
+                .collect();
+            let left_nonnullable = !left_cols.is_empty();
+            let right_nonnullable = !right_cols.is_empty();
+
+            // A null-rejected column on the nullable side of an outer join
+            // means the filter above drops every row that is null-padded on
+            // that side, so that side no longer needs to be null-extended.
+            let new_join_type = match join.join_type {
+                JoinType::Left if right_nonnullable => JoinType::Inner,
+                JoinType::Right if left_nonnullable => JoinType::Inner,
+                JoinType::Full => match (left_nonnullable, right_nonnullable) {
+                    (true, true) => JoinType::Inner,
+                    (true, false) => JoinType::Left,
+                    (false, true) => JoinType::Right,
+                    (false, false) => JoinType::Full,
+                },
+                other => other,
+            };
+
+            let left_plan = reduce_outer_join(
+                _optimizer,
+                &join.left,
+                &mut left_cols,
+                _optimizer_config,
+            )?;
+            let right_plan = reduce_outer_join(
+                _optimizer,
+                &join.right,
+                &mut right_cols,
+                _optimizer_config,
+            )?;
+
+            Ok(LogicalPlan::Join(Join {
+                left: Arc::new(left_plan),
+                right: Arc::new(right_plan),
+                join_type: new_join_type,
+                join_constraint: join.join_constraint,
+                on: join.on.clone(),
+                filter: join.filter.clone(),
+                schema: join.schema.clone(),
+                null_equals_null: join.null_equals_null,
+            }))
+        }
+        LogicalPlan::Projection(Projection {
+            input,
+            expr,
+            schema,
+            alias: _,
+        }) => {
+            // Map each output column's qualified name to the expression that
+            // produces it, with aliases stripped.
+            let projection = schema
+                .fields()
+                .iter()
+                .zip(expr.iter())
+                .map(|(field, e)| {
+                    let e = match e {
+                        Expr::Alias(inner, _) => inner.as_ref().clone(),
+                        other => other.clone(),
+                    };
+                    (field.qualified_name(), e)
+                })
+                .collect::<HashMap<_, _>>();
+
+            // Translate the null-rejected columns into this projection's input
+            // columns. Only plain column references can be translated; any
+            // other column is dropped instead of being matched by name below.
+            let mut input_cols: Vec<Column> = nonnullable_cols
+                .iter()
+                .filter_map(|col| match projection.get(&col.flat_name()) {
+                    Some(Expr::Column(column)) => Some(column.clone()),
+                    _ => None,
+                })
+                .collect();
+
+            // optimize inner
+            let new_input = reduce_outer_join(
+                _optimizer,
+                input,
+                &mut input_cols,
+                _optimizer_config,
+            )?;
+
+            from_plan(plan, expr, &[new_input])
+        }
+        _ => {
+            // Null-rejected columns are not pushed through other operators:
+            // for e.g. `Limit` or window functions, removing rows below the
+            // operator rather than above it changes its output. Each input
+            // is still optimized, starting from an empty set.
+            let expr = plan.expressions();
+            let new_inputs = plan
+                .inputs()
+                .iter()
+                .map(|input| {
+                    reduce_outer_join(
+                        _optimizer,
+                        input,
+                        &mut Vec::new(),
+                        _optimizer_config,
+                    )
+                })
+                .collect::<Result<Vec<_>>>()?;
+
+            from_plan(plan, &expr, &new_inputs)
+        }
+    }
+}
+
+/// Recursively traversese expr, if expr returns false when
+/// any inputs are null, treats columns of both sides as nonnullable columns.
+///
+/// For and/or expr, extracts from all sub exprs and merges the columns.
+/// For or expr, if one of sub exprs returns true, discards all columns from 
or expr.
+/// For IS NOT NULL/NOT expr, always returns false for NULL input.
+///     extracts columns from these exprs.
+/// For all other exprs, fall through
+fn extract_nonnullable_columns(

Review Comment:
   `extract_nonnullable_columns` collects the columns of expressions that 
return false when any of their inputs is null. We then use these columns to 
check whether they appear in the null-padded rows that an outer join produces. 
If they do, those null-padded rows cannot satisfy the predicate and are filtered 
out, so the outer join can be reduced to an inner join. So I think the purpose 
of `extract_nonnullable_columns` is different from that of `Expr::nullable`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to