Repository: spark
Updated Branches:
  refs/heads/master c571cfb2d -> b1b47274b


[SPARK-17702][SQL] Code generation including too many mutable states exceeds JVM size limit.

## What changes were proposed in this pull request?

Code generation that includes too many mutable states can exceed the JVM's 64KB method size
limit, because the generated constructor extracts every value from `references` into a field
within a single method.
We should split the generated extraction code in the constructor into smaller functions.

## How was this patch tested?

I added a test that builds a projection wide enough to produce an oversized constructor, and
checks that the generated code compiles and evaluates correctly.

Author: Takuya UESHIN <[email protected]>

Closes #15275 from ueshin/issues/SPARK-17702.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b1b47274
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b1b47274
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b1b47274

Branch: refs/heads/master
Commit: b1b47274bfeba17a9e4e9acebd7385289f31f6c8
Parents: c571cfb
Author: Takuya UESHIN <[email protected]>
Authored: Mon Oct 3 21:48:58 2016 -0700
Committer: Reynold Xin <[email protected]>
Committed: Mon Oct 3 21:48:58 2016 -0700

----------------------------------------------------------------------
 .../expressions/codegen/CodeGenerator.scala     | 18 ++++++++++++-----
 .../codegen/GenerateMutableProjection.scala     |  3 ++-
 .../expressions/codegen/GenerateOrdering.scala  |  3 ++-
 .../expressions/codegen/GeneratePredicate.scala |  4 +++-
 .../codegen/GenerateSafeProjection.scala        |  4 +++-
 .../codegen/GenerateUnsafeProjection.scala      |  3 ++-
 .../expressions/CodeGenerationSuite.scala       | 21 +++++++++++++++++++-
 .../sql/execution/WholeStageCodegenExec.scala   |  4 +++-
 8 files changed, 48 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b1b47274/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index cb808e3..574943d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -178,7 +178,10 @@ class CodegenContext {
   def initMutableStates(): String = {
     // It's possible that we add same mutable state twice, e.g. the `mergeExpressions` in
     // `TypedAggregateExpression`, we should call `distinct` here to remove the duplicated ones.
-    mutableStates.distinct.map(_._3).mkString("\n")
+    val initCodes = mutableStates.distinct.map(_._3 + "\n")
+    // The generated initialization code may exceed 64kb function size limit in JVM if there are too
+    // many mutable states, so split it into multiple functions.
+    splitExpressions(initCodes, "init", Nil)
   }
 
   /**
@@ -604,6 +607,11 @@ class CodegenContext {
       // Cannot split these expressions because they are not created from a row object.
       return expressions.mkString("\n")
     }
+    splitExpressions(expressions, "apply", ("InternalRow", row) :: Nil)
+  }
+
+  private def splitExpressions(
+      expressions: Seq[String], funcName: String, arguments: Seq[(String, String)]): String = {
     val blocks = new ArrayBuffer[String]()
     val blockBuilder = new StringBuilder()
     for (code <- expressions) {
@@ -623,11 +631,11 @@ class CodegenContext {
       // inline execution if only one block
       blocks.head
     } else {
-      val apply = freshName("apply")
+      val func = freshName(funcName)
       val functions = blocks.zipWithIndex.map { case (body, i) =>
-        val name = s"${apply}_$i"
+        val name = s"${func}_$i"
         val code = s"""
-           |private void $name(InternalRow $row) {
+           |private void $name(${arguments.map { case (t, name) => s"$t $name" }.mkString(", ")}) {
            |  $body
            |}
          """.stripMargin
@@ -635,7 +643,7 @@ class CodegenContext {
         name
       }
 
-      functions.map(name => s"$name($row);").mkString("\n")
+      functions.map(name => s"$name(${arguments.map(_._2).mkString(", ")});").mkString("\n")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b1b47274/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
index 0f82d2e..13d61af 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
@@ -104,7 +104,6 @@ object GenerateMutableProjection extends 
CodeGenerator[Seq[Expression], MutableP
         private Object[] references;
         private MutableRow mutableRow;
         ${ctx.declareMutableStates()}
-        ${ctx.declareAddedFunctions()}
 
         public SpecificMutableProjection(Object[] references) {
           this.references = references;
@@ -112,6 +111,8 @@ object GenerateMutableProjection extends 
CodeGenerator[Seq[Expression], MutableP
           ${ctx.initMutableStates()}
         }
 
+        ${ctx.declareAddedFunctions()}
+
         public ${classOf[BaseMutableProjection].getName} target(MutableRow row) {
           mutableRow = row;
           return this;

http://git-wip-us.apache.org/repos/asf/spark/blob/b1b47274/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
index f1c30ef..1cef956 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
@@ -133,13 +133,14 @@ object GenerateOrdering extends 
CodeGenerator[Seq[SortOrder], Ordering[InternalR
 
         private Object[] references;
         ${ctx.declareMutableStates()}
-        ${ctx.declareAddedFunctions()}
 
         public SpecificOrdering(Object[] references) {
           this.references = references;
           ${ctx.initMutableStates()}
         }
 
+        ${ctx.declareAddedFunctions()}
+
         public int compare(InternalRow a, InternalRow b) {
           InternalRow ${ctx.INPUT_ROW} = null;  // Holds current row being evaluated.
           $comparisons

http://git-wip-us.apache.org/repos/asf/spark/blob/b1b47274/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
index 106bb27..39aa7b1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
@@ -40,6 +40,7 @@ object GeneratePredicate extends CodeGenerator[Expression, 
(InternalRow) => Bool
   protected def create(predicate: Expression): ((InternalRow) => Boolean) = {
     val ctx = newCodeGenContext()
     val eval = predicate.genCode(ctx)
+
     val codeBody = s"""
       public SpecificPredicate generate(Object[] references) {
         return new SpecificPredicate(references);
@@ -48,13 +49,14 @@ object GeneratePredicate extends CodeGenerator[Expression, 
(InternalRow) => Bool
       class SpecificPredicate extends ${classOf[Predicate].getName} {
         private final Object[] references;
         ${ctx.declareMutableStates()}
-        ${ctx.declareAddedFunctions()}
 
         public SpecificPredicate(Object[] references) {
           this.references = references;
           ${ctx.initMutableStates()}
         }
 
+        ${ctx.declareAddedFunctions()}
+
         public boolean eval(InternalRow ${ctx.INPUT_ROW}) {
           ${eval.code}
           return !${eval.isNull} && ${eval.value};

http://git-wip-us.apache.org/repos/asf/spark/blob/b1b47274/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
index b891f94..1c98c9e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
@@ -155,6 +155,7 @@ object GenerateSafeProjection extends 
CodeGenerator[Seq[Expression], Projection]
           """
     }
     val allExpressions = ctx.splitExpressions(ctx.INPUT_ROW, expressionCodes)
+
     val codeBody = s"""
       public java.lang.Object generate(Object[] references) {
         return new SpecificSafeProjection(references);
@@ -165,7 +166,6 @@ object GenerateSafeProjection extends 
CodeGenerator[Seq[Expression], Projection]
         private Object[] references;
         private MutableRow mutableRow;
         ${ctx.declareMutableStates()}
-        ${ctx.declareAddedFunctions()}
 
         public SpecificSafeProjection(Object[] references) {
           this.references = references;
@@ -173,6 +173,8 @@ object GenerateSafeProjection extends 
CodeGenerator[Seq[Expression], Projection]
           ${ctx.initMutableStates()}
         }
 
+        ${ctx.declareAddedFunctions()}
+
         public java.lang.Object apply(java.lang.Object _i) {
           InternalRow ${ctx.INPUT_ROW} = (InternalRow) _i;
           $allExpressions

http://git-wip-us.apache.org/repos/asf/spark/blob/b1b47274/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
index 75bb693..7cc4537 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
@@ -374,13 +374,14 @@ object GenerateUnsafeProjection extends 
CodeGenerator[Seq[Expression], UnsafePro
 
         private Object[] references;
         ${ctx.declareMutableStates()}
-        ${ctx.declareAddedFunctions()}
 
         public SpecificUnsafeProjection(Object[] references) {
           this.references = references;
           ${ctx.initMutableStates()}
         }
 
+        ${ctx.declareAddedFunctions()}
+
         // Scala.Function1 need this
         public java.lang.Object apply(java.lang.Object row) {
           return apply((InternalRow) row);

http://git-wip-us.apache.org/repos/asf/spark/blob/b1b47274/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
index 45dcfca..5588b44 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
+import java.sql.Timestamp
+
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.metrics.source.CodegenMetrics
 import org.apache.spark.sql.Row
@@ -24,7 +26,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.expressions.objects.{CreateExternalRow, GetExternalRowField, ValidateExternalType}
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.ThreadUtils
@@ -164,6 +166,23 @@ class CodeGenerationSuite extends SparkFunSuite with 
ExpressionEvalHelper {
     }
   }
 
+  test("SPARK-17702: split wide constructor into blocks due to JVM code size limit") {
+    val length = 5000
+    val expressions = Seq.fill(length) {
+      ToUTCTimestamp(
+        Literal.create(Timestamp.valueOf("2015-07-24 00:00:00"), TimestampType),
+        Literal.create("PST", StringType))
+    }
+    val plan = GenerateMutableProjection.generate(expressions)
+    val actual = plan(new GenericMutableRow(length)).toSeq(expressions.map(_.dataType))
+    val expected = Seq.fill(length)(
+      DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2015-07-24 07:00:00")))
+
+    if (!checkResult(actual, expected)) {
+      fail(s"Incorrect Evaluation: expressions: $expressions, actual: $actual, expected: $expected")
+    }
+  }
+
   test("test generated safe and unsafe projection") {
     val schema = new StructType(Array(
       StructField("a", StringType, true),

http://git-wip-us.apache.org/repos/asf/spark/blob/b1b47274/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index fb57ed7..62bf6f4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -316,14 +316,16 @@ case class WholeStageCodegenExec(child: SparkPlan) 
extends UnaryExecNode with Co
       final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
 
         private Object[] references;
+        private scala.collection.Iterator[] inputs;
         ${ctx.declareMutableStates()}
 
         public GeneratedIterator(Object[] references) {
           this.references = references;
         }
 
-        public void init(int index, scala.collection.Iterator inputs[]) {
+        public void init(int index, scala.collection.Iterator[] inputs) {
           partitionIndex = index;
+          this.inputs = inputs;
           ${ctx.initMutableStates()}
         }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to