alamb commented on code in PR #2750:
URL: https://github.com/apache/arrow-datafusion/pull/2750#discussion_r905474273


##########
datafusion/core/src/execution/context.rs:
##########
@@ -1229,6 +1230,7 @@ impl SessionState {
         if config.config_options.get_bool(OPT_FILTER_NULL_JOIN_KEYS) {
             rules.push(Arc::new(FilterNullJoinKeys::default()));
         }
+        rules.push(Arc::new(ReduceOuterJoin::new()));

Review Comment:
   This pass should probably be applied after FilterPushdown, so that as many predicates as possible are available.



##########
datafusion/optimizer/src/reduce_outer_join.rs:
##########
@@ -0,0 +1,355 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Optimizer rule to reduce left/right/full join to inner join if possible.
+use crate::{OptimizerConfig, OptimizerRule};
+use datafusion_common::{Column, DFSchema, Result};
+use datafusion_expr::{
+    logical_plan::{Filter, Join, JoinType, LogicalPlan, Projection},
+    utils::from_plan,
+};
+use datafusion_expr::{Expr, Operator};
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+#[derive(Default)]
+pub struct ReduceOuterJoin;
+
+impl ReduceOuterJoin {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl OptimizerRule for ReduceOuterJoin {
+    fn optimize(
+        &self,
+        plan: &LogicalPlan,
+        optimizer_config: &OptimizerConfig,
+    ) -> Result<LogicalPlan> {
+        let mut nonnullable_cols: Vec<Column> = vec![];
+
+        reduce_outer_join(self, plan, &mut nonnullable_cols, optimizer_config)
+    }
+
+    fn name(&self) -> &str {
+        "reduce_outer_join"
+    }
+}
+
+/// Attempt to reduce outer joins to inner joins.
+/// for query: select ... from a left join b on ... where b.xx = 100;
+/// if b.xx is null, and b.xx = 100 returns false, filterd those null rows.
+/// Therefore, there is no need to produce null rows for output, we can use
+/// inner join instead of left join.
+///
+/// Generally, an outer join can be reduced to inner join if quals from where
+/// return false while any inputs are null and columns of those quals are come 
from
+/// nullable side of outer join.
+fn reduce_outer_join(
+    _optimizer: &ReduceOuterJoin,
+    plan: &LogicalPlan,
+    nonnullable_cols: &mut Vec<Column>,
+    _optimizer_config: &OptimizerConfig,
+) -> Result<LogicalPlan> {
+    match plan {
+        LogicalPlan::Filter(Filter { input, predicate }) => match &**input {
+            LogicalPlan::Join(join) => {
+                extract_nonnullable_columns(
+                    predicate,
+                    nonnullable_cols,
+                    join.left.schema(),
+                    join.right.schema(),
+                    true,
+                )?;
+                Ok(LogicalPlan::Filter(Filter {
+                    predicate: predicate.clone(),
+                    input: Arc::new(reduce_outer_join(
+                        _optimizer,
+                        input,
+                        nonnullable_cols,
+                        _optimizer_config,
+                    )?),
+                }))
+            }
+            _ => Ok(LogicalPlan::Filter(Filter {
+                predicate: predicate.clone(),
+                input: Arc::new(reduce_outer_join(
+                    _optimizer,
+                    input,
+                    nonnullable_cols,
+                    _optimizer_config,
+                )?),
+            })),
+        },
+        LogicalPlan::Join(join) => {
+            let mut new_join_type = join.join_type;
+
+            if join.join_type == JoinType::Left
+                || join.join_type == JoinType::Right
+                || join.join_type == JoinType::Full
+            {
+                let mut left_nonnullable = false;
+                let mut right_nonnullable = false;
+                for col in nonnullable_cols.iter_mut() {

Review Comment:
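   Why `iter_mut` and not `iter`? The loop only reads each column (it just calls `field_from_column(col)`), so a shared borrow should be enough.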



##########
datafusion/core/tests/sql/joins.rs:
##########
@@ -1375,3 +1375,351 @@ async fn hash_join_with_dictionary() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn reduce_left_join_1() -> Result<()> {
+    let ctx = create_join_context("t1_id", "t2_id")?;
+
+    // reduce to inner join
+    let sql = "select * from t1 left join t2 on t1.t1_id = t2.t2_id where 
t2.t2_id < 100";
+    let msg = format!("Creating logical plan for '{}'", sql);
+    let plan = ctx
+        .create_logical_plan(&("explain ".to_owned() + sql))
+        .expect(&msg);
+    let state = ctx.state();
+    let plan = state.optimize(&plan)?;
+    let expected = vec![
+        "Explain [plan_type:Utf8, plan:Utf8]",
+        "  Projection: #t1.t1_id, #t1.t1_name, #t1.t1_int, #t2.t2_id, 
#t2.t2_name, #t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, 
t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "    Inner Join: #t1.t1_id = #t2.t2_id [t1_id:UInt32;N, 
t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, 
t2_int:UInt32;N]",
+        "      Filter: #t1.t1_id < Int64(100) [t1_id:UInt32;N, t1_name:Utf8;N, 
t1_int:UInt32;N]",
+        "        TableScan: t1 projection=Some([t1_id, t1_name, t1_int]) 
[t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+        "      Filter: #t2.t2_id < Int64(100) [t2_id:UInt32;N, t2_name:Utf8;N, 
t2_int:UInt32;N]",
+        "        TableScan: t2 projection=Some([t2_id, t2_name, t2_int]) 
[t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+    ];
+    let formatted = plan.display_indent_schema().to_string();
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    assert_eq!(
+        expected, actual,
+        "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+        expected, actual
+    );
+    let expected = vec![
+        "+-------+---------+--------+-------+---------+--------+",
+        "| t1_id | t1_name | t1_int | t2_id | t2_name | t2_int |",
+        "+-------+---------+--------+-------+---------+--------+",
+        "| 11    | a       | 1      | 11    | z       | 3      |",
+        "| 22    | b       | 2      | 22    | y       | 1      |",
+        "| 44    | d       | 4      | 44    | x       | 3      |",
+        "+-------+---------+--------+-------+---------+--------+",
+    ];
+
+    let results = execute_to_batches(&ctx, sql).await;
+    assert_batches_sorted_eq!(expected, &results);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn reduce_left_join_2() -> Result<()> {
+    let ctx = create_join_context("t1_id", "t2_id")?;
+
+    // reduce to inner join
+    let sql = "select * from t1 left join t2 on t1.t1_id = t2.t2_id where 
t2.t2_int < 10 or (t1.t1_int > 2 and t2.t2_name != 'w')";
+    let msg = format!("Creating logical plan for '{}'", sql);
+    let plan = ctx
+        .create_logical_plan(&("explain ".to_owned() + sql))
+        .expect(&msg);
+    let state = ctx.state();
+    let plan = state.optimize(&plan)?;
+    let expected = vec![
+        "Explain [plan_type:Utf8, plan:Utf8]",
+        "  Projection: #t1.t1_id, #t1.t1_name, #t1.t1_int, #t2.t2_id, 
#t2.t2_name, #t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, 
t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "    Filter: #t2.t2_int < Int64(10) OR #t1.t1_int > Int64(2) AND 
#t2.t2_name != Utf8(\"w\") [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, 
t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "      Inner Join: #t1.t1_id = #t2.t2_id [t1_id:UInt32;N, 
t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, 
t2_int:UInt32;N]",
+        "        TableScan: t1 projection=Some([t1_id, t1_name, t1_int]) 
[t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+        "        TableScan: t2 projection=Some([t2_id, t2_name, t2_int]) 
[t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+    ];
+    let formatted = plan.display_indent_schema().to_string();
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    assert_eq!(
+        expected, actual,
+        "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+        expected, actual
+    );
+    let expected = vec![
+        "+-------+---------+--------+-------+---------+--------+",
+        "| t1_id | t1_name | t1_int | t2_id | t2_name | t2_int |",
+        "+-------+---------+--------+-------+---------+--------+",
+        "| 11    | a       | 1      | 11    | z       | 3      |",
+        "| 22    | b       | 2      | 22    | y       | 1      |",
+        "| 44    | d       | 4      | 44    | x       | 3      |",
+        "+-------+---------+--------+-------+---------+--------+",
+    ];
+
+    let results = execute_to_batches(&ctx, sql).await;
+    assert_batches_sorted_eq!(expected, &results);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn reduce_left_join_3() -> Result<()> {
+    let ctx = create_join_context("t1_id", "t2_id")?;
+
+    // reduce subquery to inner join
+    let sql = "select * from (select t1.* from t1 left join t2 on t1.t1_id = 
t2.t2_id where t2.t2_int < 3) t3 left join t2 on t3.t1_int = t2.t2_int where 
t3.t1_id < 100";
+    let msg = format!("Creating logical plan for '{}'", sql);
+    let plan = ctx
+        .create_logical_plan(&("explain ".to_owned() + sql))
+        .expect(&msg);
+    let state = ctx.state();
+    let plan = state.optimize(&plan)?;
+    let expected = vec![
+        "Explain [plan_type:Utf8, plan:Utf8]",
+        "  Projection: #t3.t1_id, #t3.t1_name, #t3.t1_int, #t2.t2_id, 
#t2.t2_name, #t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, 
t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "    Left Join: #t3.t1_int = #t2.t2_int [t1_id:UInt32;N, 
t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, 
t2_int:UInt32;N]",
+        "      Projection: #t3.t1_id, #t3.t1_name, #t3.t1_int, alias=t3 
[t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+        "        Projection: #t1.t1_id, #t1.t1_name, #t1.t1_int, alias=t3 
[t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+        "          Inner Join: #t1.t1_id = #t2.t2_id [t1_id:UInt32;N, 
t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, 
t2_int:UInt32;N]",
+        "            Filter: #t1.t1_id < Int64(100) [t1_id:UInt32;N, 
t1_name:Utf8;N, t1_int:UInt32;N]",
+        "              TableScan: t1 projection=Some([t1_id, t1_name, t1_int]) 
[t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+        "            Filter: #t2.t2_int < Int64(3) AND #t2.t2_id < Int64(100) 
[t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "              TableScan: t2 projection=Some([t2_id, t2_name, t2_int]) 
[t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "      TableScan: t2 projection=Some([t2_id, t2_name, t2_int]) 
[t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+    ];
+    let formatted = plan.display_indent_schema().to_string();
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    assert_eq!(
+        expected, actual,
+        "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+        expected, actual
+    );
+    let expected = vec![
+        "+-------+---------+--------+-------+---------+--------+",
+        "| t1_id | t1_name | t1_int | t2_id | t2_name | t2_int |",
+        "+-------+---------+--------+-------+---------+--------+",
+        "| 22    | b       | 2      |       |         |        |",
+        "+-------+---------+--------+-------+---------+--------+",
+    ];
+
+    let results = execute_to_batches(&ctx, sql).await;
+    assert_batches_sorted_eq!(expected, &results);
+
+    Ok(())
+}

Review Comment:
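   Some negative tests might be good -- for example, a test checking that
   
   ```
       let sql = "select * from t1 left join t2 on t1.t1_id = t2.t2_id where t2.t2_id IS NULL";
   ```
   
   isn't rewritten to an inner join (`t2.t2_id IS NULL` is true exactly for the null-padded rows produced by the left join, so converting it to an inner join would change the result).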



##########
datafusion/optimizer/src/reduce_outer_join.rs:
##########
@@ -0,0 +1,355 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Optimizer rule to reduce left/right/full join to inner join if possible.
+use crate::{OptimizerConfig, OptimizerRule};
+use datafusion_common::{Column, DFSchema, Result};
+use datafusion_expr::{
+    logical_plan::{Filter, Join, JoinType, LogicalPlan, Projection},
+    utils::from_plan,
+};
+use datafusion_expr::{Expr, Operator};
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+#[derive(Default)]
+pub struct ReduceOuterJoin;
+
+impl ReduceOuterJoin {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl OptimizerRule for ReduceOuterJoin {
+    fn optimize(
+        &self,
+        plan: &LogicalPlan,
+        optimizer_config: &OptimizerConfig,
+    ) -> Result<LogicalPlan> {
+        let mut nonnullable_cols: Vec<Column> = vec![];
+
+        reduce_outer_join(self, plan, &mut nonnullable_cols, optimizer_config)
+    }
+
+    fn name(&self) -> &str {
+        "reduce_outer_join"
+    }
+}
+
+/// Attempt to reduce outer joins to inner joins.
+/// for query: select ... from a left join b on ... where b.xx = 100;
+/// if b.xx is null, and b.xx = 100 returns false, filterd those null rows.
+/// Therefore, there is no need to produce null rows for output, we can use
+/// inner join instead of left join.
+///
+/// Generally, an outer join can be reduced to inner join if quals from where
+/// return false while any inputs are null and columns of those quals are come 
from
+/// nullable side of outer join.
+fn reduce_outer_join(
+    _optimizer: &ReduceOuterJoin,
+    plan: &LogicalPlan,
+    nonnullable_cols: &mut Vec<Column>,
+    _optimizer_config: &OptimizerConfig,
+) -> Result<LogicalPlan> {
+    match plan {
+        LogicalPlan::Filter(Filter { input, predicate }) => match &**input {
+            LogicalPlan::Join(join) => {
+                extract_nonnullable_columns(
+                    predicate,
+                    nonnullable_cols,
+                    join.left.schema(),
+                    join.right.schema(),
+                    true,
+                )?;
+                Ok(LogicalPlan::Filter(Filter {
+                    predicate: predicate.clone(),
+                    input: Arc::new(reduce_outer_join(
+                        _optimizer,
+                        input,
+                        nonnullable_cols,
+                        _optimizer_config,
+                    )?),
+                }))
+            }
+            _ => Ok(LogicalPlan::Filter(Filter {
+                predicate: predicate.clone(),
+                input: Arc::new(reduce_outer_join(
+                    _optimizer,
+                    input,
+                    nonnullable_cols,
+                    _optimizer_config,
+                )?),
+            })),
+        },
+        LogicalPlan::Join(join) => {
+            let mut new_join_type = join.join_type;
+
+            if join.join_type == JoinType::Left
+                || join.join_type == JoinType::Right
+                || join.join_type == JoinType::Full
+            {
+                let mut left_nonnullable = false;
+                let mut right_nonnullable = false;
+                for col in nonnullable_cols.iter_mut() {
+                    if join.left.schema().field_from_column(col).is_ok() {
+                        left_nonnullable = true;
+                    }
+                    if join.right.schema().field_from_column(col).is_ok() {
+                        right_nonnullable = true;
+                    }
+                }
+
+                match join.join_type {
+                    JoinType::Left => {
+                        if right_nonnullable {
+                            new_join_type = JoinType::Inner;
+                        }
+                    }
+                    JoinType::Right => {
+                        if left_nonnullable {
+                            new_join_type = JoinType::Inner;
+                        }
+                    }
+                    JoinType::Full => {
+                        if left_nonnullable && right_nonnullable {
+                            new_join_type = JoinType::Inner;
+                        } else if left_nonnullable {
+                            new_join_type = JoinType::Left;
+                        } else if right_nonnullable {
+                            new_join_type = JoinType::Right;
+                        }
+                    }
+                    _ => {}
+                };
+            }
+
+            let left_plan = reduce_outer_join(
+                _optimizer,
+                &join.left,
+                nonnullable_cols,

Review Comment:
   I wonder if it is a problem to use the same `nonnullable_cols` here, as the call to `reduce_outer_join` could add new columns if a filter is encountered somewhere down the left side (e.g. in a subquery). Maybe a `clone()` should be passed here (or the length remembered and the vector truncated back to it before calling `reduce_outer_join` on the right side).



##########
datafusion/optimizer/src/reduce_outer_join.rs:
##########
@@ -0,0 +1,355 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Optimizer rule to reduce left/right/full join to inner join if possible.
+use crate::{OptimizerConfig, OptimizerRule};
+use datafusion_common::{Column, DFSchema, Result};
+use datafusion_expr::{
+    logical_plan::{Filter, Join, JoinType, LogicalPlan, Projection},
+    utils::from_plan,
+};
+use datafusion_expr::{Expr, Operator};
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+#[derive(Default)]
+pub struct ReduceOuterJoin;
+
+impl ReduceOuterJoin {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl OptimizerRule for ReduceOuterJoin {
+    fn optimize(
+        &self,
+        plan: &LogicalPlan,
+        optimizer_config: &OptimizerConfig,
+    ) -> Result<LogicalPlan> {
+        let mut nonnullable_cols: Vec<Column> = vec![];
+
+        reduce_outer_join(self, plan, &mut nonnullable_cols, optimizer_config)
+    }
+
+    fn name(&self) -> &str {
+        "reduce_outer_join"
+    }
+}
+
+/// Attempt to reduce outer joins to inner joins.
+/// for query: select ... from a left join b on ... where b.xx = 100;
+/// if b.xx is null, and b.xx = 100 returns false, filterd those null rows.
+/// Therefore, there is no need to produce null rows for output, we can use
+/// inner join instead of left join.
+///
+/// Generally, an outer join can be reduced to inner join if quals from where
+/// return false while any inputs are null and columns of those quals are come 
from
+/// nullable side of outer join.
+fn reduce_outer_join(
+    _optimizer: &ReduceOuterJoin,
+    plan: &LogicalPlan,
+    nonnullable_cols: &mut Vec<Column>,
+    _optimizer_config: &OptimizerConfig,
+) -> Result<LogicalPlan> {
+    match plan {
+        LogicalPlan::Filter(Filter { input, predicate }) => match &**input {
+            LogicalPlan::Join(join) => {
+                extract_nonnullable_columns(
+                    predicate,
+                    nonnullable_cols,
+                    join.left.schema(),
+                    join.right.schema(),
+                    true,
+                )?;
+                Ok(LogicalPlan::Filter(Filter {
+                    predicate: predicate.clone(),
+                    input: Arc::new(reduce_outer_join(
+                        _optimizer,
+                        input,
+                        nonnullable_cols,
+                        _optimizer_config,
+                    )?),
+                }))
+            }
+            _ => Ok(LogicalPlan::Filter(Filter {
+                predicate: predicate.clone(),
+                input: Arc::new(reduce_outer_join(
+                    _optimizer,
+                    input,
+                    nonnullable_cols,
+                    _optimizer_config,
+                )?),
+            })),
+        },
+        LogicalPlan::Join(join) => {
+            let mut new_join_type = join.join_type;
+
+            if join.join_type == JoinType::Left
+                || join.join_type == JoinType::Right
+                || join.join_type == JoinType::Full
+            {
+                let mut left_nonnullable = false;
+                let mut right_nonnullable = false;
+                for col in nonnullable_cols.iter_mut() {
+                    if join.left.schema().field_from_column(col).is_ok() {
+                        left_nonnullable = true;
+                    }
+                    if join.right.schema().field_from_column(col).is_ok() {
+                        right_nonnullable = true;
+                    }
+                }
+
+                match join.join_type {
+                    JoinType::Left => {
+                        if right_nonnullable {
+                            new_join_type = JoinType::Inner;
+                        }
+                    }
+                    JoinType::Right => {
+                        if left_nonnullable {
+                            new_join_type = JoinType::Inner;
+                        }
+                    }
+                    JoinType::Full => {
+                        if left_nonnullable && right_nonnullable {
+                            new_join_type = JoinType::Inner;
+                        } else if left_nonnullable {
+                            new_join_type = JoinType::Left;
+                        } else if right_nonnullable {
+                            new_join_type = JoinType::Right;
+                        }
+                    }
+                    _ => {}
+                };
+            }
+
+            let left_plan = reduce_outer_join(
+                _optimizer,
+                &join.left,
+                nonnullable_cols,
+                _optimizer_config,
+            )?;
+            let right_plan = reduce_outer_join(
+                _optimizer,
+                &join.right,
+                nonnullable_cols,
+                _optimizer_config,
+            )?;
+
+            Ok(LogicalPlan::Join(Join {
+                left: Arc::new(left_plan),
+                right: Arc::new(right_plan),
+                join_type: new_join_type,
+                join_constraint: join.join_constraint,
+                on: join.on.clone(),
+                filter: join.filter.clone(),
+                schema: join.schema.clone(),
+                null_equals_null: join.null_equals_null,
+            }))
+        }
+        LogicalPlan::Projection(Projection {
+            input,
+            expr,
+            schema,
+            alias: _,
+        }) => {
+            let projection = schema
+                .fields()
+                .iter()
+                .enumerate()
+                .map(|(i, field)| {
+                    // strip alias, as they should not be part of filters
+                    let expr = match &expr[i] {
+                        Expr::Alias(expr, _) => expr.as_ref().clone(),
+                        expr => expr.clone(),
+                    };
+
+                    (field.qualified_name(), expr)
+                })
+                .collect::<HashMap<_, _>>();
+
+            // re-write all Columns based on this projection
+            for col in nonnullable_cols.iter_mut() {
+                if let Some(Expr::Column(column)) = 
projection.get(&col.flat_name()) {
+                    *col = column.clone();
+                }
+            }
+
+            // optimize inner
+            let new_input = reduce_outer_join(
+                _optimizer,
+                input,
+                nonnullable_cols,
+                _optimizer_config,
+            )?;
+
+            from_plan(plan, expr, &[new_input])
+        }
+        _ => {
+            let expr = plan.expressions();
+
+            // apply the optimization to all inputs of the plan
+            let inputs = plan.inputs();
+            let new_inputs = inputs
+                .iter()
+                .map(|plan| {
+                    reduce_outer_join(
+                        _optimizer,
+                        plan,
+                        nonnullable_cols,
+                        _optimizer_config,
+                    )
+                })
+                .collect::<Result<Vec<_>>>()?;
+
+            from_plan(plan, &expr, &new_inputs)
+        }
+    }
+}
+
+/// Recursively traversese expr, if expr returns false when
+/// any inputs are null, treats columns of both sides as nonnullable columns.
+///
+/// For and/or expr, extracts from all sub exprs and merges the columns.
+/// For or expr, if one of sub exprs returns true, discards all columns from 
or expr.
+/// For IS NOT NULL/NOT expr, always returns false for NULL input.
+///     extracts columns from these exprs.
+/// For all other exprs, fall through
+fn extract_nonnullable_columns(

Review Comment:
   What do you think about using `Expr::nullable` instead? It seems very similar to (or the same as) what you are doing with this function: trying to determine whether an expression can be nullable or non-nullable.
   
   
https://github.com/apache/arrow-datafusion/blob/d985c0a3b6d96b02028f3abb6edb361ea72cac14/datafusion/expr/src/expr_schema.rs#L148-L211
   
   If `Expr::nullable` doesn't work correctly, I think we should update it.



##########
datafusion/optimizer/src/reduce_outer_join.rs:
##########
@@ -0,0 +1,355 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Optimizer rule to reduce left/right/full join to inner join if possible.
+use crate::{OptimizerConfig, OptimizerRule};
+use datafusion_common::{Column, DFSchema, Result};
+use datafusion_expr::{
+    logical_plan::{Filter, Join, JoinType, LogicalPlan, Projection},
+    utils::from_plan,
+};
+use datafusion_expr::{Expr, Operator};
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+#[derive(Default)]
+pub struct ReduceOuterJoin;
+
+impl ReduceOuterJoin {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl OptimizerRule for ReduceOuterJoin {
+    fn optimize(
+        &self,
+        plan: &LogicalPlan,
+        optimizer_config: &OptimizerConfig,
+    ) -> Result<LogicalPlan> {
+        let mut nonnullable_cols: Vec<Column> = vec![];
+
+        reduce_outer_join(self, plan, &mut nonnullable_cols, optimizer_config)
+    }
+
+    fn name(&self) -> &str {
+        "reduce_outer_join"
+    }
+}
+
+/// Attempt to reduce outer joins to inner joins.
+/// for query: select ... from a left join b on ... where b.xx = 100;
+/// if b.xx is null, and b.xx = 100 returns false, filterd those null rows.
+/// Therefore, there is no need to produce null rows for output, we can use
+/// inner join instead of left join.
+///
+/// Generally, an outer join can be reduced to inner join if quals from where
+/// return false while any inputs are null and columns of those quals are come 
from
+/// nullable side of outer join.
+fn reduce_outer_join(
+    _optimizer: &ReduceOuterJoin,
+    plan: &LogicalPlan,
+    nonnullable_cols: &mut Vec<Column>,
+    _optimizer_config: &OptimizerConfig,
+) -> Result<LogicalPlan> {
+    match plan {
+        LogicalPlan::Filter(Filter { input, predicate }) => match &**input {
+            LogicalPlan::Join(join) => {
+                extract_nonnullable_columns(
+                    predicate,
+                    nonnullable_cols,
+                    join.left.schema(),
+                    join.right.schema(),
+                    true,
+                )?;
+                Ok(LogicalPlan::Filter(Filter {
+                    predicate: predicate.clone(),
+                    input: Arc::new(reduce_outer_join(
+                        _optimizer,
+                        input,
+                        nonnullable_cols,
+                        _optimizer_config,
+                    )?),
+                }))
+            }
+            _ => Ok(LogicalPlan::Filter(Filter {
+                predicate: predicate.clone(),
+                input: Arc::new(reduce_outer_join(
+                    _optimizer,
+                    input,
+                    nonnullable_cols,
+                    _optimizer_config,
+                )?),
+            })),
+        },
+        LogicalPlan::Join(join) => {
+            let mut new_join_type = join.join_type;
+
+            if join.join_type == JoinType::Left
+                || join.join_type == JoinType::Right
+                || join.join_type == JoinType::Full
+            {
+                let mut left_nonnullable = false;
+                let mut right_nonnullable = false;
+                for col in nonnullable_cols.iter_mut() {
+                    if join.left.schema().field_from_column(col).is_ok() {
+                        left_nonnullable = true;
+                    }
+                    if join.right.schema().field_from_column(col).is_ok() {
+                        right_nonnullable = true;
+                    }
+                }
+
+                match join.join_type {
+                    JoinType::Left => {
+                        if right_nonnullable {
+                            new_join_type = JoinType::Inner;
+                        }
+                    }
+                    JoinType::Right => {
+                        if left_nonnullable {
+                            new_join_type = JoinType::Inner;
+                        }
+                    }
+                    JoinType::Full => {
+                        if left_nonnullable && right_nonnullable {
+                            new_join_type = JoinType::Inner;
+                        } else if left_nonnullable {
+                            new_join_type = JoinType::Left;
+                        } else if right_nonnullable {
+                            new_join_type = JoinType::Right;
+                        }
+                    }
+                    _ => {}
+                };
+            }
+
+            let left_plan = reduce_outer_join(
+                _optimizer,
+                &join.left,
+                nonnullable_cols,
+                _optimizer_config,
+            )?;
+            let right_plan = reduce_outer_join(
+                _optimizer,
+                &join.right,
+                nonnullable_cols,
+                _optimizer_config,
+            )?;
+
+            Ok(LogicalPlan::Join(Join {
+                left: Arc::new(left_plan),
+                right: Arc::new(right_plan),
+                join_type: new_join_type,
+                join_constraint: join.join_constraint,
+                on: join.on.clone(),
+                filter: join.filter.clone(),
+                schema: join.schema.clone(),
+                null_equals_null: join.null_equals_null,
+            }))
+        }
+        LogicalPlan::Projection(Projection {

Review Comment:
   I think Sorts can also rewrite column names?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to