adding Pregel as an operator in GraphOps and cleaning up documentation of GraphOps


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/2216319f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/2216319f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/2216319f

Branch: refs/heads/master
Commit: 2216319f485ca2d00a946c4478dedc8a0e1c6053
Parents: c787ff5
Author: Joseph E. Gonzalez <[email protected]>
Authored: Sun Jan 12 21:26:37 2014 -0800
Committer: Joseph E. Gonzalez <[email protected]>
Committed: Sun Jan 12 21:26:37 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/graphx/GraphOps.scala      | 92 +++++++++++++++-----
 .../scala/org/apache/spark/graphx/Pregel.scala  |  4 +-
 2 files changed, 74 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2216319f/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
index 0121cb1..4fdff29 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -6,10 +6,9 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.SparkContext._
 import org.apache.spark.SparkException
 
-
 /**
  * Contains additional functionality for [[Graph]]. All operations are expressed in terms of the
- * efficient GraphX API. This class  is implicitly constructed for each Graph object.
+ * efficient GraphX API. This class is implicitly constructed for each Graph object.
  *
  * @tparam VD the vertex attribute type
  * @tparam ED the edge attribute type
@@ -19,32 +18,27 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
   /** The number of edges in the graph. */
   lazy val numEdges: Long = graph.edges.count()
 
-
   /** The number of vertices in the graph. */
   lazy val numVertices: Long = graph.vertices.count()
 
-
   /**
    * The in-degree of each vertex in the graph.
    * @note Vertices with no in-edges are not returned in the resulting RDD.
    */
   lazy val inDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.In)
 
-
   /**
    * The out-degree of each vertex in the graph.
    * @note Vertices with no out-edges are not returned in the resulting RDD.
    */
   lazy val outDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Out)
 
-
   /**
    * The degree of each vertex in the graph.
    * @note Vertices with no edges are not returned in the resulting RDD.
    */
   lazy val degrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Both)
 
-
   /**
    * Computes the neighboring vertex degrees.
    *
@@ -76,10 +70,10 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
    * age for each user:
    *
    * {{{
-   * val graph: Graph[Int,Int] = loadGraph()
+   * val graph: Graph[Int,Int] = GraphLoader.edgeListFile(sc, "webgraph")
    * val averageFollowerAge: RDD[(Int, Int)] =
    *   graph.aggregateNeighbors[(Int,Double)](
-   *     (vid, edge) => (edge.otherVertex(vid).data, 1),
+   *     (vid, edge) => Some((edge.otherVertex(vid).data, 1)),
    *     (a, b) => (a._1 + b._1, a._2 + b._2),
    *     -1,
    *     EdgeDirection.In)
@@ -111,11 +105,9 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
         case (Some(srcA), Some(dstA)) => Iterator((et.srcId, srcA), (et.dstId, dstA))
       }
     }
-
     graph.mapReduceTriplets(mf, reduceFunc)
   } // end of aggregateNeighbors
 
-
   /**
    * Collect the neighbor vertex ids for each vertex.
    *
@@ -147,7 +139,6 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
     }
   } // end of collectNeighborIds
 
-
   /**
    * Collect the neighbor vertex attributes for each vertex.
    *
@@ -173,7 +164,6 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
     }
   } // end of collectNeighbor
 
-
   /**
    * Join the vertices with an RDD and then apply a function from the
    * the vertex and RDD entry to a new vertex value.  The input table
@@ -188,17 +178,14 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
    * corresponding entry in the table otherwise the old vertex value
    * is used.
    *
-   * @note for small tables this function can be much more efficient
-   * than leftJoinVertices
-   *
    * @example This function is used to update the vertices with new
    * values based on external data.  For example we could add the out
    * degree to each vertex record
    *
    * {{{
-   * val rawGraph: Graph[Int,()] = Graph.textFile("webgraph")
+   * val rawGraph: Graph[Int, Int] = GraphLoader.edgeListFile(sc, "webgraph")
    *   .mapVertices(v => 0)
-   * val outDeg: RDD[(Int, Int)] = rawGraph.outDegrees()
+   * val outDeg: RDD[(Int, Int)] = rawGraph.outDegrees
    * val graph = rawGraph.leftJoinVertices[Int,Int](outDeg,
    *   (v, deg) => deg )
    * }}}
@@ -219,8 +206,10 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
    * Filter the graph by computing some values to filter on, and applying the predicates.
    *
    * @param preprocess a function to compute new vertex and edge data before filtering
-   * @param epred edge pred to filter on after preprocess, see more details under Graph#subgraph
-   * @param vpred vertex pred to filter on after prerocess, see more details under Graph#subgraph
+   * @param epred edge pred to filter on after preprocess, see more details under
+   *  [[org.apache.spark.graphx.Graph#subgraph]]
+   * @param vpred vertex pred to filter on after preprocess, see more details under
+   *  [[org.apache.spark.graphx.Graph#subgraph]]
    * @tparam VD2 vertex type the vpred operates on
    * @tparam ED2 edge type the epred operates on
    * @return a subgraph of the orginal graph, with its data unchanged
@@ -246,4 +235,67 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
      vpred: (VertexID, VD2) => Boolean = (v:VertexID, d:VD2) => true): Graph[VD, ED] = {
     graph.mask(preprocess(graph).subgraph(epred, vpred))
   }
+
+  /**
+   * Execute a Pregel-like iterative vertex-parallel abstraction.  The
+   * user-defined vertex-program `vprog` is executed in parallel on
+   * each vertex receiving any inbound messages and computing a new
+   * value for the vertex.  The `sendMsg` function is then invoked on
+   * the edges selected by `activeDirection` and returns an iterator of
+   * `(vertexId, message)` pairs. The `mergeMsg` function is a commutative
+   * associative function used to combine messages destined to the
+   * same vertex.
+   *
+   * On the first iteration all vertices receive the `initialMsg` and
+   * on subsequent iterations if a vertex does not receive a message
+   * then the vertex-program is not invoked.
+   *
+   * This function iterates until there are no remaining messages, or
+   * for `maxIterations` iterations.
+   *
+   * The input graph is the graph wrapped by this `GraphOps` instance;
+   * its vertex and edge attribute types are the class type parameters
+   * `VD` and `ED`.
+   *
+   * @tparam A the Pregel message type
+   *
+   * @param initialMsg the message each vertex will receive on the first
+   * iteration
+   *
+   * @param maxIterations the maximum number of iterations to run for
+   *
+   * @param activeDirection the direction of edges incident to a vertex that received a message in
+   * the previous round on which to run `sendMsg`. For example, if this is `EdgeDirection.Out`, only
+   * out-edges of vertices that received a message in the previous round will run.
+   *
+   * @param vprog the user-defined vertex program which runs on each
+   * vertex and receives the inbound message and computes a new vertex
+   * value.  On the first iteration the vertex program is invoked on
+   * all vertices and is passed `initialMsg`.  On subsequent
+   * iterations the vertex program is only invoked on those vertices
+   * that receive messages.
+   *
+   * @param sendMsg a user supplied function that is applied to the edges,
+   * selected by `activeDirection`, of vertices that received messages in
+   * the current iteration, returning `(vertexId, message)` pairs to send
+   *
+   * @param mergeMsg a user supplied function that takes two incoming
+   * messages of type A and merges them into a single message of type
+   * A.  ''This function must be commutative and associative and
+   * ideally the size of A should not increase.''
+   *
+   * @return the resulting graph at the end of the computation
+   *
+   */
+  def pregel[A: ClassTag](
+      initialMsg: A,
+      maxIterations: Int = Int.MaxValue,
+      activeDirection: EdgeDirection = EdgeDirection.Out)(
+      vprog: (VertexID, VD, A) => VD,
+      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)],
+      mergeMsg: (A, A) => A)
+    : Graph[VD, ED] = {
+    Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)
+  }
+
 } // end of GraphOps

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2216319f/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
index 57b0872..83e28d0 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -25,8 +25,8 @@ import scala.reflect.ClassTag
  *
  * def vertexProgram(id: VertexID, attr: Double, msgSum: Double): Double =
  *   resetProb + (1.0 - resetProb) * msgSum
- * def sendMessage(id: VertexID, edge: EdgeTriplet[Double, Double]): Option[Double] =
- *   Some(edge.srcAttr * edge.attr)
+ * def sendMessage(edge: EdgeTriplet[Double, Double]): Iterator[(VertexID, Double)] =
+ *   Iterator((edge.dstId, edge.srcAttr * edge.attr))
  * def messageCombiner(a: Double, b: Double): Double = a + b
  * val initialMessage = 0.0
  * // Execute Pregel for a fixed number of iterations.

Reply via email to