[ 
https://issues.apache.org/jira/browse/FLINK-1514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14501469#comment-14501469
 ] 

ASF GitHub Bot commented on FLINK-1514:
---------------------------------------

Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/408#discussion_r28645524
  
    --- Diff: 
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
 ---
    @@ -0,0 +1,216 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.example;
    +
    +import org.apache.flink.api.common.ProgramDescription;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.example.utils.ExampleUtils;
    +import org.apache.flink.graph.gsa.ApplyFunction;
    +import org.apache.flink.graph.gsa.GatherFunction;
    +import org.apache.flink.graph.gsa.GatherSumApplyIteration;
    +import org.apache.flink.graph.gsa.SumFunction;
    +import org.apache.flink.graph.gsa.Triplet;
    +
    +import java.io.Serializable;
    +
    +/**
    + * This is an implementation of the Single Source Shortest Paths 
algorithm, using a gather-sum-apply iteration
    + */
    +public class GSASingleSourceShortestPathsExample implements 
ProgramDescription {
    +
    +   // 
--------------------------------------------------------------------------------------------
    +   //  Program
    +   // 
--------------------------------------------------------------------------------------------
    +
    +   public static void main(String[] args) throws Exception {
    +
    +           if(!parseParameters(args)) {
    +                   return;
    +           }
    +
    +           ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
    +
    +           DataSet<Vertex<Long, Double>> vertices = getVertexDataSet(env);
    +           DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
    +
    +           Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, 
edges, env);
    +
    +           // The path from src to trg through edge e costs src + e
    +           // If the target's distance is 0 (the target is the actual 
source), return 0 instead
    +           GatherFunction<Double, Double, Double> gather = new 
SingleSourceShortestPathGather();
    +
    +           // Return the smaller path length to minimize distance
    +           SumFunction<Double, Double, Double> sum = new 
SingleSourceShortestPathSum();
    +
    +           // Iterate as long as the distance is updated
    +           ApplyFunction<Double, Double, Double> apply = new 
SingleSourceShortestPathApply();
    +
    +           // Execute the GSA iteration
    +           GatherSumApplyIteration<Long, Double, Double, Double> iteration 
= graph.createGatherSumApplyIteration(
    +                           gather, sum, apply, maxIterations);
    +           Graph<Long, Double, Double> result = graph.mapVertices(new 
InitVerticesMapper<Long>(srcVertexId))
    +                           .runGatherSumApplyIteration(iteration);
    +
    +           // Extract the vertices as the result
    +           DataSet<Vertex<Long, Double>> singleSourceShortestPaths = 
result.getVertices();
    +
    +           // emit result
    +           if(fileOutput) {
    +                   singleSourceShortestPaths.writeAsCsv(outputPath, "\n", 
" ");
    +           } else {
    +                   singleSourceShortestPaths.print();
    +           }
    +
    +           env.execute("GSA Single Source Shortest Paths Example");
    +   }
    +
    +   public static final class InitVerticesMapper<K extends Comparable<K> & 
Serializable>
    +                   implements MapFunction<Vertex<K, Double>, Double> {
    +
    +           private K srcVertexId;
    +
    +           public InitVerticesMapper(K srcId) {
    +                   this.srcVertexId = srcId;
    +           }
    +
    +           public Double map(Vertex<K, Double> value) {
    +                   if (value.f0.equals(srcVertexId)) {
    +                           return 0.0;
    +                   } else {
    +                           return Double.POSITIVE_INFINITY;
    +                   }
    +           }
    +   }
    +
    +   // 
--------------------------------------------------------------------------------------------
    +   //  Single Source Shortest Path UDFs
    +   // 
--------------------------------------------------------------------------------------------
    +
    +   private static final class SingleSourceShortestPathGather
    +                   extends GatherFunction<Double, Double, Double> {
    +           @Override
    +           public Double gather(Triplet<Double, Double> triplet) {
    +                   return triplet.getSource() + triplet.getEdge();
    +           }
    +   };
    +
    +   private static final class SingleSourceShortestPathSum
    +                   extends SumFunction<Double, Double, Double> {
    +           @Override
    +           public Double sum(Double arg0, Double arg1) {
    +                   return Math.min(arg0, arg1);
    +           }
    +   };
    +
    +   private static final class SingleSourceShortestPathApply
    +                   extends ApplyFunction<Double, Double, Double> {
    +           @Override
    +           public void apply(Double summed, Double target) {
    --- End diff --
    
    How about this?


> [Gelly] Add a Gather-Sum-Apply iteration method
> -----------------------------------------------
>
>                 Key: FLINK-1514
>                 URL: https://issues.apache.org/jira/browse/FLINK-1514
>             Project: Flink
>          Issue Type: New Feature
>          Components: Gelly
>    Affects Versions: 0.9
>            Reporter: Vasia Kalavri
>            Assignee: Daniel Bali
>
> This will be a method that implements the GAS computation model, but without 
> the "scatter" step. The phases can be mapped into the following steps inside 
> a delta iteration:
> gather: a map on each < srcVertex, edge, trgVertex > that produces a partial 
> value
> sum: a reduce that combines the partial values
> apply: join with vertex set to update the vertex values using the results of 
> sum and the previous state.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to