This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 3af09fbf2 Add `Filter::try_new` with validation (#3796)
3af09fbf2 is described below

commit 3af09fbf2026fc079264b1f67bc7095dc7fe7161
Author: Andy Grove <[email protected]>
AuthorDate: Wed Oct 12 09:54:59 2022 -0600

    Add `Filter::try_new` with validation (#3796)
    
    * Optimizer example and docs
    
    * Add Filter::try_new
    
    * Revert some changes
    
    * validate that filter predicates return boolean
    
    * update some invalid tests
    
    * tests pass
    
    * make Filter attributes private
    
    * update docs
    
    * Cargo.lock
    
    * clippy
---
 benchmarks/expected-plans/q6.txt                   |  2 +-
 datafusion-cli/Cargo.lock                          |  1 +
 datafusion/core/src/physical_plan/planner.rs       | 27 +++++++-----
 datafusion/core/tests/sql/select.rs                | 11 +++--
 datafusion/expr/Cargo.toml                         |  1 +
 datafusion/expr/src/expr.rs                        |  8 ++++
 datafusion/expr/src/logical_plan/builder.rs        |  8 ++--
 datafusion/expr/src/logical_plan/plan.rs           | 49 +++++++++++++++++++--
 datafusion/expr/src/utils.rs                       | 50 ++++++++++++++++++++--
 .../optimizer/src/common_subexpr_eliminate.rs      | 12 +++---
 .../optimizer/src/decorrelate_where_exists.rs      | 25 +++++------
 datafusion/optimizer/src/decorrelate_where_in.rs   | 24 +++++------
 datafusion/optimizer/src/eliminate_filter.rs       | 29 ++++++++-----
 datafusion/optimizer/src/filter_null_join_keys.rs  | 12 +++---
 datafusion/optimizer/src/filter_push_down.rs       | 14 +++---
 datafusion/optimizer/src/projection_push_down.rs   |  4 +-
 datafusion/optimizer/src/reduce_cross_join.rs      | 20 +++++----
 datafusion/optimizer/src/reduce_outer_join.rs      | 24 +++++------
 .../optimizer/src/rewrite_disjunctive_predicate.rs | 12 +++---
 .../optimizer/src/scalar_subquery_to_join.rs       | 28 ++++++------
 datafusion/optimizer/src/simplify_expressions.rs   | 48 ++++++++++-----------
 .../optimizer/src/subquery_filter_to_join.rs       | 24 +++++------
 datafusion/optimizer/src/utils.rs                  |  8 ++--
 datafusion/proto/src/logical_plan.rs               | 11 +++--
 datafusion/sql/src/planner.rs                      |  8 ++--
 25 files changed, 288 insertions(+), 172 deletions(-)

diff --git a/benchmarks/expected-plans/q6.txt b/benchmarks/expected-plans/q6.txt
index ad27ba2b9..55a6174ab 100644
--- a/benchmarks/expected-plans/q6.txt
+++ b/benchmarks/expected-plans/q6.txt
@@ -1,5 +1,5 @@
 Projection: SUM(lineitem.l_extendedprice * lineitem.l_discount) AS revenue
   Aggregate: groupBy=[[]], aggr=[[SUM(lineitem.l_extendedprice * 
lineitem.l_discount)]]
     Projection: CAST(lineitem.l_discount AS Decimal128(30, 15)) AS 
CAST(lineitem.l_discount AS Decimal128(30, 15))lineitem.l_discount, 
lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, 
lineitem.l_shipdate
-      Filter: lineitem.l_shipdate >= Date32("8766") AND lineitem.l_shipdate < 
Date32("9131") AND CAST(lineitem.l_discount AS Decimal128(30, 15)) AS 
lineitem.l_discount >= Decimal128(Some(49999999999999),30,15) AND 
CAST(lineitem.l_discount AS Decimal128(30, 15)) AS lineitem.l_discount <= 
Decimal128(Some(69999999999999),30,15) AND lineitem.l_quantity < 
Decimal128(Some(2400),15,2)
+      Filter: lineitem.l_shipdate >= Date32("8766") AND lineitem.l_shipdate < 
Date32("9131") AND CAST(lineitem.l_discount AS Decimal128(30, 15)) >= 
Decimal128(Some(49999999999999),30,15) AND CAST(lineitem.l_discount AS 
Decimal128(30, 15)) <= Decimal128(Some(69999999999999),30,15) AND 
lineitem.l_quantity < Decimal128(Some(2400),15,2)
         TableScan: lineitem projection=[l_quantity, l_extendedprice, 
l_discount, l_shipdate]
\ No newline at end of file
diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index 7ced78299..29ddffd6c 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -617,6 +617,7 @@ dependencies = [
  "ahash 0.8.0",
  "arrow",
  "datafusion-common",
+ "log",
  "sqlparser",
 ]
 
diff --git a/datafusion/core/src/physical_plan/planner.rs 
b/datafusion/core/src/physical_plan/planner.rs
index 8bb1d95a4..88be00cd9 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -28,8 +28,8 @@ use crate::datasource::source_as_provider;
 use crate::execution::context::{ExecutionProps, SessionState};
 use crate::logical_expr::utils::generate_sort_key;
 use crate::logical_expr::{
-    Aggregate, Distinct, EmptyRelation, Filter, Join, Projection, Sort, 
SubqueryAlias,
-    TableScan, Window,
+    Aggregate, Distinct, EmptyRelation, Join, Projection, Sort, SubqueryAlias, 
TableScan,
+    Window,
 };
 use crate::logical_plan::{
     unalias, unnormalize_cols, CrossJoin, DFSchema, Expr, LogicalPlan,
@@ -756,15 +756,13 @@ impl DefaultPhysicalPlanner {
                         input_exec,
                     )?) )
                 }
-                LogicalPlan::Filter(Filter {
-                    input, predicate, ..
-                }) => {
-                    let physical_input = self.create_initial_plan(input, 
session_state).await?;
+                LogicalPlan::Filter(filter) => {
+                    let physical_input = 
self.create_initial_plan(filter.input(), session_state).await?;
                     let input_schema = physical_input.as_ref().schema();
-                    let input_dfschema = input.as_ref().schema();
+                    let input_dfschema = filter.input().schema();
 
                     let runtime_expr = self.create_physical_expr(
-                        predicate,
+                        filter.predicate(),
                         input_dfschema,
                         &input_schema,
                         session_state,
@@ -1696,8 +1694,7 @@ mod tests {
     use arrow::record_batch::RecordBatch;
     use datafusion_common::{DFField, DFSchema, DFSchemaRef};
     use datafusion_expr::expr::GroupingSet;
-    use datafusion_expr::sum;
-    use datafusion_expr::{col, lit};
+    use datafusion_expr::{col, lit, sum};
     use fmt::Debug;
     use std::collections::HashMap;
     use std::convert::TryFrom;
@@ -1705,7 +1702,10 @@ mod tests {
 
     fn make_session_state() -> SessionState {
         let runtime = Arc::new(RuntimeEnv::default());
-        SessionState::with_config_rt(SessionConfig::new(), runtime)
+        let config = SessionConfig::new();
+        // TODO we should really test that no optimizer rules are failing here
+        // let config = 
config.set_bool(crate::config::OPT_OPTIMIZER_SKIP_FAILED_RULES, false);
+        SessionState::with_config_rt(config, runtime)
     }
 
     async fn plan(logical_plan: &LogicalPlan) -> Result<Arc<dyn 
ExecutionPlan>> {
@@ -1972,6 +1972,11 @@ mod tests {
         let expected = "expr: [(InListExpr { expr: Column { name: \"c1\", 
index: 0 }, list: [Literal { value: Utf8(\"a\") }, Literal { value: Utf8(\"1\") 
}], negated: false, set: None }";
         assert!(format!("{:?}", execution_plan).contains(expected));
 
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn in_list_types_struct_literal() -> Result<()> {
         // expression: "a in (struct::null, 'a')"
         let list = vec![struct_literal(), lit("a")];
 
diff --git a/datafusion/core/tests/sql/select.rs 
b/datafusion/core/tests/sql/select.rs
index 1ea530bfb..e111b21ad 100644
--- a/datafusion/core/tests/sql/select.rs
+++ b/datafusion/core/tests/sql/select.rs
@@ -1211,14 +1211,19 @@ async fn boolean_literal() -> Result<()> {
 
 #[tokio::test]
 async fn unprojected_filter() {
-    let ctx = SessionContext::new();
+    let config = SessionConfig::new();
+    let ctx = SessionContext::with_config(config);
     let df = ctx.read_table(table_with_sequence(1, 3).unwrap()).unwrap();
 
     let df = df
-        .select(vec![col("i") + col("i")])
-        .unwrap()
         .filter(col("i").gt(lit(2)))
+        .unwrap()
+        .select(vec![col("i") + col("i")])
         .unwrap();
+
+    let plan = df.to_logical_plan().unwrap();
+    println!("{}", plan.display_indent());
+
     let results = df.collect().await.unwrap();
 
     let expected = vec![
diff --git a/datafusion/expr/Cargo.toml b/datafusion/expr/Cargo.toml
index 3280628a4..5d7350f72 100644
--- a/datafusion/expr/Cargo.toml
+++ b/datafusion/expr/Cargo.toml
@@ -38,4 +38,5 @@ path = "src/lib.rs"
 ahash = { version = "0.8", default-features = false, features = 
["runtime-rng"] }
 arrow = { version = "24.0.0", default-features = false }
 datafusion-common = { path = "../common", version = "13.0.0" }
+log = "^0.4"
 sqlparser = "0.25"
diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index 16c3da078..c21dd5c88 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -486,6 +486,14 @@ impl Expr {
         Expr::Alias(Box::new(self), name.into())
     }
 
+    /// Remove an alias from an expression if one exists.
+    pub fn unalias(self) -> Expr {
+        match self {
+            Expr::Alias(expr, _) => expr.as_ref().clone(),
+            _ => self,
+        }
+    }
+
     /// Return `self IN <list>` if `negated` is false, otherwise
     /// return `self NOT IN <list>`.a
     pub fn in_list(self, list: Vec<Expr>, negated: bool) -> Expr {
diff --git a/datafusion/expr/src/logical_plan/builder.rs 
b/datafusion/expr/src/logical_plan/builder.rs
index 522639984..300c3b8cb 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -289,10 +289,10 @@ impl LogicalPlanBuilder {
     /// Apply a filter
     pub fn filter(&self, expr: impl Into<Expr>) -> Result<Self> {
         let expr = normalize_col(expr.into(), &self.plan)?;
-        Ok(Self::from(LogicalPlan::Filter(Filter {
-            predicate: expr,
-            input: Arc::new(self.plan.clone()),
-        })))
+        Ok(Self::from(LogicalPlan::Filter(Filter::try_new(
+            expr,
+            Arc::new(self.plan.clone()),
+        )?)))
     }
 
     /// Limit the number of rows returned
diff --git a/datafusion/expr/src/logical_plan/plan.rs 
b/datafusion/expr/src/logical_plan/plan.rs
index 738e842a2..b21076a9d 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -15,17 +15,17 @@
 // specific language governing permissions and limitations
 // under the License.
 
+///! Logical plan types
 use crate::logical_plan::builder::validate_unique_names;
 use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
 use crate::logical_plan::extension::UserDefinedLogicalNode;
 use crate::utils::{
     exprlist_to_fields, grouping_set_expr_count, grouping_set_to_exprlist,
 };
-use crate::{Expr, TableProviderFilterPushDown, TableSource};
+use crate::{Expr, ExprSchemable, TableProviderFilterPushDown, TableSource};
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use datafusion_common::{plan_err, Column, DFSchema, DFSchemaRef, 
DataFusionError};
 use std::collections::HashSet;
-///! Logical plan types
 use std::fmt::{self, Debug, Display, Formatter};
 use std::hash::{Hash, Hasher};
 use std::sync::Arc;
@@ -1148,18 +1148,59 @@ pub struct SubqueryAlias {
 #[derive(Clone)]
 pub struct Filter {
     /// The predicate expression, which must have Boolean type.
-    pub predicate: Expr,
+    predicate: Expr,
     /// The incoming logical plan
-    pub input: Arc<LogicalPlan>,
+    input: Arc<LogicalPlan>,
 }
 
 impl Filter {
+    /// Create a new filter operator.
+    pub fn try_new(
+        predicate: Expr,
+        input: Arc<LogicalPlan>,
+    ) -> datafusion_common::Result<Self> {
+        // Filter predicates must return a boolean value so we try and 
validate that here.
+        // Note that it is not always possible to resolve the predicate 
expression during plan
+        // construction (such as with correlated subqueries) so we make a best 
effort here and
+        // ignore errors resolving the expression against the schema.
+        if let Ok(predicate_type) = predicate.get_type(input.schema()) {
+            if predicate_type != DataType::Boolean {
+                return Err(DataFusionError::Plan(format!(
+                    "Cannot create filter with non-boolean predicate '{}' 
returning {}",
+                    predicate, predicate_type
+                )));
+            }
+        }
+
+        // filter predicates should not be aliased
+        if let Expr::Alias(expr, alias) = predicate {
+            return Err(DataFusionError::Plan(format!(
+                "Attempted to create Filter predicate with \
+                expression `{}` aliased as '{}'. Filter predicates should not 
be \
+                aliased.",
+                expr, alias
+            )));
+        }
+
+        Ok(Self { predicate, input })
+    }
+
     pub fn try_from_plan(plan: &LogicalPlan) -> 
datafusion_common::Result<&Filter> {
         match plan {
             LogicalPlan::Filter(it) => Ok(it),
             _ => plan_err!("Could not coerce into Filter!"),
         }
     }
+
+    /// Access the filter predicate expression
+    pub fn predicate(&self) -> &Expr {
+        &self.predicate
+    }
+
+    /// Access the filter input plan
+    pub fn input(&self) -> &Arc<LogicalPlan> {
+        &self.input
+    }
 }
 
 /// Window its input based on a set of window spec and window function (e.g. 
SUM or RANK)
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 501b4a8f1..8e2544793 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -17,6 +17,7 @@
 
 //! Expression utilities
 
+use crate::expr_rewriter::{ExprRewritable, ExprRewriter, RewriteRecursion};
 use crate::expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion};
 use crate::logical_plan::builder::build_join_schema;
 use crate::logical_plan::{
@@ -380,10 +381,51 @@ pub fn from_plan(
                 .map(|s| s.to_vec())
                 .collect::<Vec<_>>(),
         })),
-        LogicalPlan::Filter { .. } => Ok(LogicalPlan::Filter(Filter {
-            predicate: expr[0].clone(),
-            input: Arc::new(inputs[0].clone()),
-        })),
+        LogicalPlan::Filter { .. } => {
+            assert_eq!(1, expr.len());
+            let predicate = expr[0].clone();
+
+            // filter predicates should not contain aliased expressions so we 
remove any aliases
+            // before this logic was added we would have aliases within 
filters such as for
+            // benchmark q6:
+            //
+            // lineitem.l_shipdate >= Date32(\"8766\")
+            // AND lineitem.l_shipdate < Date32(\"9131\")
+            // AND CAST(lineitem.l_discount AS Decimal128(30, 15)) AS 
lineitem.l_discount >=
+            // Decimal128(Some(49999999999999),30,15)
+            // AND CAST(lineitem.l_discount AS Decimal128(30, 15)) AS 
lineitem.l_discount <=
+            // Decimal128(Some(69999999999999),30,15)
+            // AND lineitem.l_quantity < Decimal128(Some(2400),15,2)
+
+            struct RemoveAliases {}
+
+            impl ExprRewriter for RemoveAliases {
+                fn pre_visit(&mut self, expr: &Expr) -> 
Result<RewriteRecursion> {
+                    match expr {
+                        Expr::Exists { .. }
+                        | Expr::ScalarSubquery(_)
+                        | Expr::InSubquery { .. } => {
+                            // subqueries could contain aliases so we don't 
recurse into those
+                            Ok(RewriteRecursion::Stop)
+                        }
+                        Expr::Alias(_, _) => Ok(RewriteRecursion::Mutate),
+                        _ => Ok(RewriteRecursion::Continue),
+                    }
+                }
+
+                fn mutate(&mut self, expr: Expr) -> Result<Expr> {
+                    Ok(expr.unalias())
+                }
+            }
+
+            let mut remove_aliases = RemoveAliases {};
+            let predicate = predicate.rewrite(&mut remove_aliases)?;
+
+            Ok(LogicalPlan::Filter(Filter::try_new(
+                predicate,
+                Arc::new(inputs[0].clone()),
+            )?))
+        }
         LogicalPlan::Repartition(Repartition {
             partitioning_scheme,
             ..
diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs 
b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index db3d2ca8d..d33fe07c0 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -112,7 +112,9 @@ fn optimize(
                 alias.clone(),
             )?))
         }
-        LogicalPlan::Filter(Filter { predicate, input }) => {
+        LogicalPlan::Filter(filter) => {
+            let input = filter.input();
+            let predicate = filter.predicate();
             let input_schema = Arc::clone(input.schema());
             let mut id_array = vec![];
             expr_to_identifier(predicate, &mut expr_set, &mut id_array, 
input_schema)?;
@@ -120,16 +122,16 @@ fn optimize(
             let (mut new_expr, new_input) = rewrite_expr(
                 &[&[predicate.clone()]],
                 &[&[id_array]],
-                input,
+                filter.input(),
                 &mut expr_set,
                 optimizer_config,
             )?;
 
             if let Some(predicate) = pop_expr(&mut new_expr)?.pop() {
-                Ok(LogicalPlan::Filter(Filter {
+                Ok(LogicalPlan::Filter(Filter::try_new(
                     predicate,
-                    input: Arc::new(new_input),
-                }))
+                    Arc::new(new_input),
+                )?))
             } else {
                 Err(DataFusionError::Internal(
                     "Failed to pop predicate expr".to_string(),
diff --git a/datafusion/optimizer/src/decorrelate_where_exists.rs 
b/datafusion/optimizer/src/decorrelate_where_exists.rs
index 671deb027..d6727ad0f 100644
--- a/datafusion/optimizer/src/decorrelate_where_exists.rs
+++ b/datafusion/optimizer/src/decorrelate_where_exists.rs
@@ -76,19 +76,19 @@ impl OptimizerRule for DecorrelateWhereExists {
         optimizer_config: &mut OptimizerConfig,
     ) -> datafusion_common::Result<LogicalPlan> {
         match plan {
-            LogicalPlan::Filter(Filter {
-                predicate,
-                input: filter_input,
-            }) => {
+            LogicalPlan::Filter(filter) => {
+                let predicate = filter.predicate();
+                let filter_input = filter.input();
+
                 // Apply optimizer rule to current input
                 let optimized_input = self.optimize(filter_input, 
optimizer_config)?;
 
                 let (subqueries, other_exprs) =
                     self.extract_subquery_exprs(predicate, optimizer_config)?;
-                let optimized_plan = LogicalPlan::Filter(Filter {
-                    predicate: predicate.clone(),
-                    input: Arc::new(optimized_input),
-                });
+                let optimized_plan = LogicalPlan::Filter(Filter::try_new(
+                    predicate.clone(),
+                    Arc::new(optimized_input),
+                )?);
                 if subqueries.is_empty() {
                     // regular filter, no subquery exists clause here
                     return Ok(optimized_plan);
@@ -153,20 +153,21 @@ fn optimize_exists(
 
     // split into filters
     let mut subqry_filter_exprs = vec![];
-    split_conjunction(&subqry_filter.predicate, &mut subqry_filter_exprs);
+    split_conjunction(subqry_filter.predicate(), &mut subqry_filter_exprs);
     verify_not_disjunction(&subqry_filter_exprs)?;
 
     // Grab column names to join on
     let (col_exprs, other_subqry_exprs) =
-        find_join_exprs(subqry_filter_exprs, subqry_filter.input.schema())?;
+        find_join_exprs(subqry_filter_exprs, subqry_filter.input().schema())?;
     let (outer_cols, subqry_cols, join_filters) =
-        exprs_to_join_cols(&col_exprs, subqry_filter.input.schema(), false)?;
+        exprs_to_join_cols(&col_exprs, subqry_filter.input().schema(), false)?;
     if subqry_cols.is_empty() || outer_cols.is_empty() {
         plan_err!("cannot optimize non-correlated subquery")?;
     }
 
     // build subquery side of join - the thing the subquery was querying
-    let mut subqry_plan = 
LogicalPlanBuilder::from((*subqry_filter.input).clone());
+    let mut subqry_plan =
+        LogicalPlanBuilder::from(subqry_filter.input().as_ref().clone());
     if let Some(expr) = combine_filters(&other_subqry_exprs) {
         subqry_plan = subqry_plan.filter(expr)? // if the subquery had 
additional expressions, restore them
     }
diff --git a/datafusion/optimizer/src/decorrelate_where_in.rs 
b/datafusion/optimizer/src/decorrelate_where_in.rs
index d5af0911d..a3443eaee 100644
--- a/datafusion/optimizer/src/decorrelate_where_in.rs
+++ b/datafusion/optimizer/src/decorrelate_where_in.rs
@@ -83,19 +83,19 @@ impl OptimizerRule for DecorrelateWhereIn {
         optimizer_config: &mut OptimizerConfig,
     ) -> datafusion_common::Result<LogicalPlan> {
         match plan {
-            LogicalPlan::Filter(Filter {
-                predicate,
-                input: filter_input,
-            }) => {
+            LogicalPlan::Filter(filter) => {
+                let predicate = filter.predicate();
+                let filter_input = filter.input();
+
                 // Apply optimizer rule to current input
                 let optimized_input = self.optimize(filter_input, 
optimizer_config)?;
 
                 let (subqueries, other_exprs) =
                     self.extract_subquery_exprs(predicate, optimizer_config)?;
-                let optimized_plan = LogicalPlan::Filter(Filter {
-                    predicate: predicate.clone(),
-                    input: Arc::new(optimized_input),
-                });
+                let optimized_plan = LogicalPlan::Filter(Filter::try_new(
+                    predicate.clone(),
+                    Arc::new(optimized_input),
+                )?);
                 if subqueries.is_empty() {
                     // regular filter, no subquery exists clause here
                     return Ok(optimized_plan);
@@ -152,18 +152,18 @@ fn optimize_where_in(
     if let LogicalPlan::Filter(subqry_filter) = (*subqry_input).clone() {
         // split into filters
         let mut subqry_filter_exprs = vec![];
-        split_conjunction(&subqry_filter.predicate, &mut subqry_filter_exprs);
+        split_conjunction(subqry_filter.predicate(), &mut subqry_filter_exprs);
         verify_not_disjunction(&subqry_filter_exprs)?;
 
         // Grab column names to join on
         let (col_exprs, other_exprs) =
-            find_join_exprs(subqry_filter_exprs, subqry_filter.input.schema())
+            find_join_exprs(subqry_filter_exprs, 
subqry_filter.input().schema())
                 .map_err(|e| context!("column correlation not found", e))?;
         if !col_exprs.is_empty() {
             // it's correlated
-            subqry_input = subqry_filter.input.clone();
+            subqry_input = subqry_filter.input().clone();
             (outer_cols, subqry_cols, join_filters) =
-                exprs_to_join_cols(&col_exprs, subqry_filter.input.schema(), 
false)
+                exprs_to_join_cols(&col_exprs, subqry_filter.input().schema(), 
false)
                     .map_err(|e| context!("column correlation not found", e))?;
             other_subqry_exprs = other_exprs;
         }
diff --git a/datafusion/optimizer/src/eliminate_filter.rs 
b/datafusion/optimizer/src/eliminate_filter.rs
index 61e9613cf..6c0c51b86 100644
--- a/datafusion/optimizer/src/eliminate_filter.rs
+++ b/datafusion/optimizer/src/eliminate_filter.rs
@@ -21,7 +21,7 @@
 use crate::{OptimizerConfig, OptimizerRule};
 use datafusion_common::{Result, ScalarValue};
 use datafusion_expr::{
-    logical_plan::{EmptyRelation, Filter, LogicalPlan},
+    logical_plan::{EmptyRelation, LogicalPlan},
     utils::from_plan,
     Expr,
 };
@@ -43,21 +43,30 @@ impl OptimizerRule for EliminateFilter {
         plan: &LogicalPlan,
         optimizer_config: &mut OptimizerConfig,
     ) -> Result<LogicalPlan> {
-        match plan {
-            LogicalPlan::Filter(Filter {
-                predicate: Expr::Literal(ScalarValue::Boolean(Some(v))),
-                input,
-            }) => {
-                if !*v {
+        let (filter_value, input) = match plan {
+            LogicalPlan::Filter(filter) => match filter.predicate() {
+                Expr::Literal(ScalarValue::Boolean(Some(v))) => {
+                    (Some(*v), Some(filter.input()))
+                }
+                _ => (None, None),
+            },
+            _ => (None, None),
+        };
+
+        match filter_value {
+            Some(v) => {
+                // input is guaranteed be Some due to previous code
+                let input = input.unwrap();
+                if v {
+                    self.optimize(input, optimizer_config)
+                } else {
                     Ok(LogicalPlan::EmptyRelation(EmptyRelation {
                         produce_one_row: false,
                         schema: input.schema().clone(),
                     }))
-                } else {
-                    self.optimize(input, optimizer_config)
                 }
             }
-            _ => {
+            None => {
                 // Apply the optimization to all inputs of the plan
                 let inputs = plan.inputs();
                 let new_inputs = inputs
diff --git a/datafusion/optimizer/src/filter_null_join_keys.rs 
b/datafusion/optimizer/src/filter_null_join_keys.rs
index 4d237dd04..be33c796e 100644
--- a/datafusion/optimizer/src/filter_null_join_keys.rs
+++ b/datafusion/optimizer/src/filter_null_join_keys.rs
@@ -68,17 +68,17 @@ impl OptimizerRule for FilterNullJoinKeys {
 
                 if !left_filters.is_empty() {
                     let predicate = create_not_null_predicate(left_filters);
-                    join.left = Arc::new(LogicalPlan::Filter(Filter {
+                    join.left = Arc::new(LogicalPlan::Filter(Filter::try_new(
                         predicate,
-                        input: join.left.clone(),
-                    }));
+                        join.left.clone(),
+                    )?));
                 }
                 if !right_filters.is_empty() {
                     let predicate = create_not_null_predicate(right_filters);
-                    join.right = Arc::new(LogicalPlan::Filter(Filter {
+                    join.right = Arc::new(LogicalPlan::Filter(Filter::try_new(
                         predicate,
-                        input: join.right.clone(),
-                    }));
+                        join.right.clone(),
+                    )?));
                 }
                 Ok(LogicalPlan::Join(join))
             }
diff --git a/datafusion/optimizer/src/filter_push_down.rs 
b/datafusion/optimizer/src/filter_push_down.rs
index 129766012..d1f696621 100644
--- a/datafusion/optimizer/src/filter_push_down.rs
+++ b/datafusion/optimizer/src/filter_push_down.rs
@@ -20,8 +20,8 @@ use datafusion_expr::{
     col,
     expr_rewriter::{replace_col, ExprRewritable, ExprRewriter},
     logical_plan::{
-        Aggregate, CrossJoin, Filter, Join, JoinType, Limit, LogicalPlan, 
Projection,
-        TableScan, Union,
+        Aggregate, CrossJoin, Join, JoinType, Limit, LogicalPlan, Projection, 
TableScan,
+        Union,
     },
     utils::{expr_to_columns, exprlist_to_columns, from_plan},
     Expr, TableProviderFilterPushDown,
@@ -138,7 +138,7 @@ fn issue_filters(
         return push_down(&state, plan);
     }
 
-    let plan = utils::add_filter(plan.clone(), &predicates);
+    let plan = utils::add_filter(plan.clone(), &predicates)?;
 
     state.filters = remove_filters(&state.filters, &predicate_columns);
 
@@ -326,7 +326,7 @@ fn optimize_join(
         Ok(plan)
     } else {
         // wrap the join on the filter whose predicates must be kept
-        let plan = utils::add_filter(plan, &to_keep.0);
+        let plan = utils::add_filter(plan, &to_keep.0)?;
         state.filters = remove_filters(&state.filters, &to_keep.1);
 
         Ok(plan)
@@ -340,9 +340,9 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> 
Result<LogicalPlan> {
             push_down(&state, plan)
         }
         LogicalPlan::Analyze { .. } => push_down(&state, plan),
-        LogicalPlan::Filter(Filter { input, predicate }) => {
+        LogicalPlan::Filter(filter) => {
             let mut predicates = vec![];
-            utils::split_conjunction(predicate, &mut predicates);
+            utils::split_conjunction(filter.predicate(), &mut predicates);
 
             predicates
                 .into_iter()
@@ -353,7 +353,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> 
Result<LogicalPlan> {
                     Ok(())
                 })?;
 
-            optimize(input, state)
+            optimize(filter.input(), state)
         }
         LogicalPlan::Projection(Projection {
             input,
diff --git a/datafusion/optimizer/src/projection_push_down.rs 
b/datafusion/optimizer/src/projection_push_down.rs
index 051a0ed74..5a048aac7 100644
--- a/datafusion/optimizer/src/projection_push_down.rs
+++ b/datafusion/optimizer/src/projection_push_down.rs
@@ -580,12 +580,12 @@ mod tests {
         let table_scan = test_table_scan()?;
 
         let plan = LogicalPlanBuilder::from(table_scan)
-            .filter(col("c"))?
+            .filter(col("c").gt(lit(1)))?
             .aggregate(Vec::<Expr>::new(), vec![max(col("b"))])?
             .build()?;
 
         let expected = "Aggregate: groupBy=[[]], aggr=[[MAX(test.b)]]\
-        \n  Filter: test.c\
+        \n  Filter: test.c > Int32(1)\
         \n    TableScan: test projection=[b, c]";
 
         assert_optimized_plan_eq(&plan, expected);
diff --git a/datafusion/optimizer/src/reduce_cross_join.rs 
b/datafusion/optimizer/src/reduce_cross_join.rs
index 4c43188cf..e8c2ff9ec 100644
--- a/datafusion/optimizer/src/reduce_cross_join.rs
+++ b/datafusion/optimizer/src/reduce_cross_join.rs
@@ -77,7 +77,9 @@ fn reduce_cross_join(
     all_join_keys: &mut HashSet<(Column, Column)>,
 ) -> Result<LogicalPlan> {
     match plan {
-        LogicalPlan::Filter(Filter { input, predicate }) => {
+        LogicalPlan::Filter(filter) => {
+            let input = filter.input();
+            let predicate = filter.predicate();
             // join keys are handled locally
             let mut new_possible_join_keys: Vec<(Column, Column)> = vec![];
             let mut new_all_join_keys = HashSet::new();
@@ -93,17 +95,17 @@ fn reduce_cross_join(
 
             // if there are no join keys then do nothing.
             if new_all_join_keys.is_empty() {
-                Ok(LogicalPlan::Filter(Filter {
-                    predicate: predicate.clone(),
-                    input: Arc::new(new_plan),
-                }))
+                Ok(LogicalPlan::Filter(Filter::try_new(
+                    predicate.clone(),
+                    Arc::new(new_plan),
+                )?))
             } else {
                 // remove join expressions from filter
                 match remove_join_expressions(predicate, &new_all_join_keys)? {
-                    Some(filter_expr) => Ok(LogicalPlan::Filter(Filter {
-                        predicate: filter_expr,
-                        input: Arc::new(new_plan),
-                    })),
+                    Some(filter_expr) => 
Ok(LogicalPlan::Filter(Filter::try_new(
+                        filter_expr,
+                        Arc::new(new_plan),
+                    )?)),
                     _ => Ok(new_plan),
                 }
             }
diff --git a/datafusion/optimizer/src/reduce_outer_join.rs 
b/datafusion/optimizer/src/reduce_outer_join.rs
index 6ca4a5994..93b706afe 100644
--- a/datafusion/optimizer/src/reduce_outer_join.rs
+++ b/datafusion/optimizer/src/reduce_outer_join.rs
@@ -69,34 +69,34 @@ fn reduce_outer_join(
     _optimizer_config: &OptimizerConfig,
 ) -> Result<LogicalPlan> {
     match plan {
-        LogicalPlan::Filter(Filter { input, predicate }) => match &**input {
+        LogicalPlan::Filter(filter) => match filter.input().as_ref() {
             LogicalPlan::Join(join) => {
                 extract_nonnullable_columns(
-                    predicate,
+                    filter.predicate(),
                     nonnullable_cols,
                     join.left.schema(),
                     join.right.schema(),
                     true,
                 )?;
-                Ok(LogicalPlan::Filter(Filter {
-                    predicate: predicate.clone(),
-                    input: Arc::new(reduce_outer_join(
+                Ok(LogicalPlan::Filter(Filter::try_new(
+                    filter.predicate().clone(),
+                    Arc::new(reduce_outer_join(
                         _optimizer,
-                        input,
+                        filter.input(),
                         nonnullable_cols,
                         _optimizer_config,
                     )?),
-                }))
+                )?))
             }
-            _ => Ok(LogicalPlan::Filter(Filter {
-                predicate: predicate.clone(),
-                input: Arc::new(reduce_outer_join(
+            _ => Ok(LogicalPlan::Filter(Filter::try_new(
+                filter.predicate().clone(),
+                Arc::new(reduce_outer_join(
                     _optimizer,
-                    input,
+                    filter.input(),
                     nonnullable_cols,
                     _optimizer_config,
                 )?),
-            })),
+            )?)),
         },
         LogicalPlan::Join(join) => {
             let mut new_join_type = join.join_type;
diff --git a/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs 
b/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs
index 2eadfb3d5..a4f051e1c 100644
--- a/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs
+++ b/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs
@@ -129,16 +129,16 @@ impl RewriteDisjunctivePredicate {
     ) -> Result<LogicalPlan> {
         match plan {
             LogicalPlan::Filter(filter) => {
-                let predicate = predicate(&filter.predicate)?;
+                let predicate = predicate(filter.predicate())?;
                 let rewritten_predicate = rewrite_predicate(predicate);
                 let rewritten_expr = normalize_predicate(rewritten_predicate);
-                Ok(LogicalPlan::Filter(Filter {
-                    predicate: rewritten_expr,
-                    input: Arc::new(self.rewrite_disjunctive_predicate(
-                        &filter.input,
+                Ok(LogicalPlan::Filter(Filter::try_new(
+                    rewritten_expr,
+                    Arc::new(self.rewrite_disjunctive_predicate(
+                        filter.input(),
                         _optimizer_config,
                     )?),
-                }))
+                )?))
             }
             _ => {
                 let expr = plan.expressions();
diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs 
b/datafusion/optimizer/src/scalar_subquery_to_join.rs
index 0a2256eba..d14888110 100644
--- a/datafusion/optimizer/src/scalar_subquery_to_join.rs
+++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs
@@ -95,23 +95,23 @@ impl OptimizerRule for ScalarSubqueryToJoin {
         optimizer_config: &mut OptimizerConfig,
     ) -> Result<LogicalPlan> {
         match plan {
-            LogicalPlan::Filter(Filter { predicate, input }) => {
+            LogicalPlan::Filter(filter) => {
                 // Apply optimizer rule to current input
-                let optimized_input = self.optimize(input, optimizer_config)?;
+                let optimized_input = self.optimize(filter.input(), 
optimizer_config)?;
 
                 let (subqueries, other_exprs) =
-                    self.extract_subquery_exprs(predicate, optimizer_config)?;
+                    self.extract_subquery_exprs(filter.predicate(), 
optimizer_config)?;
 
                 if subqueries.is_empty() {
                     // regular filter, no subquery exists clause here
-                    return Ok(LogicalPlan::Filter(Filter {
-                        predicate: predicate.clone(),
-                        input: Arc::new(optimized_input),
-                    }));
+                    return Ok(LogicalPlan::Filter(Filter::try_new(
+                        filter.predicate().clone(),
+                        Arc::new(optimized_input),
+                    )?));
                 }
 
                 // iterate through all subqueries in predicate, turning each 
into a join
-                let mut cur_input = (**input).clone();
+                let mut cur_input = filter.input().as_ref().clone();
                 for subquery in subqueries {
                     if let Some(optimized_subquery) = optimize_scalar(
                         &subquery,
@@ -122,10 +122,10 @@ impl OptimizerRule for ScalarSubqueryToJoin {
                         cur_input = optimized_subquery;
                     } else {
                         // if we can't handle all of the subqueries then bail 
for now
-                        return Ok(LogicalPlan::Filter(Filter {
-                            predicate: predicate.clone(),
-                            input: Arc::new(optimized_input),
-                        }));
+                        return Ok(LogicalPlan::Filter(Filter::try_new(
+                            filter.predicate().clone(),
+                            Arc::new(optimized_input),
+                        )?));
                     }
                 }
                 Ok(cur_input)
@@ -228,7 +228,7 @@ fn optimize_scalar(
 
     // if there were filters, we use that logical plan, otherwise the plan 
from the aggregate
     let input = if let Some(filter) = filter {
-        &filter.input
+        filter.input()
     } else {
         &aggr.input
     };
@@ -236,7 +236,7 @@ fn optimize_scalar(
     // if there were filters, split and capture them
     let mut subqry_filter_exprs = vec![];
     if let Some(filter) = filter {
-        split_conjunction(&filter.predicate, &mut subqry_filter_exprs);
+        split_conjunction(filter.predicate(), &mut subqry_filter_exprs);
     }
     verify_not_disjunction(&subqry_filter_exprs)?;
 
diff --git a/datafusion/optimizer/src/simplify_expressions.rs 
b/datafusion/optimizer/src/simplify_expressions.rs
index 233b0e497..23d3edf91 100644
--- a/datafusion/optimizer/src/simplify_expressions.rs
+++ b/datafusion/optimizer/src/simplify_expressions.rs
@@ -1941,7 +1941,7 @@ mod tests {
         assert_optimized_plan_eq(
             &plan,
             "\
-               Filter: test.b > Int32(1) AS test.b > Int32(1) AND test.b > 
Int32(1)\
+               Filter: test.b > Int32(1)\
             \n  Projection: test.a\
             \n    TableScan: test",
         );
@@ -1965,7 +1965,7 @@ mod tests {
         assert_optimized_plan_eq(
             &plan,
             "\
-            Filter: test.a > Int32(5) AND test.b < Int32(6) AS test.a > 
Int32(5) AND test.b < Int32(6) AND test.a > Int32(5)\
+            Filter: test.a > Int32(5) AND test.b < Int32(6)\
             \n  Projection: test.a, test.b\
                \n    TableScan: test",
         );
@@ -1986,8 +1986,8 @@ mod tests {
 
         let expected = "\
         Projection: test.a\
-        \n  Filter: NOT test.c AS test.c = Boolean(false)\
-        \n    Filter: test.b AS test.b = Boolean(true)\
+        \n  Filter: NOT test.c\
+        \n    Filter: test.b\
         \n      TableScan: test";
 
         assert_optimized_plan_eq(&plan, expected);
@@ -2011,8 +2011,8 @@ mod tests {
         let expected = "\
         Projection: test.a\
         \n  Limit: skip=0, fetch=1\
-        \n    Filter: test.c AS test.c != Boolean(false)\
-        \n      Filter: NOT test.b AS test.b != Boolean(true)\
+        \n    Filter: test.c\
+        \n      Filter: NOT test.b\
         \n        TableScan: test";
 
         assert_optimized_plan_eq(&plan, expected);
@@ -2031,7 +2031,7 @@ mod tests {
 
         let expected = "\
         Projection: test.a\
-        \n  Filter: NOT test.b AND test.c AS test.b != Boolean(true) AND 
test.c = Boolean(true)\
+        \n  Filter: NOT test.b AND test.c\
         \n    TableScan: test";
 
         assert_optimized_plan_eq(&plan, expected);
@@ -2050,7 +2050,7 @@ mod tests {
 
         let expected = "\
         Projection: test.a\
-        \n  Filter: NOT test.b OR NOT test.c AS test.b != Boolean(true) OR 
test.c = Boolean(false)\
+        \n  Filter: NOT test.b OR NOT test.c\
         \n    TableScan: test";
 
         assert_optimized_plan_eq(&plan, expected);
@@ -2069,7 +2069,7 @@ mod tests {
 
         let expected = "\
         Projection: test.a\
-        \n  Filter: test.b AS NOT test.b = Boolean(false)\
+        \n  Filter: test.b\
         \n    TableScan: test";
 
         assert_optimized_plan_eq(&plan, expected);
@@ -2303,7 +2303,7 @@ mod tests {
 
         // Note that constant folder runs and folds the entire
         // expression down to a single constant (true)
-        let expected = "Filter: Boolean(true) AS now() < 
totimestamp(Utf8(\"2020-09-08T12:05:00+00:00\")) + Int64(50000)\
+        let expected = "Filter: Boolean(true)\
                         \n  TableScan: test";
         let actual = get_optimized_plan_formatted(&plan, &time);
 
@@ -2351,7 +2351,7 @@ mod tests {
             .unwrap()
             .build()
             .unwrap();
-        let expected = "Filter: test.d <= Int32(10) AS NOT test.d > Int32(10)\
+        let expected = "Filter: test.d <= Int32(10)\
             \n  TableScan: test";
 
         assert_optimized_plan_eq(&plan, expected);
@@ -2366,7 +2366,7 @@ mod tests {
             .unwrap()
             .build()
             .unwrap();
-        let expected = "Filter: test.d <= Int32(10) OR test.d >= Int32(100) AS 
NOT test.d > Int32(10) AND test.d < Int32(100)\
+        let expected = "Filter: test.d <= Int32(10) OR test.d >= Int32(100)\
         \n  TableScan: test";
 
         assert_optimized_plan_eq(&plan, expected);
@@ -2381,7 +2381,7 @@ mod tests {
             .unwrap()
             .build()
             .unwrap();
-        let expected = "Filter: test.d <= Int32(10) AND test.d >= Int32(100) 
AS NOT test.d > Int32(10) OR test.d < Int32(100)\
+        let expected = "Filter: test.d <= Int32(10) AND test.d >= Int32(100)\
         \n  TableScan: test";
 
         assert_optimized_plan_eq(&plan, expected);
@@ -2396,7 +2396,7 @@ mod tests {
             .unwrap()
             .build()
             .unwrap();
-        let expected = "Filter: test.d > Int32(10) AS NOT NOT test.d > 
Int32(10)\
+        let expected = "Filter: test.d > Int32(10)\
         \n  TableScan: test";
 
         assert_optimized_plan_eq(&plan, expected);
@@ -2411,7 +2411,7 @@ mod tests {
             .unwrap()
             .build()
             .unwrap();
-        let expected = "Filter: test.d IS NOT NULL AS NOT test.d IS NULL\
+        let expected = "Filter: test.d IS NOT NULL\
         \n  TableScan: test";
 
         assert_optimized_plan_eq(&plan, expected);
@@ -2426,7 +2426,7 @@ mod tests {
             .unwrap()
             .build()
             .unwrap();
-        let expected = "Filter: test.d IS NULL AS NOT test.d IS NOT NULL\
+        let expected = "Filter: test.d IS NULL\
         \n  TableScan: test";
 
         assert_optimized_plan_eq(&plan, expected);
@@ -2441,7 +2441,7 @@ mod tests {
             .unwrap()
             .build()
             .unwrap();
-        let expected = "Filter: test.d NOT IN ([Int32(1), Int32(2), Int32(3)]) 
AS NOT test.d IN (Map { iter: Iter([Int32(1), Int32(2), Int32(3)]) })\
+        let expected = "Filter: test.d NOT IN ([Int32(1), Int32(2), Int32(3)])\
         \n  TableScan: test";
 
         assert_optimized_plan_eq(&plan, expected);
@@ -2456,7 +2456,7 @@ mod tests {
             .unwrap()
             .build()
             .unwrap();
-        let expected = "Filter: test.d IN ([Int32(1), Int32(2), Int32(3)]) AS 
NOT test.d NOT IN (Map { iter: Iter([Int32(1), Int32(2), Int32(3)]) })\
+        let expected = "Filter: test.d IN ([Int32(1), Int32(2), Int32(3)])\
         \n  TableScan: test";
 
         assert_optimized_plan_eq(&plan, expected);
@@ -2477,7 +2477,7 @@ mod tests {
             .unwrap()
             .build()
             .unwrap();
-        let expected = "Filter: test.d < Int32(1) OR test.d > Int32(10) AS NOT 
test.d BETWEEN Int32(1) AND Int32(10)\
+        let expected = "Filter: test.d < Int32(1) OR test.d > Int32(10)\
         \n  TableScan: test";
 
         assert_optimized_plan_eq(&plan, expected);
@@ -2498,7 +2498,7 @@ mod tests {
             .unwrap()
             .build()
             .unwrap();
-        let expected = "Filter: test.d >= Int32(1) AND test.d <= Int32(10) AS 
NOT test.d NOT BETWEEN Int32(1) AND Int32(10)\
+        let expected = "Filter: test.d >= Int32(1) AND test.d <= Int32(10)\
         \n  TableScan: test";
 
         assert_optimized_plan_eq(&plan, expected);
@@ -2520,7 +2520,7 @@ mod tests {
             .unwrap()
             .build()
             .unwrap();
-        let expected = "Filter: test.a NOT LIKE test.b AS NOT test.a LIKE 
test.b\
+        let expected = "Filter: test.a NOT LIKE test.b\
         \n  TableScan: test";
 
         assert_optimized_plan_eq(&plan, expected);
@@ -2542,7 +2542,7 @@ mod tests {
             .unwrap()
             .build()
             .unwrap();
-        let expected = "Filter: test.a LIKE test.b AS NOT test.a NOT LIKE 
test.b\
+        let expected = "Filter: test.a LIKE test.b\
         \n  TableScan: test";
 
         assert_optimized_plan_eq(&plan, expected);
@@ -2557,7 +2557,7 @@ mod tests {
             .unwrap()
             .build()
             .unwrap();
-        let expected = "Filter: test.d IS NOT DISTINCT FROM Int32(10) AS NOT 
test.d IS DISTINCT FROM Int32(10)\
+        let expected = "Filter: test.d IS NOT DISTINCT FROM Int32(10)\
         \n  TableScan: test";
 
         assert_optimized_plan_eq(&plan, expected);
@@ -2572,7 +2572,7 @@ mod tests {
             .unwrap()
             .build()
             .unwrap();
-        let expected = "Filter: test.d IS DISTINCT FROM Int32(10) AS NOT 
test.d IS NOT DISTINCT FROM Int32(10)\
+        let expected = "Filter: test.d IS DISTINCT FROM Int32(10)\
         \n  TableScan: test";
 
         assert_optimized_plan_eq(&plan, expected);
diff --git a/datafusion/optimizer/src/subquery_filter_to_join.rs 
b/datafusion/optimizer/src/subquery_filter_to_join.rs
index 91d31f28e..bd07eeab2 100644
--- a/datafusion/optimizer/src/subquery_filter_to_join.rs
+++ b/datafusion/optimizer/src/subquery_filter_to_join.rs
@@ -55,13 +55,13 @@ impl OptimizerRule for SubqueryFilterToJoin {
         optimizer_config: &mut OptimizerConfig,
     ) -> Result<LogicalPlan> {
         match plan {
-            LogicalPlan::Filter(Filter { predicate, input }) => {
+            LogicalPlan::Filter(filter) => {
                 // Apply optimizer rule to current input
-                let optimized_input = self.optimize(input, optimizer_config)?;
+                let optimized_input = self.optimize(filter.input(), 
optimizer_config)?;
 
                 // Splitting filter expression into components by AND
                 let mut filters = vec![];
-                utils::split_conjunction(predicate, &mut filters);
+                utils::split_conjunction(filter.predicate(), &mut filters);
 
                 // Searching for subquery-based filters
                 let (subquery_filters, regular_filters): (Vec<&Expr>, 
Vec<&Expr>) =
@@ -79,10 +79,10 @@ impl OptimizerRule for SubqueryFilterToJoin {
                 })?;
 
                 if !subqueries_in_regular.is_empty() {
-                    return Ok(LogicalPlan::Filter(Filter {
-                        predicate: predicate.clone(),
-                        input: Arc::new(optimized_input),
-                    }));
+                    return Ok(LogicalPlan::Filter(Filter::try_new(
+                        filter.predicate().clone(),
+                        Arc::new(optimized_input),
+                    )?));
                 };
 
                 // Add subquery joins to new_input
@@ -151,10 +151,10 @@ impl OptimizerRule for SubqueryFilterToJoin {
                 let new_input = match opt_result {
                     Ok(plan) => plan,
                     Err(_) => {
-                        return Ok(LogicalPlan::Filter(Filter {
-                            predicate: predicate.clone(),
-                            input: Arc::new(optimized_input),
-                        }))
+                        return Ok(LogicalPlan::Filter(Filter::try_new(
+                            filter.predicate().clone(),
+                            Arc::new(optimized_input),
+                        )?))
                     }
                 };
 
@@ -162,7 +162,7 @@ impl OptimizerRule for SubqueryFilterToJoin {
                 if regular_filters.is_empty() {
                     Ok(new_input)
                 } else {
-                    Ok(utils::add_filter(new_input, &regular_filters))
+                    utils::add_filter(new_input, &regular_filters)
                 }
             }
             _ => {
diff --git a/datafusion/optimizer/src/utils.rs 
b/datafusion/optimizer/src/utils.rs
index a1174276d..d9b5cd9a8 100644
--- a/datafusion/optimizer/src/utils.rs
+++ b/datafusion/optimizer/src/utils.rs
@@ -105,7 +105,7 @@ pub fn verify_not_disjunction(predicates: &[&Expr]) -> 
Result<()> {
 
 /// returns a new [LogicalPlan] that wraps `plan` in a [LogicalPlan::Filter] 
with
 /// its predicate be all `predicates` ANDed.
-pub fn add_filter(plan: LogicalPlan, predicates: &[&Expr]) -> LogicalPlan {
+pub fn add_filter(plan: LogicalPlan, predicates: &[&Expr]) -> 
Result<LogicalPlan> {
     // reduce filters to a single filter with an AND
     let predicate = predicates
         .iter()
@@ -114,10 +114,10 @@ pub fn add_filter(plan: LogicalPlan, predicates: 
&[&Expr]) -> LogicalPlan {
             and(acc, (*predicate).to_owned())
         });
 
-    LogicalPlan::Filter(Filter {
+    Ok(LogicalPlan::Filter(Filter::try_new(
         predicate,
-        input: Arc::new(plan),
-    })
+        Arc::new(plan),
+    )?))
 }
 
 /// Looks for correlating expressions: equality expressions with one field 
from the subquery, and
diff --git a/datafusion/proto/src/logical_plan.rs 
b/datafusion/proto/src/logical_plan.rs
index a5ddccdb6..7a9d635f8 100644
--- a/datafusion/proto/src/logical_plan.rs
+++ b/datafusion/proto/src/logical_plan.rs
@@ -38,9 +38,8 @@ use datafusion_common::{Column, DataFusionError};
 use datafusion_expr::{
     logical_plan::{
         Aggregate, CreateCatalog, CreateCatalogSchema, CreateExternalTable, 
CreateView,
-        CrossJoin, Distinct, EmptyRelation, Extension, Filter, Join, 
JoinConstraint,
-        JoinType, Limit, Projection, Repartition, Sort, SubqueryAlias, 
TableScan, Values,
-        Window,
+        CrossJoin, Distinct, EmptyRelation, Extension, Join, JoinConstraint, 
JoinType,
+        Limit, Projection, Repartition, Sort, SubqueryAlias, TableScan, 
Values, Window,
     },
     Expr, LogicalPlan, LogicalPlanBuilder,
 };
@@ -806,17 +805,17 @@ impl AsLogicalPlan for LogicalPlanNode {
                     },
                 ))),
             }),
-            LogicalPlan::Filter(Filter { predicate, input }) => {
+            LogicalPlan::Filter(filter) => {
                 let input: protobuf::LogicalPlanNode =
                     protobuf::LogicalPlanNode::try_from_logical_plan(
-                        input.as_ref(),
+                        filter.input().as_ref(),
                         extension_codec,
                     )?;
                 Ok(protobuf::LogicalPlanNode {
                     logical_plan_type: 
Some(LogicalPlanType::Selection(Box::new(
                         protobuf::SelectionNode {
                             input: Some(Box::new(input)),
-                            expr: Some(predicate.try_into()?),
+                            expr: Some(filter.predicate().try_into()?),
                         },
                     ))),
                 })
diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs
index b75efd6f4..400bbe4fc 100644
--- a/datafusion/sql/src/planner.rs
+++ b/datafusion/sql/src/planner.rs
@@ -982,10 +982,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                             x.as_slice(),
                             &[join_columns],
                         )?;
-                        Ok(LogicalPlan::Filter(Filter {
-                            predicate: filter_expr,
-                            input: Arc::new(left),
-                        }))
+                        Ok(LogicalPlan::Filter(Filter::try_new(
+                            filter_expr,
+                            Arc::new(left),
+                        )?))
                     }
                     _ => Ok(left),
                 }

Reply via email to