Repository: spark
Updated Branches:
  refs/heads/branch-2.0 c1c498006 -> fb1c69714
[SPARK-17061][SPARK-17093][SQL] `MapObjects` should make copies of unsafe-backed data

Currently `MapObjects` does not make copies of unsafe-backed data, leading to problems like [SPARK-17061](https://issues.apache.org/jira/browse/SPARK-17061) and [SPARK-17093](https://issues.apache.org/jira/browse/SPARK-17093). This patch makes `MapObjects` copy unsafe-backed data before storing it.

Generated code - prior to this patch:
```java
...
/* 295 */     if (isNull12) {
/* 296 */       convertedArray1[loopIndex1] = null;
/* 297 */     } else {
/* 298 */       convertedArray1[loopIndex1] = value12;
/* 299 */     }
...
```

Generated code - after this patch:
```java
...
/* 295 */     if (isNull12) {
/* 296 */       convertedArray1[loopIndex1] = null;
/* 297 */     } else {
/* 298 */       convertedArray1[loopIndex1] = value12 instanceof UnsafeRow? value12.copy() : value12;
/* 299 */     }
...
```

Adds a new test case that would fail without this patch.

Author: Liwei Lin <[email protected]>

Closes #14698 from lw-lin/mapobjects-copy.

(cherry picked from commit e0b20f9f24d5c3304bf517a4dcfb0da93be5bc75)
Signed-off-by: Herman van Hovell <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fb1c6971
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fb1c6971
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fb1c6971

Branch: refs/heads/branch-2.0
Commit: fb1c697143a5bb2df69d9f2c9cbddc4eb526f047
Parents: c1c4980
Author: Liwei Lin <[email protected]>
Authored: Thu Aug 25 11:24:40 2016 +0200
Committer: Herman van Hovell <[email protected]>
Committed: Thu Aug 25 11:45:51 2016 +0200

----------------------------------------------------------------------
 .../expressions/ObjectExpressionsSuite.scala    | 41 ++++++++++++++++++++
 .../catalyst/expressions/objects/objects.scala  | 12 +++++-
 .../expressions/ExpressionEvalHelper.scala      |  2 +-
 3 files changed, 53 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
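For context on why the copy matters: unsafe-backed values such as `UnsafeRow`, `UnsafeArrayData` and `UnsafeMapData` typically point into a buffer that the evaluation loop reuses, so storing the bare reference in the converted array lets later iterations overwrite earlier elements. The minimal Scala sketch below mimics the `instanceof ... ? ... .copy() : ...` guard the patch generates; `ReusedBuffer`, `copyIfUnsafeBacked` and `MapObjectsCopySketch` are made-up names for illustration, not Spark APIs.

```scala
object MapObjectsCopySketch {

  // Models an unsafe-backed value: a single instance whose contents are
  // overwritten on every loop iteration.
  final class ReusedBuffer(var value: Int) {
    def copy(): ReusedBuffer = new ReusedBuffer(value)
  }

  // Mirrors the guard the patch adds to the generated code:
  //   value12 instanceof UnsafeRow? value12.copy() : value12
  def copyIfUnsafeBacked(v: Any): Any = v match {
    case b: ReusedBuffer => b.copy()
    case other => other
  }

  def main(args: Array[String]): Unit = {
    val buffer = new ReusedBuffer(0)
    val inputs = Seq(1, 2, 3)

    // Without copying, every slot of the converted array holds the same
    // reused instance, so all elements observe the last value written.
    val broken = inputs.map { i => buffer.value = i; buffer }.toArray
    println(broken.map(_.value).mkString(", "))                           // 3, 3, 3

    // With the copy-if-unsafe-backed guard, each slot keeps its own snapshot.
    val fixed = inputs.map { i => buffer.value = i; copyIfUnsafeBacked(buffer) }.toArray
    println(fixed.map(_.asInstanceOf[ReusedBuffer].value).mkString(", ")) // 1, 2, 3
  }
}
```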
http://git-wip-us.apache.org/repos/asf/spark/blob/fb1c6971/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
@@ ... @@
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.objects.Invoke
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
 import org.apache.spark.sql.types.{IntegerType, ObjectType}
@@ ... @@
+
+  test("MapObjects should make copies of unsafe-backed data") {
+    // test UnsafeRow-backed data
+    val structEncoder = ExpressionEncoder[Array[Tuple2[java.lang.Integer, java.lang.Integer]]]
+    val structInputRow = InternalRow.fromSeq(Seq(Array((1, 2), (3, 4))))
+    val structExpected = new GenericArrayData(
+      Array(InternalRow.fromSeq(Seq(1, 2)), InternalRow.fromSeq(Seq(3, 4))))
+    checkEvalutionWithUnsafeProjection(
+      structEncoder.serializer.head, structExpected, structInputRow)
+
+    // test UnsafeArray-backed data
+    val arrayEncoder = ExpressionEncoder[Array[Array[Int]]]
+    val arrayInputRow = InternalRow.fromSeq(Seq(Array(Array(1, 2), Array(3, 4))))
+    val arrayExpected = new GenericArrayData(
+      Array(new GenericArrayData(Array(1, 2)), new GenericArrayData(Array(3, 4))))
+    checkEvalutionWithUnsafeProjection(
+      arrayEncoder.serializer.head, arrayExpected, arrayInputRow)
+
+    // test UnsafeMap-backed data
+    val mapEncoder = ExpressionEncoder[Array[Map[Int, Int]]]
+    val mapInputRow = InternalRow.fromSeq(Seq(Array(
+      Map(1 -> 100, 2 -> 200), Map(3 -> 300, 4 -> 400))))
+    val mapExpected = new GenericArrayData(Seq(
+      new ArrayBasedMapData(
+        new GenericArrayData(Array(1, 2)),
+        new GenericArrayData(Array(100, 200))),
+      new ArrayBasedMapData(
+        new GenericArrayData(Array(3, 4)),
+        new GenericArrayData(Array(300, 400)))))
+    checkEvalutionWithUnsafeProjection(
+      mapEncoder.serializer.head, mapExpected, mapInputRow)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/fb1c6971/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 37ec1a6..1cdda53 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -481,6 +481,16 @@ case class MapObjects private(
       s"$seq == null ? $array[$loopIndex] : $seq.apply($loopIndex)"
     }

+    // Make a copy of the data if it's unsafe-backed
+    def makeCopyIfInstanceOf(clazz: Class[_ <: Any], value: String) =
+      s"$value instanceof ${clazz.getSimpleName}? ${value}.copy() : $value"
+    val genFunctionValue = lambdaFunction.dataType match {
+      case StructType(_) => makeCopyIfInstanceOf(classOf[UnsafeRow], genFunction.value)
+      case ArrayType(_, _) => makeCopyIfInstanceOf(classOf[UnsafeArrayData], genFunction.value)
+      case MapType(_, _, _) => makeCopyIfInstanceOf(classOf[UnsafeMapData], genFunction.value)
+      case _ => genFunction.value
+    }
+
     val loopNullCheck = inputDataType match {
       case _: ArrayType => s"$loopIsNull = ${genInputData.value}.isNullAt($loopIndex);"
       // The element of primitive array will never be null.
@@ -508,7 +518,7 @@ case class MapObjects private(
           if (${genFunction.isNull}) {
             $convertedArray[$loopIndex] = null;
           } else {
-            $convertedArray[$loopIndex] = ${genFunction.value};
+            $convertedArray[$loopIndex] = $genFunctionValue;
           }

           $loopIndex += 1;

http://git-wip-us.apache.org/repos/asf/spark/blob/fb1c6971/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
index d6a9672..668543a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
@@ -136,7 +136,7 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks {
     // some expression is reusing variable names across different instances.
     // This behavior is tested in ExpressionEvalHelperSuite.
     val plan = generateProject(
-      GenerateUnsafeProjection.generate(
+      UnsafeProjection.create(
         Alias(expression, s"Optimized($expression)1")() ::
           Alias(expression, s"Optimized($expression)2")() :: Nil),
       expression)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
