[
https://issues.apache.org/jira/browse/SPARK-2620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14341895#comment-14341895
]
Marko Bonaci edited comment on SPARK-2620 at 3/1/15 1:21 PM:
-------------------------------------------------------------
*Spark 1.2 shell local:*
{code:java}
scala> case class P(name:String)
defined class P
scala> val ps = Array(P("alice"), P("bob"), P("charly"), P("bob"))
ps: Array[P] = Array(P(alice), P(bob), P(charly), P(bob))
scala> sc.parallelize(ps).map(x=> (x,1)).reduceByKey((x,y) => x+y).collect
res8: Array[(P, Int)] = Array((P(alice),1), (P(charly),1), (P(bob),2))
{code}
{code:java}
scala> case class Item(i: Int)
defined class Item
scala> val dups = sc.makeRDD(Array(1, 1, 2, 2, 3, 3, 4, 4).map(i =>
(Item(i),i)), 2)
dups: org.apache.spark.rdd.RDD[(Item, Int)] = ParallelCollectionRDD[19] at
makeRDD at <console>:14
scala> val correct = Array((Item(1), Seq(1, 1)),
| (Item(2), Seq(2, 2)),
| (Item(3), Seq(3, 3)),
| (Item(4), Seq(4, 4)))
correct: Array[(Item, Seq[Int])] = Array((Item(1),List(1, 1)), (Item(2),List(2,
2)), (Item(3),List(3, 3)), (Item(4),List(4, 4)))
scala> val actual = dups.groupByKey()
actual: org.apache.spark.rdd.RDD[(Item, Iterable[Int])] = ShuffledRDD[20] at
groupByKey at <console>:16
scala> actual.count
res9: Long = 4
scala> actual.collect().sortBy(k => k._1.i)
res10: Array[(Item, Iterable[Int])] = Array((Item(1),CompactBuffer(1, 1)),
(Item(2),CompactBuffer(2, 2)), (Item(3),CompactBuffer(3, 3)),
(Item(4),CompactBuffer(4, 4)))
{code}
was (Author: mbonaci):
*Spark 1.2 shell local:*
{code:java}
scala> case class P(name:String)
defined class P
scala> val ps = Array(P("alice"), P("bob"), P("charly"), P("bob"))
ps: Array[P] = Array(P(alice), P(bob), P(charly), P(bob))
scala> sc.parallelize(ps).map(x=> (x,1)).reduceByKey((x,y) => x+y).collect
res8: Array[(P, Int)] = Array((P(alice),1), (P(charly),1), (P(bob),2))
{code}
> case class cannot be used as key for reduce
> -------------------------------------------
>
> Key: SPARK-2620
> URL: https://issues.apache.org/jira/browse/SPARK-2620
> Project: Spark
> Issue Type: Bug
> Components: Spark Shell
> Affects Versions: 1.0.0, 1.1.0
> Environment: reproduced on spark-shell local[4]
> Reporter: Gerard Maas
> Assignee: Tobias Schlatter
> Priority: Critical
> Labels: case-class, core
>
> Using a case class as a key doesn't seem to work properly on Spark 1.0.0
> A minimal example:
> case class P(name:String)
> val ps = Array(P("alice"), P("bob"), P("charly"), P("bob"))
> sc.parallelize(ps).map(x=> (x,1)).reduceByKey((x,y) => x+y).collect
> [Spark shell local mode] res : Array[(P, Int)] = Array((P(bob),1),
> (P(bob),1), (P(abe),1), (P(charly),1))
> In contrast to the expected behavior, that should be equivalent to:
> sc.parallelize(ps).map(x=> (x.name,1)).reduceByKey((x,y) => x+y).collect
> Array[(String, Int)] = Array((charly,1), (abe,1), (bob,2))
> groupByKey and distinct also present the same behavior.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]