[FLINK-3207] [gelly] add a pregel SSSP example with combiner

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/77eb4f0c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/77eb4f0c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/77eb4f0c

Branch: refs/heads/master
Commit: 77eb4f0c41fe45d3ad4efc8c54f21394cd973cb5
Parents: c5ffb5d
Author: vasia <[email protected]>
Authored: Tue Feb 2 12:41:51 2016 +0100
Committer: vasia <[email protected]>
Committed: Mon Mar 21 19:10:29 2016 +0100

----------------------------------------------------------------------
 .../examples/GSASingleSourceShortestPaths.java  |   3 +-
 .../SingleSourceShortestPathsITCase.java        |   8 +
 .../main/java/org/apache/flink/graph/Graph.java |  45 +++++
 .../apache/flink/graph/example/PregelSSSP.java  | 194 +++++++++++++++++++
 4 files changed, 249 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/77eb4f0c/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java
index 1732016..35f07b0 100755
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java
@@ -36,7 +36,8 @@ import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
  * This example shows how to use Gelly's Gather-Sum-Apply iterations.
  * 
  * It is an implementation of the Single-Source-Shortest-Paths algorithm.
- * For a vertex-centric implementation of the same algorithm, please refer to 
{@link SingleSourceShortestPaths}. 
+ * For a scatter-gather implementation of the same algorithm, please refer to {@link SingleSourceShortestPaths}
+ * and for a vertex-centric implementation, see {@link org.apache.flink.graph.example.PregelSSSP}.
  *
  * The input file is a plain text file and must be formatted as follows:
  * Edges are represented by tuples of srcVertexId, trgVertexId, distance which 
are

http://git-wip-us.apache.org/repos/asf/flink/blob/77eb4f0c/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/SingleSourceShortestPathsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/SingleSourceShortestPathsITCase.java
 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/SingleSourceShortestPathsITCase.java
index faf92c0..258ed16 100644
--- 
a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/SingleSourceShortestPathsITCase.java
+++ 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/SingleSourceShortestPathsITCase.java
@@ -24,6 +24,7 @@ import com.google.common.io.Files;
 import org.apache.flink.graph.examples.GSASingleSourceShortestPaths;
 import org.apache.flink.graph.examples.SingleSourceShortestPaths;
 import org.apache.flink.graph.examples.data.SingleSourceShortestPathsData;
+import org.apache.flink.graph.example.PregelSSSP;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.junit.After;
@@ -75,6 +76,13 @@ public class SingleSourceShortestPathsITCase extends 
MultipleProgramsTestBase {
         expected = 
SingleSourceShortestPathsData.RESULTED_SINGLE_SOURCE_SHORTEST_PATHS;
     }
 
+    @Test
+    public void testPregelSSSPExample() throws Exception {
+        // <source vertex id> <input edges path> <output path> <max iterations>
+        String[] programArgs = {
+                String.valueOf(SingleSourceShortestPathsData.SRC_VERTEX_ID),
+                edgesPath,
+                resultPath,
+                String.valueOf(10)
+        };
+        PregelSSSP.main(programArgs);
+
+        // compared against the contents of resultPath in after()
+        expected = SingleSourceShortestPathsData.RESULTED_SINGLE_SOURCE_SHORTEST_PATHS;
+    }
+
     @After
     public void after() throws Exception {
         TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath);

http://git-wip-us.apache.org/repos/asf/flink/blob/77eb4f0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index ce8e895..f2b5b22 100755
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -51,6 +51,10 @@ import org.apache.flink.graph.gsa.GSAConfiguration;
 import org.apache.flink.graph.gsa.GatherFunction;
 import org.apache.flink.graph.gsa.GatherSumApplyIteration;
 import org.apache.flink.graph.gsa.SumFunction;
+import org.apache.flink.graph.pregel.ComputeFunction;
+import org.apache.flink.graph.pregel.MessageCombiner;
+import org.apache.flink.graph.pregel.VertexCentricConfiguration;
+import org.apache.flink.graph.pregel.VertexCentricIteration;
 import org.apache.flink.graph.spargel.MessagingFunction;
 import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
 import org.apache.flink.graph.spargel.ScatterGatherIteration;
@@ -1680,6 +1684,47 @@ public class Graph<K, VV, EV> {
        }
 
        /**
+        * Runs a vertex-centric iteration on the graph without any configuration options.
+        *
+        * <p>Convenience overload: it delegates to the four-argument variant, passing
+        * {@code null} as the {@link VertexCentricConfiguration}.
+        *
+        * @param computeFunction the vertex update function
+        * @param combiner an optional message combiner
+        * @param maximumNumberOfIterations maximum number of iterations to perform
+        * @param <M> the message type
+        *
+        * @return the updated Graph after the vertex-centric iteration has converged or
+        * after maximumNumberOfIterations.
+        */
+       public <M> Graph<K, VV, EV> runVertexCentricIteration(
+                       ComputeFunction<K, VV, EV, M> computeFunction,
+                       MessageCombiner<K, M> combiner,
+                       int maximumNumberOfIterations) {
+
+               final VertexCentricConfiguration noConfiguration = null;
+               return runVertexCentricIteration(
+                               computeFunction, combiner, maximumNumberOfIterations, noConfiguration);
+       }
+
+       /**
+        * Runs a vertex-centric iteration on the graph with configuration options.
+        *
+        * @param computeFunction the vertex update function
+        * @param combiner an optional message combiner, applied to the messages exchanged
+        *                 during the iteration
+        * @param maximumNumberOfIterations maximum number of iterations to perform
+        * @param parameters the iteration configuration parameters, or {@code null} when no
+        *                   configuration options are provided
+        * @param <M> the message type
+        *
+        * @return the updated Graph after the vertex-centric iteration has converged or
+        * after maximumNumberOfIterations.
+        */
+       public <M> Graph<K, VV, EV> runVertexCentricIteration(
+                       ComputeFunction<K, VV, EV, M> computeFunction,
+                       MessageCombiner<K, M> combiner, int maximumNumberOfIterations,
+                       VertexCentricConfiguration parameters) {
+
+               // Forward the combiner to the iteration; otherwise the argument is silently ignored.
+               // NOTE(review): relies on the VertexCentricIteration.withEdges overload that takes a
+               // MessageCombiner -- confirm it exists (and accepts null) in this version.
+               VertexCentricIteration<K, VV, EV, M> iteration = VertexCentricIteration.withEdges(
+                               edges, computeFunction, combiner, maximumNumberOfIterations);
+               iteration.configure(parameters);
+               DataSet<Vertex<K, VV>> newVertices = this.getVertices().runOperation(iteration);
+               return new Graph<K, VV, EV>(newVertices, this.edges, this.context);
+       }
+
+       /**
         * @param algorithm the algorithm to run on the Graph
         * @param <T> the return type
         * @return the result of the graph algorithm

http://git-wip-us.apache.org/repos/asf/flink/blob/77eb4f0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/PregelSSSP.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/PregelSSSP.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/PregelSSSP.java
new file mode 100644
index 0000000..93cc360
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/PregelSSSP.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
+import org.apache.flink.graph.pregel.ComputeFunction;
+import org.apache.flink.graph.pregel.MessageCombiner;
+import org.apache.flink.graph.pregel.MessageIterator;
+import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
+
+// NOTE(review): this class lives in the flink-gelly module (package org.apache.flink.graph.example),
+// while the other SSSP examples and the ITCase that runs it live in flink-gelly-examples
+// (package org.apache.flink.graph.examples). Those sibling examples are presumably not on this
+// module's classpath, so they are referenced with {@code} rather than {@link} below.
+// Consider moving this class next to them.
+/**
+ * This example shows how to use Gelly's vertex-centric iterations together with a
+ * message combiner.
+ *
+ * <p>It is an implementation of the Single-Source-Shortest-Paths algorithm.
+ * For a scatter-gather implementation of the same algorithm, please refer to
+ * {@code org.apache.flink.graph.examples.SingleSourceShortestPaths}
+ * and for a gather-sum-apply implementation, see
+ * {@code org.apache.flink.graph.examples.GSASingleSourceShortestPaths}.
+ *
+ * <p>The input file is a plain text file and must be formatted as follows:
+ * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are
+ * separated by tabs. Edges themselves are separated by newlines.
+ * Lines starting with '%' are skipped.
+ * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges,
+ * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4.
+ *
+ * <p>If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.SingleSourceShortestPathsData}
+ */
+public class PregelSSSP implements ProgramDescription {
+
+       public static void main(String[] args) throws Exception {
+
+               if (!parseParameters(args)) {
+                       return;
+               }
+
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
+
+               // every vertex starts at an infinite distance from the source
+               Graph<Long, Double, Double> graph = Graph.fromDataSet(edges, new InitVertices(), env);
+
+               // Execute the vertex-centric iteration, combining the messages sent to each vertex
+               Graph<Long, Double, Double> result = graph.runVertexCentricIteration(
+                               new SSSPComputeFunction(srcVertexId), new SSSPCombiner(),
+                               maxIterations);
+
+               // Extract the vertices as the result
+               DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();
+
+               // emit result
+               if (fileOutput) {
+                       singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",");
+                       env.execute("Pregel Single Source Shortest Paths Example");
+               } else {
+                       singleSourceShortestPaths.print();
+               }
+
+       }
+
+       // --------------------------------------------------------------------------------------------
+       //  Single Source Shortest Path UDFs
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Initializes every vertex value to positive infinity.
+        */
+       @SuppressWarnings("serial")
+       private static final class InitVertices implements MapFunction<Long, Double> {
+
+               @Override
+               public Double map(Long id) {
+                       return Double.POSITIVE_INFINITY;
+               }
+       }
+
+       /**
+        * The compute function for SSSP.
+        *
+        * <p>A vertex takes the minimum of its incoming distance messages (starting from 0 if it is
+        * the source vertex, infinity otherwise). If that minimum is strictly smaller than its
+        * current value, it adopts it and sends {@code minDistance + edgeValue} over each edge
+        * returned by {@code getEdges()}.
+        */
+       @SuppressWarnings("serial")
+       public static final class SSSPComputeFunction extends ComputeFunction<Long, Double, Double, Double> {
+
+               private final long srcId;
+
+               public SSSPComputeFunction(long src) {
+                       this.srcId = src;
+               }
+
+               @Override
+               public void compute(Vertex<Long, Double> vertex, MessageIterator<Double> messages) {
+
+                       double minDistance = (vertex.getId().equals(srcId)) ? 0d : Double.POSITIVE_INFINITY;
+
+                       for (Double msg : messages) {
+                               minDistance = Math.min(minDistance, msg);
+                       }
+
+                       // only propagate on a strict improvement; otherwise the vertex sends nothing
+                       if (minDistance < vertex.getValue()) {
+                               setNewVertexValue(minDistance);
+                               for (Edge<Long, Double> e : getEdges()) {
+                                       sendMessageTo(e.getTarget(), minDistance + e.getValue());
+                               }
+                       }
+               }
+       }
+
+       /**
+        * The messages combiner.
+        * Out of all messages destined to a target vertex, only the minimum distance is propagated.
+        */
+       @SuppressWarnings("serial")
+       public static final class SSSPCombiner extends MessageCombiner<Long, Double> {
+
+               @Override
+               public void combineMessages(MessageIterator<Double> messages) {
+
+                       double minMessage = Double.POSITIVE_INFINITY;
+                       for (Double msg : messages) {
+                               minMessage = Math.min(minMessage, msg);
+                       }
+                       sendCombinedMessage(minMessage);
+               }
+       }
+
+       // ******************************************************************************************************************
+       // UTIL METHODS
+       // ******************************************************************************************************************
+
+       private static boolean fileOutput = false;
+
+       private static long srcVertexId = 1L;
+
+       private static String edgesInputPath = null;
+
+       private static String outputPath = null;
+
+       private static int maxIterations = 5;
+
+       /**
+        * Parses the command-line arguments into the static fields above.
+        *
+        * @param args either empty (use built-in default data and print the result) or exactly
+        *             {@code <source vertex id> <input edges path> <output path> <num iterations>}
+        * @return {@code false} if arguments were given but their count is not 4, {@code true} otherwise
+        */
+       private static boolean parseParameters(String[] args) {
+
+               if (args.length > 0) {
+                       if (args.length != 4) {
+                               System.err.println("Usage: PregelSSSP <source vertex id>" +
+                                               " <input edges path> <output path> <num iterations>");
+                               return false;
+                       }
+
+                       fileOutput = true;
+                       srcVertexId = Long.parseLong(args[0]);
+                       edgesInputPath = args[1];
+                       outputPath = args[2];
+                       maxIterations = Integer.parseInt(args[3]);
+               } else {
+                       System.out.println("Executing Pregel Single Source Shortest Paths example "
+                                       + "with default parameters and built-in default data.");
+                       System.out.println("  Provide parameters to read input data from files.");
+                       System.out.println("  See the documentation for the correct format of input files.");
+                       System.out.println("Usage: PregelSSSP <source vertex id>" +
+                                       " <input edges path> <output path> <num iterations>");
+               }
+               return true;
+       }
+
+       /**
+        * Reads the tab-separated edge file when file input was requested, otherwise returns the
+        * built-in default edges.
+        */
+       private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
+               if (fileOutput) {
+                       return env.readCsvFile(edgesInputPath)
+                                       .lineDelimiter("\n")
+                                       .fieldDelimiter("\t")
+                                       .ignoreComments("%")
+                                       .types(Long.class, Long.class, Double.class)
+                                       .map(new Tuple3ToEdgeMap<Long, Double>());
+               } else {
+                       return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
+               }
+       }
+
+       @Override
+       public String getDescription() {
+               return "Vertex-centric Single Source Shortest Paths";
+       }
+}
\ No newline at end of file

Reply via email to