Repository: spark
Updated Branches:
  refs/heads/branch-2.1 0dc14f129 -> f67208369


[SPARK-18368] Fix regexp_replace with task serialization.

## What changes were proposed in this pull request?

This makes the result value both transient and lazy, so that if the
RegExpReplace object is initialized and then serialized, `result: StringBuffer`
is correctly re-initialized on first use after deserialization instead of
being left null.
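
For context, a minimal standalone sketch (not part of this patch) of the behavior the fix relies on: a plain `@transient val` comes back `null` after Java serialization, while a `@transient lazy val` is re-created on first access.

```scala
import java.io._

class Holder extends Serializable {
  // Initialized in the constructor, dropped by serialization, never restored.
  @transient private val eager: StringBuffer = new StringBuffer
  // Also dropped by serialization, but re-computed on first access.
  @transient private lazy val lazily: StringBuffer = new StringBuffer

  def eagerIsNull: Boolean = eager == null
  def lazilyIsNull: Boolean = lazily == null
}

object TransientLazyDemo {
  private def roundTrip[T](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
      .readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val copy = roundTrip(new Holder)
    println(copy.eagerIsNull)   // true: the transient val is gone
    println(copy.lazilyIsNull)  // false: the lazy val is rebuilt on access
  }
}
```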

## How was this patch tested?

* Verified that this patch fixed the query that found the bug.
* Added a test case that fails without the fix (a sketch of such a check is below).
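
The added test case itself is not shown in this diff; as a hedged illustration (literal values made up, expression and helper names taken from the patched files), a `regexp_replace` check in the existing `checkEvaluation` style now exercises the serialization round-trip added below:

```scala
// Illustrative only: with the helper change below, this style of check
// serializes and deserializes the expression before evaluating it, so it
// fails with a NullPointerException unless `result` is a transient lazy val.
checkEvaluation(
  RegExpReplace(Literal("100-200"), Literal("(\\d+)"), Literal("num")),
  "num-num")
```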

Author: Ryan Blue <b...@apache.org>

Closes #15816 from rdblue/SPARK-18368-fix-regexp-replace.

(cherry picked from commit b9192bb3ffc319ebee7dbd15c24656795e454749)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6720836
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6720836
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6720836

Branch: refs/heads/branch-2.1
Commit: f672083693c2c4dfea6dc43c024993d4561b1e79
Parents: 0dc14f1
Author: Ryan Blue <b...@apache.org>
Authored: Tue Nov 8 23:47:48 2016 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Tue Nov 8 23:47:56 2016 -0800

----------------------------------------------------------------------
 .../sql/catalyst/expressions/regexpExpressions.scala |  2 +-
 .../catalyst/expressions/ExpressionEvalHelper.scala  | 15 +++++++++------
 2 files changed, 10 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f6720836/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala
index 5648ad6..4896a62 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala
@@ -230,7 +230,7 @@ case class RegExpReplace(subject: Expression, regexp: Expression, rep: Expressio
   @transient private var lastReplacement: String = _
   @transient private var lastReplacementInUTF8: UTF8String = _
   // result buffer write by Matcher
-  @transient private val result: StringBuffer = new StringBuffer
+  @transient private lazy val result: StringBuffer = new StringBuffer
 
   override def nullSafeEval(s: Any, p: Any, r: Any): Any = {
     if (!p.equals(lastRegex)) {

http://git-wip-us.apache.org/repos/asf/spark/blob/f6720836/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
index 9ceb709..f836504 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
@@ -22,7 +22,8 @@ import org.scalactic.TripleEqualsSupport.Spread
 import org.scalatest.exceptions.TestFailedException
 import org.scalatest.prop.GeneratorDrivenPropertyChecks
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.optimizer.SimpleTestOptimizer
@@ -43,13 +44,15 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks {
 
   protected def checkEvaluation(
       expression: => Expression, expected: Any, inputRow: InternalRow = EmptyRow): Unit = {
+    val serializer = new JavaSerializer(new SparkConf()).newInstance
+    val expr: Expression = serializer.deserialize(serializer.serialize(expression))
     val catalystValue = CatalystTypeConverters.convertToCatalyst(expected)
-    checkEvaluationWithoutCodegen(expression, catalystValue, inputRow)
-    checkEvaluationWithGeneratedMutableProjection(expression, catalystValue, inputRow)
-    if (GenerateUnsafeProjection.canSupport(expression.dataType)) {
-      checkEvalutionWithUnsafeProjection(expression, catalystValue, inputRow)
+    checkEvaluationWithoutCodegen(expr, catalystValue, inputRow)
+    checkEvaluationWithGeneratedMutableProjection(expr, catalystValue, inputRow)
+    if (GenerateUnsafeProjection.canSupport(expr.dataType)) {
+      checkEvalutionWithUnsafeProjection(expr, catalystValue, inputRow)
     }
-    checkEvaluationWithOptimization(expression, catalystValue, inputRow)
+    checkEvaluationWithOptimization(expr, catalystValue, inputRow)
   }
 
   /**

