[
https://issues.apache.org/jira/browse/FLINK-1514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14501469#comment-14501469
]
ASF GitHub Bot commented on FLINK-1514:
---------------------------------------
Github user vasia commented on a diff in the pull request:
https://github.com/apache/flink/pull/408#discussion_r28645524
--- Diff:
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
---
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.ExampleUtils;
+import org.apache.flink.graph.gsa.ApplyFunction;
+import org.apache.flink.graph.gsa.GatherFunction;
+import org.apache.flink.graph.gsa.GatherSumApplyIteration;
+import org.apache.flink.graph.gsa.SumFunction;
+import org.apache.flink.graph.gsa.Triplet;
+
+import java.io.Serializable;
+
+/**
+ * This is an implementation of the Single Source Shortest Paths algorithm,
+ * using a gather-sum-apply iteration.
+ */
+public class GSASingleSourceShortestPathsExample implements
ProgramDescription {
+
+ //
--------------------------------------------------------------------------------------------
+ // Program
+ //
--------------------------------------------------------------------------------------------
+
+ /**
+  * Entry point of the example. Builds a graph from the vertex and edge data
+  * sets, initializes the vertex values with {@link InitVerticesMapper}, runs
+  * the gather-sum-apply iteration and emits the resulting vertices.
+  *
+  * <p>Returns immediately, without building a program, when
+  * {@code parseParameters(args)} yields {@code false}.
+  *
+  * @param args command-line arguments, passed unchanged to {@code parseParameters}
+  * @throws Exception propagated from the calls made while building or running the program
+  */
+ public static void main(String[] args) throws Exception {
+
+     if (!parseParameters(args)) {
+         return;
+     }
+
+     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+     DataSet<Vertex<Long, Double>> vertices = getVertexDataSet(env);
+     DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
+
+     Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
+
+     // The path from src to trg through edge e costs src + e.
+     // NOTE(review): this comment previously also said "if the target's distance
+     // is 0 (the target is the actual source), return 0 instead"; the gather UDF
+     // in this file has no such special case - confirm which one is intended.
+     GatherFunction<Double, Double, Double> gather = new SingleSourceShortestPathGather();
+
+     // Return the smaller path length to minimize distance
+     SumFunction<Double, Double, Double> sum = new SingleSourceShortestPathSum();
+
+     // Iterate as long as the distance is updated
+     ApplyFunction<Double, Double, Double> apply = new SingleSourceShortestPathApply();
+
+     // Execute the GSA iteration. The iteration is created from the original graph;
+     // it is then run on the graph whose vertex values have been initialized.
+     GatherSumApplyIteration<Long, Double, Double, Double> iteration =
+             graph.createGatherSumApplyIteration(gather, sum, apply, maxIterations);
+     Graph<Long, Double, Double> result = graph
+             .mapVertices(new InitVerticesMapper<Long>(srcVertexId))
+             .runGatherSumApplyIteration(iteration);
+
+     // Extract the vertices as the result
+     DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();
+
+     // emit result
+     if (fileOutput) {
+         singleSourceShortestPaths.writeAsCsv(outputPath, "\n", " ");
+     } else {
+         singleSourceShortestPaths.print();
+     }
+
+     env.execute("GSA Single Source Shortest Paths Example");
+ }
+
+ /**
+  * Assigns the initial value of every vertex: {@code 0.0} for the vertex whose
+  * ID equals the source ID given to the constructor, and
+  * {@link Double#POSITIVE_INFINITY} for every other vertex.
+  *
+  * @param <K> the vertex ID type
+  */
+ public static final class InitVerticesMapper<K extends Comparable<K> & Serializable>
+         implements MapFunction<Vertex<K, Double>, Double> {
+
+     // Set once in the constructor and never reassigned, hence final.
+     private final K srcVertexId;
+
+     /**
+      * @param srcId ID of the source vertex, which receives the value 0.0
+      */
+     public InitVerticesMapper(K srcId) {
+         this.srcVertexId = srcId;
+     }
+
+     /**
+      * @param value the vertex to initialize; its ID is read from field {@code f0}
+      * @return 0.0 if the vertex ID equals the source ID, positive infinity otherwise
+      */
+     @Override
+     public Double map(Vertex<K, Double> value) {
+         if (value.f0.equals(srcVertexId)) {
+             return 0.0;
+         } else {
+             return Double.POSITIVE_INFINITY;
+         }
+     }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Single Source Shortest Path UDFs
+ // --------------------------------------------------------------------------------------------
+
+ /**
+  * Gather step: for one triplet, returns the triplet's source value plus its
+  * edge value.
+  */
+ private static final class SingleSourceShortestPathGather
+         extends GatherFunction<Double, Double, Double> {
+     /**
+      * @param triplet the triplet to evaluate
+      * @return {@code triplet.getSource() + triplet.getEdge()}
+      */
+     @Override
+     public Double gather(Triplet<Double, Double> triplet) {
+         return triplet.getSource() + triplet.getEdge();
+     }
+ }
+
+ /**
+  * Sum step: combines two gathered values by keeping the smaller one.
+  */
+ private static final class SingleSourceShortestPathSum
+         extends SumFunction<Double, Double, Double> {
+     /**
+      * @param arg0 first gathered value
+      * @param arg1 second gathered value
+      * @return the minimum of the two values, as computed by {@link Math#min(double, double)}
+      */
+     @Override
+     public Double sum(Double arg0, Double arg1) {
+         return Math.min(arg0, arg1);
+     }
+ }
+
+ private static final class SingleSourceShortestPathApply
+ extends ApplyFunction<Double, Double, Double> {
+ @Override
+ public void apply(Double summed, Double target) {
--- End diff --
How about this?
> [Gelly] Add a Gather-Sum-Apply iteration method
> -----------------------------------------------
>
> Key: FLINK-1514
> URL: https://issues.apache.org/jira/browse/FLINK-1514
> Project: Flink
> Issue Type: New Feature
> Components: Gelly
> Affects Versions: 0.9
> Reporter: Vasia Kalavri
> Assignee: Daniel Bali
>
> This will be a method that implements the GAS computation model, but without
> the "scatter" step. The phases can be mapped into the following steps inside
> a delta iteration:
> gather: a map on each < srcVertex, edge, trgVertex > that produces a partial
> value
> sum: a reduce that combines the partial values
> apply: join with vertex set to update the vertex values using the results of
> sum and the previous state.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)