alamb commented on code in PR #10405:
URL: https://github.com/apache/datafusion/pull/10405#discussion_r1594332291


##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -467,6 +468,200 @@ impl LogicalPlan {
         self.with_new_exprs(self.expressions(), inputs.to_vec())
     }
 
+    /// Recomputes schema and type information for this LogicalPlan if needed.
+    ///
+    /// Some `LogicalPlan`s may need to recompute their schema if the number or
+    /// type of expressions have been changed (for example due to type
+    /// coercion). For example [`LogicalPlan::Projection`]s schema depends on
+    /// its expressions.
+    ///
+    /// Some `LogicalPlan`s schema is unaffected by any changes to their
+    /// expressions. For example [`LogicalPlan::Filter`] schema is always the
+    /// same as its input schema.
+    ///
+    /// # Return value
+    /// Returns an error if there is some issue recomputing the schema.
+    ///
+    /// # Notes
+    ///
+    /// * Does not recursively recompute schema for input (child) plans.
+    pub fn recompute_schema(self) -> Result<Self> {
+        match self {
+            // Since expr may be different than the previous expr, schema of 
the projection
+            // may change. We need to use try_new method instead of 
try_new_with_schema method.
+            LogicalPlan::Projection(Projection {
+                expr,
+                input,
+                schema: _,
+            }) => Projection::try_new(expr, 
input).map(LogicalPlan::Projection),
+            LogicalPlan::Dml(_) => Ok(self),
+            LogicalPlan::Copy(_) => Ok(self),
+            LogicalPlan::Values(Values { schema, values }) => {
+                // todo it isn't clear why the schema is not recomputed here
+                Ok(LogicalPlan::Values(Values { schema, values }))
+            }
+            LogicalPlan::Filter(Filter { predicate, input }) => {
+                // todo: should this logic be moved to Filter::try_new?
+
+                // filter predicates should not contain aliased expressions so 
we remove any aliases
+                // before this logic was added we would have aliases within 
filters such as for
+                // benchmark q6:
+                //
+                // lineitem.l_shipdate >= Date32(\"8766\")
+                // AND lineitem.l_shipdate < Date32(\"9131\")
+                // AND CAST(lineitem.l_discount AS Decimal128(30, 15)) AS 
lineitem.l_discount >=
+                // Decimal128(Some(49999999999999),30,15)
+                // AND CAST(lineitem.l_discount AS Decimal128(30, 15)) AS 
lineitem.l_discount <=
+                // Decimal128(Some(69999999999999),30,15)
+                // AND lineitem.l_quantity < Decimal128(Some(2400),15,2)
+
+                let predicate = predicate
+                    .transform_down(|expr| {
+                        match expr {
+                            Expr::Exists { .. }
+                            | Expr::ScalarSubquery(_)
+                            | Expr::InSubquery(_) => {
+                                // subqueries could contain aliases so we 
don't recurse into those
+                                Ok(Transformed::new(expr, false, 
TreeNodeRecursion::Jump))
+                            }
+                            Expr::Alias(_) => Ok(Transformed::new(
+                                expr.unalias(),
+                                true,
+                                TreeNodeRecursion::Jump,
+                            )),
+                            _ => Ok(Transformed::no(expr)),
+                        }
+                    })
+                    .data()?;
+
+                Filter::try_new(predicate, input).map(LogicalPlan::Filter)
+            }
+            LogicalPlan::Repartition(_) => Ok(self),
+            LogicalPlan::Window(Window {
+                input,
+                window_expr,
+                schema: _,
+            }) => Window::try_new(window_expr, input).map(LogicalPlan::Window),
+            LogicalPlan::Aggregate(Aggregate {
+                input,
+                group_expr,
+                aggr_expr,
+                schema: _,
+            }) => Aggregate::try_new(input, group_expr, aggr_expr)
+                .map(LogicalPlan::Aggregate),
+            LogicalPlan::Sort(_) => Ok(self),
+            LogicalPlan::Join(Join {
+                left,
+                right,
+                filter,
+                join_type,
+                join_constraint,
+                on,
+                schema: _,
+                null_equals_null,
+            }) => {
+                let schema =
+                    build_join_schema(left.schema(), right.schema(), 
&join_type)?;
+
+                let new_on: Vec<_> = on
+                    .into_iter()
+                    .map(|equi_expr| {
+                        // SimplifyExpression rule may add alias to the 
equi_expr.
+                        (equi_expr.0.unalias(), equi_expr.1.unalias())
+                    })
+                    .collect();
+
+                Ok(LogicalPlan::Join(Join {
+                    left,
+                    right,
+                    join_type,
+                    join_constraint,
+                    on: new_on,
+                    filter,
+                    schema: DFSchemaRef::new(schema),
+                    null_equals_null,
+                }))
+            }
+            LogicalPlan::CrossJoin(CrossJoin {
+                left,
+                right,
+                schema: _,
+            }) => {
+                let join_schema =
+                    build_join_schema(left.schema(), right.schema(), 
&JoinType::Inner)?;
+
+                Ok(LogicalPlan::CrossJoin(CrossJoin {
+                    left,
+                    right,
+                    schema: join_schema.into(),
+                }))
+            }
+            LogicalPlan::Subquery(_) => Ok(self),
+            LogicalPlan::SubqueryAlias(SubqueryAlias {
+                input,
+                alias,
+                schema: _,
+            }) => SubqueryAlias::try_new(input, 
alias).map(LogicalPlan::SubqueryAlias),
+            LogicalPlan::Limit(_) => Ok(self),
+            LogicalPlan::Ddl(_) => Ok(self),
+            LogicalPlan::Extension(Extension { node }) => {
+                // todo make an API that does not require cloning
+                // This requires a copy of the extension nodes expressions and 
inputs
+                let expr = node.expressions();
+                let inputs: Vec<_> = 
node.inputs().into_iter().cloned().collect();
+                Ok(LogicalPlan::Extension(Extension {
+                    node: node.from_template(&expr, &inputs),
+                }))
+            }
+            LogicalPlan::Union(Union { inputs, schema }) => {
+                let input_schema = inputs[0].schema();
+                // If inputs are not pruned do not change schema
+                // TODO this seems wrong (shouldn't we always use the schema 
of the input?)
+                let schema = if schema.fields().len() == 
input_schema.fields().len() {

Review Comment:
   I agree it doesn't make sense; however, it is the same logic as in `with_new_exprs`:
   
   
https://github.com/apache/datafusion/blob/fad16e74229decc40bccf22947d3659cd29fee6f/datafusion/expr/src/logical_plan/plan.rs#L913-L925



##########
datafusion/sqllogictest/test_files/cte.slt:
##########
@@ -105,14 +104,13 @@ EXPLAIN WITH RECURSIVE nodes AS (
 SELECT * FROM nodes
 ----
 logical_plan
-01)Projection: nodes.id
-02)--SubqueryAlias: nodes
-03)----RecursiveQuery: is_distinct=false
-04)------Projection: Int64(1) AS id
-05)--------EmptyRelation
-06)------Projection: nodes.id + Int64(1) AS id
-07)--------Filter: nodes.id < Int64(10)
-08)----------TableScan: nodes
+01)SubqueryAlias: nodes

Review Comment:
   Yes, that is correct.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to