Repository: spark
Updated Branches:
refs/heads/master b32bd005e -> fd8c931a3
[SPARK-19104][SQL] Lambda variables in ExternalMapToCatalyst should be global
## What changes were proposed in this pull request?
The issue happens in `ExternalMapToCatalyst`. For example, the following code
creates an `ExternalMapToCatalyst` to convert a Scala Map to the catalyst map format:
val data = Seq.tabulate(10)(i => NestedData(1, Map("key" ->
InnerData("name", i + 100))))
val ds = spark.createDataset(data)
The `valueConverter` in `ExternalMapToCatalyst` looks like:
if (isnull(lambdavariable(ExternalMapToCatalyst_value52,
ExternalMapToCatalyst_value_isNull52, ObjectType(class
org.apache.spark.sql.InnerData), true))) null else named_struct(name,
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType,
fromString, assertnotnull(lambdavariable(ExternalMapToCatalyst_value52,
ExternalMapToCatalyst_value_isNull52, ObjectType(class
org.apache.spark.sql.InnerData), true)).name, true), value,
assertnotnull(lambdavariable(ExternalMapToCatalyst_value52,
ExternalMapToCatalyst_value_isNull52, ObjectType(class
org.apache.spark.sql.InnerData), true)).value)
There is a `CreateNamedStruct` expression (`named_struct`) that creates a row from
`InnerData.name` and `InnerData.value`, both of which are accessed through the
lambda variable `ExternalMapToCatalyst_value52`.
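Because `ExternalMapToCatalyst_value52` is a local variable, when
`CreateNamedStruct` splits its expressions into individual functions, the local
variable can no longer be accessed from inside those functions.
This patch declares the lambda variables (key, value, and the value's null flag)
as global mutable state so that the split functions can still access them.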
## How was this patch tested?
Jenkins tests.
Author: Liang-Chi Hsieh <[email protected]>
Closes #18418 from viirya/SPARK-19104.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fd8c931a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fd8c931a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fd8c931a
Branch: refs/heads/master
Commit: fd8c931a30a084ee981b75aa469fc97dda6cfaa9
Parents: b32bd00
Author: Liang-Chi Hsieh <[email protected]>
Authored: Wed Jun 28 00:57:05 2017 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Wed Jun 28 00:57:05 2017 +0800
----------------------------------------------------------------------
.../catalyst/expressions/objects/objects.scala | 18 ++++++++++++------
.../apache/spark/sql/DatasetPrimitiveSuite.scala | 8 ++++++++
2 files changed, 20 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/fd8c931a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
----------------------------------------------------------------------
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 073993c..4b65183 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -911,6 +911,12 @@ case class ExternalMapToCatalyst private(
val entry = ctx.freshName("entry")
val entries = ctx.freshName("entries")
+ val keyElementJavaType = ctx.javaType(keyType)
+ val valueElementJavaType = ctx.javaType(valueType)
+ ctx.addMutableState(keyElementJavaType, key, "")
+ ctx.addMutableState("boolean", valueIsNull, "")
+ ctx.addMutableState(valueElementJavaType, value, "")
+
val (defineEntries, defineKeyValue) = child.dataType match {
case ObjectType(cls) if classOf[java.util.Map[_,
_]].isAssignableFrom(cls) =>
val javaIteratorCls = classOf[java.util.Iterator[_]].getName
@@ -922,8 +928,8 @@ case class ExternalMapToCatalyst private(
val defineKeyValue =
s"""
final $javaMapEntryCls $entry = ($javaMapEntryCls) $entries.next();
- ${ctx.javaType(keyType)} $key = (${ctx.boxedType(keyType)})
$entry.getKey();
- ${ctx.javaType(valueType)} $value = (${ctx.boxedType(valueType)})
$entry.getValue();
+ $key = (${ctx.boxedType(keyType)}) $entry.getKey();
+ $value = (${ctx.boxedType(valueType)}) $entry.getValue();
"""
defineEntries -> defineKeyValue
@@ -937,17 +943,17 @@ case class ExternalMapToCatalyst private(
val defineKeyValue =
s"""
final $scalaMapEntryCls $entry = ($scalaMapEntryCls)
$entries.next();
- ${ctx.javaType(keyType)} $key = (${ctx.boxedType(keyType)})
$entry._1();
- ${ctx.javaType(valueType)} $value = (${ctx.boxedType(valueType)})
$entry._2();
+ $key = (${ctx.boxedType(keyType)}) $entry._1();
+ $value = (${ctx.boxedType(valueType)}) $entry._2();
"""
defineEntries -> defineKeyValue
}
val valueNullCheck = if (ctx.isPrimitiveType(valueType)) {
- s"boolean $valueIsNull = false;"
+ s"$valueIsNull = false;"
} else {
- s"boolean $valueIsNull = $value == null;"
+ s"$valueIsNull = $value == null;"
}
val arrayCls = classOf[GenericArrayData].getName
http://git-wip-us.apache.org/repos/asf/spark/blob/fd8c931a/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala
index 4126660..a6847dc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala
@@ -39,6 +39,9 @@ case class ComplexClass(seq: SeqClass, list: ListClass,
queue: QueueClass)
case class ComplexMapClass(map: MapClass, lhmap: LHMapClass)
+case class InnerData(name: String, value: Int)
+case class NestedData(id: Int, param: Map[String, InnerData])
+
package object packageobject {
case class PackageClass(value: Int)
}
@@ -354,4 +357,9 @@ class DatasetPrimitiveSuite extends QueryTest with
SharedSQLContext {
checkDataset(Seq(PackageClass(1)).toDS(), PackageClass(1))
}
+ test("SPARK-19104: Lambda variables in ExternalMapToCatalyst should be
global") {
+ val data = Seq.tabulate(10)(i => NestedData(1, Map("key" ->
InnerData("name", i + 100))))
+ val ds = spark.createDataset(data)
+ checkDataset(ds, data: _*)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]