Repository: flink Updated Branches: refs/heads/master 35892ed14 -> 048cda72b
http://git-wip-us.apache.org/repos/asf/flink/blob/048cda72/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/PregelSSSP.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/PregelSSSP.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/PregelSSSP.java new file mode 100644 index 0000000..e647653 --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/PregelSSSP.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.examples; + +import org.apache.flink.api.common.ProgramDescription; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.examples.data.SingleSourceShortestPathsData; +import org.apache.flink.graph.pregel.ComputeFunction; +import org.apache.flink.graph.pregel.MessageCombiner; +import org.apache.flink.graph.pregel.MessageIterator; +import org.apache.flink.graph.utils.Tuple3ToEdgeMap; + +/** + * This example shows how to use Gelly's Vertex-Centric iterations. + * + * It is an implementation of the Single-Source-Shortest-Paths algorithm. + * For a scatter-gather implementation of the same algorithm, please refer to {@link SingleSourceShortestPaths} + * and for a gather-sum-apply implementation see {@link GSASingleSourceShortestPaths}. + * + * The input file is a plain text file and must be formatted as follows: + * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are + * separated by tabs. Edges themselves are separated by newlines. + * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges, + * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4. 
+ * + * If no parameters are provided, the program is run with default data from + * {@link org.apache.flink.graph.examples.data.SingleSourceShortestPathsData} + */ +public class PregelSSSP implements ProgramDescription { + + public static void main(String[] args) throws Exception { + + if (!parseParameters(args)) { + return; + } + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env); + + Graph<Long, Double, Double> graph = Graph.fromDataSet(edges, new InitVertices(), env); + + // Execute the vertex-centric iteration + Graph<Long, Double, Double> result = graph.runVertexCentricIteration( + new SSSPComputeFunction(srcVertexId), new SSSPCombiner(), + maxIterations); + + // Extract the vertices as the result + DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices(); + + // emit result + if (fileOutput) { + singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ","); + env.execute("Pregel Single Source Shortest Paths Example"); + } else { + singleSourceShortestPaths.print(); + } + + } + + // -------------------------------------------------------------------------------------------- + // Single Source Shortest Path UDFs + // -------------------------------------------------------------------------------------------- + + @SuppressWarnings("serial") + private static final class InitVertices implements MapFunction<Long, Double> { + + public Double map(Long id) { return Double.POSITIVE_INFINITY; } + } + + /** + * The compute function for SSSP + */ + @SuppressWarnings("serial") + public static final class SSSPComputeFunction extends ComputeFunction<Long, Double, Double, Double> { + + private final long srcId; + + public SSSPComputeFunction(long src) { + this.srcId = src; + } + + public void compute(Vertex<Long, Double> vertex, MessageIterator<Double> messages) { + + double minDistance = (vertex.getId().equals(srcId)) ? 
0d : Double.POSITIVE_INFINITY; + + for (Double msg : messages) { + minDistance = Math.min(minDistance, msg); + } + + if (minDistance < vertex.getValue()) { + setNewVertexValue(minDistance); + for (Edge<Long, Double> e: getEdges()) { + sendMessageTo(e.getTarget(), minDistance + e.getValue()); + } + } + } + } + + /** + * The messages combiner. + * Out of all messages destined to a target vertex, only the minimum distance is propagated. + */ + @SuppressWarnings("serial") + public static final class SSSPCombiner extends MessageCombiner<Long, Double> { + + public void combineMessages(MessageIterator<Double> messages) { + + double minMessage = Double.POSITIVE_INFINITY; + for (Double msg: messages) { + minMessage = Math.min(minMessage, msg); + } + sendCombinedMessage(minMessage); + } + } + + // ****************************************************************************************************************** + // UTIL METHODS + // ****************************************************************************************************************** + + private static boolean fileOutput = false; + + private static Long srcVertexId = 1l; + + private static String edgesInputPath = null; + + private static String outputPath = null; + + private static int maxIterations = 5; + + private static boolean parseParameters(String[] args) { + + if(args.length > 0) { + if(args.length != 4) { + System.err.println("Usage: PregelSSSP <source vertex id>" + + " <input edges path> <output path> <num iterations>"); + return false; + } + + fileOutput = true; + srcVertexId = Long.parseLong(args[0]); + edgesInputPath = args[1]; + outputPath = args[2]; + maxIterations = Integer.parseInt(args[3]); + } else { + System.out.println("Executing Pregel Single Source Shortest Paths example " + + "with default parameters and built-in default data."); + System.out.println(" Provide parameters to read input data from files."); + System.out.println(" See the documentation for the correct format of input files."); 
+ System.out.println("Usage: PregelSSSP <source vertex id>" + + " <input edges path> <output path> <num iterations>"); + } + return true; + } + + private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) { + if (fileOutput) { + return env.readCsvFile(edgesInputPath) + .lineDelimiter("\n") + .fieldDelimiter("\t") + .ignoreComments("%") + .types(Long.class, Long.class, Double.class) + .map(new Tuple3ToEdgeMap<Long, Double>()); + } else { + return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env); + } + } + + @Override + public String getDescription() { + return "Vertex-centric Single Source Shortest Paths"; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/048cda72/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/SingleSourceShortestPathsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/SingleSourceShortestPathsITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/SingleSourceShortestPathsITCase.java index 258ed16..9019d0b 100644 --- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/SingleSourceShortestPathsITCase.java +++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/SingleSourceShortestPathsITCase.java @@ -24,7 +24,7 @@ import com.google.common.io.Files; import org.apache.flink.graph.examples.GSASingleSourceShortestPaths; import org.apache.flink.graph.examples.SingleSourceShortestPaths; import org.apache.flink.graph.examples.data.SingleSourceShortestPathsData; -import org.apache.flink.graph.example.PregelSSSP; +import org.apache.flink.graph.examples.PregelSSSP; import org.apache.flink.test.util.MultipleProgramsTestBase; import org.apache.flink.test.util.TestBaseUtils; import org.junit.After; 
http://git-wip-us.apache.org/repos/asf/flink/blob/048cda72/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala index 19d6dd3..a5f9f8d 100644 --- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala +++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala @@ -31,6 +31,9 @@ import org.apache.flink.{graph => jg} import _root_.scala.collection.JavaConverters._ import _root_.scala.reflect.ClassTag import org.apache.flink.types.NullValue +import org.apache.flink.graph.pregel.ComputeFunction +import org.apache.flink.graph.pregel.MessageCombiner +import org.apache.flink.graph.pregel.VertexCentricConfiguration object Graph { @@ -1105,6 +1108,43 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { maxIterations, parameters)) } + /** + * Runs a vertex-centric iteration on the graph. + * No configuration options are provided. + * + * @param computeFunction the compute function + * @param combineFunction the optional message combiner function + * @param maxIterations maximum number of iterations to perform + * + * @return the updated Graph after the vertex-centric iteration has converged or + * after maximumNumberOfIterations. + */ + def runVertexCentricIteration[M](computeFunction: ComputeFunction[K, VV, EV, M], + combineFunction: MessageCombiner[K, M], + maxIterations: Int): Graph[K, VV, EV] = { + wrapGraph(jgraph.runVertexCentricIteration(computeFunction, combineFunction, + maxIterations)) + } + + /** + * Runs a vertex-centric iteration on the graph with configuration options. 
+ * + * @param computeFunction the compute function + * @param combineFunction the optional message combiner function + * @param maxIterations maximum number of iterations to perform + * @param parameters the iteration configuration parameters + * + * @return the updated Graph after the vertex-centric iteration has converged or + * after maximumNumberOfIterations. + */ + def runVertexCentricIteration[M](computeFunction: ComputeFunction[K, VV, EV, M], + combineFunction: MessageCombiner[K, M], + maxIterations: Int, parameters: VertexCentricConfiguration): + Graph[K, VV, EV] = { + wrapGraph(jgraph.runVertexCentricIteration(computeFunction, combineFunction, + maxIterations, parameters)) + } + def validate(validator: GraphValidator[K, VV, EV]): Boolean = { jgraph.validate(validator) } http://git-wip-us.apache.org/repos/asf/flink/blob/048cda72/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index f2b5b22..d46056a 100755 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -1684,10 +1684,10 @@ public class Graph<K, VV, EV> { } /** - * Runs a VertexCentric iteration on the graph. + * Runs a {@link VertexCentricIteration} on the graph. * No configuration options are provided. * - * @param computeFunction the vertex update function + * @param computeFunction the vertex compute function * @param combiner an optional message combiner * @param maximumNumberOfIterations maximum number of iterations to perform * @@ -1702,12 +1702,12 @@ public class Graph<K, VV, EV> { } /** - * Runs a VetexCentric iteration on the graph with configuration options. 
+ * Runs a {@link VertexCentricIteration} on the graph with configuration options. * - * @param computeFunction the vertex update function + * @param computeFunction the vertex compute function * @param combiner an optional message combiner * @param maximumNumberOfIterations maximum number of iterations to perform - * @param parameters the iteration configuration parameters + * @param parameters the {@link VertexCentricConfiguration} parameters * * @return the updated Graph after the vertex-centric iteration has converged or * after maximumNumberOfIterations. http://git-wip-us.apache.org/repos/asf/flink/blob/048cda72/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/PregelSSSP.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/PregelSSSP.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/PregelSSSP.java deleted file mode 100644 index 93cc360..0000000 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/PregelSSSP.java +++ /dev/null @@ -1,194 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.graph.example; - -import org.apache.flink.api.common.ProgramDescription; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData; -import org.apache.flink.graph.pregel.ComputeFunction; -import org.apache.flink.graph.pregel.MessageCombiner; -import org.apache.flink.graph.pregel.MessageIterator; -import org.apache.flink.graph.utils.Tuple3ToEdgeMap; - -/** - * This example shows how to use Gelly's Vertex-Centric iterations. - * - * It is an implementation of the Single-Source-Shortest-Paths algorithm. - * For a scatter-gather implementation of the same algorithm, please refer to {@link SingleSourceShortestPaths} - * and for a gather-sum-apply implementation see {@link GSASingleSourceShortestPaths}. - * - * The input file is a plain text file and must be formatted as follows: - * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are - * separated by tabs. Edges themselves are separated by newlines. - * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges, - * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4. 
- * - * If no parameters are provided, the program is run with default data from - * {@link org.apache.flink.graph.example.utils.SingleSourceShortestPathsData} - */ -public class PregelSSSP implements ProgramDescription { - - public static void main(String[] args) throws Exception { - - if (!parseParameters(args)) { - return; - } - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env); - - Graph<Long, Double, Double> graph = Graph.fromDataSet(edges, new InitVertices(), env); - - // Execute the vertex-centric iteration - Graph<Long, Double, Double> result = graph.runVertexCentricIteration( - new SSSPComputeFunction(srcVertexId), new SSSPCombiner(), - maxIterations); - - // Extract the vertices as the result - DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices(); - - // emit result - if (fileOutput) { - singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ","); - env.execute("Pregel Single Source Shortest Paths Example"); - } else { - singleSourceShortestPaths.print(); - } - - } - - // -------------------------------------------------------------------------------------------- - // Single Source Shortest Path UDFs - // -------------------------------------------------------------------------------------------- - - @SuppressWarnings("serial") - private static final class InitVertices implements MapFunction<Long, Double> { - - public Double map(Long id) { return Double.POSITIVE_INFINITY; } - } - - /** - * The compute function for SSSP - */ - @SuppressWarnings("serial") - public static final class SSSPComputeFunction extends ComputeFunction<Long, Double, Double, Double> { - - private final long srcId; - - public SSSPComputeFunction(long src) { - this.srcId = src; - } - - public void compute(Vertex<Long, Double> vertex, MessageIterator<Double> messages) { - - double minDistance = (vertex.getId().equals(srcId)) ? 
0d : Double.POSITIVE_INFINITY; - - for (Double msg : messages) { - minDistance = Math.min(minDistance, msg); - } - - if (minDistance < vertex.getValue()) { - setNewVertexValue(minDistance); - for (Edge<Long, Double> e: getEdges()) { - sendMessageTo(e.getTarget(), minDistance + e.getValue()); - } - } - } - } - - /** - * The messages combiner. - * Out of all messages destined to a target vertex, only the minimum distance is propagated. - */ - @SuppressWarnings("serial") - public static final class SSSPCombiner extends MessageCombiner<Long, Double> { - - public void combineMessages(MessageIterator<Double> messages) { - - double minMessage = Double.POSITIVE_INFINITY; - for (Double msg: messages) { - minMessage = Math.min(minMessage, msg); - } - sendCombinedMessage(minMessage); - } - } - - // ****************************************************************************************************************** - // UTIL METHODS - // ****************************************************************************************************************** - - private static boolean fileOutput = false; - - private static Long srcVertexId = 1l; - - private static String edgesInputPath = null; - - private static String outputPath = null; - - private static int maxIterations = 5; - - private static boolean parseParameters(String[] args) { - - if(args.length > 0) { - if(args.length != 4) { - System.err.println("Usage: PregelSSSP <source vertex id>" + - " <input edges path> <output path> <num iterations>"); - return false; - } - - fileOutput = true; - srcVertexId = Long.parseLong(args[0]); - edgesInputPath = args[1]; - outputPath = args[2]; - maxIterations = Integer.parseInt(args[3]); - } else { - System.out.println("Executing Pregel Single Source Shortest Paths example " - + "with default parameters and built-in default data."); - System.out.println(" Provide parameters to read input data from files."); - System.out.println(" See the documentation for the correct format of input files."); 
- System.out.println("Usage: PregelSSSP <source vertex id>" + - " <input edges path> <output path> <num iterations>"); - } - return true; - } - - private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) { - if (fileOutput) { - return env.readCsvFile(edgesInputPath) - .lineDelimiter("\n") - .fieldDelimiter("\t") - .ignoreComments("%") - .types(Long.class, Long.class, Double.class) - .map(new Tuple3ToEdgeMap<Long, Double>()); - } else { - return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env); - } - } - - @Override - public String getDescription() { - return "Vertex-centric Single Source Shortest Paths"; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/048cda72/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java index 09a000f..db64f63 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java @@ -24,11 +24,12 @@ import java.util.Iterator; import org.apache.flink.api.common.aggregators.Aggregator; import org.apache.flink.api.common.functions.IterationRuntimeContext; +import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.Either; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Vertex; +import org.apache.flink.types.Either; import org.apache.flink.types.Value; import org.apache.flink.util.Collector; @@ -84,30 +85,20 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl * @return 
An iterator with all edges. */ public final Iterable<Edge<K, EV>> getEdges() { - if (edgesUsed) { - throw new IllegalStateException("Can use either 'getEdges()' or 'sendMessageToAllNeighbors()' exactly once."); - } - edgesUsed = true; + verifyEdgeUsage(); this.edgeIterator.set((Iterator<Edge<K, EV>>) edges); return this.edgeIterator; } /** - * Sends the given message to all vertices that are targets of an edge of the changed vertex. + * Sends the given message to all vertices that adjacent to the changed vertex. * This method is mutually exclusive to the method {@link #getEdges()} and may be called only once. * * @param m The message to send. */ public final void sendMessageToAllNeighbors(Message m) { - if (edgesUsed) { - throw new IllegalStateException("Can use either 'getEdges()' or 'sendMessageToAllNeighbors()'" - + "exactly once."); - } - - edgesUsed = true; - + verifyEdgeUsage(); outMsg.setField(m, 1); - while (edges.hasNext()) { Tuple next = (Tuple) edges.next(); outMsg.setField(next.getField(1), 0); @@ -163,7 +154,7 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl * all aggregates globally once per superstep and makes them available in the next superstep. * * @param name The name of the aggregator. - * @return The aggregator registered under this name, or null, if no aggregator was registered. + * @return The aggregator registered under this name, or {@code null}, if no aggregator was registered. */ public final <T extends Aggregator<?>> T getIterationAggregator(String name) { return this.runtimeContext.<T>getIterationAggregator(name); @@ -182,7 +173,7 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl /** * Gets the broadcast data set registered under the given name. Broadcast data sets * are available on all parallel instances of a function. 
They can be registered via - * {@link org.apache.flink.graph.spargel.VertexCentricConfiguration#addBroadcastSet(String, org.apache.flink.api.java.DataSet)}. + * {@link org.apache.flink.graph.pregel.VertexCentricConfiguration#addBroadcastSet(String, DataSet)}. * * @param name The name under which the broadcast set is registered. * @return The broadcast data set. @@ -228,14 +219,22 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl this.edgesUsed = false; setNewVertexValueCalled = false; } - + + private void verifyEdgeUsage() throws IllegalStateException { + if (edgesUsed) { + throw new IllegalStateException( + "Can use either 'getEdges()' or 'sendMessageToAllNeighbors()' exactly once."); + } + edgesUsed = true; + } + private static final class EdgesIterator<K, EV> implements Iterator<Edge<K, EV>>, Iterable<Edge<K, EV>> { private Iterator<Edge<K, EV>> input; - private Edge<K, EV> edge = new Edge<K, EV>(); - + private Edge<K, EV> edge = new Edge<>(); + void set(Iterator<Edge<K, EV>> input) { this.input = input; } http://git-wip-us.apache.org/repos/asf/flink/blob/048cda72/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java index 9458323..70c8262 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java @@ -21,7 +21,7 @@ package org.apache.flink.graph.pregel; import java.io.Serializable; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.Either; +import org.apache.flink.types.Either; import org.apache.flink.types.NullValue; import 
org.apache.flink.util.Collector; @@ -49,7 +49,7 @@ public abstract class MessageCombiner<K, Message> implements Serializable { } /** - * Combines messages from sent different vertices to a target vertex. + * Combines messages sent from different vertices to a target vertex. * Implementing this method might reduce communication costs during a vertex-centric * iteration. * http://git-wip-us.apache.org/repos/asf/flink/blob/048cda72/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageIterator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageIterator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageIterator.java index 620cb93..5a17b90 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageIterator.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageIterator.java @@ -21,7 +21,7 @@ package org.apache.flink.graph.pregel; import java.util.Iterator; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.Either; +import org.apache.flink.types.Either; import org.apache.flink.types.NullValue; /** @@ -47,11 +47,8 @@ public final class MessageIterator<Message> implements Iterator<Message>, Iterab if (first != null) { return true; } - else if (this.source != null) { - return this.source.hasNext(); - } else { - return false; + return ((this.source != null) && (this.source.hasNext())); } } http://git-wip-us.apache.org/repos/asf/flink/blob/048cda72/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java index 287f20d..026e869 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java @@ -44,7 +44,7 @@ public class VertexCentricConfiguration extends IterationConfiguration { /** * Adds a data set as a broadcast set to the compute function. * - * @param name The name under which the broadcast data is available in the compute function. + * @param name The name under which the broadcast data set is available in the compute function. * @param data The data set to be broadcasted. */ public void addBroadcastSet(String name, DataSet<?> data) { @@ -55,7 +55,7 @@ public class VertexCentricConfiguration extends IterationConfiguration { * Get the broadcast variables of the compute function. * * @return a List of Tuple2, where the first field is the broadcast variable name - * and the second field is the broadcast DataSet. + * and the second field is the broadcast data set. 
*/ public List<Tuple2<String, DataSet<?>>> getBcastVars() { return this.bcVars; http://git-wip-us.apache.org/repos/asf/flink/blob/048cda72/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java index c79ace7..9434221 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java @@ -22,22 +22,21 @@ import java.util.Iterator; import java.util.Map; import org.apache.flink.api.common.aggregators.Aggregator; -import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.GroupCombineFunction; +import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.functions.RichGroupReduceFunction; import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.functions.RichCoGroupFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond; import org.apache.flink.api.java.operators.CoGroupOperator; import org.apache.flink.api.java.operators.DeltaIteration; -import org.apache.flink.api.common.functions.RichCoGroupFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.operators.CustomUnaryOperation; import org.apache.flink.api.java.tuple.Tuple2; 
-import org.apache.flink.api.java.typeutils.Either; import org.apache.flink.api.java.typeutils.EitherTypeInfo; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.api.java.typeutils.TupleTypeInfo; @@ -46,6 +45,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; +import org.apache.flink.types.Either; import org.apache.flink.types.NullValue; import org.apache.flink.util.Collector; @@ -70,8 +70,8 @@ import com.google.common.base.Preconditions; * </ul> * <p> * - * Vertex-centric graph iterations are are run by calling - * {@link Graph#runVertexCentricIteration(ComputeFunction, int)}. + * Vertex-centric graph iterations are run by calling + * {@link Graph#runVertexCentricIteration(ComputeFunction, MessageCombiner, int)}. * * @param <K> The type of the vertex key (the vertex identifier). * @param <VV> The type of the vertex value (the state of the vertex). @@ -137,6 +137,16 @@ public class VertexCentricIteration<K, VV, EV, Message> /** * Creates the operator that represents this vertex-centric graph computation. + * <p> + * The Pregel iteration is mapped to delta iteration as follows. + * The solution set consists of the set of active vertices and the workset contains the set of messages + * send to vertices during the previous superstep. Initially, the workset contains a null message for each vertex. + * In the beginning of a superstep, the solution set is joined with the workset to produce + * a dataset containing tuples of vertex state and messages (vertex inbox). + * The superstep compute UDF is realized with a coGroup between the vertices with inbox and the graph edges. + * The output of the compute UDF contains both the new vertex values and the new messages produced. + * These are directed to the solution set delta and new workset, respectively, with subsequent flatMaps. 
+ * <p/> * * @return The operator that represents this vertex-centric graph computation. */ @@ -163,7 +173,7 @@ public class VertexCentricIteration<K, VV, EV, Message> final DeltaIteration<Vertex<K, VV>, Tuple2<K, Either<NullValue, Message>>> iteration = initialVertices.iterateDelta(initialWorkSet, this.maximumNumberOfIterations, 0); - setUpIteration(iteration); + setUpIteration(iteration); // join with the current state to get vertex values DataSet<Tuple2<Vertex<K, VV>, Either<NullValue, Message>>> verticesWithMsgs = @@ -219,15 +229,14 @@ public class VertexCentricIteration<K, VV, EV, Message> * Creates a new vertex-centric iteration operator. * * @param edgesWithValue The data set containing edges. - * @param uf The function that updates the state of the vertices from the incoming messages. - * @param mf The function that turns changed vertex states into messages along the edges. + * @param cf The compute function. * * @param <K> The type of the vertex key (the vertex identifier). * @param <VV> The type of the vertex value (the state of the vertex). * @param <Message> The type of the message sent between vertices along the edges. * @param <EV> The type of the values that are associated with the edges. * - * @return An in stance of the vertex-centric graph computation operator. + * @return An instance of the vertex-centric graph computation operator. */ public static final <K, VV, EV, Message> VertexCentricIteration<K, VV, EV, Message> withEdges( DataSet<Edge<K, EV>> edgesWithValue, ComputeFunction<K, VV, EV, Message> cf, @@ -242,8 +251,7 @@ public class VertexCentricIteration<K, VV, EV, Message> * a weight or distance). * * @param edgesWithValue The data set containing edges. - * @param uf The function that updates the state of the vertices from the incoming messages. - * @param mf The function that turns changed vertex states into messages along the edges. + * @param cf The compute function.
* @param mc The function that combines messages sent to a vertex during a superstep. * * @param <K> The type of the vertex key (the vertex identifier). @@ -251,7 +259,7 @@ public class VertexCentricIteration<K, VV, EV, Message> * @param <Message> The type of the message sent between vertices along the edges. * @param <EV> The type of the values that are associated with the edges. * - * @return An in stance of the vertex-centric graph computation operator. + * @return An instance of the vertex-centric graph computation operator. */ public static final <K, VV, EV, Message> VertexCentricIteration<K, VV, EV, Message> withEdges( DataSet<Edge<K, EV>> edgesWithValue, ComputeFunction<K, VV, EV, Message> cf, @@ -292,16 +300,21 @@ public class VertexCentricIteration<K, VV, EV, Message> public void open(Configuration parameters) { outTuple = new Tuple2<K, Either<NullValue, Message>>(); nullMessage = Either.Left(NullValue.getInstance()); + outTuple.setField(nullMessage, 1); } public Tuple2<K, Either<NullValue, Message>> map(Vertex<K, VV> vertex) { outTuple.setField(vertex.getId(), 0); - outTuple.setField(nullMessage, 1); return outTuple; } } @SuppressWarnings("serial") + /** + * This coGroup class wraps the user-defined compute function. + * The first input holds a Tuple2 containing the vertex state and its inbox. + * The second input is an iterator of the outgoing edges of this vertex.
+ */ private static class VertexComputeUdf<K, VV, EV, Message> extends RichCoGroupFunction< Tuple2<Vertex<K, VV>, Either<NullValue, Message>>, Edge<K, EV>, Either<Vertex<K, VV>, Tuple2<K, Message>>> @@ -348,18 +361,17 @@ public class VertexCentricIteration<K, VV, EV, Message> final Tuple2<Vertex<K, VV>, Either<NullValue, Message>> first = vertexIter.next(); final Vertex<K, VV> vertexState = first.f0; - final MessageIterator<Message> messageIter = new MessageIterator<Message>(); + final MessageIterator<Message> messageIter = new MessageIterator<>(); if (getIterationRuntimeContext().getSuperstepNumber() == 1) { - + // there are no messages during the 1st superstep } else { - messageIter.setFirst(first.f1.right()); - - @SuppressWarnings("unchecked") - Iterator<Tuple2<?, Either<NullValue, Message>>> downcastIter = + messageIter.setFirst(first.f1.right()); + @SuppressWarnings("unchecked") + Iterator<Tuple2<?, Either<NullValue, Message>>> downcastIter = (Iterator<Tuple2<?, Either<NullValue, Message>>>) (Iterator<?>) vertexIter; - messageIter.setSource(downcastIter); + messageIter.setSource(downcastIter); } computeFunction.set(vertexState.getId(), edgesIterator.iterator(), out); @@ -371,9 +383,9 @@ public class VertexCentricIteration<K, VV, EV, Message> @SuppressWarnings("serial") @ForwardedFields("f0") public static class MessageCombinerUdf<K, Message> extends RichGroupReduceFunction< - Tuple2<K, Either<NullValue, Message>>, Tuple2<K, Either<NullValue, Message>>> - implements ResultTypeQueryable<Tuple2<K, Either<NullValue, Message>>>, - GroupCombineFunction<Tuple2<K, Either<NullValue, Message>>, Tuple2<K, Either<NullValue, Message>>> { + Tuple2<K, Either<NullValue, Message>>, Tuple2<K, Either<NullValue, Message>>> + implements ResultTypeQueryable<Tuple2<K, Either<NullValue, Message>>>, + GroupCombineFunction<Tuple2<K, Either<NullValue, Message>>, Tuple2<K, Either<NullValue, Message>>> { final MessageCombiner<K, Message> combinerFunction; private transient 
TypeInformation<Tuple2<K, Either<NullValue, Message>>> resultType; @@ -400,7 +412,7 @@ public class VertexCentricIteration<K, VV, EV, Message> final Tuple2<K, Either<NullValue, Message>> first = messageIterator.next(); final K vertexID = first.f0; - final MessageIterator<Message> messageIter = new MessageIterator<Message>(); + final MessageIterator<Message> messageIter = new MessageIterator<>(); messageIter.setFirst(first.f1.right()); @SuppressWarnings("unchecked") @@ -455,18 +467,18 @@ public class VertexCentricIteration<K, VV, EV, Message> @ForwardedFieldsFirst("*->f0") @ForwardedFieldsSecond("f1->f1") private static final class AppendVertexState<K, VV, Message> implements - FlatJoinFunction<Vertex<K, VV>, Tuple2<K, Either<NullValue, Message>>, - Tuple2<Vertex<K, VV>, Either<NullValue, Message>>> { + JoinFunction<Vertex<K, VV>, Tuple2<K, Either<NullValue, Message>>, + Tuple2<Vertex<K, VV>, Either<NullValue, Message>>> { private Tuple2<Vertex<K, VV>, Either<NullValue, Message>> outTuple = new Tuple2<Vertex<K, VV>, Either<NullValue, Message>>(); - public void join(Vertex<K, VV> vertex, Tuple2<K, Either<NullValue, Message>> message, - Collector<Tuple2<Vertex<K, VV>, Either<NullValue, Message>>> out) { + public Tuple2<Vertex<K, VV>, Either<NullValue, Message>> join( + Vertex<K, VV> vertex, Tuple2<K, Either<NullValue, Message>> message) { outTuple.setField(vertex, 0); outTuple.setField(message.f1, 1); - out.collect(outTuple); + return outTuple; } } @@ -485,7 +497,7 @@ public class VertexCentricIteration<K, VV, EV, Message> @SuppressWarnings("serial") private static final class ProjectMessages<K, VV, Message> implements - FlatMapFunction<Either<Vertex<K, VV>, Tuple2<K, Message>>, Tuple2<K, Either<NullValue, Message>>> { + FlatMapFunction<Either<Vertex<K, VV>, Tuple2<K, Message>>, Tuple2<K, Either<NullValue, Message>>> { private Tuple2<K, Either<NullValue, Message>> outTuple = new Tuple2<K, Either<NullValue, Message>>();
