Repository: flink
Updated Branches:
  refs/heads/master 35892ed14 -> 048cda72b


http://git-wip-us.apache.org/repos/asf/flink/blob/048cda72/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/PregelSSSP.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/PregelSSSP.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/PregelSSSP.java
new file mode 100644
index 0000000..e647653
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/PregelSSSP.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.examples.data.SingleSourceShortestPathsData;
+import org.apache.flink.graph.pregel.ComputeFunction;
+import org.apache.flink.graph.pregel.MessageCombiner;
+import org.apache.flink.graph.pregel.MessageIterator;
+import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
+
+/**
+ * This example shows how to use Gelly's Vertex-Centric iterations.
+ * 
+ * It is an implementation of the Single-Source-Shortest-Paths algorithm.
+ * For a scatter-gather implementation of the same algorithm, please refer to 
{@link SingleSourceShortestPaths}
+ * and for a gather-sum-apply implementation see {@link 
GSASingleSourceShortestPaths}.  
+ *
+ * The input file is a plain text file and must be formatted as follows:
+ * Edges are represented by tuples of srcVertexId, trgVertexId, distance which 
are
+ * separated by tabs. Edges themselves are separated by newlines.
+ * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges,
+ * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4.
+ *
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.examples.data.SingleSourceShortestPathsData}
+ */
+public class PregelSSSP implements ProgramDescription {
+
+       public static void main(String[] args) throws Exception {
+
+               if (!parseParameters(args)) {
+                       return;
+               }
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
+
+               Graph<Long, Double, Double> graph = Graph.fromDataSet(edges, 
new InitVertices(), env);
+
+               // Execute the vertex-centric iteration
+               Graph<Long, Double, Double> result = 
graph.runVertexCentricIteration(
+                               new SSSPComputeFunction(srcVertexId), new 
SSSPCombiner(), 
+                               maxIterations);
+
+               // Extract the vertices as the result
+               DataSet<Vertex<Long, Double>> singleSourceShortestPaths = 
result.getVertices();
+
+               // emit result
+               if (fileOutput) {
+                       singleSourceShortestPaths.writeAsCsv(outputPath, "\n", 
",");
+                       env.execute("Pregel Single Source Shortest Paths 
Example");
+               } else {
+                       singleSourceShortestPaths.print();
+               }
+
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Single Source Shortest Path UDFs
+       // 
--------------------------------------------------------------------------------------------
+
+       @SuppressWarnings("serial")
+       private static final class InitVertices implements MapFunction<Long, 
Double> {
+
+               public Double map(Long id) { return Double.POSITIVE_INFINITY; }
+       }
+
+       /**
+        * The compute function for SSSP
+        */
+       @SuppressWarnings("serial")
+       public static final class SSSPComputeFunction extends 
ComputeFunction<Long, Double, Double, Double> {
+
+               private final long srcId;
+
+               public SSSPComputeFunction(long src) {
+                       this.srcId = src;
+               }
+
+               public void compute(Vertex<Long, Double> vertex, 
MessageIterator<Double> messages) {
+
+                       double minDistance = (vertex.getId().equals(srcId)) ? 
0d : Double.POSITIVE_INFINITY;
+
+                       for (Double msg : messages) {
+                               minDistance = Math.min(minDistance, msg);
+                       }
+
+                       if (minDistance < vertex.getValue()) {
+                               setNewVertexValue(minDistance);
+                               for (Edge<Long, Double> e: getEdges()) {
+                                       sendMessageTo(e.getTarget(), 
minDistance + e.getValue());
+                               }
+                       }
+               }
+       }
+
+       /**
+        * The messages combiner.
+        * Out of all messages destined to a target vertex, only the minimum 
distance is propagated.
+        */
+       @SuppressWarnings("serial")
+       public static final class SSSPCombiner extends MessageCombiner<Long, 
Double> {
+
+               public void combineMessages(MessageIterator<Double> messages) {
+
+                       double minMessage = Double.POSITIVE_INFINITY;
+                       for (Double msg: messages) {
+                               minMessage = Math.min(minMessage, msg);
+                       }
+                       sendCombinedMessage(minMessage);
+               }
+       }
+
+       // 
******************************************************************************************************************
+       // UTIL METHODS
+       // 
******************************************************************************************************************
+
+       private static boolean fileOutput = false;
+
+       private static Long srcVertexId = 1l;
+
+       private static String edgesInputPath = null;
+
+       private static String outputPath = null;
+
+       private static int maxIterations = 5;
+
+       private static boolean parseParameters(String[] args) {
+
+               if(args.length > 0) {
+                       if(args.length != 4) {
+                               System.err.println("Usage: PregelSSSP <source 
vertex id>" +
+                                               " <input edges path> <output 
path> <num iterations>");
+                               return false;
+                       }
+
+                       fileOutput = true;
+                       srcVertexId = Long.parseLong(args[0]);
+                       edgesInputPath = args[1];
+                       outputPath = args[2];
+                       maxIterations = Integer.parseInt(args[3]);
+               } else {
+                               System.out.println("Executing Pregel Single 
Source Shortest Paths example "
+                                               + "with default parameters and 
built-in default data.");
+                               System.out.println("  Provide parameters to 
read input data from files.");
+                               System.out.println("  See the documentation for 
the correct format of input files.");
+                               System.out.println("Usage: PregelSSSP <source 
vertex id>" +
+                                               " <input edges path> <output 
path> <num iterations>");
+               }
+               return true;
+       }
+
+       private static DataSet<Edge<Long, Double>> 
getEdgesDataSet(ExecutionEnvironment env) {
+               if (fileOutput) {
+                       return env.readCsvFile(edgesInputPath)
+                                       .lineDelimiter("\n")
+                                       .fieldDelimiter("\t")
+                                       .ignoreComments("%")
+                                       .types(Long.class, Long.class, 
Double.class)
+                                       .map(new Tuple3ToEdgeMap<Long, 
Double>());
+               } else {
+                       return 
SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
+               }
+       }
+
+       @Override
+       public String getDescription() {
+               return "Vertex-centric Single Source Shortest Paths";
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/048cda72/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/SingleSourceShortestPathsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/SingleSourceShortestPathsITCase.java
 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/SingleSourceShortestPathsITCase.java
index 258ed16..9019d0b 100644
--- 
a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/SingleSourceShortestPathsITCase.java
+++ 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/SingleSourceShortestPathsITCase.java
@@ -24,7 +24,7 @@ import com.google.common.io.Files;
 import org.apache.flink.graph.examples.GSASingleSourceShortestPaths;
 import org.apache.flink.graph.examples.SingleSourceShortestPaths;
 import org.apache.flink.graph.examples.data.SingleSourceShortestPathsData;
-import org.apache.flink.graph.example.PregelSSSP;
+import org.apache.flink.graph.examples.PregelSSSP;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.junit.After;

http://git-wip-us.apache.org/repos/asf/flink/blob/048cda72/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
 
b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
index 19d6dd3..a5f9f8d 100644
--- 
a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
+++ 
b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
@@ -31,6 +31,9 @@ import org.apache.flink.{graph => jg}
 import _root_.scala.collection.JavaConverters._
 import _root_.scala.reflect.ClassTag
 import org.apache.flink.types.NullValue
+import org.apache.flink.graph.pregel.ComputeFunction
+import org.apache.flink.graph.pregel.MessageCombiner
+import org.apache.flink.graph.pregel.VertexCentricConfiguration
 
 object Graph {
 
@@ -1105,6 +1108,43 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) 
{
       maxIterations, parameters))
   }
 
+   /**
+   * Runs a vertex-centric iteration on the graph.
+   * No configuration options are provided.
+   *
+   * @param computeFunction the compute function
+   * @param combineFunction the optional message combiner function
+   * @param maxIterations maximum number of iterations to perform
+   *
+   * @return the updated Graph after the vertex-centric iteration has 
converged or
+   *         after maximumNumberOfIterations.
+   */
+  def runVertexCentricIteration[M](computeFunction: ComputeFunction[K, VV, EV, 
M],
+                                   combineFunction: MessageCombiner[K, M],
+                                   maxIterations: Int): Graph[K, VV, EV] = {
+    wrapGraph(jgraph.runVertexCentricIteration(computeFunction, 
combineFunction,
+      maxIterations))
+  }
+
+  /**
+   * Runs a vertex-centric iteration on the graph with configuration options.
+   *
+   * @param computeFunction the compute function
+   * @param combineFunction the optional message combiner function
+   * @param maxIterations maximum number of iterations to perform
+   * @param parameters the iteration configuration parameters
+   *
+   * @return the updated Graph after the vertex-centric iteration has 
converged or
+   *         after maximumNumberOfIterations.
+   */
+  def runVertexCentricIteration[M](computeFunction: ComputeFunction[K, VV, EV, 
M],
+                                   combineFunction: MessageCombiner[K, M],
+                                   maxIterations: Int, parameters: 
VertexCentricConfiguration):
+  Graph[K, VV, EV] = {
+    wrapGraph(jgraph.runVertexCentricIteration(computeFunction, 
combineFunction,
+      maxIterations, parameters))
+  }
+
   def validate(validator: GraphValidator[K, VV, EV]): Boolean = {
     jgraph.validate(validator)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/048cda72/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index f2b5b22..d46056a 100755
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -1684,10 +1684,10 @@ public class Graph<K, VV, EV> {
        }
 
        /**
-        * Runs a VertexCentric iteration on the graph.
+        * Runs a {@link VertexCentricIteration} on the graph.
         * No configuration options are provided.
         *
-        * @param computeFunction the vertex update function
+        * @param computeFunction the vertex compute function
         * @param combiner an optional message combiner
         * @param maximumNumberOfIterations maximum number of iterations to 
perform
         * 
@@ -1702,12 +1702,12 @@ public class Graph<K, VV, EV> {
        }
 
        /**
-        * Runs a VetexCentric iteration on the graph with configuration 
options.
+        * Runs a {@link VertexCentricIteration} on the graph with 
configuration options.
         * 
-        * @param computeFunction the vertex update function
+        * @param computeFunction the vertex compute function
         * @param combiner an optional message combiner
         * @param maximumNumberOfIterations maximum number of iterations to 
perform
-        * @param parameters the iteration configuration parameters
+        * @param parameters the {@link VertexCentricConfiguration} parameters
         * 
         * @return the updated Graph after the vertex-centric iteration has 
converged or
         * after maximumNumberOfIterations.

http://git-wip-us.apache.org/repos/asf/flink/blob/048cda72/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/PregelSSSP.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/PregelSSSP.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/PregelSSSP.java
deleted file mode 100644
index 93cc360..0000000
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/PregelSSSP.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
-import org.apache.flink.graph.pregel.ComputeFunction;
-import org.apache.flink.graph.pregel.MessageCombiner;
-import org.apache.flink.graph.pregel.MessageIterator;
-import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
-
-/**
- * This example shows how to use Gelly's Vertex-Centric iterations.
- * 
- * It is an implementation of the Single-Source-Shortest-Paths algorithm.
- * For a scatter-gather implementation of the same algorithm, please refer to 
{@link SingleSourceShortestPaths}
- * and for a gather-sum-apply implementation see {@link 
GSASingleSourceShortestPaths}.  
- *
- * The input file is a plain text file and must be formatted as follows:
- * Edges are represented by tuples of srcVertexId, trgVertexId, distance which 
are
- * separated by tabs. Edges themselves are separated by newlines.
- * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges,
- * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4.
- *
- * If no parameters are provided, the program is run with default data from
- * {@link org.apache.flink.graph.example.utils.SingleSourceShortestPathsData}
- */
-public class PregelSSSP implements ProgramDescription {
-
-       public static void main(String[] args) throws Exception {
-
-               if (!parseParameters(args)) {
-                       return;
-               }
-
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
-
-               Graph<Long, Double, Double> graph = Graph.fromDataSet(edges, 
new InitVertices(), env);
-
-               // Execute the vertex-centric iteration
-               Graph<Long, Double, Double> result = 
graph.runVertexCentricIteration(
-                               new SSSPComputeFunction(srcVertexId), new 
SSSPCombiner(), 
-                               maxIterations);
-
-               // Extract the vertices as the result
-               DataSet<Vertex<Long, Double>> singleSourceShortestPaths = 
result.getVertices();
-
-               // emit result
-               if (fileOutput) {
-                       singleSourceShortestPaths.writeAsCsv(outputPath, "\n", 
",");
-                       env.execute("Pregel Single Source Shortest Paths 
Example");
-               } else {
-                       singleSourceShortestPaths.print();
-               }
-
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Single Source Shortest Path UDFs
-       // 
--------------------------------------------------------------------------------------------
-
-       @SuppressWarnings("serial")
-       private static final class InitVertices implements MapFunction<Long, 
Double> {
-
-               public Double map(Long id) { return Double.POSITIVE_INFINITY; }
-       }
-
-       /**
-        * The compute function for SSSP
-        */
-       @SuppressWarnings("serial")
-       public static final class SSSPComputeFunction extends 
ComputeFunction<Long, Double, Double, Double> {
-
-               private final long srcId;
-
-               public SSSPComputeFunction(long src) {
-                       this.srcId = src;
-               }
-
-               public void compute(Vertex<Long, Double> vertex, 
MessageIterator<Double> messages) {
-
-                       double minDistance = (vertex.getId().equals(srcId)) ? 
0d : Double.POSITIVE_INFINITY;
-
-                       for (Double msg : messages) {
-                               minDistance = Math.min(minDistance, msg);
-                       }
-
-                       if (minDistance < vertex.getValue()) {
-                               setNewVertexValue(minDistance);
-                               for (Edge<Long, Double> e: getEdges()) {
-                                       sendMessageTo(e.getTarget(), 
minDistance + e.getValue());
-                               }
-                       }
-               }
-       }
-
-       /**
-        * The messages combiner.
-        * Out of all messages destined to a target vertex, only the minimum 
distance is propagated.
-        */
-       @SuppressWarnings("serial")
-       public static final class SSSPCombiner extends MessageCombiner<Long, 
Double> {
-
-               public void combineMessages(MessageIterator<Double> messages) {
-
-                       double minMessage = Double.POSITIVE_INFINITY;
-                       for (Double msg: messages) {
-                               minMessage = Math.min(minMessage, msg);
-                       }
-                       sendCombinedMessage(minMessage);
-               }
-       }
-
-       // 
******************************************************************************************************************
-       // UTIL METHODS
-       // 
******************************************************************************************************************
-
-       private static boolean fileOutput = false;
-
-       private static Long srcVertexId = 1l;
-
-       private static String edgesInputPath = null;
-
-       private static String outputPath = null;
-
-       private static int maxIterations = 5;
-
-       private static boolean parseParameters(String[] args) {
-
-               if(args.length > 0) {
-                       if(args.length != 4) {
-                               System.err.println("Usage: PregelSSSP <source 
vertex id>" +
-                                               " <input edges path> <output 
path> <num iterations>");
-                               return false;
-                       }
-
-                       fileOutput = true;
-                       srcVertexId = Long.parseLong(args[0]);
-                       edgesInputPath = args[1];
-                       outputPath = args[2];
-                       maxIterations = Integer.parseInt(args[3]);
-               } else {
-                               System.out.println("Executing Pregel Single 
Source Shortest Paths example "
-                                               + "with default parameters and 
built-in default data.");
-                               System.out.println("  Provide parameters to 
read input data from files.");
-                               System.out.println("  See the documentation for 
the correct format of input files.");
-                               System.out.println("Usage: PregelSSSP <source 
vertex id>" +
-                                               " <input edges path> <output 
path> <num iterations>");
-               }
-               return true;
-       }
-
-       private static DataSet<Edge<Long, Double>> 
getEdgesDataSet(ExecutionEnvironment env) {
-               if (fileOutput) {
-                       return env.readCsvFile(edgesInputPath)
-                                       .lineDelimiter("\n")
-                                       .fieldDelimiter("\t")
-                                       .ignoreComments("%")
-                                       .types(Long.class, Long.class, 
Double.class)
-                                       .map(new Tuple3ToEdgeMap<Long, 
Double>());
-               } else {
-                       return 
SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
-               }
-       }
-
-       @Override
-       public String getDescription() {
-               return "Vertex-centric Single Source Shortest Paths";
-       }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/048cda72/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java
index 09a000f..db64f63 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java
@@ -24,11 +24,12 @@ import java.util.Iterator;
 
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.Either;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.Either;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.Collector;
 
@@ -84,30 +85,20 @@ public abstract class ComputeFunction<K, VV, EV, Message> 
implements Serializabl
         * @return An iterator with all edges.
         */
        public final Iterable<Edge<K, EV>> getEdges() {
-               if (edgesUsed) {
-                       throw new IllegalStateException("Can use either 
'getEdges()' or 'sendMessageToAllNeighbors()' exactly once.");
-               }
-               edgesUsed = true;
+               verifyEdgeUsage();
                this.edgeIterator.set((Iterator<Edge<K, EV>>) edges);
                return this.edgeIterator;
        }
 
        /**
-        * Sends the given message to all vertices that are targets of an edge 
of the changed vertex.
+        * Sends the given message to all vertices that adjacent to the changed 
vertex.
         * This method is mutually exclusive to the method {@link #getEdges()} 
and may be called only once.
         * 
         * @param m The message to send.
         */
        public final void sendMessageToAllNeighbors(Message m) {
-               if (edgesUsed) {
-                       throw new IllegalStateException("Can use either 
'getEdges()' or 'sendMessageToAllNeighbors()'"
-                                       + "exactly once.");
-               }
-               
-               edgesUsed = true;
-
+               verifyEdgeUsage();
                outMsg.setField(m, 1);
-               
                while (edges.hasNext()) {
                        Tuple next = (Tuple) edges.next();
                        outMsg.setField(next.getField(1), 0);
@@ -163,7 +154,7 @@ public abstract class ComputeFunction<K, VV, EV, Message> 
implements Serializabl
         * all aggregates globally once per superstep and makes them available 
in the next superstep.
         * 
         * @param name The name of the aggregator.
-        * @return The aggregator registered under this name, or null, if no 
aggregator was registered.
+        * @return The aggregator registered under this name, or {@code null}, 
if no aggregator was registered.
         */
        public final <T extends Aggregator<?>> T getIterationAggregator(String 
name) {
                return this.runtimeContext.<T>getIterationAggregator(name);
@@ -182,7 +173,7 @@ public abstract class ComputeFunction<K, VV, EV, Message> 
implements Serializabl
        /**
         * Gets the broadcast data set registered under the given name. 
Broadcast data sets
         * are available on all parallel instances of a function. They can be 
registered via
-        * {@link 
org.apache.flink.graph.spargel.VertexCentricConfiguration#addBroadcastSet(String,
 org.apache.flink.api.java.DataSet)}.
+        * {@link 
org.apache.flink.graph.pregel.VertexCentricConfiguration#addBroadcastSet(String,
 DataSet)}.
         * 
         * @param name The name under which the broadcast set is registered.
         * @return The broadcast data set.
@@ -228,14 +219,22 @@ public abstract class ComputeFunction<K, VV, EV, Message> 
implements Serializabl
                this.edgesUsed = false;
                setNewVertexValueCalled = false;
        }
-       
+
+       private void verifyEdgeUsage() throws IllegalStateException {
+               if (edgesUsed) {
+                       throw new IllegalStateException(
+                                       "Can use either 'getEdges()' or 
'sendMessageToAllNeighbors()' exactly once.");
+               }
+               edgesUsed = true;
+       }
+
        private static final class EdgesIterator<K, EV> 
                implements Iterator<Edge<K, EV>>, Iterable<Edge<K, EV>>
        {
                private Iterator<Edge<K, EV>> input;
                
-               private Edge<K, EV> edge = new Edge<K, EV>();
-               
+               private Edge<K, EV> edge = new Edge<>();
+
                void set(Iterator<Edge<K, EV>> input) {
                        this.input = input;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/048cda72/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java
index 9458323..70c8262 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java
@@ -21,7 +21,7 @@ package org.apache.flink.graph.pregel;
 import java.io.Serializable;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.Either;
+import org.apache.flink.types.Either;
 import org.apache.flink.types.NullValue;
 import org.apache.flink.util.Collector;
 
@@ -49,7 +49,7 @@ public abstract class MessageCombiner<K, Message> implements 
Serializable {
        }
 
        /**
-        * Combines messages from sent different vertices to a target vertex.
+        * Combines messages sent from different vertices to a target vertex.
         * Implementing this method might reduce communication costs during a 
vertex-centric
         * iteration.
         * 

http://git-wip-us.apache.org/repos/asf/flink/blob/048cda72/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageIterator.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageIterator.java
index 620cb93..5a17b90 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageIterator.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageIterator.java
@@ -21,7 +21,7 @@ package org.apache.flink.graph.pregel;
 import java.util.Iterator;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.Either;
+import org.apache.flink.types.Either;
 import org.apache.flink.types.NullValue;
 
 /**
@@ -47,11 +47,8 @@ public final class MessageIterator<Message> implements 
Iterator<Message>, Iterab
                if (first != null) {
                        return true;
                }
-               else if (this.source != null) {
-                       return this.source.hasNext();   
-               }
                else {
-                       return false;
+                       return ((this.source != null) && 
(this.source.hasNext()));
                }
        }
        

http://git-wip-us.apache.org/repos/asf/flink/blob/048cda72/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java
index 287f20d..026e869 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java
@@ -44,7 +44,7 @@ public class VertexCentricConfiguration extends 
IterationConfiguration {
        /**
         * Adds a data set as a broadcast set to the compute function.
         *
-        * @param name The name under which the broadcast data is available in 
the compute function.
+        * @param name The name under which the broadcast data set is available 
in the compute function.
         * @param data The data set to be broadcasted.
         */
        public void addBroadcastSet(String name, DataSet<?> data) {
@@ -55,7 +55,7 @@ public class VertexCentricConfiguration extends 
IterationConfiguration {
         * Get the broadcast variables of the compute function.
         *
         * @return a List of Tuple2, where the first field is the broadcast 
variable name
-        * and the second field is the broadcast DataSet.
+        * and the second field is the broadcast data set.
         */
        public List<Tuple2<String, DataSet<?>>> getBcastVars() {
                return this.bcVars;

http://git-wip-us.apache.org/repos/asf/flink/blob/048cda72/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
index c79ace7..9434221 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
@@ -22,22 +22,21 @@ import java.util.Iterator;
 import java.util.Map;
 
 import org.apache.flink.api.common.aggregators.Aggregator;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.GroupCombineFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichCoGroupFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
 import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
 import org.apache.flink.api.java.operators.CoGroupOperator;
 import org.apache.flink.api.java.operators.DeltaIteration;
-import org.apache.flink.api.common.functions.RichCoGroupFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.operators.CustomUnaryOperation;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.Either;
 import org.apache.flink.api.java.typeutils.EitherTypeInfo;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -46,6 +45,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.Either;
 import org.apache.flink.types.NullValue;
 import org.apache.flink.util.Collector;
 
@@ -70,8 +70,8 @@ import com.google.common.base.Preconditions;
  * </ul>
  * <p>
  *
- * Vertex-centric graph iterations are are run by calling
- * {@link Graph#runVertexCentricIteration(ComputeFunction, int)}.
+ * Vertex-centric graph iterations are run by calling
+ * {@link Graph#runVertexCentricIteration(ComputeFunction, MessageCombiner, 
int)}.
  *
  * @param <K> The type of the vertex key (the vertex identifier).
  * @param <VV> The type of the vertex value (the state of the vertex).
@@ -137,6 +137,16 @@ public class VertexCentricIteration<K, VV, EV, Message>
        
        /**
         * Creates the operator that represents this vertex-centric graph 
computation.
+        * <p>
+        *  The Pregel iteration is mapped to delta iteration as follows.
+        *  The solution set consists of the set of active vertices and the 
workset contains the set of messages
+        *  send to vertices during the previous superstep. Initially, the 
workset contains a null message for each vertex.
+        *  In the beginning of a superstep, the solution set is joined with 
the workset to produce
+        *  a dataset containing tuples of vertex state and messages (vertex 
inbox).
+        *  The superstep compute UDF is realized with a coGroup between the 
vertices with inbox and the graph edges.
+        *  The output of the compute UDF contains both the new vertex values 
and the new messages produced.
+        *  These are directed to the solution set delta and new workset, 
respectively, with subsequent flatMaps.
+        * <p/>
         * 
         * @return The operator that represents this vertex-centric graph 
computation.
         */
@@ -163,7 +173,7 @@ public class VertexCentricIteration<K, VV, EV, Message>
 
                final DeltaIteration<Vertex<K, VV>,     Tuple2<K, 
Either<NullValue, Message>>> iteration =
                                initialVertices.iterateDelta(initialWorkSet, 
this.maximumNumberOfIterations, 0);
-                               setUpIteration(iteration);
+               setUpIteration(iteration);
 
                // join with the current state to get vertex values
                DataSet<Tuple2<Vertex<K, VV>, Either<NullValue, Message>>> 
verticesWithMsgs =
@@ -219,15 +229,14 @@ public class VertexCentricIteration<K, VV, EV, Message>
         * Creates a new vertex-centric iteration operator.
         * 
         * @param edgesWithValue The data set containing edges.
-        * @param uf The function that updates the state of the vertices from 
the incoming messages.
-        * @param mf The function that turns changed vertex states into 
messages along the edges.
+        * @param cf The compute function
         * 
         * @param <K> The type of the vertex key (the vertex identifier).
         * @param <VV> The type of the vertex value (the state of the vertex).
         * @param <Message> The type of the message sent between vertices along 
the edges.
         * @param <EV> The type of the values that are associated with the 
edges.
         * 
-        * @return An in stance of the vertex-centric graph computation 
operator.
+        * @return An instance of the vertex-centric graph computation operator.
         */
        public static final <K, VV, EV, Message> VertexCentricIteration<K, VV, 
EV, Message> withEdges(
                DataSet<Edge<K, EV>> edgesWithValue, ComputeFunction<K, VV, EV, 
Message> cf,
@@ -242,8 +251,7 @@ public class VertexCentricIteration<K, VV, EV, Message>
         * a weight or distance).
         * 
         * @param edgesWithValue The data set containing edges.
-        * @param uf The function that updates the state of the vertices from 
the incoming messages.
-        * @param mf The function that turns changed vertex states into 
messages along the edges.
+        * @param cf The compute function.
         * @param mc The function that combines messages sent to a vertex 
during a superstep.
         * 
         * @param <K> The type of the vertex key (the vertex identifier).
@@ -251,7 +259,7 @@ public class VertexCentricIteration<K, VV, EV, Message>
         * @param <Message> The type of the message sent between vertices along 
the edges.
         * @param <EV> The type of the values that are associated with the 
edges.
         * 
-        * @return An in stance of the vertex-centric graph computation 
operator.
+        * @return An instance of the vertex-centric graph computation operator.
         */
        public static final <K, VV, EV, Message> VertexCentricIteration<K, VV, 
EV, Message> withEdges(
                DataSet<Edge<K, EV>> edgesWithValue, ComputeFunction<K, VV, EV, 
Message> cf,
@@ -292,16 +300,21 @@ public class VertexCentricIteration<K, VV, EV, Message>
                public void open(Configuration parameters) {
                        outTuple = new Tuple2<K, Either<NullValue, Message>>();
                        nullMessage = Either.Left(NullValue.getInstance());
+                       outTuple.setField(nullMessage, 1);
                }
 
                public Tuple2<K, Either<NullValue, Message>> map(Vertex<K, VV> 
vertex) {
                        outTuple.setField(vertex.getId(), 0);
-                       outTuple.setField(nullMessage, 1);
                        return outTuple;
                }
 }
        
        @SuppressWarnings("serial")
+       /**
+        * This coGroup class wraps the user-defined compute function.
+        * The first input holds a Tuple2 containing the vertex state and its 
inbox.
+        * The second input is an iterator of the out-going edges of this 
vertex.
+        */
        private static class VertexComputeUdf<K, VV, EV, Message> extends 
RichCoGroupFunction<
                Tuple2<Vertex<K, VV>, Either<NullValue, Message>>, Edge<K, EV>,
                Either<Vertex<K, VV>, Tuple2<K, Message>>>
@@ -348,18 +361,17 @@ public class VertexCentricIteration<K, VV, EV, Message>
 
                                final Tuple2<Vertex<K, VV>, Either<NullValue, 
Message>> first = vertexIter.next();
                                final Vertex<K, VV> vertexState = first.f0;
-                               final MessageIterator<Message> messageIter = 
new MessageIterator<Message>();
+                               final MessageIterator<Message> messageIter = 
new MessageIterator<>();
 
                                if 
(getIterationRuntimeContext().getSuperstepNumber() == 1) {
-                                       
+                                       // there are no messages during the 1st 
superstep
                                }
                                else {                          
-                               messageIter.setFirst(first.f1.right());
-
-                               @SuppressWarnings("unchecked")
-                               Iterator<Tuple2<?, Either<NullValue, Message>>> 
downcastIter =
+                                       messageIter.setFirst(first.f1.right());
+                                       @SuppressWarnings("unchecked")
+                                       Iterator<Tuple2<?, Either<NullValue, 
Message>>> downcastIter =
                                                (Iterator<Tuple2<?, 
Either<NullValue, Message>>>) (Iterator<?>) vertexIter;
-                               messageIter.setSource(downcastIter);
+                                       messageIter.setSource(downcastIter);
                                }
 
                                computeFunction.set(vertexState.getId(), 
edgesIterator.iterator(), out);
@@ -371,9 +383,9 @@ public class VertexCentricIteration<K, VV, EV, Message>
        @SuppressWarnings("serial")
        @ForwardedFields("f0")
        public static class MessageCombinerUdf<K, Message> extends 
RichGroupReduceFunction<
-               Tuple2<K, Either<NullValue, Message>>, Tuple2<K, 
Either<NullValue, Message>>>
-               implements ResultTypeQueryable<Tuple2<K, Either<NullValue, 
Message>>>,
-               GroupCombineFunction<Tuple2<K, Either<NullValue, Message>>, 
Tuple2<K, Either<NullValue, Message>>> {
+                       Tuple2<K, Either<NullValue, Message>>, Tuple2<K, 
Either<NullValue, Message>>>
+                       implements ResultTypeQueryable<Tuple2<K, 
Either<NullValue, Message>>>,
+                       GroupCombineFunction<Tuple2<K, Either<NullValue, 
Message>>, Tuple2<K, Either<NullValue, Message>>> {
 
                final MessageCombiner<K, Message> combinerFunction;
                private transient TypeInformation<Tuple2<K, Either<NullValue, 
Message>>> resultType;
@@ -400,7 +412,7 @@ public class VertexCentricIteration<K, VV, EV, Message>
 
                                final Tuple2<K, Either<NullValue, Message>> 
first = messageIterator.next();
                                final K vertexID = first.f0;
-                               final MessageIterator<Message> messageIter = 
new MessageIterator<Message>();
+                               final MessageIterator<Message> messageIter = 
new MessageIterator<>();
                                messageIter.setFirst(first.f1.right());
 
                                @SuppressWarnings("unchecked")
@@ -455,18 +467,18 @@ public class VertexCentricIteration<K, VV, EV, Message>
        @ForwardedFieldsFirst("*->f0")
        @ForwardedFieldsSecond("f1->f1")
        private static final class AppendVertexState<K, VV, Message> implements
-               FlatJoinFunction<Vertex<K, VV>, Tuple2<K, Either<NullValue, 
Message>>,
-               Tuple2<Vertex<K, VV>, Either<NullValue, Message>>> {
+                       JoinFunction<Vertex<K, VV>, Tuple2<K, Either<NullValue, 
Message>>,
+                                       Tuple2<Vertex<K, VV>, Either<NullValue, 
Message>>> {
 
                private Tuple2<Vertex<K, VV>, Either<NullValue, Message>> 
outTuple =
                                new Tuple2<Vertex<K, VV>, Either<NullValue, 
Message>>();
 
-               public void join(Vertex<K, VV> vertex, Tuple2<K, 
Either<NullValue, Message>> message,
-                               Collector<Tuple2<Vertex<K, VV>, 
Either<NullValue, Message>>> out) {
+               public Tuple2<Vertex<K, VV>, Either<NullValue, Message>> join(
+                               Vertex<K, VV> vertex, Tuple2<K, 
Either<NullValue, Message>> message) {
 
                        outTuple.setField(vertex, 0);
                        outTuple.setField(message.f1, 1);
-                       out.collect(outTuple);
+                       return outTuple;
                }
        }
 
@@ -485,7 +497,7 @@ public class VertexCentricIteration<K, VV, EV, Message>
 
        @SuppressWarnings("serial")
        private static final class ProjectMessages<K, VV, Message> implements
-               FlatMapFunction<Either<Vertex<K, VV>, Tuple2<K, Message>>, 
Tuple2<K, Either<NullValue, Message>>> {
+                       FlatMapFunction<Either<Vertex<K, VV>, Tuple2<K, 
Message>>, Tuple2<K, Either<NullValue, Message>>> {
 
                private Tuple2<K, Either<NullValue, Message>> outTuple = new 
Tuple2<K, Either<NullValue, Message>>();
 

Reply via email to