[FLINK-2561] [gelly] add missing methods to Graph: add-remove edges/vertices, difference, graph creation methods, validate, getTriplets. Add missing utility mappers.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0b4dc067 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0b4dc067 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0b4dc067 Branch: refs/heads/master Commit: 0b4dc067fee82654fae3292bf6bf7d59157bf5c0 Parents: 0f17755 Author: vasia <[email protected]> Authored: Thu Sep 24 22:08:10 2015 +0200 Committer: vasia <[email protected]> Committed: Tue Sep 29 00:38:20 2015 +0200 ---------------------------------------------------------------------- .../org/apache/flink/graph/scala/Graph.scala | 151 ++++++++++++++++++- .../graph/scala/utils/Tuple2ToVertexMap.scala | 31 ++++ .../graph/scala/utils/Tuple3ToEdgeMap.scala | 31 ++++ 3 files changed, 212 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0b4dc067/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala index 73e175e..ed58ffd 100644 --- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala +++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala @@ -23,26 +23,108 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.{tuple => jtuple} import org.apache.flink.api.scala._ import org.apache.flink.graph._ +import org.apache.flink.graph.validation.GraphValidator import org.apache.flink.graph.gsa.{ApplyFunction, GSAConfiguration, GatherFunction, SumFunction} import org.apache.flink.graph.spargel.{MessagingFunction, VertexCentricConfiguration, VertexUpdateFunction} import org.apache.flink.{graph => 
jg} - import _root_.scala.collection.JavaConverters._ import _root_.scala.reflect.ClassTag +import org.apache.flink.types.NullValue object Graph { + + /** + * Creates a Graph from a DataSet of vertices and a DataSet of edges. + */ def fromDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: TypeInformation : ClassTag](vertices: DataSet[Vertex[K, VV]], edges: DataSet[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, VV, EV] = { wrapGraph(jg.Graph.fromDataSet[K, VV, EV](vertices.javaSet, edges.javaSet, env.getJavaEnv)) } + /** + * Creates a Graph from a DataSet of edges. + * Vertices are created automatically and their values are set to NullValue. + */ + def fromDataSet[K: TypeInformation : ClassTag, EV: TypeInformation : ClassTag] + (edges: DataSet[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, NullValue, EV] = { + wrapGraph(jg.Graph.fromDataSet[K, EV](edges.javaSet, env.getJavaEnv)) + } + + /** + * Creates a graph from a DataSet of edges. + * Vertices are created automatically and their values are set by applying the provided + * map function to the vertex ids. + */ + def fromDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: + TypeInformation : ClassTag](edges: DataSet[Edge[K, EV]], env: ExecutionEnvironment, + mapper: MapFunction[K, VV]): Graph[K, VV, EV] = { + wrapGraph(jg.Graph.fromDataSet[K, VV, EV](edges.javaSet, mapper, env.getJavaEnv)) + } + + /** + * Creates a Graph from a Seq of vertices and a Seq of edges. + */ def fromCollection[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: TypeInformation : ClassTag](vertices: Seq[Vertex[K, VV]], edges: Seq[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, VV, EV] = { wrapGraph(jg.Graph.fromCollection[K, VV, EV](vertices.asJavaCollection, edges .asJavaCollection, env.getJavaEnv)) } + + /** + * Creates a Graph from a Seq of edges. + * Vertices are created automatically and their values are set to NullValue. 
+ */ + def fromCollection[K: TypeInformation : ClassTag, EV: TypeInformation : ClassTag] + (edges: Seq[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, NullValue, EV] = { + wrapGraph(jg.Graph.fromCollection[K, EV](edges.asJavaCollection, env.getJavaEnv)) + } + + /** + * Creates a graph from a Seq of edges. + * Vertices are created automatically and their values are set by applying the provided + * map function to the vertex ids. + */ + def fromCollection[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: + TypeInformation : ClassTag](edges: Seq[Edge[K, EV]], env: ExecutionEnvironment, + mapper: MapFunction[K, VV]): Graph[K, VV, EV] = { + wrapGraph(jg.Graph.fromCollection[K, VV, EV](edges.asJavaCollection, mapper, env.getJavaEnv)) + } + + /** + * Creates a Graph from a DataSets of Tuples. + */ + def fromTupleDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: + TypeInformation : ClassTag](vertices: DataSet[(K, VV)], edges: DataSet[(K, K, EV)], + env: ExecutionEnvironment): Graph[K, VV, EV] = { + val javaTupleVertices = vertices.map(v => new jtuple.Tuple2(v._1, v._2)).javaSet + val javaTupleEdges = edges.map(v => new jtuple.Tuple3(v._1, v._2, v._3)).javaSet + wrapGraph(jg.Graph.fromTupleDataSet[K, VV, EV](javaTupleVertices, javaTupleEdges, env.getJavaEnv)) + } + + /** + * Creates a Graph from a DataSet of Tuples representing the edges. + * Vertices are created automatically and their values are set to NullValue. + */ + def fromTupleDataSet[K: TypeInformation : ClassTag, EV: TypeInformation : ClassTag] + (edges: DataSet[(K, K, EV)], env: ExecutionEnvironment): Graph[K, NullValue, EV] = { + val javaTupleEdges = edges.map(v => new jtuple.Tuple3(v._1, v._2, v._3)).javaSet + wrapGraph(jg.Graph.fromTupleDataSet[K, EV](javaTupleEdges, env.getJavaEnv)) + } + + /** + * Creates a Graph from a DataSet of Tuples representing the edges. 
+ * Vertices are created automatically and their values are set by applying the provided + * map function to the vertex ids. + */ + def fromTupleDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: + TypeInformation : ClassTag](edges: DataSet[(K, K, EV)], env: ExecutionEnvironment, + mapper: MapFunction[K, VV]): Graph[K, VV, EV] = { + val javaTupleEdges = edges.map(v => new jtuple.Tuple3(v._1, v._2, v._3)).javaSet + wrapGraph(jg.Graph.fromTupleDataSet[K, VV, EV](javaTupleEdges, mapper, env.getJavaEnv)) + } + } /** @@ -93,6 +175,14 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { } /** + * @return a DataSet of Triplets, + * consisting of (srcVertexId, trgVertexId, srcVertexValue, trgVertexValue, edgeValue) + */ + def getTriplets(): DataSet[Triplet[K, VV, EV]] = { + wrap(jgraph.getTriplets()) + } + + /** * Apply a function to the attribute of each vertex in the graph. * * @param mapper the map function to apply. @@ -575,6 +665,29 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { } /** + * Adds the list of vertices, passed as input, to the graph. + * If the vertices already exist in the graph, they will not be added once more. + * + * @param vertices the list of vertices to add + * @return the new graph containing the existing and newly added vertices + */ + def addVertices(vertices: List[Vertex[K, VV]]): Graph[K, VV, EV] = { + wrapGraph(jgraph.addVertices(vertices.asJava)) + } + + /** + * Adds the given list of edges to the graph. + * + * When adding an edge for a non-existing set of vertices, the edge is considered invalid and ignored. + * + * @param edges the list of edges to be added + * @return a new graph containing the existing edges plus the newly added edges. + */ + def addEdges(edges: List[Edge[K, EV]]): Graph[K, VV, EV] = { + wrapGraph(jgraph.addEdges(edges.asJava)) + } + + /** * Adds the given edge to the graph. If the source and target vertices do * not exist in the graph, they will also be added.
* @@ -599,6 +712,17 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { wrapGraph(jgraph.removeVertex(vertex)) } + /** + * Removes the given list of vertices and their edges from the graph. + * + * @param vertices the list of vertices to remove + * @return the new graph containing the existing vertices and edges without + * the removed vertices and their edges + */ + def removeVertices(vertices: List[Vertex[K, VV]]): Graph[K, VV, EV] = { + wrapGraph(jgraph.removeVertices(vertices.asJava)) + } + /** * Removes all edges that match the given edge from the graph. * @@ -611,6 +735,16 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { } /** + * Removes all the edges that match the edges in the given list from the graph. + * + * @param edges the list of edges to be removed + * @return a new graph where the edges have been removed and in which the vertices remained intact + */ + def removeEdges(edges: List[Edge[K, EV]]): Graph[K, VV, EV] = { + wrapGraph(jgraph.removeEdges(edges.asJava)) + } + + /** * Performs union on the vertices and edges sets of the input graphs * removing duplicate vertices but maintaining duplicate edges. * @@ -622,6 +756,16 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { } /** + * Performs difference on the vertex and edge sets of the input graphs, + * removing common vertices and edges. If a source/target vertex is removed, its corresponding edge will also be removed. + * @param graph the graph to perform difference with + * @return a new graph where the common vertices and edges have been removed + */ + def difference(graph: Graph[K, VV, EV]) = { + wrapGraph(jgraph.difference(graph.getWrappedGraph)) + } + + /** * Compute an aggregate over the neighbor values of each * vertex.
* @@ -732,4 +876,9 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { wrapGraph(jgraph.runGatherSumApplyIteration(gatherFunction, sumFunction, applyFunction, maxIterations, parameters)) } + + def validate(validator: GraphValidator[K, VV, EV]): Boolean = { + jgraph.validate(validator) + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/0b4dc067/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala new file mode 100644 index 0000000..f2b1133 --- /dev/null +++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.scala.utils + +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.graph.Vertex + +class Tuple2ToVertexMap[K, VV] extends MapFunction[(K, VV), Vertex[K, VV]] { + + private val serialVersionUID: Long = 1L + + override def map(value: (K, VV)): Vertex[K, VV] = { + new Vertex(value._1, value._2) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0b4dc067/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala new file mode 100644 index 0000000..00cb074 --- /dev/null +++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.scala.utils + +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.graph.Edge + +class Tuple3ToEdgeMap[K, EV] extends MapFunction[(K, K, EV), Edge[K, EV]] { + + private val serialVersionUID: Long = 1L + + override def map(value: (K, K, EV)): Edge[K, EV] = { + new Edge(value._1, value._2, value._3) + } +}
