[ https://issues.apache.org/jira/browse/FLINK-1962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14700901#comment-14700901 ]

ASF GitHub Bot commented on FLINK-1962:
---------------------------------------

Github user chiwanpark commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1004#discussion_r37273914
  
    --- Diff: flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala ---
    @@ -0,0 +1,735 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.scala
    +
    +import org.apache.flink.api.common.functions.{FilterFunction, MapFunction}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.{tuple => jtuple}
    +import org.apache.flink.api.scala._
    +import org.apache.flink.graph._
    +import org.apache.flink.graph.gsa.{ApplyFunction, GSAConfiguration, 
GatherFunction, SumFunction}
    +import org.apache.flink.graph.spargel.{MessagingFunction, 
VertexCentricConfiguration, VertexUpdateFunction}
    +import org.apache.flink.{graph => jg}
    +
    +import _root_.scala.collection.JavaConverters._
    +import _root_.scala.reflect.ClassTag
    +
    +object Graph {
    +  def fromDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : 
ClassTag, EV:
    +  TypeInformation : ClassTag](vertices: DataSet[Vertex[K, VV]], edges: 
DataSet[Edge[K, EV]],
    +                              env: ExecutionEnvironment): Graph[K, VV, EV] 
= {
    +    wrapGraph(jg.Graph.fromDataSet[K, VV, EV](vertices.javaSet, 
edges.javaSet, env.getJavaEnv))
    +  }
    +
    +  def fromCollection[K: TypeInformation : ClassTag, VV: TypeInformation : 
ClassTag, EV:
    +  TypeInformation : ClassTag](vertices: Seq[Vertex[K, VV]], edges: 
Seq[Edge[K, EV]], env:
    +  ExecutionEnvironment): Graph[K, VV, EV] = {
    +    wrapGraph(jg.Graph.fromCollection[K, VV, 
EV](vertices.asJavaCollection, edges
    +      .asJavaCollection, env.getJavaEnv))
    +  }
    +}
    +
    +/**
    + * Represents a graph consisting of {@link Edge edges} and {@link Vertex 
vertices}.
    + * @param jgraph the underlying java api Graph.
    + * @tparam K the key type for vertex and edge identifiers
    + * @tparam VV the value type for vertices
    + * @tparam EV the value type for edges
    + * @see org.apache.flink.graph.Edge
    + * @see org.apache.flink.graph.Vertex
    + */
    +final class Graph[K: TypeInformation : ClassTag, VV: TypeInformation : 
ClassTag, EV:
    +TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
    +
    +  private[flink] def getWrappedGraph = jgraph
    +
    +
    +  private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = 
true): F = {
    +    if (jgraph.getContext.getConfig.isClosureCleanerEnabled) {
    +      ClosureCleaner.clean(f, checkSerializable)
    +    }
    +    ClosureCleaner.ensureSerializable(f)
    +    f
    +  }
    +
    +  /**
    +   * @return the vertex DataSet.
    +   */
    +  def getVertices = wrap(jgraph.getVertices)
    +
    +  /**
    +   * @return the edge DataSet.
    +   */
    +  def getEdges = wrap(jgraph.getEdges)
    +
    +  /**
    +   * @return the vertex DataSet as Tuple2.
    +   */
    +  def getVerticesAsTuple2(): DataSet[(K, VV)] = {
    +    wrap(jgraph.getVerticesAsTuple2).map(jtuple => (jtuple.f0, jtuple.f1))
    +  }
    +
    +  /**
    +   * @return the edge DataSet as Tuple3.
    +   */
    +  def getEdgesAsTuple3(): DataSet[(K, K, EV)] = {
    +    wrap(jgraph.getEdgesAsTuple3).map(jtuple => (jtuple.f0, jtuple.f1, 
jtuple.f2))
    +  }
    +
    +  /**
    +   * Apply a function to the attribute of each vertex in the graph.
    +   *
    +   * @param mapper the map function to apply.
    +   * @return a new graph
    +   */
    +  def mapVertices[NV: TypeInformation : ClassTag](mapper: 
MapFunction[Vertex[K, VV], NV]):
    +  Graph[K, NV, EV] = {
    +    new Graph[K, NV, EV](jgraph.mapVertices[NV](
    +      mapper,
    +      createTypeInformation[Vertex[K, NV]]
    +    ))
    +  }
    +
    +  /**
    +   * Apply a function to the attribute of each vertex in the graph.
    +   *
    +   * @param fun the map function to apply.
    +   * @return a new graph
    +   */
    +  def mapVertices[NV: TypeInformation : ClassTag](fun: Vertex[K, VV] => 
NV): Graph[K, NV, EV] = {
    +    val mapper: MapFunction[Vertex[K, VV], NV] = new MapFunction[Vertex[K, 
VV], NV] {
    +      val cleanFun = clean(fun)
    +
    +      def map(in: Vertex[K, VV]): NV = cleanFun(in)
    +    }
    +    new Graph[K, NV, EV](jgraph.mapVertices[NV](mapper, 
createTypeInformation[Vertex[K, NV]]))
    +  }
    +
    +  /**
    +   * Apply a function to the attribute of each edge in the graph.
    +   *
    +   * @param mapper the map function to apply.
    +   * @return a new graph
    +   */
    +  def mapEdges[NV: TypeInformation : ClassTag](mapper: MapFunction[Edge[K, 
EV], NV]): Graph[K,
    +    VV, NV] = {
    +    new Graph[K, VV, NV](jgraph.mapEdges[NV](
    +      mapper,
    +      createTypeInformation[Edge[K, NV]]
    +    ))
    +  }
    +
    +  /**
    +   * Apply a function to the attribute of each edge in the graph.
    +   *
    +   * @param fun the map function to apply.
    +   * @return a new graph
    +   */
    +  def mapEdges[NV: TypeInformation : ClassTag](fun: Edge[K, EV] => NV): 
Graph[K, VV, NV] = {
    +    val mapper: MapFunction[Edge[K, EV], NV] = new MapFunction[Edge[K, 
EV], NV] {
    +      val cleanFun = clean(fun)
    +
    +      def map(in: Edge[K, EV]): NV = cleanFun(in)
    +    }
    +    new Graph[K, VV, NV](jgraph.mapEdges[NV](mapper, 
createTypeInformation[Edge[K, NV]]))
    +  }
    +
    +  /**
    +   * Joins the vertex DataSet of this graph with an input DataSet and 
applies
    +   * a UDF on the resulted values.
    +   *
    +   * @param inputDataSet the DataSet to join with.
    +   * @param mapper the UDF map function to apply.
    +   * @return a new graph where the vertex values have been updated.
    +   */
    +  def joinWithVertices[T: TypeInformation](inputDataSet: DataSet[(K, T)], 
mapper: MapFunction[
    +    (VV, T), VV]): Graph[K, VV, EV] = {
    +    val newmapper = new MapFunction[jtuple.Tuple2[VV, T], VV]() {
    +      override def map(value: jtuple.Tuple2[VV, T]): VV = {
    +        mapper.map((value.f0, value.f1))
    +      }
    +    }
    +    val javaTupleSet = inputDataSet.map(scalatuple => new 
jtuple.Tuple2(scalatuple._1,
    +      scalatuple._2)).javaSet
    +    wrapGraph(jgraph.joinWithVertices[T](javaTupleSet, newmapper))
    +  }
    +
    +  /**
    +   * Joins the vertex DataSet of this graph with an input DataSet and 
applies
    +   * a UDF on the resulted values.
    +   *
    +   * @param inputDataSet the DataSet to join with.
    +   * @param fun the UDF map function to apply.
    +   * @return a new graph where the vertex values have been updated.
    +   */
    +  def joinWithVertices[T: TypeInformation](inputDataSet: DataSet[(K, T)], 
fun: (VV, T) => VV):
    +  Graph[K, VV, EV] = {
    +    val newmapper = new MapFunction[jtuple.Tuple2[VV, T], VV]() {
    +      val cleanFun = clean(fun)
    +
    +      override def map(value: jtuple.Tuple2[VV, T]): VV = {
    +        cleanFun(value.f0, value.f1)
    +      }
    +    }
    +    val javaTupleSet = inputDataSet.map(scalatuple => new 
jtuple.Tuple2(scalatuple._1,
    +      scalatuple._2)).javaSet
    +    wrapGraph(jgraph.joinWithVertices[T](javaTupleSet, newmapper))
    +  }
    +
    +  /**
    +   * Joins the edge DataSet with an input DataSet on a composite key of 
both
    +   * source and target and applies a UDF on the resulted values.
    +   *
    +   * @param inputDataSet the DataSet to join with.
    +   * @param mapper the UDF map function to apply.
    +   * @tparam T the return type
    +   * @return a new graph where the edge values have been updated.
    +   */
    +  def joinWithEdges[T: TypeInformation](inputDataSet: DataSet[(K, K, T)], 
mapper: MapFunction[
    +    (EV, T), EV]): Graph[K, VV, EV] = {
    +    val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
    +      override def map(value: jtuple.Tuple2[EV, T]): EV = {
    +        mapper.map((value.f0, value.f1))
    +      }
    +    }
    +    val javaTupleSet = inputDataSet.map(scalatuple => new 
jtuple.Tuple3(scalatuple._1,
    +      scalatuple._2, scalatuple._3)).javaSet
    +    wrapGraph(jgraph.joinWithEdges[T](javaTupleSet, newmapper))
    +  }
    +
    +  /**
    +   * Joins the edge DataSet with an input DataSet on a composite key of 
both
    +   * source and target and applies a UDF on the resulted values.
    +   *
    +   * @param inputDataSet the DataSet to join with.
    +   * @param fun the UDF map function to apply.
    +   * @tparam T the return type
    +   * @return a new graph where the edge values have been updated.
    +   */
    +  def joinWithEdges[T: TypeInformation](inputDataSet: DataSet[(K, K, T)], 
fun: (EV, T) => EV):
    +  Graph[K, VV, EV] = {
    +    val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
    +      val cleanFun = clean(fun)
    +
    +      override def map(value: jtuple.Tuple2[EV, T]): EV = {
    +        cleanFun(value.f0, value.f1)
    +      }
    +    }
    +    val javaTupleSet = inputDataSet.map(scalatuple => new 
jtuple.Tuple3(scalatuple._1,
    +      scalatuple._2, scalatuple._3)).javaSet
    +    wrapGraph(jgraph.joinWithEdges[T](javaTupleSet, newmapper))
    +  }
    +
    +  /**
    +   * Joins the edge DataSet with an input DataSet on the source key of the
    +   * edges and the first attribute of the input DataSet and applies a UDF 
on
    +   * the resulted values. In case the inputDataSet contains the same key 
more
    +   * than once, only the first value will be considered.
    +   *
    +   * @param inputDataSet the DataSet to join with.
    +   * @param mapper the UDF map function to apply.
    +   * @tparam T the return type
    +   * @return a new graph where the edge values have been updated.
    +   */
    +  def joinWithEdgesOnSource[T: TypeInformation](inputDataSet: DataSet[(K, 
T)], mapper:
    +  MapFunction[(EV, T), EV]): Graph[K, VV, EV] = {
    +    val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
    +      override def map(value: jtuple.Tuple2[EV, T]): EV = {
    +        mapper.map((value.f0, value.f1))
    +      }
    +    }
    +    val javaTupleSet = inputDataSet.map(scalatuple => new 
jtuple.Tuple2(scalatuple._1,
    +      scalatuple._2)).javaSet
    +    wrapGraph(jgraph.joinWithEdgesOnSource[T](javaTupleSet, newmapper))
    +  }
    +
    +  /**
    +   * Joins the edge DataSet with an input DataSet on the source key of the
    +   * edges and the first attribute of the input DataSet and applies a UDF 
on
    +   * the resulted values. In case the inputDataSet contains the same key 
more
    +   * than once, only the first value will be considered.
    +   *
    +   * @param inputDataSet the DataSet to join with.
    +   * @param fun the UDF map function to apply.
    +   * @tparam T the return type
    +   * @return a new graph where the edge values have been updated.
    +   */
    +  def joinWithEdgesOnSource[T: TypeInformation](inputDataSet: DataSet[(K, 
T)], fun: (EV, T) =>
    +    EV): Graph[K, VV, EV] = {
    +    val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
    +      val cleanFun = clean(fun)
    +
    +      override def map(value: jtuple.Tuple2[EV, T]): EV = {
    +        cleanFun(value.f0, value.f1)
    +      }
    +    }
    +    val javaTupleSet = inputDataSet.map(scalatuple => new 
jtuple.Tuple2(scalatuple._1,
    +      scalatuple._2)).javaSet
    +    wrapGraph(jgraph.joinWithEdgesOnSource[T](javaTupleSet, newmapper))
    +  }
    +
    +  /**
    +   * Joins the edge DataSet with an input DataSet on the target key of the
    +   * edges and the first attribute of the input DataSet and applies a UDF 
on
    +   * the resulted values. Should the inputDataSet contain the same key more
    +   * than once, only the first value will be considered.
    +   *
    +   * @param inputDataSet the DataSet to join with.
    +   * @param mapper the UDF map function to apply.
    +   * @tparam T the return type
    +   * @return a new graph where the edge values have been updated.
    +   */
    +  def joinWithEdgesOnTarget[T: TypeInformation](inputDataSet: DataSet[(K, 
T)], mapper:
    +  MapFunction[(EV, T), EV]): Graph[K, VV, EV] = {
    +    val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
    +      override def map(value: jtuple.Tuple2[EV, T]): EV = {
    +        mapper.map((value.f0, value.f1))
    +      }
    +    }
    +    val javaTupleSet = inputDataSet.map(scalatuple => new 
jtuple.Tuple2(scalatuple._1,
    +      scalatuple._2)).javaSet
    +    wrapGraph(jgraph.joinWithEdgesOnTarget[T](javaTupleSet, newmapper))
    +  }
    +
    +  /**
    +   * Joins the edge DataSet with an input DataSet on the target key of the
    +   * edges and the first attribute of the input DataSet and applies a UDF 
on
    +   * the resulted values. Should the inputDataSet contain the same key more
    +   * than once, only the first value will be considered.
    +   *
    +   * @param inputDataSet the DataSet to join with.
    +   * @param fun the UDF map function to apply.
    +   * @tparam T the return type
    +   * @return a new graph where the edge values have been updated.
    +   */
    +  def joinWithEdgesOnTarget[T: TypeInformation](inputDataSet: DataSet[(K, 
T)], fun: (EV, T) =>
    +    EV): Graph[K, VV, EV] = {
    +    val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
    +      val cleanFun = clean(fun)
    +
    +      override def map(value: jtuple.Tuple2[EV, T]): EV = {
    +        cleanFun(value.f0, value.f1)
    +      }
    +    }
    +    val javaTupleSet = inputDataSet.map(scalatuple => new 
jtuple.Tuple2(scalatuple._1,
    +      scalatuple._2)).javaSet
    +    wrapGraph(jgraph.joinWithEdgesOnTarget[T](javaTupleSet, newmapper))
    +  }
    +
    +  /**
    +   * Apply filtering functions to the graph and return a sub-graph that
    +   * satisfies the predicates for both vertices and edges.
    +   *
    +   * @param vertexFilter the filter function for vertices.
    +   * @param edgeFilter the filter function for edges.
    +   * @return the resulting sub-graph.
    +   */
    +  def subgraph(vertexFilter: FilterFunction[Vertex[K, VV]], edgeFilter: 
FilterFunction[Edge[K,
    +    EV]]) = {
    +    wrapGraph(jgraph.subgraph(vertexFilter, edgeFilter))
    +  }
    +
    +  /**
    +   * Apply filtering functions to the graph and return a sub-graph that
    +   * satisfies the predicates for both vertices and edges.
    +   *
    +   * @param vertexFilterFun the filter function for vertices.
    +   * @param edgeFilterFun the filter function for edges.
    +   * @return the resulting sub-graph.
    +   */
    +  def subgraph(vertexFilterFun: Vertex[K, VV] => Boolean, edgeFilterFun: 
Edge[K, EV] =>
    +    Boolean) = {
    +    val vertexFilter = new FilterFunction[Vertex[K, VV]] {
    +      val cleanVertexFun = clean(vertexFilterFun)
    +
    +      override def filter(value: Vertex[K, VV]): Boolean = 
cleanVertexFun(value)
    +    }
    +
    +    val edgeFilter = new FilterFunction[Edge[K, EV]] {
    +      val cleanEdgeFun = clean(edgeFilterFun)
    +
    +      override def filter(value: Edge[K, EV]): Boolean = 
cleanEdgeFun(value)
    +    }
    +
    +    wrapGraph(jgraph.subgraph(vertexFilter, edgeFilter))
    +  }
    +
    +  /**
    +   * Apply a filtering function to the graph and return a sub-graph that
    +   * satisfies the predicates only for the vertices.
    +   *
    +   * @param vertexFilter the filter function for vertices.
    +   * @return the resulting sub-graph.
    +   */
    +  def filterOnVertices(vertexFilter: FilterFunction[Vertex[K, VV]]) = {
    +    wrapGraph(jgraph.filterOnVertices(vertexFilter))
    +  }
    +
    +  /**
    +   * Apply a filtering function to the graph and return a sub-graph that
    +   * satisfies the predicates only for the vertices.
    +   *
    +   * @param vertexFilterFun the filter function for vertices.
    +   * @return the resulting sub-graph.
    +   */
    +  def filterOnVertices(vertexFilterFun: Vertex[K, VV] => Boolean) = {
    +    val vertexFilter = new FilterFunction[Vertex[K, VV]] {
    +      val cleanVertexFun = clean(vertexFilterFun)
    +
    +      override def filter(value: Vertex[K, VV]): Boolean = 
cleanVertexFun(value)
    +    }
    +
    +    wrapGraph(jgraph.filterOnVertices(vertexFilter))
    +  }
    +
    +  /**
    +   * Apply a filtering function to the graph and return a sub-graph that
    +   * satisfies the predicates only for the edges.
    +   *
    +   * @param edgeFilter the filter function for edges.
    +   * @return the resulting sub-graph.
    +   */
    +  def filterOnEdges(edgeFilter: FilterFunction[Edge[K, EV]]) = {
    +    wrapGraph(jgraph.filterOnEdges(edgeFilter))
    +  }
    +
    +  /**
    +   * Apply a filtering function to the graph and return a sub-graph that
    +   * satisfies the predicates only for the edges.
    +   *
    +   * @param edgeFilterFun the filter function for edges.
    +   * @return the resulting sub-graph.
    +   */
    +  def filterOnEdges(edgeFilterFun: Edge[K, EV] => Boolean) = {
    +    val edgeFilter = new FilterFunction[Edge[K, EV]] {
    +      val cleanEdgeFun = clean(edgeFilterFun)
    +
    +      override def filter(value: Edge[K, EV]): Boolean = 
cleanEdgeFun(value)
    +    }
    +
    +    //wrapGraph(jgraph.filterOnEdges(edgeFilter))
    --- End diff --
    
    Maybe this commented-out line is unnecessary?


> Add Gelly Scala API
> -------------------
>
>                 Key: FLINK-1962
>                 URL: https://issues.apache.org/jira/browse/FLINK-1962
>             Project: Flink
>          Issue Type: Task
>          Components: Gelly, Scala API
>    Affects Versions: 0.9
>            Reporter: Vasia Kalavri
>            Assignee: PJ Van Aeken
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to