[ https://issues.apache.org/jira/browse/SPARK-12878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15160119#comment-15160119 ]
Jakob Odersky edited comment on SPARK-12878 at 2/25/16 10:22 PM: ----------------------------------------------------------------- I just tried your example and get a slightly different exception: {{java.lang.ClassCastException: B cannot be cast to org.apache.spark.sql.catalyst.InternalRow}} (B as opposed to BoxedUnit) However I actually don't understand why this worked in 1.5.2 in the first place. Consider the following extract from your snippet: {code} case A(list) => val row = new GenericMutableRow(1) row.update(0, new GenericArrayData(list.map(_.asInstanceOf[Any]).toArray)) row {code} although `list` is a collection of elements B in this case, I don't think that the individual Bs are serialized according to the definition in BUDT. I would assume you are solely responsible for the serialization and would have to call something like {{list.map(BUDT.serialize(_))}} to convert any child elements to an "SQL Datum" (not sure what that is but the docs say it, http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.types.UserDefinedType) Maybe someone with more knowledge ([~marmbrus] [~cloud_fan]) on the topic can clarify what's going on? was (Author: jodersky): I just tried your example and get a slightly different exception: {{java.lang.ClassCastException: B cannot be cast to org.apache.spark.sql.catalyst.InternalRow}} (B as opposed to BoxedUnit) However I actually don't understand why this worked in 1.5.2 in the first place. Consider the following extract from your snippet: {code} case A(list) => val row = new GenericMutableRow(1) row.update(0, new GenericArrayData(list.map(_.asInstanceOf[Any]).toArray)) row {code} although `list` is a collection of elements B in this case, I don't think that the individual Bs are serialized according to the definition in BUDT. 
I would assume you are solely responsible for the serialization and would have to call something like {{list.map(BUDT.serialize(_))}} to convert any child elements to an "SQL Datum" (not sure what that is but the docs say it, http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.types.UserDefinedType) Maybe someone with more knowledge on the topic can clarify what's going on? > Dataframe fails with nested User Defined Types > ---------------------------------------------- > > Key: SPARK-12878 > URL: https://issues.apache.org/jira/browse/SPARK-12878 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 1.6.0 > Reporter: Joao > Priority: Blocker > > Spark 1.6.0 crashes when using nested User Defined Types in a Dataframe. > In version 1.5.2 the code below worked just fine: > import org.apache.spark.{SparkConf, SparkContext} > import org.apache.spark.sql.catalyst.InternalRow > import org.apache.spark.sql.catalyst.expressions.GenericMutableRow > import org.apache.spark.sql.types._ > @SQLUserDefinedType(udt = classOf[AUDT]) > case class A(list:Seq[B]) > class AUDT extends UserDefinedType[A] { > override def sqlType: DataType = StructType(Seq(StructField("list", > ArrayType(BUDT, containsNull = false), nullable = true))) > override def userClass: Class[A] = classOf[A] > override def serialize(obj: Any): Any = obj match { > case A(list) => > val row = new GenericMutableRow(1) > row.update(0, new > GenericArrayData(list.map(_.asInstanceOf[Any]).toArray)) > row > } > override def deserialize(datum: Any): A = { > datum match { > case row: InternalRow => new A(row.getArray(0).toArray(BUDT).toSeq) > } > } > } > object AUDT extends AUDT > @SQLUserDefinedType(udt = classOf[BUDT]) > case class B(text:Int) > class BUDT extends UserDefinedType[B] { > override def sqlType: DataType = StructType(Seq(StructField("num", > IntegerType, nullable = false))) > override def userClass: Class[B] = classOf[B] > override def serialize(obj: Any): Any = 
obj match { > case B(text) => > val row = new GenericMutableRow(1) > row.setInt(0, text) > row > } > override def deserialize(datum: Any): B = { > datum match { case row: InternalRow => new B(row.getInt(0)) } > } > } > object BUDT extends BUDT > object Test { > def main(args:Array[String]) = { > val col = Seq(new A(Seq(new B(1), new B(2))), > new A(Seq(new B(3), new B(4)))) > val sc = new SparkContext(new > SparkConf().setMaster("local[1]").setAppName("TestSpark")) > val sqlContext = new org.apache.spark.sql.SQLContext(sc) > import sqlContext.implicits._ > val df = sc.parallelize(1 to 2 zip col).toDF("id","b") > df.select("b").show() > df.collect().foreach(println) > } > } > In the new version (1.6.0) I needed to include the following import: > import org.apache.spark.sql.catalyst.expressions.GenericMutableRow > However, Spark crashes in runtime: > 16/01/18 14:36:22 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) > java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be cast to > org.apache.spark.sql.catalyst.InternalRow > at > org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getStruct(rows.scala:51) > at > org.apache.spark.sql.catalyst.expressions.GenericMutableRow.getStruct(rows.scala:248) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown > Source) > at > org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51) > at > org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at scala.collection.Iterator$$anon$10.next(Iterator.scala:312) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at > 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) > at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) > at scala.collection.AbstractIterator.to(Iterator.scala:1157) > at > scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) > at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) > at > scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) > at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) > at org.apache.spark.scheduler.Task.run(Task.scala:89) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org For additional commands, e-mail: issues-help@spark.apache.org