Repository: spark Updated Branches: refs/heads/branch-2.2 17ba7b9b6 -> 52b05b6d7
[SPARK-22494][SQL] Fix 64KB limit exception with Coalesce and AtLeastNNonNulls ## What changes were proposed in this pull request? Both `Coalesce` and `AtLeastNNonNulls` can cause the 64KB limit exception when used with a lot of arguments and/or complex expressions. This PR splits their expressions in order to avoid the issue. ## How was this patch tested? Added unit tests. Author: Marco Gaido <marcogaid...@gmail.com> Author: Marco Gaido <mga...@hortonworks.com> Closes #19720 from mgaido91/SPARK-22494. (cherry picked from commit 4e7f07e2550fa995cc37406173a937033135cf3b) Signed-off-by: Wenchen Fan <wenc...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/52b05b6d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/52b05b6d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/52b05b6d Branch: refs/heads/branch-2.2 Commit: 52b05b6d70efa3e58eac3b9c16334c494e67a3ca Parents: 17ba7b9 Author: Marco Gaido <marcogaid...@gmail.com> Authored: Thu Nov 16 18:19:13 2017 +0100 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Thu Nov 16 18:19:26 2017 +0100 ---------------------------------------------------------------------- .../catalyst/expressions/nullExpressions.scala | 42 +++++++++++++++----- .../expressions/NullExpressionsSuite.scala | 10 +++++ 2 files changed, 41 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/52b05b6d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullExpressions.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullExpressions.scala index 92036b7..00bb1a1 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullExpressions.scala @@ -71,14 +71,10 @@ case class Coalesce(children: Seq[Expression]) extends Expression { } override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { - val first = children(0) - val rest = children.drop(1) - val firstEval = first.genCode(ctx) - ev.copy(code = s""" - ${firstEval.code} - boolean ${ev.isNull} = ${firstEval.isNull}; - ${ctx.javaType(dataType)} ${ev.value} = ${firstEval.value};""" + - rest.map { e => + ctx.addMutableState("boolean", ev.isNull, "") + ctx.addMutableState(ctx.javaType(dataType), ev.value, "") + + val evals = children.map { e => val eval = e.genCode(ctx) s""" if (${ev.isNull}) { @@ -89,7 +85,12 @@ case class Coalesce(children: Seq[Expression]) extends Expression { } } """ - }.mkString("\n")) + } + + ev.copy(code = s""" + ${ev.isNull} = true; + ${ev.value} = ${ctx.defaultValue(dataType)}; + ${ctx.splitExpressions(ctx.INPUT_ROW, evals)}""") } } @@ -356,7 +357,7 @@ case class AtLeastNNonNulls(n: Int, children: Seq[Expression]) extends Predicate override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val nonnull = ctx.freshName("nonnull") - val code = children.map { e => + val evals = children.map { e => val eval = e.genCode(ctx) e.dataType match { case DoubleType | FloatType => @@ -378,7 +379,26 @@ case class AtLeastNNonNulls(n: Int, children: Seq[Expression]) extends Predicate } """ } - }.mkString("\n") + } + + val code = if (ctx.INPUT_ROW == null || ctx.currentVars != null) { + evals.mkString("\n") + } else { + ctx.splitExpressions(evals, "atLeastNNonNulls", + ("InternalRow", ctx.INPUT_ROW) :: ("int", nonnull) :: Nil, + returnType = "int", + makeSplitFunction = { body => + s""" + $body + return $nonnull; + """ + }, + foldFunctions = { funcCalls => + funcCalls.map(funcCall => s"$nonnull = $funcCall;").mkString("\n") + } + ) + 
} + ev.copy(code = s""" int $nonnull = 0; $code http://git-wip-us.apache.org/repos/asf/spark/blob/52b05b6d/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NullExpressionsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NullExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NullExpressionsSuite.scala index 5064a1f..2ea6aa8 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NullExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NullExpressionsSuite.scala @@ -133,4 +133,14 @@ class NullExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation(AtLeastNNonNulls(3, nullOnly), true, EmptyRow) checkEvaluation(AtLeastNNonNulls(4, nullOnly), false, EmptyRow) } + + test("Coalesce should not throw 64kb exception") { + val inputs = (1 to 2500).map(x => Literal(s"x_$x")) + checkEvaluation(Coalesce(inputs), "x_1") + } + + test("AtLeastNNonNulls should not throw 64kb exception") { + val inputs = (1 to 4000).map(x => Literal(s"x_$x")) + checkEvaluation(AtLeastNNonNulls(1, inputs), true) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org