Repository: spark
Updated Branches:
  refs/heads/master 9bdff0bcd -> 96e947ed6


[SPARK-22569][SQL] Clean usage of addMutableState and splitExpressions

## What changes were proposed in this pull request?
This PR is to clean the usage of addMutableState and splitExpressions

1. replace hardcoded type strings with ctx.JAVA_BOOLEAN etc.
2. create a default value for the initCode parameter of ctx.addMutableState
3. Use named arguments when calling `splitExpressions`

## How was this patch tested?
The existing test cases

Author: gatorsmile <[email protected]>

Closes #19790 from gatorsmile/codeClean.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/96e947ed
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/96e947ed
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/96e947ed

Branch: refs/heads/master
Commit: 96e947ed6c945bdbe71c308112308489668f19ac
Parents: 9bdff0b
Author: gatorsmile <[email protected]>
Authored: Tue Nov 21 13:48:09 2017 +0100
Committer: Wenchen Fan <[email protected]>
Committed: Tue Nov 21 13:48:09 2017 +0100

----------------------------------------------------------------------
 .../expressions/MonotonicallyIncreasingID.scala |  4 +-
 .../catalyst/expressions/SparkPartitionID.scala |  2 +-
 .../sql/catalyst/expressions/arithmetic.scala   |  8 ++--
 .../expressions/codegen/CodeGenerator.scala     | 22 +++++++---
 .../codegen/GenerateMutableProjection.scala     |  2 +-
 .../expressions/complexTypeCreator.scala        |  2 +-
 .../spark/sql/catalyst/expressions/hash.scala   |  6 +--
 .../catalyst/expressions/nullExpressions.scala  | 10 +++--
 .../catalyst/expressions/objects/objects.scala  | 26 +++++------
 .../sql/catalyst/expressions/predicates.scala   |  6 +--
 .../expressions/randomExpressions.scala         |  4 +-
 .../expressions/stringExpressions.scala         | 45 +++++++++++---------
 .../spark/sql/execution/ColumnarBatchScan.scala |  4 +-
 .../apache/spark/sql/execution/SortExec.scala   |  2 +-
 .../execution/aggregate/HashAggregateExec.scala | 20 ++++-----
 .../execution/aggregate/HashMapGenerator.scala  |  4 +-
 .../sql/execution/basicPhysicalOperators.scala  |  8 ++--
 .../columnar/GenerateColumnAccessor.scala       |  2 +-
 .../sql/execution/joins/SortMergeJoinExec.scala |  6 +--
 .../org/apache/spark/sql/execution/limit.scala  |  4 +-
 20 files changed, 104 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/96e947ed/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala
index 84027b5..821d784 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala
@@ -67,8 +67,8 @@ case class MonotonicallyIncreasingID() extends LeafExpression 
with Nondeterminis
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
     val countTerm = ctx.freshName("count")
     val partitionMaskTerm = ctx.freshName("partitionMask")
-    ctx.addMutableState(ctx.JAVA_LONG, countTerm, "")
-    ctx.addMutableState(ctx.JAVA_LONG, partitionMaskTerm, "")
+    ctx.addMutableState(ctx.JAVA_LONG, countTerm)
+    ctx.addMutableState(ctx.JAVA_LONG, partitionMaskTerm)
     ctx.addPartitionInitializationStatement(s"$countTerm = 0L;")
     ctx.addPartitionInitializationStatement(s"$partitionMaskTerm = ((long) 
partitionIndex) << 33;")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/96e947ed/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SparkPartitionID.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SparkPartitionID.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SparkPartitionID.scala
index 8db7efd..4fa18d6 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SparkPartitionID.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SparkPartitionID.scala
@@ -44,7 +44,7 @@ case class SparkPartitionID() extends LeafExpression with 
Nondeterministic {
 
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
     val idTerm = ctx.freshName("partitionId")
-    ctx.addMutableState(ctx.JAVA_INT, idTerm, "")
+    ctx.addMutableState(ctx.JAVA_INT, idTerm)
     ctx.addPartitionInitializationStatement(s"$idTerm = partitionIndex;")
     ev.copy(code = s"final ${ctx.javaType(dataType)} ${ev.value} = $idTerm;", 
isNull = "false")
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/96e947ed/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
index 72d5889..e5a1096 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
@@ -602,8 +602,8 @@ case class Least(children: Seq[Expression]) extends 
Expression {
 
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
     val evalChildren = children.map(_.genCode(ctx))
-    ctx.addMutableState("boolean", ev.isNull, "")
-    ctx.addMutableState(ctx.javaType(dataType), ev.value, "")
+    ctx.addMutableState(ctx.JAVA_BOOLEAN, ev.isNull)
+    ctx.addMutableState(ctx.javaType(dataType), ev.value)
     def updateEval(eval: ExprCode): String = {
       s"""
         ${eval.code}
@@ -668,8 +668,8 @@ case class Greatest(children: Seq[Expression]) extends 
Expression {
 
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
     val evalChildren = children.map(_.genCode(ctx))
-    ctx.addMutableState("boolean", ev.isNull, "")
-    ctx.addMutableState(ctx.javaType(dataType), ev.value, "")
+    ctx.addMutableState(ctx.JAVA_BOOLEAN, ev.isNull)
+    ctx.addMutableState(ctx.javaType(dataType), ev.value)
     def updateEval(eval: ExprCode): String = {
       s"""
         ${eval.code}

http://git-wip-us.apache.org/repos/asf/spark/blob/96e947ed/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index e02f125..7861719 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -157,7 +157,19 @@ class CodegenContext {
   val mutableStates: mutable.ArrayBuffer[(String, String, String)] =
     mutable.ArrayBuffer.empty[(String, String, String)]
 
-  def addMutableState(javaType: String, variableName: String, initCode: 
String): Unit = {
+  /**
+   * Add a mutable state as a field to the generated class. c.f. the comments 
above.
+   *
+   * @param javaType Java type of the field. Note that short names can be used 
for some types,
+   *                 e.g. InternalRow, UnsafeRow, UnsafeArrayData, etc. Other 
types will have to
+   *                 specify the fully-qualified Java type name. See the code 
in doCompile() for
+   *                 the list of default imports available.
+   *                 Also, generic type arguments are accepted but ignored.
+   * @param variableName Name of the field.
+   * @param initCode The statement(s) to put into the init() method to 
initialize this field.
+   *                 If left blank, the field will be default-initialized.
+   */
+  def addMutableState(javaType: String, variableName: String, initCode: String 
= ""): Unit = {
     mutableStates += ((javaType, variableName, initCode))
   }
 
@@ -191,7 +203,7 @@ class CodegenContext {
     val initCodes = mutableStates.distinct.map(_._3 + "\n")
     // The generated initialization code may exceed 64kb function size limit 
in JVM if there are too
     // many mutable states, so split it into multiple functions.
-    splitExpressions(initCodes, "init", Nil)
+    splitExpressions(expressions = initCodes, funcName = "init", arguments = 
Nil)
   }
 
   /**
@@ -769,7 +781,7 @@ class CodegenContext {
       // Cannot split these expressions because they are not created from a 
row object.
       return expressions.mkString("\n")
     }
-    splitExpressions(expressions, "apply", ("InternalRow", row) :: Nil)
+    splitExpressions(expressions, funcName = "apply", arguments = 
("InternalRow", row) :: Nil)
   }
 
   /**
@@ -931,7 +943,7 @@ class CodegenContext {
       dataType: DataType,
       baseFuncName: String): (String, String, String) = {
     val globalIsNull = freshName("isNull")
-    addMutableState("boolean", globalIsNull, s"$globalIsNull = false;")
+    addMutableState(JAVA_BOOLEAN, globalIsNull, s"$globalIsNull = false;")
     val globalValue = freshName("value")
     addMutableState(javaType(dataType), globalValue,
       s"$globalValue = ${defaultValue(dataType)};")
@@ -1038,7 +1050,7 @@ class CodegenContext {
       //   2. Less code.
       // Currently, we will do this for all non-leaf only expression trees 
(i.e. expr trees with
       // at least two nodes) as the cost of doing it is expected to be low.
-      addMutableState("boolean", isNull, s"$isNull = false;")
+      addMutableState(JAVA_BOOLEAN, isNull, s"$isNull = false;")
       addMutableState(javaType(expr.dataType), value,
         s"$value = ${defaultValue(expr.dataType)};")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/96e947ed/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
index b5429fa..802e8bd 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
@@ -63,7 +63,7 @@ object GenerateMutableProjection extends 
CodeGenerator[Seq[Expression], MutableP
         if (e.nullable) {
           val isNull = s"isNull_$i"
           val value = s"value_$i"
-          ctx.addMutableState("boolean", isNull, s"$isNull = true;")
+          ctx.addMutableState(ctx.JAVA_BOOLEAN, isNull, s"$isNull = true;")
           ctx.addMutableState(ctx.javaType(e.dataType), value,
             s"$value = ${ctx.defaultValue(e.dataType)};")
           s"""

http://git-wip-us.apache.org/repos/asf/spark/blob/96e947ed/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
index 4b6574a..2a00d57 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
@@ -120,7 +120,7 @@ private [sql] object GenArrayData {
         UnsafeArrayData.calculateHeaderPortionInBytes(numElements) +
         
ByteArrayMethods.roundNumberOfBytesToNearestWord(elementType.defaultSize * 
numElements)
       val baseOffset = Platform.BYTE_ARRAY_OFFSET
-      ctx.addMutableState("UnsafeArrayData", arrayDataName, "")
+      ctx.addMutableState("UnsafeArrayData", arrayDataName)
 
       val primitiveValueTypeName = ctx.primitiveTypeName(elementType)
       val assignments = elementsCode.zipWithIndex.map { case (eval, i) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/96e947ed/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
index eb3c49f..9e0786e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
@@ -277,7 +277,7 @@ abstract class HashExpression[E] extends Expression {
       }
     })
 
-    ctx.addMutableState(ctx.javaType(dataType), ev.value, "")
+    ctx.addMutableState(ctx.javaType(dataType), ev.value)
     ev.copy(code = s"""
       ${ev.value} = $seed;
       $childrenHash""")
@@ -616,8 +616,8 @@ case class HiveHash(children: Seq[Expression]) extends 
HashExpression[Int] {
         s"\n$childHash = 0;"
     })
 
-    ctx.addMutableState(ctx.javaType(dataType), ev.value, "")
-    ctx.addMutableState("int", childHash, s"$childHash = 0;")
+    ctx.addMutableState(ctx.javaType(dataType), ev.value)
+    ctx.addMutableState(ctx.JAVA_INT, childHash, s"$childHash = 0;")
     ev.copy(code = s"""
       ${ev.value} = $seed;
       $childrenHash""")

http://git-wip-us.apache.org/repos/asf/spark/blob/96e947ed/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullExpressions.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullExpressions.scala
index 4aeab2c..5eaf3f2 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullExpressions.scala
@@ -72,8 +72,8 @@ case class Coalesce(children: Seq[Expression]) extends 
Expression {
   }
 
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-    ctx.addMutableState("boolean", ev.isNull, "")
-    ctx.addMutableState(ctx.javaType(dataType), ev.value, "")
+    ctx.addMutableState(ctx.JAVA_BOOLEAN, ev.isNull)
+    ctx.addMutableState(ctx.javaType(dataType), ev.value)
 
     val evals = children.map { e =>
       val eval = e.genCode(ctx)
@@ -385,8 +385,10 @@ case class AtLeastNNonNulls(n: Int, children: 
Seq[Expression]) extends Predicate
     val code = if (ctx.INPUT_ROW == null || ctx.currentVars != null) {
       evals.mkString("\n")
     } else {
-      ctx.splitExpressions(evals, "atLeastNNonNulls",
-        ("InternalRow", ctx.INPUT_ROW) :: ("int", nonnull) :: Nil,
+      ctx.splitExpressions(
+        expressions = evals,
+        funcName = "atLeastNNonNulls",
+        arguments = ("InternalRow", ctx.INPUT_ROW) :: ("int", nonnull) :: Nil,
         returnType = "int",
         makeSplitFunction = { body =>
           s"""

http://git-wip-us.apache.org/repos/asf/spark/blob/96e947ed/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index f2eee99..006d37f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -63,14 +63,14 @@ trait InvokeLike extends Expression with NonSQLExpression {
 
     val resultIsNull = if (needNullCheck) {
       val resultIsNull = ctx.freshName("resultIsNull")
-      ctx.addMutableState("boolean", resultIsNull, "")
+      ctx.addMutableState(ctx.JAVA_BOOLEAN, resultIsNull)
       resultIsNull
     } else {
       "false"
     }
     val argValues = arguments.map { e =>
       val argValue = ctx.freshName("argValue")
-      ctx.addMutableState(ctx.javaType(e.dataType), argValue, "")
+      ctx.addMutableState(ctx.javaType(e.dataType), argValue)
       argValue
     }
 
@@ -548,7 +548,7 @@ case class MapObjects private(
 
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
     val elementJavaType = ctx.javaType(loopVarDataType)
-    ctx.addMutableState(elementJavaType, loopValue, "")
+    ctx.addMutableState(elementJavaType, loopValue)
     val genInputData = inputData.genCode(ctx)
     val genFunction = lambdaFunction.genCode(ctx)
     val dataLength = ctx.freshName("dataLength")
@@ -644,7 +644,7 @@ case class MapObjects private(
     }
 
     val loopNullCheck = if (loopIsNull != "false") {
-      ctx.addMutableState("boolean", loopIsNull, "")
+      ctx.addMutableState(ctx.JAVA_BOOLEAN, loopIsNull)
       inputDataType match {
         case _: ArrayType => s"$loopIsNull = 
${genInputData.value}.isNullAt($loopIndex);"
         case _ => s"$loopIsNull = $loopValue == null;"
@@ -808,10 +808,10 @@ case class CatalystToExternalMap private(
 
     val mapType = inputDataType(inputData.dataType).asInstanceOf[MapType]
     val keyElementJavaType = ctx.javaType(mapType.keyType)
-    ctx.addMutableState(keyElementJavaType, keyLoopValue, "")
+    ctx.addMutableState(keyElementJavaType, keyLoopValue)
     val genKeyFunction = keyLambdaFunction.genCode(ctx)
     val valueElementJavaType = ctx.javaType(mapType.valueType)
-    ctx.addMutableState(valueElementJavaType, valueLoopValue, "")
+    ctx.addMutableState(valueElementJavaType, valueLoopValue)
     val genValueFunction = valueLambdaFunction.genCode(ctx)
     val genInputData = inputData.genCode(ctx)
     val dataLength = ctx.freshName("dataLength")
@@ -844,7 +844,7 @@ case class CatalystToExternalMap private(
     val genValueFunctionValue = genFunctionValue(valueLambdaFunction, 
genValueFunction)
 
     val valueLoopNullCheck = if (valueLoopIsNull != "false") {
-      ctx.addMutableState("boolean", valueLoopIsNull, "")
+      ctx.addMutableState(ctx.JAVA_BOOLEAN, valueLoopIsNull)
       s"$valueLoopIsNull = $valueArray.isNullAt($loopIndex);"
     } else {
       ""
@@ -994,8 +994,8 @@ case class ExternalMapToCatalyst private(
 
     val keyElementJavaType = ctx.javaType(keyType)
     val valueElementJavaType = ctx.javaType(valueType)
-    ctx.addMutableState(keyElementJavaType, key, "")
-    ctx.addMutableState(valueElementJavaType, value, "")
+    ctx.addMutableState(keyElementJavaType, key)
+    ctx.addMutableState(valueElementJavaType, value)
 
     val (defineEntries, defineKeyValue) = child.dataType match {
       case ObjectType(cls) if classOf[java.util.Map[_, 
_]].isAssignableFrom(cls) =>
@@ -1031,14 +1031,14 @@ case class ExternalMapToCatalyst private(
     }
 
     val keyNullCheck = if (keyIsNull != "false") {
-      ctx.addMutableState("boolean", keyIsNull, "")
+      ctx.addMutableState(ctx.JAVA_BOOLEAN, keyIsNull)
       s"$keyIsNull = $key == null;"
     } else {
       ""
     }
 
     val valueNullCheck = if (valueIsNull != "false") {
-      ctx.addMutableState("boolean", valueIsNull, "")
+      ctx.addMutableState(ctx.JAVA_BOOLEAN, valueIsNull)
       s"$valueIsNull = $value == null;"
     } else {
       ""
@@ -1106,7 +1106,7 @@ case class CreateExternalRow(children: Seq[Expression], 
schema: StructType)
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
     val rowClass = classOf[GenericRowWithSchema].getName
     val values = ctx.freshName("values")
-    ctx.addMutableState("Object[]", values, "")
+    ctx.addMutableState("Object[]", values)
 
     val childrenCodes = children.zipWithIndex.map { case (e, i) =>
       val eval = e.genCode(ctx)
@@ -1244,7 +1244,7 @@ case class InitializeJavaBean(beanInstance: Expression, 
setters: Map[String, Exp
 
     val javaBeanInstance = ctx.freshName("javaBean")
     val beanInstanceJavaType = ctx.javaType(beanInstance.dataType)
-    ctx.addMutableState(beanInstanceJavaType, javaBeanInstance, "")
+    ctx.addMutableState(beanInstanceJavaType, javaBeanInstance)
 
     val initialize = setters.map {
       case (setterMethod, fieldValue) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/96e947ed/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index 5d75c60..c0084af 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -236,8 +236,8 @@ case class In(value: Expression, list: Seq[Expression]) 
extends Predicate {
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
     val valueGen = value.genCode(ctx)
     val listGen = list.map(_.genCode(ctx))
-    ctx.addMutableState("boolean", ev.value, "")
-    ctx.addMutableState("boolean", ev.isNull, "")
+    ctx.addMutableState(ctx.JAVA_BOOLEAN, ev.value)
+    ctx.addMutableState(ctx.JAVA_BOOLEAN, ev.isNull)
     val valueArg = ctx.freshName("valueArg")
     val listCode = listGen.map(x =>
       s"""
@@ -253,7 +253,7 @@ case class In(value: Expression, list: Seq[Expression]) 
extends Predicate {
        """)
     val listCodes = if (ctx.INPUT_ROW != null && ctx.currentVars == null) {
       val args = ("InternalRow", ctx.INPUT_ROW) :: 
(ctx.javaType(value.dataType), valueArg) :: Nil
-      ctx.splitExpressions(listCode, "valueIn", args)
+      ctx.splitExpressions(expressions = listCode, funcName = "valueIn", 
arguments = args)
     } else {
       listCode.mkString("\n")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/96e947ed/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/randomExpressions.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/randomExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/randomExpressions.scala
index 9705176..b4aefe6 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/randomExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/randomExpressions.scala
@@ -79,7 +79,7 @@ case class Rand(child: Expression) extends RDG {
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
     val rngTerm = ctx.freshName("rng")
     val className = classOf[XORShiftRandom].getName
-    ctx.addMutableState(className, rngTerm, "")
+    ctx.addMutableState(className, rngTerm)
     ctx.addPartitionInitializationStatement(
       s"$rngTerm = new $className(${seed}L + partitionIndex);")
     ev.copy(code = s"""
@@ -114,7 +114,7 @@ case class Randn(child: Expression) extends RDG {
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
     val rngTerm = ctx.freshName("rng")
     val className = classOf[XORShiftRandom].getName
-    ctx.addMutableState(className, rngTerm, "")
+    ctx.addMutableState(className, rngTerm)
     ctx.addPartitionInitializationStatement(
       s"$rngTerm = new $className(${seed}L + partitionIndex);")
     ev.copy(code = s"""

http://git-wip-us.apache.org/repos/asf/spark/blob/96e947ed/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
index 360dd84..1c599af 100755
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
@@ -74,8 +74,10 @@ case class Concat(children: Seq[Expression]) extends 
Expression with ImplicitCas
       """
     }
     val codes = if (ctx.INPUT_ROW != null && ctx.currentVars == null) {
-      ctx.splitExpressions(inputs, "valueConcat",
-        ("InternalRow", ctx.INPUT_ROW) :: ("UTF8String[]", args) :: Nil)
+      ctx.splitExpressions(
+        expressions = inputs,
+        funcName = "valueConcat",
+        arguments = ("InternalRow", ctx.INPUT_ROW) :: ("UTF8String[]", args) 
:: Nil)
     } else {
       inputs.mkString("\n")
     }
@@ -155,8 +157,10 @@ case class ConcatWs(children: Seq[Expression])
         }
       }
       val codes = if (ctx.INPUT_ROW != null && ctx.currentVars == null) {
-        ctx.splitExpressions(inputs, "valueConcatWs",
-          ("InternalRow", ctx.INPUT_ROW) :: ("UTF8String[]", args) :: Nil)
+        ctx.splitExpressions(
+          expressions = inputs,
+          funcName = "valueConcatWs",
+          arguments = ("InternalRow", ctx.INPUT_ROW) :: ("UTF8String[]", args) 
:: Nil)
       } else {
         inputs.mkString("\n")
       }
@@ -205,27 +209,30 @@ case class ConcatWs(children: Seq[Expression])
       }.unzip
 
       val codes = ctx.splitExpressions(ctx.INPUT_ROW, evals.map(_.code))
-      val varargCounts = ctx.splitExpressions(varargCount, 
"varargCountsConcatWs",
-        ("InternalRow", ctx.INPUT_ROW) :: Nil,
-        "int",
-        { body =>
+      val varargCounts = ctx.splitExpressions(
+        expressions = varargCount,
+        funcName = "varargCountsConcatWs",
+        arguments = ("InternalRow", ctx.INPUT_ROW) :: Nil,
+        returnType = "int",
+        makeSplitFunction = body =>
           s"""
            int $varargNum = 0;
            $body
            return $varargNum;
-         """
-        },
-        _.mkString(s"$varargNum += ", s";\n$varargNum += ", ";"))
-      val varargBuilds = ctx.splitExpressions(varargBuild, 
"varargBuildsConcatWs",
-        ("InternalRow", ctx.INPUT_ROW) :: ("UTF8String []", array) :: ("int", 
idxInVararg) :: Nil,
-        "int",
-        { body =>
+           """,
+        foldFunctions = _.mkString(s"$varargNum += ", s";\n$varargNum += ", 
";"))
+      val varargBuilds = ctx.splitExpressions(
+        expressions = varargBuild,
+        funcName = "varargBuildsConcatWs",
+        arguments =
+          ("InternalRow", ctx.INPUT_ROW) :: ("UTF8String []", array) :: 
("int", idxInVararg) :: Nil,
+        returnType = "int",
+        makeSplitFunction = body =>
           s"""
            $body
            return $idxInVararg;
-         """
-        },
-        _.mkString(s"$idxInVararg = ", s";\n$idxInVararg = ", ";"))
+           """,
+        foldFunctions = _.mkString(s"$idxInVararg = ", s";\n$idxInVararg = ", 
";"))
       ev.copy(
         s"""
         $codes
@@ -2059,7 +2066,7 @@ case class FormatNumber(x: Expression, d: Expression)
       val numberFormat = ctx.freshName("numberFormat")
       val i = ctx.freshName("i")
       val dFormat = ctx.freshName("dFormat")
-      ctx.addMutableState("int", lastDValue, s"$lastDValue = -100;")
+      ctx.addMutableState(ctx.JAVA_INT, lastDValue, s"$lastDValue = -100;")
       ctx.addMutableState(sb, pattern, s"$pattern = new $sb();")
       ctx.addMutableState(df, numberFormat,
       s"""$numberFormat = new $df("", new $dfs($l.$usLocale));""")

http://git-wip-us.apache.org/repos/asf/spark/blob/96e947ed/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
index 1925bad..a9bfb63 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
@@ -76,14 +76,14 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport 
{
     val numOutputRows = metricTerm(ctx, "numOutputRows")
     val scanTimeMetric = metricTerm(ctx, "scanTime")
     val scanTimeTotalNs = ctx.freshName("scanTime")
-    ctx.addMutableState("long", scanTimeTotalNs, s"$scanTimeTotalNs = 0;")
+    ctx.addMutableState(ctx.JAVA_LONG, scanTimeTotalNs, s"$scanTimeTotalNs = 
0;")
 
     val columnarBatchClz = classOf[ColumnarBatch].getName
     val batch = ctx.freshName("batch")
     ctx.addMutableState(columnarBatchClz, batch, s"$batch = null;")
 
     val idx = ctx.freshName("batchIdx")
-    ctx.addMutableState("int", idx, s"$idx = 0;")
+    ctx.addMutableState(ctx.JAVA_INT, idx, s"$idx = 0;")
     val colVars = output.indices.map(i => ctx.freshName("colInstance" + i))
     val columnVectorClzs = vectorTypes.getOrElse(
       Seq.fill(colVars.size)(classOf[ColumnVector].getName))

http://git-wip-us.apache.org/repos/asf/spark/blob/96e947ed/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
index 21765cd..c0e2134 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
@@ -134,7 +134,7 @@ case class SortExec(
 
   override protected def doProduce(ctx: CodegenContext): String = {
     val needToSort = ctx.freshName("needToSort")
-    ctx.addMutableState("boolean", needToSort, s"$needToSort = true;")
+    ctx.addMutableState(ctx.JAVA_BOOLEAN, needToSort, s"$needToSort = true;")
 
     // Initialize the class member variables. This includes the instance of 
the Sorter and
     // the iterator to return sorted rows.

http://git-wip-us.apache.org/repos/asf/spark/blob/96e947ed/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index 51f7c9e..19c793e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -178,7 +178,7 @@ case class HashAggregateExec(
 
   private def doProduceWithoutKeys(ctx: CodegenContext): String = {
     val initAgg = ctx.freshName("initAgg")
-    ctx.addMutableState("boolean", initAgg, s"$initAgg = false;")
+    ctx.addMutableState(ctx.JAVA_BOOLEAN, initAgg, s"$initAgg = false;")
 
     // generate variables for aggregation buffer
     val functions = 
aggregateExpressions.map(_.aggregateFunction.asInstanceOf[DeclarativeAggregate])
@@ -186,8 +186,8 @@ case class HashAggregateExec(
     bufVars = initExpr.map { e =>
       val isNull = ctx.freshName("bufIsNull")
       val value = ctx.freshName("bufValue")
-      ctx.addMutableState("boolean", isNull, "")
-      ctx.addMutableState(ctx.javaType(e.dataType), value, "")
+      ctx.addMutableState(ctx.JAVA_BOOLEAN, isNull)
+      ctx.addMutableState(ctx.javaType(e.dataType), value)
       // The initial expression should not access any column
       val ev = e.genCode(ctx)
       val initVars = s"""
@@ -565,7 +565,7 @@ case class HashAggregateExec(
 
   private def doProduceWithKeys(ctx: CodegenContext): String = {
     val initAgg = ctx.freshName("initAgg")
-    ctx.addMutableState("boolean", initAgg, s"$initAgg = false;")
+    ctx.addMutableState(ctx.JAVA_BOOLEAN, initAgg, s"$initAgg = false;")
     if (sqlContext.conf.enableTwoLevelAggMap) {
       enableTwoLevelHashMap(ctx)
     } else {
@@ -596,27 +596,27 @@ case class HashAggregateExec(
           s"$fastHashMapTerm = new $fastHashMapClassName();")
         ctx.addMutableState(
           
"java.util.Iterator<org.apache.spark.sql.execution.vectorized.ColumnarRow>",
-          iterTermForFastHashMap, "")
+          iterTermForFastHashMap)
       } else {
         ctx.addMutableState(fastHashMapClassName, fastHashMapTerm,
           s"$fastHashMapTerm = new $fastHashMapClassName(" +
             s"$thisPlan.getTaskMemoryManager(), 
$thisPlan.getEmptyAggregationBuffer());")
         ctx.addMutableState(
           "org.apache.spark.unsafe.KVIterator",
-          iterTermForFastHashMap, "")
+          iterTermForFastHashMap)
       }
     }
 
     // create hashMap
     hashMapTerm = ctx.freshName("hashMap")
     val hashMapClassName = classOf[UnsafeFixedWidthAggregationMap].getName
-    ctx.addMutableState(hashMapClassName, hashMapTerm, "")
+    ctx.addMutableState(hashMapClassName, hashMapTerm)
     sorterTerm = ctx.freshName("sorter")
-    ctx.addMutableState(classOf[UnsafeKVExternalSorter].getName, sorterTerm, 
"")
+    ctx.addMutableState(classOf[UnsafeKVExternalSorter].getName, sorterTerm)
 
     // Create a name for iterator from HashMap
     val iterTerm = ctx.freshName("mapIter")
-    ctx.addMutableState(classOf[KVIterator[UnsafeRow, UnsafeRow]].getName, 
iterTerm, "")
+    ctx.addMutableState(classOf[KVIterator[UnsafeRow, UnsafeRow]].getName, 
iterTerm)
 
     def generateGenerateCode(): String = {
       if (isFastHashMapEnabled) {
@@ -774,7 +774,7 @@ case class HashAggregateExec(
     val (checkFallbackForGeneratedHashMap, checkFallbackForBytesToBytesMap, 
resetCounter,
     incCounter) = if (testFallbackStartsAt.isDefined) {
       val countTerm = ctx.freshName("fallbackCounter")
-      ctx.addMutableState("int", countTerm, s"$countTerm = 0;")
+      ctx.addMutableState(ctx.JAVA_INT, countTerm, s"$countTerm = 0;")
       (s"$countTerm < ${testFallbackStartsAt.get._1}",
         s"$countTerm < ${testFallbackStartsAt.get._2}", s"$countTerm = 0;", 
s"$countTerm += 1;")
     } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/96e947ed/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashMapGenerator.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashMapGenerator.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashMapGenerator.scala
index 90deb20..85b4529 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashMapGenerator.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashMapGenerator.scala
@@ -48,8 +48,8 @@ abstract class HashMapGenerator(
     initExpr.map { e =>
       val isNull = ctx.freshName("bufIsNull")
       val value = ctx.freshName("bufValue")
-      ctx.addMutableState("boolean", isNull, "")
-      ctx.addMutableState(ctx.javaType(e.dataType), value, "")
+      ctx.addMutableState(ctx.JAVA_BOOLEAN, isNull)
+      ctx.addMutableState(ctx.javaType(e.dataType), value)
       val ev = e.genCode(ctx)
       val initVars =
         s"""

http://git-wip-us.apache.org/repos/asf/spark/blob/96e947ed/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 3c7daa0..f205bdf 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -368,9 +368,9 @@ case class RangeExec(range: 
org.apache.spark.sql.catalyst.plans.logical.Range)
     val numOutput = metricTerm(ctx, "numOutputRows")
 
     val initTerm = ctx.freshName("initRange")
-    ctx.addMutableState("boolean", initTerm, s"$initTerm = false;")
+    ctx.addMutableState(ctx.JAVA_BOOLEAN, initTerm, s"$initTerm = false;")
     val number = ctx.freshName("number")
-    ctx.addMutableState("long", number, s"$number = 0L;")
+    ctx.addMutableState(ctx.JAVA_LONG, number, s"$number = 0L;")
 
     val value = ctx.freshName("value")
     val ev = ExprCode("", "false", value)
@@ -391,11 +391,11 @@ case class RangeExec(range: 
org.apache.spark.sql.catalyst.plans.logical.Range)
 
     // Once number == batchEnd, it's time to progress to the next batch.
     val batchEnd = ctx.freshName("batchEnd")
-    ctx.addMutableState("long", batchEnd, s"$batchEnd = 0;")
+    ctx.addMutableState(ctx.JAVA_LONG, batchEnd, s"$batchEnd = 0;")
 
     // How many values should still be generated by this range operator.
     val numElementsTodo = ctx.freshName("numElementsTodo")
-    ctx.addMutableState("long", numElementsTodo, s"$numElementsTodo = 0L;")
+    ctx.addMutableState(ctx.JAVA_LONG, numElementsTodo, s"$numElementsTodo = 
0L;")
 
     // How many values should be generated in the next batch.
     val nextBatchTodo = ctx.freshName("nextBatchTodo")

http://git-wip-us.apache.org/repos/asf/spark/blob/96e947ed/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
index ae600c1..ff5dd70 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
@@ -89,7 +89,7 @@ object GenerateColumnAccessor extends 
CodeGenerator[Seq[DataType], ColumnarItera
         case array: ArrayType => classOf[ArrayColumnAccessor].getName
         case t: MapType => classOf[MapColumnAccessor].getName
       }
-      ctx.addMutableState(accessorCls, accessorName, "")
+      ctx.addMutableState(accessorCls, accessorName)
 
       val createCode = dt match {
         case t if ctx.isPrimitiveType(dt) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/96e947ed/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
index cf7885f..9c08ec7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
@@ -423,7 +423,7 @@ case class SortMergeJoinExec(
   private def genScanner(ctx: CodegenContext): (String, String) = {
     // Create class member for next row from both sides.
     val leftRow = ctx.freshName("leftRow")
-    ctx.addMutableState("InternalRow", leftRow, "")
+    ctx.addMutableState("InternalRow", leftRow)
     val rightRow = ctx.freshName("rightRow")
     ctx.addMutableState("InternalRow", rightRow, s"$rightRow = null;")
 
@@ -519,10 +519,10 @@ case class SortMergeJoinExec(
       val value = ctx.freshName("value")
       val valueCode = ctx.getValue(leftRow, a.dataType, i.toString)
       // declare it as class member, so we can access the column before or in 
the loop.
-      ctx.addMutableState(ctx.javaType(a.dataType), value, "")
+      ctx.addMutableState(ctx.javaType(a.dataType), value)
       if (a.nullable) {
         val isNull = ctx.freshName("isNull")
-        ctx.addMutableState("boolean", isNull, "")
+        ctx.addMutableState(ctx.JAVA_BOOLEAN, isNull)
         val code =
           s"""
              |$isNull = $leftRow.isNullAt($i);

http://git-wip-us.apache.org/repos/asf/spark/blob/96e947ed/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index 13da4b2..a8556f6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -72,7 +72,7 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport 
{
 
   override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: 
ExprCode): String = {
     val stopEarly = ctx.freshName("stopEarly")
-    ctx.addMutableState("boolean", stopEarly, s"$stopEarly = false;")
+    ctx.addMutableState(ctx.JAVA_BOOLEAN, stopEarly, s"$stopEarly = false;")
 
     ctx.addNewFunction("stopEarly", s"""
       @Override
@@ -81,7 +81,7 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport 
{
       }
     """, inlineToOuterClass = true)
     val countTerm = ctx.freshName("count")
-    ctx.addMutableState("int", countTerm, s"$countTerm = 0;")
+    ctx.addMutableState(ctx.JAVA_INT, countTerm, s"$countTerm = 0;")
     s"""
        | if ($countTerm < $limit) {
        |   $countTerm += 1;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to