[FLINK-3207] [gelly] add a pregel SSSP example with combiner
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/77eb4f0c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/77eb4f0c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/77eb4f0c Branch: refs/heads/master Commit: 77eb4f0c41fe45d3ad4efc8c54f21394cd973cb5 Parents: c5ffb5d Author: vasia <[email protected]> Authored: Tue Feb 2 12:41:51 2016 +0100 Committer: vasia <[email protected]> Committed: Mon Mar 21 19:10:29 2016 +0100 ---------------------------------------------------------------------- .../examples/GSASingleSourceShortestPaths.java | 3 +- .../SingleSourceShortestPathsITCase.java | 8 + .../main/java/org/apache/flink/graph/Graph.java | 45 +++++ .../apache/flink/graph/example/PregelSSSP.java | 194 +++++++++++++++++++ 4 files changed, 249 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/77eb4f0c/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java index 1732016..35f07b0 100755 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java @@ -36,7 +36,8 @@ import org.apache.flink.graph.utils.Tuple3ToEdgeMap; * This example shows how to use Gelly's Gather-Sum-Apply iterations. * * It is an implementation of the Single-Source-Shortest-Paths algorithm. 
- * For a vertex-centric implementation of the same algorithm, please refer to {@link SingleSourceShortestPaths}. + * For a scatter-gather implementation of the same algorithm, please refer to {@link SingleSourceShortestPaths} + * and for a vertex-centric implementation, see {@link org.apache.flink.graph.example.PregelSSSP}. * * The input file is a plain text file and must be formatted as follows: * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are http://git-wip-us.apache.org/repos/asf/flink/blob/77eb4f0c/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/SingleSourceShortestPathsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/SingleSourceShortestPathsITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/SingleSourceShortestPathsITCase.java index faf92c0..258ed16 100644 --- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/SingleSourceShortestPathsITCase.java +++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/SingleSourceShortestPathsITCase.java @@ -24,6 +24,7 @@ import com.google.common.io.Files; import org.apache.flink.graph.examples.GSASingleSourceShortestPaths; import org.apache.flink.graph.examples.SingleSourceShortestPaths; import org.apache.flink.graph.examples.data.SingleSourceShortestPathsData; +import org.apache.flink.graph.example.PregelSSSP; import org.apache.flink.test.util.MultipleProgramsTestBase; import org.apache.flink.test.util.TestBaseUtils; import org.junit.After; @@ -75,6 +76,13 @@ public class SingleSourceShortestPathsITCase extends MultipleProgramsTestBase { expected = SingleSourceShortestPathsData.RESULTED_SINGLE_SOURCE_SHORTEST_PATHS; } + @Test + public void testPregelSSSPExample() throws Exception { + PregelSSSP.main(new
String[]{SingleSourceShortestPathsData.SRC_VERTEX_ID + "", + edgesPath, resultPath, 10 + ""}); + expected = SingleSourceShortestPathsData.RESULTED_SINGLE_SOURCE_SHORTEST_PATHS; + } + @After public void after() throws Exception { TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath); http://git-wip-us.apache.org/repos/asf/flink/blob/77eb4f0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index ce8e895..f2b5b22 100755 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -51,6 +51,10 @@ import org.apache.flink.graph.gsa.GSAConfiguration; import org.apache.flink.graph.gsa.GatherFunction; import org.apache.flink.graph.gsa.GatherSumApplyIteration; import org.apache.flink.graph.gsa.SumFunction; +import org.apache.flink.graph.pregel.ComputeFunction; +import org.apache.flink.graph.pregel.MessageCombiner; +import org.apache.flink.graph.pregel.VertexCentricConfiguration; +import org.apache.flink.graph.pregel.VertexCentricIteration; import org.apache.flink.graph.spargel.MessagingFunction; import org.apache.flink.graph.spargel.ScatterGatherConfiguration; import org.apache.flink.graph.spargel.ScatterGatherIteration; @@ -1680,6 +1684,47 @@ public class Graph<K, VV, EV> { } /** + * Runs a VertexCentric iteration on the graph. + * No configuration options are provided. + * + * @param computeFunction the vertex update function + * @param combiner an optional message combiner + * @param maximumNumberOfIterations maximum number of iterations to perform + * + * @return the updated Graph after the vertex-centric iteration has converged or + * after maximumNumberOfIterations.
+ */ + public <M> Graph<K, VV, EV> runVertexCentricIteration( + ComputeFunction<K, VV, EV, M> computeFunction, + MessageCombiner<K, M> combiner, int maximumNumberOfIterations) { + + return this.runVertexCentricIteration(computeFunction, combiner, maximumNumberOfIterations, null); + } + + /** + * Runs a VertexCentric iteration on the graph with configuration options. + * + * @param computeFunction the vertex update function + * @param combiner an optional message combiner + * @param maximumNumberOfIterations maximum number of iterations to perform + * @param parameters the iteration configuration parameters + * + * @return the updated Graph after the vertex-centric iteration has converged or + * after maximumNumberOfIterations. + */ + public <M> Graph<K, VV, EV> runVertexCentricIteration( + ComputeFunction<K, VV, EV, M> computeFunction, + MessageCombiner<K, M> combiner, int maximumNumberOfIterations, + VertexCentricConfiguration parameters) { + + VertexCentricIteration<K, VV, EV, M> iteration = VertexCentricIteration.withEdges( + edges, computeFunction, combiner, maximumNumberOfIterations); + iteration.configure(parameters); + DataSet<Vertex<K, VV>> newVertices = this.getVertices().runOperation(iteration); + return new Graph<K, VV, EV>(newVertices, this.edges, this.context); + } + + /** * @param algorithm the algorithm to run on the Graph * @param <T> the return type * @return the result of the graph algorithm http://git-wip-us.apache.org/repos/asf/flink/blob/77eb4f0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/PregelSSSP.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/PregelSSSP.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/PregelSSSP.java new file mode 100644 index 0000000..93cc360 --- /dev/null +++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/PregelSSSP.java @@ -0,0
+1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.example; + +import org.apache.flink.api.common.ProgramDescription; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData; +import org.apache.flink.graph.pregel.ComputeFunction; +import org.apache.flink.graph.pregel.MessageCombiner; +import org.apache.flink.graph.pregel.MessageIterator; +import org.apache.flink.graph.utils.Tuple3ToEdgeMap; + +/** + * This example shows how to use Gelly's Vertex-Centric iterations. + * + * It is an implementation of the Single-Source-Shortest-Paths algorithm. + * For a scatter-gather implementation of the same algorithm, please refer to {@code org.apache.flink.graph.examples.SingleSourceShortestPaths} + * and for a gather-sum-apply implementation see {@code org.apache.flink.graph.examples.GSASingleSourceShortestPaths}.
+ * + * The input file is a plain text file and must be formatted as follows: + * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are + * separated by tabs. Edges themselves are separated by newlines. + * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges, + * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4. + * + * If no parameters are provided, the program is run with default data from + * {@link org.apache.flink.graph.example.utils.SingleSourceShortestPathsData} + */ +public class PregelSSSP implements ProgramDescription { + + public static void main(String[] args) throws Exception { + + if (!parseParameters(args)) { + return; + } + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env); + + Graph<Long, Double, Double> graph = Graph.fromDataSet(edges, new InitVertices(), env); + + // Execute the vertex-centric iteration + Graph<Long, Double, Double> result = graph.runVertexCentricIteration( + new SSSPComputeFunction(srcVertexId), new SSSPCombiner(), + maxIterations); + + // Extract the vertices as the result + DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices(); + + // emit result + if (fileOutput) { + singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ","); + env.execute("Pregel Single Source Shortest Paths Example"); + } else { + singleSourceShortestPaths.print(); + } + + } + + // -------------------------------------------------------------------------------------------- + // Single Source Shortest Path UDFs + // -------------------------------------------------------------------------------------------- + + @SuppressWarnings("serial") + private static final class InitVertices implements MapFunction<Long, Double> { + + public Double map(Long id) { return Double.POSITIVE_INFINITY; } + } + + /** + * The compute function for SSSP. + */ + @SuppressWarnings("serial") + public static final class
SSSPComputeFunction extends ComputeFunction<Long, Double, Double, Double> { + + private final long srcId; + + public SSSPComputeFunction(long src) { + this.srcId = src; + } + + public void compute(Vertex<Long, Double> vertex, MessageIterator<Double> messages) { + + double minDistance = (vertex.getId().equals(srcId)) ? 0d : Double.POSITIVE_INFINITY; + + for (Double msg : messages) { + minDistance = Math.min(minDistance, msg); + } + + if (minDistance < vertex.getValue()) { + setNewVertexValue(minDistance); + for (Edge<Long, Double> e: getEdges()) { + sendMessageTo(e.getTarget(), minDistance + e.getValue()); + } + } + } + } + + /** + * The messages combiner. + * Out of all messages destined to a target vertex, only the minimum distance is propagated. + */ + @SuppressWarnings("serial") + public static final class SSSPCombiner extends MessageCombiner<Long, Double> { + + public void combineMessages(MessageIterator<Double> messages) { + + double minMessage = Double.POSITIVE_INFINITY; + for (Double msg: messages) { + minMessage = Math.min(minMessage, msg); + } + sendCombinedMessage(minMessage); + } + } + + // ****************************************************************************************************************** + // UTIL METHODS + // ****************************************************************************************************************** + + private static boolean fileOutput = false; + + private static long srcVertexId = 1L; + + private static String edgesInputPath = null; + + private static String outputPath = null; + + private static int maxIterations = 5; + + private static boolean parseParameters(String[] args) { + + if(args.length > 0) { + if(args.length != 4) { + System.err.println("Usage: PregelSSSP <source vertex id>" + + " <input edges path> <output path> <num iterations>"); + return false; + } + + fileOutput = true; + srcVertexId = Long.parseLong(args[0]); + edgesInputPath = args[1]; + outputPath = args[2]; + maxIterations =
Integer.parseInt(args[3]); + } else { + System.out.println("Executing Pregel Single Source Shortest Paths example " + + "with default parameters and built-in default data."); + System.out.println(" Provide parameters to read input data from files."); + System.out.println(" See the documentation for the correct format of input files."); + System.out.println("Usage: PregelSSSP <source vertex id>" + + " <input edges path> <output path> <num iterations>"); + } + return true; + } + + private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) { + if (fileOutput) { + return env.readCsvFile(edgesInputPath) + .lineDelimiter("\n") + .fieldDelimiter("\t") + .ignoreComments("%") + .types(Long.class, Long.class, Double.class) + .map(new Tuple3ToEdgeMap<Long, Double>()); + } else { + return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env); + } + } + + @Override + public String getDescription() { + return "Vertex-centric Single Source Shortest Paths"; + } +} \ No newline at end of file
