[FLINK-3207] [gelly] add pregel methods to gelly-scala

This closes #1575


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/048cda72
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/048cda72
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/048cda72

Branch: refs/heads/master
Commit: 048cda72b686d200d8e8f455ee3fc6cbeb173ef0
Parents: cc1a797
Author: vasia <[email protected]>
Authored: Tue Feb 2 16:43:04 2016 +0100
Committer: vasia <[email protected]>
Committed: Mon Mar 21 19:10:29 2016 +0100

----------------------------------------------------------------------
 docs/apis/batch/libs/fig/LICENSE.txt            |    2 +-
 .../batch/libs/fig/gelly-gsa-sssp-result.png    |  Bin 14472 -> 0 bytes
 docs/apis/batch/libs/fig/gelly-gsa-sssp2.png    |  Bin 24962 -> 0 bytes
 docs/apis/batch/libs/fig/gelly-vc-sssp2.png     |  Bin 24644 -> 0 bytes
 .../libs/fig/vertex-centric supersteps.png      |  Bin 0 -> 80198 bytes
 docs/apis/batch/libs/gelly.md                   |  322 ++-
 docs/libs/fig/vertex-centric supersteps.png     |  Bin 80198 -> 0 bytes
 docs/libs/gelly_guide.md                        | 1968 +-----------------
 .../apache/flink/graph/examples/PregelSSSP.java |  194 ++
 .../SingleSourceShortestPathsITCase.java        |    2 +-
 .../org/apache/flink/graph/scala/Graph.scala    |   40 +
 .../main/java/org/apache/flink/graph/Graph.java |   10 +-
 .../apache/flink/graph/example/PregelSSSP.java  |  194 --
 .../flink/graph/pregel/ComputeFunction.java     |   37 +-
 .../flink/graph/pregel/MessageCombiner.java     |    4 +-
 .../flink/graph/pregel/MessageIterator.java     |    7 +-
 .../pregel/VertexCentricConfiguration.java      |    4 +-
 .../graph/pregel/VertexCentricIteration.java    |   72 +-
 18 files changed, 608 insertions(+), 2248 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/048cda72/docs/apis/batch/libs/fig/LICENSE.txt
----------------------------------------------------------------------
diff --git a/docs/apis/batch/libs/fig/LICENSE.txt 
b/docs/apis/batch/libs/fig/LICENSE.txt
index 35b8673..5d0d22b 100644
--- a/docs/apis/batch/libs/fig/LICENSE.txt
+++ b/docs/apis/batch/libs/fig/LICENSE.txt
@@ -14,4 +14,4 @@ software distributed under the License is distributed on an
 "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
-under the License.
\ No newline at end of file
+under the License.

http://git-wip-us.apache.org/repos/asf/flink/blob/048cda72/docs/apis/batch/libs/fig/gelly-gsa-sssp-result.png
----------------------------------------------------------------------
diff --git a/docs/apis/batch/libs/fig/gelly-gsa-sssp-result.png 
b/docs/apis/batch/libs/fig/gelly-gsa-sssp-result.png
deleted file mode 100644
index 6ae74dd..0000000
Binary files a/docs/apis/batch/libs/fig/gelly-gsa-sssp-result.png and /dev/null 
differ

http://git-wip-us.apache.org/repos/asf/flink/blob/048cda72/docs/apis/batch/libs/fig/gelly-gsa-sssp2.png
----------------------------------------------------------------------
diff --git a/docs/apis/batch/libs/fig/gelly-gsa-sssp2.png 
b/docs/apis/batch/libs/fig/gelly-gsa-sssp2.png
deleted file mode 100644
index edf19b8..0000000
Binary files a/docs/apis/batch/libs/fig/gelly-gsa-sssp2.png and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/048cda72/docs/apis/batch/libs/fig/gelly-vc-sssp2.png
----------------------------------------------------------------------
diff --git a/docs/apis/batch/libs/fig/gelly-vc-sssp2.png 
b/docs/apis/batch/libs/fig/gelly-vc-sssp2.png
deleted file mode 100644
index 67976b3..0000000
Binary files a/docs/apis/batch/libs/fig/gelly-vc-sssp2.png and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/048cda72/docs/apis/batch/libs/fig/vertex-centric
 supersteps.png
----------------------------------------------------------------------
diff --git a/docs/apis/batch/libs/fig/vertex-centric supersteps.png 
b/docs/apis/batch/libs/fig/vertex-centric supersteps.png
new file mode 100644
index 0000000..6498a25
Binary files /dev/null and b/docs/apis/batch/libs/fig/vertex-centric 
supersteps.png differ

http://git-wip-us.apache.org/repos/asf/flink/blob/048cda72/docs/apis/batch/libs/gelly.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md
index b22e362..d803be2 100644
--- a/docs/apis/batch/libs/gelly.md
+++ b/docs/apis/batch/libs/gelly.md
@@ -790,7 +790,256 @@ When the aggregation computation does not require access 
to the vertex value (fo
 
 Iterative Graph Processing
 -----------
-Gelly exploits Flink's efficient iteration operators to support large-scale 
iterative graph processing. Currently, we provide implementations of the 
popular scatter-gather iterative model and a variation of Gather-Sum-Apply. In 
the following sections, we describe these models and show how you can use them 
in Gelly.
+Gelly exploits Flink's efficient iteration operators to support large-scale 
iterative graph processing. Currently, we provide implementations of the 
vertex-centric, scatter-gather, and gather-sum-apply models. In the following 
sections, we describe these abstractions and show how you can use them in Gelly.
+
+### Vertex-Centric Iterations
+The vertex-centric model, also known as "think like a vertex" or "Pregel", 
expresses computation from the perspective of a vertex in the graph.
+The computation proceeds in synchronized iteration steps, called supersteps. 
In each superstep, each vertex executes one user-defined function.
+Vertices communicate with other vertices through messages. A vertex can send a 
message to any other vertex in the graph, as long as it knows its unique ID.
+
+The computational model is shown in the figure below. The dotted boxes 
correspond to parallelization units.
+In each superstep, all active vertices execute the
+same user-defined computation in parallel. Supersteps are executed 
synchronously, so that messages sent during one superstep are guaranteed to be 
delivered at the beginning of the next superstep.
+
+<p class="text-center">
+    <img alt="Vertex-Centric Computational Model" width="70%" 
src="fig/vertex-centric supersteps.png"/>
+</p>
+
+To use vertex-centric iterations in Gelly, the user only needs to define the 
vertex compute function, `ComputeFunction`.
+This function and the maximum number of iterations to run are given as 
parameters to Gelly's `runVertexCentricIteration`. This method will execute the 
vertex-centric iteration on the input Graph and return a new Graph, with 
updated vertex values. An optional message combiner, `MessageCombiner`, can be 
defined to reduce communication costs.
+
+Let us consider computing Single-Source-Shortest-Paths with vertex-centric 
iterations. Initially, each vertex has a value of infinite distance, except 
for the source vertex, which has a value of zero. During the first superstep, 
the source propagates distances to its neighbors. During the following 
supersteps, each vertex checks its received messages and chooses the minimum 
distance among them. If this distance is smaller than its current value, it 
updates its state and produces messages for its neighbors. If a vertex does not 
change its value during a superstep, then it does not produce any messages for 
its neighbors for the next superstep. The algorithm converges when there are no 
value updates or the maximum number of supersteps has been reached. In this 
algorithm, a message combiner can be used to reduce the number of messages sent 
to a target vertex.
+
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// read the input graph
+Graph<Long, Double, Double> graph = ...
+
+// define the maximum number of iterations
+int maxIterations = 10;
+
+// Execute the vertex-centric iteration
+Graph<Long, Double, Double> result = graph.runVertexCentricIteration(
+            new SSSPComputeFunction(), new SSSPCombiner(), maxIterations);
+
+// Extract the vertices as the result
+DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();
+
+
+// - - -  UDFs - - - //
+
+public static final class SSSPComputeFunction extends ComputeFunction<Long, 
Double, Double, Double> {
+    public void compute(Vertex<Long, Double> vertex, MessageIterator<Double> 
messages) {
+        // srcId: ID of the source vertex, assumed to be defined elsewhere
+        double minDistance = (vertex.getId().equals(srcId)) ? 0d : 
Double.POSITIVE_INFINITY;
+
+        for (Double msg : messages) {
+            minDistance = Math.min(minDistance, msg);
+        }
+
+        if (minDistance < vertex.getValue()) {
+            setNewVertexValue(minDistance);
+            for (Edge<Long, Double> e: getEdges()) {
+                sendMessageTo(e.getTarget(), minDistance + e.getValue());
+            }
+        }
+    }
+}
+
+// message combiner
+public static final class SSSPCombiner extends MessageCombiner<Long, Double> {
+
+    public void combineMessages(MessageIterator<Double> messages) {
+
+        double minMessage = Double.POSITIVE_INFINITY;
+        for (Double msg: messages) {
+           minMessage = Math.min(minMessage, msg);
+        }
+        sendCombinedMessage(minMessage);
+    }
+}
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// read the input graph
+val graph: Graph[Long, Double, Double] = ...
+
+// define the maximum number of iterations
+val maxIterations = 10
+
+// Execute the vertex-centric iteration
+val result = graph.runVertexCentricIteration(new SSSPComputeFunction, new 
SSSPCombiner, maxIterations)
+
+// Extract the vertices as the result
+val singleSourceShortestPaths = result.getVertices
+
+
+// - - -  UDFs - - - //
+
+final class SSSPComputeFunction extends ComputeFunction[Long, Double, Double, 
Double] {
+
+    override def compute(vertex: Vertex[Long, Double], messages: 
MessageIterator[Double]) = {
+        var minDistance = if (vertex.getId.equals(srcId)) 0.0 else 
Double.MaxValue
+
+        while (messages.hasNext) {
+            val msg = messages.next
+            if (msg < minDistance) {
+                minDistance = msg
+            }
+        }
+
+        if (vertex.getValue > minDistance) {
+            setNewVertexValue(minDistance)
+            for (edge: Edge[Long, Double] <- getEdges) {
+                sendMessageTo(edge.getTarget, minDistance + edge.getValue)
+            }
+        }
+    }
+}
+
+// message combiner
+final class SSSPCombiner extends MessageCombiner[Long, Double] {
+
+    override def combineMessages(messages: MessageIterator[Double]) {
+
+        var minDistance = Double.MaxValue
+
+        while (messages.hasNext) {
+          val msg = messages.next
+          if (msg < minDistance) {
+            minDistance = msg
+          }
+        }
+        sendCombinedMessage(minDistance)
+    }
+}
+{% endhighlight %}
+</div>
+</div>
+
+{% top %}
+
+### Configuring a Vertex-Centric Iteration
+A vertex-centric iteration can be configured using a 
`VertexCentricConfiguration` object.
+Currently, the following parameters can be specified:
+
+* <strong>Name</strong>: The name for the vertex-centric iteration. The name 
is displayed in logs and messages
+and can be specified using the `setName()` method.
+
+* <strong>Parallelism</strong>: The parallelism for the iteration. It can be 
set using the `setParallelism()` method.
+
+* <strong>Solution set in unmanaged memory</strong>: Defines whether the 
solution set is kept in managed memory (Flink's internal way of keeping objects 
in serialized form) or as a simple object map. By default, the solution set 
runs in managed memory. This property can be set using the 
`setSolutionSetUnmanagedMemory()` method.
+
+* <strong>Aggregators</strong>: Iteration aggregators can be registered using 
the `registerAggregator()` method. An iteration aggregator combines
+all aggregates globally once per superstep and makes them available in the 
next superstep. Registered aggregators can be accessed inside the user-defined 
`ComputeFunction`.
+
+* <strong>Broadcast Variables</strong>: DataSets can be added as [Broadcast 
Variables]({{site.baseurl}}/apis/batch/index.html#broadcast-variables) to the 
`ComputeFunction`, using the `addBroadcastSet()` method.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+Graph<Long, Long, Double> graph = ...
+
+// configure the iteration
+VertexCentricConfiguration parameters = new VertexCentricConfiguration();
+
+// set the iteration name
+parameters.setName("Gelly Iteration");
+
+// set the parallelism
+parameters.setParallelism(16);
+
+// register an aggregator
+parameters.registerAggregator("sumAggregator", new LongSumAggregator());
+
+// run the vertex-centric iteration, also passing the configuration parameters
+Graph<Long, Long, Double> result =
+            graph.runVertexCentricIteration(
+            new Compute(), null, maxIterations, parameters);
+
+// user-defined function
+public static final class Compute extends ComputeFunction {
+
+    LongSumAggregator aggregator = new LongSumAggregator();
+
+    public void preSuperstep() {
+
+        // retrieve the Aggregator
+        aggregator = getIterationAggregator("sumAggregator");
+    }
+
+
+    public void compute(Vertex<Long, Long> vertex, MessageIterator inMessages) 
{
+
+        //do some computation
+        Long partialValue = ...
+
+        // aggregate the partial value
+        aggregator.aggregate(partialValue);
+
+        // update the vertex value
+        setNewVertexValue(...);
+    }
+}
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+val graph: Graph[Long, Long, Double] = ...
+
+val parameters = new VertexCentricConfiguration
+
+// set the iteration name
+parameters.setName("Gelly Iteration")
+
+// set the parallelism
+parameters.setParallelism(16)
+
+// register an aggregator
+parameters.registerAggregator("sumAggregator", new LongSumAggregator)
+
+// run the vertex-centric iteration, also passing the configuration parameters
+val result = graph.runVertexCentricIteration(new Compute, new Combiner, 
maxIterations, parameters)
+
+// user-defined function
+final class Compute extends ComputeFunction {
+
+    var aggregator = new LongSumAggregator
+
+    override def preSuperstep {
+
+        // retrieve the Aggregator
+        aggregator = getIterationAggregator("sumAggregator")
+    }
+
+
+    override def compute(vertex: Vertex[Long, Long], inMessages: 
MessageIterator[Long]) {
+
+        //do some computation
+        val partialValue = ...
+
+        // aggregate the partial value
+        aggregator.aggregate(partialValue)
+
+        // update the vertex value
+        setNewVertexValue(...)
+    }
+}
+
+{% endhighlight %}
+</div>
+</div>
+
+{% top %}
 
 ### Scatter-Gather Iterations
 The scatter-gather model, also known as "signal/collect" model, expresses 
computation from the perspective of a vertex in the graph. The computation 
proceeds in synchronized iteration steps, called supersteps. In each superstep, 
a vertex produces messages for other vertices and updates its value based on 
the messages it receives. To use scatter-gather iterations in Gelly, the user 
only needs to define how a vertex behaves in each superstep:
@@ -810,10 +1059,6 @@ Let us consider computing Single-Source-Shortest-Paths 
with scatter-gather itera
     <img alt="Scatter-gather SSSP superstep 1" width="70%" 
src="fig/gelly-vc-sssp1.png"/>
 </p>
 
-<p class="text-center">
-    <img alt="Scatter-gather SSSP superstep 2" width="70%" 
src="fig/gelly-vc-sssp2.png"/>
-</p>
-
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
@@ -1184,16 +1429,7 @@ Let us consider computing Single-Source-Shortest-Paths 
with GSA on the following
     <img alt="GSA SSSP superstep 1" width="70%" src="fig/gelly-gsa-sssp1.png"/>
 </p>
 
-<p class="text-center">
-    <img alt="GSA SSSP superstep 2" width="70%" src="fig/gelly-gsa-sssp2.png"/>
-</p>
-
 Notice that, if a vertex does not change its value during a superstep, it will 
not calculate candidate distance during the next superstep. The algorithm 
converges when no vertex changes value.
-The resulting graph after the algorithm converges is shown below.
-
-<p class="text-center">
-    <img alt="GSA SSSP result" width="70%" 
src="fig/gelly-gsa-sssp-result.png"/>
-</p>
 
 To implement this example in Gelly GSA, the user only needs to call the 
`runGatherSumApplyIteration` method on the input graph and provide the 
`GatherFunction`, `SumFunction` and `ApplyFunction` UDFs. Iteration 
synchronization, grouping, value updates and convergence are handled by the 
system:
 
@@ -1442,16 +1678,58 @@ val result = graph.runGatherSumApplyIteration(new 
Gather, new Sum, new Apply, ma
 </div>
 {% top %}
 
-### Scatter-Gather and GSA Comparison
-As seen in the examples above, Gather-Sum-Apply iterations are quite similar 
to scatter-gather iterations. In fact, any algorithm which can be expressed as 
a GSA iteration can also be written in the scatter-gather model.
-The messaging phase of the scatter-gather model is equivalent to the Gather 
and Sum steps of GSA: Gather can be seen as the phase where the messages are 
produced and Sum as the phase where they are routed to the target vertex. 
Similarly, the value update phase corresponds to the Apply step.
+### Iteration Abstractions Comparison
+Although the three iteration abstractions in Gelly seem quite similar, 
understanding their differences can lead to more performant and maintainable 
programs.
+Among the three, the vertex-centric model is the most general model and 
supports arbitrary computation and messaging for each vertex. In the 
scatter-gather model, the logic of producing messages is decoupled from the 
logic of updating vertex values. Thus, programs written using scatter-gather 
are sometimes easier to follow and maintain.
+Separating the messaging phase from the vertex value update logic not only 
makes some programs easier to follow but might also have a positive impact on 
performance. Scatter-gather implementations typically have lower memory 
requirements, because concurrent access to the inbox (messages received) and 
outbox (messages to send) data structures is not required. However, this 
characteristic also limits expressiveness and makes some computation patterns 
non-intuitive. Naturally, if an algorithm requires a vertex to concurrently 
access its inbox and outbox, then the expression of this algorithm in 
scatter-gather might be problematic. Strongly Connected Components and 
Approximate Maximum
+Weight Matching are examples of such graph algorithms. A direct consequence of 
this restriction is that vertices cannot generate messages and update their 
states in the same phase. Thus, deciding whether to propagate a message based 
on its content would require storing it in the vertex value, so that the gather 
phase has access to it, in the following iteration step. Similarly, if the 
vertex update logic includes computation over the values of the neighboring 
edges, these have to be included inside a special message passed from the 
scatter to the gather phase. Such workarounds often lead to higher memory 
requirements and non-elegant, hard to understand algorithm implementations.
+
+Gather-sum-apply iterations are also quite similar to scatter-gather 
iterations. In fact, any algorithm which can be expressed as a GSA iteration 
can also be written in the scatter-gather model. The messaging phase of the 
scatter-gather model is equivalent to the Gather and Sum steps of GSA: Gather 
can be seen as the phase where the messages are produced and Sum as the phase 
where they are routed to the target vertex. Similarly, the value update phase 
corresponds to the Apply step.
 
 The main difference between the two implementations is that the Gather phase 
of GSA parallelizes the computation over the edges, while the messaging phase 
distributes the computation over the vertices. Using the SSSP examples above, 
we see that in the first superstep of the scatter-gather case, vertices 1, 2 
and 3 produce messages in parallel. Vertex 1 produces 3 messages, while 
vertices 2 and 3 produce one message each. In the GSA case on the other hand, 
the computation is parallelized over the edges: the three candidate distance 
values of vertex 1 are produced in parallel. Thus, if the Gather step contains 
"heavy" computation, it might be a better idea to use GSA and spread out the 
computation, instead of burdening a single vertex. Another case when 
parallelizing over the edges might prove to be more efficient is when the input 
graph is skewed (some vertices have a lot more neighbors than others).
 
 Another difference between the two implementations is that the scatter-gather 
implementation uses a `coGroup` operator internally, while GSA uses a `reduce`. 
Therefore, if the function that combines neighbor values (messages) requires 
the whole group of values for the computation, scatter-gather should be used. 
If the update function is associative and commutative, then the GSA's reducer 
is expected to give a more efficient implementation, as it can make use of a 
combiner.
 
-Another thing to note is that GSA works strictly on neighborhoods, while in 
the scatter-gather model, a vertex can send a message to any vertex, given that 
it knows its vertex ID, regardless of whether it is a neighbor.
-Finally, in Gelly's scatter-gather implementation, one can choose the 
messaging direction, i.e. the direction in which updates propagate. GSA does 
not support this yet, so each vertex will be updated based on the values of its 
in-neighbors only.
+Another thing to note is that GSA works strictly on neighborhoods, while in 
the vertex-centric and scatter-gather models, a vertex can send a message to 
any vertex, given that it knows its vertex ID, regardless of whether it is a 
neighbor. Finally, in Gelly's scatter-gather implementation, one can choose the 
messaging direction, i.e. the direction in which updates propagate. GSA does 
not support this yet, so each vertex will be updated based on the values of its 
in-neighbors only.
+
+The main differences among the Gelly iteration models are shown in the table 
below.
+
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Iteration Model</th>
+      <th class="text-center">Update Function</th>
+      <th class="text-center">Update Logic</th>
+      <th class="text-center">Communication Scope</th>
+      <th class="text-center">Communication Logic</th>
+    </tr>
+  </thead>
+  <tbody>
+ <tr>
+  <td>Vertex-Centric</td>
+  <td>arbitrary</td>
+  <td>arbitrary</td>
+  <td>any vertex</td>
+  <td>arbitrary</td>
+</tr>
+<tr>
+  <td>Scatter-Gather</td>
+  <td>arbitrary</td>
+  <td>based on received messages</td>
+  <td>any vertex</td>
+  <td>based on vertex state</td>
+</tr>
+<tr>
+  <td>Gather-Sum-Apply</td>
+  <td>associative and commutative</td>
+  <td>based on neighbors' values</td>
+  <td>neighborhood</td>
+  <td>based on vertex state</td>
+</tr>
+</tbody>
+</table>
+
 
 Graph Validation
 -----------
@@ -1581,12 +1859,12 @@ This is an implementation of the well-known Label 
Propagation algorithm describe
 The algorithm is implemented using [scatter-gather 
iterations](#scatter-gather-iterations).
 Labels are expected to be of type `Comparable` and are initialized using the 
vertex values of the input `Graph`.
 The algorithm iteratively refines discovered communities by propagating 
labels. In each iteration, a vertex adopts
-the label that is most frequent among its neighbors' labels. In case of a tie 
(i.e. two or more labels appear with the 
-same frequency), the algorithm picks the greater label. The algorithm 
converges when no vertex changes its value or 
+the label that is most frequent among its neighbors' labels. In case of a tie 
(i.e. two or more labels appear with the
+same frequency), the algorithm picks the greater label. The algorithm 
converges when no vertex changes its value or
 the maximum number of iterations has been reached. Note that different 
initializations might lead to different results.
 
 #### Usage
-The algorithm takes as input a `Graph` with a `Comparable` vertex type, a 
`Comparable` vertex value type and an arbitrary edge value type. 
+The algorithm takes as input a `Graph` with a `Comparable` vertex type, a 
`Comparable` vertex value type and an arbitrary edge value type.
 It returns a `DataSet` of vertices, where the vertex value corresponds to the 
community in which this vertex belongs after convergence.
 The constructor takes one parameter:
 

http://git-wip-us.apache.org/repos/asf/flink/blob/048cda72/docs/libs/fig/vertex-centric
 supersteps.png
----------------------------------------------------------------------
diff --git a/docs/libs/fig/vertex-centric supersteps.png 
b/docs/libs/fig/vertex-centric supersteps.png
deleted file mode 100644
index 6498a25..0000000
Binary files a/docs/libs/fig/vertex-centric supersteps.png and /dev/null differ
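
Note on the vertex-centric SSSP example added to gelly.md in this commit: the
superstep semantics it describes can be sketched without Flink. The block below
is a minimal plain-Java simulation, not Gelly's implementation. The class name
PregelSsspSketch, the method shortestPaths and the adjacency-map input are
hypothetical and exist only for illustration. It assumes every vertex starts at
infinite distance and that the source derives distance zero in the first
superstep, mirroring the compute function in the docs, and it folds the message
combiner into a per-target minimum.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PregelSsspSketch {

    /**
     * Simulates vertex-centric SSSP superstep by superstep.
     * edges maps a source vertex ID to its outgoing (target ID -> weight) pairs.
     */
    public static Map<Long, Double> shortestPaths(
            Map<Long, Map<Long, Double>> edges, long srcId, int maxSupersteps) {

        // Assumption: all vertices start at infinite distance.
        Map<Long, Double> values = new HashMap<>();
        values.put(srcId, Double.POSITIVE_INFINITY);
        for (Map.Entry<Long, Map<Long, Double>> e : edges.entrySet()) {
            values.put(e.getKey(), Double.POSITIVE_INFINITY);
            for (Long target : e.getValue().keySet()) {
                values.put(target, Double.POSITIVE_INFINITY);
            }
        }

        // Messages delivered at the start of the current superstep,
        // already combined to one minimum per target vertex.
        Map<Long, Double> inbox = new HashMap<>();

        for (int superstep = 1; superstep <= maxSupersteps; superstep++) {
            // Messages produced now are only visible in the next superstep.
            Map<Long, Double> outbox = new HashMap<>();

            // All vertices run in the first superstep; later, only vertices
            // that received a message.
            List<Long> active = new ArrayList<>(
                    superstep == 1 ? values.keySet() : inbox.keySet());

            for (Long id : active) {
                double minDistance = (id == srcId) ? 0d : Double.POSITIVE_INFINITY;
                Double msg = inbox.get(id);
                if (msg != null) {
                    minDistance = Math.min(minDistance, msg);
                }

                if (minDistance < values.get(id)) {
                    values.put(id, minDistance);
                    Map<Long, Double> out =
                            edges.getOrDefault(id, Collections.<Long, Double>emptyMap());
                    for (Map.Entry<Long, Double> edge : out.entrySet()) {
                        // Combiner step: keep only the smallest candidate per target.
                        outbox.merge(edge.getKey(), minDistance + edge.getValue(),
                                (a, b) -> Math.min(a, b));
                    }
                }
            }

            // No value changed, so no messages were sent: converged.
            if (outbox.isEmpty()) {
                break;
            }
            inbox = outbox;
        }
        return values;
    }

    public static void main(String[] args) {
        Map<Long, Map<Long, Double>> edges = new HashMap<>();
        edges.put(1L, new HashMap<>());
        edges.get(1L).put(2L, 1.0);
        edges.get(1L).put(3L, 4.0);
        edges.put(2L, new HashMap<>());
        edges.get(2L).put(3L, 2.0);
        System.out.println(shortestPaths(edges, 1L, 10));
    }
}
```

Swapping the inbox for the outbox only at the end of each loop iteration is
what models the synchronous delivery guarantee: a message sent in one superstep
is never read before the next one starts.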
