[ 
https://issues.apache.org/jira/browse/FLINK-1523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14382680#comment-14382680
 ] 

ASF GitHub Bot commented on FLINK-1523:
---------------------------------------

Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/537#discussion_r27256440
  
    --- Diff: 
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---
    @@ -1141,8 +1185,33 @@ public boolean filter(Edge<K, EV> edge) {
         */
        public <M> Graph<K, VV, EV> runVertexCentricIteration(
                        VertexCentricIteration<K, VV, M, EV> iteration) {
    -           DataSet<Vertex<K, VV>> newVertices = 
vertices.runOperation(iteration);
    -           return new Graph<K, VV, EV>(newVertices, this.edges, 
this.context);
    +
    +           DataSet<Tuple2<K, Long>> inDegrees = this.inDegrees();
    +           DataSet<Tuple2<K, Long>> outDegrees = this.outDegrees();
    +           DataSet<Tuple3<K, Long, Long>> degress = 
inDegrees.join(outDegrees).where(0).equalTo(0).with(new 
FlatJoinFunction<Tuple2<K,Long>, Tuple2<K,Long>, Tuple3<K,Long,Long>>() {
    +                   @Override
    +                   public void join(Tuple2<K, Long> first, Tuple2<K, Long> 
second, Collector<Tuple3<K, Long, Long>> out) throws Exception {
    +                           out.collect(new Tuple3<K, Long, 
Long>(first.f0,first.f1,second.f1));
    +                   }
    +           });
    +
    +           DataSet<Vertex<K, Tuple3<VV,Long,Long>>> verticesWithInDegrees 
= vertices.join(degress).where(0).equalTo(0).with(
    +                           new FlatJoinFunction<Vertex<K, VV>, Tuple3<K, 
Long, Long>, Vertex<K, Tuple3<VV, Long, Long>>>() {
    +                                   @Override
    +                                   public void join(Vertex<K, VV> first, 
Tuple3<K, Long, Long> second, Collector<Vertex<K, Tuple3<VV, Long, Long>>> out) 
throws Exception {
    +                                           out.collect(new Vertex<K, 
Tuple3<VV, Long, Long>>(first.getId(),new Tuple3<VV, Long, 
Long>(first.getValue(),second.f1,second.f2)));
    +                                   }
    +                           }
    +           ).rebalance();
    --- End diff --
    
    why rebalance?


> Vertex-centric iteration extensions
> -----------------------------------
>
>                 Key: FLINK-1523
>                 URL: https://issues.apache.org/jira/browse/FLINK-1523
>             Project: Flink
>          Issue Type: Improvement
>          Components: Gelly
>            Reporter: Vasia Kalavri
>            Assignee: Andra Lungu
>
> We would like to make the following extensions to the vertex-centric 
> iterations of Gelly:
> - allow vertices to access their in/out degrees and the total number of 
> vertices of the graph, inside the iteration.
> - allow choosing the neighborhood type (in/out/all) over which to run the 
> vertex-centric iteration. Now, the model uses the updates of the in-neighbors 
> to calculate state and send messages to out-neighbors. We could add a 
> parameter with value "in/out/all" to the {{VertexUpdateFunction}} and 
> {{MessagingFunction}}, that would indicate the type of neighborhood.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to