[ 
https://issues.apache.org/jira/browse/SPARK-33013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhou xiang updated SPARK-33013:
-------------------------------
    Description: 
 

 

 Consider the case below:
{code:java}
Seq((1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20)).toDF("a", "b", "c", 
"d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", 
"t").write.saveAsTable("test") 
val df = spark.table("test") 
val df2 = df.filter("a+b+c+d+e+f+g+h+i+j+k+l+m+n+o+p+q+r+s+t > 100") 
val df3 = df2.select('a as 'a1, 'b as 'b1, 'c as 'c1, 'd as 'd1, 'e as 'e1, 'f 
as 'f1, 'g as 'g1, 'h as 'h1, 'i as 'i1, 'j as 'j1, 'k as 'k1, 'l as 'l1, 'm as 
'm1, 'n as 'n1, 'o as 'o1, 'p as 'p1, 'q as 'q1, 'r as 'r1, 's as 's1, 't as 
't1) 
val df4 = df3.join(df2, df3("a1") === df2("a")) 
df4.explain(true)
{code}
If you run this in the Spark shell, it will get stuck at "df4.explain(true)". 
The reason is that the SQL optimizer rule 'InferFiltersFromConstraints' tries 
to infer all the constraints from the plan. The plan has a constraint that 
involves about 20 columns, and each column has an alias. The rule tries to 
replace each column with its alias while also keeping the original constraint, 
which makes the number of constraints grow exponentially and eventually causes 
the driver to run out of memory.

The related code:
{code:java}
  /**
   * Generates all valid constraints including an set of aliased constraints by 
replacing the
   * original constraint expressions with the corresponding alias
   */
  protected def getAllValidConstraints(projectList: Seq[NamedExpression]): 
Set[Expression] = {
    var allConstraints = child.constraints.asInstanceOf[Set[Expression]]
    projectList.foreach {
      case a @ Alias(l: Literal, _) =>
        allConstraints += EqualNullSafe(a.toAttribute, l)
      case a @ Alias(e, _) =>
        // For every alias in `projectList`, replace the reference in 
constraints by its attribute.
        allConstraints ++= allConstraints.map(_ transform {
          case expr: Expression if expr.semanticEquals(e) =>
            a.toAttribute
        })
        allConstraints += EqualNullSafe(e, a.toAttribute)
      case _ => // Don't change.
    }    
    allConstraints
  }
{code}
 

 

 
  
  

  was:
Consider the case below:

 
{code:java}
Seq((1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20)).toDF("a", "b", "c", 
"d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", 
"t").write.saveAsTable("test") 
val df = spark.table("test") 
val df2 = df.filter("a+b+c+d+e+f+g+h+i+j+k+l+m+n+o+p+q+r+s+t > 100") 
val df3 = df2.select('a as 'a1, 'b as 'b1, 'c as 'c1, 'd as 'd1, 'e as 'e1, 'f 
as 'f1, 'g as 'g1, 'h as 'h1, 'i as 'i1, 'j as 'j1, 'k as 'k1, 'l as 'l1, 'm as 
'm1, 'n as 'n1, 'o as 'o1, 'p as 'p1, 'q as 'q1, 'r as 'r1, 's as 's1, 't as 
't1) 
val df4 = df3.join(df2, df3("a1") === df2("a")) 
df4.explain(true)
{code}
 

If you run the this in spark shell, it will got stuck at "df4.explain(true)". 
The reason is in sql optimizer 'InferFiltersFromConstraints', it will try to 
infer all the constrains from the plan. And the plan has a constrain contains 
about 20 columns, each column has an alias. It will try to replace the column 
with alias, and at the same time keep the origin constrain, that will lead to 
the constrains grow exponentially. And make driver oom in the end.

The related code:

 
{code:java}
/** * Generates all valid constraints including an set of aliased constraints 
by replacing the * original constraint expressions with the corresponding alias 
*/ protected def getAllValidConstraints(projectList: Seq[NamedExpression]): 
Set[Expression] = { var allConstraints = 
child.constraints.asInstanceOf[Set[Expression]] projectList.foreach { case a @ 
Alias(l: Literal, _) => allConstraints += EqualNullSafe(a.toAttribute, l) case 
a @ Alias(e, _) => // For every alias in `projectList`, replace the reference 
in constraints by its attribute. allConstraints ++= allConstraints.map(_ 
transform { case expr: Expression if expr.semanticEquals(e) => a.toAttribute }) 
allConstraints += EqualNullSafe(e, a.toAttribute) case _ => // Don't change. } 
allConstraints }
{code}
 

 

 
 
 


> The constrains may grow exponentially in sql optimizer 
> 'InferFiltersFromConstraints', which leads to driver oom
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-33013
>                 URL: https://issues.apache.org/jira/browse/SPARK-33013
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer, SQL
>    Affects Versions: 3.0.1
>            Reporter: zhou xiang
>            Priority: Major
>
>  
>  
>  Consider the case below:
> {code:java}
> Seq((1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20)).toDF("a", "b", "c", 
> "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", 
> "s", "t").write.saveAsTable("test") 
> val df = spark.table("test") 
> val df2 = df.filter("a+b+c+d+e+f+g+h+i+j+k+l+m+n+o+p+q+r+s+t > 100") 
> val df3 = df2.select('a as 'a1, 'b as 'b1, 'c as 'c1, 'd as 'd1, 'e as 'e1, 
> 'f as 'f1, 'g as 'g1, 'h as 'h1, 'i as 'i1, 'j as 'j1, 'k as 'k1, 'l as 'l1, 
> 'm as 'm1, 'n as 'n1, 'o as 'o1, 'p as 'p1, 'q as 'q1, 'r as 'r1, 's as 's1, 
> 't as 't1) 
> val df4 = df3.join(df2, df3("a1") === df2("a")) 
> df4.explain(true)
> {code}
> If you run the this in spark shell, it will got stuck at "df4.explain(true)". 
> The reason is in sql optimizer 'InferFiltersFromConstraints', it will try to 
> infer all the constrains from the plan. And the plan has a constrain contains 
> about 20 columns, each column has an alias. It will try to replace the column 
> with alias, and at the same time keep the origin constrain, that will lead to 
> the constrains grow exponentially. And make driver oom in the end.
> The related code:
> {code:java}
>   /**
>    * Generates all valid constraints including an set of aliased constraints 
> by replacing the
>    * original constraint expressions with the corresponding alias
>    */
>   protected def getAllValidConstraints(projectList: Seq[NamedExpression]): 
> Set[Expression] = {
>     var allConstraints = child.constraints.asInstanceOf[Set[Expression]]
>     projectList.foreach {
>       case a @ Alias(l: Literal, _) =>
>         allConstraints += EqualNullSafe(a.toAttribute, l)
>       case a @ Alias(e, _) =>
>         // For every alias in `projectList`, replace the reference in 
> constraints by its attribute.
>         allConstraints ++= allConstraints.map(_ transform {
>           case expr: Expression if expr.semanticEquals(e) =>
>             a.toAttribute
>         })
>         allConstraints += EqualNullSafe(e, a.toAttribute)
>       case _ => // Don't change.
>     }    
>     allConstraints
>   }
> {code}
>  
>  
>  
>   
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to