Repository: spark
Updated Branches:
  refs/heads/branch-2.0 0cceb1bfe -> bdddc661b


[SPARK-18368] Fix regexp_replace with task serialization.

## What changes were proposed in this pull request?

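This makes the `result` value both transient and lazy, so that if a
RegExpReplace object is initialized and then serialized, `result: StringBuffer`
is correctly re-initialized on first use after deserialization. (A plain
`@transient val` is skipped by serialization and left null on the
deserialized copy.)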

## How was this patch tested?

* Verified that this patch fixes the query that exposed the bug.
* Added a test case that fails without the fix.

Author: Ryan Blue <b...@apache.org>

Closes #15816 from rdblue/SPARK-18368-fix-regexp-replace.

(cherry picked from commit b9192bb3ffc319ebee7dbd15c24656795e454749)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bdddc661
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bdddc661
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bdddc661

Branch: refs/heads/branch-2.0
Commit: bdddc661b71725dce35c6b2edd9ccb22e774e997
Parents: 0cceb1b
Author: Ryan Blue <b...@apache.org>
Authored: Tue Nov 8 23:47:48 2016 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Tue Nov 8 23:48:06 2016 -0800

----------------------------------------------------------------------
 .../sql/catalyst/expressions/regexpExpressions.scala |  2 +-
 .../catalyst/expressions/ExpressionEvalHelper.scala  | 15 +++++++++------
 2 files changed, 10 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bdddc661/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala
index d25da3f..f6a55cf 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala
@@ -220,7 +220,7 @@ case class RegExpReplace(subject: Expression, regexp: 
Expression, rep: Expressio
   @transient private var lastReplacement: String = _
   @transient private var lastReplacementInUTF8: UTF8String = _
   // result buffer write by Matcher
-  @transient private val result: StringBuffer = new StringBuffer
+  @transient private lazy val result: StringBuffer = new StringBuffer
 
   override def nullSafeEval(s: Any, p: Any, r: Any): Any = {
     if (!p.equals(lastRegex)) {

http://git-wip-us.apache.org/repos/asf/spark/blob/bdddc661/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
index 668543a..186079f 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
@@ -21,7 +21,8 @@ import org.scalacheck.Gen
 import org.scalactic.TripleEqualsSupport.Spread
 import org.scalatest.prop.GeneratorDrivenPropertyChecks
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.optimizer.SimpleTestOptimizer
@@ -42,13 +43,15 @@ trait ExpressionEvalHelper extends 
GeneratorDrivenPropertyChecks {
 
   protected def checkEvaluation(
       expression: => Expression, expected: Any, inputRow: InternalRow = 
EmptyRow): Unit = {
+    val serializer = new JavaSerializer(new SparkConf()).newInstance
+    val expr: Expression = 
serializer.deserialize(serializer.serialize(expression))
     val catalystValue = CatalystTypeConverters.convertToCatalyst(expected)
-    checkEvaluationWithoutCodegen(expression, catalystValue, inputRow)
-    checkEvaluationWithGeneratedMutableProjection(expression, catalystValue, 
inputRow)
-    if (GenerateUnsafeProjection.canSupport(expression.dataType)) {
-      checkEvalutionWithUnsafeProjection(expression, catalystValue, inputRow)
+    checkEvaluationWithoutCodegen(expr, catalystValue, inputRow)
+    checkEvaluationWithGeneratedMutableProjection(expr, catalystValue, 
inputRow)
+    if (GenerateUnsafeProjection.canSupport(expr.dataType)) {
+      checkEvalutionWithUnsafeProjection(expr, catalystValue, inputRow)
     }
-    checkEvaluationWithOptimization(expression, catalystValue, inputRow)
+    checkEvaluationWithOptimization(expr, catalystValue, inputRow)
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to