Repository: flink Updated Branches: refs/heads/master f9eea5e5a -> f2186a604
[FLINK-2561] [gelly] add gelly-scala examples: vertex-centric SSSP, GSA SSSP and how to use a library method (connected components). This closes #1211 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f2186a60 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f2186a60 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f2186a60 Branch: refs/heads/master Commit: f2186a604f407e7b6db534cf6f9e50e27eac765a Parents: f9eea5e Author: vasia <[email protected]> Authored: Thu Oct 1 22:26:25 2015 +0200 Committer: vasia <[email protected]> Committed: Wed Oct 7 22:37:08 2015 +0200 ---------------------------------------------------------------------- docs/libs/gelly_guide.md | 4 +- .../org/apache/flink/graph/scala/Graph.scala | 16 +- .../scala/example/ConnectedComponents.scala | 121 +++++++++++++ .../example/GSASingleSourceShortestPaths.scala | 156 +++++++++++++++++ .../graph/scala/example/GraphMetrics.scala | 19 +-- .../example/SingleSourceShortestPaths.scala | 170 +++++++++++++++++++ .../graph/example/ConnectedComponents.java | 2 +- .../utils/ConnectedComponentsDefaultData.java | 21 ++- .../utils/SingleSourceShortestPathsData.java | 37 ++-- 9 files changed, 501 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f2186a60/docs/libs/gelly_guide.md ---------------------------------------------------------------------- diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md index fa2c86c..766b395 100644 --- a/docs/libs/gelly_guide.md +++ b/docs/libs/gelly_guide.md @@ -265,10 +265,10 @@ If no vertex input is provided during Graph creation, Gelly will automatically p val env = ExecutionEnvironment.getExecutionEnvironment // initialize the vertex value to be equal to the vertex ID -val graph = Graph.fromCollection(edgeList, env, +val graph = Graph.fromCollection(edgeList, new MapFunction[Long, Long] 
{ def map(id: Long): Long = id - }) + }, env) {% endhighlight %} </div> </div> http://git-wip-us.apache.org/repos/asf/flink/blob/f2186a60/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala index 38702f3..28f3f12 100644 --- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala +++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala @@ -57,8 +57,8 @@ object Graph { * map function to the vertex ids. */ def fromDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: - TypeInformation : ClassTag](edges: DataSet[Edge[K, EV]], env: ExecutionEnvironment, - mapper: MapFunction[K, VV]): Graph[K, VV, EV] = { + TypeInformation : ClassTag](edges: DataSet[Edge[K, EV]], mapper: MapFunction[K, VV], + env: ExecutionEnvironment): Graph[K, VV, EV] = { wrapGraph(jg.Graph.fromDataSet[K, VV, EV](edges.javaSet, mapper, env.getJavaEnv)) } @@ -87,8 +87,8 @@ object Graph { * map function to the vertex ids. */ def fromCollection[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: - TypeInformation : ClassTag](edges: Seq[Edge[K, EV]], env: ExecutionEnvironment, - mapper: MapFunction[K, VV]): Graph[K, VV, EV] = { + TypeInformation : ClassTag](edges: Seq[Edge[K, EV]], mapper: MapFunction[K, VV], + env: ExecutionEnvironment): Graph[K, VV, EV] = { wrapGraph(jg.Graph.fromCollection[K, VV, EV](edges.asJavaCollection, mapper, env.getJavaEnv)) } @@ -120,8 +120,8 @@ object Graph { * map function to the vertex ids. 
*/ def fromTupleDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: - TypeInformation : ClassTag](edges: DataSet[(K, K, EV)], env: ExecutionEnvironment, - mapper: MapFunction[K, VV]): Graph[K, VV, EV] = { + TypeInformation : ClassTag](edges: DataSet[(K, K, EV)], mapper: MapFunction[K, VV], + env: ExecutionEnvironment): Graph[K, VV, EV] = { val javaTupleEdges = edges.map(v => new jtuple.Tuple3(v._1, v._2, v._3)).javaSet wrapGraph(jg.Graph.fromTupleDataSet[K, VV, EV](javaTupleEdges, mapper, env.getJavaEnv)) } @@ -230,7 +230,7 @@ object Graph { // initializer provided if (mapper != null) { - fromTupleDataSet[K, VV, EV](edges, env, mapper) + fromTupleDataSet[K, VV, EV](edges, mapper, env) } else { fromTupleDataSet[K, EV](edges, env) @@ -244,7 +244,7 @@ object Graph { // no initializer provided if (mapper != null) { - fromTupleDataSet[K, VV, NullValue](edges, env, mapper) + fromTupleDataSet[K, VV, NullValue](edges, mapper, env) } else { fromTupleDataSet[K, NullValue](edges, env) http://git-wip-us.apache.org/repos/asf/flink/blob/f2186a60/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala new file mode 100644 index 0000000..b3da520 --- /dev/null +++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.scala.example; + +import org.apache.flink.api.scala._ +import org.apache.flink.graph.scala._ +import org.apache.flink.graph.Edge +import org.apache.flink.types.NullValue +import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.graph.library.GSAConnectedComponents +import java.lang.Long + +/** + * This example shows how to use Gelly's library methods. + * You can find all available library methods in [[org.apache.flink.graph.library]]. + * + * In particular, this example uses the + * [[org.apache.flink.graph.library.GSAConnectedComponents]] + * library method to compute the connected components of the input graph. + * + * The input file is a plain text file and must be formatted as follows: + * Edges are represented by tuples of srcVertexId, trgVertexId which are + * separated by tabs. Edges themselves are separated by newlines. + * For example: <code>1\t2\n1\t3\n</code> defines two edges, + * 1-2 and 1-3.
+ * + * Usage {{ + * ConnectedComponents <edge path> <result path> <number of iterations> + * }} + * If no parameters are provided, the program is run with default data from + * [[org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData]] + */ +object ConnectedComponents { + def main(args: Array[String]) { + if (!parseParameters(args)) { + return + } + + val env = ExecutionEnvironment.getExecutionEnvironment + val edges: DataSet[Edge[Long, NullValue]] = getEdgesDataSet(env) + val graph = Graph.fromDataSet[Long, Long, NullValue](edges, new InitVertices, env) + + val components = graph.run(new GSAConnectedComponents[Long, NullValue](maxIterations)) + + + // emit result + if (fileOutput) { + components.writeAsCsv(outputPath, "\n", ",") + env.execute("Connected Components Example") + } else { + components.print() + } + } + + private final class InitVertices extends MapFunction[Long, Long] { + override def map(id: Long) = {id} + } + + // *********************************************************************** + // UTIL METHODS + // *********************************************************************** + + private var fileOutput = false + private var edgesInputPath: String = null + private var outputPath: String = null + private var maxIterations = ConnectedComponentsDefaultData.MAX_ITERATIONS + + private def parseParameters(args: Array[String]): Boolean = { + if(args.length > 0) { + if(args.length != 3) { + System.err.println("Usage ConnectedComponents <edge path> <output path> " + + "<num iterations>") + return false + } + fileOutput = true + edgesInputPath = args(0) + outputPath = args(1) + maxIterations = args(2).toInt + } else { + System.out.println("Executing ConnectedComponents example with default parameters" + + " and built-in default data.") + System.out.println(" Provide parameters to read input data from files.") + System.out.println(" See the documentation for the correct format of input files.") + System.out.println("Usage ConnectedComponents <edge path> 
<output path> " + + "<num iterations>"); + } + true + } + + private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, NullValue]] = { + if (fileOutput) { + env.readCsvFile[(Long, Long)](edgesInputPath, + lineDelimiter = "\n", + fieldDelimiter = "\t") + .map(edge => new Edge[Long, NullValue](edge._1, edge._2, NullValue.getInstance)) + } else { + val edgeData = ConnectedComponentsDefaultData.DEFAULT_EDGES map { + case Array(x, y) => (x.asInstanceOf[Long], y.asInstanceOf[Long]) + } + env.fromCollection(edgeData).map( + edge => new Edge[Long, NullValue](edge._1, edge._2, NullValue.getInstance)) + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f2186a60/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala new file mode 100644 index 0000000..2dc272c --- /dev/null +++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.scala.example; + +import org.apache.flink.api.scala._ +import org.apache.flink.graph.scala._ +import org.apache.flink.types.NullValue +import org.apache.flink.graph.Edge +import org.apache.flink.api.common.functions.MapFunction +import scala.collection.JavaConversions._ +import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap +import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData +import org.apache.flink.graph.gsa.GatherFunction +import org.apache.flink.graph.gsa.Neighbor +import org.apache.flink.graph.gsa.SumFunction +import org.apache.flink.graph.gsa.ApplyFunction + +/** + * This example shows how to use Gelly's gather-sum-apply iterations. + * + * It is an implementation of the Single-Source-Shortest-Paths algorithm. + * + * The input file is a plain text file and must be formatted as follows: + * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are + * separated by tabs. Edges themselves are separated by newlines. + * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges, + * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4. 
+ * + * If no parameters are provided, the program is run with default data from + * [[org.apache.flink.graph.example.utils.SingleSourceShortestPathsData]] + */ +object GSASingleSourceShortestPaths { + def main(args: Array[String]) { + if (!parseParameters(args)) { + return + } + + val env = ExecutionEnvironment.getExecutionEnvironment + val edges: DataSet[Edge[Long, Double]] = getEdgesDataSet(env) + val graph = Graph.fromDataSet[Long, Double, Double](edges, new InitVertices(srcVertexId), env) + + // Execute the gather-sum-apply iteration + val result = graph.runGatherSumApplyIteration(new CalculateDistances, new ChooseMinDistance, + new UpdateDistance, maxIterations) + + // Extract the vertices as the result + val singleSourceShortestPaths = result.getVertices + + // emit result + if (fileOutput) { + singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",") + env.execute("GSA Single Source Shortest Paths Example") + } else { + singleSourceShortestPaths.print() + } + } + + // -------------------------------------------------------------------------------------------- + // Single Source Shortest Path UDFs + // -------------------------------------------------------------------------------------------- + + private final class InitVertices(srcId: Long) extends MapFunction[Long, Double] { + + override def map(id: Long) = { + if (id.equals(srcId)) { + 0.0 + } else { + Double.PositiveInfinity + } + } + } + + private final class CalculateDistances extends GatherFunction[Double, Double, Double] { + override def gather(neighbor: Neighbor[Double, Double]) = { + neighbor.getNeighborValue + neighbor.getEdgeValue + } + } + + private final class ChooseMinDistance extends SumFunction[Double, Double, Double] { + override def sum(newValue: Double, currentValue: Double) = { + Math.min(newValue, currentValue) + } + } + + private final class UpdateDistance extends ApplyFunction[Long, Double, Double] { + override def apply(newDistance: Double, oldDistance: Double) = { + if 
(newDistance < oldDistance) { + setResult(newDistance) + } + } + } + + // ************************************************************************** + // UTIL METHODS + // ************************************************************************** + + private var fileOutput = false + private var srcVertexId = 1L + private var edgesInputPath: String = null + private var outputPath: String = null + private var maxIterations = 5 + + private def parseParameters(args: Array[String]): Boolean = { + if(args.length > 0) { + if(args.length != 4) { + System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" + + " <input edges path> <output path> <num iterations>") + return false + } + fileOutput = true + srcVertexId = args(0).toLong + edgesInputPath = args(1) + outputPath = args(2) + maxIterations = args(3).toInt + } else { + System.out.println("Executing Single Source Shortest Paths example " + + "with default parameters and built-in default data.") + System.out.println(" Provide parameters to read input data from files.") + System.out.println(" See the documentation for the correct format of input files.") + System.out.println("Usage: SingleSourceShortestPaths <source vertex id>" + + " <input edges path> <output path> <num iterations>"); + } + true + } + + private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, Double]] = { + if (fileOutput) { + env.readCsvFile[(Long, Long, Double)](edgesInputPath, + lineDelimiter = "\n", + fieldDelimiter = "\t") + .map(new Tuple3ToEdgeMap[Long, Double]()) + } else { + val edgeData = SingleSourceShortestPathsData.DEFAULT_EDGES map { + case Array(x, y, z) => (x.asInstanceOf[Long], y.asInstanceOf[Long], + z.asInstanceOf[Double]) + } + env.fromCollection(edgeData).map(new Tuple3ToEdgeMap[Long, Double]()) + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f2186a60/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala 
---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala index 68d9285..4eed824 100644 --- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala +++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala @@ -106,21 +106,20 @@ object GraphMetrics { private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, NullValue]] = { if (fileOutput) { env.readCsvFile[(Long, Long)]( - edgesPath, - fieldDelimiter = "\t").map( - in => new Edge[Long, NullValue](in._1, in._2, NullValue.getInstance())) - } - else { + edgesPath, + fieldDelimiter = "\t").map( + in => new Edge[Long, NullValue](in._1, in._2, NullValue.getInstance())) + } else { env.generateSequence(1, numVertices).flatMap[Edge[Long, NullValue]]( - (key: Long, out: Collector[Edge[Long, NullValue]]) => { - val numOutEdges: Int = (Math.random() * (numVertices / 2)).toInt + (key: Long, out: Collector[Edge[Long, NullValue]]) => { + val numOutEdges: Int = (Math.random() * (numVertices / 2)).toInt for ( i <- 0 to numOutEdges ) { var target: Long = ((Math.random() * numVertices) + 1).toLong - new Edge[Long, NullValue](key, target, NullValue.getInstance()) + out.collect(new Edge[Long, NullValue](key, target, NullValue.getInstance())) } - }) - } + }) } + } private var fileOutput: Boolean = false private var edgesPath: String = null http://git-wip-us.apache.org/repos/asf/flink/blob/f2186a60/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala 
b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala new file mode 100644 index 0000000..65a8e7f --- /dev/null +++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.scala.example; + +import org.apache.flink.api.scala._ +import org.apache.flink.graph.scala._ +import org.apache.flink.types.NullValue +import org.apache.flink.graph.Edge +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.graph.spargel.VertexUpdateFunction +import org.apache.flink.graph.spargel.MessageIterator +import org.apache.flink.graph.Vertex +import org.apache.flink.graph.spargel.MessagingFunction +import scala.collection.JavaConversions._ +import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap +import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData + +/** + * This example shows how to use Gelly's vertex-centric iterations. + * + * It is an implementation of the Single-Source-Shortest-Paths algorithm. 
+ * + * The input file is a plain text file and must be formatted as follows: + * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are + * separated by tabs. Edges themselves are separated by newlines. + * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges, + * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4. + * + * If no parameters are provided, the program is run with default data from + * [[org.apache.flink.graph.example.utils.SingleSourceShortestPathsData]] + */ +object SingleSourceShortestPaths { + def main(args: Array[String]) { + if (!parseParameters(args)) { + return + } + + val env = ExecutionEnvironment.getExecutionEnvironment + val edges: DataSet[Edge[Long, Double]] = getEdgesDataSet(env) + val graph = Graph.fromDataSet[Long, Double, Double](edges, new InitVertices(srcVertexId), env) + + // Execute the vertex-centric iteration + val result = graph.runVertexCentricIteration(new VertexDistanceUpdater, + new MinDistanceMessenger, maxIterations) + + // Extract the vertices as the result + val singleSourceShortestPaths = result.getVertices + + // emit result + if (fileOutput) { + singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",") + env.execute("Single Source Shortest Paths Example") + } else { + singleSourceShortestPaths.print() + } + } + + // -------------------------------------------------------------------------------------------- + // Single Source Shortest Path UDFs + // -------------------------------------------------------------------------------------------- + + private final class InitVertices(srcId: Long) extends MapFunction[Long, Double] { + + override def map(id: Long) = { + if (id.equals(srcId)) { + 0.0 + } else { + Double.PositiveInfinity + } + } + } + + /** + * Function that updates the value of a vertex by picking the minimum + * distance from all incoming messages. 
+ */ + private final class VertexDistanceUpdater extends VertexUpdateFunction[Long, Double, Double] { + + override def updateVertex(vertex: Vertex[Long, Double], inMessages: MessageIterator[Double]) { + var minDistance = Double.MaxValue + while (inMessages.hasNext) { + var msg = inMessages.next + if (msg < minDistance) { + minDistance = msg + } + } + if (vertex.getValue > minDistance) { + setNewVertexValue(minDistance) + } + } + } + + /** + * Distributes the minimum distance associated with a given vertex among all + * the target vertices summed up with the edge's value. + */ + private final class MinDistanceMessenger extends + MessagingFunction[Long, Double, Double, Double] { + + override def sendMessages(vertex: Vertex[Long, Double]) { + for (edge: Edge[Long, Double] <- getEdges) { + sendMessageTo(edge.getTarget(), vertex.getValue + edge.getValue) + } + } + } + + // **************************************************************************** + // UTIL METHODS + // **************************************************************************** + + private var fileOutput = false + private var srcVertexId = 1L + private var edgesInputPath: String = null + private var outputPath: String = null + private var maxIterations = 5 + + private def parseParameters(args: Array[String]): Boolean = { + if(args.length > 0) { + if(args.length != 4) { + System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" + + " <input edges path> <output path> <num iterations>") + return false + } + fileOutput = true + srcVertexId = args(0).toLong + edgesInputPath = args(1) + outputPath = args(2) + maxIterations = args(3).toInt + } else { + System.out.println("Executing Single Source Shortest Paths example " + + "with default parameters and built-in default data.") + System.out.println(" Provide parameters to read input data from files.") + System.out.println(" See the documentation for the correct format of input files.") + System.out.println("Usage: SingleSourceShortestPaths <source vertex 
id>" + + " <input edges path> <output path> <num iterations>"); + } + true + } + + private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, Double]] = { + if (fileOutput) { + env.readCsvFile[(Long, Long, Double)](edgesInputPath, + lineDelimiter = "\n", + fieldDelimiter = "\t") + .map(new Tuple3ToEdgeMap[Long, Double]()) + } else { + val edgeData = SingleSourceShortestPathsData.DEFAULT_EDGES map { + case Array(x, y, z) => (x.asInstanceOf[Long], y.asInstanceOf[Long], + z.asInstanceOf[Double]) + } + env.fromCollection(edgeData).map(new Tuple3ToEdgeMap[Long, Double]()) + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f2186a60/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java index 4189602..cd52e04 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java @@ -34,7 +34,7 @@ import org.apache.flink.types.NullValue; * This example shows how to use Gelly's library methods. * You can find all available library methods in {@link org.apache.flink.graph.library}. * - * In particular, this example uses the {@link org.apache.flink.graph.library.ConnectedComponentsAlgorithm} + * In particular, this example uses the {@link org.apache.flink.graph.library.GSAConnectedComponents} * library method to compute the connected components of the input graph. 
* * The input file is a plain text file and must be formatted as follows: http://git-wip-us.apache.org/repos/asf/flink/blob/f2186a60/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ConnectedComponentsDefaultData.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ConnectedComponentsDefaultData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ConnectedComponentsDefaultData.java index b9556a9..67864eb 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ConnectedComponentsDefaultData.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ConnectedComponentsDefaultData.java @@ -23,7 +23,7 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.graph.Edge; import org.apache.flink.types.NullValue; -import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; /** @@ -36,14 +36,19 @@ public class ConnectedComponentsDefaultData { public static final String EDGES = "1 2\n" + "2 3\n" + "2 4\n" + "3 4"; - public static DataSet<Edge<Long, NullValue>> getDefaultEdgeDataSet(ExecutionEnvironment env) { - List<Edge<Long, NullValue>> edges = new ArrayList<Edge<Long, NullValue>>(); - edges.add(new Edge<Long, NullValue>(1L, 2L, NullValue.getInstance())); - edges.add(new Edge<Long, NullValue>(2L, 3L, NullValue.getInstance())); - edges.add(new Edge<Long, NullValue>(2L, 4L, NullValue.getInstance())); - edges.add(new Edge<Long, NullValue>(3L, 4L, NullValue.getInstance())); + public static final Object[][] DEFAULT_EDGES = new Object[][] { + new Object[]{1L, 2L}, + new Object[]{2L, 3L}, + new Object[]{2L, 4L}, + new Object[]{3L, 4L} + }; - return env.fromCollection(edges); + public static DataSet<Edge<Long, NullValue>> getDefaultEdgeDataSet(ExecutionEnvironment env) { + List<Edge<Long, NullValue>> 
edgeList = new LinkedList<Edge<Long, NullValue>>(); + for (Object[] edge : DEFAULT_EDGES) { + edgeList.add(new Edge<Long, NullValue>((Long) edge[0], (Long) edge[1], NullValue.getInstance())); + } + return env.fromCollection(edgeList); } public static final String VERTICES_WITH_MIN_ID = "1,1\n" + "2,1\n" + "3,1\n" + "4,1"; http://git-wip-us.apache.org/repos/asf/flink/blob/f2186a60/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java index cf0034a..6b985c5 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java @@ -18,13 +18,13 @@ package org.apache.flink.graph.example.utils; +import java.util.LinkedList; +import java.util.List; + import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.graph.Edge; -import java.util.ArrayList; -import java.util.List; - /** * Provides the default data set used for the Single Source Shortest Paths example program. * If no parameters are given to the program, the default edge data set is used. 
@@ -36,22 +36,27 @@ public class SingleSourceShortestPathsData { public static final String EDGES = "1\t2\t12.0\n" + "1\t3\t13.0\n" + "2\t3\t23.0\n" + "3\t4\t34.0\n" + "3\t5\t35.0\n" + "4\t5\t45.0\n" + "5\t1\t51.0"; - public static final DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) { - - List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>(); - edges.add(new Edge<Long, Double>(1L, 2L, 12.0)); - edges.add(new Edge<Long, Double>(1L, 3L, 13.0)); - edges.add(new Edge<Long, Double>(2L, 3L, 23.0)); - edges.add(new Edge<Long, Double>(3L, 4L, 34.0)); - edges.add(new Edge<Long, Double>(3L, 5L, 35.0)); - edges.add(new Edge<Long, Double>(4L, 5L, 45.0)); - edges.add(new Edge<Long, Double>(5L, 1L, 51.0)); - - return env.fromCollection(edges); - } + public static final Object[][] DEFAULT_EDGES = new Object[][] { + new Object[]{1L, 2L, 12.0}, + new Object[]{1L, 3L, 13.0}, + new Object[]{2L, 3L, 23.0}, + new Object[]{3L, 4L, 34.0}, + new Object[]{3L, 5L, 35.0}, + new Object[]{4L, 5L, 45.0}, + new Object[]{5L, 1L, 51.0} + }; public static final String RESULTED_SINGLE_SOURCE_SHORTEST_PATHS = "1,0.0\n" + "2,12.0\n" + "3,13.0\n" + "4,47.0\n" + "5,48.0"; + public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) { + + List<Edge<Long, Double>> edgeList = new LinkedList<Edge<Long, Double>>(); + for (Object[] edge : DEFAULT_EDGES) { + edgeList.add(new Edge<Long, Double>((Long) edge[0], (Long) edge[1], (Double) edge[2])); + } + return env.fromCollection(edgeList); + } + private SingleSourceShortestPathsData() {} }
