Thanks Michael.

I could not use case class here since I need to later modify the output of
getX() so that the output is dynamically generated.

the bigger context is this:
I want to implement topN(), using a BoundedPriorityQueue. basically I
include a queue in reduce(), or aggregateByKey(), but the only available
serializer is kyro, and it's extremely slow in this case because
BoundedPriorityQueue probably has a lot of internal fields.

so I want to wrap the queue in a wrapper class, and only export the queue
content through getContent() and setContent(), and the content is a list of
tuples. This way when I encode the wrapper, the bean encoder simply encodes
the getContent() output, I think. encoding a list of tuples is very fast.

Yang

On Tue, May 9, 2017 at 11:19 AM, Michael Armbrust <mich...@databricks.com>
wrote:

> I think you are supposed to set BeanProperty on a var as they do here
> <https://github.com/apache/spark/blob/f830bb9170f6b853565d9dd30ca7418b93a54fe3/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/Strategy.scala#L71-L83>.
> If you are using scala though I'd consider using the case class encoders.
>
> On Tue, May 9, 2017 at 12:21 AM, Yang <teddyyyy...@gmail.com> wrote:
>
>> I'm trying to use Encoders.bean() to create an encoder for my custom
>> class, but it fails complaining about can't find the schema:
>>
>>
>> class Person4 { @scala.beans.BeanProperty def setX(x:Int): Unit = {}
>> @scala.beans.BeanProperty def getX():Int = {1} } val personEncoder =
>> Encoders.bean[Person4](classOf[Person4]) scala> val person_rdd =sc.
>> parallelize(Array( (new Person4(), 1), (new Person4(), 2) )) person_rdd:
>> org.apache.spark.rdd.RDD[(Person4, Int)] = ParallelCollectionRDD[1] at
>> parallelize at <con sole>:31 scala> sqlcontext.createDataFrame(person_rdd
>> ) java.lang.UnsupportedOperationException: Schema for type Person4 is not
>> supported at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(Sca
>> laReflection.scala:716) at org.apache.spark.sql.catalyst.
>> ScalaReflection$$anonfun$schemaFor$2.apply(ScalaReflection.scala:71 2)
>> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$2.
>> apply(ScalaReflection.scala:71 1) at scala.collection.TraversableLi
>> ke$$anonfun$map$1.apply(TraversableLike.scala:234) at
>>
>>
>> but if u look at the encoder's schema, it does know it:
>> but the system does seem to understand the schema for "Person4":
>>
>>
>> scala> personEncoder.schema
>> res38: org.apache.spark.sql.types.StructType = 
>> StructType(StructField(x,IntegerType,false))
>>
>>
>

Reply via email to