[ 
https://issues.apache.org/jira/browse/SPARK-33152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17214298#comment-17214298
 ] 

Asif commented on SPARK-33152:
------------------------------

I will be generating a PR for the same.

> Constraint Propagation code causes OOM issues or increasing compilation time 
> to hours
> -------------------------------------------------------------------------------------
>
>                 Key: SPARK-33152
>                 URL: https://issues.apache.org/jira/browse/SPARK-33152
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.0
>            Reporter: Asif
>            Priority: Major
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> We encountered this issue at Workday. 
> The issue is that the current constraint propagation code pessimistically 
> generates all the possible combinations of each base constraint for the 
> aliases in the Project node.
> This causes a blow-up in the number of constraints generated, causing OOM 
> issues at SQL query compile time, or queries taking from 18 minutes up to 
> 2 hours to compile.
> The problematic piece of code is in LogicalPlan.getAliasedConstraints:
>
> projectList.foreach {
>   case a @ Alias(l: Literal, _) =>
>     allConstraints += EqualNullSafe(a.toAttribute, l)
>   case a @ Alias(e, _) =>
>     // For every alias in `projectList`, replace the reference in
>     // constraints by its attribute.
>     allConstraints ++= allConstraints.map(_ transform {
>       case expr: Expression if expr.semanticEquals(e) =>
>         a.toAttribute
>     })
>     allConstraints += EqualNullSafe(e, a.toAttribute)
>   case _ => // Don't change.
> }
> So consider a hypothetical plan:
>
> Project (a, a as a1, a as a2, a as a3, b, b as b1, b as b2, c, c as c1, 
>          c as c2, c as c3)
>    |
> Filter f(a, b, c)
>    |
> Base Relation (a, b, c)
> The projection therefore contains
> a, a1, a2, a3
> b, b1, b2
> c, c1, c2, c3
> Let's say hypothetically f(a, b, c) has a occurring 1 time, b occurring 
> 2 times, and c occurring 3 times.
> So at the project node the number of constraints generated for the single 
> base constraint f(a, b, c) will be
> 4C1 * 3C2 * 4C3 = 48
> (the small sketch below reproduces this count).
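>
> A minimal standalone sketch (plain Scala with toy data, not Catalyst 
> classes; a hypothetical demo, not Spark code) of how the loop in 
> getAliasedConstraints produces this count: each alias adds a substituted 
> copy of every constraint already in the set, so the single base constraint 
> fans out into every combination of attribute variants.
>
> object ConstraintBlowUpDemo {
>   def main(args: Array[String]): Unit = {
>     // the single base constraint f(a, b, c), modelled by its argument list
>     var allConstraints = Set(Seq("a", "b", "c"))
>     // aliases per base attribute, as in the hypothetical Project node above
>     val aliases = Seq(
>       "a" -> Seq("a1", "a2", "a3"),
>       "b" -> Seq("b1", "b2"),
>       "c" -> Seq("c1", "c2", "c3"))
>     for ((attr, aliasNames) <- aliases; alias <- aliasNames) {
>       // mirrors: allConstraints ++= allConstraints.map(_ transform { ... })
>       allConstraints ++= allConstraints.map(_.map(x => if (x == attr) alias else x))
>     }
>     println(allConstraints.size) // 48 = (1 + 3) * (1 + 2) * (1 + 3) variants
>   }
> }
>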
> In our case, we have seen the number of constraints go beyond 30000, as 
> there are complex case statements in the projection.
> Spark generates all these constraints pessimistically so that it can prune 
> filters or push down join predicates that the optimizer may encounter as it 
> traverses up the tree.
>  
> This issue is solved at our end by modifying the Spark code to use 
> different logic.
> The idea is simple.
> Instead of pessimistically generating all possible combinations of each 
> base constraint, just store the original base constraints and track the 
> aliases at each level.
> The principle followed is this:
> 1) Store the base constraint and keep track of the aliases for the 
> underlying attribute.
> 2) If a base attribute composing the constraint is not in the output set, 
> see if the constraint survives by substituting the attribute being removed 
> with the next available alias's attribute (see the sketch below).
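>
> A minimal sketch of this bookkeeping (hypothetical names, not the actual 
> PR): each base constraint is stored once together with an ordered list of 
> equivalent attributes per base attribute, and step 2 becomes a single 
> substitution instead of an expansion.
>
> import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
>
> object AliasTracking {
>   // base attribute -> the attribute itself followed by its alias attributes
>   type AliasLists = Map[Attribute, Seq[Attribute]]
>
>   // When `removed` is no longer in the output set, try to keep the
>   // constraint alive by substituting the next available alias attribute;
>   // None means the constraint does not survive this node.
>   def rewriteOnRemoval(
>       constraint: Expression,
>       removed: Attribute,
>       aliasLists: AliasLists): Option[Expression] =
>     aliasLists.get(removed)
>       .flatMap(_.find(_ != removed))
>       .map { replacement =>
>         constraint.transform {
>           case a: Attribute if a.semanticEquals(removed) => replacement
>         }
>       }
> }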
>  
> For checking whether a filter can be pruned, just canonicalize the filter 
> with the attribute at the 0th position of the tracking list and compare it 
> with the underlying base constraint.
> To elaborate using the plan above:
> At the project node we have the constraint f(a, b, c) and we keep track of 
> the aliases:
> List 1: a, a1.attribute, a2.attribute, a3.attribute
> List 2: b, b1.attribute, b2.attribute
> List 3: c, c1.attribute, c2.attribute, c3.attribute
> Let's say above the project node we encounter a filter f(a1, b2, c3).
> Canonicalize the filter using the above list data to convert it to 
> f(a, b, c), and compare it with the stored base constraints (sketched below).
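>
> A minimal sketch of this pruning check (hypothetical helpers, not the 
> actual PR): every tracked alias is mapped back to the attribute at 
> position 0 of its list, and the canonicalized filter is compared 
> semantically with the stored base constraints.
>
> import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
>
> object FilterPruningCheck {
>   // aliasLists: one list per base attribute, the representative attribute first
>   def canonicalize(filter: Expression, aliasLists: Seq[Seq[Attribute]]): Expression = {
>     val toBase: Map[Attribute, Attribute] =
>       aliasLists.flatMap(list => list.map(_ -> list.head)).toMap
>     filter.transform { case a: Attribute if toBase.contains(a) => toBase(a) }
>   }
>
>   def canBePruned(
>       filter: Expression,
>       aliasLists: Seq[Seq[Attribute]],
>       baseConstraints: Seq[Expression]): Boolean = {
>     val canonical = canonicalize(filter, aliasLists)
>     baseConstraints.exists(_.semanticEquals(canonical))
>   }
> }
>
> With the lists above, f(a1, b2, c3) canonicalizes to f(a, b, c) and matches 
> the stored constraint, so the filter can be pruned.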
>  
> For predicate push down, instead of generating all the redundant 
> combinations of constraints, just generate one constraint per alias element.
> In the current Spark code, in any case, filter push down happens only for 
> one variable at a time.
> So just expanding the filter f(a, b, c) to
> f(a, b, c), f(a1, b, c), f(a2, b, c), f(a3, b, c), f(a, b1, c), f(a, b2, c), 
> f(a, b, c1), f(a, b, c2), f(a, b, c3)
> would suffice, rather than generating all the redundant combinations 
> (sketched below).
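>
> A minimal sketch of this one-substitution-at-a-time expansion (hypothetical 
> helper, not the actual PR): emit the base constraint plus one copy per 
> alias, each copy substituting a single attribute.
>
> import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
>
> object PushDownExpansion {
>   // aliasLists: one list per base attribute, the base attribute first
>   def expandPerAlias(base: Expression, aliasLists: Seq[Seq[Attribute]]): Seq[Expression] =
>     base +: (for {
>       list  <- aliasLists
>       alias <- list.tail // each alias attribute of this base attribute
>     } yield base.transform {
>       case a: Attribute if a.semanticEquals(list.head) => alias
>     })
> }
>
> For the example above this yields the nine constraints listed, instead of 
> the 48 combinations generated today.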
> In fact the code can easily be modified to generate only those constraints 
> which involve variables forming the join condition, so the number of 
> constraints generated on expansion is further reduced.
> We already have code to generate compound filters for push down (join on 
> multiple conditions), which can be used for single-variable-condition push 
> down too.
> Just to elaborate the logic further, consider the above hypothetical plan 
> again (assume the collapse-project rule is not applied):
>
> Project (a1, a1 as a4, b, c1, c1 as c4)
>    |
> Project (a, a as a1, a as a2, a as a3, b, b as b1, b as b2, c, c as c1, 
>          c as c2, c as c3)
>    |
> Filter f(a, b, c)
>    |
> Base Relation (a, b, c)
>  
> So at project node 2 the constraints data becomes the following alias 
> tracking lists:
> List 1: a1, a4.attribute
> List 2: b
> List 3: c1, c4.attribute
> and the constraint f(a, b, c) is modified to f(a1, b, c1), as illustrated 
> below.
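>
> A plain-Scala illustration of this walk-through (toy strings, not Catalyst 
> expressions; the names are hypothetical): each alias list is restricted to 
> the attributes still in the output and extended with the aliases introduced 
> at the new node, and the single stored constraint is rewritten once.
>
> object TwoProjectExample {
>   def main(args: Array[String]): Unit = {
>     // state tracked at the lower Project node
>     val constraintAtNode1 = Seq("a", "b", "c") // f(a, b, c)
>     val listsAtNode1 = Seq(
>       Seq("a", "a1", "a2", "a3"),
>       Seq("b", "b1", "b2"),
>       Seq("c", "c1", "c2", "c3"))
>
>     // output of the upper Project node and the aliases it introduces
>     val outputAtNode2 = Set("a1", "a4", "b", "c1", "c4")
>     val newAliases    = Map("a1" -> Seq("a4"), "c1" -> Seq("c4"))
>
>     // restrict each list to surviving attributes and append the new aliases;
>     // the first element of each list is the new representative
>     val listsAtNode2 = listsAtNode1.map { list =>
>       val kept = list.filter(outputAtNode2.contains)
>       kept ++ kept.flatMap(a => newAliases.getOrElse(a, Nil))
>     }
>
>     // rewrite the stored constraint to use the new representatives
>     val constraintAtNode2 = constraintAtNode1.map { attr =>
>       listsAtNode1.zip(listsAtNode2).collectFirst {
>         case (oldList, newList) if oldList.contains(attr) => newList.head
>       }.getOrElse(attr)
>     }
>
>     println(listsAtNode2)      // List(List(a1, a4), List(b), List(c1, c4))
>     println(constraintAtNode2) // List(a1, b, c1), i.e. f(a1, b, c1)
>   }
> }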
>  
> The above logic is also more effective at filter pruning than the current 
> code, as some of our tests show, besides incurring minimal overhead for 
> constraint propagation.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
