You are probably looking for reduceByKey in that case.

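"reduce" combines every element of the RDD into a single value, regardless of key. reduceByKey applies the function separately to the values of each key and returns an RDD with one (key, value) pair per distinct key.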
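
A rough sketch of the difference, using plain Scala collections as a stand-in for the RDD so it runs without Spark. The object name is made up for the example, the pairs are the X/Y values from your message, and groupBy plus sum is only a local analogue of what reduceByKey does, not Spark's implementation:

```scala
// Hypothetical, self-contained illustration; no Spark required.
object ReduceVsReduceByKey {
  // The example values from the question: three X's and two Y's.
  val pairs: Seq[(String, Int)] =
    Seq(("X", 1), ("X", 1), ("X", 1), ("Y", 1), ("Y", 1))

  // reduce folds the WHOLE collection into one element. The function keeps
  // the left operand's key and adds the counts, so every count ends up
  // under a single key and the other keys are lost.
  val reduced: (String, Int) =
    pairs.reduce((a, b) => (a._1, a._2 + b._2))

  // Per-key aggregation: group by key, then sum the counts within each group.
  // This is the local analogue of rdd.reduceByKey(_ + _).
  val perKey: Map[String, Int] =
    pairs.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    println(reduced) // (X,5)
    println(perKey)  // contains X -> 3 and Y -> 2
  }
}
```

On the RDD itself the per-key version should be temp3.reduceByKey(_ + _). Outside the shell, Spark versions from this period need import org.apache.spark.SparkContext._ to bring the pair-RDD functions into scope. Note that on an RDD the order in which reduce combines elements is not fixed, so the key that survives in the single result is not guaranteed to be any particular one.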


On Tue, May 20, 2014 at 12:16 PM, GlennStrycker <glenn.stryc...@gmail.com> wrote:

> Wait a minute... doesn't a reduce function return 1 element PER key pair?
> For example, word-count mapreduce functions return a {word, count} element
> for every unique word.  Is this supposed to be a 1-element RDD object?
>
> The .reduce functions for MappedRDD and FlatMappedRDD both have the form
>
>     def reduce(f: (T, T) => T): T
>
> So presumably if I pass the reduce function a list of values {(X,1), (X,1),
> (X,1), (Y,1), (Y,1)} and the function is ( (A,B) => (A._1, A._2+B._2 ) ),
> then I should get a final vector of {(X,3), (Y,2)}, correct?
>
>
> I have the following object:
>
>     scala> temp3
>     res128: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.Edge[Int], Int)] = MappedRDD[107] at map at <console>:27
>
> and it contains the following:
>
>     scala> temp3.collect
>     . . .
>     res129: Array[(org.apache.spark.graphx.Edge[Int], Int)] =
> Array((Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1),
> (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1),
> (Edge(4,4,1),1), (Edge(5,4,1),1), (Edge(0,0,0),1), (Edge(0,0,0),1),
> (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(7,4,1),1), (Edge(0,0,0),1),
> (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1),
> (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1),
> (Edge(4,5,1),1), (Edge(5,5,1),1), (Edge(1,2,1),1), (Edge(1,3,1),1),
> (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(7,5,1),1), (Edge(0,0,0),1),
> (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1),
> (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1),
> (Edge(4,7,1),1), (Edge(5,7,1),1), (Edge(0,0,0),1), (E...
>
> but when I run the following, I only get one element in the final vector:
>
>     scala> temp3.reduce( (A,B) => (A._1, A._2+B._2 ) )
>     . . .
>     res130: (org.apache.spark.graphx.Edge[Int], Int) = (Edge(0,0,0),256)
>
> I should be additionally getting { (Edge(1,2,1),1), (Edge(1,3,1),2),
> (Edge(2,3,1),2), (Edge(4,5,1),1), (Edge(5,6,1),2), (Edge(6,7,1),1),
> (Edge(4,7,1),1), (Edge(5,7,1),2) }
>
>
>
> Am I not mapping something correctly before running reduce?  I've tried
> both .map and .flatMap, and put in _.copy() everywhere, e.g.
>
>     temp3.flatMap(A => Seq(A)).reduce( (A,B) => (A._1, A._2+B._2 ) )
>     temp3.map(_.copy()).flatMap(A => Seq(A)).reduce( (A,B) => (A._1, A._2+B._2 ) )
>
> etc.
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-developers-list.1001551.n3.nabble.com/BUG-graph-triplets-does-not-return-proper-values-tp6693p6726.html
> Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.
>
