Repository: spark Updated Branches: refs/heads/master 9bdff0bcd -> 96e947ed6
[SPARK-22569][SQL] Clean usage of addMutableState and splitExpressions ## What changes were proposed in this pull request? This PR is to clean the usage of addMutableState and splitExpressions 1. replace hardcoded type strings with ctx.JAVA_BOOLEAN etc. 2. create a default value for the initCode of ctx.addMutableState 3. Use named arguments when calling `splitExpressions` ## How was this patch tested? The existing test cases Author: gatorsmile <[email protected]> Closes #19790 from gatorsmile/codeClean. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/96e947ed Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/96e947ed Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/96e947ed Branch: refs/heads/master Commit: 96e947ed6c945bdbe71c308112308489668f19ac Parents: 9bdff0b Author: gatorsmile <[email protected]> Authored: Tue Nov 21 13:48:09 2017 +0100 Committer: Wenchen Fan <[email protected]> Committed: Tue Nov 21 13:48:09 2017 +0100 ---------------------------------------------------------------------- .../expressions/MonotonicallyIncreasingID.scala | 4 +- .../catalyst/expressions/SparkPartitionID.scala | 2 +- .../sql/catalyst/expressions/arithmetic.scala | 8 ++-- .../expressions/codegen/CodeGenerator.scala | 22 +++++++--- .../codegen/GenerateMutableProjection.scala | 2 +- .../expressions/complexTypeCreator.scala | 2 +- .../spark/sql/catalyst/expressions/hash.scala | 6 +-- .../catalyst/expressions/nullExpressions.scala | 10 +++-- .../catalyst/expressions/objects/objects.scala | 26 +++++------ .../sql/catalyst/expressions/predicates.scala | 6 +-- .../expressions/randomExpressions.scala | 4 +- .../expressions/stringExpressions.scala | 45 +++++++++++--------- .../spark/sql/execution/ColumnarBatchScan.scala | 4 +- .../apache/spark/sql/execution/SortExec.scala | 2 +- .../execution/aggregate/HashAggregateExec.scala | 20 ++++----- .../execution/aggregate/HashMapGenerator.scala | 4 +- 
.../sql/execution/basicPhysicalOperators.scala | 8 ++-- .../columnar/GenerateColumnAccessor.scala | 2 +- .../sql/execution/joins/SortMergeJoinExec.scala | 6 +-- .../org/apache/spark/sql/execution/limit.scala | 4 +- 20 files changed, 104 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/96e947ed/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala index 84027b5..821d784 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala @@ -67,8 +67,8 @@ case class MonotonicallyIncreasingID() extends LeafExpression with Nondeterminis override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val countTerm = ctx.freshName("count") val partitionMaskTerm = ctx.freshName("partitionMask") - ctx.addMutableState(ctx.JAVA_LONG, countTerm, "") - ctx.addMutableState(ctx.JAVA_LONG, partitionMaskTerm, "") + ctx.addMutableState(ctx.JAVA_LONG, countTerm) + ctx.addMutableState(ctx.JAVA_LONG, partitionMaskTerm) ctx.addPartitionInitializationStatement(s"$countTerm = 0L;") ctx.addPartitionInitializationStatement(s"$partitionMaskTerm = ((long) partitionIndex) << 33;") http://git-wip-us.apache.org/repos/asf/spark/blob/96e947ed/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SparkPartitionID.scala ---------------------------------------------------------------------- diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SparkPartitionID.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SparkPartitionID.scala index 8db7efd..4fa18d6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SparkPartitionID.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SparkPartitionID.scala @@ -44,7 +44,7 @@ case class SparkPartitionID() extends LeafExpression with Nondeterministic { override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val idTerm = ctx.freshName("partitionId") - ctx.addMutableState(ctx.JAVA_INT, idTerm, "") + ctx.addMutableState(ctx.JAVA_INT, idTerm) ctx.addPartitionInitializationStatement(s"$idTerm = partitionIndex;") ev.copy(code = s"final ${ctx.javaType(dataType)} ${ev.value} = $idTerm;", isNull = "false") } http://git-wip-us.apache.org/repos/asf/spark/blob/96e947ed/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala index 72d5889..e5a1096 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala @@ -602,8 +602,8 @@ case class Least(children: Seq[Expression]) extends Expression { override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val evalChildren = children.map(_.genCode(ctx)) - ctx.addMutableState("boolean", ev.isNull, "") - ctx.addMutableState(ctx.javaType(dataType), ev.value, "") + ctx.addMutableState(ctx.JAVA_BOOLEAN, ev.isNull) + ctx.addMutableState(ctx.javaType(dataType), ev.value) def updateEval(eval: ExprCode): String = { s""" ${eval.code} @@ -668,8 
+668,8 @@ case class Greatest(children: Seq[Expression]) extends Expression { override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val evalChildren = children.map(_.genCode(ctx)) - ctx.addMutableState("boolean", ev.isNull, "") - ctx.addMutableState(ctx.javaType(dataType), ev.value, "") + ctx.addMutableState(ctx.JAVA_BOOLEAN, ev.isNull) + ctx.addMutableState(ctx.javaType(dataType), ev.value) def updateEval(eval: ExprCode): String = { s""" ${eval.code} http://git-wip-us.apache.org/repos/asf/spark/blob/96e947ed/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index e02f125..7861719 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -157,7 +157,19 @@ class CodegenContext { val mutableStates: mutable.ArrayBuffer[(String, String, String)] = mutable.ArrayBuffer.empty[(String, String, String)] - def addMutableState(javaType: String, variableName: String, initCode: String): Unit = { + /** + * Add a mutable state as a field to the generated class. c.f. the comments above. + * + * @param javaType Java type of the field. Note that short names can be used for some types, + * e.g. InternalRow, UnsafeRow, UnsafeArrayData, etc. Other types will have to + * specify the fully-qualified Java type name. See the code in doCompile() for + * the list of default imports available. + * Also, generic type arguments are accepted but ignored. + * @param variableName Name of the field. + * @param initCode The statement(s) to put into the init() method to initialize this field. 
+ * If left blank, the field will be default-initialized. + */ + def addMutableState(javaType: String, variableName: String, initCode: String = ""): Unit = { mutableStates += ((javaType, variableName, initCode)) } @@ -191,7 +203,7 @@ class CodegenContext { val initCodes = mutableStates.distinct.map(_._3 + "\n") // The generated initialization code may exceed 64kb function size limit in JVM if there are too // many mutable states, so split it into multiple functions. - splitExpressions(initCodes, "init", Nil) + splitExpressions(expressions = initCodes, funcName = "init", arguments = Nil) } /** @@ -769,7 +781,7 @@ class CodegenContext { // Cannot split these expressions because they are not created from a row object. return expressions.mkString("\n") } - splitExpressions(expressions, "apply", ("InternalRow", row) :: Nil) + splitExpressions(expressions, funcName = "apply", arguments = ("InternalRow", row) :: Nil) } /** @@ -931,7 +943,7 @@ class CodegenContext { dataType: DataType, baseFuncName: String): (String, String, String) = { val globalIsNull = freshName("isNull") - addMutableState("boolean", globalIsNull, s"$globalIsNull = false;") + addMutableState(JAVA_BOOLEAN, globalIsNull, s"$globalIsNull = false;") val globalValue = freshName("value") addMutableState(javaType(dataType), globalValue, s"$globalValue = ${defaultValue(dataType)};") @@ -1038,7 +1050,7 @@ class CodegenContext { // 2. Less code. // Currently, we will do this for all non-leaf only expression trees (i.e. expr trees with // at least two nodes) as the cost of doing it is expected to be low. 
- addMutableState("boolean", isNull, s"$isNull = false;") + addMutableState(JAVA_BOOLEAN, isNull, s"$isNull = false;") addMutableState(javaType(expr.dataType), value, s"$value = ${defaultValue(expr.dataType)};") http://git-wip-us.apache.org/repos/asf/spark/blob/96e947ed/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala index b5429fa..802e8bd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala @@ -63,7 +63,7 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP if (e.nullable) { val isNull = s"isNull_$i" val value = s"value_$i" - ctx.addMutableState("boolean", isNull, s"$isNull = true;") + ctx.addMutableState(ctx.JAVA_BOOLEAN, isNull, s"$isNull = true;") ctx.addMutableState(ctx.javaType(e.dataType), value, s"$value = ${ctx.defaultValue(e.dataType)};") s""" http://git-wip-us.apache.org/repos/asf/spark/blob/96e947ed/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala index 4b6574a..2a00d57 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala @@ -120,7 +120,7 @@ private [sql] object GenArrayData { UnsafeArrayData.calculateHeaderPortionInBytes(numElements) + ByteArrayMethods.roundNumberOfBytesToNearestWord(elementType.defaultSize * numElements) val baseOffset = Platform.BYTE_ARRAY_OFFSET - ctx.addMutableState("UnsafeArrayData", arrayDataName, "") + ctx.addMutableState("UnsafeArrayData", arrayDataName) val primitiveValueTypeName = ctx.primitiveTypeName(elementType) val assignments = elementsCode.zipWithIndex.map { case (eval, i) => http://git-wip-us.apache.org/repos/asf/spark/blob/96e947ed/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala index eb3c49f..9e0786e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala @@ -277,7 +277,7 @@ abstract class HashExpression[E] extends Expression { } }) - ctx.addMutableState(ctx.javaType(dataType), ev.value, "") + ctx.addMutableState(ctx.javaType(dataType), ev.value) ev.copy(code = s""" ${ev.value} = $seed; $childrenHash""") @@ -616,8 +616,8 @@ case class HiveHash(children: Seq[Expression]) extends HashExpression[Int] { s"\n$childHash = 0;" }) - ctx.addMutableState(ctx.javaType(dataType), ev.value, "") - ctx.addMutableState("int", childHash, s"$childHash = 0;") + ctx.addMutableState(ctx.javaType(dataType), ev.value) + ctx.addMutableState(ctx.JAVA_INT, childHash, s"$childHash = 0;") ev.copy(code = s""" ${ev.value} = $seed; $childrenHash""") 
http://git-wip-us.apache.org/repos/asf/spark/blob/96e947ed/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullExpressions.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullExpressions.scala index 4aeab2c..5eaf3f2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullExpressions.scala @@ -72,8 +72,8 @@ case class Coalesce(children: Seq[Expression]) extends Expression { } override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { - ctx.addMutableState("boolean", ev.isNull, "") - ctx.addMutableState(ctx.javaType(dataType), ev.value, "") + ctx.addMutableState(ctx.JAVA_BOOLEAN, ev.isNull) + ctx.addMutableState(ctx.javaType(dataType), ev.value) val evals = children.map { e => val eval = e.genCode(ctx) @@ -385,8 +385,10 @@ case class AtLeastNNonNulls(n: Int, children: Seq[Expression]) extends Predicate val code = if (ctx.INPUT_ROW == null || ctx.currentVars != null) { evals.mkString("\n") } else { - ctx.splitExpressions(evals, "atLeastNNonNulls", - ("InternalRow", ctx.INPUT_ROW) :: ("int", nonnull) :: Nil, + ctx.splitExpressions( + expressions = evals, + funcName = "atLeastNNonNulls", + arguments = ("InternalRow", ctx.INPUT_ROW) :: ("int", nonnull) :: Nil, returnType = "int", makeSplitFunction = { body => s""" http://git-wip-us.apache.org/repos/asf/spark/blob/96e947ed/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala index f2eee99..006d37f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala @@ -63,14 +63,14 @@ trait InvokeLike extends Expression with NonSQLExpression { val resultIsNull = if (needNullCheck) { val resultIsNull = ctx.freshName("resultIsNull") - ctx.addMutableState("boolean", resultIsNull, "") + ctx.addMutableState(ctx.JAVA_BOOLEAN, resultIsNull) resultIsNull } else { "false" } val argValues = arguments.map { e => val argValue = ctx.freshName("argValue") - ctx.addMutableState(ctx.javaType(e.dataType), argValue, "") + ctx.addMutableState(ctx.javaType(e.dataType), argValue) argValue } @@ -548,7 +548,7 @@ case class MapObjects private( override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val elementJavaType = ctx.javaType(loopVarDataType) - ctx.addMutableState(elementJavaType, loopValue, "") + ctx.addMutableState(elementJavaType, loopValue) val genInputData = inputData.genCode(ctx) val genFunction = lambdaFunction.genCode(ctx) val dataLength = ctx.freshName("dataLength") @@ -644,7 +644,7 @@ case class MapObjects private( } val loopNullCheck = if (loopIsNull != "false") { - ctx.addMutableState("boolean", loopIsNull, "") + ctx.addMutableState(ctx.JAVA_BOOLEAN, loopIsNull) inputDataType match { case _: ArrayType => s"$loopIsNull = ${genInputData.value}.isNullAt($loopIndex);" case _ => s"$loopIsNull = $loopValue == null;" @@ -808,10 +808,10 @@ case class CatalystToExternalMap private( val mapType = inputDataType(inputData.dataType).asInstanceOf[MapType] val keyElementJavaType = ctx.javaType(mapType.keyType) - ctx.addMutableState(keyElementJavaType, keyLoopValue, "") + ctx.addMutableState(keyElementJavaType, keyLoopValue) val genKeyFunction = keyLambdaFunction.genCode(ctx) val valueElementJavaType = 
ctx.javaType(mapType.valueType) - ctx.addMutableState(valueElementJavaType, valueLoopValue, "") + ctx.addMutableState(valueElementJavaType, valueLoopValue) val genValueFunction = valueLambdaFunction.genCode(ctx) val genInputData = inputData.genCode(ctx) val dataLength = ctx.freshName("dataLength") @@ -844,7 +844,7 @@ case class CatalystToExternalMap private( val genValueFunctionValue = genFunctionValue(valueLambdaFunction, genValueFunction) val valueLoopNullCheck = if (valueLoopIsNull != "false") { - ctx.addMutableState("boolean", valueLoopIsNull, "") + ctx.addMutableState(ctx.JAVA_BOOLEAN, valueLoopIsNull) s"$valueLoopIsNull = $valueArray.isNullAt($loopIndex);" } else { "" @@ -994,8 +994,8 @@ case class ExternalMapToCatalyst private( val keyElementJavaType = ctx.javaType(keyType) val valueElementJavaType = ctx.javaType(valueType) - ctx.addMutableState(keyElementJavaType, key, "") - ctx.addMutableState(valueElementJavaType, value, "") + ctx.addMutableState(keyElementJavaType, key) + ctx.addMutableState(valueElementJavaType, value) val (defineEntries, defineKeyValue) = child.dataType match { case ObjectType(cls) if classOf[java.util.Map[_, _]].isAssignableFrom(cls) => @@ -1031,14 +1031,14 @@ case class ExternalMapToCatalyst private( } val keyNullCheck = if (keyIsNull != "false") { - ctx.addMutableState("boolean", keyIsNull, "") + ctx.addMutableState(ctx.JAVA_BOOLEAN, keyIsNull) s"$keyIsNull = $key == null;" } else { "" } val valueNullCheck = if (valueIsNull != "false") { - ctx.addMutableState("boolean", valueIsNull, "") + ctx.addMutableState(ctx.JAVA_BOOLEAN, valueIsNull) s"$valueIsNull = $value == null;" } else { "" @@ -1106,7 +1106,7 @@ case class CreateExternalRow(children: Seq[Expression], schema: StructType) override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val rowClass = classOf[GenericRowWithSchema].getName val values = ctx.freshName("values") - ctx.addMutableState("Object[]", values, "") + ctx.addMutableState("Object[]", values) val 
childrenCodes = children.zipWithIndex.map { case (e, i) => val eval = e.genCode(ctx) @@ -1244,7 +1244,7 @@ case class InitializeJavaBean(beanInstance: Expression, setters: Map[String, Exp val javaBeanInstance = ctx.freshName("javaBean") val beanInstanceJavaType = ctx.javaType(beanInstance.dataType) - ctx.addMutableState(beanInstanceJavaType, javaBeanInstance, "") + ctx.addMutableState(beanInstanceJavaType, javaBeanInstance) val initialize = setters.map { case (setterMethod, fieldValue) => http://git-wip-us.apache.org/repos/asf/spark/blob/96e947ed/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala index 5d75c60..c0084af 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala @@ -236,8 +236,8 @@ case class In(value: Expression, list: Seq[Expression]) extends Predicate { override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val valueGen = value.genCode(ctx) val listGen = list.map(_.genCode(ctx)) - ctx.addMutableState("boolean", ev.value, "") - ctx.addMutableState("boolean", ev.isNull, "") + ctx.addMutableState(ctx.JAVA_BOOLEAN, ev.value) + ctx.addMutableState(ctx.JAVA_BOOLEAN, ev.isNull) val valueArg = ctx.freshName("valueArg") val listCode = listGen.map(x => s""" @@ -253,7 +253,7 @@ case class In(value: Expression, list: Seq[Expression]) extends Predicate { """) val listCodes = if (ctx.INPUT_ROW != null && ctx.currentVars == null) { val args = ("InternalRow", ctx.INPUT_ROW) :: (ctx.javaType(value.dataType), valueArg) :: Nil - ctx.splitExpressions(listCode, "valueIn", args) + ctx.splitExpressions(expressions = listCode, 
funcName = "valueIn", arguments = args) } else { listCode.mkString("\n") } http://git-wip-us.apache.org/repos/asf/spark/blob/96e947ed/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/randomExpressions.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/randomExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/randomExpressions.scala index 9705176..b4aefe6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/randomExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/randomExpressions.scala @@ -79,7 +79,7 @@ case class Rand(child: Expression) extends RDG { override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val rngTerm = ctx.freshName("rng") val className = classOf[XORShiftRandom].getName - ctx.addMutableState(className, rngTerm, "") + ctx.addMutableState(className, rngTerm) ctx.addPartitionInitializationStatement( s"$rngTerm = new $className(${seed}L + partitionIndex);") ev.copy(code = s""" @@ -114,7 +114,7 @@ case class Randn(child: Expression) extends RDG { override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val rngTerm = ctx.freshName("rng") val className = classOf[XORShiftRandom].getName - ctx.addMutableState(className, rngTerm, "") + ctx.addMutableState(className, rngTerm) ctx.addPartitionInitializationStatement( s"$rngTerm = new $className(${seed}L + partitionIndex);") ev.copy(code = s""" http://git-wip-us.apache.org/repos/asf/spark/blob/96e947ed/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala index 360dd84..1c599af 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala @@ -74,8 +74,10 @@ case class Concat(children: Seq[Expression]) extends Expression with ImplicitCas """ } val codes = if (ctx.INPUT_ROW != null && ctx.currentVars == null) { - ctx.splitExpressions(inputs, "valueConcat", - ("InternalRow", ctx.INPUT_ROW) :: ("UTF8String[]", args) :: Nil) + ctx.splitExpressions( + expressions = inputs, + funcName = "valueConcat", + arguments = ("InternalRow", ctx.INPUT_ROW) :: ("UTF8String[]", args) :: Nil) } else { inputs.mkString("\n") } @@ -155,8 +157,10 @@ case class ConcatWs(children: Seq[Expression]) } } val codes = if (ctx.INPUT_ROW != null && ctx.currentVars == null) { - ctx.splitExpressions(inputs, "valueConcatWs", - ("InternalRow", ctx.INPUT_ROW) :: ("UTF8String[]", args) :: Nil) + ctx.splitExpressions( + expressions = inputs, + funcName = "valueConcatWs", + arguments = ("InternalRow", ctx.INPUT_ROW) :: ("UTF8String[]", args) :: Nil) } else { inputs.mkString("\n") } @@ -205,27 +209,30 @@ case class ConcatWs(children: Seq[Expression]) }.unzip val codes = ctx.splitExpressions(ctx.INPUT_ROW, evals.map(_.code)) - val varargCounts = ctx.splitExpressions(varargCount, "varargCountsConcatWs", - ("InternalRow", ctx.INPUT_ROW) :: Nil, - "int", - { body => + val varargCounts = ctx.splitExpressions( + expressions = varargCount, + funcName = "varargCountsConcatWs", + arguments = ("InternalRow", ctx.INPUT_ROW) :: Nil, + returnType = "int", + makeSplitFunction = body => s""" int $varargNum = 0; $body return $varargNum; - """ - }, - _.mkString(s"$varargNum += ", s";\n$varargNum += ", ";")) - val varargBuilds = ctx.splitExpressions(varargBuild, "varargBuildsConcatWs", - ("InternalRow", ctx.INPUT_ROW) :: ("UTF8String []", 
array) :: ("int", idxInVararg) :: Nil, - "int", - { body => + """, + foldFunctions = _.mkString(s"$varargNum += ", s";\n$varargNum += ", ";")) + val varargBuilds = ctx.splitExpressions( + expressions = varargBuild, + funcName = "varargBuildsConcatWs", + arguments = + ("InternalRow", ctx.INPUT_ROW) :: ("UTF8String []", array) :: ("int", idxInVararg) :: Nil, + returnType = "int", + makeSplitFunction = body => s""" $body return $idxInVararg; - """ - }, - _.mkString(s"$idxInVararg = ", s";\n$idxInVararg = ", ";")) + """, + foldFunctions = _.mkString(s"$idxInVararg = ", s";\n$idxInVararg = ", ";")) ev.copy( s""" $codes @@ -2059,7 +2066,7 @@ case class FormatNumber(x: Expression, d: Expression) val numberFormat = ctx.freshName("numberFormat") val i = ctx.freshName("i") val dFormat = ctx.freshName("dFormat") - ctx.addMutableState("int", lastDValue, s"$lastDValue = -100;") + ctx.addMutableState(ctx.JAVA_INT, lastDValue, s"$lastDValue = -100;") ctx.addMutableState(sb, pattern, s"$pattern = new $sb();") ctx.addMutableState(df, numberFormat, s"""$numberFormat = new $df("", new $dfs($l.$usLocale));""") http://git-wip-us.apache.org/repos/asf/spark/blob/96e947ed/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala index 1925bad..a9bfb63 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala @@ -76,14 +76,14 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport { val numOutputRows = metricTerm(ctx, "numOutputRows") val scanTimeMetric = metricTerm(ctx, "scanTime") val scanTimeTotalNs = ctx.freshName("scanTime") - ctx.addMutableState("long", scanTimeTotalNs, s"$scanTimeTotalNs = 0;") + 
ctx.addMutableState(ctx.JAVA_LONG, scanTimeTotalNs, s"$scanTimeTotalNs = 0;") val columnarBatchClz = classOf[ColumnarBatch].getName val batch = ctx.freshName("batch") ctx.addMutableState(columnarBatchClz, batch, s"$batch = null;") val idx = ctx.freshName("batchIdx") - ctx.addMutableState("int", idx, s"$idx = 0;") + ctx.addMutableState(ctx.JAVA_INT, idx, s"$idx = 0;") val colVars = output.indices.map(i => ctx.freshName("colInstance" + i)) val columnVectorClzs = vectorTypes.getOrElse( Seq.fill(colVars.size)(classOf[ColumnVector].getName)) http://git-wip-us.apache.org/repos/asf/spark/blob/96e947ed/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala index 21765cd..c0e2134 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala @@ -134,7 +134,7 @@ case class SortExec( override protected def doProduce(ctx: CodegenContext): String = { val needToSort = ctx.freshName("needToSort") - ctx.addMutableState("boolean", needToSort, s"$needToSort = true;") + ctx.addMutableState(ctx.JAVA_BOOLEAN, needToSort, s"$needToSort = true;") // Initialize the class member variables. This includes the instance of the Sorter and // the iterator to return sorted rows. 
http://git-wip-us.apache.org/repos/asf/spark/blob/96e947ed/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala index 51f7c9e..19c793e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala @@ -178,7 +178,7 @@ case class HashAggregateExec( private def doProduceWithoutKeys(ctx: CodegenContext): String = { val initAgg = ctx.freshName("initAgg") - ctx.addMutableState("boolean", initAgg, s"$initAgg = false;") + ctx.addMutableState(ctx.JAVA_BOOLEAN, initAgg, s"$initAgg = false;") // generate variables for aggregation buffer val functions = aggregateExpressions.map(_.aggregateFunction.asInstanceOf[DeclarativeAggregate]) @@ -186,8 +186,8 @@ case class HashAggregateExec( bufVars = initExpr.map { e => val isNull = ctx.freshName("bufIsNull") val value = ctx.freshName("bufValue") - ctx.addMutableState("boolean", isNull, "") - ctx.addMutableState(ctx.javaType(e.dataType), value, "") + ctx.addMutableState(ctx.JAVA_BOOLEAN, isNull) + ctx.addMutableState(ctx.javaType(e.dataType), value) // The initial expression should not access any column val ev = e.genCode(ctx) val initVars = s""" @@ -565,7 +565,7 @@ case class HashAggregateExec( private def doProduceWithKeys(ctx: CodegenContext): String = { val initAgg = ctx.freshName("initAgg") - ctx.addMutableState("boolean", initAgg, s"$initAgg = false;") + ctx.addMutableState(ctx.JAVA_BOOLEAN, initAgg, s"$initAgg = false;") if (sqlContext.conf.enableTwoLevelAggMap) { enableTwoLevelHashMap(ctx) } else { @@ -596,27 +596,27 @@ case class HashAggregateExec( s"$fastHashMapTerm = new $fastHashMapClassName();") 
ctx.addMutableState( "java.util.Iterator<org.apache.spark.sql.execution.vectorized.ColumnarRow>", - iterTermForFastHashMap, "") + iterTermForFastHashMap) } else { ctx.addMutableState(fastHashMapClassName, fastHashMapTerm, s"$fastHashMapTerm = new $fastHashMapClassName(" + s"$thisPlan.getTaskMemoryManager(), $thisPlan.getEmptyAggregationBuffer());") ctx.addMutableState( "org.apache.spark.unsafe.KVIterator", - iterTermForFastHashMap, "") + iterTermForFastHashMap) } } // create hashMap hashMapTerm = ctx.freshName("hashMap") val hashMapClassName = classOf[UnsafeFixedWidthAggregationMap].getName - ctx.addMutableState(hashMapClassName, hashMapTerm, "") + ctx.addMutableState(hashMapClassName, hashMapTerm) sorterTerm = ctx.freshName("sorter") - ctx.addMutableState(classOf[UnsafeKVExternalSorter].getName, sorterTerm, "") + ctx.addMutableState(classOf[UnsafeKVExternalSorter].getName, sorterTerm) // Create a name for iterator from HashMap val iterTerm = ctx.freshName("mapIter") - ctx.addMutableState(classOf[KVIterator[UnsafeRow, UnsafeRow]].getName, iterTerm, "") + ctx.addMutableState(classOf[KVIterator[UnsafeRow, UnsafeRow]].getName, iterTerm) def generateGenerateCode(): String = { if (isFastHashMapEnabled) { @@ -774,7 +774,7 @@ case class HashAggregateExec( val (checkFallbackForGeneratedHashMap, checkFallbackForBytesToBytesMap, resetCounter, incCounter) = if (testFallbackStartsAt.isDefined) { val countTerm = ctx.freshName("fallbackCounter") - ctx.addMutableState("int", countTerm, s"$countTerm = 0;") + ctx.addMutableState(ctx.JAVA_INT, countTerm, s"$countTerm = 0;") (s"$countTerm < ${testFallbackStartsAt.get._1}", s"$countTerm < ${testFallbackStartsAt.get._2}", s"$countTerm = 0;", s"$countTerm += 1;") } else { http://git-wip-us.apache.org/repos/asf/spark/blob/96e947ed/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashMapGenerator.scala ---------------------------------------------------------------------- diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashMapGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashMapGenerator.scala index 90deb20..85b4529 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashMapGenerator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashMapGenerator.scala @@ -48,8 +48,8 @@ abstract class HashMapGenerator( initExpr.map { e => val isNull = ctx.freshName("bufIsNull") val value = ctx.freshName("bufValue") - ctx.addMutableState("boolean", isNull, "") - ctx.addMutableState(ctx.javaType(e.dataType), value, "") + ctx.addMutableState(ctx.JAVA_BOOLEAN, isNull) + ctx.addMutableState(ctx.javaType(e.dataType), value) val ev = e.genCode(ctx) val initVars = s""" http://git-wip-us.apache.org/repos/asf/spark/blob/96e947ed/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index 3c7daa0..f205bdf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -368,9 +368,9 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range) val numOutput = metricTerm(ctx, "numOutputRows") val initTerm = ctx.freshName("initRange") - ctx.addMutableState("boolean", initTerm, s"$initTerm = false;") + ctx.addMutableState(ctx.JAVA_BOOLEAN, initTerm, s"$initTerm = false;") val number = ctx.freshName("number") - ctx.addMutableState("long", number, s"$number = 0L;") + ctx.addMutableState(ctx.JAVA_LONG, number, s"$number = 0L;") val value = ctx.freshName("value") val ev = ExprCode("", "false", value) @@ -391,11 +391,11 @@ case class 
RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range) // Once number == batchEnd, it's time to progress to the next batch. val batchEnd = ctx.freshName("batchEnd") - ctx.addMutableState("long", batchEnd, s"$batchEnd = 0;") + ctx.addMutableState(ctx.JAVA_LONG, batchEnd, s"$batchEnd = 0;") // How many values should still be generated by this range operator. val numElementsTodo = ctx.freshName("numElementsTodo") - ctx.addMutableState("long", numElementsTodo, s"$numElementsTodo = 0L;") + ctx.addMutableState(ctx.JAVA_LONG, numElementsTodo, s"$numElementsTodo = 0L;") // How many values should be generated in the next batch. val nextBatchTodo = ctx.freshName("nextBatchTodo") http://git-wip-us.apache.org/repos/asf/spark/blob/96e947ed/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala index ae600c1..ff5dd70 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala @@ -89,7 +89,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera case array: ArrayType => classOf[ArrayColumnAccessor].getName case t: MapType => classOf[MapColumnAccessor].getName } - ctx.addMutableState(accessorCls, accessorName, "") + ctx.addMutableState(accessorCls, accessorName) val createCode = dt match { case t if ctx.isPrimitiveType(dt) => http://git-wip-us.apache.org/repos/asf/spark/blob/96e947ed/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---------------------------------------------------------------------- diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala index cf7885f..9c08ec7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala @@ -423,7 +423,7 @@ case class SortMergeJoinExec( private def genScanner(ctx: CodegenContext): (String, String) = { // Create class member for next row from both sides. val leftRow = ctx.freshName("leftRow") - ctx.addMutableState("InternalRow", leftRow, "") + ctx.addMutableState("InternalRow", leftRow) val rightRow = ctx.freshName("rightRow") ctx.addMutableState("InternalRow", rightRow, s"$rightRow = null;") @@ -519,10 +519,10 @@ case class SortMergeJoinExec( val value = ctx.freshName("value") val valueCode = ctx.getValue(leftRow, a.dataType, i.toString) // declare it as class member, so we can access the column before or in the loop. 
- ctx.addMutableState(ctx.javaType(a.dataType), value, "") + ctx.addMutableState(ctx.javaType(a.dataType), value) if (a.nullable) { val isNull = ctx.freshName("isNull") - ctx.addMutableState("boolean", isNull, "") + ctx.addMutableState(ctx.JAVA_BOOLEAN, isNull) val code = s""" |$isNull = $leftRow.isNullAt($i); http://git-wip-us.apache.org/repos/asf/spark/blob/96e947ed/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala index 13da4b2..a8556f6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -72,7 +72,7 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { val stopEarly = ctx.freshName("stopEarly") - ctx.addMutableState("boolean", stopEarly, s"$stopEarly = false;") + ctx.addMutableState(ctx.JAVA_BOOLEAN, stopEarly, s"$stopEarly = false;") ctx.addNewFunction("stopEarly", s""" @Override @@ -81,7 +81,7 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { } """, inlineToOuterClass = true) val countTerm = ctx.freshName("count") - ctx.addMutableState("int", countTerm, s"$countTerm = 0;") + ctx.addMutableState(ctx.JAVA_INT, countTerm, s"$countTerm = 0;") s""" | if ($countTerm < $limit) { | $countTerm += 1; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
