[
https://issues.apache.org/jira/browse/SPARK-12783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15102482#comment-15102482
]
Muthu Jayakumar commented on SPARK-12783:
-----------------------------------------
Hello Kevin,
Here is what I am seeing...
from shell:
{code}
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.6.0
/_/
Using Scala version 2.11.7 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_60)
Type in expressions to have them evaluated.
Type :help for more information.
scala> case class MyMap(map: Map[String, String])
defined class MyMap
scala> :paste
// Entering paste mode (ctrl-D to finish)
case class TestCaseClass(a: String, b: String){
def toMyMap: MyMap = {
MyMap(Map(a->b))
}
def toStr: String = {
a
}
}
// Exiting paste mode, now interpreting.
defined class TestCaseClass
scala> TestCaseClass("a", "nn")
res4: TestCaseClass = TestCaseClass(a,nn)
scala> import sqlContext.implicits._
import sqlContext.implicits._
scala> val df1 = sqlContext.createDataset(Seq(TestCaseClass("2015-05-01",
"data1"), TestCaseClass("2015-05-01", "data2"))).toDF()
org.apache.spark.sql.AnalysisException: Unable to generate an encoder for inner
class `TestCaseClass` without access to the scope that this class was defined
in. Try moving this class out of its parent class.;
at
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$$anonfun$2.applyOrElse(ExpressionEncoder.scala:264)
at
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$$anonfun$2.applyOrElse(ExpressionEncoder.scala:260)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:243)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:243)
at
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:53)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:242)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:233)
at
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.resolve(ExpressionEncoder.scala:260)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:78)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:89)
at org.apache.spark.sql.SQLContext.createDataset(SQLContext.scala:507)
... 52 elided
{code}
I do remember seeing the above error stack when the case class was defined
inside the scope of an object (for example, if defined inside MyApp as in the
example below, since it then becomes an inner class).
From code, I added an explicit import and eventually changed to use fully
qualified class names like below...
{code}
import scala.collection.{Map => ImMap}
case class MyMap(map: ImMap[String, String])
case class TestCaseClass(a: String, b: String){
def toMyMap: MyMap = {
MyMap(ImMap(a->b))
}
def toStr: String = {
a
}
}
object MyApp extends App {
//Get handle to contexts...
import sqlContext.implicits._
val df1 = sqlContext.createDataset(Seq(TestCaseClass("2015-05-01", "data1"),
TestCaseClass("2015-05-01", "data2"))).toDF()
df1.as[TestCaseClass].map(_.toStr).show() //works fine
df1.as[TestCaseClass].map(_.toMyMap).show() //error
}
{code}
and
{code}
case class MyMap(map: scala.collection.Map[String, String])
case class TestCaseClass(a: String, b: String){
def toMyMap: MyMap = {
MyMap(scala.collection.Map(a->b))
}
def toStr: String = {
a
}
}
object MyApp extends App {
//Get handle to contexts...
import sqlContext.implicits._
val df1 = sqlContext.createDataset(Seq(TestCaseClass("2015-05-01", "data1"),
TestCaseClass("2015-05-01", "data2"))).toDF()
df1.as[TestCaseClass].map(_.toStr).show() //works fine
df1.as[TestCaseClass].map(_.toMyMap).show() //error
}
{code}
Please advise on what I may be missing. I misread the earlier comment and tried
to use an immutable map incorrectly :(.
> Dataset map serialization error
> -------------------------------
>
> Key: SPARK-12783
> URL: https://issues.apache.org/jira/browse/SPARK-12783
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.6.0
> Reporter: Muthu Jayakumar
> Assignee: Wenchen Fan
> Priority: Critical
>
> When Dataset API is used to map to another case class, an error is thrown.
> {code}
> case class MyMap(map: Map[String, String])
> case class TestCaseClass(a: String, b: String){
> def toMyMap: MyMap = {
> MyMap(Map(a->b))
> }
> def toStr: String = {
> a
> }
> }
> //Main method section below
> import sqlContext.implicits._
> val df1 = sqlContext.createDataset(Seq(TestCaseClass("2015-05-01", "data1"),
> TestCaseClass("2015-05-01", "data2"))).toDF()
> df1.as[TestCaseClass].map(_.toStr).show() //works fine
> df1.as[TestCaseClass].map(_.toMyMap).show() //fails
> {code}
> Error message:
> {quote}
> Caused by: java.io.NotSerializableException:
> scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1
> Serialization stack:
> - object not serializable (class:
> scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1, value:
> package lang)
> - field (class: scala.reflect.internal.Types$ThisType, name: sym, type:
> class scala.reflect.internal.Symbols$Symbol)
> - object (class scala.reflect.internal.Types$UniqueThisType,
> java.lang.type)
> - field (class: scala.reflect.internal.Types$TypeRef, name: pre, type:
> class scala.reflect.internal.Types$Type)
> - object (class scala.reflect.internal.Types$ClassNoArgsTypeRef, String)
> - field (class: scala.reflect.internal.Types$TypeRef, name: normalized,
> type: class scala.reflect.internal.Types$Type)
> - object (class scala.reflect.internal.Types$AliasNoArgsTypeRef, String)
> - field (class:
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, name: keyType$1,
> type: class scala.reflect.api.Types$TypeApi)
> - object (class
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, <function1>)
> - field (class: org.apache.spark.sql.catalyst.expressions.MapObjects,
> name: function, type: interface scala.Function1)
> - object (class org.apache.spark.sql.catalyst.expressions.MapObjects,
> mapobjects(<function1>,invoke(upcast('map,MapType(StringType,StringType,true),-
> field (class: "scala.collection.immutable.Map", name: "map"),- root class:
> "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType))
> - field (class: org.apache.spark.sql.catalyst.expressions.Invoke, name:
> targetObject, type: class
> org.apache.spark.sql.catalyst.expressions.Expression)
> - object (class org.apache.spark.sql.catalyst.expressions.Invoke,
> invoke(mapobjects(<function1>,invoke(upcast('map,MapType(StringType,StringType,true),-
> field (class: "scala.collection.immutable.Map", name: "map"),- root class:
> "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
> [Ljava.lang.Object;)))
> - writeObject data (class:
> scala.collection.immutable.List$SerializationProxy)
> - object (class scala.collection.immutable.List$SerializationProxy,
> scala.collection.immutable.List$SerializationProxy@4c7e3aab)
> - writeReplace data (class:
> scala.collection.immutable.List$SerializationProxy)
> - object (class scala.collection.immutable.$colon$colon,
> List(invoke(mapobjects(<function1>,invoke(upcast('map,MapType(StringType,StringType,true),-
> field (class: "scala.collection.immutable.Map", name: "map"),- root class:
> "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
> [Ljava.lang.Object;)),
> invoke(mapobjects(<function1>,invoke(upcast('map,MapType(StringType,StringType,true),-
> field (class: "scala.collection.immutable.Map", name: "map"),- root class:
> "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
> [Ljava.lang.Object;))))
> - field (class: org.apache.spark.sql.catalyst.expressions.StaticInvoke,
> name: arguments, type: interface scala.collection.Seq)
> - object (class org.apache.spark.sql.catalyst.expressions.StaticInvoke,
> staticinvoke(class
> org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface
> scala.collection.Map),toScalaMap,invoke(mapobjects(<function1>,invoke(upcast('map,MapType(StringType,StringType,true),-
> field (class: "scala.collection.immutable.Map", name: "map"),- root class:
> "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
>
> [Ljava.lang.Object;)),invoke(mapobjects(<function1>,invoke(upcast('map,MapType(StringType,StringType,true),-
> field (class: "scala.collection.immutable.Map", name: "map"),- root class:
> "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
> [Ljava.lang.Object;)),true))
> - writeObject data (class:
> scala.collection.immutable.List$SerializationProxy)
> - object (class scala.collection.immutable.List$SerializationProxy,
> scala.collection.immutable.List$SerializationProxy@78d9820)
> - writeReplace data (class:
> scala.collection.immutable.List$SerializationProxy)
> - object (class scala.collection.immutable.$colon$colon,
> List(staticinvoke(class
> org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface
> scala.collection.Map),toScalaMap,invoke(mapobjects(<function1>,invoke(upcast('map,MapType(StringType,StringType,true),-
> field (class: "scala.collection.immutable.Map", name: "map"),- root class:
> "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
>
> [Ljava.lang.Object;)),invoke(mapobjects(<function1>,invoke(upcast('map,MapType(StringType,StringType,true),-
> field (class: "scala.collection.immutable.Map", name: "map"),- root class:
> "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
> [Ljava.lang.Object;)),true)))
> - field (class: org.apache.spark.sql.catalyst.expressions.NewInstance,
> name: arguments, type: interface scala.collection.Seq)
> - object (class org.apache.spark.sql.catalyst.expressions.NewInstance,
> newinstance(class collector.MyMap,staticinvoke(class
> org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface
> scala.collection.Map),toScalaMap,invoke(mapobjects(<function1>,invoke(upcast('map,MapType(StringType,StringType,true),-
> field (class: "scala.collection.immutable.Map", name: "map"),- root class:
> "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
>
> [Ljava.lang.Object;)),invoke(mapobjects(<function1>,invoke(upcast('map,MapType(StringType,StringType,true),-
> field (class: "scala.collection.immutable.Map", name: "map"),- root class:
> "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
> [Ljava.lang.Object;)),true),false,ObjectType(class collector.MyMap),None))
> - field (class:
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder, name:
> fromRowExpression, type: class
> org.apache.spark.sql.catalyst.expressions.Expression)
> - object (class
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder,
> class[map#ExprId(9,5d198984-4022-43b2-a2a3-ddbb214ba0ef): map<string,string>])
> - field (class: org.apache.spark.sql.execution.MapPartitions, name:
> uEncoder, type: class
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder)
> - object (class org.apache.spark.sql.execution.MapPartitions,
> !MapPartitions <function1>, class[a[0]: string, b[0]: string],
> class[map#ExprId(9,5d198984-4022-43b2-a2a3-ddbb214ba0ef):
> map<string,string>], [map#13]
> +- LocalTableScan [a#2,b#3],
> [[0,180000000a,2800000005,2d35302d35313032,3130,3161746164],[0,180000000a,2800000005,2d35302d35313032,3130,3261746164]]
> )
> - field (class:
> org.apache.spark.sql.execution.MapPartitions$$anonfun$8, name: $outer, type:
> class org.apache.spark.sql.execution.MapPartitions)
> - object (class
> org.apache.spark.sql.execution.MapPartitions$$anonfun$8, <function1>)
> - field (class:
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, name: f$22, type:
> interface scala.Function1)
> - object (class
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, <function0>)
> - field (class:
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21,
> name: $outer, type: class
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1)
> - object (class
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21,
> <function3>)
> - field (class: org.apache.spark.rdd.MapPartitionsRDD, name: f, type:
> interface scala.Function3)
> - object (class org.apache.spark.rdd.MapPartitionsRDD,
> MapPartitionsRDD[1] at show at CollectorSparkTest.scala:50)
> - field (class: org.apache.spark.NarrowDependency, name: _rdd, type:
> class org.apache.spark.rdd.RDD)
> - object (class org.apache.spark.OneToOneDependency,
> org.apache.spark.OneToOneDependency@4d60c27d)
> - writeObject data (class:
> scala.collection.immutable.List$SerializationProxy)
> - object (class scala.collection.immutable.List$SerializationProxy,
> scala.collection.immutable.List$SerializationProxy@6c436651)
> - writeReplace data (class:
> scala.collection.immutable.List$SerializationProxy)
> - object (class scala.collection.immutable.$colon$colon,
> List(org.apache.spark.OneToOneDependency@4d60c27d))
> - field (class: org.apache.spark.rdd.RDD, name:
> org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq)
> - object (class org.apache.spark.rdd.MapPartitionsRDD,
> MapPartitionsRDD[2] at show at CollectorSparkTest.scala:50)
> - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
> - object (class scala.Tuple2, (MapPartitionsRDD[2] at show at
> CollectorSparkTest.scala:50,<function2>))
> at
> org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
> at
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
> at
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
> at
> org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1003)
> at
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:921)
> at
> org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:861)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1607)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> {quote}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]