Wait a minute... doesn't a reduce function return one element per key?
For example, word-count MapReduce jobs return a {word, count} element
for every unique word.  Is the result here supposed to be a one-element RDD?

The .reduce method on both MappedRDD and FlatMappedRDD has the signature

    def reduce(f: (T, T) => T): T

So presumably if I pass the reduce function a list of values {(X,1), (X,1),
(X,1), (Y,1), (Y,1)} and the function is ( (A,B) => (A._1, A._2+B._2 ) ),
then I should get a final vector of {(X,3), (Y,2)}, correct?
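As a sanity check, here is a stand-in using plain Scala collections (no Spark involved; my assumption is that List.reduce folds pairwise the same way RDD.reduce does), comparing that reducing function against an explicit per-key grouping:

```scala
// Stand-in for the RDD contents, as a plain Scala List.
val pairs = List(("X", 1), ("X", 1), ("X", 1), ("Y", 1), ("Y", 1))

// reduce folds ALL elements into a single value; keys are never compared,
// and the first element's key simply survives each pairwise merge.
val collapsed = pairs.reduce((a, b) => (a._1, a._2 + b._2))
// collapsed == ("X", 5)

// A per-key count requires grouping by key before summing.
val perKey = pairs.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).sum) }
// perKey == Map("X" -> 3, "Y" -> 2)
```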


I have the following object:

    scala> temp3
    res128: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.Edge[Int], Int)] = MappedRDD[107] at map at <console>:27

and it contains the following:

    scala> temp3.collect
    . . .
    res129: Array[(org.apache.spark.graphx.Edge[Int], Int)] =
    Array((Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1),
    (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1),
    (Edge(4,4,1),1), (Edge(5,4,1),1), (Edge(0,0,0),1), (Edge(0,0,0),1),
    (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(7,4,1),1), (Edge(0,0,0),1),
    (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1),
    (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1),
    (Edge(4,5,1),1), (Edge(5,5,1),1), (Edge(1,2,1),1), (Edge(1,3,1),1),
    (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(7,5,1),1), (Edge(0,0,0),1),
    (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1),
    (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1),
    (Edge(4,7,1),1), (Edge(5,7,1),1), (Edge(0,0,0),1), (E...

but when I run the following, I only get one element in the final vector:

    scala> temp3.reduce( (A,B) => (A._1, A._2+B._2 ) )
    . . .
    res130: (org.apache.spark.graphx.Edge[Int], Int) = (Edge(0,0,0),256)

I should additionally be getting { (Edge(1,2,1),1), (Edge(1,3,1),2),
(Edge(2,3,1),2), (Edge(4,5,1),1), (Edge(5,6,1),2), (Edge(6,7,1),1),
(Edge(4,7,1),1), (Edge(5,7,1),2) }.



Am I not mapping something correctly before running reduce?  I've tried both
.map and .flatMap, and inserted _.copy() everywhere, e.g.

    temp3.flatMap(A => Seq(A)).reduce( (A,B) => (A._1, A._2+B._2 ) )
    temp3.map(_.copy()).flatMap(A => Seq(A)).reduce( (A,B) => (A._1, A._2+B._2 ) )

etc.
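If plain reduce is the wrong tool here, is reduceByKey what I should be reaching for instead? A plain-Scala mock-up of the per-key behavior I'm after (hypothetical; the foldLeft into a Map below is only my stand-in for what I understand reduceByKey(_ + _) does on an RDD of pairs):

```scala
// Plain-Scala mock-up of per-key aggregation, no Spark needed:
// accumulate one running sum per distinct key.
val data = List(("a", 1), ("b", 1), ("a", 1), ("a", 1))
val byKey = data.foldLeft(Map.empty[String, Int]) {
  case (acc, (k, v)) => acc.updated(k, acc.getOrElse(k, 0) + v)
}
// byKey == Map("a" -> 3, "b" -> 1)
```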





--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/BUG-graph-triplets-does-not-return-proper-values-tp6693p6726.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
