zhou xiang created SPARK-33013:
----------------------------------
Summary: The constraints may grow exponentially in the SQL optimizer rule
'InferFiltersFromConstraints', which leads to driver OOM
Key: SPARK-33013
URL: https://issues.apache.org/jira/browse/SPARK-33013
Project: Spark
Issue Type: Bug
Components: Optimizer, SQL
Affects Versions: 3.0.1
Reporter: zhou xiang
Consider the case below:
{code:java}
Seq((1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20)).toDF(
  "a", "b", "c", "d", "e", "f", "g", "h", "i", "j",
  "k", "l", "m", "n", "o", "p", "q", "r", "s", "t").write.saveAsTable("test")
val df = spark.table("test")
val df2 = df.filter("a+b+c+d+e+f+g+h+i+j+k+l+m+n+o+p+q+r+s+t > 100")
val df3 = df2.select('a as 'a1, 'b as 'b1, 'c as 'c1, 'd as 'd1, 'e as 'e1,
  'f as 'f1, 'g as 'g1, 'h as 'h1, 'i as 'i1, 'j as 'j1, 'k as 'k1, 'l as 'l1,
  'm as 'm1, 'n as 'n1, 'o as 'o1, 'p as 'p1, 'q as 'q1, 'r as 'r1, 's as 's1,
  't as 't1)
val df4 = df3.join(df2, df3("a1") === df2("a"))
df4.explain(true)
val df4 = df3.join(df2, df3("a1") === df2("a"))
df4.explain(true)
{code}
If you run this in the spark-shell, it will get stuck at "df4.explain(true)".
The reason is that the SQL optimizer rule 'InferFiltersFromConstraints' tries to
infer all the constraints from the plan. The plan has one constraint that
references about 20 columns, and each of those columns has an alias. For every
alias the optimizer replaces the column with the alias while also keeping the
original constraint, so the constraint set roughly doubles per alias and grows
exponentially (towards 2^20 variants of that single constraint here), which
makes the driver OOM in the end.
The related code:
{code:java}
/**
 * Generates all valid constraints including an set of aliased constraints by
 * replacing the original constraint expressions with the corresponding alias
 */
protected def getAllValidConstraints(projectList: Seq[NamedExpression]): Set[Expression] = {
  var allConstraints = child.constraints.asInstanceOf[Set[Expression]]
  projectList.foreach {
    case a @ Alias(l: Literal, _) =>
      allConstraints += EqualNullSafe(a.toAttribute, l)
    case a @ Alias(e, _) =>
      // For every alias in `projectList`, replace the reference in constraints
      // by its attribute.
      allConstraints ++= allConstraints.map(_ transform {
        case expr: Expression if expr.semanticEquals(e) => a.toAttribute
      })
      allConstraints += EqualNullSafe(e, a.toAttribute)
    case _ => // Don't change.
  }
  allConstraints
}
{code}
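To make the growth concrete without needing a Spark driver, below is a minimal,
self-contained sketch (plain Scala; the object name and the string-based
modelling of constraints are illustrative assumptions, not the Catalyst code)
that mimics the loop above: for every alias it rewrites every constraint
collected so far and also keeps the original, so the single filter constraint
over the 20 columns doubles on each step.
{code:java}
// Plain-Scala simulation of the doubling in getAllValidConstraints.
// Constraints are modelled as strings instead of Catalyst expressions.
object ConstraintGrowthDemo extends App {
  val columns = ('a' to 't').map(_.toString)            // the 20 columns a..t
  // The single filter constraint "a+b+...+t > 100" from the repro above.
  var constraints = Set(columns.mkString("+") + " > 100")

  // For each alias (a -> a1, b -> b1, ...) rewrite every constraint collected
  // so far and keep the original as well, so the set size doubles per alias.
  // Only the first 15 aliases are processed to keep the demo cheap; with all
  // 20 the set approaches 2^20 (~1 million) copies of the constraint.
  columns.take(15).foreach { c =>
    constraints ++= constraints.map(_.replaceAll(s"\\b$c\\b", c + "1"))
    println(s"aliased $c -> ${c}1, constraint set size = ${constraints.size}")
  }
}
{code}
The printed set size doubles on every step (2, 4, 8, ...); in the real
optimizer each entry is a full Catalyst expression tree over ~20 columns, which
is what exhausts the driver memory.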