You are probably looking for reduceByKey in that case. "reduce" just reduces everything in the collection into a single element, whereas reduceByKey aggregates the values separately for each key, which is the per-key behavior you're describing.
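The difference can be sketched with plain Scala collections (a local stand-in, not the Spark RDD API itself): `reduce` folds the whole collection down to a single value, while `reduceByKey` is equivalent to grouping by key first and then reducing the values within each group.

```scala
object ReduceVsReduceByKey {
  def main(args: Array[String]): Unit = {
    val pairs = Seq(("X", 1), ("X", 1), ("X", 1), ("Y", 1), ("Y", 1))

    // RDD.reduce-style: collapses everything into ONE element. The key of
    // the result is just whatever key the running accumulator holds, so
    // counts from different keys get mixed together.
    val single = pairs.reduce((a, b) => (a._1, a._2 + b._2))
    println(single) // (X,5) -- all five counts summed under one key

    // RDD.reduceByKey-style: group by key, then sum within each key.
    val perKey = pairs.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).sum) }
    assert(perKey == Map("X" -> 3, "Y" -> 2))
  }
}
```

This is why the reduce over temp3 returned a single `(Edge(0,0,0),256)`: every pair in the RDD was folded into one accumulator, regardless of key.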
On Tue, May 20, 2014 at 12:16 PM, GlennStrycker <glenn.stryc...@gmail.com> wrote:

> Wait a minute... doesn't a reduce function return 1 element PER key pair?
> For example, word-count mapreduce functions return a {word, count} element
> for every unique word. Is this supposed to be a 1-element RDD object?
>
> The .reduce function for a MappedRDD or FlatMappedRDD both are of the form
>
> def reduce(f: (T, T) => T): T
>
> So presumably if I pass the reduce function a list of values {(X,1), (X,1),
> (X,1), (Y,1), (Y,1)} and the function is ( (A,B) => (A._1, A._2+B._2 ) ),
> then I should get a final vector of {(X,3), (Y,2)}, correct?
>
> I have the following object:
>
> scala> temp3
> res128: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.Edge[Int],
> Int)] = MappedRDD[107] at map at <console>:27
>
> and it contains the following:
>
> scala> temp3.collect
> . . .
> res129: Array[(org.apache.spark.graphx.Edge[Int], Int)] =
> Array((Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1),
> (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1),
> (Edge(4,4,1),1), (Edge(5,4,1),1), (Edge(0,0,0),1), (Edge(0,0,0),1),
> (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(7,4,1),1), (Edge(0,0,0),1),
> (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1),
> (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1),
> (Edge(4,5,1),1), (Edge(5,5,1),1), (Edge(1,2,1),1), (Edge(1,3,1),1),
> (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(7,5,1),1), (Edge(0,0,0),1),
> (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1),
> (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1),
> (Edge(4,7,1),1), (Edge(5,7,1),1), (Edge(0,0,0),1), (E...
>
> but when I run the following, I only get one element in the final vector:
>
> scala> temp3.reduce( (A,B) => (A._1, A._2+B._2 ) )
> . . .
> res130: (org.apache.spark.graphx.Edge[Int], Int) = (Edge(0,0,0),256)
>
> I should be additionally getting { (Edge(1,2,1),1), (Edge(1,3,1),2),
> (Edge(2,3,1),2), (Edge(4,5,1),1), (Edge(5,6,1),2), (Edge(6,7,1),1),
> (Edge(4,7,1),1), (Edge(5,7,1),2) }
>
> Am I not mapping something correctly before running reduce? I've tried
> both .map and .flatMap, and put in _.copy() everywhere, e.g.
>
> temp3.flatMap(A => Seq(A)).reduce( (A,B) => (A._1, A._2+B._2 ) )
> temp3.map(_.copy()).flatMap(A => Seq(A)).reduce( (A,B) => (A._1, A._2+B._2 ) )
> etc.
>
> --
> View this message in context:
> http://apache-spark-developers-list.1001551.n3.nabble.com/BUG-graph-triplets-does-not-return-proper-values-tp6693p6726.html
> Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.