Repository: spark Updated Branches: refs/heads/branch-2.1 0dc14f129 -> f67208369
[SPARK-18368] Fix regexp_replace with task serialization. ## What changes were proposed in this pull request? This makes the result value both transient and lazy, so that if the RegExpReplace object is initialized then serialized, `result: StringBuffer` will be correctly initialized. ## How was this patch tested? * Verified that this patch fixed the query that found the bug. * Added a test case that fails without the fix. Author: Ryan Blue <b...@apache.org> Closes #15816 from rdblue/SPARK-18368-fix-regexp-replace. (cherry picked from commit b9192bb3ffc319ebee7dbd15c24656795e454749) Signed-off-by: Reynold Xin <r...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6720836 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6720836 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6720836 Branch: refs/heads/branch-2.1 Commit: f672083693c2c4dfea6dc43c024993d4561b1e79 Parents: 0dc14f1 Author: Ryan Blue <b...@apache.org> Authored: Tue Nov 8 23:47:48 2016 -0800 Committer: Reynold Xin <r...@databricks.com> Committed: Tue Nov 8 23:47:56 2016 -0800 ---------------------------------------------------------------------- .../sql/catalyst/expressions/regexpExpressions.scala | 2 +- .../catalyst/expressions/ExpressionEvalHelper.scala | 15 +++++++++------ 2 files changed, 10 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/f6720836/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala index 5648ad6..4896a62 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala @@ -230,7 +230,7 @@ case class RegExpReplace(subject: Expression, regexp: Expression, rep: Expressio @transient private var lastReplacement: String = _ @transient private var lastReplacementInUTF8: UTF8String = _ // result buffer write by Matcher - @transient private val result: StringBuffer = new StringBuffer + @transient private lazy val result: StringBuffer = new StringBuffer override def nullSafeEval(s: Any, p: Any, r: Any): Any = { if (!p.equals(lastRegex)) { http://git-wip-us.apache.org/repos/asf/spark/blob/f6720836/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala index 9ceb709..f836504 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala @@ -22,7 +22,8 @@ import org.scalactic.TripleEqualsSupport.Spread import org.scalatest.exceptions.TestFailedException import org.scalatest.prop.GeneratorDrivenPropertyChecks -import org.apache.spark.SparkFunSuite +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.serializer.JavaSerializer import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.optimizer.SimpleTestOptimizer @@ -43,13 +44,15 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks { protected def checkEvaluation( expression: => Expression, expected: Any, inputRow: InternalRow = EmptyRow): Unit = { + val serializer = new JavaSerializer(new SparkConf()).newInstance + val expr: Expression = serializer.deserialize(serializer.serialize(expression)) val catalystValue = CatalystTypeConverters.convertToCatalyst(expected) - checkEvaluationWithoutCodegen(expression, catalystValue, inputRow) - checkEvaluationWithGeneratedMutableProjection(expression, catalystValue, inputRow) - if (GenerateUnsafeProjection.canSupport(expression.dataType)) { - checkEvalutionWithUnsafeProjection(expression, catalystValue, inputRow) + checkEvaluationWithoutCodegen(expr, catalystValue, inputRow) + checkEvaluationWithGeneratedMutableProjection(expr, catalystValue, inputRow) + if (GenerateUnsafeProjection.canSupport(expr.dataType)) { + checkEvalutionWithUnsafeProjection(expr, catalystValue, inputRow) } - checkEvaluationWithOptimization(expression, catalystValue, inputRow) + checkEvaluationWithOptimization(expr, catalystValue, inputRow) } /** --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org