[ https://issues.apache.org/jira/browse/SPARK-2620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14341895#comment-14341895 ]

Marko Bonaci edited comment on SPARK-2620 at 3/1/15 1:21 PM:
-------------------------------------------------------------

*Spark 1.2 shell local:*

{code:java}
scala> case class P(name:String)
defined class P

scala> val ps = Array(P("alice"), P("bob"), P("charly"), P("bob"))
ps: Array[P] = Array(P(alice), P(bob), P(charly), P(bob))

scala> sc.parallelize(ps).map(x=> (x,1)).reduceByKey((x,y) => x+y).collect
res8: Array[(P, Int)] = Array((P(alice),1), (P(charly),1), (P(bob),2))
{code}

{code:java}
scala> case class Item(i: Int)
defined class Item

scala> val dups = sc.makeRDD(Array(1, 1, 2, 2, 3, 3, 4, 4).map(i => (Item(i),i)), 2)
dups: org.apache.spark.rdd.RDD[(Item, Int)] = ParallelCollectionRDD[19] at makeRDD at <console>:14

scala> val correct = Array((Item(1), Seq(1, 1)),
     |                     (Item(2), Seq(2, 2)),
     |                     (Item(3), Seq(3, 3)),
     |                     (Item(4), Seq(4, 4)))
correct: Array[(Item, Seq[Int])] = Array((Item(1),List(1, 1)), (Item(2),List(2, 2)), (Item(3),List(3, 3)), (Item(4),List(4, 4)))

scala> val actual = dups.groupByKey()
actual: org.apache.spark.rdd.RDD[(Item, Iterable[Int])] = ShuffledRDD[20] at groupByKey at <console>:16

scala> actual.count
res9: Long = 4

scala> actual.collect().sortBy(k => k._1.i)
res10: Array[(Item, Iterable[Int])] = Array((Item(1),CompactBuffer(1, 1)), (Item(2),CompactBuffer(2, 2)), (Item(3),CompactBuffer(3, 3)), (Item(4),CompactBuffer(4, 4)))
{code}
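For comparison, the expected aggregation can be reproduced with plain Scala collections, with no Spark involved, since the compiler synthesizes structural {{equals}}/{{hashCode}} for case classes. This is a minimal sketch added for illustration, not part of the original report; it mirrors {{reduceByKey((x,y) => x+y)}} with {{groupBy}} plus a sum:

```scala
// Plain-Scala equivalent of the reduceByKey count above.
// Case classes get structural equals/hashCode, so keying on a
// case class works in any ordinary Scala program; the reported
// bug is specific to classes defined inside the spark-shell REPL.
case class P(name: String)

val ps = Array(P("alice"), P("bob"), P("charly"), P("bob"))

// groupBy on the case-class key, then sum the counts per key,
// mirroring reduceByKey((x, y) => x + y)
val counts: Map[P, Int] =
  ps.map(p => (p, 1))
    .groupBy(_._1)
    .map { case (k, pairs) => (k, pairs.map(_._2).sum) }
```

Any correct shell run (such as the 1.2 transcript above) should agree with this result, e.g. two occurrences of {{P(bob)}}.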



> case class cannot be used as key for reduce
> -------------------------------------------
>
>                 Key: SPARK-2620
>                 URL: https://issues.apache.org/jira/browse/SPARK-2620
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Shell
>    Affects Versions: 1.0.0, 1.1.0
>         Environment: reproduced on spark-shell local[4]
>            Reporter: Gerard Maas
>            Assignee: Tobias Schlatter
>            Priority: Critical
>              Labels: case-class, core
>
> Using a case class as a key doesn't seem to work properly on Spark 1.0.0
> A minimal example:
> case class P(name:String)
> val ps = Array(P("alice"), P("bob"), P("charly"), P("bob"))
> sc.parallelize(ps).map(x=> (x,1)).reduceByKey((x,y) => x+y).collect
> [Spark shell local mode] res : Array[(P, Int)] = Array((P(bob),1), (P(bob),1), (P(abe),1), (P(charly),1))
> In contrast to the expected behavior, that should be equivalent to:
> sc.parallelize(ps).map(x=> (x.name,1)).reduceByKey((x,y) => x+y).collect
> Array[(String, Int)] = Array((charly,1), (abe,1), (bob,2))
> groupByKey and distinct also present the same behavior.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
