[
https://issues.apache.org/jira/browse/FLINK-1962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14700901#comment-14700901
]
ASF GitHub Bot commented on FLINK-1962:
---------------------------------------
Github user chiwanpark commented on a diff in the pull request:
https://github.com/apache/flink/pull/1004#discussion_r37273914
--- Diff:
flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
---
@@ -0,0 +1,735 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala
+
+import org.apache.flink.api.common.functions.{FilterFunction, MapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.{tuple => jtuple}
+import org.apache.flink.api.scala._
+import org.apache.flink.graph._
+import org.apache.flink.graph.gsa.{ApplyFunction, GSAConfiguration,
GatherFunction, SumFunction}
+import org.apache.flink.graph.spargel.{MessagingFunction,
VertexCentricConfiguration, VertexUpdateFunction}
+import org.apache.flink.{graph => jg}
+
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.reflect.ClassTag
+
+object Graph {
 // Companion factory methods: each builds a Scala Graph by delegating to the
 // Java Gelly API (org.apache.flink.graph.Graph, imported here as `jg`).
 //
 // fromDataSet: creates a Graph from a DataSet of vertices and a DataSet of
 // edges.
 //   vertices - the vertex DataSet
 //   edges    - the edge DataSet
 //   env      - the Scala ExecutionEnvironment; its Java counterpart
 //              (env.getJavaEnv) is what the Java API receives
 //   K / VV / EV - vertex-id key type, vertex value type, edge value type
 // The TypeInformation and ClassTag context bounds are required by the Scala
 // Graph class that is returned.
+ def fromDataSet[K: TypeInformation : ClassTag, VV: TypeInformation :
ClassTag, EV:
+ TypeInformation : ClassTag](vertices: DataSet[Vertex[K, VV]], edges:
DataSet[Edge[K, EV]],
+ env: ExecutionEnvironment): Graph[K, VV, EV]
= {
 // Unwraps the Scala DataSets to their underlying Java DataSets (javaSet)
 // and hands them to jg.Graph.fromDataSet.
 // NOTE(review): wrapGraph is defined outside this excerpt; presumably it
 // wraps the resulting Java graph in this Scala Graph - confirm.
+ wrapGraph(jg.Graph.fromDataSet[K, VV, EV](vertices.javaSet,
edges.javaSet, env.getJavaEnv))
+ }
+
 // fromCollection: creates a Graph from in-memory Seqs of vertices and edges.
 // Both Seqs are converted with asJavaCollection (JavaConverters) before
 // being passed, with env.getJavaEnv, to jg.Graph.fromCollection.
 //   vertices - the vertices of the graph
 //   edges    - the edges of the graph
 //   env      - the Scala ExecutionEnvironment
+ def fromCollection[K: TypeInformation : ClassTag, VV: TypeInformation :
ClassTag, EV:
+ TypeInformation : ClassTag](vertices: Seq[Vertex[K, VV]], edges:
Seq[Edge[K, EV]], env:
+ ExecutionEnvironment): Graph[K, VV, EV] = {
+ wrapGraph(jg.Graph.fromCollection[K, VV,
EV](vertices.asJavaCollection, edges
+ .asJavaCollection, env.getJavaEnv))
+ }
+}
+
+/**
+ * Represents a graph consisting of [[org.apache.flink.graph.Edge edges]] and
[[org.apache.flink.graph.Vertex vertices]].
+ * @param jgraph the underlying java api Graph.
+ * @tparam K the key type for vertex and edge identifiers
+ * @tparam VV the value type for vertices
+ * @tparam EV the value type for edges
+ * @see org.apache.flink.graph.Edge
+ * @see org.apache.flink.graph.Vertex
+ */
+final class Graph[K: TypeInformation : ClassTag, VV: TypeInformation :
ClassTag, EV:
+TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
+
+ private[flink] def getWrappedGraph = jgraph
+
+
+ private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean =
true): F = {
+ if (jgraph.getContext.getConfig.isClosureCleanerEnabled) {
+ ClosureCleaner.clean(f, checkSerializable)
+ }
+ ClosureCleaner.ensureSerializable(f)
+ f
+ }
+
+ /**
+ * @return the vertex DataSet.
+ */
+ def getVertices = wrap(jgraph.getVertices)
+
+ /**
+ * @return the edge DataSet.
+ */
+ def getEdges = wrap(jgraph.getEdges)
+
+ /**
+ * @return the vertex DataSet as Tuple2.
+ */
+ def getVerticesAsTuple2(): DataSet[(K, VV)] = {
+ wrap(jgraph.getVerticesAsTuple2).map(jtuple => (jtuple.f0, jtuple.f1))
+ }
+
+ /**
+ * @return the edge DataSet as Tuple3.
+ */
+ def getEdgesAsTuple3(): DataSet[(K, K, EV)] = {
+ wrap(jgraph.getEdgesAsTuple3).map(jtuple => (jtuple.f0, jtuple.f1,
jtuple.f2))
+ }
+
+ /**
+ * Apply a function to the attribute of each vertex in the graph.
+ *
+ * @param mapper the map function to apply.
+ * @return a new graph
+ */
+ def mapVertices[NV: TypeInformation : ClassTag](mapper:
MapFunction[Vertex[K, VV], NV]):
+ Graph[K, NV, EV] = {
+ new Graph[K, NV, EV](jgraph.mapVertices[NV](
+ mapper,
+ createTypeInformation[Vertex[K, NV]]
+ ))
+ }
+
+ /**
+ * Apply a function to the attribute of each vertex in the graph.
+ *
+ * @param fun the map function to apply.
+ * @return a new graph
+ */
+ def mapVertices[NV: TypeInformation : ClassTag](fun: Vertex[K, VV] =>
NV): Graph[K, NV, EV] = {
+ val mapper: MapFunction[Vertex[K, VV], NV] = new MapFunction[Vertex[K,
VV], NV] {
+ val cleanFun = clean(fun)
+
+ def map(in: Vertex[K, VV]): NV = cleanFun(in)
+ }
+ new Graph[K, NV, EV](jgraph.mapVertices[NV](mapper,
createTypeInformation[Vertex[K, NV]]))
+ }
+
+ /**
+ * Apply a function to the attribute of each edge in the graph.
+ *
+ * @param mapper the map function to apply.
+ * @return a new graph
+ */
+ def mapEdges[NV: TypeInformation : ClassTag](mapper: MapFunction[Edge[K,
EV], NV]): Graph[K,
+ VV, NV] = {
+ new Graph[K, VV, NV](jgraph.mapEdges[NV](
+ mapper,
+ createTypeInformation[Edge[K, NV]]
+ ))
+ }
+
+ /**
+ * Apply a function to the attribute of each edge in the graph.
+ *
+ * @param fun the map function to apply.
+ * @return a new graph
+ */
+ def mapEdges[NV: TypeInformation : ClassTag](fun: Edge[K, EV] => NV):
Graph[K, VV, NV] = {
+ val mapper: MapFunction[Edge[K, EV], NV] = new MapFunction[Edge[K,
EV], NV] {
+ val cleanFun = clean(fun)
+
+ def map(in: Edge[K, EV]): NV = cleanFun(in)
+ }
+ new Graph[K, VV, NV](jgraph.mapEdges[NV](mapper,
createTypeInformation[Edge[K, NV]]))
+ }
+
+ /**
+ * Joins the vertex DataSet of this graph with an input DataSet and
applies
+ * a UDF on the resulted values.
+ *
+ * @param inputDataSet the DataSet to join with.
+ * @param mapper the UDF map function to apply.
+ * @return a new graph where the vertex values have been updated.
+ */
+ def joinWithVertices[T: TypeInformation](inputDataSet: DataSet[(K, T)],
mapper: MapFunction[
+ (VV, T), VV]): Graph[K, VV, EV] = {
+ val newmapper = new MapFunction[jtuple.Tuple2[VV, T], VV]() {
+ override def map(value: jtuple.Tuple2[VV, T]): VV = {
+ mapper.map((value.f0, value.f1))
+ }
+ }
+ val javaTupleSet = inputDataSet.map(scalatuple => new
jtuple.Tuple2(scalatuple._1,
+ scalatuple._2)).javaSet
+ wrapGraph(jgraph.joinWithVertices[T](javaTupleSet, newmapper))
+ }
+
+ /**
+ * Joins the vertex DataSet of this graph with an input DataSet and
applies
+ * a UDF on the resulted values.
+ *
+ * @param inputDataSet the DataSet to join with.
+ * @param fun the UDF map function to apply.
+ * @return a new graph where the vertex values have been updated.
+ */
+ def joinWithVertices[T: TypeInformation](inputDataSet: DataSet[(K, T)],
fun: (VV, T) => VV):
+ Graph[K, VV, EV] = {
+ val newmapper = new MapFunction[jtuple.Tuple2[VV, T], VV]() {
+ val cleanFun = clean(fun)
+
+ override def map(value: jtuple.Tuple2[VV, T]): VV = {
+ cleanFun(value.f0, value.f1)
+ }
+ }
+ val javaTupleSet = inputDataSet.map(scalatuple => new
jtuple.Tuple2(scalatuple._1,
+ scalatuple._2)).javaSet
+ wrapGraph(jgraph.joinWithVertices[T](javaTupleSet, newmapper))
+ }
+
+ /**
+ * Joins the edge DataSet with an input DataSet on a composite key of
both
+ * source and target and applies a UDF on the resulted values.
+ *
+ * @param inputDataSet the DataSet to join with.
+ * @param mapper the UDF map function to apply.
+ * @tparam T the return type
+ * @return a new graph where the edge values have been updated.
+ */
+ def joinWithEdges[T: TypeInformation](inputDataSet: DataSet[(K, K, T)],
mapper: MapFunction[
+ (EV, T), EV]): Graph[K, VV, EV] = {
+ val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
+ override def map(value: jtuple.Tuple2[EV, T]): EV = {
+ mapper.map((value.f0, value.f1))
+ }
+ }
+ val javaTupleSet = inputDataSet.map(scalatuple => new
jtuple.Tuple3(scalatuple._1,
+ scalatuple._2, scalatuple._3)).javaSet
+ wrapGraph(jgraph.joinWithEdges[T](javaTupleSet, newmapper))
+ }
+
+ /**
+ * Joins the edge DataSet with an input DataSet on a composite key of
both
+ * source and target and applies a UDF on the resulted values.
+ *
+ * @param inputDataSet the DataSet to join with.
+ * @param fun the UDF map function to apply.
+ * @tparam T the return type
+ * @return a new graph where the edge values have been updated.
+ */
+ def joinWithEdges[T: TypeInformation](inputDataSet: DataSet[(K, K, T)],
fun: (EV, T) => EV):
+ Graph[K, VV, EV] = {
+ val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
+ val cleanFun = clean(fun)
+
+ override def map(value: jtuple.Tuple2[EV, T]): EV = {
+ cleanFun(value.f0, value.f1)
+ }
+ }
+ val javaTupleSet = inputDataSet.map(scalatuple => new
jtuple.Tuple3(scalatuple._1,
+ scalatuple._2, scalatuple._3)).javaSet
+ wrapGraph(jgraph.joinWithEdges[T](javaTupleSet, newmapper))
+ }
+
+ /**
+ * Joins the edge DataSet with an input DataSet on the source key of the
+ * edges and the first attribute of the input DataSet and applies a UDF
on
+ * the resulted values. In case the inputDataSet contains the same key
more
+ * than once, only the first value will be considered.
+ *
+ * @param inputDataSet the DataSet to join with.
+ * @param mapper the UDF map function to apply.
+ * @tparam T the return type
+ * @return a new graph where the edge values have been updated.
+ */
+ def joinWithEdgesOnSource[T: TypeInformation](inputDataSet: DataSet[(K,
T)], mapper:
+ MapFunction[(EV, T), EV]): Graph[K, VV, EV] = {
+ val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
+ override def map(value: jtuple.Tuple2[EV, T]): EV = {
+ mapper.map((value.f0, value.f1))
+ }
+ }
+ val javaTupleSet = inputDataSet.map(scalatuple => new
jtuple.Tuple2(scalatuple._1,
+ scalatuple._2)).javaSet
+ wrapGraph(jgraph.joinWithEdgesOnSource[T](javaTupleSet, newmapper))
+ }
+
+ /**
+ * Joins the edge DataSet with an input DataSet on the source key of the
+ * edges and the first attribute of the input DataSet and applies a UDF
on
+ * the resulted values. In case the inputDataSet contains the same key
more
+ * than once, only the first value will be considered.
+ *
+ * @param inputDataSet the DataSet to join with.
+ * @param fun the UDF map function to apply.
+ * @tparam T the return type
+ * @return a new graph where the edge values have been updated.
+ */
+ def joinWithEdgesOnSource[T: TypeInformation](inputDataSet: DataSet[(K,
T)], fun: (EV, T) =>
+ EV): Graph[K, VV, EV] = {
+ val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
+ val cleanFun = clean(fun)
+
+ override def map(value: jtuple.Tuple2[EV, T]): EV = {
+ cleanFun(value.f0, value.f1)
+ }
+ }
+ val javaTupleSet = inputDataSet.map(scalatuple => new
jtuple.Tuple2(scalatuple._1,
+ scalatuple._2)).javaSet
+ wrapGraph(jgraph.joinWithEdgesOnSource[T](javaTupleSet, newmapper))
+ }
+
+ /**
+ * Joins the edge DataSet with an input DataSet on the target key of the
+ * edges and the first attribute of the input DataSet and applies a UDF
on
+ * the resulted values. Should the inputDataSet contain the same key more
+ * than once, only the first value will be considered.
+ *
+ * @param inputDataSet the DataSet to join with.
+ * @param mapper the UDF map function to apply.
+ * @tparam T the return type
+ * @return a new graph where the edge values have been updated.
+ */
+ def joinWithEdgesOnTarget[T: TypeInformation](inputDataSet: DataSet[(K,
T)], mapper:
+ MapFunction[(EV, T), EV]): Graph[K, VV, EV] = {
+ val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
+ override def map(value: jtuple.Tuple2[EV, T]): EV = {
+ mapper.map((value.f0, value.f1))
+ }
+ }
+ val javaTupleSet = inputDataSet.map(scalatuple => new
jtuple.Tuple2(scalatuple._1,
+ scalatuple._2)).javaSet
+ wrapGraph(jgraph.joinWithEdgesOnTarget[T](javaTupleSet, newmapper))
+ }
+
+ /**
+ * Joins the edge DataSet with an input DataSet on the target key of the
+ * edges and the first attribute of the input DataSet and applies a UDF
on
+ * the resulted values. Should the inputDataSet contain the same key more
+ * than once, only the first value will be considered.
+ *
+ * @param inputDataSet the DataSet to join with.
+ * @param fun the UDF map function to apply.
+ * @tparam T the return type
+ * @return a new graph where the edge values have been updated.
+ */
+ def joinWithEdgesOnTarget[T: TypeInformation](inputDataSet: DataSet[(K,
T)], fun: (EV, T) =>
+ EV): Graph[K, VV, EV] = {
+ val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
+ val cleanFun = clean(fun)
+
+ override def map(value: jtuple.Tuple2[EV, T]): EV = {
+ cleanFun(value.f0, value.f1)
+ }
+ }
+ val javaTupleSet = inputDataSet.map(scalatuple => new
jtuple.Tuple2(scalatuple._1,
+ scalatuple._2)).javaSet
+ wrapGraph(jgraph.joinWithEdgesOnTarget[T](javaTupleSet, newmapper))
+ }
+
+ /**
+ * Apply filtering functions to the graph and return a sub-graph that
+ * satisfies the predicates for both vertices and edges.
+ *
+ * @param vertexFilter the filter function for vertices.
+ * @param edgeFilter the filter function for edges.
+ * @return the resulting sub-graph.
+ */
+ def subgraph(vertexFilter: FilterFunction[Vertex[K, VV]], edgeFilter:
FilterFunction[Edge[K,
+ EV]]) = {
+ wrapGraph(jgraph.subgraph(vertexFilter, edgeFilter))
+ }
+
+ /**
+ * Apply filtering functions to the graph and return a sub-graph that
+ * satisfies the predicates for both vertices and edges.
+ *
+ * @param vertexFilterFun the filter function for vertices.
+ * @param edgeFilterFun the filter function for edges.
+ * @return the resulting sub-graph.
+ */
+ def subgraph(vertexFilterFun: Vertex[K, VV] => Boolean, edgeFilterFun:
Edge[K, EV] =>
+ Boolean) = {
+ val vertexFilter = new FilterFunction[Vertex[K, VV]] {
+ val cleanVertexFun = clean(vertexFilterFun)
+
+ override def filter(value: Vertex[K, VV]): Boolean =
cleanVertexFun(value)
+ }
+
+ val edgeFilter = new FilterFunction[Edge[K, EV]] {
+ val cleanEdgeFun = clean(edgeFilterFun)
+
+ override def filter(value: Edge[K, EV]): Boolean =
cleanEdgeFun(value)
+ }
+
+ wrapGraph(jgraph.subgraph(vertexFilter, edgeFilter))
+ }
+
+ /**
+ * Apply a filtering function to the graph and return a sub-graph that
+ * satisfies the predicates only for the vertices.
+ *
+ * @param vertexFilter the filter function for vertices.
+ * @return the resulting sub-graph.
+ */
+ def filterOnVertices(vertexFilter: FilterFunction[Vertex[K, VV]]) = {
+ wrapGraph(jgraph.filterOnVertices(vertexFilter))
+ }
+
+ /**
+ * Apply a filtering function to the graph and return a sub-graph that
+ * satisfies the predicates only for the vertices.
+ *
+ * @param vertexFilterFun the filter function for vertices.
+ * @return the resulting sub-graph.
+ */
+ def filterOnVertices(vertexFilterFun: Vertex[K, VV] => Boolean) = {
+ val vertexFilter = new FilterFunction[Vertex[K, VV]] {
+ val cleanVertexFun = clean(vertexFilterFun)
+
+ override def filter(value: Vertex[K, VV]): Boolean =
cleanVertexFun(value)
+ }
+
+ wrapGraph(jgraph.filterOnVertices(vertexFilter))
+ }
+
+ /**
+ * Apply a filtering function to the graph and return a sub-graph that
+ * satisfies the predicates only for the edges.
+ *
+ * @param edgeFilter the filter function for edges.
+ * @return the resulting sub-graph.
+ */
+ def filterOnEdges(edgeFilter: FilterFunction[Edge[K, EV]]) = {
+ wrapGraph(jgraph.filterOnEdges(edgeFilter))
+ }
+
+ /**
+ * Apply a filtering function to the graph and return a sub-graph that
+ * satisfies the predicates only for the edges.
+ *
+ * @param edgeFilterFun the filter function for edges.
+ * @return the resulting sub-graph.
+ */
+ def filterOnEdges(edgeFilterFun: Edge[K, EV] => Boolean) = {
+ val edgeFilter = new FilterFunction[Edge[K, EV]] {
+ val cleanEdgeFun = clean(edgeFilterFun)
+
+ override def filter(value: Edge[K, EV]): Boolean =
cleanEdgeFun(value)
+ }
+
+ //wrapGraph(jgraph.filterOnEdges(edgeFilter))
--- End diff --
This commented-out line looks unnecessary. Can it be removed?
> Add Gelly Scala API
> -------------------
>
> Key: FLINK-1962
> URL: https://issues.apache.org/jira/browse/FLINK-1962
> Project: Flink
> Issue Type: Task
> Components: Gelly, Scala API
> Affects Versions: 0.9
> Reporter: Vasia Kalavri
> Assignee: PJ Van Aeken
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)