[ 
https://issues.apache.org/jira/browse/SPARK-18055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15965822#comment-15965822
 ] 

Paul Zaczkieiwcz commented on SPARK-18055:
------------------------------------------

Actually, it seems it was this issue. I forgot to mention that the Aggregator 
returns an Array[case class], which I would then like to flatMap. The stack 
trace points at the {{stitcher.toColumn}} line rather than the {{flatMap}} line. 
I've since patched Spark with this PR to work around the issue.
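{code:java}
import org.apache.spark.sql.expressions.Aggregator

case class StitchedVisitor(cookie_id: java.math.BigDecimal, visit_num: Int, ...)
case class CookieId(cookie_id: java.math.BigDecimal)

// Input: StitchedVisitor, buffer: Array[Option[StitchedVisitor]], output: Array[StitchedVisitor]
val stitcher: Aggregator[StitchedVisitor, Array[Option[StitchedVisitor]], Array[StitchedVisitor]] = ...

df.groupByKey(s => CookieId(s.cookie_id))
  .agg(stitcher.toColumn)    // the stack trace points here
  .flatMap(agg => agg._2)    // not here
{code}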

> Dataset.flatMap can't work with types from customized jar
> ---------------------------------------------------------
>
>                 Key: SPARK-18055
>                 URL: https://issues.apache.org/jira/browse/SPARK-18055
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.1
>            Reporter: Davies Liu
>            Assignee: Michael Armbrust
>             Fix For: 2.0.3, 2.1.1, 2.2.0
>
>         Attachments: test-jar_2.11-1.0.jar
>
>
> Try to apply flatMap() on a Dataset column which is of type com.A.B.
> Here's a schema of a dataset:
> {code}
> root
>  |-- id: string (nullable = true)
>  |-- outputs: array (nullable = true)
>  |    |-- element: string
> {code}
> flatMap works on RDD
> {code}
>  ds.rdd.flatMap(_.outputs)
> {code}
> flatMap doesn't work on the Dataset and gives the following error
> {code}
> ds.flatMap(_.outputs)
> {code}
> The exception:
> {code}
> scala.ScalaReflectionException: class com.A.B in JavaMirror … not found
>     at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:123)
>     at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:22)
>     at line189424fbb8cd47b3b62dc41e417841c159.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$typecreator3$1.apply(<console>:51)
>     at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232)
>     at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232)
>     at org.apache.spark.sql.SQLImplicits$$typecreator9$1.apply(SQLImplicits.scala:125)
>     at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232)
>     at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232)
>     at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:49)
>     at org.apache.spark.sql.SQLImplicits.newProductSeqEncoder(SQLImplicits.scala:125)
> {code}
> Spoke to Michael Armbrust and he confirmed it as a Dataset bug.
> There is a workaround using explode()
> {code}
> ds.select(explode(col("outputs")))
> {code}
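>
> A fuller sketch of the explode() workaround, for illustration only. It assumes a local SparkSession and a hypothetical case class {{Record}} that mirrors the schema above; neither name comes from the original report.
> {code}
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions.{col, explode}
>
> // Hypothetical record type matching the schema above (id: string, outputs: array<string>).
> case class Record(id: String, outputs: Seq[String])
>
> object ExplodeWorkaround {
>   def main(args: Array[String]): Unit = {
>     // Assumption: a local session, just to make the sketch self-contained.
>     val spark = SparkSession.builder()
>       .master("local[*]")
>       .appName("explode-workaround")
>       .getOrCreate()
>     import spark.implicits._
>
>     val ds = Seq(Record("a", Seq("x", "y")), Record("b", Seq("z"))).toDS()
>
>     // explode() emits one row per array element; the alias "output" is illustrative.
>     val exploded = ds.select(explode(col("outputs")).as("output"))
>     exploded.show()
>
>     spark.stop()
>   }
> }
> {code}
> Note that the result of select() here is an untyped DataFrame rather than a typed Dataset, so the element values are read back as a column instead of through a flatMap lambda.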


