Wait a minute... doesn't a reduce function return one element PER key?
For example, a word-count MapReduce job returns a {word, count} element
for every unique word. Or is the result of .reduce supposed to be a single element rather than an RDD?
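For concreteness, the word-count pattern I have in mind is roughly this (just a
sketch; the names lines/counts and the input path are made up):

val lines = sc.textFile("input.txt")            // RDD[String]
val counts = lines.flatMap(_.split("\\s+"))     // one word per element
                  .map(word => (word, 1))       // (word, 1) pairs
                  .reduceByKey(_ + _)           // one (word, count) element per unique word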
The .reduce method on both MappedRDD and FlatMappedRDD has the signature
def reduce(f: (T, T) => T): T
So presumably, if I call reduce over the values {(X,1), (X,1), (X,1), (Y,1),
(Y,1)} with the function ( (A,B) => (A._1, A._2+B._2) ),
then I should end up with {(X,3), (Y,2)}, correct?
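To double-check my reading of that signature, this is the small test I have in mind
(a sketch on a made-up local RDD):

val pairs = sc.parallelize(Seq(("X", 1), ("X", 1), ("X", 1), ("Y", 1), ("Y", 1)))
// reduce takes f: (T, T) => T and returns a single T, here T = (String, Int)
val summed = pairs.reduce( (A, B) => (A._1, A._2 + B._2) )
// my expectation was one element per distinct key, i.e. ("X",3) and ("Y",2)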
I have the following object:
scala> temp3
res128: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.Edge[Int],
Int)] = MappedRDD[107] at map at <console>:27
and it contains the following:
scala> temp3.collect
. . .
res129: Array[(org.apache.spark.graphx.Edge[Int], Int)] =
Array((Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1),
(Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1),
(Edge(4,4,1),1), (Edge(5,4,1),1), (Edge(0,0,0),1), (Edge(0,0,0),1),
(Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(7,4,1),1), (Edge(0,0,0),1),
(Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1),
(Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1),
(Edge(4,5,1),1), (Edge(5,5,1),1), (Edge(1,2,1),1), (Edge(1,3,1),1),
(Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(7,5,1),1), (Edge(0,0,0),1),
(Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1),
(Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1),
(Edge(4,7,1),1), (Edge(5,7,1),1), (Edge(0,0,0),1), (E...
but when I run the following, I only get a single element back:
scala> temp3.reduce( (A,B) => (A._1, A._2+B._2 ) )
. . .
res130: (org.apache.spark.graphx.Edge[Int], Int) = (Edge(0,0,0),256)
I expected to also get { (Edge(1,2,1),1), (Edge(1,3,1),2),
(Edge(2,3,1),2), (Edge(4,5,1),1), (Edge(5,6,1),2), (Edge(6,7,1),1),
(Edge(4,7,1),1), (Edge(5,7,1),2) }
Am I not mapping something correctly before running reduce? I've tried both
.map and .flatMap, and put in _.copy() everywhere, e.g.
temp3.flatMap(A => Seq(A)).reduce( (A,B) => (A._1, A._2+B._2 ) )
temp3.map(_.copy()).flatMap(A => Seq(A)).reduce( (A,B) => (A._1, A._2+B._2 ) )
etc.
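Or is the per-key behaviour I'm after really what reduceByKey provides rather than
reduce? I.e. something along these lines (a rough sketch; outside the shell this may
need import org.apache.spark.SparkContext._ depending on the Spark version):

// sum the counts per distinct edge key instead of folding the whole RDD into one value
val perEdge = temp3.reduceByKey(_ + _)   // RDD[(Edge[Int], Int)], one element per distinct edge
perEdge.collect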