Repository: spark
Updated Branches:
  refs/heads/branch-1.2 e8669729a -> 9f3b159a5

Removing confusing TripletFields

After additional discussion with rxin, I think having all the possible `TripletFields` options is confusing. This pull request reduces the triplet fields to:

```java
/**
 * None of the triplet fields are exposed.
 */
public static final TripletFields None = new TripletFields(false, false, false);

/**
 * Expose only the edge field and not the source or destination field.
 */
public static final TripletFields EdgeOnly = new TripletFields(false, false, true);

/**
 * Expose the source and edge fields but not the destination field. (Same as Src)
 */
public static final TripletFields Src = new TripletFields(true, false, true);

/**
 * Expose the destination and edge fields but not the source field. (Same as Dst)
 */
public static final TripletFields Dst = new TripletFields(false, true, true);

/**
 * Expose all the fields (source, edge, and destination).
 */
public static final TripletFields All = new TripletFields(true, true, true);
```

Author: Joseph E. Gonzalez <[email protected]>

Closes #3472 from jegonzal/SimplifyTripletFields and squashes the following commits:

91796b5 [Joseph E. Gonzalez] removing confusing triplet fields

(cherry picked from commit 288ce583b05004a8c71dcd836fab23caff5d4ba7)
Signed-off-by: Reynold Xin <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9f3b159a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9f3b159a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9f3b159a

Branch: refs/heads/branch-1.2
Commit: 9f3b159a5b71bc3aba54a14f5e3af46c87396e79
Parents: e866972
Author: Joseph E. 
Gonzalez <[email protected]> Authored: Wed Nov 26 00:55:28 2014 -0800 Committer: Reynold Xin <[email protected]> Committed: Wed Nov 26 00:55:41 2014 -0800 ---------------------------------------------------------------------- .../org/apache/spark/graphx/GraphOps.scala | 6 ++-- .../org/apache/spark/graphx/TripletFields.java | 29 ++------------------ .../org/apache/spark/graphx/lib/PageRank.scala | 4 +-- .../org/apache/spark/graphx/GraphSuite.scala | 2 +- 4 files changed, 8 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/9f3b159a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala ---------------------------------------------------------------------- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index d515038..116d1ea 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -129,15 +129,15 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr))) ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr))) }, - (a, b) => a ++ b, TripletFields.SrcDstOnly) + (a, b) => a ++ b, TripletFields.All) case EdgeDirection.In => graph.aggregateMessages[Array[(VertexId,VD)]]( ctx => ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr))), - (a, b) => a ++ b, TripletFields.SrcOnly) + (a, b) => a ++ b, TripletFields.Src) case EdgeDirection.Out => graph.aggregateMessages[Array[(VertexId,VD)]]( ctx => ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr))), - (a, b) => a ++ b, TripletFields.DstOnly) + (a, b) => a ++ b, TripletFields.Dst) case EdgeDirection.Both => throw new SparkException("collectEdges does not support EdgeDirection.Both. 
Use" + "EdgeDirection.Either instead.") http://git-wip-us.apache.org/repos/asf/spark/blob/9f3b159a/graphx/src/main/scala/org/apache/spark/graphx/TripletFields.java ---------------------------------------------------------------------- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/TripletFields.java b/graphx/src/main/scala/org/apache/spark/graphx/TripletFields.java index 8dfccfe..7eb4ae0 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/TripletFields.java +++ b/graphx/src/main/scala/org/apache/spark/graphx/TripletFields.java @@ -56,39 +56,14 @@ public class TripletFields implements Serializable { public static final TripletFields EdgeOnly = new TripletFields(false, false, true); /** - * Expose only the source field and not the edge or destination field. - */ - public static final TripletFields SrcOnly = new TripletFields(true, false, false); - - /** - * Expose only the destination field and not the edge or source field. - */ - public static final TripletFields DstOnly = new TripletFields(false, true, false); - - /** - * Expose the source and destination fields but not the edge field. - */ - public static final TripletFields SrcDstOnly = new TripletFields(true, true, false); - - /** * Expose the source and edge fields but not the destination field. (Same as Src) */ - public static final TripletFields SrcAndEdge = new TripletFields(true, false, true); - - /** - * Expose the source and edge fields but not the destination field. (Same as SrcAndEdge) - */ - public static final TripletFields Src = SrcAndEdge; + public static final TripletFields Src = new TripletFields(true, false, true); /** * Expose the destination and edge fields but not the source field. (Same as Dst) */ - public static final TripletFields DstAndEdge = new TripletFields(false, true, true); - - /** - * Expose the destination and edge fields but not the source field. 
(Same as DstAndEdge) - */ - public static final TripletFields Dst = DstAndEdge; + public static final TripletFields Dst = new TripletFields(false, true, true); /** * Expose all the fields (source, edge, and destination). http://git-wip-us.apache.org/repos/asf/spark/blob/9f3b159a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala ---------------------------------------------------------------------- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala index e40ae0d..e139959 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala @@ -85,7 +85,7 @@ object PageRank extends Logging { // Associate the degree with each vertex .outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) } // Set the weight on the edges based on the degree - .mapTriplets( e => 1.0 / e.srcAttr, TripletFields.SrcOnly ) + .mapTriplets( e => 1.0 / e.srcAttr, TripletFields.Src ) // Set the vertex attributes to the initial pagerank values .mapVertices( (id, attr) => resetProb ) @@ -97,7 +97,7 @@ object PageRank extends Logging { // Compute the outgoing rank contributions of each vertex, perform local preaggregation, and // do the final aggregation at the receiving vertices. Requires a shuffle for aggregation. val rankUpdates = rankGraph.aggregateMessages[Double]( - ctx => ctx.sendToDst(ctx.srcAttr * ctx.attr), _ + _, TripletFields.SrcAndEdge) + ctx => ctx.sendToDst(ctx.srcAttr * ctx.attr), _ + _, TripletFields.Src) // Apply the final rank updates to get the new ranks, using join to preserve ranks of vertices // that didn't receive a message. 
Requires a shuffle for broadcasting updated ranks to the http://git-wip-us.apache.org/repos/asf/spark/blob/9f3b159a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala ---------------------------------------------------------------------- diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala index df773db..a05d1dd 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala @@ -328,7 +328,7 @@ class GraphSuite extends FunSuite with LocalSparkContext { "expected ctx.dstAttr to be null due to TripletFields, but it was " + ctx.dstAttr) } ctx.sendToDst(ctx.srcAttr) - }, _ + _, TripletFields.SrcOnly) + }, _ + _, TripletFields.Src) assert(agg.collect().toSet === (1 to n).map(x => (x: VertexId, "v")).toSet) } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
